From 31ffe6ba09fac3717a71ba221f191b8d92ca8cca Mon Sep 17 00:00:00 2001
From: yunhong <337361684@qq.com>
Date: Tue, 10 Dec 2024 17:47:46 +0800
Subject: [PATCH] [test] Fix some unstable test cases while running CI

---
 .../admin/ClientToServerITCaseBase.java       |   4 +-
 .../fluss/client/admin/FlussAdminITCase.java  | 100 ++++++++++++------
 .../scanner/log/FlussLogScannerITCase.java    |  27 +++--
 .../client/scanner/log/LogFetcherTest.java    |  32 +++---
 .../scanner/log/RemoteLogScannerITCase.java   |  10 +-
 .../snapshot/SnapshotScannerITCase.java       |   1 -
 .../table/FlussFailServerTableITCase.java     |  65 +++++++++---
 .../table/FlussPartitionedTableITCase.java    |  25 ++---
 .../fluss/client/table/FlussTableITCase.java  |  87 ++++++++-------
 .../flink/catalog/FlinkCatalogITCase.java     |  46 +++++---
 .../flink/catalog/FlinkCatalogTest.java       |  50 ++++-----
 .../flink/metrics/FlinkMetricsITCase.java     |   7 +-
 .../flink/sink/FlinkTableSinkITCase.java      |  55 ++++++----
 .../source/FlinkTableSourceBatchITCase.java   |  25 +++--
 .../flink/source/FlinkTableSourceITCase.java  |  49 ++++++---
 .../enumerator/FlinkSourceEnumeratorTest.java |  53 ++++++----
 .../source/reader/FlinkSourceReaderTest.java  |   2 +-
 .../reader/FlinkSourceSplitReaderTest.java    |   2 +-
 .../flink/source/testutils/FlinkTestBase.java |   3 -
 .../flink/FlinkUnionReadLogTableITCase.java   |   3 +-
 .../FlinkUnionReadPrimaryKeyTableITCase.java  |   3 +-
 .../paimon/flink/LakeTableEnumeratorTest.java |   4 +-
 .../paimon/sink/PaimonSyncITCase.java         |   5 +-
 .../rpc/netty/client/ServerConnection.java    |   5 +-
 .../rpc/protocol/RequestChannelTest.java      |   2 +
 .../com/alibaba/fluss/server/kv/KvTablet.java |   3 +
 .../coordinator/AutoPartitionManagerTest.java |   2 +-
 .../coordinator/NotifyLeaderAndIsrITCase.java |   7 +-
 .../coordinator/TableManagerITCase.java       |  44 ++++----
 .../server/log/remote/RemoteLogITCase.java    |  19 ++--
 .../fluss/server/replica/AdjustIsrITCase.java |  14 ++-
 .../replica/KvReplicaRestoreITCase.java       |   2 +-
 .../server/replica/KvSnapshotITCase.java      |   3 +-
 .../replica/fetcher/ReplicaFetcherITCase.java |  29 +++--
 .../server/tablet/TabletServiceITCase.java    |  36 +++----
 .../testutils/FlussClusterExtension.java      |  25 ++++-
 36 files changed, 517 insertions(+), 332 deletions(-)

diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java
index ced9b604a..a291ffa9d 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/ClientToServerITCaseBase.java
@@ -96,7 +96,7 @@ protected long createTable(
         return admin.getTable(tablePath).get().getTableId();
     }
 
-    private static Configuration initConfig() {
+    public static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
         // set a shorter interval for testing purpose
@@ -117,7 +117,7 @@ protected static LogScanner createLogScanner(Table table, int[] projectFields) {
         return table.getLogScanner(new LogScan().withProjectedFields(projectFields));
     }
 
-    protected static void subscribeFromBeginning(LogScanner logScanner, Table table) {
+    public static void subscribeFromBeginning(LogScanner logScanner, Table table) {
         int bucketCount = getBucketCount(table);
         for (int i = 0; i < bucketCount; i++) {
             logScanner.subscribeFromBeginning(i);
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
index 325d37afe..1c47a0397 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
@@ -65,7 +65,6 @@
 /** Test for {@link FlussAdmin}. */
 class FlussAdminITCase extends ClientToServerITCaseBase {
 
-    protected static final TablePath DEFAULT_TABLE_PATH = TablePath.of("test_db", "person");
     protected static final Schema DEFAULT_SCHEMA =
             Schema.newBuilder()
                     .primaryKey("id")
@@ -80,7 +79,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
             TableDescriptor.builder()
                     .schema(DEFAULT_SCHEMA)
                     .comment("test table")
-                    .distributedBy(10, "id")
+                    .distributedBy(3, "id")
                     .property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1))
                     .customProperty("connector", "fluss")
                     .build();
@@ -88,18 +87,19 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
     @BeforeEach
     protected void setup() throws Exception {
         super.setup();
-        // create a default table in fluss.
-        createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, false);
     }
 
     @Test
     void testMultiClient() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "multi_client_test_t1");
+        createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+
         Admin admin1 = conn.getAdmin();
         Admin admin2 = conn.getAdmin();
         assertThat(admin1).isNotSameAs(admin2);
 
-        TableInfo t1 = admin1.getTable(DEFAULT_TABLE_PATH).get();
-        TableInfo t2 = admin2.getTable(DEFAULT_TABLE_PATH).get();
+        TableInfo t1 = admin1.getTable(tablePath).get();
+        TableInfo t2 = admin2.getTable(tablePath).get();
         assertThat(t1).isEqualTo(t2);
 
         admin1.close();
@@ -108,14 +108,17 @@ void testMultiClient() throws Exception {
 
     @Test
     void testGetTableAndSchema() throws Exception {
-        SchemaInfo schemaInfo = admin.getTableSchema(DEFAULT_TABLE_PATH).get();
+        TablePath tablePath = TablePath.of("test_db", "test_get_table_and_schema");
+        createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+
+        SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get();
         assertThat(schemaInfo.getSchema()).isEqualTo(DEFAULT_SCHEMA);
         assertThat(schemaInfo.getSchemaId()).isEqualTo(1);
-        SchemaInfo schemaInfo2 = admin.getTableSchema(DEFAULT_TABLE_PATH, 1).get();
+        SchemaInfo schemaInfo2 = admin.getTableSchema(tablePath, 1).get();
         assertThat(schemaInfo2).isEqualTo(schemaInfo);
 
         // get default table.
-        TableInfo tableInfo = admin.getTable(DEFAULT_TABLE_PATH).get();
+        TableInfo tableInfo = admin.getTable(tablePath).get();
         assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
         assertThat(tableInfo.getTableDescriptor()).isEqualTo(DEFAULT_TABLE_DESCRIPTOR);
 
@@ -157,8 +160,11 @@ void testCreateInvalidDatabaseAndTable() {
     }
 
     @Test
-    void testCreateTableWithInvalidProperty() {
-        TablePath tablePath = TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "test_property");
+    void testCreateTableWithInvalidProperty() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "test_create_table_with_property");
+        createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+
+        TablePath tablePath1 = TablePath.of("test_db", "test_property");
         TableDescriptor t1 =
                 TableDescriptor.builder()
                         .schema(DEFAULT_SCHEMA)
@@ -167,7 +173,7 @@ void testCreateTableWithInvalidProperty() {
                         .property("connector", "fluss")
                         .build();
         // should throw exception
-        assertThatThrownBy(() -> admin.createTable(tablePath, t1, false).get())
+        assertThatThrownBy(() -> admin.createTable(tablePath1, t1, false).get())
                 .cause()
                 .isInstanceOf(InvalidConfigException.class)
                 .hasMessageContaining("'connector' is not a Fluss table property.");
@@ -180,7 +186,7 @@ void testCreateTableWithInvalidProperty() {
                         .property("table.log.ttl", "unknown")
                         .build();
         // should throw exception
-        assertThatThrownBy(() -> admin.createTable(tablePath, t2, false).get())
+        assertThatThrownBy(() -> admin.createTable(tablePath1, t2, false).get())
                 .cause()
                 .isInstanceOf(InvalidConfigException.class)
                 .hasMessageContaining(
@@ -194,7 +200,7 @@ void testCreateTableWithInvalidProperty() {
                        .property(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "0")
                         .build();
         // should throw exception
-        assertThatThrownBy(() -> admin.createTable(tablePath, t3, false).get())
+        assertThatThrownBy(() -> admin.createTable(tablePath1, t3, false).get())
                 .cause()
                 .isInstanceOf(InvalidConfigException.class)
                 .hasMessage("'table.log.tiered.local-segments' must be greater than 0.");
@@ -202,7 +208,13 @@ void testCreateTableWithInvalidProperty() {
 
     @Test
     void testCreateTableWithInvalidReplicationFactor() throws Exception {
-        TablePath tablePath = TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "t1");
+        TablePath tablePath =
+                TablePath.of("test_db", "test_create_table_with_invalid_replication_factor_t1");
+        createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+
+        TablePath tablePath1 =
+                TablePath.of("test_db", "test_create_table_with_invalid_replication_factor_t2");
+
         // set replica factor to a non positive number, should also throw exception
         TableDescriptor nonPositiveReplicaFactorTable =
                 TableDescriptor.builder()
@@ -214,7 +226,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
         // should throw exception
         assertThatThrownBy(
                         () ->
-                                admin.createTable(tablePath, nonPositiveReplicaFactorTable, false)
+                                admin.createTable(tablePath1, nonPositiveReplicaFactorTable, false)
                                         .get())
                 .cause()
                 .isInstanceOf(InvalidReplicationFactorException.class)
@@ -234,7 +246,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
                         .customProperty("connector", "fluss")
                         .property(ConfigOptions.TABLE_REPLICATION_FACTOR.key(), "3")
                         .build();
-        assertThatThrownBy(() -> admin.createTable(tablePath, tableDescriptor, false).get())
+        assertThatThrownBy(() -> admin.createTable(tablePath1, tableDescriptor, false).get())
                 .cause()
                 .isInstanceOf(InvalidReplicationFactorException.class)
                 .hasMessageContaining(
@@ -248,18 +260,21 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
         FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
 
         // we can create the table now
-        admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
-        TableInfo tableInfo = admin.getTable(DEFAULT_TABLE_PATH).get();
+        admin.createTable(tablePath1, DEFAULT_TABLE_DESCRIPTOR, false).get();
+        TableInfo tableInfo = admin.getTable(tablePath).get();
         assertThat(tableInfo.getTableDescriptor()).isEqualTo(DEFAULT_TABLE_DESCRIPTOR);
     }
 
     @Test
     void testCreateExistedTable() throws Exception {
-        assertThatThrownBy(() -> createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, false))
+        TablePath tablePath = TablePath.of("test_db", "test_create_table_existed_t1");
+        createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+
+        assertThatThrownBy(() -> createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false))
                 .cause()
                 .isInstanceOf(DatabaseAlreadyExistException.class);
         // no exception
-        createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, true);
+        createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true);
 
         // database not exists, throw exception
         assertThatThrownBy(
@@ -275,6 +290,9 @@ void testCreateExistedTable() throws Exception {
 
     @Test
     void testDropDatabaseAndTable() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "test_drop_database_and_table_t1");
+        createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+
         // drop not existed database with ignoreIfNotExists false.
         assertThatThrownBy(() -> admin.deleteDatabase("unknown_db", false, true).get())
                 .cause()
@@ -294,7 +312,7 @@ void testDropDatabaseAndTable() throws Exception {
         assertThat(admin.databaseExists("test_db").get()).isFalse();
 
         // re-create.
-        createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, false);
+        createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
 
         // drop not existed table with ignoreIfNotExists false.
         assertThatThrownBy(
@@ -308,22 +326,35 @@ void testDropDatabaseAndTable() throws Exception {
         admin.deleteTable(TablePath.of("test_db", "unknown_table"), true).get();
 
         // drop existed table.
-        assertThat(admin.tableExists(DEFAULT_TABLE_PATH).get()).isTrue();
-        admin.deleteTable(DEFAULT_TABLE_PATH, true).get();
-        assertThat(admin.tableExists(DEFAULT_TABLE_PATH).get()).isFalse();
+        assertThat(admin.tableExists(tablePath).get()).isTrue();
+        admin.deleteTable(tablePath, true).get();
+        assertThat(admin.tableExists(tablePath).get()).isFalse();
     }
 
     @Test
     void testListDatabasesAndTables() throws Exception {
+        TablePath tablePath = TablePath.of("test_db", "test_drop_database_and_table");
+        createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+
         admin.createDatabase("db1", true).get();
         admin.createDatabase("db2", true).get();
         admin.createDatabase("db3", true).get();
         assertThat(admin.listDatabases().get())
                 .containsExactlyInAnyOrder("test_db", "db1", "db2", "db3", "fluss");
-        admin.createTable(TablePath.of("db1", "table1"), DEFAULT_TABLE_DESCRIPTOR, true).get();
-        admin.createTable(TablePath.of("db1", "table2"), DEFAULT_TABLE_DESCRIPTOR, true).get();
-        assertThat(admin.listTables("db1").get()).containsExactlyInAnyOrder("table1", "table2");
+        admin.createTable(
+                        TablePath.of("db1", "list_database_and_table_t1"),
+                        DEFAULT_TABLE_DESCRIPTOR,
+                        true)
+                .get();
+        admin.createTable(
+                        TablePath.of("db1", "list_database_and_table_t2"),
+                        DEFAULT_TABLE_DESCRIPTOR,
+                        true)
+                .get();
+        assertThat(admin.listTables("db1").get())
+                .containsExactlyInAnyOrder(
+                        "list_database_and_table_t1", "list_database_and_table_t2");
 
         assertThat(admin.listTables("db2").get()).isEmpty();
         assertThatThrownBy(() -> admin.listTables("unknown_db").get())
@@ -333,8 +364,10 @@ void testListDatabasesAndTables() throws Exception {
 
     @Test
     void testListPartitionInfos() throws Exception {
-        String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
+        String dbName = "test_db";
         TablePath nonPartitionedTablePath = TablePath.of(dbName, "test_non_partitioned_table");
+        createTable(nonPartitionedTablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+
         admin.createTable(nonPartitionedTablePath, DEFAULT_TABLE_DESCRIPTOR, true).get();
         assertThatThrownBy(() -> admin.listPartitionInfos(nonPartitionedTablePath).get())
                 .cause()
@@ -350,7 +383,7 @@ void testListPartitionInfos() throws Exception {
                                         .column("pt", DataTypes.STRING())
                                         .build())
                         .comment("test table")
-                        .distributedBy(10, "id")
+                        .distributedBy(3, "id")
                         .partitionedBy("pt")
                         .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
                         .property(
@@ -372,8 +405,11 @@ void testListPartitionInfos() throws Exception {
 
     @Test
     void testGetKvSnapshot() throws Exception {
-        TablePath tablePath1 =
-                TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "test-table-snapshot");
+        TablePath tablePath = TablePath.of("test_db", "test-table-snapshot_t1");
+        createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
+
+        TablePath tablePath1 = TablePath.of("test_db", "test-table-snapshot_t2");
+
         int bucketNum = 3;
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder()
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java
index 1df38465a..cd54ae54a 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/FlussLogScannerITCase.java
@@ -31,6 +31,7 @@
 import com.alibaba.fluss.types.DataTypes;
 import com.alibaba.fluss.types.RowType;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -46,7 +47,6 @@
 import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
 import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
-import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
 import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
@@ -55,14 +55,20 @@
 /** ITCase for {@link FlussLogScanner}. */
 public class FlussLogScannerITCase extends ClientToServerITCaseBase {
 
+    @BeforeEach
+    protected void setup() throws Exception {
+        super.setup();
+    }
+
     @Test
     void testPoll() throws Exception {
-        createTable(DATA1_TABLE_PATH, DATA1_TABLE_INFO.getTableDescriptor(), false);
+        TablePath tablePath = new TablePath("test_db_1", "test_poll_t1");
+        createTable(tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);
 
         // append a batch of data.
         int recordSize = 10;
         List expectedRows = new ArrayList<>();
-        try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
+        try (Table table = conn.getTable(tablePath)) {
             AppendWriter appendWriter = table.getAppendWriter();
             for (int i = 0; i < recordSize; i++) {
                 IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {i, "a"});
@@ -89,16 +95,18 @@ void testPoll() throws Exception {
 
     @Test
     void testPollWhileCreateTableNotReady() throws Exception {
+        TablePath tablePath =
+                new TablePath("test_db_1", "test_poll_while_create_table_not_ready_t1");
         // create one table with 100 buckets.
         int bucketNumber = 100;
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(bucketNumber).build();
-        createTable(DATA1_TABLE_PATH, tableDescriptor, false);
+        createTable(tablePath, tableDescriptor, false);
 
         // append a batch of data.
         int recordSize = 10;
         List expectedRows = new ArrayList<>();
-        try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
+        try (Table table = conn.getTable(tablePath)) {
             AppendWriter appendWriter = table.getAppendWriter();
             for (int i = 0; i < recordSize; i++) {
                 IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {i, "a"});
@@ -125,12 +133,13 @@ void testPollWhileCreateTableNotReady() throws Exception {
 
     @Test
     void testLogScannerMultiThreadAccess() throws Exception {
-        createTable(DATA1_TABLE_PATH, DATA1_TABLE_INFO.getTableDescriptor(), false);
+        TablePath tablePath = new TablePath("test_db_1", "test_log_scanner_multi_thread_access_t1");
+        createTable(tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);
 
         // append a batch of data.
         int recordSize = 10;
         List expectedRows = new ArrayList<>();
-        try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
+        try (Table table = conn.getTable(tablePath)) {
             AppendWriter appendWriter = table.getAppendWriter();
             for (int i = 0; i < recordSize; i++) {
                 IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {i, "a"});
@@ -166,7 +175,7 @@ void testLogScannerMultiThreadAccess() throws Exception {
     @Test
     void testLogHeavyWriteAndScan() throws Exception {
         final String db = "db";
-        final String tbl = "kv_heavy_table";
+        final String tbl = "log_heavy_table_and_scan";
         // create table
         TableDescriptor descriptor =
                 TableDescriptor.builder()
@@ -220,7 +229,7 @@ void testLogHeavyWriteAndScan() throws Exception {
     @Test
     void testKvHeavyWriteAndScan() throws Exception {
         final String db = "db";
-        final String tbl = "kv_heavy_table";
+        final String tbl = "kv_heavy_table_and_scan";
         // create table
         TableDescriptor descriptor =
                 TableDescriptor.builder()
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetcherTest.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetcherTest.java
index 03047fdc4..383ba6838 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetcherTest.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/LogFetcherTest.java
@@ -22,6 +22,8 @@
 import com.alibaba.fluss.client.scanner.RemoteFileDownloader;
 import com.alibaba.fluss.client.scanner.ScanRecord;
 import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.record.MemoryLogRecords;
 import com.alibaba.fluss.rpc.RpcClient;
 import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
@@ -38,8 +40,8 @@
 import java.util.Map;
 
 import static com.alibaba.fluss.record.TestData.DATA1;
+import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
-import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
 import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newProduceLogRequest;
 import static com.alibaba.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
 import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
@@ -48,34 +50,39 @@
 /** Test for {@link LogFetcher}. */
 public class LogFetcherTest extends ClientToServerITCaseBase {
     private LogFetcher logFetcher;
-    private long tableId;
-    private final int bucketId0 = 0;
-    private final int bucketId1 = 1;
+    private RpcClient rpcClient;
+    private MetadataUpdater metadataUpdater;
 
     // TODO covert this test to UT as kafka.
     @BeforeEach
     protected void setup() throws Exception {
         super.setup();
+        this.rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
+        this.metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
+    }
 
-        // We create table data1NonPkTablePath previously.
-        tableId = createTable(DATA1_TABLE_PATH, DATA1_TABLE_INFO.getTableDescriptor(), false);
+    @Test
+    void testFetch() throws Exception {
+        // We create table previously.
+        TablePath tablePath = TablePath.of("test_db_1", "test_table_for_log_fetcher");
+        long tableId = createTable(tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);
         FLUSS_CLUSTER_EXTENSION.waitUtilTableReady(tableId);
-
-        RpcClient rpcClient = FLUSS_CLUSTER_EXTENSION.getRpcClient();
-        MetadataUpdater metadataUpdater = new MetadataUpdater(clientConf, rpcClient);
-        metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(DATA1_TABLE_PATH));
+        metadataUpdater.checkAndUpdateTableMetadata(Collections.singleton(tablePath));
 
         Map scanBuckets = new HashMap<>();
         // add bucket 0 and bucket 1 to log scanner status.
+        int bucketId0 = 0;
         scanBuckets.put(new TableBucket(tableId, bucketId0), 0L);
+        int bucketId1 = 1;
         scanBuckets.put(new TableBucket(tableId, bucketId1), 0L);
         LogScannerStatus logScannerStatus = new LogScannerStatus();
         logScannerStatus.assignScanBuckets(scanBuckets);
         TestingScannerMetricGroup scannerMetricGroup = TestingScannerMetricGroup.newInstance();
+        TableInfo tableInfo = new TableInfo(tablePath, tableId, DATA1_TABLE_DESCRIPTOR, 1);
         logFetcher =
                 new LogFetcher(
-                        DATA1_TABLE_INFO,
+                        tableInfo,
                         null,
                         rpcClient,
                         logScannerStatus,
@@ -83,10 +90,7 @@ protected void setup() throws Exception {
                         metadataUpdater,
                         scannerMetricGroup,
                         new RemoteFileDownloader(1));
-    }
 
-    @Test
-    void testFetch() throws Exception {
         // add one batch records to tb0.
         TableBucket tb0 = new TableBucket(tableId, bucketId0);
         addRecordsToBucket(tb0, genMemoryLogRecordsByObject(DATA1), 0L);
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java
index 9ac8b544b..a2d160763 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/log/RemoteLogScannerITCase.java
@@ -49,9 +49,7 @@
 
 import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
 import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
-import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
 import static com.alibaba.fluss.record.TestData.DATA2_ROW_TYPE;
-import static com.alibaba.fluss.record.TestData.DATA2_TABLE_PATH;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -76,14 +74,15 @@ protected void setup() throws Exception {
 
     @Test
     void testScanFromRemote() throws Exception {
+        TablePath tablePath = TablePath.of("test_db1", "test_scan_from_remote_table_1");
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
-        long tableId = createTable(DATA1_TABLE_PATH, tableDescriptor);
+        long tableId = createTable(tablePath, tableDescriptor);
 
         // append a batch of data.
         int recordSize = 20;
         List expectedRows = new ArrayList<>();
-        Table table = conn.getTable(DATA1_TABLE_PATH);
+        Table table = conn.getTable(tablePath);
         AppendWriter appendWriter = table.getAppendWriter();
         for (int i = 0; i < recordSize; i++) {
             InternalRow row = row(DATA1_ROW_TYPE, new Object[] {i, "aaaaa"});
@@ -120,7 +119,8 @@ void testPartitionTableFetchFromRemote() throws Exception {
                         .column("c", DataTypes.STRING())
                         .withComment("c is adding column")
                         .build();
-        final TablePath tablePath = DATA2_TABLE_PATH;
+        final TablePath tablePath =
+                new TablePath("test_db1", "test_partition_table_fetch_from_remote");
 
         TableDescriptor partitionTableDescriptor =
                 TableDescriptor.builder()
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScannerITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScannerITCase.java
index 68118947d..47c1cb9fd 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScannerITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/scanner/snapshot/SnapshotScannerITCase.java
@@ -90,7 +90,6 @@ protected void teardown() throws Exception {
             remoteFileDownloader.close();
             remoteFileDownloader = null;
         }
-        super.teardown();
     }
 
     @Test
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussFailServerTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussFailServerTableITCase.java
index 8f87a2f81..7f4e6d9d6 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussFailServerTableITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussFailServerTableITCase.java
@@ -16,48 +16,83 @@
 
 package com.alibaba.fluss.client.table;
 
-import com.alibaba.fluss.client.admin.ClientToServerITCaseBase;
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.admin.Admin;
 import com.alibaba.fluss.client.scanner.ScanRecord;
+import com.alibaba.fluss.client.scanner.log.LogScan;
 import com.alibaba.fluss.client.scanner.log.LogScanner;
 import com.alibaba.fluss.client.scanner.log.ScanRecords;
 import com.alibaba.fluss.client.table.writer.AppendWriter;
 import com.alibaba.fluss.client.table.writer.UpsertWriter;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.InternalRow;
 import com.alibaba.fluss.row.indexed.IndexedRow;
+import com.alibaba.fluss.server.testutils.FlussClusterExtension;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 
+import static com.alibaba.fluss.client.admin.ClientToServerITCaseBase.initConfig;
+import static com.alibaba.fluss.client.admin.ClientToServerITCaseBase.subscribeFromBeginning;
 import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO_PK;
-import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
-import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK;
 import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
 import static com.alibaba.fluss.testutils.InternalRowListAssert.assertThatRows;
 
 /** IT case for {@link FlussTable} in the case of one tablet server fails. */
-class FlussFailServerTableITCase extends ClientToServerITCaseBase {
+class FlussFailServerTableITCase {
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setNumOfTabletServers(3)
+                    .setClusterConf(initConfig())
+                    .build();
 
     private static final int SERVER = 0;
+    private Connection conn;
+    protected Admin admin;
 
     @BeforeEach
-    void beforeEach() throws Exception {
+    void beforeEach() {
+        Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+
         // since we kill and start one tablet server in each test,
         // we need to wait for metadata to be updated to servers
         FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();
-        super.setup();
+    }
+
+    @AfterEach
+    protected void teardown() throws Exception {
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
     }
 
     @Test
     void testAppend() throws Exception {
-        createTable(DATA1_TABLE_PATH, DATA1_TABLE_INFO.getTableDescriptor(), false);
-        try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
+        TablePath tablePath = TablePath.of("test_db_1", "test_fail_append_table_1");
+        admin.createDatabase(tablePath.getDatabaseName(), true).get();
+        admin.createTable(tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false).get();
+        try (Table table = conn.getTable(tablePath)) {
             AppendWriter appendWriter = table.getAppendWriter();
             IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {1, "a"});
 
@@ -80,9 +115,11 @@ void testAppend() throws Exception {
 
     @Test
     void testPut() throws Exception {
-        createTable(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
+        TablePath tablePath = TablePath.of("test_db_1", "test_fail_kv_table_1");
+        admin.createDatabase(tablePath.getDatabaseName(), true).get();
+        admin.createTable(tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor(), false).get();
         // put one row
-        try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) {
+        try (Table table = conn.getTable(tablePath)) {
             UpsertWriter upsertWriter = table.getUpsertWriter();
             InternalRow row = compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
             upsertWriter.upsert(row).get();
@@ -106,11 +143,13 @@ void testPut() throws Exception {
 
     @Test
     void testLogScan() throws Exception {
-        createTable(DATA1_TABLE_PATH, DATA1_TABLE_INFO.getTableDescriptor(), false);
+        TablePath tablePath = TablePath.of("test_db_1", "test_fail_log_scan_table_1");
+        admin.createDatabase(tablePath.getDatabaseName(), true).get();
+        admin.createTable(tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false).get();
         // append one row.
         IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {1, "a"});
-        try (Table table = conn.getTable(DATA1_TABLE_PATH);
-                LogScanner logScanner = createLogScanner(table)) {
+        try (Table table = conn.getTable(tablePath);
+                LogScanner logScanner = table.getLogScanner(new LogScan())) {
             subscribeFromBeginning(logScanner, table);
             AppendWriter appendWriter = table.getAppendWriter();
             appendWriter.append(row).get();
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java
index 09e16589d..34cf04412 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java
@@ -42,8 +42,6 @@
 import java.util.List;
 import java.util.Map;
 
-import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
-import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK;
 import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
 import static com.alibaba.fluss.testutils.DataTestUtils.keyRow;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
@@ -55,10 +53,11 @@ class FlussPartitionedTableITCase extends ClientToServerITCaseBase {
 
     @Test
     void testPartitionedPrimaryKeyTable() throws Exception {
-        Schema schema = createPartitionedTable(DATA1_TABLE_PATH_PK, true);
+        TablePath tablePath = TablePath.of("test_db_1", "partitioned_primary_key_table_1");
+        Schema schema = createPartitionedTable(tablePath, true);
         Map partitionIdByNames =
-                FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH_PK);
-        Table table = conn.getTable(DATA1_TABLE_PATH_PK);
+                FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath);
+        Table table = conn.getTable(tablePath);
         UpsertWriter upsertWriter = table.getUpsertWriter();
         int recordsPerPartition = 5;
         // now, put some data to the partitions
@@ -94,10 +93,11 @@ void testPartitionedPrimaryKeyTable() throws Exception {
 
     @Test
     void testPartitionedLogTable() throws Exception {
-        Schema schema = createPartitionedTable(DATA1_TABLE_PATH, false);
+        TablePath tablePath = TablePath.of("test_db_1", "partitioned_log_table_1");
+        Schema schema = createPartitionedTable(tablePath, false);
         Map partitionIdByNames =
-                FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(DATA1_TABLE_PATH);
-        Table table = conn.getTable(DATA1_TABLE_PATH);
+                FLUSS_CLUSTER_EXTENSION.waitUtilPartitionAllReady(tablePath);
+        Table table = conn.getTable(tablePath);
         AppendWriter appendWriter = table.getAppendWriter();
         int recordsPerPartition = 5;
         Map<String, List<InternalRow>> expectPartitionAppendRows = new HashMap<>();
@@ -194,8 +194,9 @@ private Map<String, List<InternalRow>> pollRecords(
 
     @Test
     void testOperateNotExistPartitionShouldThrowException() throws Exception {
-        Schema schema = createPartitionedTable(DATA1_TABLE_PATH_PK, true);
-        Table table = conn.getTable(DATA1_TABLE_PATH_PK);
+        TablePath tablePath = TablePath.of("test_db_1", "test_operate_not_exist_partition");
+        Schema schema = createPartitionedTable(tablePath, true);
+        Table table = conn.getTable(tablePath);
 
         // test get for a not exist partition
         assertThatThrownBy(
                         () ->
                 .isInstanceOf(PartitionNotExistException.class)
                 .hasMessageContaining(
                         "Table partition '%s' does not exist.",
-                        PhysicalTablePath.of(DATA1_TABLE_PATH_PK, "notExistPartition"));
+                        PhysicalTablePath.of(tablePath, "notExistPartition"));
 
         // test write to not exist partition
         UpsertWriter upsertWriter = table.getUpsertWriter();
                 .isInstanceOf(PartitionNotExistException.class)
                 .hasMessageContaining(
                         "Table partition '%s' does not exist.",
-                        PhysicalTablePath.of(DATA1_TABLE_PATH_PK, "notExistPartition"));
+                        PhysicalTablePath.of(tablePath, "notExistPartition"));
 
         // test scan a not exist partition's log
         LogScan logScan = new LogScan();
diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
index feaddf7ec..6319d27a0 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
@@ -81,20 +81,22 @@ class FlussTableITCase extends ClientToServerITCaseBase {
 
     @Test
     void testGetDescriptor() throws Exception {
-        createTable(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
+        TablePath tablePath = TablePath.of("test_db_1", "test_get_descriptor_table");
+        createTable(tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
         // get table descriptor.
-        Table table = conn.getTable(DATA1_TABLE_PATH_PK);
+        Table table = conn.getTable(tablePath);
         TableDescriptor tableDescriptor = table.getDescriptor();
         assertThat(tableDescriptor).isEqualTo(DATA1_TABLE_INFO_PK.getTableDescriptor());
     }
 
     @Test
     void testAppendOnly() throws Exception {
-        createTable(DATA1_TABLE_PATH, DATA1_TABLE_INFO.getTableDescriptor(), false);
+        TablePath tablePath = TablePath.of("test_db_1", "test_append_only_table");
+        createTable(tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);
         // append data.
         InternalRow row = row(DATA1_ROW_TYPE, new Object[] {1, "a"});
-        try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
+        try (Table table = conn.getTable(tablePath)) {
             AppendWriter appendWriter = table.getAppendWriter();
             appendWriter.append(row).get();
         }
@@ -149,9 +151,10 @@ void testAppendWithSmallBuffer(boolean indexedFormat) throws Exception {
 
     @Test
     void testUpsertWithSmallBuffer() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", "test_upsert_with_small_buffer_table");
         TableDescriptor desc =
                 TableDescriptor.builder().schema(DATA1_SCHEMA_PK).distributedBy(1, "a").build();
-        createTable(DATA1_TABLE_PATH, desc, false);
+        createTable(tablePath, desc, false);
         Configuration config = new Configuration(clientConf);
         // only 1kb memory size, and 64 bytes page size.
         config.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, new MemorySize(2048));
         config.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(256));
         int expectedSize = 20;
         try (Connection conn = ConnectionFactory.createConnection(config)) {
-            Table table = conn.getTable(DATA1_TABLE_PATH);
+            Table table = conn.getTable(tablePath);
             UpsertWriter upsertWriter = table.getUpsertWriter();
             BinaryString value = BinaryString.fromString(StringUtils.repeat("a", 100));
             // should exceed the buffer size, but append successfully
@@ -205,7 +208,7 @@ void testPutAndLookup() throws Exception {
                 TableDescriptor.builder().schema(schema).distributedBy(3, "b").build();
         // create the table
         TablePath data1PkTablePath2 =
-                TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_pk_table_2");
+                TablePath.of(DATA1_TABLE_PATH_PK.getDatabaseName(), "test_put_and_lookup_table_2");
         createTable(data1PkTablePath2, tableDescriptor, true);
         // now, check put/lookup data
         verifyPutAndLookup(data1PkTablePath2, schema, new Object[] {"a", 1});
@@ -213,13 +216,13 @@ void testPutAndLookup() throws Exception {
 
     @Test
     void testLimitScanPrimaryTable() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", "test_limit_scan_primary_table");
         TableDescriptor descriptor =
                 TableDescriptor.builder().schema(DATA1_SCHEMA_PK).distributedBy(1).build();
-        long tableId = createTable(DATA1_TABLE_PATH_PK, descriptor, true);
+        long tableId = createTable(tablePath, descriptor, true);
         int insertSize = 10;
         int limitSize = 5;
-        try (Connection conn = ConnectionFactory.createConnection(clientConf)) {
-            Table table = conn.getTable(DATA1_TABLE_PATH_PK);
+        try (Table table = conn.getTable(tablePath)) {
             UpsertWriter upsertWriter = table.getUpsertWriter();
 
             List expectedRows = new ArrayList<>();
@@ -264,14 +267,14 @@ void testLimitScanPrimaryTable() throws Exception {
 
     @Test
     void testLimitScanLogTable() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", "test_limit_scan_log_table");
         TableDescriptor descriptor =
                 TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(1).build();
-        long tableId = createTable(DATA1_TABLE_PATH, descriptor, true);
+        long tableId = createTable(tablePath, descriptor, true);
         int insertSize = 10;
         int limitSize = 5;
-        try (Connection conn = ConnectionFactory.createConnection(clientConf)) {
-            Table table = conn.getTable(DATA1_TABLE_PATH);
+        try (Table table = conn.getTable(tablePath)) {
             AppendWriter appendWriter = table.getAppendWriter();
 
             List expectedRows = new ArrayList<>();
@@ -339,6 +342,7 @@ private InternalRow lookupRow(TablePath tablePath, IndexedRow keyRow) throws Exc
 
     @Test
     void testPartialPutAndDelete() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", "test_partial_put_and_delete");
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())
@@ -350,14 +354,14 @@ void testPartialPutAndDelete() throws Exception {
         RowType pkRowType = RowType.of(DataTypes.INT());
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder().schema(schema).distributedBy(3, "a").build();
-        createTable(DATA1_TABLE_PATH_PK, tableDescriptor, true);
+        createTable(tablePath, tableDescriptor, true);
 
         // test put a full row
-        verifyPutAndLookup(DATA1_TABLE_PATH_PK, schema, new Object[] {1, "a", 1, true});
+        verifyPutAndLookup(tablePath, schema, new Object[] {1, "a", 1, true});
 
         // partial update columns: a, b
         UpsertWrite partialUpdate = new UpsertWrite().withPartialUpdate(new int[] {0, 1});
-        Table table = conn.getTable(DATA1_TABLE_PATH_PK);
+        Table table = conn.getTable(tablePath);
         UpsertWriter upsertWriter = table.getUpsertWriter(partialUpdate);
         upsertWriter
                 .upsert(compactedRow(schema.toRowType(), new Object[] {1, "aaa", null, null}))
                 .get();
 
         // check the row
         IndexedRow rowKey = row(pkRowType, new Object[] {1});
-        assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey))
+        assertThat(lookupRow(tablePath, rowKey))
                 .isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, "aaa", 1, true}));
 
         // partial update columns columns: a,b,c
@@ -376,14 +380,14 @@ void testPartialPutAndDelete() throws Exception {
                 .get();
 
         // lookup the row
-        assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey))
+        assertThat(lookupRow(tablePath, rowKey))
                 .isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, "bbb", 222, true}));
 
         // test partial delete, target column is a,b,c
         upsertWriter
                 .delete(compactedRow(schema.toRowType(), new Object[] {1, "bbb", 222, null}))
                 .get();
-        assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey))
+        assertThat(lookupRow(tablePath, rowKey))
                 .isEqualTo(compactedRow(schema.toRowType(), new Object[] {1, null, null, true}));
 
         // partial delete, target column is d
@@ -394,13 +398,14 @@ void testPartialPutAndDelete() throws Exception {
                 .get();
 
         // the row should be deleted, shouldn't get the row again
-        assertThat(lookupRow(DATA1_TABLE_PATH_PK, rowKey)).isNull();
+        assertThat(lookupRow(tablePath, rowKey)).isNull();
 
         table.close();
     }
 
     @Test
     void testInvalidPartialUpdate() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", "test_invalid_partial_update");
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())
@@ -410,9 +415,9 @@ void testInvalidPartialUpdate() throws Exception {
                         .build();
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder().schema(schema).distributedBy(3, "a").build();
-        createTable(DATA1_TABLE_PATH_PK, tableDescriptor, true);
+        createTable(tablePath, tableDescriptor, true);
 
-        try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) {
+        try (Table table = conn.getTable(tablePath)) {
             // the target columns doesn't contains the primary column, should
             // throw exception
             assertThatThrownBy(
@@ -440,27 +445,29 @@ void testInvalidPartialUpdate() throws Exception {
 
     @Test
     void testDelete() throws Exception {
-        createTable(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
+        TablePath tablePath = TablePath.of("test_db_1", "test_delete");
+        createTable(tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor(), false);
         // put key.
         InternalRow row = compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
-        try (Table table = conn.getTable(DATA1_TABLE_PATH_PK)) {
+        try (Table table = conn.getTable(tablePath)) {
             UpsertWriter upsertWriter = table.getUpsertWriter();
             upsertWriter.upsert(row).get();
 
             // lookup this key.
             IndexedRow keyRow = keyRow(DATA1_SCHEMA_PK, new Object[] {1, "a"});
-            assertThat(lookupRow(DATA1_TABLE_PATH_PK, keyRow)).isEqualTo(row);
+            assertThat(lookupRow(tablePath, keyRow)).isEqualTo(row);
 
             // delete this key.
             upsertWriter.delete(row).get();
 
             // lookup this key again, will return null.
-            assertThat(lookupRow(DATA1_TABLE_PATH_PK, keyRow)).isNull();
+            assertThat(lookupRow(tablePath, keyRow)).isNull();
         }
     }
 
     @Test
     void testAppendWhileTableMaybeNotReady() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", "test_append_while_table_maybe_not_ready");
         // Create table request will complete if the table info was registered in zk, but the table
         // maybe not ready immediately. So, the metadata request possibly get incomplete table info,
         // like the unknown leader. In this case, the append request need retry until the table is
         // ready.
         int bucketNumber = 10;
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(bucketNumber).build();
-        createTable(DATA1_TABLE_PATH, tableDescriptor, false);
+        createTable(tablePath, tableDescriptor, false);
 
         // append data.
         IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {1, "a"});
-        try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
+        try (Table table = conn.getTable(tablePath)) {
             AppendWriter appendWriter = table.getAppendWriter();
             appendWriter.append(row).get();
 
@@ -494,16 +501,19 @@ void testAppendWhileTableMaybeNotReady() throws Exception {
     @ParameterizedTest
     @ValueSource(strings = {"INDEXED", "ARROW"})
     void testAppendAndPoll(String format) throws Exception {
-        verifyAppendOrPut(true, format, null);
+        TablePath tablePath = TablePath.of("test_db_1", "test_append_and_poll_" + format);
+        verifyAppendOrPut(tablePath, true, format, null);
     }
 
     @ParameterizedTest
     @ValueSource(strings = {"INDEXED", "COMPACTED"})
     void testPutAndPoll(String kvFormat) throws Exception {
-        verifyAppendOrPut(false, "ARROW", kvFormat);
+        TablePath tablePath = TablePath.of("test_db_1", "test_put_and_poll_" + kvFormat);
+        verifyAppendOrPut(tablePath, false, "ARROW", kvFormat);
     }
 
-    void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvFormat)
+    void verifyAppendOrPut(
+            TablePath tablePath, boolean append, String logFormat, @Nullable String kvFormat)
             throws Exception {
         Schema schema =
                 append
@@ -526,10 +536,10 @@ void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvForm
             builder.kvFormat(KvFormat.fromString(kvFormat));
         }
         TableDescriptor tableDescriptor = builder.build();
-        createTable(DATA1_TABLE_PATH, tableDescriptor, false);
+        createTable(tablePath, tableDescriptor, false);
 
         int expectedSize = 30;
-        try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
+        try (Table table = conn.getTable(tablePath)) {
             TableWriter tableWriter;
             if (append) {
                 tableWriter = table.getAppendWriter();
@@ -565,7 +575,7 @@ void verifyAppendOrPut(boolean append, String logFormat, @Nullable String kvForm
         }
 
         // fetch data.
-        try (Table table = conn.getTable(DATA1_TABLE_PATH);
+        try (Table table = conn.getTable(tablePath);
                 LogScanner logScanner = createLogScanner(table)) {
             subscribeFromBeginning(logScanner, table);
             int count = 0;
@@ -657,7 +667,7 @@ void testPutAndProject() throws Exception {
                         .primaryKey("a")
                         .build();
         TableDescriptor tableDescriptor = TableDescriptor.builder().schema(schema).build();
-        TablePath tablePath = TablePath.of("test_db_1", "test_pk_table_1");
+        TablePath tablePath = TablePath.of("test_db_1", "test_put_and_project");
         createTable(tablePath, tableDescriptor, false);
 
         int batches = 3;
@@ -763,16 +773,17 @@ void testPutAndProject() throws Exception {
 
     @Test
     void testInvalidColumnProjection() throws Exception {
+        TablePath tablePath = TablePath.of("test_db_1", "test_invalid_column_projection");
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder().schema(DATA1_SCHEMA).logFormat(LogFormat.INDEXED).build();
-        createTable(DATA1_TABLE_PATH, tableDescriptor, false);
-        Table table = conn.getTable(DATA1_TABLE_PATH);
+        createTable(tablePath, tableDescriptor, false);
+        Table table = conn.getTable(tablePath);
 
         // validation on projection
         assertThatThrownBy(() -> createLogScanner(table, new int[] {1}))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(
                         "Only ARROW log format supports column projection, but the log format "
-                                + "of table 'test_db_1.test_non_pk_table_1' is INDEXED");
+                                + "of table 'test_db_1.test_invalid_column_projection' is INDEXED");
     }
 }
diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java
index 16fbe0bb6..c14906a6a 100644
--- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java
+++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogITCase.java
@@ -102,7 +102,7 @@ void before() {
     void testCreateTable() throws Exception {
         // create a table will all supported data types
         tEnv.executeSql(
-                "create table test_table "
+                "create table test_catalog_create_table_table1 "
                         + "(a int not null primary key not enforced,"
                         + " b CHAR(3),"
                         + " c STRING not null COMMENT 'STRING COMMENT',"
@@ -148,7 +148,9 @@ void testCreateTable() throws Exception {
                 .primaryKey("a");
         Schema expectedSchema = schemaBuilder.build();
         CatalogTable table =
-                (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, "test_table"));
+                (CatalogTable)
+                        catalog.getTable(
+                                new ObjectPath(DEFAULT_DB, "test_catalog_create_table_table1"));
         assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
     }
 
@@ -158,7 +160,7 @@ void testCreateUnSupportedTable() {
         assertThatThrownBy(
                         () ->
                                 tEnv.executeSql(
-                                        "create table test_table_unsupported"
+                                        "create table test_catalog_table_unsupported"
                                                 + " (a int, b int) partitioned by (b)"))
                 .cause()
                 .isInstanceOf(CatalogException.class)
@@ -169,12 +171,15 @@ void testCreateUnSupportedTable() {
 
     @Test
     void testCreateNoPkTable() throws Exception {
-        tEnv.executeSql("create table append_only_table(a int, b int) with ('bucket.num' = '10')");
+        tEnv.executeSql(
+                "create table test_catalog_append_only_table(a int, b int) with ('bucket.num' = '10')");
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("a", DataTypes.INT()).column("b", DataTypes.INT());
         Schema expectedSchema = schemaBuilder.build();
         CatalogTable table =
-                (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, "append_only_table"));
+                (CatalogTable)
+                        catalog.getTable(
+                                new ObjectPath(DEFAULT_DB, "test_catalog_append_only_table"));
         assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
         Map expectedOptions = new HashMap<>();
         expectedOptions.put("bucket.num", "10");
@@ -184,7 +189,7 @@ void testCreateNoPkTable() throws Exception {
 
     @Test
     void testCreatePartitionedTable() throws Exception {
         tEnv.executeSql(
-                "create table test_partitioned_table (a int, b string) partitioned by (b) "
+                "create table test_catalog_partitioned_table (a int, b string) partitioned by (b) "
                         + "with ('table.auto-partition.enabled' = 'true',"
                         + " 'table.auto-partition.time-unit' = 'day')");
         Schema.Builder schemaBuilder = Schema.newBuilder();
@@ -192,7 +197,8 @@ void testCreatePartitionedTable() throws Exception {
         Schema expectedSchema = schemaBuilder.build();
         CatalogTable table =
                 (CatalogTable)
-                        catalog.getTable(new ObjectPath(DEFAULT_DB, "test_partitioned_table"));
+                        catalog.getTable(
+                                new ObjectPath(DEFAULT_DB, "test_catalog_partitioned_table"));
         assertThat(table.getUnresolvedSchema()).isEqualTo(expectedSchema);
         assertThat(table.getPartitionKeys()).isEqualTo(Collections.singletonList("b"));
     }
@@ -238,7 +244,7 @@ void testCreateWithUnSupportDataType() {
         assertThatThrownBy(
                         () ->
                                 tEnv.executeSql(
-                                        "create table test_table_unsupported (a varchar(10))"))
+                                        "create table test_catalog_table_unsupported_1 (a varchar(10))"))
                 .cause()
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessage("Unsupported data type: VARCHAR(10)");
@@ -247,7 +253,7 @@ void testCreateWithUnSupportDataType() {
         assertThatThrownBy(
                         () ->
                                 tEnv.executeSql(
-                                        "create table test_table_unsupported (a varbinary(10))"))
+                                        "create table test_catalog_table_unsupported_1 (a varbinary(10))"))
                 .cause()
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessage("Unsupported data type: VARBINARY(10)");
@@ -256,7 +262,7 @@ void testCreateWithUnSupportDataType() {
         assertThatThrownBy(
                         () ->
                                 tEnv.executeSql(
-                                        "create table test_table_unsupported (a multiset)"))
+                                        "create table test_catalog_table_unsupported_1 (a multiset)"))
                 .cause()
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessage("Unsupported data type: MULTISET");
@@ -264,12 +270,12 @@ void testCreateWithUnSupportDataType() {
 
     @Test
     void testCreateDatabase() {
-        tEnv.executeSql("create database test_db");
+        tEnv.executeSql("create database test_catalog_db1");
         List databases =
                 CollectionUtil.iteratorToList(tEnv.executeSql("show databases").collect());
         assertThat(databases.toString())
-                .isEqualTo(String.format("[+I[%s], +I[test_db]]", DEFAULT_DB));
-        tEnv.executeSql("drop database test_db");
+                .isEqualTo(String.format("[+I[%s], +I[test_catalog_db1]]", DEFAULT_DB));
+        tEnv.executeSql("drop database test_catalog_db1");
         databases = CollectionUtil.iteratorToList(tEnv.executeSql("show databases").collect());
         assertThat(databases.toString()).isEqualTo(String.format("[+I[%s]]", DEFAULT_DB));
     }
@@ -278,9 +284,12 @@ void testCreateDatabase() {
     void testFactoryCannotFindForCreateTemporaryTable() {
         // create fluss temporary table is not supported
         tEnv.executeSql(
-                "create temporary table test_temp_table (a int, b int)"
+                "create temporary table test_catalog_cannot_found_temp_table (a int, b int)"
                         + " with ('connector' = 'fluss', 'bootstrap.servers' = 'localhost:9092')");
-        assertThatThrownBy(() -> tEnv.executeSql("insert into test_temp_table values (1, 2)"))
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "insert into test_catalog_cannot_found_temp_table values (1, 2)"))
                 .cause()
                 .isInstanceOf(ValidationException.class)
                 .hasMessage("Cannot discover a connector using option: 'connector'='fluss'");
@@ -291,9 +300,12 @@ void testFactoryCannotFindForCreateCatalogTable() {
         // create fluss table under non-fluss catalog is not supported
         tEnv.executeSql("use catalog " + TableConfigOptions.TABLE_CATALOG_NAME.defaultValue());
         tEnv.executeSql(
-                "create table test_catalog_table (a int, b int)"
+                "create table test_catalog_cannot_found_table (a int, b int)"
                        + " with ('connector' = 'fluss', 'bootstrap.servers' = 'localhost:9092')");
-        assertThatThrownBy(() -> tEnv.executeSql("insert into test_catalog_table values (1, 2)"))
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        "insert into test_catalog_cannot_found_table values (1, 2)"))
                 .cause()
                 .isInstanceOf(ValidationException.class)
                 .hasMessage("Cannot discover a connector using option: 'connector'='fluss'");
diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java
index b27deda14..641c695b7 100644
--- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java
+++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java
@@ -72,7 +72,6 @@ class FlinkCatalogTest {
     private static final String CATALOG_NAME = "test-catalog";
     private static final String DEFAULT_DB = "default";
     static Catalog catalog;
-    private final ObjectPath tableInDefaultDb = new ObjectPath(DEFAULT_DB, "t1");
 
     private ResolvedSchema createSchema() {
         return new ResolvedSchema(
@@ -137,26 +136,27 @@ void beforeEach() throws Exception {
     @Test
     void testCreateTable() throws Exception {
         Map options = new HashMap<>();
-        assertThatThrownBy(() -> catalog.getTable(tableInDefaultDb))
+        ObjectPath objectPath = new ObjectPath(DEFAULT_DB, "catalog_create_table_test_t1");
+        assertThatThrownBy(() -> catalog.getTable(objectPath))
                 .isInstanceOf(TableNotExistException.class)
                 .hasMessage(
                         String.format(
                                 "Table (or view) %s does not exist in Catalog %s.",
-                                tableInDefaultDb, CATALOG_NAME));
+                                objectPath, CATALOG_NAME));
         CatalogTable table = this.newCatalogTable(options);
-        catalog.createTable(this.tableInDefaultDb, table, false);
-        assertThat(catalog.tableExists(this.tableInDefaultDb)).isTrue();
+        catalog.createTable(objectPath, table, false);
+        assertThat(catalog.tableExists(objectPath)).isTrue();
         // create the table again, should throw exception with ignore if exist = false
-        assertThatThrownBy(() -> catalog.createTable(this.tableInDefaultDb, table, false))
+        assertThatThrownBy(() -> catalog.createTable(objectPath, table, false))
                 .isInstanceOf(TableAlreadyExistException.class)
                 .hasMessage(
                         String.format(
                                 "Table (or view) %s already exists in Catalog %s.",
-                                this.tableInDefaultDb, CATALOG_NAME));
+                                objectPath, CATALOG_NAME));
         // should be ok since we set ignore if exist = true
-        catalog.createTable(this.tableInDefaultDb, table, true);
+        catalog.createTable(objectPath, table, true);
         // get the table and check
-        CatalogBaseTable tableCreated = catalog.getTable(this.tableInDefaultDb);
+        CatalogBaseTable tableCreated = catalog.getTable(objectPath);
 
         // put bucket key option
         Map addedOptions = new HashMap<>();
@@ -169,18 +169,18 @@ void testCreateTable() throws Exception {
 
         // list tables
         List tables = catalog.listTables(DEFAULT_DB);
         assertThat(tables.size()).isEqualTo(1L);
-        assertThat(tables.get(0)).isEqualTo(this.tableInDefaultDb.getObjectName());
-        catalog.dropTable(this.tableInDefaultDb, false);
+        assertThat(tables.get(0)).isEqualTo(objectPath.getObjectName());
+        catalog.dropTable(objectPath, false);
         assertThat(catalog.listTables(DEFAULT_DB)).isEmpty();
         // drop the table again, should throw exception with ignoreIfNotExists = false
-        assertThatThrownBy(() -> catalog.dropTable(this.tableInDefaultDb, false))
+        assertThatThrownBy(() -> catalog.dropTable(objectPath, false))
                 .isInstanceOf(TableNotExistException.class)
                 .hasMessage(
                         String.format(
                                 "Table (or view) %s does not exist in Catalog %s.",
-                                this.tableInDefaultDb, CATALOG_NAME));
+                                objectPath, CATALOG_NAME));
         // should be ok since we set ignoreIfNotExists = true
-        catalog.dropTable(this.tableInDefaultDb, true);
+        catalog.dropTable(objectPath, true);
 
         // create table from an non-exist db
         ObjectPath nonExistDbPath = ObjectPath.fromString("non.exist");
@@ -204,8 +204,8 @@ void testCreateTable() throws Exception {
                                 Collections.singletonList("first"),
                                 options),
                         resolvedSchema);
-        catalog.createTable(this.tableInDefaultDb, table2, false);
-        tableCreated = catalog.getTable(this.tableInDefaultDb);
+        catalog.createTable(objectPath, table2, false);
+        tableCreated = catalog.getTable(objectPath);
 
         // need to over write the option
         addedOptions.put(BUCKET_KEY.key(), "third");
@@ -216,13 +216,14 @@ void testCreateTable() throws Exception {
 
     @Test
     void testCreateTableWithBucket() throws Exception {
+        ObjectPath objectPath = new ObjectPath(DEFAULT_DB, "create_table_with_bucket_test_t1");
         // for pk table;
         // set bucket count and bucket key;
         Map options = new HashMap<>();
         options.put(BUCKET_NUMBER.key(), "10");
         options.put(BUCKET_KEY.key(), "first,third");
 
-        createAndCheckAndDropTable(createSchema(), tableInDefaultDb, options);
+        createAndCheckAndDropTable(createSchema(), objectPath, options);
 
         // for non pk table
         // set nothing;
@@ -231,18 +232,18 @@ void testCreateTableWithBucket() throws Exception {
         options = new HashMap<>();
         // default is 1
         options.put(BUCKET_NUMBER.key(), "1");
-        createAndCheckAndDropTable(schema, tableInDefaultDb, options);
+        createAndCheckAndDropTable(schema, objectPath, options);
 
         // set bucket count;
         options.put(BUCKET_NUMBER.key(), "10");
-        createAndCheckAndDropTable(schema, tableInDefaultDb, options);
+        createAndCheckAndDropTable(schema, objectPath, options);
 
         // set bucket count and bucket key;
         options.put("bucket-key", "first");
-        createAndCheckAndDropTable(schema, tableInDefaultDb, options);
+        createAndCheckAndDropTable(schema, objectPath, options);
 
         // only set bucket key
-        createAndCheckAndDropTable(schema, tableInDefaultDb, options);
+        createAndCheckAndDropTable(schema, objectPath, options);
     }
 
     @Test
@@ -270,7 +271,7 @@ void testCreateTableWithWatermarkAndComputedCol() throws Exception {
                         Collections.emptyList(),
                         new HashMap<>(options));
         CatalogTable originResolvedTable = new ResolvedCatalogTable(origin, resolvedSchema);
-        ObjectPath path = new ObjectPath(DEFAULT_DB, "t2");
+        ObjectPath path = new ObjectPath(DEFAULT_DB, "create_table_with_watermark_t2");
         catalog.createTable(path, originResolvedTable, false);
         CatalogTable tableCreated = (CatalogTable) catalog.getTable(path);
         // resolve it and check
@@ -293,11 +294,12 @@ void testCreateTableWithWatermarkAndComputedCol() throws Exception {
 
     @Test
     void testUnsupportedTable() {
+        ObjectPath objectPath = new ObjectPath(DEFAULT_DB, "unsupported_table_test_t1");
         // test create non fluss table
         Map options = new HashMap<>();
         options.put(FactoryUtil.CONNECTOR.key(), "kafka");
         final CatalogTable table = this.newCatalogTable(options);
-        assertThatThrownBy(() -> catalog.createTable(this.tableInDefaultDb, table, false))
+        assertThatThrownBy(() -> catalog.createTable(objectPath, table, false))
                 .isInstanceOf(CatalogException.class)
                 .hasMessageContaining("Fluss Catalog only supports fluss tables");
         options = new HashMap<>();
@@ -312,7 +314,7 @@ void testUnsupportedTable() {
                                 UniqueConstraint.primaryKey(
                                         "PK_first", Collections.singletonList("first")));
         CatalogTable table1 = this.newCatalogTable(resolvedSchema, options);
-        assertThatThrownBy(() -> catalog.createTable(this.tableInDefaultDb, table1, false))
+        assertThatThrownBy(() -> catalog.createTable(objectPath, table1, false))
                 .isInstanceOf(CatalogException.class)
                 .hasMessage("Metadata column %s is not supported.", metaDataCol);
     }
diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/metrics/FlinkMetricsITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/metrics/FlinkMetricsITCase.java
index 96dfeede1..2ded2f057 100644
--- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/metrics/FlinkMetricsITCase.java
+++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/metrics/FlinkMetricsITCase.java
@@ -111,12 +111,13 @@ void testMetricsReport() throws Exception {
                                         .column("name", DataTypes.STRING())
                                         .build())
                         .build();
-        TablePath tablePath = TablePath.of(DEFAULT_DB, "test");
+        TablePath tablePath = TablePath.of(DEFAULT_DB, "test_metrics_report_t1");
         createTable(tablePath, tableDescriptor);
 
         // test write
         TableResult tableResult =
-                tEnv.executeSql("insert into test values (1, 'name1'), (2, 'name2'), (3, 'name3')");
+                tEnv.executeSql(
+                        "insert into test_metrics_report_t1 values (1, 'name1'), (2, 'name2'), (3, 'name3')");
         JobClient client = tableResult.getJobClient().get();
         JobID jobID = client.getJobID();
         tableResult.await();
@@ -131,7 +132,7 @@ void testMetricsReport() throws Exception {
         assertThat((Long) ((Gauge) sendLatencyMetrics).getValue()).isGreaterThan(0);
 
         // test scan
-        tableResult = tEnv.executeSql("select * from test");
+        tableResult = tEnv.executeSql("select * from test_metrics_report_t1");
         client = tableResult.getJobClient().get();
         jobID = client.getJobID();
         assertResultsIgnoreOrder(
diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java
index 4c9f1f1a1..d20ab3dbd 100644
--- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java
@@ -125,10 +125,10 @@ void after() {
     @Test
     void testAppendLog() throws Exception {
         tEnv.executeSql(
-                "create table sink_test (a int not null, b bigint, c string) with "
+                "create table test_append_log_sink_test (a int not null, b bigint, c string) with "
                         + "('bucket.num' = '3')");
         tEnv.executeSql(
-                        "INSERT INTO sink_test(a, b, c) "
+                        "INSERT INTO test_append_log_sink_test(a, b, c) "
                                 + "VALUES (1, 3501, 'Tim'), "
                                 + "(2, 3502, 'Fabian'), "
                                 + "(3, 3503, 'coco'), "
                                 + "(4, 3504, 'jack'), "
                                 + "(5, 3505, 'piggy'), "
                                 + "(6, 3506, 'stave')")
.await(); - CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); + CloseableIterator rowIter = + tEnv.executeSql("select * from test_append_log_sink_test").collect(); List expectedRows = Arrays.asList( "+I[1, 3501, Tim]", "+I[2, 3502, Fabian]", @@ -149,10 +150,10 @@ void testAppendLog() throws Exception { @Test void testAppendLogWithBucketKey() throws Exception { tEnv.executeSql( - "create table sink_test (a int not null, b bigint, c string) with " + "create table append_log_with_bucket_key_sink (a int not null, b bigint, c string) with " + "('bucket.num' = '3', 'bucket.key' = 'c')"); tEnv.executeSql( - "INSERT INTO sink_test(a, b, c) " + "INSERT INTO append_log_with_bucket_key_sink(a, b, c) " + "VALUES (1, 3501, 'Tim'), " + "(2, 3502, 'Fabian'), " + "(3, 3503, 'Tim'), " @@ -166,7 +167,8 @@ void testAppendLogWithBucketKey() throws Exception { + "(12, 3512, 'Tim')") .await(); - CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); + CloseableIterator rowIter = + tEnv.executeSql("select * from append_log_with_bucket_key_sink").collect(); //noinspection ArraysAsListWithZeroOrOneArgument List> expectedGroups = Arrays.asList( @@ -208,10 +210,10 @@ void testAppendLogWithBucketKey() throws Exception { @Test void testAppendLogWithRoundRobin() throws Exception { tEnv.executeSql( - "create table sink_test (a int not null, b bigint, c string) with " + "create table append_log_with_round_robin_sink (a int not null, b bigint, c string) with " + "('bucket.num' = '3', 'client.writer.bucket.no-key-assigner' = 'round_robin')"); tEnv.executeSql( - "INSERT INTO sink_test(a, b, c) " + "INSERT INTO append_log_with_round_robin_sink(a, b, c) " + "VALUES (1, 3501, 'Tim'), " + "(2, 3502, 'Fabian'), " + "(3, 3503, 'coco'), " @@ -223,7 +225,8 @@ void testAppendLogWithRoundRobin() throws Exception { Map> rows = new HashMap<>(); Configuration clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); Connection conn = ConnectionFactory.createConnection(clientConf); - try (Table table = conn.getTable(TablePath.of(DEFAULT_DB, "sink_test"))) { + try (Table table = + conn.getTable(TablePath.of(DEFAULT_DB, "append_log_with_round_robin_sink"))) { LogScanner logScanner = table.getLogScanner(new LogScan()); logScanner.subscribeFromBeginning(0); @@ -255,12 +258,12 @@ void testAppendLogWithRoundRobin() throws Exception { @Test void testAppendLogWithMultiBatch() throws Exception { tEnv.executeSql( - "create table sink_test (a int not null, b bigint, c string) with " + "create table append_log_with_multi_batch_sink (a int not null, b bigint, c string) with " + "('bucket.num' = '3')"); int batchSize = 3; for (int i = 0; i < batchSize; i++) { tEnv.executeSql( - "INSERT INTO sink_test(a, b, c) " + "INSERT INTO append_log_with_multi_batch_sink(a, b, c) " + "VALUES (1, 3501, 'Tim'), " + "(2, 3502, 'Fabian'), " + "(3, 3503, 'coco'), " @@ -270,7 +273,8 @@ void testAppendLogWithMultiBatch() throws Exception { .await(); } - CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); + CloseableIterator rowIter = + tEnv.executeSql("select * from append_log_with_multi_batch_sink").collect(); List expectedRows = new ArrayList<>(); for (int i = 0; i < batchSize; i++) { expectedRows.addAll( @@ -285,9 +289,9 @@ void testAppendLogWithMultiBatch() throws Exception { @Test void testPut() throws Exception { tEnv.executeSql( - "create table sink_test (a int not null primary key not enforced, b bigint, c string) with('bucket.num' = '3')"); + "create table put_sink_1 (a int 
not null primary key not enforced, b bigint, c string) with('bucket.num' = '3')"); tEnv.executeSql( - "INSERT INTO sink_test(a, b, c) " + "INSERT INTO put_sink_1(a, b, c) " + "VALUES (1, 3501, 'Tim'), " + "(2, 3502, 'Fabian'), " + "(3, 3503, 'coco'), " @@ -296,7 +300,7 @@ void testPut() throws Exception { + "(6, 3506, 'stave')") .await(); - CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); + CloseableIterator rowIter = tEnv.executeSql("select * from put_sink_1").collect(); List expectedRows = Arrays.asList( "+I[1, 3501, Tim]", "+I[2, 3502, Fabian]", @@ -308,13 +312,16 @@ void testPut() throws Exception { @Test void testPartialUpsert() throws Exception { tEnv.executeSql( - "create table sink_test (a int not null primary key not enforced, b bigint, c string) with('bucket.num' = '3')"); + "create table partial_upsert_sink_1 (a int not null primary key not enforced, b bigint, c string) with('bucket.num' = '3')"); // partial insert - tEnv.executeSql("INSERT INTO sink_test(a, b) VALUES (1, 111), (2, 222)").await(); - tEnv.executeSql("INSERT INTO sink_test(c, a) VALUES ('c1', 1), ('c2', 2)").await(); + tEnv.executeSql("INSERT INTO partial_upsert_sink_1(a, b) VALUES (1, 111), (2, 222)") + .await(); + tEnv.executeSql("INSERT INTO partial_upsert_sink_1(c, a) VALUES ('c1', 1), ('c2', 2)") + .await(); - CloseableIterator rowIter = tEnv.executeSql("select * from sink_test").collect(); + CloseableIterator rowIter = + tEnv.executeSql("select * from partial_upsert_sink_1").collect(); List expectedRows = Arrays.asList( @@ -344,14 +351,16 @@ void testPartialUpsert() throws Exception { tEnv.createTemporaryView("changeLog", changeLogTable); // check the target fields in row 1 is set to null - tEnv.executeSql("INSERT INTO sink_test(a, b) SELECT f0, f1 FROM changeLog").await(); + tEnv.executeSql("INSERT INTO partial_upsert_sink_1(a, b) SELECT f0, f1 FROM changeLog") + .await(); expectedRows = Arrays.asList( "-U[1, 111, c1]", "+U[1, 333, c1]", "-U[1, 333, c1]", "+U[1, null, c1]"); assertResultsIgnoreOrder(rowIter, expectedRows, false); // check the row 1 will be deleted finally since all the fields in the row are set to null - tEnv.executeSql("INSERT INTO sink_test(a, c) SELECT f0, f2 FROM changeLog").await(); + tEnv.executeSql("INSERT INTO partial_upsert_sink_1(a, c) SELECT f0, f2 FROM changeLog") + .await(); expectedRows = Arrays.asList("-U[1, null, c1]", "+U[1, null, c11]", "-D[1, null, c11]"); assertResultsIgnoreOrder(rowIter, expectedRows, true); } @@ -641,7 +650,7 @@ void testUnsupportedDeleteAndUpdateStmtOnLogTable(boolean isPartitionedTable) { @Test void testUnsupportedDeleteAndUpdateStmtOnPartialPK() { // test primary-key table - String t1 = "t1"; + String t1 = "unsupported_delete_and_update_t1"; tBatchEnv.executeSql( String.format( "create table %s (" @@ -676,7 +685,7 @@ void testUnsupportedDeleteAndUpdateStmtOnPartialPK() { "Currently, Fluss table only supports UPDATE statement with conditions on primary key."); // test partitioned primary-key table - String t2 = "t2"; + String t2 = "unsupported_delete_and_update_t2"; tBatchEnv.executeSql( String.format( "create table %s (" diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceBatchITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceBatchITCase.java index cc010b10c..85a9fd805 100644 --- 
a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceBatchITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceBatchITCase.java @@ -92,7 +92,7 @@ void after() { @Test void testScanSingleRowFilter() throws Exception { - String tableName = prepareSourceTable(new String[] {"name", "id"}, null); + String tableName = prepareSourceTable("scan_single_1", new String[] {"name", "id"}, null); String query = String.format("SELECT * FROM %s WHERE id = 1 AND name = 'name1'", tableName); assertThat(tEnv.explainSql(query)) @@ -109,7 +109,7 @@ void testScanSingleRowFilter() throws Exception { @Test void testScanSingleRowFilter2() throws Exception { - String tableName = prepareSourceTable(new String[] {"id", "name"}, null); + String tableName = prepareSourceTable("scan_single_2", new String[] {"id", "name"}, null); String query = String.format("SELECT * FROM %s WHERE id = 1 AND name = 'name1'", tableName); assertThat(tEnv.explainSql(query)) @@ -126,7 +126,7 @@ void testScanSingleRowFilter2() throws Exception { @Test void testScanSingleRowFilter3() throws Exception { - String tableName = prepareSourceTable(new String[] {"id"}, null); + String tableName = prepareSourceTable("scan_single_3", new String[] {"id"}, null); String query = String.format("SELECT id,name FROM %s WHERE id = 1", tableName); assertThat(tEnv.explainSql(query)) @@ -143,7 +143,7 @@ void testScanSingleRowFilter3() throws Exception { @Test void testScanSingleRowFilterOnPartitionedTable() throws Exception { - String tableName = prepareSourceTable(new String[] {"id", "dt"}, "dt"); + String tableName = prepareSourceTable("scan_single_4", new String[] {"id", "dt"}, "dt"); TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); Map partitionNameById = waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath); @@ -169,7 +169,7 @@ void testScanSingleRowFilterOnPartitionedTable() throws Exception { @Test void testScanSingleRowFilterException() throws Exception { - String tableName = prepareSourceTable(new String[] {"id", "name"}, null); + String tableName = prepareSourceTable("scan_single_5", new String[] {"id", "name"}, null); String query = String.format("SELECT * FROM %s WHERE id = 1", tableName); // the filter doesn't cover every primary key column, so the query can't be executed @@ -182,7 +182,8 @@ void testScanSingleRowFilterException() throws Exception { @Test void testLakeTableQueryOnLakeDisabledTable() throws Exception { - String tableName = prepareSourceTable(new String[] {"id", "name"}, null); + String tableName = + prepareSourceTable("lake_table_query_1", new String[] {"id", "name"}, null); assertThatThrownBy(() -> tEnv.executeSql(String.format("SELECT * FROM %s$lake", tableName))) .cause() .cause() @@ -194,7 +195,7 @@ void testLakeTableQueryOnLakeDisabledTable() throws Exception { @Test void testLimitPrimaryTableScan() throws Exception { - String tableName = prepareSourceTable(new String[] {"id"}, null); + String tableName = prepareSourceTable("limit_primary_1", new String[] {"id"}, null); // normal scan String query = String.format("SELECT * FROM %s limit 2", tableName); CloseableIterator iterRows = tEnv.executeSql(query).collect(); @@ -311,7 +312,9 @@ void testCountPushDown(boolean partitionTable) throws Exception { "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); // test not support primary key now -
String primaryTableName = prepareSourceTable(new String[] {"id"}, null); + String primaryTableName = + prepareSourceTable( + "count_push_down_is_partition" + partitionTable, new String[] {"id"}, null); assertThatThrownBy( () -> tEnv.explainSql( @@ -323,9 +326,11 @@ void testCountPushDown(boolean partitionTable) throws Exception { "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode."); } - private String prepareSourceTable(String[] keys, String partitionedKey) throws Exception { + private String prepareSourceTable(String namePrefix, String[] keys, String partitionedKey) + throws Exception { String tableName = - String.format("test_%s_%s", String.join("_", keys), RandomUtils.nextInt()); + String.format( + "%s_test_%s_%s", namePrefix, String.join("_", keys), RandomUtils.nextInt()); if (partitionedKey == null) { tEnv.executeSql( String.format( diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java index c88d5635d..6faf05c6d 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/FlinkTableSourceITCase.java @@ -196,7 +196,7 @@ void testNonPkTableRead() throws Exception { void testTableProjectPushDown(String mode) throws Exception { boolean isPkTable = mode.startsWith("PK"); boolean testPkLog = mode.equals("PK_LOG"); - String tableName = "table_" + mode; + String tableName = "table_projection_push_down" + mode; String pkDDL = isPkTable ? ", primary key (a) not enforced" : ""; tEnv.executeSql( String.format( @@ -311,7 +311,9 @@ void testPkTableReadMixSnapshotAndLog() throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) void testReadLogTableWithDifferentScanStartupMode(boolean isPartitioned) throws Exception { - String tableName = "tab1_" + (isPartitioned ? "partitioned" : "non_partitioned"); + String tableName = + "tab1_read_with_different_mode_" + + (isPartitioned ? 
"partitioned" : "non_partitioned"); String partitionName = null; TablePath tablePath = TablePath.of(DEFAULT_DB, tableName); if (!isPartitioned) { @@ -543,10 +545,10 @@ void testReadPrimaryKeyPartitionedTable() throws Exception { new DataField("b", com.alibaba.fluss.types.DataTypes.STRING()), new DataField("c", com.alibaba.fluss.types.DataTypes.STRING())); tEnv.executeSql( - "create table partitioned_table" + "create table primary_key_partitioned_table_1" + " (a int not null, b varchar, c string, primary key (a, c) NOT ENFORCED) partitioned by (c) " + "with ('table.auto-partition.enabled' = 'true', 'table.auto-partition.time-unit' = 'year')"); - TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table"); + TablePath tablePath = TablePath.of(DEFAULT_DB, "primary_key_partitioned_table_1"); // write data into partitions and wait snapshot is done Map partitionNameById = @@ -556,7 +558,7 @@ void testReadPrimaryKeyPartitionedTable() throws Exception { waitUtilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values()); org.apache.flink.util.CloseableIterator rowIter = - tEnv.executeSql("select * from partitioned_table").collect(); + tEnv.executeSql("select * from primary_key_partitioned_table_1").collect(); assertResultsIgnoreOrder(rowIter, expectedRowValues, false); // then create some new partitions, and write rows to the new partitions @@ -584,7 +586,9 @@ private static Stream lookupArgs() { @ParameterizedTest @MethodSource("lookupArgs") void testLookup1PkTable(Caching caching, boolean async) throws Exception { - String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null); + String dim = + prepareDimTableAndSourceTable( + caching, async, new String[] {"id"}, null, "t_lookup1"); String dimJoinQuery = String.format( "SELECT a, c, h.name FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -603,7 +607,9 @@ void testLookup1PkTable(Caching caching, boolean async) throws Exception { @ParameterizedTest @MethodSource("lookupArgs") void testLookup1PkTableWith2Conditions(Caching caching, boolean async) throws Exception { - String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null); + String dim = + prepareDimTableAndSourceTable( + caching, async, new String[] {"id"}, null, "t_lookup2"); String dimJoinQuery = String.format( "SELECT a, b, h.name FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -622,7 +628,9 @@ void testLookup1PkTableWith2Conditions(Caching caching, boolean async) throws Ex @ParameterizedTest @MethodSource("lookupArgs") void testLookup1PkTableWith3Conditions(Caching caching, boolean async) throws Exception { - String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, null); + String dim = + prepareDimTableAndSourceTable( + caching, async, new String[] {"id"}, null, "t_lookup3"); String dimJoinQuery = String.format( "SELECT a, b, c, h.address FROM src LEFT JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -644,7 +652,8 @@ void testLookup1PkTableWith3Conditions(Caching caching, boolean async) throws Ex @MethodSource("lookupArgs") void testLookup2PkTable(Caching caching, boolean async) throws Exception { String dim = - prepareDimTableAndSourceTable(caching, async, new String[] {"id", "name"}, null); + prepareDimTableAndSourceTable( + caching, async, new String[] {"id", "name"}, null, "t_lookup4"); String dimJoinQuery = String.format( "SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -665,7 +674,8 @@ void testLookup2PkTable(Caching caching, boolean async) 
throws Exception { void testLookup2PkTableWithUnorderedKey(Caching caching, boolean async) throws Exception { // the primary key is (name, id) but the schema order is (id, name) String dim = - prepareDimTableAndSourceTable(caching, async, new String[] {"name", "id"}, null); + prepareDimTableAndSourceTable( + caching, async, new String[] {"name", "id"}, null, "t_lookup5"); String dimJoinQuery = String.format( "SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -685,7 +695,8 @@ void testLookup2PkTableWithUnorderedKey(Caching caching, boolean async) throws E @MethodSource("lookupArgs") void testLookup2PkTableWith1KeyInCondition(Caching caching, boolean async) throws Exception { String dim = - prepareDimTableAndSourceTable(caching, async, new String[] {"id", "name"}, null); + prepareDimTableAndSourceTable( + caching, async, new String[] {"id", "name"}, null, "t_lookup6"); String dimJoinQuery = String.format( "SELECT a, b, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -706,7 +717,8 @@ void testLookup2PkTableWith1KeyInCondition(Caching caching, boolean async) throw @MethodSource("lookupArgs") void testLookup2PkTableWith3Conditions(Caching caching, boolean async) throws Exception { String dim = - prepareDimTableAndSourceTable(caching, async, new String[] {"id", "name"}, null); + prepareDimTableAndSourceTable( + caching, async, new String[] {"id", "name"}, null, "t_lookup7"); String dimJoinQuery = String.format( "SELECT a, h.name, h.address FROM src JOIN %s FOR SYSTEM_TIME AS OF src.proc as h" @@ -721,7 +733,9 @@ void testLookup2PkTableWith3Conditions(Caching caching, boolean async) throws Ex @ParameterizedTest @MethodSource("lookupArgs") void testLookupPartitionedTable(Caching caching, boolean async) throws Exception { - String dim = prepareDimTableAndSourceTable(caching, async, new String[] {"id"}, "p_date"); + String dim = + prepareDimTableAndSourceTable( + caching, async, new String[] {"id"}, "p_date", "t_lookup8"); String dimJoinQuery = String.format( @@ -766,7 +780,11 @@ private InternalRow genRow(boolean isPkTable, RowType rowType, Object[] objects) * @return the table name of the dim table */ private String prepareDimTableAndSourceTable( - Caching caching, boolean async, String[] keys, @Nullable String partitionedKey) + Caching caching, + boolean async, + String[] keys, + @Nullable String partitionedKey, + String tablePrefix) throws Exception { String options = async ? "'lookup.async' = 'true'" : "'lookup.async' = 'false'"; if (caching == Caching.ENABLE_CACHE) { @@ -779,7 +797,8 @@ private String prepareDimTableAndSourceTable( // create dim table String tableName = String.format( - "lookup_test_%s_%s_pk_%s_%s", + "%s_lookup_test_%s_%s_pk_%s_%s", + tablePrefix, caching.name().toLowerCase(), async ? 
"async" : "sync", String.join("_", keys), diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumeratorTest.java index e01061c86..516720ead 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumeratorTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/enumerator/FlinkSourceEnumeratorTest.java @@ -82,14 +82,15 @@ protected static void beforeAll() { @Test void testPkTableNoSnapshotSplits() throws Throwable { - long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + TablePath tablePath = TablePath.of(DEFAULT_DB, "pk_table_no_snapshot_splits_t1"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); int numSubtasks = 3; // test get snapshot split & log split and the assignment try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, + tablePath, flussConf, true, false, @@ -124,18 +125,19 @@ void testPkTableNoSnapshotSplits() throws Throwable { @Test void testPkTableWithSnapshotSplits() throws Throwable { - long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + TablePath tablePath = TablePath.of(DEFAULT_DB, "pk_table_with_snapshot_splits_t1"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); int numSubtasks = 5; // write data and wait snapshot finish to make sure // we can hava snapshot split - Map bucketIdToNumRecords = putRows(DEFAULT_TABLE_PATH, 10); + Map bucketIdToNumRecords = putRows(tablePath, 10); waitUntilSnapshot(tableId, 0); try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, + tablePath, flussConf, true, false, @@ -205,7 +207,7 @@ void testNonPkTable() throws Throwable { .distributedBy(DEFAULT_BUCKET_NUM, "id") .build(); - TablePath path1 = TablePath.of(DEFAULT_DB, "test-non-pk-table"); + TablePath path1 = TablePath.of(DEFAULT_DB, "test-non-pk-table_t1"); admin.createTable(path1, nonPkTableDescriptor, true).get(); long tableId = admin.getTable(path1).get().getTableId(); @@ -252,14 +254,16 @@ void testNonPkTable() throws Throwable { @Test void testReaderRegistrationTriggerAssignments() throws Throwable { - long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + TablePath tablePath = + TablePath.of(DEFAULT_DB, "test_reader_registration_trigger_assignments_t1"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); int numSubtasks = 3; // test get snapshot split & log split and the assignment try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, + tablePath, flussConf, true, false, @@ -287,14 +291,15 @@ void testReaderRegistrationTriggerAssignments() throws Throwable { @Test void testAddSplitBack() throws Throwable { - long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + TablePath tablePath = TablePath.of(DEFAULT_DB, "test_add_split_back_t1"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); int numSubtasks = 3; // 
test get snapshot split & log split and the assignment try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks)) { FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, + tablePath, flussConf, true, false, @@ -337,7 +342,8 @@ void testAddSplitBack() throws Throwable { @Test void testRestore() throws Throwable { - long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + TablePath tablePath = TablePath.of(DEFAULT_DB, "test_restore_t1"); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); int numSubtasks = 3; // test get snapshot split & log split and the assignment try (MockSplitEnumeratorContext context = @@ -350,7 +356,7 @@ void testRestore() throws Throwable { // mock restore with assigned buckets FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, + tablePath, flussConf, false, false, @@ -388,13 +394,17 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa isPrimaryKeyTable ? DEFAULT_AUTO_PARTITIONED_PK_TABLE_DESCRIPTOR : DEFAULT_AUTO_PARTITIONED_LOG_TABLE_DESCRIPTOR; - long tableId = createTable(DEFAULT_TABLE_PATH, tableDescriptor); + TablePath tablePath = + TablePath.of( + DEFAULT_DB, + "test_discover_partitions_periodically_t1_" + isPrimaryKeyTable); + long tableId = createTable(tablePath, tableDescriptor); ZooKeeperClient zooKeeperClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks); FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, + tablePath, flussConf, isPrimaryKeyTable, true, @@ -402,8 +412,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa OffsetsInitializer.initial(), DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS, streaming)) { - Map partitionNameByIds = - waitUntilPartitions(zooKeeperClient, DEFAULT_TABLE_PATH); + Map partitionNameByIds = waitUntilPartitions(zooKeeperClient, tablePath); enumerator.start(); // register all readers for (int i = 0; i < numSubtasks; i++) { @@ -425,7 +434,7 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa List newPartitions = Arrays.asList("newPartition1", "newPartition2"); Map newPartitionNameIds = - createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions); + createPartitions(zooKeeperClient, tablePath, newPartitions); // invoke partition discovery callable again and there should be assignments.
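For context, the discovery round that runPeriodicPartitionDiscovery(context) triggers next reduces to a set difference between the partitions seen in the previous round and the ones currently registered in ZooKeeper. A minimal sketch of that diff step, where PartitionDiff and its field names are illustrative stand-ins rather than the real Fluss enumerator types:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; the real enumerator keeps richer state than this. */
final class PartitionDiff {
    final Map<Long, String> added = new HashMap<>();
    final Map<Long, String> removed = new HashMap<>();

    static PartitionDiff of(Map<Long, String> previous, Map<Long, String> current) {
        PartitionDiff diff = new PartitionDiff();
        // anything present now but not before was created since the last round
        current.forEach(
                (id, name) -> {
                    if (!previous.containsKey(id)) {
                        diff.added.put(id, name);
                    }
                });
        // anything present before but gone now was dropped since the last round
        previous.forEach(
                (id, name) -> {
                    if (!current.containsKey(id)) {
                        diff.removed.put(id, name);
                    }
                });
        return diff;
    }
}
```

The test below drives exactly this loop twice: once after createPartitions, expecting new assignments, and once after dropPartitions, expecting removals.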
runPeriodicPartitionDiscovery(context); @@ -439,9 +448,8 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa Map expectedRemovedPartitions = newPartitionNameIds; newPartitions = Collections.singletonList("newPartition3"); - dropPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, dropPartitions); - newPartitionNameIds = - createPartitions(zooKeeperClient, DEFAULT_TABLE_PATH, newPartitions); + dropPartitions(zooKeeperClient, tablePath, dropPartitions); + newPartitionNameIds = createPartitions(zooKeeperClient, tablePath, newPartitions); // invoke partition discovery callable again runPeriodicPartitionDiscovery(context); @@ -496,13 +504,14 @@ void testDiscoverPartitionsPeriodically(boolean isPrimaryKeyTable) throws Throwa @Test void testGetSplitOwner() throws Exception { + TablePath tablePath = TablePath.of(DEFAULT_DB, "test_get_split_owner_t1"); int numSubtasks = 3; - long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR); + long tableId = createTable(tablePath, DEFAULT_PK_TABLE_DESCRIPTOR); try (MockSplitEnumeratorContext context = new MockSplitEnumeratorContext<>(numSubtasks); FlinkSourceEnumerator enumerator = new FlinkSourceEnumerator( - DEFAULT_TABLE_PATH, + tablePath, flussConf, false, true, diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReaderTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReaderTest.java index 459149eeb..d29e103b0 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReaderTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceReaderTest.java @@ -56,7 +56,7 @@ class FlinkSourceReaderTest extends FlinkTestBase { @Test void testHandlePartitionsRemovedEvent() throws Exception { - TablePath tablePath = TablePath.of(DEFAULT_DB, "test_partitioned_table"); + TablePath tablePath = TablePath.of(DEFAULT_DB, "test_handle_partitions_remove_event_t1"); TableDescriptor tableDescriptor = DEFAULT_AUTO_PARTITIONED_PK_TABLE_DESCRIPTOR; long tableId = createTable(tablePath, tableDescriptor); diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java index 355ea1439..98d419178 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/reader/FlinkSourceSplitReaderTest.java @@ -68,7 +68,7 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase { @Test void testSanityCheck() throws Exception { - TablePath tablePath1 = TablePath.of(DEFAULT_DB, "test1"); + TablePath tablePath1 = TablePath.of(DEFAULT_DB, "test_sanity_check_t1"); Schema schema1 = Schema.newBuilder() .primaryKey("id") diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/testutils/FlinkTestBase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/testutils/FlinkTestBase.java index 559f33bb6..18fb9e5b8 100644 --- 
a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/testutils/FlinkTestBase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/source/testutils/FlinkTestBase.java @@ -126,9 +126,6 @@ public class FlinkTestBase { protected static final String DEFAULT_DB = "test-flink-db"; - protected static final TablePath DEFAULT_TABLE_PATH = - TablePath.of(DEFAULT_DB, "test-flink-table"); - protected static Connection conn; protected static Admin admin; diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadLogTableITCase.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadLogTableITCase.java index 4bb0f0be3..722b1ee20 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadLogTableITCase.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadLogTableITCase.java @@ -56,7 +56,8 @@ void testReadLogTable(boolean isPartitioned) throws Exception { builder.build(); JobClient jobClient = execEnv.executeAsync(); - String tableName = "logTable_" + (isPartitioned ? "partitioned" : "non_partitioned"); + String tableName = + "union_read_logTable_" + (isPartitioned ? "partitioned" : "non_partitioned"); TablePath t1 = TablePath.of(DEFAULT_DB, tableName); List writtenRows = new ArrayList<>(); diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java index 033d33485..03de748e9 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java @@ -65,7 +65,8 @@ void testPrimaryKeyTable(boolean isPartitioned) throws Exception { builder.build(); JobClient jobClient = execEnv.executeAsync(); - String tableName = "pkTable_" + (isPartitioned ? "partitioned" : "non_partitioned"); + String tableName = + "union_read_test_pkTable_" + (isPartitioned ? 
"partitioned" : "non_partitioned"); TablePath t1 = TablePath.of(DEFAULT_DB, tableName); Map bucketLogEndOffset = new HashMap<>(); diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/LakeTableEnumeratorTest.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/LakeTableEnumeratorTest.java index fb50c486c..1c144ebf4 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/LakeTableEnumeratorTest.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/flink/LakeTableEnumeratorTest.java @@ -70,7 +70,7 @@ void testLogTableEnumerator() throws Throwable { builder.build(); JobClient jobClient = execEnv.executeAsync(); - TablePath t1 = TablePath.of(DEFAULT_DB, "logTable"); + TablePath t1 = TablePath.of(DEFAULT_DB, "lake_table_enumerator_test_logTable"); Map bucketLogEndOffset = new HashMap<>(); long t1Id = prepareLogTable(t1, DEFAULT_BUCKET_NUM, bucketLogEndOffset); @@ -140,7 +140,7 @@ void testLogTableEnumerator() throws Throwable { @Test void testPrimaryKeyTableEnumerator() throws Throwable { - TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable"); + TablePath t1 = TablePath.of(DEFAULT_DB, "lake_table_enumerator_test_pkTable"); PaimonDataBaseSyncSinkBuilder builder = getDatabaseSyncSinkBuilder(execEnv); builder.build(); JobClient jobClient = execEnv.executeAsync(); diff --git a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/sink/PaimonSyncITCase.java b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/sink/PaimonSyncITCase.java index 89e34b63e..a35d0dd0a 100644 --- a/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/sink/PaimonSyncITCase.java +++ b/fluss-lakehouse/fluss-lakehouse-paimon/src/test/java/com/alibaba/fluss/lakehouse/paimon/sink/PaimonSyncITCase.java @@ -70,7 +70,7 @@ protected static void beforeAll() { @Test void testDatabaseSync() throws Exception { // create a pk table, write some records and wait until snapshot finished - TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable"); + TablePath t1 = TablePath.of(DEFAULT_DB, "paimon_sync_test_pkTable"); long t1Id = createPkTable(t1); TableBucket t1Bucket = new TableBucket(t1Id, 0); // write records @@ -134,7 +134,8 @@ void testDatabaseSync() throws Exception { checkDataInPaimonPrimayKeyTable(t1, rows); // then create partitioned table and wait partitions are ready - TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTable"); + TablePath partitionedTablePath = + TablePath.of(DEFAULT_DB, "paimon_sync_test_partitionedTable"); Tuple2 tableIdAndDescriptor = createPartitionedTable(partitionedTablePath); Map partitionNameByIds = diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java index 21886e74e..124212178 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java @@ -318,7 +318,10 @@ private CompletableFuture doSend( return responseFuture; } - connectionMetricGroup.updateMetricsBeforeSendRequest(apiKey, rawRequest.totalSize()); + if (connectionMetricGroup != null && rawRequest != null) { + connectionMetricGroup.updateMetricsBeforeSendRequest( + apiKey, rawRequest.totalSize()); + } 
channel.writeAndFlush(byteBuf) .addListener( diff --git a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java index 1139be256..566ad2b65 100644 --- a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java +++ b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/protocol/RequestChannelTest.java @@ -23,6 +23,7 @@ import com.alibaba.fluss.shaded.netty4.io.netty.buffer.EmptyByteBuf; import com.alibaba.fluss.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -34,6 +35,7 @@ public class RequestChannelTest { @Test + @Disabled("TODO, fix me in #116") void testRequestPriority() throws Exception { RequestChannel channel = new RequestChannel(100); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index cc360519b..fc9dc5592 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -424,6 +424,9 @@ public void flush(long exclusiveUpToLogOffset, FatalErrorHandler fatalErrorHandl kvPreWriteBuffer.flush(exclusiveUpToLogOffset); flushedLogOffset = exclusiveUpToLogOffset; } catch (Throwable t) { + LOG.error( + "Failed to flush kv pre-write buffer. There may be a risk of data loss.", + t); fatalErrorHandler.onFatalError( new KvStorageException("Failed to flush kv pre-write buffer.")); } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java index 6dc84d659..ec2f121fb 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java @@ -317,7 +317,7 @@ private TableInfo createPartitionedTable(AutoPartitionTimeUnit timeUnit) throws .primaryKey("id", "dt") .build()) .comment("partitioned table") - .distributedBy(16) + .distributedBy(3) .partitionedBy("dt") .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) .property(ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, timeUnit) diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/NotifyLeaderAndIsrITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/NotifyLeaderAndIsrITCase.java index 20933fa42..46189f300 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/NotifyLeaderAndIsrITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/NotifyLeaderAndIsrITCase.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.server.replica.Replica; import com.alibaba.fluss.server.replica.ReplicaManager; import com.alibaba.fluss.server.tablet.TabletServer; @@ -35,7 +36,6 @@ import java.util.stream.Collectors; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO; -import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry; import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
import static org.assertj.core.api.Assertions.assertThat; @@ -58,11 +58,10 @@ void beforeEach() { @Test void testNotifyLeaderAndIsr() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_notify_leader_and_isr_t1"); long tableId = RpcMessageTestUtils.createTable( - FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH, - DATA1_TABLE_INFO.getTableDescriptor()); + FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_INFO.getTableDescriptor()); TableBucket tb = new TableBucket(tableId, 0); LeaderAndIsr leaderAndIsr = diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java index 46b903b64..b8c22de81 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java @@ -161,7 +161,7 @@ void testCreateInvalidDatabaseAndTable() { @ValueSource(booleans = {true, false}) void testDatabaseManagement(boolean isCoordinatorServer) throws Exception { AdminReadOnlyGateway gateway = getAdminOnlyGateway(isCoordinatorServer); - String db1 = "db1"; + String db1 = "test_database_management_db1_is_coordinator_" + isCoordinatorServer; assertThat(gateway.databaseExists(newDatabaseExistsRequest(db1)).get().isExists()) .isFalse(); @@ -181,13 +181,16 @@ void testDatabaseManagement(boolean isCoordinatorServer) throws Exception { .get()) .cause() .isInstanceOf(DatabaseAlreadyExistException.class) - .hasMessageContaining("Database db1 already exists."); + .hasMessageContaining( + "Database test_database_management_db1_is_coordinator_" + + isCoordinatorServer + + " already exists."); // with ignore if exists, shouldn't throw exception again adminGateway.createDatabase(newCreateDatabaseRequest(db1, true)).get(); // create another database - String db2 = "db2"; + String db2 = "test_database_management_db2_is_coordinator_" + isCoordinatorServer; adminGateway.createDatabase(newCreateDatabaseRequest(db2, false)).get(); // list database @@ -218,7 +221,10 @@ void testDatabaseManagement(boolean isCoordinatorServer) throws Exception { .get()) .cause() .isInstanceOf(DatabaseNotExistException.class) - .hasMessageContaining("Database db1 does not exist."); + .hasMessageContaining( + "Database test_database_management_db1_is_coordinator_" + + isCoordinatorServer + + " does not exist."); } @ParameterizedTest @@ -227,8 +233,8 @@ void testTableManagement(boolean isCoordinatorServer) throws Exception { AdminReadOnlyGateway gateway = getAdminOnlyGateway(isCoordinatorServer); AdminGateway adminGateway = getAdminGateway(); - String db1 = "db1"; - String tb1 = "tb1"; + String db1 = "table_manager_db1_is_coordinator_" + isCoordinatorServer; + String tb1 = "test_table_management_tb1_is_coordinator_" + isCoordinatorServer; TablePath tablePath = TablePath.of(db1, tb1); // first create a database adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); @@ -296,7 +302,7 @@ void testTableManagement(boolean isCoordinatorServer) throws Exception { adminGateway.createTable(newCreateTableRequest(tablePath, tableDescriptor, true)).get(); // create another table without setting distribution - String tb2 = "tb2"; + String tb2 = "test_table_management_tb2"; TableDescriptor tableDescriptor1 = newTableWithoutSettingDistribution(); adminGateway .createTable( @@ -338,8 +344,8 @@ void testTableManagement(boolean isCoordinatorServer) throws Exception { 
@EnumSource(AutoPartitionTimeUnit.class) void testPartitionedTableManagement(AutoPartitionTimeUnit timeUnit) throws Exception { AdminGateway adminGateway = getAdminGateway(); - String db1 = "db1"; - String tb1 = "tb1_" + timeUnit.name(); + String db1 = "test_partition_db1_" + timeUnit.name(); + String tb1 = "test_partitioned_table_management_tb1_" + timeUnit.name(); TablePath tablePath = TablePath.of(db1, tb1); // first create a database adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); @@ -385,8 +391,8 @@ void testPartitionedTableManagement(AutoPartitionTimeUnit timeUnit) throws Excep @Test void testCreateInvalidPartitionedTable() throws Exception { AdminGateway adminGateway = getAdminGateway(); - String db1 = "db1"; - String tb1 = "tb1"; + String db1 = "test_create_invalid_db1"; + String tb1 = "test_create_invalid_partitioned_table_tb1"; TablePath tablePath = TablePath.of(db1, tb1); // first create a database adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); @@ -442,8 +448,8 @@ void testMetadata(boolean isCoordinatorServer) throws Exception { AdminReadOnlyGateway gateway = getAdminOnlyGateway(isCoordinatorServer); AdminGateway adminGateway = getAdminGateway(); - String db1 = "db1"; - String tb1 = "tb1"; + String db1 = "test_metadata_db1_is_coordinator_" + isCoordinatorServer; + String tb1 = "test_metadata_tb1_is_coordinator_" + isCoordinatorServer; TablePath tablePath = TablePath.of(db1, tb1); // first create a database adminGateway.createDatabase(newCreateDatabaseRequest(db1, false)).get(); @@ -507,8 +513,8 @@ void testMetadata(boolean isCoordinatorServer) throws Exception { void testMetadataWithPartition(boolean isCoordinatorServer) throws Exception { AdminReadOnlyGateway gateway = getAdminOnlyGateway(isCoordinatorServer); AdminGateway adminGateway = getAdminGateway(); - String db1 = "db1"; - String tb1 = "tb1"; + String db1 = "test_metadata_with_partition_db1_is_coordinator_" + isCoordinatorServer; + String tb1 = "test_metadata_with_partition_tb1_is_coordinator_" + isCoordinatorServer; // create a partitioned table, and request a not exist partition, should throw partition not // exist exception TablePath tablePath = TablePath.of(db1, tb1); @@ -564,7 +570,9 @@ void testMetadataWithPartition(boolean isCoordinatorServer) throws Exception { .cause() .isInstanceOf(PartitionNotExistException.class) .hasMessage( - "Table partition 'db1.partitioned_tb(p=not_exist_partition)' does not exist."); + "Table partition 'test_metadata_with_partition_db1_is_coordinator_" + + isCoordinatorServer + + ".partitioned_tb(p=not_exist_partition)' does not exist."); } private void checkBucketMetadata(int expectBucketCount, List bucketMetadata) { @@ -649,7 +657,7 @@ private static TableDescriptor.Builder newPartitionedTableBuilder( return TableDescriptor.builder() .schema(builder.build()) .comment("partitioned table") - .distributedBy(16) + .distributedBy(3) .partitionedBy("dt") .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED.key(), "true") .property( @@ -662,7 +670,7 @@ private static TableDescriptor newTable() { return TableDescriptor.builder() .schema(newSchema()) .comment("first table") - .distributedBy(16, "a") + .distributedBy(3, "a") .build(); } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java index af1029ab2..60abd96f9 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java +++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogITCase.java @@ -23,6 +23,7 @@ import com.alibaba.fluss.fs.FsPath; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.rpc.gateway.CoordinatorGateway; import com.alibaba.fluss.rpc.gateway.TabletServerGateway; import com.alibaba.fluss.server.tablet.TabletServer; @@ -37,7 +38,6 @@ import static com.alibaba.fluss.record.TestData.DATA1; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO; -import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.createTable; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newDropTableRequest; @@ -57,11 +57,10 @@ public class RemoteLogITCase { @Test void testDeleteRemoteLog() throws Exception { + TablePath tablePath = new TablePath("test_db_1", "test_delete_remote_log_t1"); long tableId = createTable( - FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH, - DATA1_TABLE_INFO.getTableDescriptor()); + FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_INFO.getTableDescriptor()); TableBucket tb = new TableBucket(tableId, 0); FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); @@ -89,7 +88,7 @@ void testDeleteRemoteLog() throws Exception { FsPath fsPath = FlussPaths.remoteLogTabletDir( tabletServer.getReplicaManager().getRemoteLogManager().remoteLogDir(), - PhysicalTablePath.of(DATA1_TABLE_PATH), + PhysicalTablePath.of(tablePath), tb); FileSystem fileSystem = fsPath.getFileSystem(); assertThat(fileSystem.exists(fsPath)).isTrue(); @@ -99,20 +98,18 @@ void testDeleteRemoteLog() throws Exception { coordinatorGateway .dropTable( newDropTableRequest( - DATA1_TABLE_PATH.getDatabaseName(), - DATA1_TABLE_PATH.getTableName(), - true)) + tablePath.getDatabaseName(), tablePath.getTableName(), true)) .get(); retry(Duration.ofMinutes(2), () -> assertThat(fileSystem.exists(fsPath)).isFalse()); } @Test void testFollowerFetchAlreadyMoveToRemoteLog() throws Exception { + TablePath tablePath = + new TablePath("test_db_1", "test_follower_fetch_already_move_to_remote_log_t1"); long tableId = createTable( - FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH, - DATA1_TABLE_INFO.getTableDescriptor()); + FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_INFO.getTableDescriptor()); TableBucket tb = new TableBucket(tableId, 0); FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/AdjustIsrITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/AdjustIsrITCase.java index 0382499ac..78029ede2 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/AdjustIsrITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/AdjustIsrITCase.java @@ -21,6 +21,7 @@ import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; +import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.rpc.gateway.TabletServerGateway; import com.alibaba.fluss.rpc.messages.PbProduceLogRespForBucket; import com.alibaba.fluss.rpc.messages.ProduceLogResponse; @@ -41,7 +42,6 @@ import static com.alibaba.fluss.record.TestData.DATA1; import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA; import static 
com.alibaba.fluss.record.TestData.DATA1_TABLE_ID; -import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; import static com.alibaba.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject; import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry; import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue; @@ -66,7 +66,7 @@ void beforeEach() { @Test void testIsrShrinkAndExpend() throws Exception { - long tableId = createLogTable(); + long tableId = createLogTable(TablePath.of("test_db_1", "test_shrink_and_expend_t1")); TableBucket tb = new TableBucket(tableId, 0); LeaderAndIsr currentLeaderAndIsr = @@ -140,7 +140,7 @@ void testIsrShrinkAndExpend() throws Exception { @Test void testIsrSetSizeLessThanMinInSynReplicasNumber() throws Exception { - long tableId = createLogTable(); + long tableId = createLogTable(TablePath.of("test_db_1", "test_isr_set_size_less_t1")); TableBucket tb = new TableBucket(tableId, 0); LeaderAndIsr currentLeaderAndIsr = @@ -221,11 +221,11 @@ void testIsrSetSizeLessThanMinInSynReplicasNumber() throws Exception { .isEqualTo(0L); } - private long createLogTable() throws Exception { + private long createLogTable(TablePath tablePath) throws Exception { // Set bucket to 1 to make debugging easier. TableInfo data1NonPkTableInfo = new TableInfo( - DATA1_TABLE_PATH, + tablePath, DATA1_TABLE_ID, TableDescriptor.builder() .schema(DATA1_SCHEMA) .build(), 1); return RpcMessageTestUtils.createTable( - FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH, - data1NonPkTableInfo.getTableDescriptor()); + FLUSS_CLUSTER_EXTENSION, tablePath, data1NonPkTableInfo.getTableDescriptor()); } private static Configuration initConfig() { diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java index 4584e6b60..7a3fa58f2 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvReplicaRestoreITCase.java @@ -80,7 +80,7 @@ void testRestore() throws Exception { int bucketNum = 3; List tableBuckets = new ArrayList<>(); for (int i = 0; i < tableNum; i++) { - TablePath tablePath = TablePath.of("test_db", "test_table_" + i); + TablePath tablePath = TablePath.of("kv_restore_test_db", "kv_restore_test_table_" + i); long tableId = createTable( FLUSS_CLUSTER_EXTENSION, diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvSnapshotITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvSnapshotITCase.java index f98e6de48..d124c0a62 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvSnapshotITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/KvSnapshotITCase.java @@ -74,7 +74,8 @@ void testKvSnapshot() throws Exception { int tableNum = 3; List tableBuckets = new ArrayList<>(); for (int i = 0; i < tableNum; i++) { - TablePath tablePath = TablePath.of("test_db", "test_table_" + i); + TablePath tablePath = + TablePath.of("kv_snapshot_test_db", "kv_snapshot_test_table_" + i); long tableId = RpcMessageTestUtils.createTable( FLUSS_CLUSTER_EXTENSION, diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java index 7d2bb114b..411f1d534 100644 ---
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/ReplicaFetcherITCase.java @@ -21,6 +21,7 @@ import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; +import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.record.KvRecordBatch; import com.alibaba.fluss.record.LogRecords; import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket; @@ -53,9 +54,6 @@ import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA; import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA_PK; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID; -import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID_PK; -import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; -import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; import static com.alibaba.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE; import static com.alibaba.fluss.record.TestData.EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK; import static com.alibaba.fluss.server.testutils.KvTestUtils.assertLookupResponse; @@ -91,10 +89,11 @@ void beforeEach() { @Test void testProduceLogNeedAck() throws Exception { + TablePath tablePath = TablePath.of("test_db1", "test_produce_log_need_ack_t1"); // set bucket count to 1 to make debugging easier. TableInfo data1NonPkTableInfo = new TableInfo( - DATA1_TABLE_PATH, + tablePath, DATA1_TABLE_ID, TableDescriptor.builder() .schema(DATA1_SCHEMA) @@ -109,7 +108,7 @@ void testProduceLogNeedAck() throws Exception { long tableId = createTable( FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH, + tablePath, data1NonPkTableInfo.getTableDescriptor()); int bucketId = 0; TableBucket tb = new TableBucket(tableId, bucketId); @@ -180,7 +179,8 @@ void testProduceLogNeedAck() throws Exception { @Test void testPutKvNeedAck() throws Exception { // set bucket count to 1 to make debugging easier. - TableInfo data1PkTableInfo = createPkTable(); + TablePath tablePath = TablePath.of("test_db1", "test_put_kv_need_ack_t1"); + TableInfo data1PkTableInfo = createPkTable(tablePath, 150003L); // wait until all the gateways have the same metadata because the follower fetcher manager needs // to get the leader address from server metadata while making a follower.
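Comments like the one above ("wait until all the gateways have the same metadata") point at the stabilizer these ITCases lean on: poll a condition until it holds or a deadline expires, instead of asserting immediately on eventually consistent cluster state. A generic sketch of such a helper, offered as a stand-in for utilities like CommonTestUtils.retry rather than their actual implementation:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Generic polling helper; a stand-in for the project's real test wait utilities. */
final class Waits {
    static void waitUntil(BooleanSupplier condition, Duration timeout, String description)
            throws InterruptedException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadlineNanos) {
                throw new AssertionError(
                        "Condition not met within " + timeout + ": " + description);
            }
            Thread.sleep(100L); // back off briefly between probes
        }
    }
}
```

A test would call something like Waits.waitUntil(() -> metadataInSync(), Duration.ofMinutes(1), "gateway metadata propagated") before issuing the fetch that depends on it, with metadataInSync being a hypothetical probe for the state in question.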
@@ -188,9 +188,7 @@ void testPutKvNeedAck() throws Exception { long tableId = createTable( - FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH_PK, - data1PkTableInfo.getTableDescriptor()); + FLUSS_CLUSTER_EXTENSION, tablePath, data1PkTableInfo.getTableDescriptor()); int bucketId = 0; TableBucket tb = new TableBucket(tableId, bucketId); @@ -261,14 +259,13 @@ void testPutKvNeedAck() throws Exception { @Test void testFlushForPutKvNeedAck() throws Exception { - TableInfo data1PkTableInfo = createPkTable(); + TablePath tablePath = TablePath.of("test_db1", "test_flush_for_put_kv_need_ack_t1"); + TableInfo data1PkTableInfo = createPkTable(tablePath, 150010L); // create a table and wait all replica ready long tableId = createTable( - FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH_PK, - data1PkTableInfo.getTableDescriptor()); + FLUSS_CLUSTER_EXTENSION, tablePath, data1PkTableInfo.getTableDescriptor()); int bucketId = 0; TableBucket tb = new TableBucket(tableId, bucketId); @@ -335,10 +332,10 @@ void testFlushForPutKvNeedAck() throws Exception { } } - private TableInfo createPkTable() { + private TableInfo createPkTable(TablePath tablePath, long tableId) { return new TableInfo( - DATA1_TABLE_PATH_PK, - DATA1_TABLE_ID_PK, + tablePath, + tableId, TableDescriptor.builder().schema(DATA1_SCHEMA_PK).distributedBy(1, "a").build(), 1); } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java index af039c92f..740955e86 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java @@ -52,8 +52,6 @@ import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO_PK; -import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; -import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; import static com.alibaba.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE; import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; import static com.alibaba.fluss.server.testutils.KvTestUtils.assertLookupResponse; @@ -82,11 +80,10 @@ public class TabletServiceITCase { @Test void testProduceLog() throws Exception { + TablePath tablePath = new TablePath("test_db_1", "test_produce_log_t1"); long tableId = createTable( - FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH, - DATA1_TABLE_INFO.getTableDescriptor()); + FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_INFO.getTableDescriptor()); TableBucket tb = new TableBucket(tableId, 0); FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); @@ -132,11 +129,10 @@ void testProduceLog() throws Exception { @Test void testFetchLog() throws Exception { + TablePath tablePath = new TablePath("test_db_1", "test_fetch_log_t1"); long tableId = createTable( - FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH, - DATA1_TABLE_INFO.getTableDescriptor()); + FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_INFO.getTableDescriptor()); TableBucket tb = new TableBucket(tableId, 0); FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); @@ -244,7 +240,7 @@ void testInvalidFetchLog() throws Exception { long tableId = createTable( FLUSS_CLUSTER_EXTENSION, - TablePath.of("test_db_1", "test_indexed_table_1"), + TablePath.of("test_db_1", "test_invalid_fetch_log_indexed_table_1"), TableDescriptor.builder() .schema(DATA1_SCHEMA) 
.logFormat(LogFormat.INDEXED) @@ -262,15 +258,16 @@ void testInvalidFetchLog() throws Exception { 0, Errors.INVALID_COLUMN_PROJECTION.code(), "Column projection is only supported for ARROW format, " - + "but the table test_db_1.test_indexed_table_1 is INDEXED format."); + + "but the table test_db_1.test_invalid_fetch_log_indexed_table_1 is INDEXED format."); } @Test void testPutKv() throws Exception { + TablePath tablePath = new TablePath("test_db_1", "test_put_kv_t1"); long tableId = createTable( FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH_PK, + tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor()); TableBucket tb = new TableBucket(tableId, 0); @@ -306,10 +303,11 @@ void testPutKv() throws Exception { @Test void testGetKey() throws Exception { + TablePath tablePath = new TablePath("test_db_1", "test_get_key_t1"); long tableId = createTable( FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH_PK, + tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor()); TableBucket tb = new TableBucket(tableId, 0); @@ -360,10 +358,11 @@ void testGetKey() throws Exception { "Unknown table or bucket: TableBucket{tableId=10005, bucket=6}"); // Get key from a non-pk table. + TablePath logTablePath = new TablePath("test_db_1", "test_get_key_log_t1"); long logTableId = createTable( FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH, + logTablePath, DATA1_TABLE_INFO.getTableDescriptor()); TableBucket logTableBucket = new TableBucket(logTableId, 0); @@ -385,10 +384,11 @@ void testGetKey() throws Exception { @Test void testLimitScanPrimaryKeyTable() throws Exception { + TablePath tablePath = new TablePath("test_db_1", "test_limit_scan_primary_key_table_1"); long tableId = createTable( FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH_PK, + tablePath, DATA1_TABLE_INFO_PK.getTableDescriptor()); TableBucket tb = new TableBucket(tableId, 0); @@ -420,10 +420,11 @@ void testLimitScanPrimaryKeyTable() throws Exception { @Test void testLimitScanLogTable() throws Exception { + TablePath logTablePath = new TablePath("test_db_1", "test_limit_scan_log_table_1"); long logTableId = createTable( FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH, + logTablePath, DATA1_TABLE_INFO.getTableDescriptor()); TableBucket logTableBucket = new TableBucket(logTableId, 0); FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(logTableBucket); @@ -465,11 +466,10 @@ void testLimitScanLogTable() throws Exception { @Test void testListOffsets() throws Exception { + TablePath tablePath = new TablePath("test_db_1", "test_list_offsets_1"); long tableId = createTable( - FLUSS_CLUSTER_EXTENSION, - DATA1_TABLE_PATH, - DATA1_TABLE_INFO.getTableDescriptor()); + FLUSS_CLUSTER_EXTENSION, tablePath, DATA1_TABLE_INFO.getTableDescriptor()); TableBucket tb = new TableBucket(tableId, 0); FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java index e52caa6e0..4b15b5ab0 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/FlussClusterExtension.java @@ -57,6 +57,8 @@ import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.Extension; import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.nio.file.Files; @@ -70,6 +72,8 @@ import java.util.Map; import 
java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static com.alibaba.fluss.server.utils.RpcMessageUtils.toServerNode; @@ -87,6 +91,8 @@ public final class FlussClusterExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { + private static final Logger LOG = LoggerFactory.getLogger(FlussClusterExtension.class); + public static final String BUILTIN_DATABASE = "fluss"; private static final String HOST_ADDRESS = "127.0.0.1"; @@ -195,6 +201,7 @@ public void close() throws Exception { coordinatorServer.close(); coordinatorServer = null; } + if (zooKeeperClient != null) { zooKeeperClient.close(); zooKeeperClient = null; @@ -359,9 +366,23 @@ public TabletServerGateway newTabletServerClientForNode(int serverId) { public void waitUtilAllGatewayHasSameMetadata() { for (AdminReadOnlyGateway gateway : collectAllRpcGateways()) { retry( - Duration.ofMinutes(1), + Duration.ofMinutes(2), () -> { - MetadataResponse response = gateway.metadata(new MetadataRequest()).get(); + MetadataResponse response; + long startTime = System.currentTimeMillis(); + while (true) { + try { + response = + gateway.metadata(new MetadataRequest()) + .get(20L, TimeUnit.SECONDS); + break; + } catch (TimeoutException e) { + if (System.currentTimeMillis() - startTime > 120000L) { + throw e; + } + } + } + assertThat(response.hasCoordinatorServer()).isTrue(); // check coordinator server node ServerNode coordinatorNode =
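
Note: the common thread in the changes above is that test cases no longer share the static DATA1_TABLE_PATH / DATA1_TABLE_PATH_PK constants; each test creates its own uniquely named table, so cases running against the same mini cluster cannot collide on table paths or observe each other's data. A minimal sketch of that naming pattern as a reusable helper (this class and its method are illustrative only, not part of this patch; only TablePath.of is taken from the code above):

    import com.alibaba.fluss.metadata.TablePath;
    import java.util.concurrent.atomic.AtomicLong;

    /** Hypothetical helper: derive one unique TablePath per test method. */
    public final class UniqueTablePaths {
        private static final AtomicLong COUNTER = new AtomicLong();

        private UniqueTablePaths() {}

        // e.g. of("test_db_1", "test_produce_log") -> test_db_1.test_produce_log_t1
        public static TablePath of(String database, String testName) {
            // monotonically increasing suffix avoids collisions even if the
            // same test name is reused across cases sharing one cluster
            return TablePath.of(database, testName + "_t" + COUNTER.incrementAndGet());
        }
    }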
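The waitUtilAllGatewayHasSameMetadata change bounds each metadata RPC to a 20-second get() and keeps retrying until an overall two-minute deadline, so a single hung request fails fast instead of consuming the whole retry budget. The same pattern in isolation (a sketch assuming only standard CompletableFuture semantics; the class and method names are illustrative):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.function.Supplier;

    /** Hypothetical sketch: per-attempt timeout combined with an overall deadline. */
    public final class BoundedRetry {
        public static <T> T getWithDeadline(
                Supplier<CompletableFuture<T>> call, long attemptTimeoutSec, long deadlineMs)
                throws Exception {
            long start = System.currentTimeMillis();
            while (true) {
                try {
                    // each attempt is individually bounded, so one hung RPC cannot
                    // stall the caller for the entire deadline
                    return call.get().get(attemptTimeoutSec, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    // retry until the overall deadline is exhausted, then propagate
                    if (System.currentTimeMillis() - start > deadlineMs) {
                        throw e;
                    }
                }
            }
        }
    }

    // usage (illustrative, mirroring the patched test utility):
    // MetadataResponse r = BoundedRetry.getWithDeadline(
    //         () -> gateway.metadata(new MetadataRequest()), 20L, 120_000L);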