diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index f761a9a71..b1602eb8f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -171,6 +171,15 @@ public void update(final Lease lease) { childShardIds(lease.childShardIds); } + /** + * @param leaseDurationNanos duration of lease in nanoseconds + * @param asOfNanos time in nanoseconds to check expiration as-of + * @return true if lease lease is ready to be taken + */ + public boolean isAvailable(long leaseDurationNanos, long asOfNanos) { + return isUnassigned() || isExpired(leaseDurationNanos, asOfNanos); + } + /** * @param leaseDurationNanos duration of lease in nanoseconds * @param asOfNanos time in nanoseconds to check expiration as-of @@ -190,6 +199,13 @@ public boolean isExpired(long leaseDurationNanos, long asOfNanos) { } } + /** + * @return true if lease is not currently owned + */ + private boolean isUnassigned() { + return leaseOwner == null; + } + /** * Sets lastCounterIncrementNanos * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index 7020a94b1..a4aabafda 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -186,7 +186,7 @@ synchronized Map takeLeases(Callable timeProvider) updateAllLeases(timeProvider); success = true; } catch (ProvisionedThroughputException e) { - log.info("Worker {} could not find expired leases on try {} out of {}", workerIdentifier, i, + log.info("Worker {} could not find available leases on try {} out of {}", workerIdentifier, i, TAKE_RETRIES); lastException = e; } @@ -203,9 +203,9 @@ synchronized Map takeLeases(Callable timeProvider) return takenLeases; } - List expiredLeases = getExpiredLeases(); + List availableLeases = getAvailableLeases(); - Set leasesToTake = computeLeasesToTake(expiredLeases, timeProvider); + Set leasesToTake = computeLeasesToTake(availableLeases, timeProvider); leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake); Set untakenLeaseKeys = new HashSet<>(); @@ -309,7 +309,7 @@ static String stringJoin(Collection strings, String delimiter) { * * @param timeProvider callable that supplies the current time * - * @return list of expired leases, possibly empty, never null. + * @return list of available leases, possibly empty, never null. * * @throws ProvisionedThroughputException if listLeases fails due to lack of provisioned throughput * @throws InvalidStateException if the lease table does not exist @@ -370,45 +370,36 @@ private void updateAllLeases(Callable timeProvider) } /** - * @return list of leases that were expired as of our last scan. + * @return list of leases that available as of our last scan. */ - private List getExpiredLeases() { - List expiredLeases = new ArrayList<>(); - - for (Lease lease : allLeases.values()) { - if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) { - expiredLeases.add(lease); - } - } - - return expiredLeases; + private List getAvailableLeases() { + return allLeases.values().stream() + .filter(lease->lease.isAvailable(leaseDurationNanos, lastScanTimeNanos)) + .collect(Collectors.toList()); } /** * Compute the number of leases I should try to take based on the state of the system. * - * @param expiredLeases list of leases we determined to be expired + * @param availableLeases list of leases we determined to be available * @param timeProvider callable which returns the current time in nanos * @return set of leases to take. */ @VisibleForTesting - Set computeLeasesToTake(List expiredLeases, Callable timeProvider) throws DependencyException { - Map leaseCounts = computeLeaseCounts(expiredLeases); + Set computeLeasesToTake(List availableLeases, Callable timeProvider) throws DependencyException { + Map leaseCounts = computeLeaseCounts(availableLeases); Set leasesToTake = new HashSet<>(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION); MetricsUtil.addWorkerIdentifier(scope, workerIdentifier); - final int numAvailableLeases = expiredLeases.size(); - int numLeases = 0; - int numWorkers = 0; + final int numAvailableLeases = availableLeases.size(); + final int numLeases = allLeases.size(); + final int numWorkers = leaseCounts.size(); int numLeasesToReachTarget = 0; int leaseSpillover = 0; int veryOldLeaseCount = 0; try { - numLeases = allLeases.size(); - numWorkers = leaseCounts.size(); - if (numLeases == 0) { // If there are no leases, I shouldn't try to take any. return leasesToTake; @@ -475,19 +466,19 @@ Set computeLeasesToTake(List expiredLeases, Callable timePro return leasesToTake; } - // Shuffle expiredLeases so workers don't all try to contend for the same leases. - Collections.shuffle(expiredLeases); + // Shuffle availableLeases so workers don't all try to contend for the same leases. + Collections.shuffle(availableLeases); - if (expiredLeases.size() > 0) { - // If we have expired leases, get up to leases from expiredLeases - for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) { - leasesToTake.add(expiredLeases.remove(0)); + if (availableLeases.size() > 0) { + // If we have available leases, get up to leases from availableLeases + for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) { + leasesToTake.add(availableLeases.remove(0)); } } else { - // If there are no expired leases and we need a lease, consider stealing. + // If there are no available leases and we need a lease, consider stealing. List leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target); for (Lease leaseToSteal : leasesToSteal) { - log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}", + log.info("Worker {} needed {} leases but none were available, so it will steal lease {} from {}", workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(), leaseToSteal.leaseOwner()); leasesToTake.add(leaseToSteal); @@ -502,7 +493,7 @@ Set computeLeasesToTake(List expiredLeases, Callable timePro leasesToTake.size()); } } finally { - scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope.addData("ExpiredLeases", numAvailableLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED); scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED); @@ -598,19 +589,19 @@ private List chooseLeasesToSteal(Map leaseCounts, int ne * Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding * leases. * - * @param expiredLeases list of leases that are currently expired + * @param availableLeases list of leases that are currently available * @return map of workerIdentifier to lease count */ @VisibleForTesting - Map computeLeaseCounts(List expiredLeases) { + Map computeLeaseCounts(List availableLeases) { Map leaseCounts = new HashMap<>(); // The set will give much faster lookup than the original list, an // important optimization when the list is large - Set expiredLeasesSet = new HashSet<>(expiredLeases); + Set availableLeasesSet = new HashSet<>(availableLeases); - // Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired. + // Compute the number of leases per worker by looking through allLeases and ignoring leases that are available. for (Lease lease : allLeases.values()) { - if (!expiredLeasesSet.contains(lease)) { + if (!availableLeasesSet.contains(lease)) { String leaseOwner = lease.leaseOwner(); Integer oldCount = leaseCounts.get(leaseOwner); if (oldCount == null) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseTest.java new file mode 100644 index 000000000..ad3827a9d --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseTest.java @@ -0,0 +1,67 @@ +package software.amazon.kinesis.leases; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.util.Collections; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; + +@RunWith(MockitoJUnitRunner.class) +public class LeaseTest { + + private static final long MOCK_CURRENT_TIME = 10000000000L; + private static final long LEASE_DURATION_MILLIS = 1000L; + + private static final long LEASE_DURATION_NANOS = TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS); + + //Write a unit test for software.amazon.kinesis.leases.Lease to test leaseOwner as null and epired + @Test + public void testLeaseOwnerNullAndExpired() { + long expiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS - 1; + Lease lease = createLease(null, "leaseKey", expiredTime); + Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME)); + Assert.assertNull(lease.leaseOwner()); + } + + @Test + public void testLeaseOwnerNotNullAndExpired() { + long expiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS - 1; + Lease lease = createLease("leaseOwner", "leaseKey", expiredTime); + Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME)); + Assert.assertEquals("leaseOwner", lease.leaseOwner()); + } + + @Test + public void testLeaseOwnerNotNullAndNotExpired() { + long notExpiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS + 1; + Lease lease = createLease("leaseOwner", "leaseKey", notExpiredTime); + Assert.assertFalse(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME)); + Assert.assertEquals("leaseOwner", lease.leaseOwner()); + } + + @Test + public void testLeaseOwnerNullAndNotExpired() { + long notExpiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS + 1; + Lease lease = createLease(null, "leaseKey", notExpiredTime); + Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME)); + Assert.assertNull(lease.leaseOwner()); + } + + private Lease createLease(String leaseOwner, String leaseKey, long lastCounterIncrementNanos) { + final Lease lease = new Lease(); + lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); + lease.ownerSwitchesSinceCheckpoint(0L); + lease.leaseCounter(0L); + lease.leaseOwner(leaseOwner); + lease.parentShardIds(Collections.singleton("parentShardId")); + lease.childShardIds(new HashSet<>()); + lease.leaseKey(leaseKey); + lease.lastCounterIncrementNanos(lastCounterIncrementNanos); + return lease; + } +}