From e0cd6a2433eb2324f30e836a773b38a6e5bd34f6 Mon Sep 17 00:00:00 2001 From: hqbhoho Date: Sat, 4 Jan 2025 07:44:32 +0800 Subject: [PATCH] Add support Hadoop-Compatible File System when use HDFS as exchange type --- .../plugin/exchange/hdfs/ExchangeHdfsConfig.java | 14 ++++++++++++++ .../plugin/exchange/hdfs/HdfsExchangeModule.java | 8 +++++--- .../exchange/hdfs/TestExchangeHdfsConfig.java | 7 +++++-- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/plugin/trino-exchange-hdfs/src/main/java/io/trino/plugin/exchange/hdfs/ExchangeHdfsConfig.java b/plugin/trino-exchange-hdfs/src/main/java/io/trino/plugin/exchange/hdfs/ExchangeHdfsConfig.java index 092ba2688056..0d7d67d16f97 100644 --- a/plugin/trino-exchange-hdfs/src/main/java/io/trino/plugin/exchange/hdfs/ExchangeHdfsConfig.java +++ b/plugin/trino-exchange-hdfs/src/main/java/io/trino/plugin/exchange/hdfs/ExchangeHdfsConfig.java @@ -31,6 +31,7 @@ public class ExchangeHdfsConfig { private DataSize hdfsStorageBlockSize = DataSize.of(4, MEGABYTE); + private boolean hdfsCompatibleFsEnabled; private List resourceConfigFiles = ImmutableList.of(); @NotNull @@ -49,6 +50,19 @@ public ExchangeHdfsConfig setHdfsStorageBlockSize(DataSize hdfsStorageBlockSize) return this; } + public boolean isHdfsCompatibleFsEnabled() + { + return hdfsCompatibleFsEnabled; + } + + @Config("exchange.hdfs.compatible-fs.enable") + @ConfigDescription("Enable use Hadoop-Compatible File System") + public ExchangeHdfsConfig setHdfsCompatibleFsEnabled(boolean hdfsCompatibleFsEnabled) + { + this.hdfsCompatibleFsEnabled = hdfsCompatibleFsEnabled; + return this; + } + @NotNull public List<@FileExists File> getResourceConfigFiles() { diff --git a/plugin/trino-exchange-hdfs/src/main/java/io/trino/plugin/exchange/hdfs/HdfsExchangeModule.java b/plugin/trino-exchange-hdfs/src/main/java/io/trino/plugin/exchange/hdfs/HdfsExchangeModule.java index 5c30087c0b73..ebd4eae06331 100644 --- a/plugin/trino-exchange-hdfs/src/main/java/io/trino/plugin/exchange/hdfs/HdfsExchangeModule.java +++ b/plugin/trino-exchange-hdfs/src/main/java/io/trino/plugin/exchange/hdfs/HdfsExchangeModule.java @@ -25,7 +25,6 @@ import java.net.URI; import java.util.List; -import static io.airlift.configuration.ConfigBinder.configBinder; import static io.trino.spi.StandardErrorCode.CONFIGURATION_INVALID; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; @@ -47,10 +46,13 @@ protected void setup(Binder binder) binder.addError(new TrinoException(CONFIGURATION_INVALID, "Multiple schemes in exchange base directories")); return; } + String scheme = baseDirectories.get(0).getScheme(); - if (scheme.equalsIgnoreCase("hdfs")) { + + boolean hdfsCompatibleFsEnabled = buildConfigObject(ExchangeHdfsConfig.class).isHdfsCompatibleFsEnabled(); + + if (scheme.equalsIgnoreCase("hdfs") || hdfsCompatibleFsEnabled) { binder.bind(FileSystemExchangeStorage.class).to(HadoopFileSystemExchangeStorage.class).in(Scopes.SINGLETON); - configBinder(binder).bindConfig(ExchangeHdfsConfig.class); } else { binder.addError(new TrinoException(NOT_SUPPORTED, diff --git a/plugin/trino-exchange-hdfs/src/test/java/io/trino/plugin/exchange/hdfs/TestExchangeHdfsConfig.java b/plugin/trino-exchange-hdfs/src/test/java/io/trino/plugin/exchange/hdfs/TestExchangeHdfsConfig.java index 190258e93c81..4e79bacd41ce 100644 --- a/plugin/trino-exchange-hdfs/src/test/java/io/trino/plugin/exchange/hdfs/TestExchangeHdfsConfig.java +++ b/plugin/trino-exchange-hdfs/src/test/java/io/trino/plugin/exchange/hdfs/TestExchangeHdfsConfig.java @@ -35,7 +35,8 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(ExchangeHdfsConfig.class) .setResourceConfigFiles(ImmutableList.of()) - .setHdfsStorageBlockSize(DataSize.of(4, MEGABYTE))); + .setHdfsStorageBlockSize(DataSize.of(4, MEGABYTE)) + .setHdfsCompatibleFsEnabled(false)); } @Test @@ -48,11 +49,13 @@ public void testExplicitPropertyMappings() Map properties = ImmutableMap.builder() .put("hdfs.config.resources", resource1 + "," + resource2) .put("exchange.hdfs.block-size", "8MB") + .put("exchange.hdfs.compatible-fs.enable", "true") .buildOrThrow(); ExchangeHdfsConfig expected = new ExchangeHdfsConfig() .setResourceConfigFiles(ImmutableList.of(resource1.toString(), resource2.toString())) - .setHdfsStorageBlockSize(DataSize.of(8, MEGABYTE)); + .setHdfsStorageBlockSize(DataSize.of(8, MEGABYTE)) + .setHdfsCompatibleFsEnabled(true); assertFullMapping(properties, expected); }