diff --git a/.github/scripts/backwards_compatibility_check.sh b/.github/scripts/backwards_compatibility_check.sh index 070e9c171..03eee014f 100755 --- a/.github/scripts/backwards_compatibility_check.sh +++ b/.github/scripts/backwards_compatibility_check.sh @@ -58,14 +58,18 @@ is_non_public_class() { return $? } -# Ignore methods that change from abstract to non-abstract (and vice versa) if the class is an interface. -ignore_abstract_changes_in_interfaces() { +# Ignore methods that change from abstract to non-abstract (and vice-versa) if the class is an interface.\ +# Ignore methods that change from synchronized to non-synchronized (and vice-versa) +ignore_non_breaking_changes() { local current_class="$1" local class_definition=$(javap -classpath "$LATEST_JAR" "$current_class" | head -2 | tail -1) if [[ $class_definition == *"interface"* ]] then - LATEST_METHODS=${LATEST_METHODS// abstract / } - CURRENT_METHODS=${CURRENT_METHODS// abstract / } + LATEST_METHODS=${LATEST_METHODS//abstract /} + CURRENT_METHODS=${CURRENT_METHODS//abstract /} + else + LATEST_METHODS=${LATEST_METHODS//synchronized /} + CURRENT_METHODS=${CURRENT_METHODS//synchronized /} fi } @@ -103,7 +107,7 @@ find_removed_methods() { LATEST_METHODS=$(javap -classpath "$LATEST_JAR" "$class") - ignore_abstract_changes_in_interfaces "$class" + ignore_non_breaking_changes "$class" local removed_methods=$(diff <(echo "$LATEST_METHODS") <(echo "$CURRENT_METHODS") | grep '^<') diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 124a23efb..9e31537bf 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -90,6 +90,42 @@ netty-nio-client ${awssdk.version} + + software.amazon.awssdk + sdk-core + ${awssdk.version} + + + software.amazon.awssdk + aws-core + ${awssdk.version} + + + software.amazon.awssdk + arns + ${awssdk.version} + + + software.amazon.awssdk + regions + ${awssdk.version} + + + software.amazon.awssdk + utils + ${awssdk.version} + + + software.amazon.awssdk + http-client-spi + ${awssdk.version} + + + software.amazon.awssdk + dynamodb-enhanced + ${awssdk.version} + + software.amazon.glue schema-registry-serde @@ -127,6 +163,36 @@ commons-collections 3.2.2 + + org.apache.commons + commons-collections4 + 4.4 + + + io.netty + netty-handler + 4.1.108.Final + + + com.google.code.findbugs + jsr305 + 3.0.2 + + + com.fasterxml.jackson.core + jackson-databind + 2.10.1 + + + org.reactivestreams + reactive-streams + 1.0.4 + + + software.amazon.awssdk + annotations + 2.25.64 + org.slf4j slf4j-api @@ -153,6 +219,18 @@ + + software.amazon.awssdk + sts + ${awssdk.version} + test + + + software.amazon.awssdk + auth + ${awssdk.version} + test + org.junit.jupiter @@ -180,12 +258,24 @@ 3.12.4 test + + org.mockito + mockito-core + 3.12.4 + test + org.hamcrest hamcrest-all 1.3 test + + org.hamcrest + hamcrest-core + 1.3 + test + @@ -464,6 +554,25 @@ + + org.apache.maven.plugins + maven-dependency-plugin + 3.1.2 + + + analyze-dependencies + verify + + analyze-only + + + true + + true + + + + diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java index c4aecdda2..e9740ccac 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java @@ -223,6 +223,9 @@ void shutdown() { workerMetricsThreadPool.shutdown(); try { if (!lamThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + log.info( + "LamThreadPool did not shutdown in {}s, forcefully shutting down", + SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS); lamThreadPool.shutdownNow(); } } catch (final InterruptedException e) { @@ -232,6 +235,9 @@ void shutdown() { try { if (!workerMetricsThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + log.info( + "WorkerMetricsThreadPool did not shutdown in {}s, forcefully shutting down", + SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS); workerMetricsThreadPool.shutdownNow(); } } catch (final InterruptedException e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java index a39866aeb..660426512 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java @@ -17,8 +17,10 @@ import java.util.List; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.Lease; +@KinesisClientInternalApi public interface LeaseAssignmentDecider { /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java index 6235c5a93..912f0dc9a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java @@ -87,7 +87,7 @@ public synchronized void enter(final ClientVersion fromClientVersion) throws Dep } @Override - public void leave() { + public synchronized void leave() { if (entered && !left) { log.info("Leaving {}", this); cancelRollbackMonitor(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java index c1d8507ed..c2e3feac9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java @@ -14,11 +14,13 @@ */ package software.amazon.kinesis.coordinator.migration; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.exceptions.DependencyException; /** * Interface of a state implementation for the MigrationStateMachine */ +@KinesisClientInternalApi public interface MigrationClientVersionState { /** diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java index 4698feb08..6dff4e0c5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java @@ -14,6 +14,7 @@ */ package software.amazon.kinesis.coordinator.migration; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; @@ -28,6 +29,7 @@ * 3. Instant roll-forwards - Once any issue has been mitigated, rollfowards are supported instantly * with KCL Migration tool. */ +@KinesisClientInternalApi public interface MigrationStateMachine { /** * Initialize the state machine by identifying the initial state when the KCL worker comes up for the first time. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java index 96e16a0f5..ad744bfa9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java @@ -126,7 +126,7 @@ public void shutdown() { if (!stateMachineThreadPool.isShutdown()) { stateMachineThreadPool.shutdown(); try { - if (stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + if (!stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { log.info( "StateMachineThreadPool did not shutdown within {} seconds, forcefully shutting down", THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java index 9f6ce776e..6056d1fd8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java @@ -17,10 +17,12 @@ import java.util.List; +import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; +@KinesisClientInternalApi public interface LeaseDiscoverer { /** * Identifies the leases that are assigned to the current worker but are not being tracked and processed by the diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 7eb4c4f1a..4f4d7886d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -383,7 +383,6 @@ public void stop() { } leaseRenewalThreadpool.shutdownNow(); - leaseCoordinatorThreadPool.shutdownNow(); leaseGracefulShutdownHandler.stop(); synchronized (shutdownLock) { leaseRenewer.clearCurrentlyHeldLeases(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java index 16d719bf8..0ad34b69e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java @@ -54,7 +54,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer { private static final String CHILD_SHARD_IDS_KEY = "childShardIds"; private static final String STARTING_HASH_KEY = "startingHashKey"; private static final String ENDING_HASH_KEY = "endingHashKey"; - private static final String THROUGHOUT_PUT_KBPS = "throughputKBps"; + private static final String THROUGHPUT_KBPS = "throughputKBps"; private static final String CHECKPOINT_SEQUENCE_NUMBER_KEY = "checkpoint"; static final String CHECKPOINT_OWNER = "checkpointOwner"; static final String LEASE_OWNER_KEY = "leaseOwner"; @@ -113,7 +113,7 @@ public Map toDynamoRecord(final Lease lease) { } if (lease.throughputKBps() != null) { - result.put(THROUGHOUT_PUT_KBPS, DynamoUtils.createAttributeValue(lease.throughputKBps())); + result.put(THROUGHPUT_KBPS, DynamoUtils.createAttributeValue(lease.throughputKBps())); } if (lease.checkpointOwner() != null) { @@ -155,8 +155,8 @@ public Lease fromDynamoRecord(Map dynamoRecord, Lease le leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey)); } - if (DynamoUtils.safeGetDouble(dynamoRecord, THROUGHOUT_PUT_KBPS) != null) { - leaseToUpdate.throughputKBps(DynamoUtils.safeGetDouble(dynamoRecord, THROUGHOUT_PUT_KBPS)); + if (DynamoUtils.safeGetDouble(dynamoRecord, THROUGHPUT_KBPS) != null) { + leaseToUpdate.throughputKBps(DynamoUtils.safeGetDouble(dynamoRecord, THROUGHPUT_KBPS)); } if (DynamoUtils.safeGetString(dynamoRecord, CHECKPOINT_OWNER) != null) { @@ -466,7 +466,7 @@ public Map getDynamoLeaseThroughputKbpsUpdate(Leas .value(DynamoUtils.createAttributeValue(lease.throughputKBps())) .action(AttributeAction.PUT) .build(); - result.put(THROUGHOUT_PUT_KBPS, avu); + result.put(THROUGHPUT_KBPS, avu); return result; }