Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tasks assignment strategy - commons integration - [KCON-63] #384

Merged
merged 4 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ public final class FileNameFragment extends ConfigFragment {
static final String FILE_MAX_RECORDS = "file.max.records";
static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}";

public static final String FILE_PATH_PREFIX_TEMPLATE_CONFIG = "file.prefix.template";
static final String DEFAULT_FILE_PATH_PREFIX_TEMPLATE = "topics/{{topic}}/partition={{partition}}/";

/**
 * Constructs a FileNameFragment backed by the given configuration.
 *
 * @param cfg
 *            the AbstractConfig from which file-name related settings (template, prefix, timestamps, etc.) are read.
 */
public FileNameFragment(final AbstractConfig cfg) {
    super(cfg);
}
Expand Down Expand Up @@ -109,9 +112,18 @@ public void ensureValid(final String name, final Object value) {
configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(),
new TimestampSourceValidator(), ConfigDef.Importance.LOW,
"Specifies the timestamp variable source. Default is wall-clock.", GROUP_FILE, fileGroupCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);

configDef.define(FILE_PATH_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PATH_PREFIX_TEMPLATE,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
"The template for file prefix on S3. "
+ "Supports `{{ variable }}` placeholders for substituting variables. "
+ "Currently supported variables are `topic` and `partition` "
+ "and these are mandatory to include in the directory structure. "
+ "Example prefix : topics/{{topic}}/partition/{{partition}}/",
GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.LONG, FILE_PATH_PREFIX_TEMPLATE_CONFIG);

return configDef;
}

Expand Down Expand Up @@ -185,4 +197,8 @@ public int getMaxRecordsPerFile() {
return cfg.getInt(FILE_MAX_RECORDS);
}

/**
 * Returns the configured file path prefix template, e.g. {@code topics/{{topic}}/partition={{partition}}/}.
 *
 * @return the raw value of the {@code file.prefix.template} setting.
 */
public String getFilePathPrefixTemplateConfig() {
    final String prefixTemplate = cfg.getString(FILE_PATH_PREFIX_TEMPLATE_CONFIG);
    return prefixTemplate;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.input.TransformerFactory;
import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;

public class SourceCommonConfig extends CommonConfig {

Expand Down Expand Up @@ -69,6 +70,10 @@ public ErrorsTolerance getErrorsTolerance() {
return sourceConfigFragment.getErrorsTolerance();
}

/**
 * Returns the strategy used to distribute source objects across Kafka Connect tasks.
 *
 * @return the configured {@link ObjectDistributionStrategy}.
 */
public ObjectDistributionStrategy getObjectDistributionStrategy() {
    // Delegate to the source fragment, which owns parsing of this setting.
    return sourceConfigFragment.getObjectDistributionStrategy();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add in getTaskId and getMaxTasksId in CommonConfig.java

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it might be good in here as the existing sink connectors don't currently require it; it can be moved up a level later if required.

/**
 * Returns the maximum number of records returned by a single poll of the source task.
 *
 * @return the configured maximum poll record count.
 */
public int getMaxPollRecords() {
    // Delegate to the source fragment, which owns this setting.
    return sourceConfigFragment.getMaxPollRecords();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

package io.aiven.kafka.connect.common.config;

import static io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy.OBJECT_HASH;
import static io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy.PARTITION_IN_FILENAME;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;

import org.codehaus.plexus.util.StringUtils;
import org.apache.commons.lang3.StringUtils;

public final class SourceConfigFragment extends ConfigFragment {
private static final String GROUP_OTHER = "OTHER_CFG";
Expand All @@ -32,6 +36,8 @@ public final class SourceConfigFragment extends ConfigFragment {
public static final String TARGET_TOPICS = "topics";
public static final String ERRORS_TOLERANCE = "errors.tolerance";

public static final String OBJECT_DISTRIBUTION_STRATEGY = "object.distribution.strategy";

/**
* Construct the ConfigFragment.
*
Expand Down Expand Up @@ -67,7 +73,14 @@ public static ConfigDef update(final ConfigDef configDef) {
ConfigDef.Width.NONE, TARGET_TOPIC_PARTITIONS);
configDef.define(TARGET_TOPICS, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
ConfigDef.Importance.MEDIUM, "eg : connect-storage-offsets", GROUP_OFFSET_TOPIC,
offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS); // NOPMD
offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS);
configDef.define(OBJECT_DISTRIBUTION_STRATEGY, ConfigDef.Type.STRING, OBJECT_HASH.name(),
aindriu-aiven marked this conversation as resolved.
Show resolved Hide resolved
new ObjectDistributionStrategyValidator(), ConfigDef.Importance.MEDIUM,
"Based on tasks.max config and this strategy, objects are processed in distributed"
+ " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", "
+ PARTITION_IN_FILENAME,
GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD
// UnusedAssignment

return configDef;
}
Expand All @@ -92,6 +105,10 @@ public ErrorsTolerance getErrorsTolerance() {
return ErrorsTolerance.forName(cfg.getString(ERRORS_TOLERANCE));
}

/**
 * Resolves the {@code object.distribution.strategy} setting into its enum form.
 *
 * @return the configured {@link ObjectDistributionStrategy}.
 */
public ObjectDistributionStrategy getObjectDistributionStrategy() {
    final String strategyName = cfg.getString(OBJECT_DISTRIBUTION_STRATEGY);
    return ObjectDistributionStrategy.forName(strategyName);
}

private static class ErrorsToleranceValidator implements ConfigDef.Validator {
@Override
public void ensureValid(final String name, final Object value) {
Expand All @@ -103,4 +120,15 @@ public void ensureValid(final String name, final Object value) {
}
}

/**
 * Validates that a supplied configuration value names a known {@link ObjectDistributionStrategy}.
 * Blank or null values are accepted so that the declared default can apply.
 */
private static class ObjectDistributionStrategyValidator implements ConfigDef.Validator {
    @Override
    public void ensureValid(final String name, final Object value) {
        final String candidate = (String) value;
        if (StringUtils.isBlank(candidate)) {
            // Nothing to validate; the default strategy will be used.
            return;
        }
        // forName throws an exception when the name is not a recognised strategy value.
        ObjectDistributionStrategy.forName(candidate);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.function.IOSupplier;
import org.codehaus.plexus.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See notes elsewhere on partition strategy for background.

This class should have a method to extract all the patterns from a string and make them available so that we can call it once to get all the values set early in the process.

It should probably have methods to return the values (e.g. getTopic()) that return Optional types. returning Optional.empty() when the type name is not found in the pattern.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated FilePatternUtils to have those methods reusable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solution is close but... The FileNameUtils

  • Should have a constructor that takes the pattern and basically does what the current configurePattern() does.
  • Should have a method that takes a fileName and produces an object (Call it a context) Optional<Context> process(String fileName) This method will return a context if the fileName matches, or an empty optional if it does not.

Context should be a separate class/interface and should have methods getTopic(), getPartition() and getTimestamp(). These should be populated during the FileNameUtils.process() call noted above. Context should be a simple bean.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@muralibasani
I don't see these changes noted in KCON-98. Is there another ticket for that or is it part of KCON-98?

Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.input.utils;

import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigException;

import org.apache.commons.lang3.StringUtils;

public final class FilePatternUtils {

public static final String PATTERN_PARTITION_KEY = "partition";
public static final String PATTERN_TOPIC_KEY = "topic";
public static final String START_OFFSET_PATTERN = "{{start_offset}}";
public static final String TIMESTAMP_PATTERN = "{{timestamp}}";
public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}";
public static final String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}";

// Use a named group to return the partition in a complex string to always get the correct information for the
// partition number.
public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";

private FilePatternUtils() {
// hidden
}
public static Pattern configurePattern(final String expectedSourceNameFormat) {
Claudenw marked this conversation as resolved.
Show resolved Hide resolved
if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
throw new ConfigException(String.format(
"Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
expectedSourceNameFormat));
}
// Build REGEX Matcher
String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, TOPIC_PATTERN, TOPIC_NAMED_GROUP_REGEX_PATTERN);
regexString = StringUtils.replace(regexString, PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN);
try {
return Pattern.compile(regexString);
} catch (IllegalArgumentException iae) {
throw new ConfigException(
String.format("Unable to compile the regex pattern %s to retrieve the partition id.", regexString),
iae);
}
}

public static Optional<String> getTopic(final Pattern filePattern, final String sourceName) {
return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY));
}

public static Optional<Integer> getPartitionId(final Pattern filePattern, final String sourceName) {
return matchPattern(filePattern, sourceName).flatMap(matcher -> {
try {
return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
} catch (NumberFormatException e) {
return Optional.empty();
}
});
}

private static Optional<Matcher> matchPattern(final Pattern filePattern, final String sourceName) {
if (filePattern == null || sourceName == null) {
throw new IllegalArgumentException("filePattern and sourceName must not be null");
}

final Matcher matcher = filePattern.matcher(sourceName);
return matcher.find() ? Optional.of(matcher) : Optional.empty();
}

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that DistributionStrategy should be an abstract class that has one instance variable maxTasks
It should have 3 methods:

  • getMaxTasks()
  • getTaskFor(String)
  • configure(int maxTasks)

If we don't put the current task into the class then we can share the class across all instances of task.
We can construct the DistributionStrategy instance in the AivenKafkaCOnnectS3CSourceConnector.

Classes like PartitionInFilenameDistribution should have the pattern specified in the constructor.

In the end by not binding the instance to a specific task we will end up with a more flexible implementation that is not tightly coupled with a single task.

It might make sense to pass a CommonConfig in the DistributionStrategy constructor so that any implementation can have access to the known set of configuration properties.

It may also make sense for configure to accept the CommonConfig rather than the int maxTasks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I shall refactor these in next pr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be done correctly now this is not some minor change to an internal method this is the decision about how DistributionStrategy should be architected.

Your current design has opened the door for DistributionStrategies that require multiple parameters to require those parameters be known up and down the stack.

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;

/**
 * A {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files)
 * into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
 * <p>
 * The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the
 * overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together
 * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
 * workers than tasks, and they will be assigned the remaining tasks as work completes.
 */
public interface DistributionStrategy {
    /**
     * Determines whether the object identified by {@code valueToBeEvaluated} belongs to the task with the given
     * {@code taskId}. A given object must map deterministically to exactly one task id.
     *
     * @param taskId
     *            a task ID, usually for the currently running task
     * @param valueToBeEvaluated
     *            the value (typically an object/file name) evaluated to decide task ownership
     * @param filePattern
     *            the compiled pattern used by implementations that extract fields from the value
     * @return true if the task identified by {@code taskId} should process the object, false otherwise.
     */
    boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);

    /**
     * Reconfigures this strategy; call this when the connector receives a reconfigure event so the task
     * distribution stays consistent with the new task count.
     *
     * @param maxTasks
     *            the maximum number of tasks created for the Connector
     */
    void configureDistributionStrategy(int maxTasks);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If DistributionStrategy is an abstract class than this becomes an implementation that is simply

long generateValue(Context) { return context.getFileName().hashCode() };

The class can focus on what it does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we discussed this not to make it abstract. Even if we need, let's move it to after mvp version

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add items to KCON-98 that identify all the changes that were requested in in PR so that we don't lose them.

Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,27 @@

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link HashObjectDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of
* the object's filename, which is uniformly distributed and deterministic across workers.
* {@link HashDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of the
* object's filename, which is uniformly distributed and deterministic across workers.
* <p>
* This is well-suited to use cases where the order of events between records from objects is not important, especially
* when ingesting files into Kafka that were not previously created by a supported cloud storage Sink.
*/
public final class HashObjectDistributionStrategy implements ObjectDistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashObjectDistributionStrategy.class);
public final class HashDistributionStrategy implements DistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashDistributionStrategy.class);
private int maxTasks;
HashObjectDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
public HashDistributionStrategy(final int maxTasks) {
configureDistributionStrategy(maxTasks);
}

@Override
public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated) {
public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated, final Pattern filePattern) {
if (filenameToBeEvaluated == null) {
LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
return false;
Expand All @@ -46,8 +48,8 @@ public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated
}

@Override
public void reconfigureDistributionStrategy(final int maxTasks, final String expectedFormat) {
setMaxTasks(maxTasks);
public void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}

public void setMaxTasks(final int maxTasks) {
Expand Down
Loading
Loading