From b6b83a7870ce4818240c7f55d4a8bca091ee4cc9 Mon Sep 17 00:00:00 2001 From: Michiel De Smet Date: Fri, 27 Dec 2024 10:01:03 +0800 Subject: [PATCH] Take into account new metadata location in CREATE OR REPLACE TABLE --- .../AbstractIcebergTableOperations.java | 13 ++++- .../iceberg/BaseIcebergConnectorTest.java | 47 +++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java index 498a718b567a..756af335d204 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java @@ -26,6 +26,7 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.connector.TableNotFoundException; import jakarta.annotation.Nullable; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableMetadataParser; @@ -135,10 +136,18 @@ public TableMetadata refresh() public TableMetadata refresh(boolean invalidateCaches) { if (location.isPresent()) { - refreshFromMetadataLocation(null); + try { + String newLocation = fixBrokenMetadataLocation(getRefreshedLocation(invalidateCaches)); + refreshFromMetadataLocation(newLocation); + return currentMetadata; + } + catch (TableNotFoundException e) { + refreshFromMetadataLocation(null); + } return currentMetadata; } - refreshFromMetadataLocation(fixBrokenMetadataLocation(getRefreshedLocation(invalidateCaches))); + String newLocation = fixBrokenMetadataLocation(getRefreshedLocation(invalidateCaches)); + refreshFromMetadataLocation(newLocation); return currentMetadata; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 670200247be5..7435475bc567 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -20,6 +20,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.airlift.concurrent.MoreFutures; +import io.airlift.log.Level; +import io.airlift.log.Logging; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.Session; @@ -90,6 +93,9 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -155,6 +161,7 @@ import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; +import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -7095,6 +7102,46 @@ public void testCreateOrReplaceTableWithChangeInLocation() } } + @Test + public void testConcurrentCreateReplaceAndInserts() + throws Exception + { + Logging logging = Logging.initialize(); + logging.setLevel("io.trino.event.QueryMonitor", Level.DEBUG); + int threads = 2; + CyclicBarrier barrier = new CyclicBarrier(threads); + ExecutorService executor = newFixedThreadPool(threads); + String tableName = "test_concurrent_update_and_inserts_table_" + randomNameSuffix(); + + assertUpdate("CREATE TABLE " + tableName + " (a) AS VALUES 1", 1); + assertUpdate("CREATE TABLE sourceTable AS select a, b, rand() as r from UNNEST(SEQUENCE(1, 9001), SEQUENCE(1, 9001)) AS t(a, b)", 9001); + + try { + // Replace the table while concurrently adding new blind inserts + executor.invokeAll(ImmutableList.>builder() + .add(() -> { + barrier.await(10, SECONDS); + assertUpdate("CREATE OR REPLACE TABLE " + tableName + " AS SELECT a FROM sourceTable", 9001); + return null; + }) + .add(() -> { + barrier.await(10, SECONDS); + getQueryRunner().execute("INSERT INTO " + tableName + " VALUES (2)"); + return null; + }) + .build()) + .forEach(MoreFutures::getDone); + + // TODO: validate query results + } + finally { + assertUpdate("DROP TABLE " + tableName); + assertUpdate("DROP TABLE sourceTable"); + executor.shutdownNow(); + assertThat(executor.awaitTermination(10, SECONDS)).isTrue(); + } + } + @Test public void testMergeSimpleSelectPartitioned() {