From b154acf7f5fbb9b35adaac7cb625f4883f6f61d9 Mon Sep 17 00:00:00 2001
From: Aravinda Kidambi Srinivasan
<129209066+akidambisrinivasan@users.noreply.github.com>
Date: Tue, 12 Nov 2024 13:20:16 -0700
Subject: [PATCH] Address KCLv3 issues from github (#1398)
* Address KCLv3 issues reported on github
1. Fix transitive dependencies and add a maven plugin to catch these
at build time
2. Remove the redundant shutdown of the leaseCoordinatorThreadPool
3. Fix typo THROUGHOUT_PUT_KBPS
4. Fix shutdown sequence - make sure
scheduler shutdown without invoking run works
5. Fix backward compatibility check - Avoid flagging methods as deleted
if it is marked synchronized. Also mark interfaces introduced in KCLv3 as internal.
---
.../scripts/backwards_compatibility_check.sh | 14 ++-
amazon-kinesis-client/pom.xml | 109 ++++++++++++++++++
...DynamicMigrationComponentsInitializer.java | 6 +
.../assignment/LeaseAssignmentDecider.java | 2 +
...ationClientVersion3xWithRollbackState.java | 2 +-
.../MigrationClientVersionState.java | 2 +
.../migration/MigrationStateMachine.java | 2 +
.../migration/MigrationStateMachineImpl.java | 2 +-
.../kinesis/leases/LeaseDiscoverer.java | 2 +
.../dynamodb/DynamoDBLeaseCoordinator.java | 1 -
.../dynamodb/DynamoDBLeaseSerializer.java | 10 +-
11 files changed, 139 insertions(+), 13 deletions(-)
diff --git a/.github/scripts/backwards_compatibility_check.sh b/.github/scripts/backwards_compatibility_check.sh
index 070e9c171..03eee014f 100755
--- a/.github/scripts/backwards_compatibility_check.sh
+++ b/.github/scripts/backwards_compatibility_check.sh
@@ -58,14 +58,18 @@ is_non_public_class() {
return $?
}
-# Ignore methods that change from abstract to non-abstract (and vice versa) if the class is an interface.
-ignore_abstract_changes_in_interfaces() {
+# Ignore methods that change from abstract to non-abstract (and vice-versa) if the class is an interface.\
+# Ignore methods that change from synchronized to non-synchronized (and vice-versa)
+ignore_non_breaking_changes() {
local current_class="$1"
local class_definition=$(javap -classpath "$LATEST_JAR" "$current_class" | head -2 | tail -1)
if [[ $class_definition == *"interface"* ]]
then
- LATEST_METHODS=${LATEST_METHODS// abstract / }
- CURRENT_METHODS=${CURRENT_METHODS// abstract / }
+ LATEST_METHODS=${LATEST_METHODS//abstract /}
+ CURRENT_METHODS=${CURRENT_METHODS//abstract /}
+ else
+ LATEST_METHODS=${LATEST_METHODS//synchronized /}
+ CURRENT_METHODS=${CURRENT_METHODS//synchronized /}
fi
}
@@ -103,7 +107,7 @@ find_removed_methods() {
LATEST_METHODS=$(javap -classpath "$LATEST_JAR" "$class")
- ignore_abstract_changes_in_interfaces "$class"
+ ignore_non_breaking_changes "$class"
local removed_methods=$(diff <(echo "$LATEST_METHODS") <(echo "$CURRENT_METHODS") | grep '^<')
diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml
index 124a23efb..9e31537bf 100644
--- a/amazon-kinesis-client/pom.xml
+++ b/amazon-kinesis-client/pom.xml
@@ -90,6 +90,42 @@
netty-nio-client
${awssdk.version}
+
+ software.amazon.awssdk
+ sdk-core
+ ${awssdk.version}
+
+
+ software.amazon.awssdk
+ aws-core
+ ${awssdk.version}
+
+
+ software.amazon.awssdk
+ arns
+ ${awssdk.version}
+
+
+ software.amazon.awssdk
+ regions
+ ${awssdk.version}
+
+
+ software.amazon.awssdk
+ utils
+ ${awssdk.version}
+
+
+ software.amazon.awssdk
+ http-client-spi
+ ${awssdk.version}
+
+
+ software.amazon.awssdk
+ dynamodb-enhanced
+ ${awssdk.version}
+
+
software.amazon.glue
schema-registry-serde
@@ -127,6 +163,36 @@
commons-collections
3.2.2
+
+ org.apache.commons
+ commons-collections4
+ 4.4
+
+
+ io.netty
+ netty-handler
+ 4.1.108.Final
+
+
+ com.google.code.findbugs
+ jsr305
+ 3.0.2
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.10.1
+
+
+ org.reactivestreams
+ reactive-streams
+ 1.0.4
+
+
+ software.amazon.awssdk
+ annotations
+ 2.25.64
+
org.slf4j
slf4j-api
@@ -153,6 +219,18 @@
+
+ software.amazon.awssdk
+ sts
+ ${awssdk.version}
+ test
+
+
+ software.amazon.awssdk
+ auth
+ ${awssdk.version}
+ test
+
org.junit.jupiter
@@ -180,12 +258,24 @@
3.12.4
test
+
+ org.mockito
+ mockito-core
+ 3.12.4
+ test
+
org.hamcrest
hamcrest-all
1.3
test
+
+ org.hamcrest
+ hamcrest-core
+ 1.3
+ test
+
@@ -464,6 +554,25 @@
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 3.1.2
+
+
+ analyze-dependencies
+ verify
+
+ analyze-only
+
+
+ true
+
+ true
+
+
+
+
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java
index c4aecdda2..e9740ccac 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.java
@@ -223,6 +223,9 @@ void shutdown() {
workerMetricsThreadPool.shutdown();
try {
if (!lamThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+ log.info(
+ "LamThreadPool did not shutdown in {}s, forcefully shutting down",
+ SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS);
lamThreadPool.shutdownNow();
}
} catch (final InterruptedException e) {
@@ -232,6 +235,9 @@ void shutdown() {
try {
if (!workerMetricsThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+ log.info(
+ "WorkerMetricsThreadPool did not shutdown in {}s, forcefully shutting down",
+ SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS);
workerMetricsThreadPool.shutdownNow();
}
} catch (final InterruptedException e) {
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java
index a39866aeb..660426512 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/assignment/LeaseAssignmentDecider.java
@@ -17,8 +17,10 @@
import java.util.List;
+import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease;
+@KinesisClientInternalApi
public interface LeaseAssignmentDecider {
/**
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java
index 6235c5a93..912f0dc9a 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersion3xWithRollbackState.java
@@ -87,7 +87,7 @@ public synchronized void enter(final ClientVersion fromClientVersion) throws Dep
}
@Override
- public void leave() {
+ public synchronized void leave() {
if (entered && !left) {
log.info("Leaving {}", this);
cancelRollbackMonitor();
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java
index c1d8507ed..c2e3feac9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationClientVersionState.java
@@ -14,11 +14,13 @@
*/
package software.amazon.kinesis.coordinator.migration;
+import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.exceptions.DependencyException;
/**
* Interface of a state implementation for the MigrationStateMachine
*/
+@KinesisClientInternalApi
public interface MigrationClientVersionState {
/**
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java
index 4698feb08..6dff4e0c5 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachine.java
@@ -14,6 +14,7 @@
*/
package software.amazon.kinesis.coordinator.migration;
+import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@@ -28,6 +29,7 @@
* 3. Instant roll-forwards - Once any issue has been mitigated, rollfowards are supported instantly
* with KCL Migration tool.
*/
+@KinesisClientInternalApi
public interface MigrationStateMachine {
/**
* Initialize the state machine by identifying the initial state when the KCL worker comes up for the first time.
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java
index 96e16a0f5..ad744bfa9 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/migration/MigrationStateMachineImpl.java
@@ -126,7 +126,7 @@ public void shutdown() {
if (!stateMachineThreadPool.isShutdown()) {
stateMachineThreadPool.shutdown();
try {
- if (stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+ if (!stateMachineThreadPool.awaitTermination(THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.info(
"StateMachineThreadPool did not shutdown within {} seconds, forcefully shutting down",
THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS);
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java
index 9f6ce776e..6056d1fd8 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseDiscoverer.java
@@ -17,10 +17,12 @@
import java.util.List;
+import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
+@KinesisClientInternalApi
public interface LeaseDiscoverer {
/**
* Identifies the leases that are assigned to the current worker but are not being tracked and processed by the
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
index 7eb4c4f1a..4f4d7886d 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
@@ -383,7 +383,6 @@ public void stop() {
}
leaseRenewalThreadpool.shutdownNow();
- leaseCoordinatorThreadPool.shutdownNow();
leaseGracefulShutdownHandler.stop();
synchronized (shutdownLock) {
leaseRenewer.clearCurrentlyHeldLeases();
diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java
index 16d719bf8..0ad34b69e 100644
--- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java
+++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseSerializer.java
@@ -54,7 +54,7 @@ public class DynamoDBLeaseSerializer implements LeaseSerializer {
private static final String CHILD_SHARD_IDS_KEY = "childShardIds";
private static final String STARTING_HASH_KEY = "startingHashKey";
private static final String ENDING_HASH_KEY = "endingHashKey";
- private static final String THROUGHOUT_PUT_KBPS = "throughputKBps";
+ private static final String THROUGHPUT_KBPS = "throughputKBps";
private static final String CHECKPOINT_SEQUENCE_NUMBER_KEY = "checkpoint";
static final String CHECKPOINT_OWNER = "checkpointOwner";
static final String LEASE_OWNER_KEY = "leaseOwner";
@@ -113,7 +113,7 @@ public Map toDynamoRecord(final Lease lease) {
}
if (lease.throughputKBps() != null) {
- result.put(THROUGHOUT_PUT_KBPS, DynamoUtils.createAttributeValue(lease.throughputKBps()));
+ result.put(THROUGHPUT_KBPS, DynamoUtils.createAttributeValue(lease.throughputKBps()));
}
if (lease.checkpointOwner() != null) {
@@ -155,8 +155,8 @@ public Lease fromDynamoRecord(Map dynamoRecord, Lease le
leaseToUpdate.hashKeyRange(HashKeyRangeForLease.deserialize(startingHashKey, endingHashKey));
}
- if (DynamoUtils.safeGetDouble(dynamoRecord, THROUGHOUT_PUT_KBPS) != null) {
- leaseToUpdate.throughputKBps(DynamoUtils.safeGetDouble(dynamoRecord, THROUGHOUT_PUT_KBPS));
+ if (DynamoUtils.safeGetDouble(dynamoRecord, THROUGHPUT_KBPS) != null) {
+ leaseToUpdate.throughputKBps(DynamoUtils.safeGetDouble(dynamoRecord, THROUGHPUT_KBPS));
}
if (DynamoUtils.safeGetString(dynamoRecord, CHECKPOINT_OWNER) != null) {
@@ -466,7 +466,7 @@ public Map getDynamoLeaseThroughputKbpsUpdate(Leas
.value(DynamoUtils.createAttributeValue(lease.throughputKBps()))
.action(AttributeAction.PUT)
.build();
- result.put(THROUGHOUT_PUT_KBPS, avu);
+ result.put(THROUGHPUT_KBPS, avu);
return result;
}