From 57e8afc5f3ca528ee9fd2266e80f9e9079e25e9d Mon Sep 17 00:00:00 2001 From: Jesus Camacho Rodriguez Date: Wed, 14 Jun 2023 18:27:50 -0700 Subject: [PATCH] Support for multiple connections --- .../java/com/microsoft/lst_bench/Driver.java | 17 ++-- .../lst_bench/common/BenchmarkConfig.java | 4 +- .../common/LSTBenchmarkExecutor.java | 9 +- .../microsoft/lst_bench/exec/SessionExec.java | 3 + .../microsoft/lst_bench/input/Session.java | 5 ++ .../tpcds/wp3_rw_concurrency_multi.yaml | 83 +++++++++++++++++++ src/main/resources/schemas/workload.json | 5 ++ .../common/LSTBenchmarkExecutorTest.java | 4 +- .../lst_bench/input/ValidationTest.java | 2 + .../config/spark/connections_config.yaml | 5 ++ .../config/spark/w_all_tpcds_delta.yaml | 9 +- .../config/spark/w_all_tpcds_hudi.yaml | 9 +- .../config/spark/w_all_tpcds_iceberg.yaml | 9 +- 13 files changed, 146 insertions(+), 18 deletions(-) create mode 100644 src/main/resources/config/tpcds/wp3_rw_concurrency_multi.yaml diff --git a/src/main/java/com/microsoft/lst_bench/Driver.java b/src/main/java/com/microsoft/lst_bench/Driver.java index 0dfb584c..619550de 100644 --- a/src/main/java/com/microsoft/lst_bench/Driver.java +++ b/src/main/java/com/microsoft/lst_bench/Driver.java @@ -30,8 +30,10 @@ import com.microsoft.lst_bench.telemetry.JDBCTelemetryRegistry; import com.microsoft.lst_bench.telemetry.TelemetryHook; import java.io.File; -import java.util.LinkedHashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -114,14 +116,15 @@ public static void main(String[] args) throws Exception { final TelemetryConfig telemetryConfig = mapper.readValue(new File(inputTelemetryConfigFile), TelemetryConfig.class); - // Create connections manager - Map idToConnectionManager = new LinkedHashMap<>(); + // Create connections managers + Set connectionManagerIds = new HashSet<>(); + List connectionManagers = new ArrayList<>(); for (ConnectionConfig connectionConfig : connectionsConfig.getConnections()) { ConnectionManager connectionManager = ConnectionManager.from(connectionConfig); - if (idToConnectionManager.containsKey(connectionConfig.getId())) { + if (!connectionManagerIds.add(connectionConfig.getId())) { throw new IllegalArgumentException("Duplicate connection id: " + connectionConfig.getId()); } - idToConnectionManager.put(connectionConfig.getId(), connectionManager); + connectionManagers.add(connectionManager); } // Create log utility @@ -143,7 +146,7 @@ public static void main(String[] args) throws Exception { // Run experiment final BenchmarkRunnable experiment = - new LSTBenchmarkExecutor(idToConnectionManager, benchmarkConfig, telemetryRegistry); + new LSTBenchmarkExecutor(connectionManagers, benchmarkConfig, telemetryRegistry); experiment.execute(); } diff --git a/src/main/java/com/microsoft/lst_bench/common/BenchmarkConfig.java b/src/main/java/com/microsoft/lst_bench/common/BenchmarkConfig.java index 067279b1..5d7b3124 100644 --- a/src/main/java/com/microsoft/lst_bench/common/BenchmarkConfig.java +++ b/src/main/java/com/microsoft/lst_bench/common/BenchmarkConfig.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.commons.lang3.ObjectUtils; /** A benchmark configuration. */ public class BenchmarkConfig { @@ -185,7 +186,8 @@ private static SessionExec createSessionExec( taskTemplateIdToParameterValuesCounter); tasks.add(taskExec); } - return ImmutableSessionExec.of(sessionId, tasks); + return ImmutableSessionExec.of( + sessionId, tasks, ObjectUtils.defaultIfNull(session.getConnectionManager(), 0)); } private static TaskExec createTaskExec( diff --git a/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java b/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java index d6e7a1ce..6549f39f 100644 --- a/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java +++ b/src/main/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutor.java @@ -57,7 +57,7 @@ public class LSTBenchmarkExecutor extends BenchmarkRunnable { private static final Logger LOGGER = LoggerFactory.getLogger(LSTBenchmarkExecutor.class); - private final Map idToConnectionManager; + private final List connectionManagers; private final BenchmarkConfig config; private final JDBCTelemetryRegistry telemetryRegistry; @@ -65,11 +65,11 @@ public class LSTBenchmarkExecutor extends BenchmarkRunnable { private final UUID experimentRunId; public LSTBenchmarkExecutor( - Map idToConnectionManager, + List connectionManagers, BenchmarkConfig config, JDBCTelemetryRegistry telemetryRegistry) { super(); - this.idToConnectionManager = Collections.unmodifiableMap(idToConnectionManager); + this.connectionManagers = Collections.unmodifiableList(connectionManagers); this.config = config; this.telemetryRegistry = telemetryRegistry; this.experimentRunId = UUID.randomUUID(); @@ -112,8 +112,7 @@ public void execute() throws Exception { for (SessionExec session : phase.getSessions()) { threads.add( new Worker( - // TODO: Multiple connections - idToConnectionManager.values().iterator().next(), + connectionManagers.get(session.getConnectionManager()), session, runtimeParameterValues, phaseIdToEndTime)); diff --git a/src/main/java/com/microsoft/lst_bench/exec/SessionExec.java b/src/main/java/com/microsoft/lst_bench/exec/SessionExec.java index 9ec5fadc..852b5bc5 100644 --- a/src/main/java/com/microsoft/lst_bench/exec/SessionExec.java +++ b/src/main/java/com/microsoft/lst_bench/exec/SessionExec.java @@ -26,4 +26,7 @@ public interface SessionExec { String getId(); List getTasks(); + + /** Connection manager for this session (positional index). */ + int getConnectionManager(); } diff --git a/src/main/java/com/microsoft/lst_bench/input/Session.java b/src/main/java/com/microsoft/lst_bench/input/Session.java index 90142827..39c7c4bb 100644 --- a/src/main/java/com/microsoft/lst_bench/input/Session.java +++ b/src/main/java/com/microsoft/lst_bench/input/Session.java @@ -16,9 +16,11 @@ package com.microsoft.lst_bench.input; import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.util.List; +import javax.annotation.Nullable; import org.immutables.value.Value; /** POJO class meant to be used to deserialize an input session. */ @@ -29,4 +31,7 @@ @JsonInclude(JsonInclude.Include.NON_NULL) public interface Session { List getTasks(); + + @JsonProperty("connection_manager") + @Nullable Integer getConnectionManager(); } diff --git a/src/main/resources/config/tpcds/wp3_rw_concurrency_multi.yaml b/src/main/resources/config/tpcds/wp3_rw_concurrency_multi.yaml new file mode 100644 index 00000000..7861671d --- /dev/null +++ b/src/main/resources/config/tpcds/wp3_rw_concurrency_multi.yaml @@ -0,0 +1,83 @@ +# Description: WP3: R/W concurrency +--- +version: 1 +id: wp3_rw_concurrency +phases: +- id: setup + sessions: + - tasks: + - template_id: setup +- id: setup_data_maintenance + sessions: + - tasks: + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance + - template_id: setup_data_maintenance +- id: init + sessions: + - tasks: + - template_id: init +- id: build + sessions: + - tasks: + - template_id: build +- id: single_user_1_data_maintenance_1 + sessions: + - tasks: + - template_id: single_user + - tasks: + - template_id: data_maintenance_delta + - template_id: data_maintenance_delta + connection_manager: 1 +- id: single_user_2_optimize_1 + sessions: + - tasks: + - template_id: single_user + - tasks: + - template_id: optimize_delta + connection_manager: 1 +- id: single_user_2o_data_maintenance_2 + sessions: + - tasks: + - template_id: single_user + - tasks: + - template_id: data_maintenance_delta + - template_id: data_maintenance_delta + - template_id: data_maintenance_delta + - template_id: data_maintenance_delta + connection_manager: 1 +- id: single_user_3_optimize_2 + sessions: + - tasks: + - template_id: single_user + - tasks: + - template_id: optimize_delta + connection_manager: 1 +- id: single_user_3o_data_maintenance_3 + sessions: + - tasks: + - template_id: single_user + - tasks: + - template_id: data_maintenance_delta + - template_id: data_maintenance_delta + - template_id: data_maintenance_delta + - template_id: data_maintenance_delta + - template_id: data_maintenance_delta + - template_id: data_maintenance_delta + connection_manager: 1 +- id: single_user_4_optimize_3 + sessions: + - tasks: + - template_id: single_user + - tasks: + - template_id: optimize_delta + connection_manager: 1 diff --git a/src/main/resources/schemas/workload.json b/src/main/resources/schemas/workload.json index ff818b69..4a235e68 100644 --- a/src/main/resources/schemas/workload.json +++ b/src/main/resources/schemas/workload.json @@ -78,6 +78,11 @@ } } } + }, + "connection_manager": { + "type": "integer", + "title": "Connection manager index", + "description": "Positional index of the connection manager within the connections configuration" } } } diff --git a/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java b/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java index 71cc709e..0d6e9b3c 100644 --- a/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java +++ b/src/test/java/com/microsoft/lst_bench/common/LSTBenchmarkExecutorTest.java @@ -29,7 +29,7 @@ import com.microsoft.lst_bench.telemetry.JDBCTelemetryRegistry; import java.io.File; import java.net.URL; -import java.util.HashMap; +import java.util.ArrayList; import java.util.UUID; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -60,7 +60,7 @@ void tearDown() { */ @Test void testNoOpSetup() throws Exception { - var idToConnectionManager = new HashMap(); + var idToConnectionManager = new ArrayList(); ExperimentConfig experimentConfig = ImmutableExperimentConfig.builder().id("telemetryTest").version(1).repetitions(1).build(); TaskLibrary taskLibrary = ImmutableTaskLibrary.builder().version(1).build(); diff --git a/src/test/java/com/microsoft/lst_bench/input/ValidationTest.java b/src/test/java/com/microsoft/lst_bench/input/ValidationTest.java index 12f781de..92d3049e 100644 --- a/src/test/java/com/microsoft/lst_bench/input/ValidationTest.java +++ b/src/test/java/com/microsoft/lst_bench/input/ValidationTest.java @@ -156,6 +156,7 @@ public void testValidationTaskLibrary() throws IOException { "src/main/resources/config/tpcds/wp1_longevity.yaml", "src/main/resources/config/tpcds/wp2_resilience.yaml", "src/main/resources/config/tpcds/wp3_rw_concurrency.yaml", + "src/main/resources/config/tpcds/wp3_rw_concurrency_multi.yaml", "src/main/resources/config/tpcds/wp4_time_travel.yaml" }) public void testValidationWorkload(String workloadFilePath) throws IOException { @@ -190,6 +191,7 @@ public void testValidationWorkload(String workloadFilePath) throws IOException { "src\\main\\resources\\config\\tpcds\\wp1_longevity.yaml", "src\\main\\resources\\config\\tpcds\\wp2_resilience.yaml", "src\\main\\resources\\config\\tpcds\\wp3_rw_concurrency.yaml", + "src\\main\\resources\\config\\tpcds\\wp3_rw_concurrency_multi.yaml", "src\\main\\resources\\config\\tpcds\\wp4_time_travel.yaml" }) public void testValidationWorkloadWin(String workloadFilePath) throws IOException { diff --git a/src/test/resources/config/spark/connections_config.yaml b/src/test/resources/config/spark/connections_config.yaml index ee6b6866..bcd87b03 100644 --- a/src/test/resources/config/spark/connections_config.yaml +++ b/src/test/resources/config/spark/connections_config.yaml @@ -7,3 +7,8 @@ connections: url: jdbc:hive2://127.0.0.1:10000 username: admin password: p@ssw0rd0 +- id: spark_1 + driver: org.apache.hive.jdbc.HiveDriver + url: jdbc:hive2://127.0.0.1:10000 + username: admin + password: p@ssw0rd0 diff --git a/src/test/resources/config/spark/w_all_tpcds_delta.yaml b/src/test/resources/config/spark/w_all_tpcds_delta.yaml index d839b0b0..65c30772 100644 --- a/src/test/resources/config/spark/w_all_tpcds_delta.yaml +++ b/src/test/resources/config/spark/w_all_tpcds_delta.yaml @@ -30,4 +30,11 @@ phases: - id: optimize sessions: - tasks: - - template_id: optimize_delta \ No newline at end of file + - template_id: optimize_delta +- id: multi_single_user + sessions: + - tasks: + - template_id: single_user + - tasks: + - template_id: single_user + connection_manager: 1 diff --git a/src/test/resources/config/spark/w_all_tpcds_hudi.yaml b/src/test/resources/config/spark/w_all_tpcds_hudi.yaml index 73cb6d9d..27cbc6a7 100644 --- a/src/test/resources/config/spark/w_all_tpcds_hudi.yaml +++ b/src/test/resources/config/spark/w_all_tpcds_hudi.yaml @@ -33,4 +33,11 @@ phases: - id: optimize sessions: - tasks: - - template_id: optimize_hudi \ No newline at end of file + - template_id: optimize_hudi +- id: multi_single_user + sessions: + - tasks: + - template_id: single_user + - tasks: + - template_id: single_user + connection_manager: 1 diff --git a/src/test/resources/config/spark/w_all_tpcds_iceberg.yaml b/src/test/resources/config/spark/w_all_tpcds_iceberg.yaml index a7d5a5b1..07e7cb3e 100644 --- a/src/test/resources/config/spark/w_all_tpcds_iceberg.yaml +++ b/src/test/resources/config/spark/w_all_tpcds_iceberg.yaml @@ -30,4 +30,11 @@ phases: - id: optimize sessions: - tasks: - - template_id: optimize_iceberg \ No newline at end of file + - template_id: optimize_iceberg +- id: multi_single_user + sessions: + - tasks: + - template_id: single_user + - tasks: + - template_id: single_user + connection_manager: 1