Skip to content

Commit

Permalink
Add support for Hadoop-Compatible File Systems when using HDFS as the exchange type
Browse files Browse the repository at this point in the history
  • Loading branch information
hqbhoho committed Jan 3, 2025
1 parent 2ef6dc1 commit e0cd6a2
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public class ExchangeHdfsConfig
{
private DataSize hdfsStorageBlockSize = DataSize.of(4, MEGABYTE);
private boolean hdfsCompatibleFsEnabled;
private List<File> resourceConfigFiles = ImmutableList.of();

@NotNull
Expand All @@ -49,6 +50,19 @@ public ExchangeHdfsConfig setHdfsStorageBlockSize(DataSize hdfsStorageBlockSize)
return this;
}

// Returns whether a generic Hadoop-Compatible File System may back the exchange storage,
// even when the exchange base directory scheme is not "hdfs".
public boolean isHdfsCompatibleFsEnabled()
{
    return this.hdfsCompatibleFsEnabled;
}

@Config("exchange.hdfs.compatible-fs.enable")
// Fixed description grammar: "Enable use Hadoop-Compatible File System" was ungrammatical
// in the user-facing config listing.
@ConfigDescription("Enable use of a Hadoop-Compatible File System for exchange storage")
public ExchangeHdfsConfig setHdfsCompatibleFsEnabled(boolean hdfsCompatibleFsEnabled)
{
    this.hdfsCompatibleFsEnabled = hdfsCompatibleFsEnabled;
    return this;
}

@NotNull
public List<@FileExists File> getResourceConfigFiles()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.net.URI;
import java.util.List;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.spi.StandardErrorCode.CONFIGURATION_INVALID;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
Expand All @@ -47,10 +46,13 @@ protected void setup(Binder binder)
binder.addError(new TrinoException(CONFIGURATION_INVALID, "Multiple schemes in exchange base directories"));
return;
}

String scheme = baseDirectories.get(0).getScheme();
if (scheme.equalsIgnoreCase("hdfs")) {

boolean hdfsCompatibleFsEnabled = buildConfigObject(ExchangeHdfsConfig.class).isHdfsCompatibleFsEnabled();

if (scheme.equalsIgnoreCase("hdfs") || hdfsCompatibleFsEnabled) {
binder.bind(FileSystemExchangeStorage.class).to(HadoopFileSystemExchangeStorage.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(ExchangeHdfsConfig.class);
}
else {
binder.addError(new TrinoException(NOT_SUPPORTED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public void testDefaults()
{
assertRecordedDefaults(recordDefaults(ExchangeHdfsConfig.class)
.setResourceConfigFiles(ImmutableList.of())
.setHdfsStorageBlockSize(DataSize.of(4, MEGABYTE)));
.setHdfsStorageBlockSize(DataSize.of(4, MEGABYTE))
.setHdfsCompatibleFsEnabled(false));
}

@Test
Expand All @@ -48,11 +49,13 @@ public void testExplicitPropertyMappings()
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("hdfs.config.resources", resource1 + "," + resource2)
.put("exchange.hdfs.block-size", "8MB")
.put("exchange.hdfs.compatible-fs.enable", "true")
.buildOrThrow();

ExchangeHdfsConfig expected = new ExchangeHdfsConfig()
.setResourceConfigFiles(ImmutableList.of(resource1.toString(), resource2.toString()))
.setHdfsStorageBlockSize(DataSize.of(8, MEGABYTE));
.setHdfsStorageBlockSize(DataSize.of(8, MEGABYTE))
.setHdfsCompatibleFsEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down

0 comments on commit e0cd6a2

Please sign in to comment.