Skip to content

Commit

Permalink
Support for multiple connections
Browse files Browse the repository at this point in the history
  • Loading branch information
jcamachor committed Jun 15, 2023
1 parent 19e7e79 commit 57e8afc
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 18 deletions.
17 changes: 10 additions & 7 deletions src/main/java/com/microsoft/lst_bench/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import com.microsoft.lst_bench.telemetry.JDBCTelemetryRegistry;
import com.microsoft.lst_bench.telemetry.TelemetryHook;
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
Expand Down Expand Up @@ -114,14 +116,15 @@ public static void main(String[] args) throws Exception {
final TelemetryConfig telemetryConfig =
mapper.readValue(new File(inputTelemetryConfigFile), TelemetryConfig.class);

// Create connections manager
Map<String, ConnectionManager> idToConnectionManager = new LinkedHashMap<>();
// Create connections managers
Set<String> connectionManagerIds = new HashSet<>();
List<ConnectionManager> connectionManagers = new ArrayList<>();
for (ConnectionConfig connectionConfig : connectionsConfig.getConnections()) {
ConnectionManager connectionManager = ConnectionManager.from(connectionConfig);
if (idToConnectionManager.containsKey(connectionConfig.getId())) {
if (!connectionManagerIds.add(connectionConfig.getId())) {
throw new IllegalArgumentException("Duplicate connection id: " + connectionConfig.getId());
}
idToConnectionManager.put(connectionConfig.getId(), connectionManager);
connectionManagers.add(connectionManager);
}

// Create log utility
Expand All @@ -143,7 +146,7 @@ public static void main(String[] args) throws Exception {

// Run experiment
final BenchmarkRunnable experiment =
new LSTBenchmarkExecutor(idToConnectionManager, benchmarkConfig, telemetryRegistry);
new LSTBenchmarkExecutor(connectionManagers, benchmarkConfig, telemetryRegistry);
experiment.execute();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ObjectUtils;

/** A benchmark configuration. */
public class BenchmarkConfig {
Expand Down Expand Up @@ -185,7 +186,8 @@ private static SessionExec createSessionExec(
taskTemplateIdToParameterValuesCounter);
tasks.add(taskExec);
}
return ImmutableSessionExec.of(sessionId, tasks);
return ImmutableSessionExec.of(
sessionId, tasks, ObjectUtils.defaultIfNull(session.getConnectionManager(), 0));
}

private static TaskExec createTaskExec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,19 @@ public class LSTBenchmarkExecutor extends BenchmarkRunnable {

private static final Logger LOGGER = LoggerFactory.getLogger(LSTBenchmarkExecutor.class);

private final Map<String, ConnectionManager> idToConnectionManager;
private final List<ConnectionManager> connectionManagers;
private final BenchmarkConfig config;
private final JDBCTelemetryRegistry telemetryRegistry;

// UUID to identify the experiment run. The experiment telemetry will be tagged with this UUID.
private final UUID experimentRunId;

public LSTBenchmarkExecutor(
Map<String, ConnectionManager> idToConnectionManager,
List<ConnectionManager> connectionManagers,
BenchmarkConfig config,
JDBCTelemetryRegistry telemetryRegistry) {
super();
this.idToConnectionManager = Collections.unmodifiableMap(idToConnectionManager);
this.connectionManagers = Collections.unmodifiableList(connectionManagers);
this.config = config;
this.telemetryRegistry = telemetryRegistry;
this.experimentRunId = UUID.randomUUID();
Expand Down Expand Up @@ -112,8 +112,7 @@ public void execute() throws Exception {
for (SessionExec session : phase.getSessions()) {
threads.add(
new Worker(
// TODO: Multiple connections
idToConnectionManager.values().iterator().next(),
connectionManagers.get(session.getConnectionManager()),
session,
runtimeParameterValues,
phaseIdToEndTime));
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/microsoft/lst_bench/exec/SessionExec.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ public interface SessionExec {
String getId();

List<TaskExec> getTasks();

/** Connection manager for this session (positional index). */
int getConnectionManager();
}
5 changes: 5 additions & 0 deletions src/main/java/com/microsoft/lst_bench/input/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
package com.microsoft.lst_bench.input;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.util.List;
import javax.annotation.Nullable;
import org.immutables.value.Value;

/** POJO class meant to be used to deserialize an input session. */
Expand All @@ -29,4 +31,7 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
public interface Session {
List<Task> getTasks();

@JsonProperty("connection_manager")
@Nullable Integer getConnectionManager();
}
83 changes: 83 additions & 0 deletions src/main/resources/config/tpcds/wp3_rw_concurrency_multi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Description: WP3: R/W concurrency
---
version: 1
id: wp3_rw_concurrency
phases:
- id: setup
sessions:
- tasks:
- template_id: setup
- id: setup_data_maintenance
sessions:
- tasks:
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- template_id: setup_data_maintenance
- id: init
sessions:
- tasks:
- template_id: init
- id: build
sessions:
- tasks:
- template_id: build
- id: single_user_1_data_maintenance_1
sessions:
- tasks:
- template_id: single_user
- tasks:
- template_id: data_maintenance_delta
- template_id: data_maintenance_delta
connection_manager: 1
- id: single_user_2_optimize_1
sessions:
- tasks:
- template_id: single_user
- tasks:
- template_id: optimize_delta
connection_manager: 1
- id: single_user_2o_data_maintenance_2
sessions:
- tasks:
- template_id: single_user
- tasks:
- template_id: data_maintenance_delta
- template_id: data_maintenance_delta
- template_id: data_maintenance_delta
- template_id: data_maintenance_delta
connection_manager: 1
- id: single_user_3_optimize_2
sessions:
- tasks:
- template_id: single_user
- tasks:
- template_id: optimize_delta
connection_manager: 1
- id: single_user_3o_data_maintenance_3
sessions:
- tasks:
- template_id: single_user
- tasks:
- template_id: data_maintenance_delta
- template_id: data_maintenance_delta
- template_id: data_maintenance_delta
- template_id: data_maintenance_delta
- template_id: data_maintenance_delta
- template_id: data_maintenance_delta
connection_manager: 1
- id: single_user_4_optimize_3
sessions:
- tasks:
- template_id: single_user
- tasks:
- template_id: optimize_delta
connection_manager: 1
5 changes: 5 additions & 0 deletions src/main/resources/schemas/workload.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@
}
}
}
},
"connection_manager": {
"type": "integer",
"title": "Connection manager index",
"description": "Positional index of the connection manager within the connections configuration"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.microsoft.lst_bench.telemetry.JDBCTelemetryRegistry;
import java.io.File;
import java.net.URL;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -60,7 +60,7 @@ void tearDown() {
*/
@Test
void testNoOpSetup() throws Exception {
var idToConnectionManager = new HashMap<String, ConnectionManager>();
var idToConnectionManager = new ArrayList<ConnectionManager>();
ExperimentConfig experimentConfig =
ImmutableExperimentConfig.builder().id("telemetryTest").version(1).repetitions(1).build();
TaskLibrary taskLibrary = ImmutableTaskLibrary.builder().version(1).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public void testValidationTaskLibrary() throws IOException {
"src/main/resources/config/tpcds/wp1_longevity.yaml",
"src/main/resources/config/tpcds/wp2_resilience.yaml",
"src/main/resources/config/tpcds/wp3_rw_concurrency.yaml",
"src/main/resources/config/tpcds/wp3_rw_concurrency_multi.yaml",
"src/main/resources/config/tpcds/wp4_time_travel.yaml"
})
public void testValidationWorkload(String workloadFilePath) throws IOException {
Expand Down Expand Up @@ -190,6 +191,7 @@ public void testValidationWorkload(String workloadFilePath) throws IOException {
"src\\main\\resources\\config\\tpcds\\wp1_longevity.yaml",
"src\\main\\resources\\config\\tpcds\\wp2_resilience.yaml",
"src\\main\\resources\\config\\tpcds\\wp3_rw_concurrency.yaml",
"src\\main\\resources\\config\\tpcds\\wp3_rw_concurrency_multi.yaml",
"src\\main\\resources\\config\\tpcds\\wp4_time_travel.yaml"
})
public void testValidationWorkloadWin(String workloadFilePath) throws IOException {
Expand Down
5 changes: 5 additions & 0 deletions src/test/resources/config/spark/connections_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ connections:
url: jdbc:hive2://127.0.0.1:10000
username: admin
password: p@ssw0rd0
- id: spark_1
driver: org.apache.hive.jdbc.HiveDriver
url: jdbc:hive2://127.0.0.1:10000
username: admin
password: p@ssw0rd0
9 changes: 8 additions & 1 deletion src/test/resources/config/spark/w_all_tpcds_delta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ phases:
- id: optimize
sessions:
- tasks:
- template_id: optimize_delta
- template_id: optimize_delta
- id: multi_single_user
sessions:
- tasks:
- template_id: single_user
- tasks:
- template_id: single_user
connection_manager: 1
9 changes: 8 additions & 1 deletion src/test/resources/config/spark/w_all_tpcds_hudi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,11 @@ phases:
- id: optimize
sessions:
- tasks:
- template_id: optimize_hudi
- template_id: optimize_hudi
- id: multi_single_user
sessions:
- tasks:
- template_id: single_user
- tasks:
- template_id: single_user
connection_manager: 1
9 changes: 8 additions & 1 deletion src/test/resources/config/spark/w_all_tpcds_iceberg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,11 @@ phases:
- id: optimize
sessions:
- tasks:
- template_id: optimize_iceberg
- template_id: optimize_iceberg
- id: multi_single_user
sessions:
- tasks:
- template_id: single_user
- tasks:
- template_id: single_user
connection_manager: 1

0 comments on commit 57e8afc

Please sign in to comment.