From 22ee675c9be3d34b4056e363b792e2a4882232ef Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Fri, 3 Jan 2025 17:50:32 +0800 Subject: [PATCH] convert to fluss genericrow --- .../flink/source/lookup/FlinkAsyncLookupFunction.java | 6 ++---- .../flink/source/lookup/FlinkLookupFunction.java | 6 ++---- .../flink/utils/FlinkRowToFlussRowConverter.java | 11 +++++++++++ .../flink/utils/FlinkRowToFlussRowConverterTest.java | 8 ++++++++ 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java index ae53093d7..1a3f413f6 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java @@ -89,12 +89,10 @@ public void open(FunctionContext context) { LOG.info("start open ..."); connection = ConnectionFactory.createConnection(flussConfig); table = connection.getTable(tablePath); - // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes(); flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( - FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes), - table.getDescriptor().getKvFormat()); + FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes)); final RowType outputRowType; if (projection == null) { @@ -116,7 +114,7 @@ public void open(FunctionContext context) { public CompletableFuture> asyncLookup(RowData keyRow) { RowData normalizedKeyRow = lookupNormalizer.normalizeLookupKey(keyRow); RemainingFilter remainingFilter = lookupNormalizer.createRemainingFilter(keyRow); - InternalRow flussKeyRow = flinkRowToFlussRowConverter.toInternalRow(normalizedKeyRow); + InternalRow flussKeyRow = flinkRowToFlussRowConverter.toGenericRow(normalizedKeyRow); CompletableFuture> future = new CompletableFuture<>(); // fetch result fetchResult(future, 0, flussKeyRow, remainingFilter); diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java index 20788e53d..005a87a33 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java @@ -84,12 +84,10 @@ public void open(FunctionContext context) { LOG.info("start open ..."); connection = ConnectionFactory.createConnection(flussConfig); table = connection.getTable(tablePath); - // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization int[] lookupKeyIndexes = lookupNormalizer.getLookupKeyIndexes(); flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( - FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes), - table.getDescriptor().getKvFormat()); + FlinkUtils.projectRowType(flinkRowType, lookupKeyIndexes)); final RowType outputRowType; if (projection == null) { @@ -121,7 +119,7 @@ public Collection lookup(RowData keyRow) { // first is converting from flink row to fluss row, // second is extracting key from the fluss row when calling method table.get(flussKeyRow) // todo: may be reduce to one data conversion when it's a bottle neck - InternalRow flussKeyRow = flinkRowToFlussRowConverter.toInternalRow(normalizedKeyRow); + InternalRow flussKeyRow = flinkRowToFlussRowConverter.toGenericRow(normalizedKeyRow); for (int retry = 0; retry <= maxRetryTimes; retry++) { try { if (flussLookupType == LookupType.LOOKUP) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkRowToFlussRowConverter.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkRowToFlussRowConverter.java index aa755a396..5622e85a3 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkRowToFlussRowConverter.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkRowToFlussRowConverter.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.metadata.KvFormat; import com.alibaba.fluss.row.BinaryString; import com.alibaba.fluss.row.Decimal; +import com.alibaba.fluss.row.GenericRow; import com.alibaba.fluss.row.InternalRow; import com.alibaba.fluss.row.TimestampLtz; import com.alibaba.fluss.row.TimestampNtz; @@ -89,6 +90,16 @@ public InternalRow toInternalRow(RowData rowData) { return rowEncoder.finishRow(); } + public InternalRow toGenericRow(RowData rowData) { + GenericRow genericRow = new GenericRow(fieldLength); + for (int i = 0; i < fieldLength; i++) { + genericRow.setField( + i, + toFlussFieldConverters[i].serialize(fieldGetters[i].getFieldOrNull(rowData))); + } + return genericRow; + } + private FlussSerializationConverter createNullableInternalConverter(LogicalType flinkField) { return wrapIntoNullableInternalConverter(createInternalConverter(flinkField)); } diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/utils/FlinkRowToFlussRowConverterTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/utils/FlinkRowToFlussRowConverterTest.java index 77e5842cd..271cb0055 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/utils/FlinkRowToFlussRowConverterTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/utils/FlinkRowToFlussRowConverterTest.java @@ -61,6 +61,14 @@ void testConverter() throws Exception { assertThat(internalRow.getFieldCount()).isEqualTo(19); assertAllTypeEquals(internalRow); } + + // test GenericRow row converter + try (FlinkRowToFlussRowConverter converter = + FlinkRowToFlussRowConverter.create(toFlinkRowType(flussRowType))) { + InternalRow internalRow = converter.toGenericRow(genRowDataForAllType()); + assertThat(internalRow.getFieldCount()).isEqualTo(19); + assertAllTypeEquals(internalRow); + } } private static RowData genRowDataForAllType() {