Tasks assignment strategy - commons integration - [KCON-63] #384
Changes from 2 commits
See notes elsewhere on partition strategy for background. This class should have a method to extract all the patterns from a string and make them available, so that we can call it once and have all the values set early in the process. It should probably have methods to return the values (e.g. [...]

Updated FilePatternUtils to make those methods reusable.

This solution is close but... The FileNameUtils Context should be a separate class/interface and should have methods [...]

@muralibasani
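To make the request above concrete, here is a minimal sketch of a context object that runs the matcher once and exposes every extracted value through accessors. The class name `SourceNameContext`, its `from` factory, and its accessor names are hypothetical and do not come from the PR; only `java.util.regex` is used, and the group names `topic` and `partition` follow the keys defined in FilePatternUtils.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical value object: holds everything extracted from one source name. */
final class SourceNameContext {
    private final String topic; // null when the pattern has no topic group
    private final Integer partition; // null when absent or not a valid int

    private SourceNameContext(final String topic, final Integer partition) {
        this.topic = topic;
        this.partition = partition;
    }

    /** Runs the matcher once and captures all named groups; empty if the name does not match. */
    static Optional<SourceNameContext> from(final Pattern filePattern, final String sourceName) {
        final Matcher matcher = filePattern.matcher(sourceName);
        if (!matcher.find()) {
            return Optional.empty();
        }
        return Optional.of(new SourceNameContext(group(matcher, "topic"), parse(group(matcher, "partition"))));
    }

    // Matcher.group(String) throws IllegalArgumentException when the pattern has no such named group.
    private static String group(final Matcher matcher, final String name) {
        try {
            return matcher.group(name);
        } catch (IllegalArgumentException e) {
            return null;
        }
    }

    private static Integer parse(final String digits) {
        try {
            return digits == null ? null : Integer.valueOf(digits);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    Optional<String> getTopic() {
        return Optional.ofNullable(topic);
    }

    Optional<Integer> getPartition() {
        return Optional.ofNullable(partition);
    }
}
```

A caller would build the context once per object key and pass it along, instead of re-running the regex for each value it needs.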
@@ -0,0 +1,88 @@
/*
 * Copyright 2025 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.common.source.input.utils;

import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigException;

import org.apache.commons.lang3.StringUtils;

public final class FilePatternUtils {

    public static final String PATTERN_PARTITION_KEY = "partition";
    public static final String PATTERN_TOPIC_KEY = "topic";
    public static final String START_OFFSET_PATTERN = "{{start_offset}}";
    public static final String TIMESTAMP_PATTERN = "{{timestamp}}";
    public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}";
    public static final String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}";

    // Use a named group to return the partition in a complex string to always get the correct information for the
    // partition number.
    public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
    public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
    public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";

    private FilePatternUtils() {
        // hidden
    }

    public static Pattern configurePattern(final String expectedSourceNameFormat) {
        if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
            throw new ConfigException(String.format(
                    "Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
                    expectedSourceNameFormat));
        }
        // Build REGEX Matcher
        String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN);
        regexString = StringUtils.replace(regexString, TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN);
        regexString = StringUtils.replace(regexString, TOPIC_PATTERN, TOPIC_NAMED_GROUP_REGEX_PATTERN);
        regexString = StringUtils.replace(regexString, PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN);
        try {
            return Pattern.compile(regexString);
        } catch (IllegalArgumentException iae) {
            throw new ConfigException(
                    String.format("Unable to compile the regex pattern %s to retrieve the partition id.", regexString),
                    iae);
        }
    }

    public static Optional<String> getTopic(final Pattern filePattern, final String sourceName) {
        return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY));
    }

    public static Optional<Integer> getPartitionId(final Pattern filePattern, final String sourceName) {
        return matchPattern(filePattern, sourceName).flatMap(matcher -> {
            try {
                return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
            } catch (NumberFormatException e) {
                return Optional.empty();
            }
        });
    }

    private static Optional<Matcher> matchPattern(final Pattern filePattern, final String sourceName) {
        if (filePattern == null || sourceName == null) {
            throw new IllegalArgumentException("filePattern and sourceName must not be null");
        }

        final Matcher matcher = filePattern.matcher(sourceName);
        return matcher.find() ? Optional.of(matcher) : Optional.empty();
    }

}
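As an illustration of what the placeholder substitution in `configurePattern` produces, the following sketch repeats the same four replacements with plain `String.replace` (standing in for `StringUtils.replace` so it has no dependencies) and matches a sample object key. The class name `FilePatternSketch`, the format string, and the object keys are made up for the example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class FilePatternSketch {
    // Same substitutions as configurePattern, using String.replace instead of StringUtils.replace.
    static Pattern toPattern(final String format) {
        final String regex = format
                .replace("{{start_offset}}", "(?:\\d+)")
                .replace("{{timestamp}}", "(?:\\d+)")
                .replace("{{topic}}", "(?<topic>[a-zA-Z0-9\\-_.]+)")
                .replace("{{partition}}", "(?<partition>\\d+)");
        return Pattern.compile(regex);
    }

    public static void main(final String[] args) {
        final Pattern pattern = toPattern("{{topic}}-{{partition}}-{{start_offset}}.txt");
        // find() searches anywhere in the input, so a leading prefix such as "logs/" is tolerated.
        final Matcher matcher = pattern.matcher("logs/orders-7-123.txt");
        if (matcher.find()) {
            System.out.println(matcher.group("topic") + " / " + matcher.group("partition"));
        }
    }
}
```

Two properties of this approach are worth keeping in mind. Literal characters in the format are not quoted, so the `.` in `.txt` acts as a regex wildcard and a key such as `orders-7-123Xtxt` also matches. And because `find()` is used rather than `matches()`, the pattern may match a substring of the key. If stricter matching were wanted, one option (not part of this PR) would be to wrap the literal segments with `Pattern.quote`.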
I think that DistributionStrategy should be an abstract class that has one instance variable [...]

If we don't put the current task into the class, then we can share the class across all task instances. Classes like PartitionInFilenameDistribution should have the pattern specified in the constructor. By not binding the instance to a specific task, we end up with a more flexible implementation that is not tightly coupled to a single task. It might make sense to pass a [...] It may also make sense for [...]

I shall refactor these in the next PR.

I think this needs to be done correctly now. This is not some minor change to an internal method; it is the decision about how DistributionStrategy should be architected. Your current design has opened the door for DistributionStrategies that require multiple parameters to require that those parameters be known up and down the stack.
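One way to read the proposal above is sketched below. It assumes the single instance variable is the task count (a later comment in this review says the abstract class should track the number of tasks), and it assumes a partition-modulo assignment rule. The names `AbstractDistributionStrategy`, `getTaskFor`, and `PartitionInFilenameSketch` are hypothetical and are not taken from the PR.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical abstract base: owns only the task count, never the current task id. */
abstract class AbstractDistributionStrategy {
    protected int maxTasks;

    protected AbstractDistributionStrategy(final int maxTasks) {
        configureDistributionStrategy(maxTasks);
    }

    /** Called on construction and again when the connector is reconfigured. */
    final void configureDistributionStrategy(final int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be positive");
        }
        this.maxTasks = maxTasks;
    }

    /** Returns the task that owns the value, or -1 when the value cannot be assigned. */
    abstract int getTaskFor(String valueToBeEvaluated);
}

/** Hypothetical implementation: the pattern is fixed in the constructor, as the review suggests. */
final class PartitionInFilenameSketch extends AbstractDistributionStrategy {
    private final Pattern filePattern;

    PartitionInFilenameSketch(final Pattern filePattern, final int maxTasks) {
        super(maxTasks);
        this.filePattern = filePattern;
    }

    @Override
    int getTaskFor(final String valueToBeEvaluated) {
        final Matcher matcher = filePattern.matcher(valueToBeEvaluated);
        if (!matcher.find()) {
            return -1;
        }
        try {
            // Assumed rule: partition number modulo the task count.
            return Integer.parseInt(matcher.group("partition")) % maxTasks;
        } catch (NumberFormatException e) {
            return -1;
        }
    }
}
```

Because no task id is stored, a single instance can be shared by every task: each task compares the returned value with its own id, and callers never need to pass the pattern around.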
@@ -0,0 +1,51 @@
/*
 * Copyright 2024 Aiven Oy
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;

/**
 * A {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files)
 * into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
 * <p>
 * The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the
 * overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together
 * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
 * workers than tasks, and they will be assigned the remaining tasks as work completes.
 */
public interface DistributionStrategy {
    /**
     * Check if the object should be processed by the task with the given {@code taskId}. Any single object should be
     * assigned deterministically to a single taskId.
     *
     * @param taskId
     *            a task ID, usually for the currently running task
     * @param valueToBeEvaluated
     *            The value to be evaluated to determine if it should be processed by the task.
     * @param filePattern
     *            The pattern used to extract values (such as the partition) from {@code valueToBeEvaluated}.
     * @return true if the task should process the object, false if it should not.
     */
    boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);

This method should just be [...] In fact the interface should be an abstract class and it should handle tracking the number of tasks. Implementations should take the [...] All the classes become smaller and easier to maintain.

@Claudenw I see this as an improvement, and we don't have to make all the refactorings for the MVP version.

    /**
     * When a connector receives a reconfigure event this method should be called to ensure that the distribution
     * strategy is updated correctly.
     *
     * @param maxTasks
     *            The maximum number of tasks created for the Connector
     */
    void configureDistributionStrategy(int maxTasks);
}
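To illustrate the contract in the Javadoc (each object is assigned deterministically to exactly one task), here is a minimal sketch of a strategy written against the interface as it stands in this diff. The interface is copied locally so the example compiles on its own; `HashDistributionSketch` and its hash-modulo rule are assumptions for illustration, not code from the PR.

```java
import java.util.regex.Pattern;

// Local copy of the interface from this diff so the sketch compiles on its own.
interface DistributionStrategy {
    boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);

    void configureDistributionStrategy(int maxTasks);
}

/** Hypothetical strategy: assigns by hash of the whole value and ignores the pattern. */
final class HashDistributionSketch implements DistributionStrategy {
    private int maxTasks;

    HashDistributionSketch(final int maxTasks) {
        configureDistributionStrategy(maxTasks);
    }

    @Override
    public boolean isPartOfTask(final int taskId, final String valueToBeEvaluated, final Pattern filePattern) {
        // floorMod keeps the result in [0, maxTasks) even for negative hash codes.
        return Math.floorMod(valueToBeEvaluated.hashCode(), maxTasks) == taskId;
    }

    @Override
    public void configureDistributionStrategy(final int maxTasks) {
        this.maxTasks = maxTasks;
    }
}
```

Note that this implementation never reads `filePattern`, yet every caller still has to supply it. That is the coupling the review comments object to: a parameter needed by only some strategies ends up in the shared signature.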
If DistributionStrategy is an abstract class then this becomes an implementation that is simply [...] The class can focus on what it does.

I think we discussed not making it abstract. Even if we need to, let's move it to after the MVP version.

https://aiven.atlassian.net/browse/KCON-98 should do this.

Please add items to KCON-98 that identify all the changes that were requested in this PR so that we don't lose them.

We should also add getTaskId and getMaxTasksId in CommonConfig.java.

Actually it might be good in here: the existing sink connectors don't currently require it, and it can be moved up a level later if required.