Commit bfc6349: Refactor from review

muralibasani committed Jan 13, 2025
1 parent fd62149

Showing 16 changed files with 39 additions and 98 deletions.
@@ -21,10 +21,10 @@
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
-import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.input.TransformerFactory;
+import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;

public class SourceCommonConfig extends CommonConfig {

@@ -16,14 +16,14 @@

package io.aiven.kafka.connect.common.config;

-import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy.OBJECT_HASH;
-import static io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy.PARTITION_IN_FILENAME;
+import static io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy.OBJECT_HASH;
+import static io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy.PARTITION_IN_FILENAME;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
-import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy;
+import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;

import org.apache.commons.lang3.StringUtils;

@@ -47,42 +47,5 @@ public interface DistributionStrategy {
* @param maxTasks
* The maximum number of tasks created for the Connector
*/
-void reconfigureDistributionStrategy(int maxTasks);
-
-/**
-* Check if the task is responsible for this set of files by checking if the given task matches the partition id.
-*
-* @param taskId
-* the current running task
-* @param partitionId
-* The partitionId recovered from the file path.
-* @return true if this task is responsible for this partition, false if it is not responsible for this partition.
-*/
-default boolean taskMatchesPartition(final int taskId, final int partitionId) {
-// The partition id and task id are both expected to start at 0; if the task id is changed to start at 1, this
-// will break.
-return taskId == partitionId;
-}
-
-/**
-* In the event of more partitions existing than tasks configured, the task will be required to take on additional
-* partitions that match.
-*
-* @param taskId
-* the current running task.
-* @param maxTasks
-* The maximum number of configured tasks allowed to run for this connector.
-* @param partitionId
-* The partitionId recovered from the file path.
-* @return true if the task supplied should handle the supplied partition
-*/
-default boolean taskMatchesModOfPartitionAndMaxTask(final int taskId, final int maxTasks, final int partitionId) {
-return taskMatchesPartition(taskId, partitionId % maxTasks);
-}
-
-default boolean toBeProcessedByThisTask(final int taskId, final int maxTasks, final int partitionId) {
-return partitionId < maxTasks
-? taskMatchesPartition(taskId, partitionId)
-: taskMatchesModOfPartitionAndMaxTask(taskId, maxTasks, partitionId);
-}
+void configureDistributionStrategy(int maxTasks);
}
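The slimmed-down interface now has just two methods to implement: isPartOfTask (shown in the implementations below) and configureDistributionStrategy. As a minimal sketch, assuming the three-argument isPartOfTask signature used elsewhere in this commit, a hypothetical implementation could look like this; the class name is invented for illustration:

    import java.util.regex.Pattern;

    // Hypothetical sketch, not part of this commit: the simplest possible
    // strategy, assigning every object to task 0.
    public class EverythingToTaskZeroStrategy implements DistributionStrategy {
        @Override
        public boolean isPartOfTask(final int taskId, final String objectKey, final Pattern filePattern) {
            return taskId == 0; // every file is handled by the first task
        }

        @Override
        public void configureDistributionStrategy(final int maxTasks) {
            // nothing to recompute for this trivial strategy
        }
    }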
@@ -48,15 +48,11 @@ public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated
}

@Override
-public void reconfigureDistributionStrategy(final int maxTasks) {
+public void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}

-public void setMaxTasks(final int maxTasks) {
-this.maxTasks = maxTasks;
-}
-
-private void configureDistributionStrategy(final int maxTasks) {
-this.maxTasks = maxTasks;
-}
}
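Condensed from the test updates later in this commit, usage of the renamed method looks roughly like this (the object key, pattern, and task counts are illustrative):

    final HashDistributionStrategy strategy = new HashDistributionStrategy(3); // start with 3 tasks
    final Pattern filePattern = FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}");
    strategy.isPartOfTask(0, "logs-0-0000000000.txt", filePattern); // true only for the task owning this key's hash
    strategy.configureDistributionStrategy(5); // invoked again after a connector reconfiguration event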
@@ -56,12 +56,20 @@ public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluat
sourceNameToBeEvaluated);

if (optionalPartitionId.isPresent()) {
-return toBeProcessedByThisTask(taskId, maxTasks, optionalPartitionId.get());
+return optionalPartitionId.get() < maxTasks
+? taskMatchesPartition(taskId, optionalPartitionId.get())
+: taskMatchesPartition(taskId, optionalPartitionId.get() % maxTasks);
}
LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated);
return false;
}

+boolean taskMatchesPartition(final int taskId, final int partitionId) {
+// The partition id and task id are both expected to start at 0; if the task id is changed to start at 1, this
+// will break.
+return taskId == partitionId;
+}

/**
* When a connector reconfiguration event is received, this method should be called to ensure the correct strategy is
* being implemented by the connector.
@@ -70,7 +78,7 @@ public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluat
* maximum number of configured tasks for this connector
*/
@Override
-public void reconfigureDistributionStrategy(final int maxTasks) {
+public void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}
}
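The inlined ternary keeps the previous wrap-around semantics: a partition id below maxTasks must equal the task id, while a larger partition id is reduced modulo maxTasks. A small worked example, assuming a maxTasks constructor like the one HashDistributionStrategy exposes in the tests below; the key and pattern are illustrative:

    // With maxTasks = 5, partition 7 wraps to 7 % 5 = 2, so only task 2 matches.
    final PartitionDistributionStrategy strategy = new PartitionDistributionStrategy(5);
    final Pattern pattern = FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}");
    strategy.isPartOfTask(2, "logs-7-0000000000.txt", pattern); // true: 7 % 5 == 2
    strategy.isPartOfTask(3, "logs-7-0000000000.txt", pattern); // false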
@@ -14,7 +14,7 @@
* limitations under the License.
*/

-package io.aiven.kafka.connect.common.config.enums;
+package io.aiven.kafka.connect.common.source.task.enums;

import java.util.Arrays;
import java.util.Objects;
@@ -64,7 +64,7 @@ void hashDistributionExactlyOnceWithReconfigureEvent(final String path) {
Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE);
results.clear();
maxTasks = 5;
-taskDistribution.reconfigureDistributionStrategy(maxTasks);
+taskDistribution.configureDistributionStrategy(maxTasks);
for (int taskId = 0; taskId < maxTasks; taskId++) {
results.add(taskDistribution.isPartOfTask(taskId, path,
FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}")));
@@ -130,7 +130,7 @@ void filenameDistributionExactlyOnceDistributionWithTaskReconfiguration(final in
}
assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE);
-taskDistribution.reconfigureDistributionStrategy(maxTaskAfterReConfig);
+taskDistribution.configureDistributionStrategy(maxTaskAfterReConfig);

results.clear();
for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) {
@@ -285,7 +285,7 @@ void partitionPathDistributionExactlyOnceDistributionWithTaskReconfiguration(fin
}
assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE);
-taskDistribution.reconfigureDistributionStrategy(maxTaskAfterReConfig);
+taskDistribution.configureDistributionStrategy(maxTaskAfterReConfig);

results.clear();
for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) {
@@ -150,7 +150,7 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) {

final OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig);

-final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());
+final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig);

final Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient, new HashDistributionStrategy(1),
@@ -216,7 +216,7 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException {

final OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig);

-final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());
+final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig);

final Iterator<S3SourceRecord> sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager,
TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient,
@@ -273,7 +273,7 @@ void verifyIteratorRehydration(final TestInfo testInfo) {
assertThat(testBucketAccessor.listObjects()).hasSize(2);

final S3SourceConfig s3SourceConfig = new S3SourceConfig(configData);
-final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>());
+final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig);
final Iterator<S3Object> iter = sourceClient.getS3ObjectIterator(null);

assertThat(iter).hasNext();
@@ -57,8 +57,8 @@
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

-import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.input.InputFormat;
+import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;
import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor;
import io.aiven.kafka.connect.s3.source.testutils.ContentUtils;

@@ -16,23 +16,21 @@

package io.aiven.kafka.connect.s3.source;

-import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.regex.Pattern;

import org.apache.kafka.connect.source.SourceRecord;

import io.aiven.kafka.connect.common.config.SourceCommonConfig;
-import io.aiven.kafka.connect.common.config.enums.ObjectDistributionStrategy;
import io.aiven.kafka.connect.common.source.AbstractSourceTask;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils;
import io.aiven.kafka.connect.common.source.task.DistributionStrategy;
import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy;
import io.aiven.kafka.connect.common.source.task.PartitionDistributionStrategy;
+import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
@@ -69,8 +67,6 @@ public class S3SourceTask extends AbstractSourceTask {
/** The AWS Source client */

private AWSV2SourceClient awsv2SourceClient;
-/** The list of failed object keys */
-private final Set<String> failedObjectKeys = new HashSet<>();
/** The offset manager this task uses */
private OffsetManager offsetManager;
private S3SourceConfig s3SourceConfig;
@@ -139,7 +135,7 @@ protected SourceCommonConfig configure(final Map<String, String> props) {
this.s3SourceConfig = new S3SourceConfig(props);
this.transformer = s3SourceConfig.getTransformer();
offsetManager = new OffsetManager(context, s3SourceConfig);
-awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys);
+awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig);
setS3SourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer,
awsv2SourceClient, initializeObjectDistributionStrategy(), filePattern, taskId));
return s3SourceConfig;
@@ -17,10 +17,8 @@
package io.aiven.kafka.connect.s3.source.utils;

import java.io.InputStream;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
-import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

@@ -47,17 +45,14 @@ public class AWSV2SourceClient {
private final S3Client s3Client;
private final String bucketName;

-private Predicate<S3Object> filterPredicate = s3Object -> s3Object.size() > 0;
-private final Set<String> failedObjectKeys;
+private final Predicate<S3Object> filterPredicate = s3Object -> s3Object.size() > 0;

/**
* @param s3SourceConfig
* configuration for Source connector
-* @param failedObjectKeys
-* all objectKeys which have already been tried but could not be processed.
*/
-public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String> failedObjectKeys) {
-this(new S3ClientFactory().createAmazonS3Client(s3SourceConfig), s3SourceConfig, failedObjectKeys);
+public AWSV2SourceClient(final S3SourceConfig s3SourceConfig) {
+this(new S3ClientFactory().createAmazonS3Client(s3SourceConfig), s3SourceConfig);
}

/**
@@ -67,15 +62,11 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set<String>
* amazonS3Client
* @param s3SourceConfig
* configuration for Source connector
-* @param failedObjectKeys
-* all objectKeys which have already been tried but could not be processed.
*/
-AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig,
-final Set<String> failedObjectKeys) {
+AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig) {
this.s3SourceConfig = s3SourceConfig;
this.s3Client = s3Client;
this.bucketName = s3SourceConfig.getAwsS3BucketName();
-this.failedObjectKeys = new HashSet<>(failedObjectKeys);
}

/**
@@ -92,7 +83,6 @@ private Stream<S3Object> getS3ObjectStream(final String startToken) {
.prefix(StringUtils.defaultIfBlank(s3SourceConfig.getAwsS3Prefix(), null))
.startAfter(StringUtils.defaultIfBlank(startToken, null))
.build();
-setFilterPredicate(filterPredicate);

return Stream.iterate(s3Client.listObjectsV2(request), Objects::nonNull, response -> {
// This is called every time next() is called on the iterator.
@@ -138,14 +128,6 @@ public IOSupplier<InputStream> getObject(final String objectKey) {
return s3ObjectResponse::asInputStream;
}

-public void addFailedObjectKeys(final String objectKey) {
-this.failedObjectKeys.add(objectKey);
-}
-
-public void setFilterPredicate(final Predicate<S3Object> basePredicate) {
-this.filterPredicate = basePredicate.and(objectSummary -> !failedObjectKeys.contains(objectSummary.key()));
-}
-
public void shutdown() {
s3Client.close();
}
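With the failed-key bookkeeping gone, the client is built from configuration alone and the zero-byte filter is fixed at construction. A minimal usage sketch; the property key and bucket value are assumptions, and real AWS credentials are needed for it to run:

    final Map<String, String> props = new HashMap<>();
    props.put("aws.s3.bucket.name", "my-bucket"); // assumed property key, placeholder bucket
    final AWSV2SourceClient client = new AWSV2SourceClient(new S3SourceConfig(props));
    final Iterator<S3Object> objects = client.getS3ObjectIterator(null); // null: start from the beginning
    objects.forEachRemaining(o -> System.out.println(o.key()));
    client.shutdown();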
@@ -42,7 +42,6 @@ public static SourceRecord createSourceRecord(final S3SourceRecor
if (ErrorsTolerance.NONE.equals(s3SourceConfig.getErrorsTolerance())) {
throw new ConnectException("Data Exception caught during S3 record to source record transformation", e);
} else {
-sourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey());
LOGGER.warn(
"Data Exception caught during S3 record to source record transformation {} . errors.tolerance set to 'all', logging warning and continuing to process.",
e.getMessage(), e);
@@ -92,11 +92,7 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetMan
outer = Collections.emptyIterator();
}

-public Predicate<S3Object> isValidFileForProcessing() {
-return s3Object -> isFileMatchingPattern(s3Object) && isFileAssignedToTask(s3Object);
-}
-
-private boolean isFileMatchingPattern(final S3Object s3Object) {
+public boolean isFileMatchingPattern(final S3Object s3Object) {
final Optional<String> optionalTopic = FilePatternUtils.getTopic(filePattern, s3Object.key());
final Optional<Integer> optionalPartitionId = FilePatternUtils.getPartitionId(filePattern, s3Object.key());

@@ -108,7 +104,7 @@ private boolean isFileMatchingPattern(final S3Object s3Object) {
return false;
}

-private boolean isFileAssignedToTask(final S3Object s3Object) {
+public boolean isFileAssignedToTask(final S3Object s3Object) {
return distributionStrategy.isPartOfTask(taskId, s3Object.key(), filePattern);
}
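Callers that previously relied on the removed isValidFileForProcessing() can compose the two now-public checks themselves, exactly as the updated tests below do:

    // Equivalent of the removed isValidFileForProcessing(), composed at the call site.
    final Predicate<S3Object> isValid = s3Object -> iterator.isFileMatchingPattern(s3Object)
            && iterator.isFileAssignedToTask(s3Object);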

@@ -115,7 +115,7 @@ void testFetchObjectWithPrefix() {
configMap.put(AWS_S3_PREFIX_CONFIG, "test/");
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
s3Client = mock(S3Client.class);
-awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
+awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig);
requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
final S3Object object1 = createObjectSummary(1, "topics/key1/1/key1.txt");
final S3Object object2 = createObjectSummary(1, "topics/key2/2/key2.txt");
@@ -147,7 +147,7 @@ void testFetchObjectWithInitialStartAfter() {
final String startAfter = "file-option-1-12000.txt";
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
s3Client = mock(S3Client.class);
-awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
+awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig);
requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class);
final S3Object object1 = createObjectSummary(1, "key1-1-10000");
final S3Object object2 = createObjectSummary(1, "key2-2-20000");
@@ -202,7 +202,7 @@ private void initializeWithTaskConfigs() {
final Map<String, String> configMap = getConfigMap();
final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap);
s3Client = mock(S3Client.class);
-awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet());
+awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig);
}

private ListObjectsV2Response getListObjectsV2Response() {
@@ -186,7 +186,8 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final in
when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais);
final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer,
mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId);
-final Predicate<S3Object> s3ObjectPredicate = iterator.isValidFileForProcessing();
+final Predicate<S3Object> s3ObjectPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object)
+&& iterator.isFileAssignedToTask(s3Object);
// Assert
assertThat(s3ObjectPredicate).accepts(obj);
}
@@ -209,7 +210,8 @@ void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final
when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais);
final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer,
mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId);
-final Predicate<S3Object> stringPredicate = iterator.isValidFileForProcessing();
+final Predicate<S3Object> stringPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object)
+&& iterator.isFileAssignedToTask(s3Object);
// Assert
assertThat(stringPredicate.test(obj)).as("Predicate should accept the objectKey: " + objectKey).isFalse();
}
@@ -221,5 +223,4 @@ private static void mockPatternMatcher(final Pattern filePattern) {
when(fileMatcher.group(PATTERN_TOPIC_KEY)).thenReturn("testtopic");
when(fileMatcher.group(PATTERN_PARTITION_KEY)).thenReturn("0");
}

}
