Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embed StreamConfig within ShardInfo #1304

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public class StreamIdentifier {
* or {@link #streamName} in single-stream mode.
*/
public String serialize() {
if (!streamCreationEpochOptional.isPresent()) {
// creation epoch is expected to be empty in single-stream mode
if (!isMultiStreamInstance()) {
return streamName;
}

Expand All @@ -85,6 +84,16 @@ public String toString() {
return serialize();
}

/**
* Determine whether this {@link StreamIdentifier} is a multi-stream instance.
*
* @return true if this is a multi-stream instance, false otherwise.
*/
public boolean isMultiStreamInstance() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this needs to be public; see my comment on the usage of this during the construction of FanOutRecordsPublisher.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if you make the other change, we probably don't need a method because it's used in only one place. I'd also like to avoid saying a streamIdentifier is a single-stream instance or a multi-stream instance. What does it even mean? You can create a streamIdentifier from streamName, streamArn or streamSer, and it's just an ID. I know this is existing code; it was probably meant to force customers to use a streamIdentifier constructed using streamArn or streamArn in multi-stream mode, but why? If I use just streamName in multi-stream mode, it should still work just fine?

// creation epoch is expected to be present if and only if in multi-stream mode
return streamCreationEpochOptional.isPresent();
}

/**
* Create a multi stream instance for StreamIdentifier from serialized stream identifier
* of format {@link #STREAM_IDENTIFIER_PATTERN}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardPrioritization;
import software.amazon.kinesis.leases.ShardSyncTaskManager;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
Expand Down Expand Up @@ -812,7 +811,7 @@ Callable<GracefulShutdownContext> createWorkerShutdownCallable() {
for (Lease lease : leases) {
ShutdownNotification shutdownNotification = new ShardConsumerShutdownNotification(leaseCoordinator,
lease, notificationCompleteLatch, shutdownCompleteLatch);
ShardInfo shardInfo = DynamoDBLeaseCoordinator.convertLeaseToAssignment(lease);
final ShardInfo shardInfo = constructShardInfoFromLease(lease);
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
if (consumer != null) {
consumer.gracefulShutdown(shutdownNotification);
Expand Down Expand Up @@ -895,7 +894,9 @@ private void finalShutdown() {
}

private List<ShardInfo> getShardInfoForAssignments() {
List<ShardInfo> assignedStreamShards = leaseCoordinator.getCurrentAssignments();
final List<ShardInfo> assignedStreamShards = leaseCoordinator.getAssignments().stream()
.map(this::constructShardInfoFromLease)
.collect(Collectors.toList());
List<ShardInfo> prioritizedShards = shardPrioritization.prioritize(assignedStreamShards);

if ((prioritizedShards != null) && (!prioritizedShards.isEmpty())) {
Expand Down Expand Up @@ -952,26 +953,20 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
@NonNull final LeaseCleanupManager leaseCleanupManager) {
ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo,
checkpoint);
// The only case where streamName is not available will be when multistreamtracker not set. In this case,
// get the default stream name for the single stream application.
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());

// Irrespective of single stream app or multi stream app, streamConfig should always be available.
// If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config
// to gracefully complete the reading.
StreamConfig streamConfig = currentStreamConfigMap.get(streamIdentifier);
if (streamConfig == null) {
streamConfig = streamTracker.createStreamConfig(streamIdentifier);
final StreamConfig streamConfig = shardInfo.streamConfig();
if (!currentStreamConfigMap.containsKey(streamConfig.streamIdentifier())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be moved to where we first find out it is an orphan

log.info("Created orphan {}", streamConfig);
}
Validate.notNull(streamConfig, "StreamConfig should not be null");
/*
* NOTE: RecordsPublisher#createGetRecordsCache(ShardInfo, StreamConfig, MetricsFactory) is deprecated.
* RecordsPublisher#createGetRecordsCache(ShardInfo, MetricsFactory) will be called directly in the future.
*/
RecordsPublisher cache = retrievalConfig.retrievalFactory().createGetRecordsCache(shardInfo, streamConfig, metricsFactory);
Comment on lines +960 to 964
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the reason we are not changing this now that there could be some clients which pass in their own implementation of RetrievalFactory and may not implement the new function yet? Wouldn't that still mean it's backwards incompatible if we ever change it, so it would have to be a major version upgrade?

ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo,
streamConfig.streamIdentifier(),
leaseCoordinator,
executorService,
cache,
shardRecordProcessorFactory.shardRecordProcessor(streamIdentifier),
shardRecordProcessorFactory.shardRecordProcessor(streamConfig.streamIdentifier()),
checkpoint,
checkpointer,
parentShardPollIntervalMillis,
Expand All @@ -981,7 +976,6 @@ protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
maxListShardsRetryAttempts,
processorConfig.callProcessRecordsEvenForEmptyRecordList(),
shardConsumerDispatchPollIntervalMillis,
streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion,
ignoreUnexpetedChildShards,
shardDetectorProvider.apply(streamConfig),
Expand Down Expand Up @@ -1039,18 +1033,6 @@ private void logExecutorState() {
executorStateEvent.accept(diagnosticEventHandler);
}

/**
 * Resolves the {@link StreamIdentifier} for a shard from its optional serialized form.
 *
 * @param streamIdentifierString serialized stream identifier; when absent, the identifier of the first
 *        entry in {@code currentStreamConfigMap} is used instead
 * @return the resolved stream identifier, validated to be non-null
 */
private StreamIdentifier getStreamIdentifier(Optional<String> streamIdentifierString) {
    final StreamIdentifier resolved;
    if (!streamIdentifierString.isPresent()) {
        // A missing serialized identifier is only acceptable outside multi-stream mode.
        Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
        resolved = this.currentStreamConfigMap.values().iterator().next().streamIdentifier();
    } else {
        resolved = StreamIdentifier.multiStreamInstance(streamIdentifierString.get());
    }
    Validate.notNull(resolved, "Stream identifier should not be empty");
    return resolved;
}

/**
* Logger for suppressing too much INFO logging. To avoid too much logging information Worker will output logging at
* INFO level for a single pass through the main loop every minute. At DEBUG level it will output all INFO logs on
Expand Down Expand Up @@ -1090,6 +1072,31 @@ private void resetInfoLogging() {
}
}

private ShardInfo constructShardInfoFromLease(final Lease lease) {
final boolean isMultiStreamLease = lease instanceof MultiStreamLease;

final Optional<String> streamIdentifierSerialization = isMultiStreamLease ?
Optional.of(((MultiStreamLease) lease).streamIdentifier()) : Optional.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as below: encapsulate this inside the Lease and MultiStreamLease classes so you don't need these if checks. You can do
Optional.ofNullable(lease.streamIdentifier) and make Lease.streamIdentifier return null.

final StreamConfig streamConfig = getOrCreateStreamConfig(streamIdentifierSerialization);

final String shardId = isMultiStreamLease ? ((MultiStreamLease) lease).shardId() : lease.leaseKey();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally these should be encapsulated in the class itself. lease.shardId() should give leaseKey if it's a Lease and shardId if it's a MultiStreamLease.

return new ShardInfo(
shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig);
}

private StreamConfig getOrCreateStreamConfig(final Optional<String> streamIdentifierSerialization) {
if (!streamIdentifierSerialization.isPresent()) {
Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: double negations are always hard to read; can you use Validate.isFalse(isMultiStreamMode, "...")?

final StreamConfig streamConfig = currentStreamConfigMap.values().iterator().next();
Validate.notNull(streamConfig, "StreamConfig should not be null");
return streamConfig;
}

final StreamIdentifier streamIdentifier =
StreamIdentifier.multiStreamInstance(streamIdentifierSerialization.get());
return currentStreamConfigMap.getOrDefault(streamIdentifier, streamTracker.createStreamConfig(streamIdentifier));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we need the log that we are processing an orphan stream

}
Comment on lines +1075 to +1098
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/**
 * Builds a {@link ShardInfo} from a lease, attaching the {@link StreamConfig} resolved for the lease's stream.
 *
 * @param lease the lease to convert; a {@link MultiStreamLease} contributes its own shard id and
 *        serialized stream identifier, any other lease uses its lease key as the shard id
 * @return the shard info for the lease
 */
private ShardInfo constructShardInfoFromLease(final Lease lease) {
    final boolean multiStream = lease instanceof MultiStreamLease;

    final Optional<String> serializedStreamIdentifier;
    if (multiStream) {
        serializedStreamIdentifier = Optional.of(((MultiStreamLease) lease).streamIdentifier());
    } else {
        serializedStreamIdentifier = Optional.empty();
    }
    final StreamConfig streamConfig = getOrCreateStreamConfig(serializedStreamIdentifier);

    final String shardId;
    if (multiStream) {
        shardId = ((MultiStreamLease) lease).shardId();
    } else {
        shardId = lease.leaseKey();
    }

    return new ShardInfo(
            shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig);
}
/**
 * Resolves the {@link StreamConfig} for a lease's stream.
 *
 * @param streamIdentifierSerialization serialized stream identifier of the lease; when absent, the first
 *        entry of {@code currentStreamConfigMap} is returned
 * @return the config found in {@code currentStreamConfigMap}, or one created by the stream tracker when the
 *         stream is not in the map
 */
private StreamConfig getOrCreateStreamConfig(final Optional<String> streamIdentifierSerialization) {
    if (!streamIdentifierSerialization.isPresent()) {
        // A missing serialized identifier is only acceptable outside multi-stream mode.
        Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
        final StreamConfig streamConfig = currentStreamConfigMap.values().iterator().next();
        Validate.notNull(streamConfig, "StreamConfig should not be null");
        return streamConfig;
    }

    final StreamIdentifier streamIdentifier =
            StreamIdentifier.multiStreamInstance(streamIdentifierSerialization.get());
    final StreamConfig trackedConfig = currentStreamConfigMap.get(streamIdentifier);
    if (trackedConfig != null) {
        return trackedConfig;
    }
    // Create the fallback lazily: passing it as the getOrDefault argument would build a new config on
    // every call, even when the stream is already present in the map.
    return streamTracker.createStreamConfig(streamIdentifier);
}
/**
 * Builds a {@link ShardInfo} from a lease, attaching the {@link StreamConfig} resolved for the lease's stream.
 * An empty optional below means the lease is not a {@link MultiStreamLease}.
 *
 * @param lease the lease to convert
 * @return the shard info for the lease
 */
private ShardInfo constructShardInfoFromLease(final Lease lease) {
    final Optional<MultiStreamLease> multiStreamLease = Optional.of(lease)
            .filter(MultiStreamLease.class::isInstance)
            .map(MultiStreamLease.class::cast);
    final String shardId = multiStreamLease.map(MultiStreamLease::shardId).orElseGet(lease::leaseKey);
    final StreamConfig streamConfig = multiStreamLease.map(MultiStreamLease::streamIdentifier)
            .map(streamIdentifierSerialization -> {
                final StreamIdentifier streamIdentifier =
                        StreamIdentifier.multiStreamInstance(streamIdentifierSerialization);
                // Create the fallback lazily instead of passing it to getOrDefault, which would build a
                // new config on every call even when the stream is already present in the map.
                final StreamConfig trackedConfig = currentStreamConfigMap.get(streamIdentifier);
                return trackedConfig != null ? trackedConfig : streamTracker.createStreamConfig(streamIdentifier);
            })
            .orElseGet(() -> {
                Validate.isTrue(!isMultiStreamMode, "Should not be in MultiStream Mode");
                final StreamConfig onlyConfig = currentStreamConfigMap.values().iterator().next();
                // Fail with an explicit message rather than a bare Optional.get() failure.
                Validate.notNull(onlyConfig, "StreamConfig should not be null");
                return onlyConfig;
            });
    return new ShardInfo(
            shardId, lease.concurrencyToken().toString(), lease.parentShardIds(), lease.checkpoint(), streamConfig);
}

This makes it a little more clear that the empty optional represents an uncastable Lease.


@Deprecated
public Future<Void> requestShutdown() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ public void shutdown() {
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
final Lease lease = leasePendingDeletion.lease();
if (lease == null) {
log.warn("Cannot enqueue {} for {} as instance doesn't hold the lease for that shard.",
leasePendingDeletion.shardInfo(), leasePendingDeletion.streamIdentifier());
log.warn("Cannot enqueue {} as instance doesn't hold the lease for that shard.",
leasePendingDeletion.shardInfo());
} else {
log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
if (!deletionQueue.add(leasePendingDeletion)) {
Expand Down Expand Up @@ -166,7 +166,7 @@ public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion
InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException {
final Lease lease = leasePendingDeletion.lease();
final ShardInfo shardInfo = leasePendingDeletion.shardInfo();
final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
final StreamIdentifier streamIdentifier = shardInfo.streamConfig().streamIdentifier();

final AWSExceptionManager exceptionManager = createExceptionManager();

Expand Down Expand Up @@ -328,7 +328,8 @@ void cleanupLeases() {
while (!deletionQueue.isEmpty()) {
final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll();
final String leaseKey = leasePendingDeletion.lease().leaseKey();
final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
final StreamIdentifier streamIdentifier =
leasePendingDeletion.shardInfo().streamConfig().streamIdentifier();
boolean deletionSucceeded = false;
try {
final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,14 @@ boolean updateLease(Lease lease, UUID concurrencyToken, String operation, String

/**
* @return Current shard/lease assignments
* @deprecated This method is deprecated and will be removed in future versions.
* {@link LeaseCoordinator} implementations should not be required to construct and return
* {@link ShardInfo} objects. {@link #getAssignments()} can be used to return the currently held leases.
*/
List<ShardInfo> getCurrentAssignments();
@Deprecated
default List<ShardInfo> getCurrentAssignments() {
    // Default body only throws, so an implementation that does not override this method
    // is not forced to construct ShardInfo objects itself.
    throw new UnsupportedOperationException("This method is deprecated and should not be used.");
}

/**
* Default implementation returns an empty list and concrete implementation is expected to return all leases
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,37 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

import lombok.AccessLevel;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;

import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import lombok.experimental.Accessors;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

/**
* Used to pass shard related info among different classes and as a key to the map of shard consumers.
*/
@Getter
@Accessors(fluent = true)
@ToString
@ToString(exclude = {"isMultiStreamMode", "streamIdentifierStr"})
public class ShardInfo {

private final Optional<String> streamIdentifierSerOpt;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ok to remove, but please implement the fluent accessor for this field for backward compatibility and mark it as deprecated

private final String shardId;
private final String concurrencyToken;
// Sorted list of parent shardIds.
private final List<String> parentShardIds;
private final ExtendedSequenceNumber checkpoint;
private final StreamConfig streamConfig;

@Getter(AccessLevel.NONE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thank you. This is necessary because this class exposes a public contract which cannot be violated, and we should not expose more than what is necessary. But I do agree with lucien that we probably don't need them and can get it from streamConfig when needed.

private final boolean isMultiStreamMode;
@Getter(AccessLevel.NONE)
private final String streamIdentifierStr;
Comment on lines +48 to +51
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these two member variables if they are just extracted from streamConfig? I see you are using them in some functions in this class for comparisons, but would it be cleaner to extract them from streamConfig individually instead?


/**
* Creates a new ShardInfo object. The checkpoint is not part of the equality, but is used for debugging output.
Expand All @@ -55,28 +61,14 @@ public class ShardInfo {
* Parent shards of the shard identified by Kinesis shardId
* @param checkpoint
* the latest checkpoint from lease
*/
public ShardInfo(@NonNull final String shardId,
final String concurrencyToken,
final Collection<String> parentShardIds,
final ExtendedSequenceNumber checkpoint) {
this(shardId, concurrencyToken, parentShardIds, checkpoint, null);
}

/**
* Creates a new ShardInfo object that has an option to pass a serialized streamIdentifier.
* The checkpoint is not part of the equality, but is used for debugging output.
* @param shardId
* @param concurrencyToken
* @param parentShardIds
* @param checkpoint
* @param streamIdentifierSer
* @param streamConfig
* The {@link StreamConfig} instance for the stream that the shard belongs to
*/
public ShardInfo(@NonNull final String shardId,
final String concurrencyToken,
final Collection<String> parentShardIds,
final ExtendedSequenceNumber checkpoint,
final String streamIdentifierSer) {
@NonNull final StreamConfig streamConfig) {
this.shardId = shardId;
this.concurrencyToken = concurrencyToken;
this.parentShardIds = new LinkedList<>();
Expand All @@ -87,7 +79,9 @@ public ShardInfo(@NonNull final String shardId,
// This makes it easy to check for equality in ShardInfo.equals method.
Collections.sort(this.parentShardIds);
this.checkpoint = checkpoint;
this.streamIdentifierSerOpt = Optional.ofNullable(streamIdentifierSer);
this.streamConfig = streamConfig;
this.isMultiStreamMode = streamConfig.streamIdentifier().isMultiStreamInstance();
this.streamIdentifierStr = streamConfig.streamIdentifier().serialize();
}

/**
Expand All @@ -114,7 +108,8 @@ public boolean isCompleted() {
@Override
public int hashCode() {
return new HashCodeBuilder()
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierSerOpt.orElse("")).toHashCode();
.append(concurrencyToken).append(parentShardIds).append(shardId).append(streamIdentifierStr)
.toHashCode();
}

/**
Expand All @@ -139,7 +134,7 @@ public boolean equals(Object obj) {
ShardInfo other = (ShardInfo) obj;
return new EqualsBuilder().append(concurrencyToken, other.concurrencyToken)
.append(parentShardIds, other.parentShardIds).append(shardId, other.shardId)
.append(streamIdentifierSerOpt.orElse(""), other.streamIdentifierSerOpt.orElse("")).isEquals();
.append(streamIdentifierStr, other.streamIdentifierStr).isEquals();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, you want to compare StreamConfig instead; you no longer need the streamIdentifierStr.


}

Expand All @@ -159,8 +154,8 @@ public static String getLeaseKey(ShardInfo shardInfo) {
* @return lease key
*/
public static String getLeaseKey(ShardInfo shardInfo, String shardIdOverride) {
return shardInfo.streamIdentifierSerOpt().isPresent() ?
MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierSerOpt().get(), shardIdOverride) :
return shardInfo.isMultiStreamMode ?
MultiStreamLease.getLeaseKey(shardInfo.streamIdentifierStr, shardIdOverride) :
shardIdOverride;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand All @@ -29,16 +28,13 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.LeaseRenewer;
import software.amazon.kinesis.leases.LeaseTaker;
import software.amazon.kinesis.leases.MultiStreamLease;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
Expand Down Expand Up @@ -366,34 +362,6 @@ private static ExecutorService getLeaseRenewalExecutorService(int maximumPoolSiz
new LinkedTransferQueue<>(), LEASE_RENEWAL_THREAD_FACTORY);
}

/**
 * Returns the leases from {@code getAssignments()} converted to {@link ShardInfo} objects.
 */
@Override
public List<ShardInfo> getCurrentAssignments() {
    return convertLeasesToAssignments(getAssignments());
}

/**
 * Converts each lease to its {@link ShardInfo} assignment.
 *
 * @param leases leases to convert; may be null
 * @return the converted assignments, or an empty list when {@code leases} is null
 */
private static List<ShardInfo> convertLeasesToAssignments(final Collection<Lease> leases) {
    return leases == null
            ? Collections.<ShardInfo>emptyList()
            : leases.stream()
                    .map(lease -> DynamoDBLeaseCoordinator.convertLeaseToAssignment(lease))
                    .collect(Collectors.toList());
}

/**
 * Builds the {@link ShardInfo} that corresponds to a basic lease or a multi-stream lease.
 *
 * @param lease the lease to convert
 * @return a ShardInfo keyed by the lease key for a basic lease, or by the shard id together with the
 *         serialized stream identifier for a {@link MultiStreamLease}
 */
public static ShardInfo convertLeaseToAssignment(final Lease lease) {
    if (!(lease instanceof MultiStreamLease)) {
        return new ShardInfo(lease.leaseKey(), lease.concurrencyToken().toString(), lease.parentShardIds(),
                lease.checkpoint());
    }
    final MultiStreamLease multiStreamLease = (MultiStreamLease) lease;
    return new ShardInfo(multiStreamLease.shardId(), lease.concurrencyToken().toString(),
            lease.parentShardIds(), lease.checkpoint(), multiStreamLease.streamIdentifier());
}

/**
* {@inheritDoc}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.experimental.Accessors;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
Expand All @@ -36,7 +35,6 @@
@Value
public class LeasePendingDeletion {

StreamIdentifier streamIdentifier;
Lease lease;
ShardInfo shardInfo;
ShardDetector shardDetector;
Expand Down
Loading
Loading