Added doc for leases and the lease lifecycle. #1218
# Leases and Lease Lifecycle

Leases are created in, and leveraged by, KCL to manage a stream's shards.
This document provides insight into the lease lifecycle.

**Nota bene (N.B.):** while actual shard ids are formatted like `shardId-000000000042`, this document uses shortened forms such as `shard_42` and `lease_42` for concision.

## Leases

In KCL, a lease represents the temporary assignment of one Kinesis shard to a worker.
A lease persists for the duration of shard processing (detailed later).
However, the worker processing a lease may change, since leases may be "stolen" by other workers in the same KCL application.

## Lease Table

To persist metadata about lease state (e.g., last-read checkpoint, currently-assigned worker), KCL creates a lease table in [DynamoDB][dynamodb].
Each KCL application has its own distinct lease table whose name includes the application name.
More information, including the schema, is provided at [KCL LeaseTable][kcl-leasetable].
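For orientation, a single lease-table item can be pictured roughly as follows. This is a hedged sketch: the field names mirror the published lease-table schema, but the values (and any omitted attributes) are illustrative only.

```python
# Rough sketch of one lease-table item. Field names follow the published
# lease-table schema; the values here are purely illustrative.
lease_item = {
    "leaseKey": "shardId-000000000042",  # unique id: which shard this lease tracks
    "leaseOwner": "worker-host-1",       # worker currently assigned the lease
    "leaseCounter": 17,                  # bumped on each renewal; detects staleness
    "checkpoint": "TRIM_HORIZON",        # last checkpoint (sentinel or sequence number)
}

# A worker "owns" the shard's processing only while it holds this item.
print(lease_item["leaseOwner"])  # -> worker-host-1
```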
## Lease Assignment

The "life" of a lease closely mirrors the duration that its shard is being processed.

![Activity diagram of KCL shard-to-lease assignments.
shard-0 (lease-0) is unmodified.
shard-1 (lease-1) is split into shard-4 (lease-4) and shard-5 (lease-5).
shard-2 (lease-2) and shard-3 (lease-3) are merged into shard-6 (lease-6).
](images/leases-and-mutations.png)

Leases are unique to their shard and are not recycled across stream mutations (i.e., split, merge); each shard created by a mutation generates a new lease.

Note that the number of tracked leases may exceed the number of shards.
Per the diagram above, this can occur while stream mutations propagate through KCL.
For example, a 10-shard stream that is split on every shard may temporarily have up to 30 leases: 10 parents + 20 children.

N.B.: leases are uniquely identified by their `leaseKey`, whose format differs significantly from the `lease_X` shorthand used here.
For details on the `leaseKey` format, please see [KCL LeaseTable][kcl-leasetable].
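The lease-count arithmetic can be sketched in a few lines (a hypothetical helper, assuming every shard of the stream is split):

```python
def max_tracked_leases_after_full_split(shard_count: int) -> int:
    """Each split parent stays tracked until its children enter processing."""
    parents = shard_count
    children = 2 * shard_count  # a split yields two child shards
    return parents + children

print(max_tracked_leases_after_full_split(10))  # -> 30
```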

## Lease Lifecycle

Leases follow a relatively simple, progressive state machine:
`DISCOVERY -> CREATION -> PROCESSING -> SHARD_END -> DELETION`

Excluding `SHARD_END`, these phases are illustrative of KCL logic and are not explicitly codified.

1. `DISCOVERY`: KCL [shard-syncing](#lease-syncing) identifies new shards.
   Discovered shards may result from:
   * Starting KCL for the first time with an empty lease table.
   * Stream mutations (i.e., split, merge) that create child shards.
   * In multi-stream mode, dynamic discovery of a new stream.
1. `CREATION`: Leases are created 1:1 for each discovered shard.
   * Leases are only created once they are eligible for processing.
     For example, a child shard will not have a lease created until its parent(s) have reached `SHARD_END`.
   * Leases are initialized at the configured initial position.
     * A notable exception: child leases are initialized at `TRIM_HORIZON` to avoid processing gaps after their parent lease(s).
1. `PROCESSING`: Leases are processed and continually updated with new checkpoints.
   * In general, leases spend the majority of their life in this state.
1. `SHARD_END`: The associated shard has reached `SHARD_END`, and KCL has processed all of the shard's records.
1. `DELETION`: Since there are no more records to process, KCL deletes the lease from the lease table.
   * Lease deletion does not occur until the child lease(s) enter `PROCESSING`; this tombstone helps KCL ensure durability and convergence for all discovered leases.
   * To dive deep, see [LeaseCleanupManager#cleanupLeaseForCompletedShard(...)][lease-cleanup-manager-cleanupleaseforcompletedshard][^fixed-commit-footnote].
   * [Deletion is configurable][lease-cleanup-config], yet recommended to minimize the I/O cost of lease table scans.
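Since only `SHARD_END` is explicitly codified, the forward-only progression is best understood as a mental model. A minimal sketch (illustrative only, not KCL code):

```python
from enum import Enum

class LeasePhase(Enum):
    """Illustrative phases of a lease's life; only SHARD_END exists in KCL code."""
    DISCOVERY = 1
    CREATION = 2
    PROCESSING = 3
    SHARD_END = 4
    DELETION = 5

def next_phase(phase: LeasePhase) -> LeasePhase:
    """Leases only move forward through the lifecycle; they never regress."""
    if phase is LeasePhase.DELETION:
        raise ValueError("a deleted lease has no next phase")
    return LeasePhase(phase.value + 1)

print(next_phase(LeasePhase.PROCESSING).name)  # -> SHARD_END
```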

### Lease Syncing

Lease syncing is a complex responsibility owned by the "leader" host in a KCL application.
By invoking the [ListShards API][list-shards], KCL identifies the shards of the configured stream(s).
This process runs at a [configurable interval][lease-auditor-config], at which KCL decides whether it should query for potentially-new shards.

![Abridged sequence diagram of the Shard Sync process.
Listed participants are the Scheduler, LeaseCoordinator, PeriodicShardSyncManager, ShardSyncTask,
Lease Table (DDB), LeaseRefresher, LeaseSynchronizer, HierarchicalShardSyncer, and ShardDetector.
](images/lease-shard-sync.png)

For convenience, links to code:
* `Scheduler`: [implementation][scheduler]
* `LeaseCoordinator`: [interface][lease-coordinator], [implementation][lease-coordinator-impl]
* `PeriodicShardSyncManager`: [implementation][periodic-shard-sync-manager]
* `ShardSyncTask`: [interface][consumer-task], [implementation][consumer-task-impl]
* `LeaseRefresher`: [interface][lease-refresher], [implementation][lease-refresher-impl]
* `LeaseSynchronizer`: [implementation][non-empty-lease-table-synchronizer]
* `HierarchicalShardSyncer`: [implementation][hierarchical-shard-syncer]
* `ShardDetector`: [interface][shard-detector], [implementation][shard-detector-impl]

Lease creation is a deterministic process.
Assume a stream has the following shard hierarchy:
<pre>
Shard structure (each level depicts a stream segment):
0 1 2 3 4 5    - shards till epoch 102
\ / \ / | |
 6   7  4 5    - shards from epoch 103 - 205
  \ /   | / \
   8    4 9 10 - shards from epoch 206+ (still open)
</pre>

Then [NonEmptyLeaseTableSynchronizer][non-empty-lease-table-synchronizer]
would create leases dependent on the configured initial position.
Assuming leases `(4, 5, 7)` already exist, the leases created for each initial position would be:
* `LATEST` creates `(6)` to resolve the gap spanning epochs 103-205, which must be processed before eventually reaching `LATEST`
* `TRIM_HORIZON` creates `(0, 1)` to resolve the gap starting from the `TRIM_HORIZON`
* `AT_TIMESTAMP(epoch=200)` creates `(0, 1)` to resolve the gap leading into epoch 200
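These gaps can be derived mechanically. The sketch below is a simplified model, not the actual `HierarchicalShardSyncer` algorithm: it encodes the hierarchy above and surfaces the unleased, uncovered ancestors of the open shards. `TRIM_HORIZON` fills the oldest such gaps (`0, 1`) while `LATEST` fills the newest (`6`).

```python
# Hypothetical model of the shard hierarchy above; parent links per the diagram.
parents = {
    6: [0, 1], 7: [2, 3],        # epoch 103-205
    8: [6, 7], 9: [5], 10: [5],  # epoch 206+ (9 and 10 split from 5)
}
existing_leases = {4, 5, 7}
open_shards = {8, 4, 9, 10}

def ancestors(shard):
    """All transitive parents of a shard."""
    out = set()
    for p in parents.get(shard, []):
        out.add(p)
        out |= ancestors(p)
    return out

# Unleased ancestors standing between the stream's history and its open shards.
blocking = set()
for s in open_shards:
    blocking |= {a for a in ancestors(s) if a not in existing_leases}
# blocking == {0, 1, 2, 3, 6}; of these, 2 and 3 are already "covered"
# because their child 7 holds a lease (its checkpoint subsumes them).
covered = {a for lease in existing_leases for a in ancestors(lease)}
print(sorted(blocking - covered))  # -> [0, 1, 6]
```

The remaining gap set `{0, 1, 6}` is exactly the union of the leases listed above: the oldest members for `TRIM_HORIZON`, the newest for `LATEST`.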

#### Avoiding a Shard-Sync

To reduce Kinesis API calls, KCL attempts to avoid unnecessary shard syncs.
For example, if the discovered shards cover the entire partition range, then a shard sync is unlikely to yield a material difference.
To dive deeper, see
[PeriodicShardSyncManager#checkForShardSync(...)][periodic-shard-sync-manager-checkforshardsync][^fixed-commit-footnote].
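The "full coverage" idea can be pictured as follows. This is a sketch only (KCL's actual hole-detection logic lives in `PeriodicShardSyncManager`), and the tuple-based shard model is an assumption for illustration:

```python
# A shard sync is unnecessary when the hash-key ranges of known shards
# cover the entire partition space with no gaps.
MAX_HASH_KEY = 2**128 - 1  # Kinesis partitions keys over [0, 2^128 - 1]

def covers_full_range(shards):
    """shards: list of (startingHashKey, endingHashKey) tuples."""
    ranges = sorted(shards)
    if not ranges or ranges[0][0] != 0:
        return False
    end = ranges[0][1]
    for start, stop in ranges[1:]:
        if start > end + 1:  # gap: a hash range no shard accounts for
            return False
        end = max(end, stop)
    return end == MAX_HASH_KEY

# Two shards splitting the keyspace in half: fully covered.
half = MAX_HASH_KEY // 2
print(covers_full_range([(0, half), (half + 1, MAX_HASH_KEY)]))  # -> True
print(covers_full_range([(0, half)]))                            # -> False
```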

## Lease Balancing

KCL will, at a cadence configured by `leaseDuration` and `epsilonMillis`, attempt to "balance" leases across workers.
Lease balancing protects against stagnation if a worker stops updating the lease table (e.g., host failure).
This operation is naive: it only attempts to distribute leases equally across the available hosts.
Shards are not guaranteed to have equal `put` workloads, and balancing is blind to this I/O skew.

![Sequence diagram of the KCL Lease Taking workflow.
Participants include the LeaseCoordinator, LeaseTaker, LeaseRefresher, and Lease Table (DDB).
LeaseRefresher is leveraged to acquire the leases from the lease table.
LeaseTaker identifies which leases are eligible for taking/stealing.
All taken/stolen leases are passed through LeaseRefresher to update the lease table.
](images/lease-taking.png)

For convenience, links to code:
* `LeaseCoordinator`: [interface][lease-coordinator], [implementation][lease-coordinator-impl]
* `LeaseTaker`: [interface][lease-taker], [implementation][lease-taker-impl]
* `LeaseRefresher`: [interface][lease-refresher], [implementation][lease-refresher-impl]

Leases are stolen if, and only if, there are zero expired leases and the stealing worker wants more leases.
Stolen leases are randomly selected from whichever worker holds the most leases.
The maximum number of leases to steal on each loop is configured via [maxLeasesToStealAtOneTime][max-leases-to-steal-config].

Customers should consider the following trade-offs when configuring the lease-taking cadence:
1. `LeaseRefresher` invokes a DDB `scan` against the lease table, which has a cost proportional to the number of leases.
1. Frequent balancing may cause high lease turnover, which incurs DDB `write` costs and potentially redundant work for stolen leases.
1. A low `maxLeasesToStealAtOneTime` may increase the time to fully (re)assign leases after an impactful event (e.g., deployment, host failure).
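The naive balancing described above can be sketched as follows. This is illustrative only: `DynamoDBLeaseTaker` implements the real logic, and the function shape here is an assumption.

```python
import math

def choose_steal_count(lease_counts, my_worker, max_steal=3):
    """How many leases this worker should steal in one balancing loop.

    lease_counts: dict of worker -> number of held leases.
    Steals only when this worker holds fewer than its fair share, taking
    from whichever worker currently holds the most leases. max_steal
    stands in for maxLeasesToStealAtOneTime.
    """
    total = sum(lease_counts.values())
    target = math.ceil(total / len(lease_counts))  # even split, ignoring I/O skew
    need = target - lease_counts.get(my_worker, 0)
    if need <= 0:
        return 0
    busiest = max(lease_counts, key=lease_counts.get)
    # Never steal below the busiest worker's own fair share, and never
    # exceed the per-loop cap.
    return max(0, min(need, max_steal, lease_counts[busiest] - target))

# A freshly-started w3 steals from w1, the busiest worker.
print(choose_steal_count({"w1": 6, "w2": 3, "w3": 0}, "w3"))  # -> 3
```

A larger `max_steal` converges faster after a deployment or host failure, at the price of more lease turnover per loop, mirroring trade-off 3 above.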

## Additional Reading

Recommended, informative articles (in no particular order):
* https://aws.amazon.com/blogs/big-data/processing-amazon-dynamodb-streams-using-the-amazon-kinesis-client-library/

[^fixed-commit-footnote]: This link is a point-in-time reference to a specific commit to guarantee static line numbers.
    This code reference is not guaranteed to remain consistent with the `master` branch.

[consumer-task]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerTask.java
[consumer-task-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java
[dynamodb]: https://aws.amazon.com/dynamodb/
[hierarchical-shard-syncer]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java
[kcl-leasetable]: https://docs.aws.amazon.com/streams/latest/dev/shared-throughput-kcl-consumers.html#shared-throughput-kcl-consumers-leasetable
[lease-auditor-config]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L204-L209
[lease-cleanup-config]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L112-L128
[lease-cleanup-manager-cleanupleaseforcompletedshard]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java#L263-L294
[lease-coordinator]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCoordinator.java
[lease-coordinator-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java
[lease-refresher]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java
[lease-refresher-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java
[lease-taker]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseTaker.java
[lease-taker-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java
[list-shards]: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html
[max-leases-to-steal-config]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java#L142-L149
[non-empty-lease-table-synchronizer]: https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java#L857-L910
[periodic-shard-sync-manager]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java
[periodic-shard-sync-manager-checkforshardsync]: https://github.com/awslabs/amazon-kinesis-client/blob/3d6800874cdc5e4c18df6ea0197f607f6298cab7/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/PeriodicShardSyncManager.java#L267-L300
[scheduler]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java
[shard-detector]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java
[shard-detector-impl]: /amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java
@startuml
'https://plantuml.com/sequence-diagram

autonumber

title KCL Shard Syncing (Abridged)

participant Scheduler as S
participant LeaseCoordinator as LC
participant PeriodicShardSyncManager as PSS
participant ShardSyncTask as SST
participant "Lease Table (DDB)" as DDB
participant LeaseRefresher as LR
participant LeaseSynchronizer as LS
participant HierarchicalShardSyncer as HSS
participant ShardDetector as SD

alt on initialization
  S->S: create PeriodicShardSyncManager(\n ..., leaseRefresher, leasesRecoveryAuditorExecutionFrequencyMillis, ...)
  S->LC: initialize()
  opt if lease table does not exist
    LC->DDB: create lease table
  end
  S->PSS: start()
  PSS->PSS: schedule self every\n leasesRecoveryAuditorExecutionFrequencyMillis
end

loop every leasesRecoveryAuditorExecutionFrequencyMillis
  opt if worker is not leader
    PSS->PSS: go back to sleep
  end

  PSS->PSS: runShardSync()
  opt if not required to sync shards
    PSS->PSS: go back to sleep
  end

  PSS->SST: call()
  SST->HSS: checkAndCreateLeasesForNewShards(\n shardDetector, initialPosition, ...)
  alt if lease table is empty
    HSS->HSS: getShardListAtInitialPosition(shardDetector, initialPosition)
    HSS->SD: listShardsWithFilter(initialPositionFilter)
  else lease table is non-empty
    HSS->HSS: getShardList(shardDetector)
    HSS->SD: listShards(...)
    note over SD
    ShardDetector invokes the Kinesis ListShards API.
    end note
  end
  HSS->HSS: checkAndCreateLeaseForNewShards(\n shardDetector, leaseRefresher, initialPosition, ...)
  HSS->LR: listLeases()
  LR->DDB: scan(:streamName=streamName)
  DDB->LR: leases from table
  LR->HSS: leases from table
  HSS->LS: determine leases to create
  LS->HSS: leases that are eligible for processing
  loop every lease to create
    HSS->LR: createLeaseIfNotExists(lease)
    LR->DDB: putItem(lease)
  end
end

@enduml
@startuml
'https://plantuml.com/sequence-diagram

autonumber

title KCL Lease Taking

participant LeaseCoordinator as LC
participant LeaseTaker as LT
participant LeaseRefresher as LR
participant "Lease Table (DDB)" as DDB

loop every 2*(leaseDurationMillis + epsilonMillis)
  LC->LT: takeLeases()
  LT->LT: updateAllLeases()
  LT->LR: listLeases()
  LR->DDB: scan(:streamName=streamName)
  DDB->LR: leases from table
  LR->LT: leases from table
  LT->LT: evict from memory any lease not returned from table
  LT->LT: getExpiredLeases()
  note over LT
  Leases are "expired" if their last scan time exceeds the max lease duration.
  end note
  LT->LT: calculate lease count per host
  LT->LT: chooseLeasesToSteal()
  loop for each lease to take/steal
    LT->LR: takeLease(lease, worker)
    LR->DDB: updateItem(lease)
  end
end

@enduml
@startuml
'https://plantuml.com/state-diagram

title KCL Shard<->Lease Assignments

[*] --> ShardNoOp
[*] --> ShardSplit
[*] --> ShardMerge
ShardNoOp --> [*]
ShardSplit --> [*]
ShardMerge --> [*]

state ShardNoOp {
  shard_0 : lease_0
}

state ShardSplit {
  shard_1 : lease_1
  shard_4 : lease_4
  shard_5 : lease_5

  shard_1 --> shard_4 : split
  shard_1 --> shard_5 : split
}

state ShardMerge {
  shard_2 : lease_2
  shard_3 : lease_3
  shard_6 : lease_6

  shard_2 --> shard_6 : merge
  shard_3 --> shard_6 : merge
}

@enduml