-
Notifications
You must be signed in to change notification settings - Fork 467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Embed StreamConfig within ShardInfo #1304
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -68,7 +68,6 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import software.amazon.kinesis.leases.ShardInfo; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import software.amazon.kinesis.leases.ShardPrioritization; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import software.amazon.kinesis.leases.ShardSyncTaskManager; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import software.amazon.kinesis.leases.exceptions.DependencyException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -812,7 +811,7 @@ Callable<GracefulShutdownContext> createWorkerShutdownCallable() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for (Lease lease : leases) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
lease, notificationCompleteLatch, shutdownCompleteLatch); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ShardInfo shardInfo = DynamoDBLeaseCoordinator.convertLeaseToAssignment(lease); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final ShardInfo shardInfo = constructShardInfoFromLease(lease); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (consumer != null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
consumer.gracefulShutdown(shutdownNotification); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -895,7 +894,9 @@ private void finalShutdown() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private List<ShardInfo> getShardInfoForAssignments() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List<ShardInfo> assignedStreamShards = leaseCoordinator.getCurrentAssignments(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final List<ShardInfo> assignedStreamShards = leaseCoordinator.getAssignments().stream() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.map(this::constructShardInfoFromLease) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
.collect(Collectors.toList()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
List<ShardInfo> prioritizedShards = shardPrioritization.prioritize(assignedStreamShards); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if ((prioritizedShards != null) && (!prioritizedShards.isEmpty())) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -952,26 +953,20 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@NonNull final LeaseCleanupManager leaseCleanupManager) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
checkpoint); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// The only case where streamName is not available will be when multistreamtracker not set. In this case, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// get the default stream name for the single stream application. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// Irrespective of single stream app or multi stream app, streamConfig should always be available. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
// to gracefully complete the reading. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (streamConfig == null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamConfig = streamTracker.createStreamConfig(streamIdentifier); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final StreamConfig streamConfig = shardInfo.streamConfig(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!currentStreamConfigMap.containsKey(streamConfig.streamIdentifier())) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can be moved to where we first find out it is an orphan |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
log.info("Created orphan {}", streamConfig); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Validate.notNull(streamConfig, "StreamConfig should not be null"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/* | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* NOTE: RecordsPublisher#createGetRecordsCache(ShardInfo, StreamConfig, MetricsFactory) is deprecated. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* RecordsPublisher#createGetRecordsCache(ShardInfo, MetricsFactory) will be called directly in the future. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+960
to
964
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the reason we are not changing this now is that there could be some clients which pass in their own interface of RetrievalFactory and may not implement the new function yet? Wouldn't that still mean its backwards incompatible if we ever change its, so it would have to be a major version upgrade? |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamConfig.streamIdentifier(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
leaseCoordinator, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
executorService, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cache, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardRecordProcessorFactory.shardRecordProcessor(streamIdentifier), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardRecordProcessorFactory.shardRecordProcessor(streamConfig.streamIdentifier()), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
checkpoint, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
checkpointer, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
parentShardPollIntervalMillis, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -981,7 +976,6 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
maxListShardsRetryAttempts, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
processorConfig.callProcessRecordsEvenForEmptyRecordList(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardConsumerDispatchPollIntervalMillis, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamConfig.initialPositionInStreamExtended(), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
cleanupLeasesUponShardCompletion, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ignoreUnexpetedChildShards, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardDetectorProvider.apply(streamConfig), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1039,18 +1033,6 @@ private void logExecutorState() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
executorStateEvent.accept(diagnosticEventHandler); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private StreamIdentifier getStreamIdentifier(Optional<String> streamIdentifierString) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final StreamIdentifier streamIdentifier; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (streamIdentifierString.isPresent()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierString.get()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
streamIdentifier = this.currentStreamConfigMap.values().iterator().next().streamIdentifier(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Validate.notNull(streamIdentifier, "Stream identifier should not be empty"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return streamIdentifier; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
/** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -1090,6 +1072,31 @@ private void resetInfoLogging() { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private ShardInfo constructShardInfoFromLease(final Lease lease) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final boolean isMultiStreamLease = lease instanceof MultiStreamLease; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final Optional<String> streamIdentifierSerialization = isMultiStreamLease ? | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as below, encapsulate this inside the Lease and MultiStreamLease classes so you dont need these if checks, you can do |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final StreamConfig streamConfig = getOrCreateStreamConfig(streamIdentifierSerialization); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final String shardId = isMultiStreamLease ? ((MultiStreamLease) lease).shardId() : lease.leaseKey(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally these should be encapsulated in the class itself. lease.shardId() should give leaseKey if its Lease and shardId if its MultiStreamLease. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return new ShardInfo( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
private StreamConfig getOrCreateStreamConfig(final Optional<String> streamIdentifierSerialization) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if (!streamIdentifierSerialization.isPresent()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: double negations are always hard to read, can you use Validate.isFalse(isMultistreamMod, "...") |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final StreamConfig streamConfig = currentStreamConfigMap.values().iterator().next(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Validate.notNull(streamConfig, "StreamConfig should not be null"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return streamConfig; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
final StreamIdentifier streamIdentifier = | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
StreamIdentifier.multiStreamInstance(streamIdentifierSerialization.get()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return currentStreamConfigMap.getOrDefault(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is where we need the log that we are processing an orphan stream |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+1075
to
+1098
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
This makes it a little more clear that the empty optional represents an uncastable Lease. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
@Deprecated | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
public Future<Void> requestShutdown() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return null; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,31 +18,37 @@ | |
import java.util.Collections; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
import lombok.AccessLevel; | ||
import org.apache.commons.lang3.builder.EqualsBuilder; | ||
import org.apache.commons.lang3.builder.HashCodeBuilder; | ||
|
||
import lombok.Getter; | ||
import lombok.NonNull; | ||
import lombok.ToString; | ||
import lombok.experimental.Accessors; | ||
import software.amazon.kinesis.common.StreamConfig; | ||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; | ||
|
||
/** | ||
* Used to pass shard related info among different classes and as a key to the map of shard consumers. | ||
*/ | ||
@Getter | ||
@Accessors(fluent = true) | ||
@ToString | ||
@ToString(exclude = {"isMultiStreamMode", "streamIdentifierStr"}) | ||
public class ShardInfo { | ||
|
||
private final Optional<String> streamIdentifierSerOpt; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is ok to remove, but please implement the fluent accessor for this field for backward compatibility and mark it as deprecated |
||
private final String shardId; | ||
private final String concurrencyToken; | ||
// Sorted list of parent shardIds. | ||
private final List<String> parentShardIds; | ||
private final ExtendedSequenceNumber checkpoint; | ||
private final StreamConfig streamConfig; | ||
|
||
@Getter(AccessLevel.NONE) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes thank you, this is necessary because this is class exposes a public contract which cannot be violated and we should not expose more than what is necessary. But I do agree with lucien that we probably dont need them and get it from streamConfig when needed. |
||
private final boolean isMultiStreamMode; | ||
@Getter(AccessLevel.NONE) | ||
private final String streamIdentifierStr; | ||
Comment on lines
+48
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need these two member variables if they are just extracted out from streamConfig? I see you are using them in some functions in this class for comparisons but would it be more clean to extract it from streamConfig individually instead? |
||
|
||
/** | ||
* Creates a new ShardInfo object. The checkpoint is not part of the equality, but is used for debugging output. | ||
|
@@ -55,28 +61,14 @@ public class ShardInfo { | |
* Parent shards of the shard identified by Kinesis shardId | ||
* @param checkpoint | ||
* the latest checkpoint from lease | ||
*/ | ||
public ShardInfo(@NonNull final String shardId, | ||
final String concurrencyToken, | ||
final Collection<String> parentShardIds, | ||
final ExtendedSequenceNumber checkpoint) { | ||
this(shardId, concurrencyToken, parentShardIds, checkpoint, null); | ||
} | ||
|
||
/** | ||
* Creates a new ShardInfo object that has an option to pass a serialized streamIdentifier. | ||
* The checkpoint is not part of the equality, but is used for debugging output. | ||
* @param shardId | ||
* @param concurrencyToken | ||
* @param parentShardIds | ||
* @param checkpoint | ||
* @param streamIdentifierSer | ||
* @param streamConfig | ||
* The {@link StreamConfig} instance for the stream that the shard belongs to | ||
*/ | ||
public ShardInfo(@NonNull final String shardId, | ||
final String concurrencyToken, | ||
final Collection<String> parentShardIds, | ||
final ExtendedSequenceNumber checkpoint, | ||
final String streamIdentifierSer) { | ||
@NonNull final StreamConfig streamConfig) { | ||
this.shardId = shardId; | ||
this.concurrencyToken = concurrencyToken; | ||
this.parentShardIds = new LinkedList<>(); | ||
|
@@ -87,7 +79,9 @@ public ShardInfo(@NonNull final String shardId, | |
// This makes it easy to check for equality in ShardInfo.equals method. | ||
Collections.sort(this.parentShardIds); | ||
this.checkpoint = checkpoint; | ||
this.streamIdentifierSerOpt = Optional.ofNullable(streamIdentifierSer); | ||
this.streamConfig = streamConfig; | ||
this.isMultiStreamMode = streamConfig.streamIdentifier().isMultiStreamInstance(); | ||
this.streamIdentifierStr = streamConfig.streamIdentifier().serialize(); | ||
} | ||
|
||
/** | ||
|
@@ -114,7 +108,8 @@ public boolean isCompleted() { | |
@Override | ||
public int hashCode() { | ||
return new HashCodeBuilder() | ||
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierSerOpt.orElse("")).toHashCode(); | ||
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierStr) | ||
.toHashCode(); | ||
} | ||
|
||
/** | ||
|
@@ -139,7 +134,7 @@ public boolean equals(Object obj) { | |
ShardInfo other = (ShardInfo) obj; | ||
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken) | ||
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId) | ||
.append(streamIdentifierSerOpt.orElse(""), other.streamIdentifierSerOpt.orElse("")).isEquals(); | ||
.append(streamIdentifierStr, other.streamIdentifierStr).isEquals(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh you want to compare StreamConfig instead you no longer need the streamIdentifierStr |
||
|
||
} | ||
|
||
|
@@ -159,8 +154,8 @@ public static String getLeaseKey(ShardInfo shardInfo) { | |
* @return lease key | ||
*/ | ||
public static String getLeaseKey(ShardInfo shardInfo, String shardIdOverride) { | ||
return shardInfo.streamIdentifierSerOpt().isPresent() ? | ||
MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardIdOverride) : | ||
return shardInfo.isMultiStreamMode ? | ||
MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierStr, shardIdOverride) : | ||
shardIdOverride; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont think this needs to be public, see my comment on the usage of this in during the construction of FanOutRecordsPublisher
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also if you make the other change and we probably dont need a method because its used in only one place. Also I'd like to avoid saying a streamIdentifier is single stream instance or multi stream instance. What does it even mean? You can create a streamIdentifier from streamName, streamArn or streamSer, and its just an ID. I know this is existing code, it was probably meant to force customers to use a streamIdentifier constructed using streamArn or streamArn in multistream mode, but why? If is use just streamName in multi-stream mode, it should still work just fine?