Skip to content

Commit

Permalink
Move JdbcRecordSetProvider construction to a module
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Jan 10, 2025
1 parent e208752 commit 38ac029
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@

import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.multibindings.ProvidesIntoOptional;
import dev.failsafe.RetryPolicy;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.base.mapping.IdentifierMappingModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
Expand All @@ -29,6 +33,7 @@
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.function.table.ConnectorTableFunction;
import io.trino.spi.procedure.Procedure;
Expand All @@ -37,6 +42,7 @@

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static com.google.inject.multibindings.ProvidesIntoOptional.Type.DEFAULT;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.base.ClosingBinder.closingBinder;
Expand Down Expand Up @@ -143,4 +149,12 @@ public static void bindTablePropertiesProvider(Binder binder, Class<? extends Ta
{
tablePropertiesProviderBinder(binder).addBinding().to(type).in(Scopes.SINGLETON);
}

@ProvidesIntoOptional(DEFAULT)
@Inject
@Singleton
ConnectorRecordSetProvider recordSetProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor, RetryPolicy<Object> policy)
{
return new JdbcRecordSetProvider(jdbcClient, executor, policy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import dev.failsafe.RetryPolicy;
import io.trino.plugin.jdbc.MergeJdbcPageSource.ColumnAdaptation;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplit;
import io.trino.spi.connector.ConnectorTableHandle;
Expand All @@ -29,7 +29,6 @@

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -43,13 +42,13 @@ public class JdbcPageSourceProvider
implements ConnectorPageSourceProvider
{
private final JdbcClient jdbcClient;
private final JdbcRecordSetProvider recordSetProvider;
private final ConnectorRecordSetProvider recordSetProvider;

@Inject
public JdbcPageSourceProvider(JdbcClient jdbcClient, @ForRecordCursor ExecutorService executor, RetryPolicy<Object> policy)
public JdbcPageSourceProvider(JdbcClient jdbcClient, ConnectorRecordSetProvider recordSetProvider)
{
this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null");
this.recordSetProvider = new JdbcRecordSetProvider(jdbcClient, executor, policy);
this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.trino.plugin.jdbc.JdbcMetadataConfig;
import io.trino.plugin.jdbc.JdbcMetadataSessionProperties;
import io.trino.plugin.jdbc.JdbcPageSourceProvider;
import io.trino.plugin.jdbc.JdbcRecordSetProvider;
import io.trino.plugin.jdbc.JdbcWriteConfig;
import io.trino.plugin.jdbc.JdbcWriteSessionProperties;
import io.trino.plugin.jdbc.LazyConnectionFactory;
Expand All @@ -63,6 +64,7 @@
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.procedure.Procedure;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -113,6 +115,7 @@ protected void setup(Binder binder)
binder.bind(ConnectorPageSinkProvider.class).to(ClassLoaderSafeConnectorPageSinkProvider.class).in(Scopes.SINGLETON);
binder.bind(ConnectorPageSourceProvider.class).annotatedWith(ForClassLoaderSafe.class).to(JdbcPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(ConnectorPageSourceProvider.class).to(ClassLoaderSafeConnectorPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(ConnectorRecordSetProvider.class).to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON);

binder.bind(QueryBuilder.class).to(DefaultQueryBuilder.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, Key.get(int.class, MaxDomainCompactionThreshold.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,12 @@
import io.trino.plugin.jdbc.JdbcJoinPushdownSupportModule;
import io.trino.plugin.jdbc.JdbcMetadataConfig;
import io.trino.plugin.jdbc.JdbcQueryEventListener;
import io.trino.plugin.jdbc.JdbcRecordSetProvider;
import io.trino.plugin.jdbc.JdbcSplitManager;
import io.trino.plugin.jdbc.JdbcStatisticsConfig;
import io.trino.plugin.jdbc.RemoteQueryCancellationModule;
import io.trino.plugin.jdbc.credential.CredentialProvider;
import io.trino.plugin.jdbc.ptf.Query;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorRecordSetProvider;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.function.table.ConnectorTableFunction;

Expand Down Expand Up @@ -70,7 +68,6 @@ public void setup(Binder binder)
install(new DecimalModule());
install(new JdbcJoinPushdownSupportModule());
install(new RemoteQueryCancellationModule());
binder.bind(ConnectorRecordSetProvider.class).to(JdbcRecordSetProvider.class).in(Scopes.SINGLETON);

install(conditionalModule(
RedshiftConfig.class,
Expand Down

0 comments on commit 38ac029

Please sign in to comment.