Skip to content

Commit

Permalink
add closing of input files readers
Browse files Browse the repository at this point in the history
  • Loading branch information
maxim_konstantinov committed Aug 7, 2024
1 parent d393125 commit 64d3bb2
Showing 1 changed file with 23 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,9 @@ public void close() throws IOException {
}

public void processBlocks() throws IOException {
TransParquetFileReader readerToJoin = inputFilesToJoin.peek();
TransParquetFileReader readerToJoin = null;
IndexCache indexCacheToJoin = null;
int blockIdxToJoin = -1;
int blockIdxToJoin = 0;
List<ColumnDescriptor> outColumns = outSchema.getColumns();

while (!inputFiles.isEmpty()) {
Expand All @@ -315,23 +315,26 @@ public void processBlocks() throws IOException {
Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
blockMetaData.getColumns().stream().collect(Collectors.toMap(x -> x.getPath(), x -> x));

if (readerToJoin != null
&& (blockIdxToJoin == -1
|| ++blockIdxToJoin
== readerToJoin.getFooter().getBlocks().size())) {
blockIdxToJoin = 0;
readerToJoin = inputFilesToJoin.poll();
Set<ColumnPath> columnPathsToJoin = readerToJoin.getFileMetaData().getSchema().getColumns().stream()
.map(x -> ColumnPath.get(x.getPath()))
.collect(Collectors.toSet());
if (indexCacheToJoin != null) {
indexCacheToJoin.clean();
if (!inputFilesToJoin.isEmpty()) {
if (readerToJoin == null
|| ++blockIdxToJoin
== readerToJoin.getFooter().getBlocks().size()) {
if (readerToJoin != null) readerToJoin.close();
blockIdxToJoin = 0;
readerToJoin = inputFilesToJoin.poll();
Set<ColumnPath> columnPathsToJoin =
readerToJoin.getFileMetaData().getSchema().getColumns().stream()
.map(x -> ColumnPath.get(x.getPath()))
.collect(Collectors.toSet());
if (indexCacheToJoin != null) {
indexCacheToJoin.clean();
}
indexCacheToJoin = IndexCache.create(readerToJoin, columnPathsToJoin, indexCacheStrategy, true);
indexCacheToJoin.setBlockMetadata(
readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
} else {
blockIdxToJoin++;
}
indexCacheToJoin = IndexCache.create(readerToJoin, columnPathsToJoin, indexCacheStrategy, true);
indexCacheToJoin.setBlockMetadata(
readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
} else {
blockIdxToJoin++;
}

for (int outColumnIdx = 0; outColumnIdx < outColumns.size(); outColumnIdx++) {
Expand Down Expand Up @@ -361,7 +364,9 @@ public void processBlocks() throws IOException {

indexCache.clean();
LOG.info("Finish rewriting input file: {}", reader.getFile());
reader.close();
}
if (readerToJoin != null) readerToJoin.close();
}

private void processBlock(
Expand Down

0 comments on commit 64d3bb2

Please sign in to comment.