Skip to content

Commit

Permalink
[test] Fix some unstable test cases while running CI
Browse files Browse the repository at this point in the history
  • Loading branch information
swuferhong committed Dec 12, 2024
1 parent a40bf13 commit 0c6dcaf
Show file tree
Hide file tree
Showing 39 changed files with 541 additions and 356 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected long createTable(
return admin.getTable(tablePath).get().getTableId();
}

private static Configuration initConfig() {
public static Configuration initConfig() {
Configuration conf = new Configuration();
conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
// set a shorter interval for testing purpose
Expand All @@ -117,7 +117,7 @@ protected static LogScanner createLogScanner(Table table, int[] projectFields) {
return table.getLogScanner(new LogScan().withProjectedFields(projectFields));
}

protected static void subscribeFromBeginning(LogScanner logScanner, Table table) {
public static void subscribeFromBeginning(LogScanner logScanner, Table table) {
int bucketCount = getBucketCount(table);
for (int i = 0; i < bucketCount; i++) {
logScanner.subscribeFromBeginning(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
/** Test for {@link FlussAdmin}. */
class FlussAdminITCase extends ClientToServerITCaseBase {

protected static final TablePath DEFAULT_TABLE_PATH = TablePath.of("test_db", "person");
protected static final Schema DEFAULT_SCHEMA =
Schema.newBuilder()
.primaryKey("id")
Expand All @@ -80,26 +79,27 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
TableDescriptor.builder()
.schema(DEFAULT_SCHEMA)
.comment("test table")
.distributedBy(10, "id")
.distributedBy(3, "id")
.property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1))
.customProperty("connector", "fluss")
.build();

@BeforeEach
protected void setup() throws Exception {
super.setup();
// create a default table in fluss.
createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, false);
}

@Test
void testMultiClient() throws Exception {
TablePath tablePath = TablePath.of("test_db", "multi_client_test_t1");
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);

Admin admin1 = conn.getAdmin();
Admin admin2 = conn.getAdmin();
assertThat(admin1).isNotSameAs(admin2);

TableInfo t1 = admin1.getTable(DEFAULT_TABLE_PATH).get();
TableInfo t2 = admin2.getTable(DEFAULT_TABLE_PATH).get();
TableInfo t1 = admin1.getTable(tablePath).get();
TableInfo t2 = admin2.getTable(tablePath).get();
assertThat(t1).isEqualTo(t2);

admin1.close();
Expand All @@ -108,14 +108,17 @@ void testMultiClient() throws Exception {

@Test
void testGetTableAndSchema() throws Exception {
SchemaInfo schemaInfo = admin.getTableSchema(DEFAULT_TABLE_PATH).get();
TablePath tablePath = TablePath.of("test_db", "test_get_table_and_schema");
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);

SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get();
assertThat(schemaInfo.getSchema()).isEqualTo(DEFAULT_SCHEMA);
assertThat(schemaInfo.getSchemaId()).isEqualTo(1);
SchemaInfo schemaInfo2 = admin.getTableSchema(DEFAULT_TABLE_PATH, 1).get();
SchemaInfo schemaInfo2 = admin.getTableSchema(tablePath, 1).get();
assertThat(schemaInfo2).isEqualTo(schemaInfo);

// get default table.
TableInfo tableInfo = admin.getTable(DEFAULT_TABLE_PATH).get();
TableInfo tableInfo = admin.getTable(tablePath).get();
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
assertThat(tableInfo.getTableDescriptor()).isEqualTo(DEFAULT_TABLE_DESCRIPTOR);

Expand Down Expand Up @@ -157,8 +160,11 @@ void testCreateInvalidDatabaseAndTable() {
}

@Test
void testCreateTableWithInvalidProperty() {
TablePath tablePath = TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "test_property");
void testCreateTableWithInvalidProperty() throws Exception {
TablePath tablePath = TablePath.of("test_db", "test_create_table_with_property");
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);

TablePath tablePath1 = TablePath.of("test_db", "test_property");
TableDescriptor t1 =
TableDescriptor.builder()
.schema(DEFAULT_SCHEMA)
Expand All @@ -167,7 +173,7 @@ void testCreateTableWithInvalidProperty() {
.property("connector", "fluss")
.build();
// should throw exception
assertThatThrownBy(() -> admin.createTable(tablePath, t1, false).get())
assertThatThrownBy(() -> admin.createTable(tablePath1, t1, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessageContaining("'connector' is not a Fluss table property.");
Expand All @@ -180,7 +186,7 @@ void testCreateTableWithInvalidProperty() {
.property("table.log.ttl", "unknown")
.build();
// should throw exception
assertThatThrownBy(() -> admin.createTable(tablePath, t2, false).get())
assertThatThrownBy(() -> admin.createTable(tablePath1, t2, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessageContaining(
Expand All @@ -194,15 +200,21 @@ void testCreateTableWithInvalidProperty() {
.property(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), "0")
.build();
// should throw exception
assertThatThrownBy(() -> admin.createTable(tablePath, t3, false).get())
assertThatThrownBy(() -> admin.createTable(tablePath1, t3, false).get())
.cause()
.isInstanceOf(InvalidConfigException.class)
.hasMessage("'table.log.tiered.local-segments' must be greater than 0.");
}

@Test
void testCreateTableWithInvalidReplicationFactor() throws Exception {
TablePath tablePath = TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "t1");
TablePath tablePath =
TablePath.of("test_db", "test_create_table_with_invalid_replication_factor_t1");
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);

TablePath tablePath1 =
TablePath.of("test_db", "test_create_table_with_invalid_replication_factor_t2");

// set replica factor to a non positive number, should also throw exception
TableDescriptor nonPositiveReplicaFactorTable =
TableDescriptor.builder()
Expand All @@ -214,7 +226,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
// should throw exception
assertThatThrownBy(
() ->
admin.createTable(tablePath, nonPositiveReplicaFactorTable, false)
admin.createTable(tablePath1, nonPositiveReplicaFactorTable, false)
.get())
.cause()
.isInstanceOf(InvalidReplicationFactorException.class)
Expand All @@ -234,7 +246,7 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
.customProperty("connector", "fluss")
.property(ConfigOptions.TABLE_REPLICATION_FACTOR.key(), "3")
.build();
assertThatThrownBy(() -> admin.createTable(tablePath, tableDescriptor, false).get())
assertThatThrownBy(() -> admin.createTable(tablePath1, tableDescriptor, false).get())
.cause()
.isInstanceOf(InvalidReplicationFactorException.class)
.hasMessageContaining(
Expand All @@ -248,18 +260,21 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
FLUSS_CLUSTER_EXTENSION.waitUtilAllGatewayHasSameMetadata();

// we can create the table now
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();
TableInfo tableInfo = admin.getTable(DEFAULT_TABLE_PATH).get();
admin.createTable(tablePath1, DEFAULT_TABLE_DESCRIPTOR, false).get();
TableInfo tableInfo = admin.getTable(tablePath).get();
assertThat(tableInfo.getTableDescriptor()).isEqualTo(DEFAULT_TABLE_DESCRIPTOR);
}

@Test
void testCreateExistedTable() throws Exception {
assertThatThrownBy(() -> createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, false))
TablePath tablePath = TablePath.of("test_db", "test_create_table_existed_t1");
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);

assertThatThrownBy(() -> createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false))
.cause()
.isInstanceOf(DatabaseAlreadyExistException.class);
// no exception
createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, true);
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true);

// database not exists, throw exception
assertThatThrownBy(
Expand All @@ -275,6 +290,9 @@ void testCreateExistedTable() throws Exception {

@Test
void testDropDatabaseAndTable() throws Exception {
TablePath tablePath = TablePath.of("test_db", "test_drop_database_and_table_t1");
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);

// drop not existed database with ignoreIfNotExists false.
assertThatThrownBy(() -> admin.deleteDatabase("unknown_db", false, true).get())
.cause()
Expand All @@ -294,7 +312,7 @@ void testDropDatabaseAndTable() throws Exception {
assertThat(admin.databaseExists("test_db").get()).isFalse();

// re-create.
createTable(DEFAULT_TABLE_PATH, DEFAULT_TABLE_DESCRIPTOR, false);
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);

// drop not existed table with ignoreIfNotExists false.
assertThatThrownBy(
Expand All @@ -308,22 +326,35 @@ void testDropDatabaseAndTable() throws Exception {
admin.deleteTable(TablePath.of("test_db", "unknown_table"), true).get();

// drop existed table.
assertThat(admin.tableExists(DEFAULT_TABLE_PATH).get()).isTrue();
admin.deleteTable(DEFAULT_TABLE_PATH, true).get();
assertThat(admin.tableExists(DEFAULT_TABLE_PATH).get()).isFalse();
assertThat(admin.tableExists(tablePath).get()).isTrue();
admin.deleteTable(tablePath, true).get();
assertThat(admin.tableExists(tablePath).get()).isFalse();
}

@Test
void testListDatabasesAndTables() throws Exception {
TablePath tablePath = TablePath.of("test_db", "test_drop_database_and_table");
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);

admin.createDatabase("db1", true).get();
admin.createDatabase("db2", true).get();
admin.createDatabase("db3", true).get();
assertThat(admin.listDatabases().get())
.containsExactlyInAnyOrder("test_db", "db1", "db2", "db3", "fluss");

admin.createTable(TablePath.of("db1", "table1"), DEFAULT_TABLE_DESCRIPTOR, true).get();
admin.createTable(TablePath.of("db1", "table2"), DEFAULT_TABLE_DESCRIPTOR, true).get();
assertThat(admin.listTables("db1").get()).containsExactlyInAnyOrder("table1", "table2");
admin.createTable(
TablePath.of("db1", "list_database_and_table_t1"),
DEFAULT_TABLE_DESCRIPTOR,
true)
.get();
admin.createTable(
TablePath.of("db1", "list_database_and_table_t2"),
DEFAULT_TABLE_DESCRIPTOR,
true)
.get();
assertThat(admin.listTables("db1").get())
.containsExactlyInAnyOrder(
"list_database_and_table_t1", "list_database_and_table_t2");
assertThat(admin.listTables("db2").get()).isEmpty();

assertThatThrownBy(() -> admin.listTables("unknown_db").get())
Expand All @@ -333,8 +364,10 @@ void testListDatabasesAndTables() throws Exception {

@Test
void testListPartitionInfos() throws Exception {
String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
String dbName = "test_db";
TablePath nonPartitionedTablePath = TablePath.of(dbName, "test_non_partitioned_table");
createTable(nonPartitionedTablePath, DEFAULT_TABLE_DESCRIPTOR, false);

admin.createTable(nonPartitionedTablePath, DEFAULT_TABLE_DESCRIPTOR, true).get();
assertThatThrownBy(() -> admin.listPartitionInfos(nonPartitionedTablePath).get())
.cause()
Expand All @@ -350,7 +383,7 @@ void testListPartitionInfos() throws Exception {
.column("pt", DataTypes.STRING())
.build())
.comment("test table")
.distributedBy(10, "id")
.distributedBy(3, "id")
.partitionedBy("pt")
.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
.property(
Expand All @@ -372,8 +405,11 @@ void testListPartitionInfos() throws Exception {

@Test
void testGetKvSnapshot() throws Exception {
TablePath tablePath1 =
TablePath.of(DEFAULT_TABLE_PATH.getDatabaseName(), "test-table-snapshot");
TablePath tablePath = TablePath.of("test_db", "test-table-snapshot_t1");
createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);

TablePath tablePath1 = TablePath.of("test_db", "test-table-snapshot_t2");

int bucketNum = 3;
TableDescriptor tableDescriptor =
TableDescriptor.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.alibaba.fluss.types.DataTypes;
import com.alibaba.fluss.types.RowType;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -46,7 +47,6 @@
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow;
import static com.alibaba.fluss.testutils.DataTestUtils.row;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
Expand All @@ -55,14 +55,20 @@
/** ITCase for {@link FlussLogScanner}. */
public class FlussLogScannerITCase extends ClientToServerITCaseBase {

@BeforeEach
protected void setup() throws Exception {
super.setup();
}

@Test
void testPoll() throws Exception {
createTable(DATA1_TABLE_PATH, DATA1_TABLE_INFO.getTableDescriptor(), false);
TablePath tablePath = new TablePath("test_db_1", "test_poll_t1");
createTable(tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);

// append a batch of data.
int recordSize = 10;
List<IndexedRow> expectedRows = new ArrayList<>();
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
try (Table table = conn.getTable(tablePath)) {
AppendWriter appendWriter = table.getAppendWriter();
for (int i = 0; i < recordSize; i++) {
IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {i, "a"});
Expand All @@ -89,16 +95,18 @@ void testPoll() throws Exception {

@Test
void testPollWhileCreateTableNotReady() throws Exception {
TablePath tablePath =
new TablePath("test_db_1", "test_poll_while_create_table_not_ready_t1");
// create one table with 100 buckets.
int bucketNumber = 100;
TableDescriptor tableDescriptor =
TableDescriptor.builder().schema(DATA1_SCHEMA).distributedBy(bucketNumber).build();
createTable(DATA1_TABLE_PATH, tableDescriptor, false);
createTable(tablePath, tableDescriptor, false);

// append a batch of data.
int recordSize = 10;
List<IndexedRow> expectedRows = new ArrayList<>();
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
try (Table table = conn.getTable(tablePath)) {
AppendWriter appendWriter = table.getAppendWriter();
for (int i = 0; i < recordSize; i++) {
IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {i, "a"});
Expand All @@ -125,12 +133,13 @@ void testPollWhileCreateTableNotReady() throws Exception {

@Test
void testLogScannerMultiThreadAccess() throws Exception {
createTable(DATA1_TABLE_PATH, DATA1_TABLE_INFO.getTableDescriptor(), false);
TablePath tablePath = new TablePath("test_db_1", "test_log_scanner_multi_thread_access_t1");
createTable(tablePath, DATA1_TABLE_INFO.getTableDescriptor(), false);

// append a batch of data.
int recordSize = 10;
List<IndexedRow> expectedRows = new ArrayList<>();
try (Table table = conn.getTable(DATA1_TABLE_PATH)) {
try (Table table = conn.getTable(tablePath)) {
AppendWriter appendWriter = table.getAppendWriter();
for (int i = 0; i < recordSize; i++) {
IndexedRow row = row(DATA1_ROW_TYPE, new Object[] {i, "a"});
Expand Down Expand Up @@ -166,7 +175,7 @@ void testLogScannerMultiThreadAccess() throws Exception {
@Test
void testLogHeavyWriteAndScan() throws Exception {
final String db = "db";
final String tbl = "kv_heavy_table";
final String tbl = "log_heavy_table_and_scan";
// create table
TableDescriptor descriptor =
TableDescriptor.builder()
Expand Down Expand Up @@ -220,7 +229,7 @@ void testLogHeavyWriteAndScan() throws Exception {
@Test
void testKvHeavyWriteAndScan() throws Exception {
final String db = "db";
final String tbl = "kv_heavy_table";
final String tbl = "kv_heavy_table_and_scan";
// create table
TableDescriptor descriptor =
TableDescriptor.builder()
Expand Down
Loading

0 comments on commit 0c6dcaf

Please sign in to comment.