-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Reduce S3 memory usage by clearing records already sent to S3
Signed-off-by: Aindriu Lavelle <[email protected]>
- Loading branch information
1 parent
f719afd
commit 77893fc
Showing
8 changed files
with
218 additions
and
71 deletions.
There are no files selected for viewing
77 changes: 77 additions & 0 deletions
77
commons/src/main/java/io/aiven/kafka/connect/common/grouper/GroupedSinkRecord.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
/* | ||
* Copyright 2024 Aiven Oy | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.aiven.kafka.connect.common.grouper; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
import org.apache.kafka.connect.sink.SinkRecord; | ||
|
||
public class GroupedSinkRecord { | ||
|
||
private int numberOfRecords; | ||
final private List<SinkRecord> sinkRecords; | ||
final private String filename; | ||
final private long recordCreationDate = System.currentTimeMillis(); | ||
|
||
public GroupedSinkRecord(final String filename) { | ||
this.filename = filename; | ||
sinkRecords = new ArrayList<>(); | ||
numberOfRecords = 0; | ||
} | ||
|
||
public GroupedSinkRecord(final String filename, final List<SinkRecord> sinkRecords) { | ||
this.filename = filename; | ||
this.sinkRecords = new ArrayList<>(sinkRecords); | ||
numberOfRecords = sinkRecords.size(); | ||
} | ||
public GroupedSinkRecord(final String filename, final SinkRecord sinkRecord) { | ||
this.filename = filename; | ||
this.sinkRecords = new ArrayList<>(); | ||
this.sinkRecords.add(sinkRecord); | ||
numberOfRecords = 1; | ||
} | ||
|
||
public void addSinkRecord(final SinkRecord sinkRecord) { | ||
this.sinkRecords.add(sinkRecord); | ||
this.numberOfRecords++; | ||
} | ||
|
||
public List<SinkRecord> getSinkRecords() { | ||
// Ensure access to the Sink Records can only be changed through the apis and not accidentally by another | ||
// process. | ||
return Collections.unmodifiableList(sinkRecords); | ||
} | ||
|
||
public void removeSinkRecords(final List<SinkRecord> sinkRecords) { | ||
this.sinkRecords.removeAll(sinkRecords); | ||
} | ||
|
||
public int getNumberOfRecords() { | ||
return numberOfRecords; | ||
} | ||
|
||
public String getFilename() { | ||
return filename; | ||
} | ||
|
||
public long getRecordCreationDate() { | ||
return recordCreationDate; | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
44 changes: 44 additions & 0 deletions
44
commons/src/main/java/io/aiven/kafka/connect/common/grouper/PartitionOffset.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
/* | ||
* Copyright 2024 Aiven Oy | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.aiven.kafka.connect.common.grouper; | ||
|
||
public class PartitionOffset { | ||
|
||
private Long offset; | ||
private int partition; | ||
|
||
public PartitionOffset(final int partition, final Long offset) { | ||
this.offset = offset; | ||
this.partition = partition; | ||
} | ||
|
||
public int getPartition() { | ||
return partition; | ||
} | ||
|
||
public void setPartition(final int partition) { | ||
this.partition = partition; | ||
} | ||
|
||
public Long getOffset() { | ||
return offset; | ||
} | ||
|
||
public void setOffset(final Long offset) { | ||
this.offset = offset; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.