From 38ac0295b8184383224ea86d4c38cc77ccbe32c8 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Fri, 10 Jan 2025 14:24:21 +0100 Subject: [PATCH] Move JdbcRecordSetProvider construction to a module --- .../main/java/io/trino/plugin/jdbc/JdbcModule.java | 14 ++++++++++++++ .../trino/plugin/jdbc/JdbcPageSourceProvider.java | 9 ++++----- .../trino/plugin/phoenix5/PhoenixClientModule.java | 3 +++ .../plugin/redshift/RedshiftClientModule.java | 3 --- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java index 30ff42a0e8c6..b16895f6e148 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcModule.java @@ -15,10 +15,14 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Binder; +import com.google.inject.Inject; import com.google.inject.Key; import com.google.inject.Provider; import com.google.inject.Scopes; +import com.google.inject.Singleton; import com.google.inject.multibindings.Multibinder; +import com.google.inject.multibindings.ProvidesIntoOptional; +import dev.failsafe.RetryPolicy; import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.plugin.base.mapping.IdentifierMappingModule; import io.trino.plugin.base.session.SessionPropertiesProvider; @@ -29,6 +33,7 @@ import io.trino.spi.connector.ConnectorAccessControl; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorRecordSetProvider; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.function.table.ConnectorTableFunction; import io.trino.spi.procedure.Procedure; @@ -37,6 +42,7 @@ import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; +import static com.google.inject.multibindings.ProvidesIntoOptional.Type.DEFAULT; import static io.airlift.configuration.ConditionalModule.conditionalModule; import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.plugin.base.ClosingBinder.closingBinder; @@ -143,4 +149,12 @@ public static void bindTablePropertiesProvider(Binder binder, Class policy) + { + return new JdbcRecordSetProvider(jdbcClient, executor, policy); + } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSourceProvider.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSourceProvider.java index 8ca06b65dba2..4264080ef017 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSourceProvider.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcPageSourceProvider.java @@ -15,11 +15,11 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Inject; -import dev.failsafe.RetryPolicy; import io.trino.plugin.jdbc.MergeJdbcPageSource.ColumnAdaptation; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorRecordSetProvider; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplit; import io.trino.spi.connector.ConnectorTableHandle; @@ -29,7 +29,6 @@ import java.util.List; import java.util.Optional; -import java.util.concurrent.ExecutorService; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; @@ -43,13 +42,13 @@ public class JdbcPageSourceProvider implements ConnectorPageSourceProvider { private final JdbcClient jdbcClient; - private final JdbcRecordSetProvider recordSetProvider; + private final ConnectorRecordSetProvider recordSetProvider; @Inject - public JdbcPageSourceProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor, RetryPolicy policy) + public JdbcPageSourceProvider(JdbcClient jdbcClient, ConnectorRecordSetProvider recordSetProvider) { this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); - this.recordSetProvider = new JdbcRecordSetProvider(jdbcClient, executor, policy); + this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null"); } @Override diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java index 425281792c6f..b0b7d03deaed 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClientModule.java @@ -44,6 +44,7 @@ import io.trino.plugin.jdbc.JdbcMetadataConfig; import io.trino.plugin.jdbc.JdbcMetadataSessionProperties; import io.trino.plugin.jdbc.JdbcPageSourceProvider; +import io.trino.plugin.jdbc.JdbcRecordSetProvider; import io.trino.plugin.jdbc.JdbcWriteConfig; import io.trino.plugin.jdbc.JdbcWriteSessionProperties; import io.trino.plugin.jdbc.LazyConnectionFactory; @@ -63,6 +64,7 @@ import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorPageSourceProvider; +import io.trino.spi.connector.ConnectorRecordSetProvider; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.procedure.Procedure; import org.apache.hadoop.conf.Configuration; @@ -113,6 +115,7 @@ protected void setup(Binder binder) binder.bind(ConnectorPageSinkProvider.class).to(ClassLoaderSafeConnectorPageSinkProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSourceProvider.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcPageSourceProvider.class).in(Scopes.SINGLETON); binder.bind(ConnectorPageSourceProvider.class).to(ClassLoaderSafeConnectorPageSourceProvider.class).in(Scopes.SINGLETON); + binder.bind(ConnectorRecordSetProvider.class).to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON); binder.bind(QueryBuilder.class).to(DefaultQueryBuilder.class).in(Scopes.SINGLETON); newOptionalBinder(binder, Key.get(int.class, MaxDomainCompactionThreshold.class)); diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java index 0c6e01999ab1..669422797ef9 100644 --- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java +++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClientModule.java @@ -34,14 +34,12 @@ import io.trino.plugin.jdbc.JdbcJoinPushdownSupportModule; import io.trino.plugin.jdbc.JdbcMetadataConfig; import io.trino.plugin.jdbc.JdbcQueryEventListener; -import io.trino.plugin.jdbc.JdbcRecordSetProvider; import io.trino.plugin.jdbc.JdbcSplitManager; import io.trino.plugin.jdbc.JdbcStatisticsConfig; import io.trino.plugin.jdbc.RemoteQueryCancellationModule; import io.trino.plugin.jdbc.credential.CredentialProvider; import io.trino.plugin.jdbc.ptf.Query; import io.trino.spi.connector.Connector; -import io.trino.spi.connector.ConnectorRecordSetProvider; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.function.table.ConnectorTableFunction; @@ -70,7 +68,6 @@ public void setup(Binder binder) install(new DecimalModule()); install(new JdbcJoinPushdownSupportModule()); install(new RemoteQueryCancellationModule()); - binder.bind(ConnectorRecordSetProvider.class).to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON); install(conditionalModule( RedshiftConfig.class,