Skip to content

Commit

Permalink
Use ROW_NUMBER in JdbcAggregateJobQueryDao to improve performance
Browse files Browse the repository at this point in the history
See #5524
  • Loading branch information
onobc committed Dec 6, 2023
1 parent d6241df commit d7bf6b8
Show file tree
Hide file tree
Showing 17 changed files with 435 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.cloud.dataflow.core.database.support;

import java.sql.DatabaseMetaData;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -100,6 +101,24 @@ public static DatabaseType fromProductName(String productName) {
}
}

/**
 * Reports whether the database behind the given datasource can execute the SQL {@code ROW_NUMBER()} function.
 * <p>{@code H2} and {@code HSQL} are reported as unsupported. {@code MYSQL} is reported as supported only
 * when its major version is 8 or higher. Every other database type is reported as supported.
 * @param dataSource the datasource pointing to the DB in question
 * @return {@code true} if the database is considered to support {@code ROW_NUMBER()}, {@code false} otherwise
 * @throws MetaDataAccessException if the database metadata cannot be read
 */
public static boolean supportsRowNumberFunction(DataSource dataSource) throws MetaDataAccessException {
	final DatabaseType detectedType = DatabaseType.fromMetaData(dataSource);
	if (detectedType == DatabaseType.MYSQL) {
		// MySQL is the only type whose answer depends on the server version, so the
		// version lookup is performed for it alone.
		int dbMajorVersion = JdbcUtils.extractDatabaseMetaData(dataSource, DatabaseMetaData::getDatabaseMajorVersion);
		return dbMajorVersion >= 8;
	}
	return !(detectedType == DatabaseType.H2 || detectedType == DatabaseType.HSQL);
}

private String getProductName() {
return productName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.springframework.cloud.task.repository.support.DatabaseType;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.support.MetaDataAccessException;
import org.springframework.transaction.PlatformTransactionManager;

Expand Down Expand Up @@ -151,17 +152,13 @@ public TaskDeploymentReader taskDeploymentReader(TaskDeploymentRepository reposi
}

@Bean
public AggregateJobQueryDao aggregateJobQueryDao(DataSource dataSource, SchemaService schemaService, JobServiceContainer jobServiceContainer) throws Exception {
return new JdbcAggregateJobQueryDao(dataSource, schemaService, jobServiceContainer);
public AggregateJobQueryDao aggregateJobQueryDao(DataSource dataSource, SchemaService schemaService,
JobServiceContainer jobServiceContainer, Environment environment) throws Exception {
return new JdbcAggregateJobQueryDao(dataSource, schemaService, jobServiceContainer, environment);
}

/**
 * Registers the {@link TaskBatchDaoContainer} bean, built from the given datasource and schema service.
 * @param dataSource the datasource handed to the container
 * @param schemaService the schema service handed to the container
 * @return the newly created container
 */
@Bean
public TaskBatchDaoContainer taskBatchDaoContainer(DataSource dataSource, SchemaService schemaService) {
	TaskBatchDaoContainer daoContainer = new TaskBatchDaoContainer(dataSource, schemaService);
	return daoContainer;
}

/**
 * Post-construction callback that logs, at info level, that this configuration class has been created.
 */
@PostConstruct
public void setup() {
logger.info("created: org.springframework.cloud.dataflow.server.config.AggregateDataFlowContainerConfiguration");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* Provides for reading job execution data for Batch 4 and 5 schema versions.
*
* @author Corneil du Plessis
* @since 2.11.0
*/
public interface AggregateJobQueryDao {
Page<JobInstanceExecutions> listJobInstances(String jobName, Pageable pageable) throws NoSuchJobException;
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.dataflow.server.repository;

import javax.sql.DataSource;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.server.service.JobServiceContainer;
import org.springframework.mock.env.MockEnvironment;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

/**
 * Tests for the row number optimization feature of {@link JdbcAggregateJobQueryDao}.
 * <p>The tests run against a MariaDB Testcontainers instance and are disabled when Docker
 * is not available.
 *
 * @author Chris Bono
 */
@Testcontainers(disabledWithoutDocker = true)
class JdbcAggregateJobQueryDaoRowNumberOptimizationTests {

	/** Property that toggles the row number optimization in the DAO under test. */
	private static final String OPTIMIZATION_ENABLED_PROPERTY =
			"spring.cloud.dataflow.task.jdbc.row-number-optimization.enabled";

	// Parameterized (rather than raw) container types avoid unchecked/rawtypes warnings.
	@Container
	private static final JdbcDatabaseContainer<?> container = new MariaDBContainer<>("mariadb:10.9.3");

	private static DataSource dataSource;

	/**
	 * Builds the shared {@link DataSource} from the container's connection settings.
	 * NOTE(review): despite the name, this method does not start the container itself; the
	 * {@code @Container} field is presumably started by the Testcontainers extension beforehand.
	 */
	@BeforeAll
	static void startContainer() {
		dataSource = DataSourceBuilder.create()
				.url(container.getJdbcUrl())
				.username(container.getUsername())
				.password(container.getPassword())
				.driverClassName(container.getDriverClassName())
				.build();
	}

	@Test
	void shouldUseOptimizationWhenPropertyNotSpecified() throws Exception {
		MockEnvironment mockEnv = new MockEnvironment();
		JdbcAggregateJobQueryDao dao = createDao(mockEnv);
		assertThat(dao).hasFieldOrPropertyWithValue("useRowNumberOptimization", true);
	}

	@Test
	void shouldUseOptimizationWhenPropertyEnabled() throws Exception {
		MockEnvironment mockEnv = new MockEnvironment();
		mockEnv.setProperty(OPTIMIZATION_ENABLED_PROPERTY, "true");
		JdbcAggregateJobQueryDao dao = createDao(mockEnv);
		assertThat(dao).hasFieldOrPropertyWithValue("useRowNumberOptimization", true);
	}

	@Test
	void shouldNotUseOptimizationWhenPropertyDisabled() throws Exception {
		MockEnvironment mockEnv = new MockEnvironment();
		mockEnv.setProperty(OPTIMIZATION_ENABLED_PROPERTY, "false");
		JdbcAggregateJobQueryDao dao = createDao(mockEnv);
		assertThat(dao).hasFieldOrPropertyWithValue("useRowNumberOptimization", false);
	}

	/**
	 * Creates the DAO under test against the shared datasource, with mocked collaborators.
	 * @param environment the environment supplying (or omitting) the optimization property
	 * @return a new DAO instance
	 * @throws Exception if the DAO cannot be constructed
	 */
	private static JdbcAggregateJobQueryDao createDao(MockEnvironment environment) throws Exception {
		return new JdbcAggregateJobQueryDao(dataSource, mock(SchemaService.class),
				mock(JobServiceContainer.class), environment);
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,58 @@
*/
package org.springframework.cloud.dataflow.server.db.migration;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import javax.sql.DataSource;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.testcontainers.containers.JdbcDatabaseContainer;

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.dataflow.aggregate.task.AggregateTaskExplorer;
import org.springframework.cloud.dataflow.aggregate.task.TaskRepositoryContainer;
import org.springframework.cloud.dataflow.core.StreamDefinition;
import org.springframework.cloud.dataflow.core.database.support.DatabaseType;
import org.springframework.cloud.dataflow.rest.job.TaskJobExecution;
import org.springframework.cloud.dataflow.schema.AggregateTaskExecution;
import org.springframework.cloud.dataflow.schema.SchemaVersionTarget;
import org.springframework.cloud.dataflow.schema.service.SchemaService;
import org.springframework.cloud.dataflow.schema.service.impl.DefaultSchemaService;
import org.springframework.cloud.dataflow.server.controller.support.TaskExecutionControllerDeleteAction;
import org.springframework.cloud.dataflow.server.repository.StreamDefinitionRepository;
import org.springframework.cloud.dataflow.server.repository.TaskBatchDaoContainer;
import org.springframework.cloud.dataflow.server.repository.TaskExecutionDaoContainer;
import org.springframework.cloud.dataflow.server.service.TaskDeleteService;
import org.springframework.cloud.dataflow.server.service.TaskJobService;
import org.springframework.cloud.dataflow.server.single.DataFlowServerApplication;
import org.springframework.cloud.task.repository.TaskExecution;
import org.springframework.cloud.task.repository.TaskRepository;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.jdbc.support.MetaDataAccessException;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.TestPropertySource;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -50,12 +75,12 @@
*
* @author Corneil du Plessis
*/
@SpringBootTest(classes = {DataFlowServerApplication.class}, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = {
"spring.jpa.hibernate.ddl-auto=none"
})
@SpringBootTest(classes = DataFlowServerApplication.class,
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = "spring.jpa.hibernate.ddl-auto=none")
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
@ExtendWith(OutputCaptureExtension.class)
public abstract class AbstractSmokeTest {
private final static Logger logger = LoggerFactory.getLogger(AbstractSmokeTest.class);

protected static JdbcDatabaseContainer<?> container;

Expand All @@ -68,52 +93,102 @@ static void databaseProperties(DynamicPropertyRegistry registry) {
}

@Autowired
SchemaService schemaService;
private SchemaService schemaService;

@Autowired
private TaskRepositoryContainer taskRepositoryContainer;

@Autowired
TaskRepositoryContainer taskRepositoryContainer;
private AggregateTaskExplorer taskExplorer;

@Autowired
protected AggregateTaskExplorer taskExplorer;
private StreamDefinitionRepository streamDefinitionRepository;

@Autowired
protected StreamDefinitionRepository streamDefinitionRepository;
private PlatformTransactionManager transactionManager;

@Autowired
protected PlatformTransactionManager transactionManager;
private TaskDeleteService taskDeleteService;

private MultiValueMap<SchemaVersionTarget, Long> createdExecutionIdsBySchemaTarget = new LinkedMultiValueMap<>();

/**
 * Saves a stream definition inside a transaction and verifies that it can be read back
 * by name with the same DSL text.
 */
@Test
void streamCreation() {
	TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
	transactionTemplate.execute(txStatus -> {
		StreamDefinition saved = streamDefinitionRepository.save(new StreamDefinition("timelogger", "time | log"));
		Optional<StreamDefinition> reloaded = streamDefinitionRepository.findById(saved.getName());
		assertThat(reloaded).isPresent();
		String reloadedDsl = reloaded.get().getDslText();
		assertThat(reloadedDsl).isEqualTo("time | log");
		return true;
	});
}

@Test
public void testTaskCreation() {
logger.info("testTaskCreation:started:{}", getClass().getSimpleName());
void taskCreation() {
long originalCount = this.taskExplorer.getTaskExecutionCount();
TransactionTemplate tx = new TransactionTemplate(transactionManager);
tx.execute(status -> {
for (SchemaVersionTarget schemaVersionTarget : schemaService.getTargets().getSchemas()) {
TaskRepository taskRepository = this.taskRepositoryContainer.get(schemaVersionTarget.getName());
TaskExecution taskExecution = taskRepository.createTaskExecution(schemaVersionTarget.getName() + "_test_task");
createdExecutionIdsBySchemaTarget.add(schemaVersionTarget, taskExecution.getExecutionId());
assertThat(taskExecution.getExecutionId()).isGreaterThan(0L);
}
return true;
});
assertThat(taskExplorer.getTaskExecutionCount()).isEqualTo(2);
Page<AggregateTaskExecution> page = taskExplorer.findAll(Pageable.ofSize(100));
List<AggregateTaskExecution> taskExecutions = page.getContent();
logger.info("TaskExecutions:{}", taskExecutions);
assertThat(taskExecutions.size()).isEqualTo(2);
taskExecutions.forEach(taskExecution -> {
assertThat(taskExecution.getExecutionId()).isNotEqualTo(0L);
});
logger.info("testTaskCreation:completed:{}", getClass().getSimpleName());
long expectedNewCount = originalCount + 2;
assertThat(taskExplorer.getTaskExecutionCount()).isEqualTo(expectedNewCount);
List<AggregateTaskExecution> taskExecutions = taskExplorer.findAll(Pageable.ofSize(100)).getContent();
assertThat(taskExecutions)
.hasSize((int)expectedNewCount)
.allSatisfy((taskExecution) -> assertThat(taskExecution.getExecutionId()).isNotEqualTo(0L));
}

@Test
public void streamCreation() {
TransactionTemplate tx = new TransactionTemplate(transactionManager);
tx.execute(status -> {
StreamDefinition streamDefinition = new StreamDefinition("timelogger", "time | log");
streamDefinition = streamDefinitionRepository.save(streamDefinition);
Optional<StreamDefinition> loaded = streamDefinitionRepository.findById(streamDefinition.getName());
assertThat(loaded).isPresent();
assertThat(loaded.get().getDslText()).isEqualTo("time | log");
return true;
});
void shouldSupportRowNumberFunction(@Autowired DataSource dataSource) throws MetaDataAccessException {
assertThat(DatabaseType.supportsRowNumberFunction(dataSource)).isEqualTo(supportsRowNumberFunction());
}

/**
 * For each schema version target, creates two sample jobs, verifies that the job execution
 * listing grows by four entries, and then waits for the captured output to contain the SQL
 * fragment that matches whether {@code ROW_NUMBER()} is expected to be used.
 */
@ParameterizedTest
@MethodSource("schemaVersionTargetsProvider")
void shouldListJobExecutionsUsingPerformantRowNumberQuery(
		SchemaVersionTarget schemaVersionTarget,
		CapturedOutput output,
		@Autowired TaskJobService taskJobService,
		@Autowired TaskExecutionDaoContainer taskExecutionDaoContainer,
		@Autowired TaskBatchDaoContainer taskBatchDaoContainer) throws NoSuchJobExecutionException {
	// Baseline: number of job executions listed before this test adds any.
	int countBefore = taskJobService.listJobExecutionsWithStepCount(Pageable.ofSize(100)).getContent().size();

	JobExecutionTestUtils sampleJobs = new JobExecutionTestUtils(taskExecutionDaoContainer, taskBatchDaoContainer);
	TaskExecution startedExecution = sampleJobs.createSampleJob("job1", 1, BatchStatus.STARTED, new JobParameters(), schemaVersionTarget);
	// Remember the created ids so they can be removed after the test.
	createdExecutionIdsBySchemaTarget.add(schemaVersionTarget, startedExecution.getExecutionId());
	TaskExecution completedExecution = sampleJobs.createSampleJob("job2", 3, BatchStatus.COMPLETED, new JobParameters(), schemaVersionTarget);
	createdExecutionIdsBySchemaTarget.add(schemaVersionTarget, completedExecution.getExecutionId());

	Page<TaskJobExecution> listedAfter = taskJobService.listJobExecutionsWithStepCount(Pageable.ofSize(100));
	assertThat(listedAfter).hasSize(countBefore + 4);

	final String expectedSqlFragment;
	if (this.supportsRowNumberFunction()) {
		expectedSqlFragment = "as STEP_COUNT, ROW_NUMBER() OVER (PARTITION";
	}
	else {
		expectedSqlFragment = "as STEP_COUNT FROM AGGREGATE_JOB_INSTANCE";
	}
	// The captured output is polled because the expected text may not be present immediately.
	Awaitility.waitAtMost(Duration.ofSeconds(5))
			.untilAsserted(() -> assertThat(output).contains(expectedSqlFragment));
}

/**
 * Argument source for the parameterized tests: streams the schema version targets
 * reported by a freshly created {@link DefaultSchemaService}.
 * @return a stream of schema version targets
 */
static Stream<SchemaVersionTarget> schemaVersionTargetsProvider() {
	DefaultSchemaService defaultSchemaService = new DefaultSchemaService();
	return defaultSchemaService.getTargets().getSchemas().stream();
}

/**
 * Removes every execution recorded in {@code createdExecutionIdsBySchemaTarget}, one schema
 * target at a time, using the {@code CLEANUP} and {@code REMOVE_DATA} delete actions.
 */
@AfterEach
void cleanupAfterTest() {
	Set<TaskExecutionControllerDeleteAction> deleteActions = new HashSet<>();
	deleteActions.add(TaskExecutionControllerDeleteAction.CLEANUP);
	deleteActions.add(TaskExecutionControllerDeleteAction.REMOVE_DATA);
	for (SchemaVersionTarget target : createdExecutionIdsBySchemaTarget.keySet()) {
		List<Long> recordedIds = createdExecutionIdsBySchemaTarget.get(target);
		this.taskDeleteService.cleanupExecutions(deleteActions, new HashSet<>(recordedIds), target.getName());
	}
}

/**
 * Expectation used by the tests in this class for whether the database under test supports
 * the SQL {@code ROW_NUMBER()} function. Returns {@code true} here; being {@code protected},
 * it can be overridden by subclasses that expect otherwise.
 * @return whether {@code ROW_NUMBER()} support is expected
 */
protected boolean supportsRowNumberFunction() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.springframework.cloud.dataflow.server.db.migration;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.testcontainers.containers.Db2Container;


Expand All @@ -25,7 +24,6 @@
*
* @author Corneil du Plessis
*/
@Disabled("Will fix once PR is merged to run all tests")
public class DB2SmokeTest extends AbstractSmokeTest {
@BeforeAll
static void startContainer() {
Expand Down
Loading

0 comments on commit d7bf6b8

Please sign in to comment.