Tasks assignment strategy - commons integration - [KCON-63] #384
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies;

import org.codehaus.plexus.util.StringUtils;
Hi @muralibasani, I know you didn't add it here, but can you update this import to use
'import org.apache.commons.lang3.StringUtils;' instead of the plexus one? We are only pulling plexus in from the Kafka library, and later versions don't include it.
        "Based on tasks.max config and this strategy, objects are processed in distributed"
                + " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", "
                + PARTITION_IN_FILENAME + ", " + PARTITION_IN_FILEPATH,
        GROUP_OTHER, sourcePollingConfigCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD
Adding sourcePollingConfigCounter++ here means we should remove the // NOPMD from line 66. Also, can you please add the comment // Unused Assignment here as an explanation (removing it from line 67)?
It was an incorrect usage of counter. Updating it to use offsetStorageGroupCounter.
@@ -92,6 +105,10 @@ public String getErrorsTolerance() {
        return cfg.getString(ERRORS_TOLERANCE);
    }

    public String getObjectDistributionStrategy() {
- public String getObjectDistributionStrategy() {
+ public ObjectDistributionStrategies getObjectDistributionStrategy() {
+     return ObjectDistributionStrategies.forName(sourceConfigFragment.getObjectDistributionStrategy());
The same is being done for ErrorsTolerance in another PR, so we should keep this consistent.
Also, I am not sure whether it should be called ObjectDistributionStrategies or ObjectDistributionStrategy, as the enum will return one strategy.
I thought so, but ObjectDistributionStrategy already exists as an interface in commons.
Change the interface to be "DistributionStrategy", as each of the strategies is an implementation of a distribution strategy. Then you can use ObjectDistributionStrategy as the enum name.
Can you update this to return ObjectDistributionStrategy instead of String?
@muralibasani this one is still outstanding: the return value just needs to be updated to be the object.
missed it, updated.
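For illustration, a minimal sketch of the enum half of the rename discussed above, with a forName lookup so the config getter can return the enum rather than a String. The config string values ("object_hash" and so on) and the value() accessor are assumptions made for this example; the thread only shows the constant names.

```java
/** Enum of configurable strategies; the config strings below are assumed values. */
enum ObjectDistributionStrategy {
    OBJECT_HASH("object_hash"),
    PARTITION_IN_FILENAME("partition_in_filename"),
    PARTITION_IN_FILEPATH("partition_in_filepath");

    private final String configValue;

    ObjectDistributionStrategy(final String configValue) {
        this.configValue = configValue;
    }

    /** Hypothetical accessor for the string used in the connector config. */
    public String value() {
        return configValue;
    }

    /** Resolves a configured name to exactly one strategy, ignoring case and surrounding spaces. */
    public static ObjectDistributionStrategy forName(final String name) {
        if (name != null) {
            for (final ObjectDistributionStrategy strategy : values()) {
                if (strategy.configValue.equalsIgnoreCase(name.trim())) {
                    return strategy;
                }
            }
        }
        throw new IllegalArgumentException("Unknown object distribution strategy: " + name);
    }
}

class ObjectDistributionStrategyDemo {
    public static void main(final String[] args) {
        // The getter in the config class would delegate to forName in the same way.
        System.out.println(ObjectDistributionStrategy.forName("OBJECT_HASH"));
    }
}
```

Failing fast on an unknown name keeps a misconfigured connector from silently falling back to some default strategy.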

    @Override
    public Pattern getFilePattern() {
        return filePattern;
As the HashObjectDistributionStrategy does not use the filePattern, I think getFilePattern here should throw a NotImplementedException, as it is unexpected that it ever gets called here.
Even for the object hash strategy, the pattern is used in the iterator to extract topic and partition.
The extraction of topic and partition should not depend on the distribution strategy going forward.
They are separate concerns and should be implemented as such.
Good point. Pattern configuration is moved to the source task.

    private void configureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
        this.maxTasks = maxTasks;
        this.filePattern = configurePattern(expectedSourceNameFormat);
This filePattern should not need to be set as it is unused here as well.
This pattern is required in the hash object strategy too; please check the source iterator class.
     * Based on the format of the file name or prefix, Pattern is created for each of the strategies.
     */
    default Pattern configurePattern(final String expectedSourceNameFormat) {
        if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
The same goes for configurePattern: it is really a strategy implementation detail and shouldn't be added here.
A pattern is a must for all the strategies on source connectors. Based on it,
- topic and partition extraction in the source iterator
- task assignments
are done.
If the pattern is not created here, we would have to duplicate the whole piece of code to create it.
Since some task distributions require topic and partition, topic and partition extraction should be done first and should be made available to the task distribution. This way, if future S3 implementations use a different strategy for topic and/or partition identification these strategies will work fine.
Agree, moved.
    private final static Logger LOG = LoggerFactory.getLogger(PartitionInPathDistributionStrategy.class);

    private String prefix;
    private String s3Prefix;
This is commons code so we shouldn't have any reference specifically to S3 here.
...main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java
Outdated
@@ -128,19 +151,25 @@ public void addFailedObjectKeys(final String objectKey) {
        this.failedObjectKeys.add(objectKey);
    }

    public void setFilterPredicate(final Predicate<S3Object> predicate) {
        filterPredicate = predicate;
We should just be passing in the strategy as a predicate here. Then, as we add features in the future, predicates can be chained, which makes this really easy to extend quickly.
good one.
If DistributionStrategy has a getTaskFor(String) method, the predicate can be constructed as
s3Object -> task == distributionStrategy.getTaskFor(s3Object.key())
I think we might want a new method in AWSV2SourceClient called addPredicate(Predicate<S3Object> newPredicate) that will do
this.filterPredicate = this.filterPredicate.and(newPredicate)
I think this still needs to be updated as well, so we can link the predicates here.
Removed failed object keys from the predicate list. I do not see any valid processing for it, as data exceptions are handled.
Code updated.
There should still be a way to link the predicates; this allows better control and configuration of the records returned.
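A rough sketch of the predicate chaining proposed in this thread, using plain String keys instead of S3Object so that it runs without the AWS SDK. FilteringClient, its filter method and the static getTaskFor helper are hypothetical stand-ins for this example, not the connector's actual classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the client: it only applies whatever filter it has been given. */
class FilteringClient {
    // Start with an accept-everything filter so that and() always has a left-hand side.
    private Predicate<String> filterPredicate = key -> true;

    /** ANDs a new predicate onto the existing filter, as suggested in the review. */
    void addPredicate(final Predicate<String> newPredicate) {
        this.filterPredicate = this.filterPredicate.and(newPredicate);
    }

    List<String> filter(final List<String> keys) {
        return keys.stream().filter(filterPredicate).collect(Collectors.toList());
    }
}

class PredicateChainDemo {
    /** Hypothetical hash-based getTaskFor, standing in for a DistributionStrategy method. */
    static int getTaskFor(final String key, final int maxTasks) {
        return Math.floorMod(key.hashCode(), maxTasks);
    }

    public static void main(final String[] args) {
        final int task = 0;
        final int maxTasks = 2;
        final FilteringClient client = new FilteringClient();
        // The task assignment is built outside the client and handed in as just another predicate.
        client.addPredicate(key -> task == getTaskFor(key, maxTasks));
        client.addPredicate(key -> key.endsWith(".json"));
        System.out.println(client.filter(Arrays.asList("a.json", "b.txt", "c.json")));
    }
}
```

Because the client only ever sees a Predicate, it stays unaware of task ids and distribution strategies. As noted later in the thread, and() alone does not cover OR combinations.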
@@ -97,13 +97,14 @@ public void start(final Map<String, String> props) {
        this.transformer = TransformerFactory.getTransformer(s3SourceConfig);
        offsetManager = new OffsetManager(context, s3SourceConfig);
        awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys);
        awsv2SourceClient.initializeObjectDistributionStrategy();
We should initialize the ObjectDistributionStrategy here in S3SourceTask and add it as a predicate.
I initially started with it, but got stuck on an issue and couldn't get back to this. Thanks.
Overall looks like a good start. Getting it settled into the streaming strategy will take a bit of work.
    String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
    String START_OFFSET_PATTERN = "{{start_offset}}";
    String TIMESTAMP_PATTERN = "{{timestamp}}";
    String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{topic}}/partition={{partition}}/";
The file-based distribution strategies will need to use these, as will any file-based topic and partition extraction. So how about an interface or static class that contains the file-based patterns? Call it something like FileExtractionPatterns.
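One possible shape for such a holder is sketched below as a final, non-instantiable class. The four pattern constants are copied from the diff above; the value "topic" for PATTERN_TOPIC_KEY is an assumption, since its definition is not shown in this thread.

```java
/** Hypothetical holder for the file-based pattern constants shared by strategies and extractors. */
final class FileExtractionPatterns {
    // Assumed value: the definition of PATTERN_TOPIC_KEY is not visible in the diff.
    static final String PATTERN_TOPIC_KEY = "topic";
    static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
    static final String START_OFFSET_PATTERN = "{{start_offset}}";
    static final String TIMESTAMP_PATTERN = "{{timestamp}}";
    static final String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{topic}}/partition={{partition}}/";

    private FileExtractionPatterns() {
        // Constants only; not meant to be instantiated.
    }
}
```

A final class with a private constructor avoids the constant-interface pattern, where implementing classes inherit the constants into their own API.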
da924e3 to f342fb9
commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
6e9e77b to 6157773
    /**
     * @param s3SourceConfig
     *            configuration for Source connector
     * @param failedObjectKeys
     *            all objectKeys which have already been tried but have been unable to process.
     */
    public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys) {
    public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys,
            final DistributionStrategy distributionStrategy, final int taskId, final Pattern filePattern) {
The AWS client should be agnostic to how the task assignment works; its only real focus should be on talking to AWS. So we should not set the filePattern/taskId/distributionStrategy here.
We should call setFilterPredicate either from the S3SourceTask or the SourceRecordIterator.
        final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks);
        return taskAssignment == taskId;
    public void setFilterPredicate(final Predicate<S3Object> basePredicate) {
        this.filterPredicate = basePredicate
I think we do need to update this to look something like the below; that way we just AND additional predicates as we need them.
this.filterPredicate = basePredicate
public void setFilterPredicate(final Predicate<S3Object> predicate) {
    this.filterPredicate = this.filterPredicate.and(predicate);
This doesn't account for OR predicates, but we don't have a use case for that yet, so I think it should be OK for the moment.
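As a side note on the hash-based assignment visible in the diff context above: Math.floorMod matters there because String.hashCode() can be negative, and Java's % operator keeps the sign of the dividend (-7 % 3 is -1, while Math.floorMod(-7, 3) is 2). The small sketch below illustrates this; HashAssignmentDemo and taskFor are names invented for the example.

```java
class HashAssignmentDemo {
    /** Hash-based assignment as in the diff; the result is always in [0, maxTasks). */
    static int taskFor(final String objectKey, final int maxTasks) {
        return Math.floorMod(objectKey.hashCode(), maxTasks);
    }

    public static void main(final String[] args) {
        // With a negative dividend the two operations disagree.
        System.out.println("-7 % 3            = " + (-7 % 3));
        System.out.println("floorMod(-7, 3)   = " + Math.floorMod(-7, 3));
        System.out.println("task for some key = " + taskFor("topics/t/partition=0/file-1.txt", 4));
    }
}
```

With plain %, a key whose hash is negative would produce a negative task number and therefore never match any task id, so the object would be silently skipped by every task.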
    public ObjectDistributionStrategy getObjectDistributionStrategy() {
        return ObjectDistributionStrategy.forName(sourceConfigFragment.getObjectDistributionStrategy());
    }

We should also add in getTaskId and getMaxTasksId in CommonConfig.java
Actually, it might be good in here: the existing sink connectors don't currently require it, and it can be moved up a level later if required.
        this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
        DistributionStrategy distributionStrategy;

        switch (objectDistributionStrategy) {
I think we actually need two separate regexes here: one for the SourceRecordIterator and one for the distribution strategy.
The reason is that the fileNameFragment set for partition-in-filepath, for example, has an any-filename pattern at the end, but users will still want to be able to set the filename template separately to include only certain files. For example, they may want to specify *.png and send only images, or they may want files matching a certain pattern.
The other thing I can't tell for certain here is the difference between
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate()
and
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()
Is one of them the default pattern we had previously? Ignore this if not, but if it is, we should always look for the custom one that has been configured and fall back on the default if none is configured (and also throw an error if it is missing the partition etc. when we configure PARTITION_IN_FILENAME).
There is no difference between originalTemplate() and toString(); I just checked. Added integration tests with the default value and a non-default value.
For the prefix, I introduced a new config for the pattern.
This should allow users to provide any pattern on file names or prefixes as long as topic and partition are available.
Thanks for checking that for me
e23306c to bb8c4bd
@@ -112,6 +115,16 @@ public void ensureValid(final String name, final Object value) {
        // UnusedAssignment
NIT: remove those now that we have the one below.
        if (currentObjectKey != null) {
            if (validateTaskDistributionStrategy(currentObjectKey)) {
I think with the new changes this might not be needed anymore, as the predicate check happens inside the AWSV2SourceClient as it builds the stream?
I think your earlier point was correct: keep task distribution in the source task/iterator, as it is not relevant in the AWS client. Keeping as is.
We should also remove the failed objects from the AWS client at some point, keeping only object listing in the AWS client.
Yes, agreed. As long as the predicate is created outside of the client and then passed in with setFilterPredicate, we should be good.
commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java
 */

package io.aiven.kafka.connect.common.source.input.utils;
public class FileExtractionPatterns {
This should be a final class
Deleted this class
I think that DistributionStrategy should be an abstract class that has one instance variable, maxTasks.
It should have 3 methods:
- getMaxTasks()
- getTaskFor(String)
- configure(int maxTasks)
If we don't put the current task into the class, then we can share the class across all task instances.
We can construct the DistributionStrategy instance in the AivenKafkaConnectS3SourceConnector.
Classes like PartitionInFilenameDistribution should have the pattern specified in the constructor.
In the end, by not binding the instance to a specific task, we will end up with a more flexible implementation that is not tightly coupled to a single task.
It might make sense to pass a CommonConfig in the DistributionStrategy constructor so that any implementation can have access to the known set of configuration properties.
It may also make sense for configure to accept the CommonConfig rather than the int maxTasks.
I shall refactor these in the next PR.
I think this needs to be done correctly now. This is not some minor change to an internal method; it is the decision about how DistributionStrategy should be architected.
Your current design has opened the door for DistributionStrategies that require multiple parameters, which in turn requires those parameters to be known up and down the stack.
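To make the proposal in this thread concrete, here is a minimal sketch of an abstract DistributionStrategy that tracks only maxTasks, plus an implementation that takes its pattern in the constructor. The example file-name pattern and the convention of returning -1 for keys that do not match are assumptions for illustration only; the review does not specify either.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch of the proposed base class: it knows maxTasks but not which task is asking. */
abstract class DistributionStrategy {
    private int maxTasks;

    protected DistributionStrategy(final int maxTasks) {
        configure(maxTasks);
    }

    /** Single (re)configuration entry point; final so it is safe to call from the constructor. */
    public final void configure(final int maxTasks) {
        if (maxTasks <= 0) {
            throw new IllegalArgumentException("maxTasks must be positive");
        }
        this.maxTasks = maxTasks;
    }

    public final int getMaxTasks() {
        return maxTasks;
    }

    /** Returns the task index for the key, or -1 (assumed convention) if it cannot be assigned. */
    public abstract int getTaskFor(String objectKey);
}

/** The pattern is supplied in the constructor, so callers never pass it up and down the stack. */
final class PartitionInFilenameDistributionStrategy extends DistributionStrategy {
    private final Pattern filePattern;

    PartitionInFilenameDistributionStrategy(final int maxTasks, final Pattern filePattern) {
        super(maxTasks);
        this.filePattern = filePattern;
    }

    @Override
    public int getTaskFor(final String objectKey) {
        final Matcher matcher = filePattern.matcher(objectKey);
        if (!matcher.matches()) {
            return -1;
        }
        return Math.floorMod(Integer.parseInt(matcher.group("partition")), getMaxTasks());
    }
}

class SharedStrategyDemo {
    public static void main(final String[] args) {
        // Hypothetical file-name layout: <topic>-<partition>-<offset>.txt
        final Pattern pattern = Pattern.compile("(?<topic>[a-zA-Z0-9_.]+)-(?<partition>\\d{1,9})-\\d+\\.txt");
        final DistributionStrategy shared = new PartitionInFilenameDistributionStrategy(4, pattern);
        // Every task can ask the same instance and compare the answer with its own id.
        for (int taskId = 0; taskId < shared.getMaxTasks(); taskId++) {
            System.out.println("task " + taskId + " owns logs-6-100.txt: "
                    + (taskId == shared.getTaskFor("logs-6-100.txt")));
        }
    }
}
```

Since the instance holds no task id, a single strategy object can be handed to every task, and each task builds its own predicate as key -> taskId == strategy.getTaskFor(key).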
3c96864 to 6c437ae
    public void reconfigureDistributionStrategy(final int maxTasks) {
        this.maxTasks = maxTasks;
    }

    public void setMaxTasks(final int maxTasks) {
        this.maxTasks = maxTasks;
    }

    private void configureDistributionStrategy(final int maxTasks) {
        this.maxTasks = maxTasks;
    }
These 2 methods do exactly the same thing. This is an indication that there should be one method to configure the distribution strategy after construction, and that it should take a maxTasks argument. Change reconfigureDistributionStrategy to configureDistributionStrategy in the base class, and just call that.
I think this needs to be done correctly now. This is not some minor change to an internal method; it is the decision about how DistributionStrategy should be architected.
Your current design has opened the door for DistributionStrategies that require multiple parameters, which in turn requires those parameters to be known up and down the stack.
.../java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java
Outdated
See notes elsewhere on partition strategy for background.
This class should have a method to extract all the patterns from a string and make them available, so that we can call it once to get all the values set early in the process.
It should probably have methods to return the values (e.g. getTopic()) that return Optional types, returning Optional.empty() when the type name is not found in the pattern.
Updated FilePatternUtils to have those methods reusable.
This solution is close, but the FileNameUtils
- Should have a constructor that takes the pattern and basically does what the current configurePattern() does.
- Should have a method that takes a fileName and produces an object (call it a context):
Optional<Context> process(String fileName)
This method will return a context if the fileName matches, or an empty Optional if it does not.
Context should be a separate class/interface and should have methods getTopic(), getPartition() and getTimestamp(). These should be populated during the FileNameUtils.process() call noted above. Context should be a simple bean.
@muralibasani I don't see these changes noted in KCON-98. Is there another ticket for that, or is it part of KCON-98?
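The shape requested above could look roughly like the following sketch: the pattern is compiled once in the constructor, and process returns an Optional<Context>. How the template is turned into a regex (quoting the literal text and splicing in named groups), the digit limits, and the Optional-returning getters on Context are all assumptions of this example rather than the connector's implementation.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simple bean holding whatever could be extracted from one file name. */
final class Context {
    private final String topic;
    private final Integer partition;
    private final String timestamp;

    Context(final String topic, final Integer partition, final String timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
    }

    Optional<String> getTopic() { return Optional.ofNullable(topic); }
    Optional<Integer> getPartition() { return Optional.ofNullable(partition); }
    Optional<String> getTimestamp() { return Optional.ofNullable(timestamp); }
}

/** Instance-based variant: the constructor does the work of the current configurePattern(). */
final class FilePatternUtils {
    private final Pattern pattern;
    private final boolean hasTopic;
    private final boolean hasPartition;
    private final boolean hasTimestamp;

    FilePatternUtils(final String template) {
        hasTopic = template.contains("{{topic}}");
        hasPartition = template.contains("{{partition}}");
        hasTimestamp = template.contains("{{timestamp}}");
        // Quote the literal text, then splice a named group in place of each placeholder.
        // Each placeholder is assumed to appear at most once in the template.
        final String regex = Pattern.quote(template)
                .replace("{{topic}}", "\\E(?<topic>[a-zA-Z0-9\\-_.]+)\\Q")
                .replace("{{partition}}", "\\E(?<partition>\\d{1,9})\\Q")
                .replace("{{timestamp}}", "\\E(?<timestamp>\\d+)\\Q");
        pattern = Pattern.compile(regex);
    }

    /** Returns a populated Context if the file name matches, otherwise an empty Optional. */
    Optional<Context> process(final String fileName) {
        final Matcher matcher = pattern.matcher(fileName);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        final String topic = hasTopic ? matcher.group("topic") : null;
        final Integer partition = hasPartition ? Integer.valueOf(matcher.group("partition")) : null;
        final String timestamp = hasTimestamp ? matcher.group("timestamp") : null;
        return Optional.of(new Context(topic, partition, timestamp));
    }
}

class FilePatternUtilsDemo {
    public static void main(final String[] args) {
        final FilePatternUtils utils = new FilePatternUtils("{{topic}}-{{partition}}-{{timestamp}}.txt");
        utils.process("my-topic-3-1700000000.txt")
                .ifPresent(ctx -> System.out.println(ctx.getTopic() + " " + ctx.getPartition()));
    }
}
```

With this shape, extraction happens once per file name, and the resulting Context can then be handed to the distribution strategy, which keeps the two concerns separate as requested earlier in the review.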
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java
Outdated
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
Outdated
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
Outdated
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
Outdated
@@ -37,18 +38,16 @@ public interface ObjectDistributionStrategy {
     *            The value to be evaluated to determine if it should be processed by the task.
     * @return true if the task should process the object, false if it should not.
     */
    boolean isPartOfTask(int taskId, String valueToBeEvaluated);
    boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);
This method should just be int getTaskFor(Context), where Context is the class defined above for FileNameProcessing. This means that the distribution strategy adds the requirement that Context contain the file name.
In fact, the interface should be an abstract class, and it should handle tracking the number of tasks. Implementations should take the Context and calculate a long value as an implementation of an abstract method. Then getTaskFor(Context) can call that method and return that value % numberOfTasks.
All the classes become smaller and easier to maintain.
@Claudenw I see this as an improvement, and we don't have to do all the refactoring for the MVP version.
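For reference, the template-method variant described in this thread might be sketched as follows. The Context here carries only the file name, and the class and method names are illustrative. Note that the sketch uses Math.floorMod instead of a plain % so that a negative generated value still maps to a valid task, which is a small deviation from the wording of the comment.

```java
/** Minimal context for this sketch; only the file name is needed here. */
final class Context {
    private final String fileName;

    Context(final String fileName) {
        this.fileName = fileName;
    }

    String getFileName() {
        return fileName;
    }
}

/** Base class tracks the number of tasks; subclasses only produce a long value. */
abstract class DistributionStrategy {
    private int numberOfTasks;

    protected DistributionStrategy(final int numberOfTasks) {
        configure(numberOfTasks);
    }

    public final void configure(final int numberOfTasks) {
        if (numberOfTasks <= 0) {
            throw new IllegalArgumentException("numberOfTasks must be positive");
        }
        this.numberOfTasks = numberOfTasks;
    }

    /** Implementation-specific value derived from the context. */
    protected abstract long generateValue(Context context);

    /** Template method: maps the generated value onto [0, numberOfTasks). */
    public final int getTaskFor(final Context context) {
        return (int) Math.floorMod(generateValue(context), (long) numberOfTasks);
    }
}

/** The hash strategy shrinks to a single method. */
final class HashDistributionStrategy extends DistributionStrategy {
    HashDistributionStrategy(final int numberOfTasks) {
        super(numberOfTasks);
    }

    @Override
    protected long generateValue(final Context context) {
        return context.getFileName().hashCode();
    }
}

class TemplateMethodDemo {
    public static void main(final String[] args) {
        final DistributionStrategy strategy = new HashDistributionStrategy(3);
        System.out.println(strategy.getTaskFor(new Context("topics/t/partition=1/file.txt")));
    }
}
```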
@@ -78,14 +77,12 @@ default boolean taskMatchesPartition(final int taskId, final int partitionId) {
     * @return true if the task supplied should handle the supplied partition
     */
    default boolean taskMatchesModOfPartitionAndMaxTask(final int taskId, final int maxTasks, final int partitionId) {

        return taskMatchesPartition(taskId, partitionId % maxTasks);
    }

    default boolean toBeProcessedByThisTask(final int taskId, final int maxTasks, final int partitionId) {
this method should be deleted.
     */
    void reconfigureDistributionStrategy(int maxTasks, String expectedFormat);
    void reconfigureDistributionStrategy(int maxTasks);
This method should just be called configure
If DistributionStrategy is an abstract class, then this becomes an implementation that is simply
long generateValue(Context context) { return context.getFileName().hashCode(); }
The class can focus on what it does.
I think we discussed not making it abstract. Even if we need to, let's move it to after the MVP version.
https://aiven.atlassian.net/browse/KCON-98 should do this.
Please add items to KCON-98 that identify all the changes that were requested in this PR so that we don't lose them.
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java
Outdated
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
Outdated
private boolean isFileMatchingPattern(final S3Object s3Object) {
    final Optional<String> optionalTopic = FilePatternUtils.getTopic(filePattern, s3Object.key());
    final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern, s3Object.key());

    if (optionalTopic.isPresent() && optionalPartitionId.isPresent()) {
        topic = optionalTopic.get();
        partitionId = optionalPartitionId.get();
        return true;
    }
    return false;
}
If FilePatternUtils is implemented as noted above then there should be an instance already created and this method becomes something like
private boolean isFileMatchingPattern(final S3Object s3Object) {
final Optional<Context> ctxt = filePatternUtils.process(s3Object.key());
if (ctxt.isPresent()) {
this.context = ctxt.get();
this.context.setS3Object(s3Object);
return true;
}
return false;
}
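As a rough sketch of what such an instance-based helper could look like: the `process` method, the `Context` fields, and the example regular expression below are all assumptions made for illustration and are not taken from this PR. The idea is that one match over the object key yields everything the caller needs, or nothing at all.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: class names, fields, and the key pattern are assumptions.
final class Context {
    private final String topic;
    private final int partition;

    Context(final String topic, final int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String getTopic() {
        return topic;
    }

    int getPartition() {
        return partition;
    }
}

final class FilePatternUtils {
    // Example key layout assumed here: <topic>-<partition>-<offset>.txt
    static final String EXAMPLE_PATTERN =
            "(?<topic>[a-zA-Z0-9._-]+)-(?<partition>\\d+)-(?<offset>\\d+)\\.txt";

    private final Pattern pattern;

    FilePatternUtils(final String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Returns a populated Context when the key matches, otherwise an empty Optional.
    Optional<Context> process(final String objectKey) {
        final Matcher matcher = pattern.matcher(objectKey);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        try {
            final int partition = Integer.parseInt(matcher.group("partition"));
            return Optional.of(new Context(matcher.group("topic"), partition));
        } catch (NumberFormatException e) {
            // The digits did not fit in an int, so treat the key as non-matching.
            return Optional.empty();
        }
    }
}
```

Compiling the pattern once in the constructor avoids re-parsing it for every object key, and the caller no longer has to coordinate two separate `Optional` lookups.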
https://aiven.atlassian.net/browse/KCON-98 should do this.
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
Outdated
private boolean isFileAssignedToTask(final S3Object s3Object) {
    return distributionStrategy.isPartOfTask(taskId, s3Object.key(), filePattern);
}
With changes noted above this becomes
private boolean isFileAssignedToTask(final S3Object s3Object) {
return task == distributionStrategy.getTask(this.context);
}
https://aiven.atlassian.net/browse/KCON-98 should do this.
@Claudenw For further distribution strategy refactorings, I have created a ticket: https://aiven.atlassian.net/browse/KCON-98. I do not see an immediate need for them in our current MVP version.
bfc6349 to d7ca010
...rc/main/java/io/aiven/kafka/connect/common/source/task/enums/ObjectDistributionStrategy.java
...connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
Outdated
LGTM Thank you
Please add items to KCON-98 that identify all the changes that were requested in this PR so that we don't lose them.
Ticket is updated.
@muralibasani
I don't see these changes noted in KCON-98. Is there another ticket for that or is it part of KCON-98?
@Claudenw The description of https://aiven.atlassian.net/browse/KCON-98 has been updated with links so that we don't miss anything. If anything is unclear, maybe you can update it with what we need.
[KCON-63]