Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tasks assignment strategy - commons integration - [KCON-63] #384

Merged
merged 4 commits into from
Jan 14, 2025

Conversation

muralibasani
Copy link
Contributor

@muralibasani muralibasani commented Jan 6, 2025

[KCON-63]

  • Integrate Task assignment strategies of common module into s3 release feature branch
  • Delete hard coding of file pattern from s3 iterator class
  • Update existing tests
  • Added new integration tests to verify other strategy use cases

@muralibasani muralibasani changed the title Kcon63 tasks strategy Kcon63 tasks strategy - [KCON-63] Jan 7, 2025
@aindriu-aiven aindriu-aiven mentioned this pull request Jan 7, 2025
@muralibasani muralibasani changed the title Kcon63 tasks strategy - [KCON-63] Tasks assignment strategy - commons integration - [KCON-63] Jan 7, 2025
@muralibasani muralibasani marked this pull request as ready for review January 7, 2025 08:35
@muralibasani muralibasani requested review from a team as code owners January 7, 2025 08:35
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategies;

import org.codehaus.plexus.util.StringUtils;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @muralibasani I know you didn't add it here but can you update this import to use
'import org.apache.commons.lang3.StringUtils;' instead of plexus which we are pulling in from the kafka library (and later versions dont include)

"Based on tasks.max config and this strategy, objects are processed in distributed"
+ " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", "
+ PARTITION_IN_FILENAME + ", " + PARTITION_IN_FILEPATH,
GROUP_OTHER, sourcePollingConfigCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding SourcePollingConfigCounter++ here means we should remove the //NOPMD from line 66 also can you add the comment // Unused Assignment as an explanation for this here please? (removing it from line 67)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was an incorrect usage of counter. Updating it to use offsetStorageGroupCounter.

@@ -92,6 +105,10 @@ public String getErrorsTolerance() {
return cfg.getString(ERRORS_TOLERANCE);
}

public String getObjectDistributionStrategy() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public String getObjectDistributionStrategy() {
public ObjectDistributionStrategies getObjectDistributionStrategy() {
return ObjectDistributionStrategies.forName(sourceConfigFragment.getObjectDistributionStrategy());

The same is being done for ErrorsTolerance in another PR so we should keep this consistent.

Also I am not sure should it be called ObjectDistributionStrategies or ObjectDistributionStrategy as the enum will return one strategy ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought so, but ObjectDistributionStrategy already exists as an interface in commons.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the interface to be "DistributionStrategy" as each of the strategies are implementations of a distribution strategy. Then you can use ObjectDistributionStrategy as the enum name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you update this to return ObjectDistributionStrategy instead of String?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@muralibasani this one is still outstanding just to update the returning value to be the object.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missed it, updated.


@Override
public Pattern getFilePattern() {
return filePattern;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the HashObjectDistributionStrategy does not use the filePattern I think getFilePattern here should throw a NotImplementedExcepetion() as it is unexpected that it ever gets called here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even for object hash strategy, pattern is used in iterator to extract topic and partition

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The extraction of topic and partition should not depend on the distribution strategy going forward.

They are separate concerns and should be implemented as such.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Pattern configuring is moved to source task.


private void configureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) {
this.maxTasks = maxTasks;
this.filePattern = configurePattern(expectedSourceNameFormat);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This filePattern should not need to be set as it is unused here as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this pattern is required in this hash object strategy too, pls check source iterator class.

* Based on the format of the file name or prefix, Pattern is created for each of the strategies.
*/
default Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same for this configure Pattern it is really strategy implementation and shouldn't be added here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pattern is must for all the strategies on source connectors. Based on this,

  • extract topic and partition in source iterator
  • task assignments
    are done.
    If pattern is not done here, we would have to duplicate the whole piece of code again to create pattern

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since some task distributions require topic and partition, topic and partition extraction should be done first and should be made available to the task distribution. This way, if future S3 implementations use a different strategy for topic and/or partition identification these strategies will work fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, moved.

private final static Logger LOG = LoggerFactory.getLogger(PartitionInPathDistributionStrategy.class);

private String prefix;
private String s3Prefix;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is commons code so we shouldn't have any reference specifically to S3 here.

@@ -128,19 +151,25 @@ public void addFailedObjectKeys(final String objectKey) {
this.failedObjectKeys.add(objectKey);
}

public void setFilterPredicate(final Predicate<S3Object> predicate) {
filterPredicate = predicate;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be just passing in the Strategy as a predicate here, and if it matches that predicate then, in the future as we add features, predicates can be chained making it really easy to extend this quickly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If DistributionStrategy has a getTaskFor(String) method the predicate can be constructed as
s3Object -> task == distributionStrategy.getTaskFor(s3Object.key())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might want a new method in AWSV2SourceClient called addPredicate(Predicate<S3Object> newPredicate) that will do

this.filterPredicate = this.filterPredicate.and(newPredicate)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this still needs to be updated as well, so we can link the predicates here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed failed object keys from the predicate list, I do not see any valid processing for it, as data exceptions are handled.
Code updated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should still be a way to link the predicates this allows better control and configuration of the records returned.

@@ -97,13 +97,14 @@ public void start(final Map<String, String> props) {
this.transformer = TransformerFactory.getTransformer(s3SourceConfig);
offsetManager = new OffsetManager(context, s3SourceConfig);
awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys);
awsv2SourceClient.initializeObjectDistributionStrategy();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should initialize the ObjctDistributionStrategy here in S3SourceTask and add it as a predicate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially started with it, but stuck with some issue and couldn't back to this. thx

Copy link
Contributor

@Claudenw Claudenw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks like a good start. Getting it settled into the streaming strategy will take a bit of work.

@@ -92,6 +105,10 @@ public String getErrorsTolerance() {
return cfg.getString(ERRORS_TOLERANCE);
}

public String getObjectDistributionStrategy() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change the interface to be "DistributionStrategy" as each of the strategies are implementations of a distribution strategy. Then you can use ObjectDistributionStrategy as the enum name.


@Override
public Pattern getFilePattern() {
return filePattern;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The extraction of topic and partition should not depend on the distribution strategy going forward.

They are separate concerns and should be implemented as such.

String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";
String START_OFFSET_PATTERN = "{{start_offset}}";
String TIMESTAMP_PATTERN = "{{timestamp}}";
String DEFAULT_PREFIX_FILE_PATH_PATTERN = "topics/{{topic}}/partition={{partition}}/";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The files based distribution strategies will need to use these as will any file base topic and partition based extraction. So how about an interface or static class that contains the file based patterns. Call it something like FileExctractionPatterns

* Based on the format of the file name or prefix, Pattern is created for each of the strategies.
*/
default Pattern configurePattern(final String expectedSourceNameFormat) {
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since some task distributions require topic and partition, topic and partition extraction should be done first and should be made available to the task distribution. This way, if future S3 implementations use a different strategy for topic and/or partition identification these strategies will work fine.

/**
* @param s3SourceConfig
* configuration for Source connector
* @param failedObjectKeys
* all objectKeys which have already been tried but have been unable to process.
*/
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys) {
public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys,
final DistributionStrategy distributionStrategy, final int taskId, final Pattern filePattern) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the AWS client should be agnostic to how the task assignment works its only real focus should be on talking to AWS, so we should not set the filePattern/taskId/distributionStrategy here.

We should call setFilterPredicate either from the S3SourceTask or the SourceRecordIterator.

final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks);
return taskAssignment == taskId;
public void setFilterPredicate(final Predicate<S3Object> basePredicate) {
this.filterPredicate = basePredicate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do need this update to look something like the below and this way we just and additional predicates as we need them.

Suggested change
this.filterPredicate = basePredicate
public void setFilterPredicate(final Predicate<S3Object> predicate) {
this.filterPredicate = this.filterPredicate.and(predicate);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't account for or predicates but we dont have a use case for that yet so I think it should be ok for the moment.

public ObjectDistributionStrategy getObjectDistributionStrategy() {
return ObjectDistributionStrategy.forName(sourceConfigFragment.getObjectDistributionStrategy());
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add in getTaskId and getMaxTasksId in CommonConfig.java

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it might be good in here as the existing sink connectors dont currently require it it can be moved up a level later if required.

this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks;
DistributionStrategy distributionStrategy;

switch (objectDistributionStrategy) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we actually need two separate regex's here.
One for the SourceRecordIterator and one for the Distribution Strategy.

The reason being that the fileNameFragment set in the Partition in filepath for example has an any filename pattern at the end, but users will still want to be able to set the filename template separately to only include certain files in for example they may want to put *.png and only send images or they want files with a certain pattern.

The other thing I can't 100% tell here is the difference between
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().originalTemplate()
and
s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()

is one of them the default pattern we had previously? Ignore if not but if it is we should always look to get the custom one that has been configured and fall back on the default if not configured (also throw an error if its missing partition etc when we configure PARTITION_IN_FILENAME)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no difference between original template and tostr. Just checked. Added integration tests with default value and non default.

For prefix, introduced a new config for pattern.

This should allow users to provide any patterns on file names or prefixes as long as topic, partition are available.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for checking that for me

@@ -112,6 +115,16 @@ public void ensureValid(final String name, final Object value) {
// UnusedAssignment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: to remove those now we have the one below.

if (currentObjectKey != null) {
if (validateTaskDistributionStrategy(currentObjectKey)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think with the new changes this might not be needed anymore as the predicate checks inside of the AWSV2Client as it builds the stream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your earlier point was correct to keep task distribution in source task/iterator. as it is not relevant in Aws client. Keeping as is.
We shall also delete the failed objects from aws client sometime, and keeping only listing objects in aws client.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes agree as long as the predicate is created outside of the client and then we pass it in with the set predicate we should be good.

*/

package io.aiven.kafka.connect.common.source.input.utils;
public class FileExtractionPatterns {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a final class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted this class

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that DistributionStrategy should be an abstract class that has one instance variable maxTasks
It should have 3 methods:

  • getMaxTasks()
  • getTaskFor(String)
  • configure(int maxTasks)

If we don't put the current task into the class then we can share the class across all instances of task.
We can construct the DistributionStrategy instance in the AivenKafkaCOnnectS3CSourceConnector.

Classes like PartitionInFilenameDistribution should have the pattern specified in the constructor.

In the end by not binding the instance to a specific task we will end up with a more flexible implementation that is not tightly coupled with a single task.

It might make sens to pass a CommonConfig in the DistributionStrategy constructor so that any implementation can have access to the known set of configuration properties.

It may also make sens for configure to accept the CommonConfig rather than the int maxTasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I shall refactor these in next pr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be done correctly now this is not some minor change to an internal method this is the decision about how DistributionStrategy should be architected.

Your current design has opened the door for DistributionStrategies that require multiple parameters to require those parameters be known up and down the stack.

@@ -128,19 +151,25 @@ public void addFailedObjectKeys(final String objectKey) {
this.failedObjectKeys.add(objectKey);
}

public void setFilterPredicate(final Predicate<S3Object> predicate) {
filterPredicate = predicate;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If DistributionStrategy has a getTaskFor(String) method the predicate can be constructed as
s3Object -> task == distributionStrategy.getTaskFor(s3Object.key())

@@ -128,19 +151,25 @@ public void addFailedObjectKeys(final String objectKey) {
this.failedObjectKeys.add(objectKey);
}

public void setFilterPredicate(final Predicate<S3Object> predicate) {
filterPredicate = predicate;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might want a new method in AWSV2SourceClient called addPredicate(Predicate<S3Object> newPredicate) that will do

this.filterPredicate = this.filterPredicate.and(newPredicate)

Comment on lines 51 to 61
public void reconfigureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}

public void setMaxTasks(final int maxTasks) {
this.maxTasks = maxTasks;
}

private void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These 2 methods do exactly the same thing. This is an indication that there should be one method to configure the distribution strategy after construction and that it should take a maxTasks argument. Change reconfigureDistributionStrategy to configureDistributionStrategy in the base class. and just call that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be done correctly now this is not some minor change to an internal method this is the decision about how DistributionStrategy should be architected.

Your current design has opened the door for DistributionStrategies that require multiple parameters to require those parameters be known up and down the stack.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See notes elsewhere on partition strategy for background.

This class should have a method to extract all the patterns from a string and make them available so that we can call it once to get all the values set early in the process.

It should probably have methods to return the values (e.g. getTopic()) that return Optional types. returning Optional.empty() when the type name is not found in the pattern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated FilePatternUtils to have those methods reusable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solution is close but... The FileNameUtils

  • Should have a constructor that takes the pattern and basically does what the current configurePattern() does.
  • Should have a method that takes a fileName and produces an object (Call it a context) Optional<Context> process(String fileName) This method will return a context if the fileName matches, or an empty optional if it does not.

Context should be a separate class.interface and should have methods getTopic(), getPartition() and getTimestamp(). These should be populated during the FileNameUtils.process() call noted above. Context should be a simple bean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@muralibasani
I don't see these changes noted in KCON-98. Is there another ticket for that or is it part of KCON-98?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solution is close but... The FileNameUtils

  • Should have a constructor that takes the pattern and basically does what the current configurePattern() does.
  • Should have a method that takes a fileName and produces an object (Call it a context) Optional<Context> process(String fileName) This method will return a context if the fileName matches, or an empty optional if it does not.

Context should be a separate class.interface and should have methods getTopic(), getPartition() and getTimestamp(). These should be populated during the FileNameUtils.process() call noted above. Context should be a simple bean.

@@ -37,18 +38,16 @@ public interface ObjectDistributionStrategy {
* The value to be evaluated to determine if it should be processed by the task.
* @return true if the task should process the object, false if it should not.
*/
boolean isPartOfTask(int taskId, String valueToBeEvaluated);
boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should just be int getTaskFor(Context) where Context is the class defined above for FileNameProcessing. This means that Distribution strategy is adding the requirement for Context to contain the file name.

In fact the interface should be an abstract class and it should handle tracking the number of tasks. Implementations should take the Context and calculate a long value as an implementation of an abstract method. Then getTaskFor(Context) can call that method and return that value % numberOfTasks.

All the classes become smaller and easier to maintain.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Claudenw I see this as an improvement and we don't have to make all the refactorings for the mvp version.

@@ -78,14 +77,12 @@ default boolean taskMatchesPartition(final int taskId, final int partitionId) {
* @return true if the task supplied should handle the supplied partition
*/
default boolean taskMatchesModOfPartitionAndMaxTask(final int taskId, final int maxTasks, final int partitionId) {

return taskMatchesPartition(taskId, partitionId % maxTasks);
}

default boolean toBeProcessedByThisTask(final int taskId, final int maxTasks, final int partitionId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method should be deleted.

*/
void reconfigureDistributionStrategy(int maxTasks, String expectedFormat);
void reconfigureDistributionStrategy(int maxTasks);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should just be called configure

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If DistributionStrategy is an abstract class than this becomes an implementation that is simply

long generateValue(Context) { return context.getFileName().hashCode() };

The class can focus on what it does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we discussed this not to make it abstract. Even if we need, let's move it to after mvp version

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add items to KCON-98 that identify all the changes that were requested in in PR so that we don't lose them.

Comment on lines 99 to 105
private boolean isFileMatchingPattern(final S3Object s3Object) {
final Optional<String> optionalTopic = FilePatternUtils.getTopic(filePattern, s3Object.key());
final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern, s3Object.key());

if (optionalTopic.isPresent() && optionalPartitionId.isPresent()) {
topic = optionalTopic.get();
partitionId = optionalPartitionId.get();
return true;
}
return false;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If FilePatternUtils is implemented as noted above then there should be an instance already created and this method becomes something like

private boolean isFileMatchingPattern(final S3Object s3Object) {
        final Optional<Context> ctxt = filePatternUtils.process(s3Object.key());
        if (ctxt.isPresent()) {
             this.context = ctxt.get();
             this.context.setS3Object(s3Object);
             return true;
        }
        return false;
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines 111 to 109
private boolean isFileAssignedToTask(final S3Object s3Object) {
return distributionStrategy.isPartOfTask(taskId, s3Object.key(), filePattern);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With changes noted above this becomes

    private boolean isFileAssignedToTask(final S3Object s3Object) {
        return  task == distributionStrategy.getTask(this.context);
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@muralibasani
Copy link
Contributor Author

@Claudenw for further distribution strategy refactorings, I have created a ticket https://aiven.atlassian.net/browse/KCON-98.

I think for our current MVP version, I do not see the immediate need of it.

@muralibasani muralibasani force-pushed the kcon63-tasks-strategy branch from bfc6349 to d7ca010 Compare January 13, 2025 13:49
Copy link
Contributor

@aindriu-aiven aindriu-aiven left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM Thank you

Copy link
Contributor

@Claudenw Claudenw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add items to KCON-98 that identify all the changes that were requested in in PR so that we don't lose them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add items to KCON-98 that identify all the changes that were requested in in PR so that we don't lose them.

@muralibasani
Copy link
Contributor Author

Please add items to KCON-98 that identify all the changes that were requested in in PR so that we don't lose them.

Ticket is updated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@muralibasani
I don't see these changes noted in KCON-98. Is there another ticket for that or is it part of KCON-98?

@muralibasani
Copy link
Contributor Author

muralibasani commented Jan 14, 2025

@Claudenw https://aiven.atlassian.net/browse/KCON-98 here with links description is updated, so we don't miss anything else. If you think it's not clear, may be you can update on what we need.

@Claudenw Claudenw merged commit 6b967d3 into s3-source-release Jan 14, 2025
8 checks passed
@muralibasani muralibasani deleted the kcon63-tasks-strategy branch January 16, 2025 09:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants