Skip to content

Commit

Permalink
Get unassigned leases in leasesToTake (awslabs#1320)
Browse files Browse the repository at this point in the history
Consider all null leases as a possible lease to take alongside expired leases

---------

Co-authored-by: Brendan Lynch <[email protected]>
  • Loading branch information
brendan-p-lynch and Brendan Lynch authored Apr 30, 2024
1 parent ec34ed1 commit 34fe58c
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ public void update(final Lease lease) {
childShardIds(lease.childShardIds);
}

/**
* @param leaseDurationNanos duration of lease in nanoseconds
* @param asOfNanos time in nanoseconds to check expiration as-of
* @return true if lease lease is ready to be taken
*/
public boolean isAvailable(long leaseDurationNanos, long asOfNanos) {
return isUnassigned() || isExpired(leaseDurationNanos, asOfNanos);
}

/**
* @param leaseDurationNanos duration of lease in nanoseconds
* @param asOfNanos time in nanoseconds to check expiration as-of
Expand All @@ -190,6 +199,13 @@ public boolean isExpired(long leaseDurationNanos, long asOfNanos) {
}
}

/**
* @return true if lease is not currently owned
*/
private boolean isUnassigned() {
return leaseOwner == null;
}

/**
* Sets lastCounterIncrementNanos
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider)
updateAllLeases(timeProvider);
success = true;
} catch (ProvisionedThroughputException e) {
log.info("Worker {} could not find expired leases on try {} out of {}", workerIdentifier, i,
log.info("Worker {} could not find available leases on try {} out of {}", workerIdentifier, i,
TAKE_RETRIES);
lastException = e;
}
Expand All @@ -203,9 +203,9 @@ synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider)
return takenLeases;
}

List<Lease> expiredLeases = getExpiredLeases();
List<Lease> availableLeases = getAvailableLeases();

Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases, timeProvider);
Set<Lease> leasesToTake = computeLeasesToTake(availableLeases, timeProvider);
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);

Set<String> untakenLeaseKeys = new HashSet<>();
Expand Down Expand Up @@ -309,7 +309,7 @@ static String stringJoin(Collection<String> strings, String delimiter) {
*
* @param timeProvider callable that supplies the current time
*
* @return list of expired leases, possibly empty, never null.
* @return list of available leases, possibly empty, never null.
*
* @throws ProvisionedThroughputException if listLeases fails due to lack of provisioned throughput
* @throws InvalidStateException if the lease table does not exist
Expand Down Expand Up @@ -370,45 +370,36 @@ private void updateAllLeases(Callable<Long> timeProvider)
}

/**
* @return list of leases that were expired as of our last scan.
* @return list of leases that available as of our last scan.
*/
private List<Lease> getExpiredLeases() {
List<Lease> expiredLeases = new ArrayList<>();

for (Lease lease : allLeases.values()) {
if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) {
expiredLeases.add(lease);
}
}

return expiredLeases;
private List<Lease> getAvailableLeases() {
return allLeases.values().stream()
.filter(lease->lease.isAvailable(leaseDurationNanos, lastScanTimeNanos))
.collect(Collectors.toList());
}

/**
* Compute the number of leases I should try to take based on the state of the system.
*
* @param expiredLeases list of leases we determined to be expired
* @param availableLeases list of leases we determined to be available
* @param timeProvider callable which returns the current time in nanos
* @return set of leases to take.
*/
@VisibleForTesting
Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timeProvider) throws DependencyException {
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
Set<Lease> computeLeasesToTake(List<Lease> availableLeases, Callable<Long> timeProvider) throws DependencyException {
Map<String, Integer> leaseCounts = computeLeaseCounts(availableLeases);
Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);

final int numAvailableLeases = expiredLeases.size();
int numLeases = 0;
int numWorkers = 0;
final int numAvailableLeases = availableLeases.size();
final int numLeases = allLeases.size();
final int numWorkers = leaseCounts.size();
int numLeasesToReachTarget = 0;
int leaseSpillover = 0;
int veryOldLeaseCount = 0;

try {
numLeases = allLeases.size();
numWorkers = leaseCounts.size();

if (numLeases == 0) {
// If there are no leases, I shouldn't try to take any.
return leasesToTake;
Expand Down Expand Up @@ -475,19 +466,19 @@ Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timePro
return leasesToTake;
}

// Shuffle expiredLeases so workers don't all try to contend for the same leases.
Collections.shuffle(expiredLeases);
// Shuffle availableLeases so workers don't all try to contend for the same leases.
Collections.shuffle(availableLeases);

if (expiredLeases.size() > 0) {
// If we have expired leases, get up to <needed> leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(expiredLeases.remove(0));
if (availableLeases.size() > 0) {
// If we have available leases, get up to <needed> leases from availableLeases
for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(availableLeases.remove(0));
}
} else {
// If there are no expired leases and we need a lease, consider stealing.
// If there are no available leases and we need a lease, consider stealing.
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
for (Lease leaseToSteal : leasesToSteal) {
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
log.info("Worker {} needed {} leases but none were available, so it will steal lease {} from {}",
workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
leaseToSteal.leaseOwner());
leasesToTake.add(leaseToSteal);
Expand All @@ -502,7 +493,7 @@ Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timePro
leasesToTake.size());
}
} finally {
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("ExpiredLeases", numAvailableLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
Expand Down Expand Up @@ -598,19 +589,19 @@ private List<Lease> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int ne
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding
* leases.
*
* @param expiredLeases list of leases that are currently expired
* @param availableLeases list of leases that are currently available
* @return map of workerIdentifier to lease count
*/
@VisibleForTesting
Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
Map<String, Integer> computeLeaseCounts(List<Lease> availableLeases) {
Map<String, Integer> leaseCounts = new HashMap<>();
// The set will give much faster lookup than the original list, an
// important optimization when the list is large
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases);
Set<Lease> availableLeasesSet = new HashSet<>(availableLeases);

// Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired.
// Compute the number of leases per worker by looking through allLeases and ignoring leases that are available.
for (Lease lease : allLeases.values()) {
if (!expiredLeasesSet.contains(lease)) {
if (!availableLeasesSet.contains(lease)) {
String leaseOwner = lease.leaseOwner();
Integer oldCount = leaseCounts.get(leaseOwner);
if (oldCount == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package software.amazon.kinesis.leases;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;

import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;

@RunWith(MockitoJUnitRunner.class)
public class LeaseTest {

private static final long MOCK_CURRENT_TIME = 10000000000L;
private static final long LEASE_DURATION_MILLIS = 1000L;

private static final long LEASE_DURATION_NANOS = TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS);

//Write a unit test for software.amazon.kinesis.leases.Lease to test leaseOwner as null and epired
@Test
public void testLeaseOwnerNullAndExpired() {
long expiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS - 1;
Lease lease = createLease(null, "leaseKey", expiredTime);
Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME));
Assert.assertNull(lease.leaseOwner());
}

@Test
public void testLeaseOwnerNotNullAndExpired() {
long expiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS - 1;
Lease lease = createLease("leaseOwner", "leaseKey", expiredTime);
Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME));
Assert.assertEquals("leaseOwner", lease.leaseOwner());
}

@Test
public void testLeaseOwnerNotNullAndNotExpired() {
long notExpiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS + 1;
Lease lease = createLease("leaseOwner", "leaseKey", notExpiredTime);
Assert.assertFalse(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME));
Assert.assertEquals("leaseOwner", lease.leaseOwner());
}

@Test
public void testLeaseOwnerNullAndNotExpired() {
long notExpiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS + 1;
Lease lease = createLease(null, "leaseKey", notExpiredTime);
Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME));
Assert.assertNull(lease.leaseOwner());
}

private Lease createLease(String leaseOwner, String leaseKey, long lastCounterIncrementNanos) {
final Lease lease = new Lease();
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
lease.ownerSwitchesSinceCheckpoint(0L);
lease.leaseCounter(0L);
lease.leaseOwner(leaseOwner);
lease.parentShardIds(Collections.singleton("parentShardId"));
lease.childShardIds(new HashSet<>());
lease.leaseKey(leaseKey);
lease.lastCounterIncrementNanos(lastCounterIncrementNanos);
return lease;
}
}

0 comments on commit 34fe58c

Please sign in to comment.