Skip to content

Commit

Permalink
Use ROW_NUMBER in JdbcAggregateJobQueryDao to improve performance
Browse files Browse the repository at this point in the history
See #5524
  • Loading branch information
onobc committed Dec 5, 2023
1 parent e1f7250 commit 320ec57
Showing 1 changed file with 34 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package org.springframework.cloud.dataflow.server.db.migration;

import java.sql.Timestamp;
import java.sql.Types;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Date;

Expand All @@ -25,7 +28,6 @@
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.repository.dao.JdbcJobExecutionDao;
import org.springframework.batch.core.repository.dao.JdbcJobInstanceDao;
import org.springframework.batch.item.database.support.DataFieldMaxValueIncrementerFactory;
import org.springframework.cloud.dataflow.core.database.support.DatabaseType;
Expand All @@ -40,7 +42,9 @@
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.support.MetaDataAccessException;
import org.springframework.jdbc.support.incrementer.DataFieldMaxValueIncrementer;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.StringUtils;

class JobExecutionTestUtils
{
Expand Down Expand Up @@ -71,19 +75,17 @@ TaskExecution createSampleJob(String jobName, int jobExecutionCount, BatchStatus
jobInstanceDao.setTablePrefix(schemaVersionTarget.getBatchPrefix());
jobInstanceDao.setJobIncrementer(incrementerFactory.getIncrementer(incrementerFallbackType.name(), schemaVersionTarget.getBatchPrefix() + "JOB_SEQ"));

JdbcJobExecutionDao jobExecutionDao = new JdbcJobExecutionDao();
jobExecutionDao.setJdbcTemplate(jdbcTemplate);
jobExecutionDao.setTablePrefix(schemaVersionTarget.getBatchPrefix());
jobExecutionDao.setJobExecutionIncrementer(incrementerFactory.getIncrementer(incrementerFallbackType.name(), schemaVersionTarget.getBatchPrefix() + "JOB_EXECUTION_SEQ"));

// BATCH_JOB_EXECUTION differs and the DAO can not be used for BATCH4/5 inserting
DataFieldMaxValueIncrementer jobExecutionIncrementer = incrementerFactory.getIncrementer(incrementerFallbackType.name(), schemaVersionTarget.getBatchPrefix() + "JOB_EXECUTION_SEQ");
TaskBatchDao taskBatchDao = this.taskBatchDaoContainer.get(schemaVersion);

TaskExecution taskExecution = taskExecutionDao.createTaskExecution(jobName, new Date(), new ArrayList<>(), null);
JobInstance jobInstance = jobInstanceDao.createJobInstance(jobName, jobParameters);
for (int i = 0; i < jobExecutionCount; i++) {
JobExecution jobExecution = new JobExecution(jobInstance, new JobParameters());
jobExecution.setStatus(batchStatus);
jobExecutionDao.saveJobExecution(jobExecution);
jobExecution.setId(jobExecutionIncrementer.nextLongValue());
jobExecution.setStartTime(new Date());
saveJobExecution(jobExecution, jdbcTemplate, schemaVersionTarget);
taskBatchDao.saveRelationship(taskExecution, jobExecution);
}
return taskExecution;
Expand All @@ -101,4 +103,28 @@ private DatabaseType determineIncrementerFallbackType(DataSource dataSource) {
}
return databaseType;
}

private JobExecution saveJobExecution(JobExecution jobExecution, JdbcTemplate jdbcTemplate, SchemaVersionTarget schemaVersionTarget) {
jobExecution.setStartTime(new Date());
jobExecution.setVersion(1);
Timestamp startTime = timestampFromDate(jobExecution.getStartTime());
Timestamp endTime = timestampFromDate(jobExecution.getEndTime());
Timestamp createTime = timestampFromDate(jobExecution.getCreateTime());
Timestamp lastUpdated = timestampFromDate(jobExecution.getLastUpdated());
Object[] parameters = new Object[] { jobExecution.getId(), jobExecution.getJobId(), startTime, endTime,
jobExecution.getStatus().toString(), jobExecution.getExitStatus().getExitCode(),
jobExecution.getExitStatus().getExitDescription(), jobExecution.getVersion(), createTime, lastUpdated };
String sql = "INSERT INTO %PREFIX%JOB_EXECUTION(JOB_EXECUTION_ID, " +
"JOB_INSTANCE_ID, START_TIME, END_TIME, STATUS, EXIT_CODE, EXIT_MESSAGE, VERSION, CREATE_TIME, LAST_UPDATED) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
sql = StringUtils.replace(sql, "%PREFIX%", schemaVersionTarget.getBatchPrefix());
jdbcTemplate.update(sql, parameters,
new int[] { Types.BIGINT, Types.BIGINT, Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.INTEGER, Types.TIMESTAMP, Types.TIMESTAMP });
return jobExecution;
}

private Timestamp timestampFromDate(Date date) {
return (date != null) ? Timestamp.valueOf(date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime()) : null;
}
}

0 comments on commit 320ec57

Please sign in to comment.