Skip to content
This repository has been archived by the owner on May 16, 2023. It is now read-only.

Commit

Permalink
Chunk-wise CSV export (#274)
Browse files Browse the repository at this point in the history
  • Loading branch information
f11h authored Dec 19, 2022
1 parent f2ce658 commit 811e99d
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
import app.coronawarn.quicktest.archive.domain.Archive;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import javax.persistence.RollbackException;
import javax.persistence.TypedQuery;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.data.domain.PageRequest;

// see app.coronawarn.quicktest.config.ArchiveEntityManagerConfig
@RequiredArgsConstructor
Expand Down Expand Up @@ -65,7 +64,11 @@ public Archive save(Archive archive) {
*/
public List<Archive> findAll() {
this.em.getTransaction().begin();
final List<Archive> result = this.em.createQuery("SELECT a FROM Archive a", Archive.class).getResultList();

final List<Archive> result = em
.createQuery("SELECT a FROM Archive a", Archive.class)
.getResultList();

this.em.getTransaction().commit();
return result;
}
Expand All @@ -77,8 +80,11 @@ public List<Archive> findAll() {
*/
public List<Archive> findAllByPocId(final String pocId) {
this.em.getTransaction().begin();
TypedQuery<Archive> query = this.em.createQuery("SELECT a FROM Archive a WHERE a.pocId = ?1", Archive.class);
final List<Archive> result = query.setParameter(1, pocId).getResultList();

final List<Archive> result = em
.createQuery("SELECT a FROM Archive a WHERE a.pocId = ?1", Archive.class)
.setParameter(1, pocId).getResultList();

this.em.getTransaction().commit();
return result;
}
Expand All @@ -88,10 +94,16 @@ public List<Archive> findAllByPocId(final String pocId) {
*
* @return {@link List} of {@link Archive}
*/
public List<Archive> findAllByTenantId(final String tenantId) {
public List<Archive> findAllByTenantId(final String tenantId, PageRequest pageRequest) {
em.getTransaction().begin();
TypedQuery<Archive> query = em.createQuery("SELECT a FROM Archive a WHERE a.tenantId = ?1", Archive.class);
final List<Archive> result = query.setParameter(1, tenantId).getResultList();

final List<Archive> result = em
.createQuery("SELECT a FROM Archive a WHERE a.tenantId = ?1", Archive.class)
.setFirstResult(pageRequest.getPageNumber() * pageRequest.getPageSize())
.setMaxResults(pageRequest.getPageSize())
.setParameter(1, tenantId)
.getResultList();

em.getTransaction().commit();
return result;
}
Expand All @@ -101,9 +113,12 @@ public List<Archive> findAllByTenantId(final String tenantId) {
*/
public void deleteAllByTenantId(final String tenantId) {
em.getTransaction().begin();
Query query = em.createQuery("DELETE FROM Archive a WHERE a.tenantId = ?1");
query.setParameter(1, tenantId);
int rows = query.executeUpdate();

em
.createQuery("DELETE FROM Archive a WHERE a.tenantId = ?1")
.setParameter(1, tenantId)
.executeUpdate();

em.getTransaction().commit();
}

Expand All @@ -114,9 +129,12 @@ public void deleteAllByTenantId(final String tenantId) {
*/
public List<String> findAllHashedGuids(final List<String> search) {
this.em.getTransaction().begin();
TypedQuery<String> query =
this.em.createQuery("SELECT a.hashedGuid FROM Archive a WHERE a.hashedGuid IN ?1", String.class);
final List<String> result = query.setParameter(1, search).getResultList();

final List<String> result = em
.createQuery("SELECT a.hashedGuid FROM Archive a WHERE a.hashedGuid IN ?1", String.class)
.setParameter(1, search)
.getResultList();

this.em.getTransaction().commit();
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ public List<ArchiveCipherDtoV1> getQuicktestsFromLongterm(final String pocId, fi
/**
* Get longterm archives by tenantId.
*/
public List<ArchiveCipherDtoV1> getQuicktestsFromLongtermByTenantId(final String tenantId) {
List<Archive> allByPocId = longTermArchiveRepository.findAllByTenantId(createHash(tenantId));
public List<ArchiveCipherDtoV1> getQuicktestsFromLongtermByTenantId(final String tenantId, int page, int pageSize) {
List<Archive> allByPocId = longTermArchiveRepository
.findAllByTenantId(createHash(tenantId), PageRequest.of(page, pageSize));
List<ArchiveCipherDtoV1> dtos = new ArrayList<>(allByPocId.size());
for (Archive archive : allByPocId) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class CancellationSchedulingService {
*/
@Scheduled(cron = "${archive.cancellationArchiveJob.cron}")
@SchedulerLock(name = "CancellationArchiveJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.cancellationArchiveJob.locklimit}")
lockAtMostFor = "${archive.cancellationArchiveJob.locklimit}")
public void cancellationArchiveJob() {
log.info("Starting Job: cancellationArchiveJob");
processCancellationArchiveBatchRecursion(cancellationService.getReadyToArchiveBatch());
Expand All @@ -87,7 +87,7 @@ private void processCancellationArchiveBatchRecursion(List<Cancellation> cancell
*/
@Scheduled(cron = "${archive.csvUploadJob.cron}")
@SchedulerLock(name = "CsvUploadJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.csvUploadJob.locklimit}")
lockAtMostFor = "${archive.csvUploadJob.locklimit}")
public void csvUploadJob() {
log.info("Starting Job: csvUploadJob");
processCsvUploadBatchRecursion(cancellationService.getReadyToUploadBatch());
Expand All @@ -97,34 +97,51 @@ public void csvUploadJob() {
private void processCsvUploadBatchRecursion(List<Cancellation> cancellations) {
log.info("Process CSV Upload Batch with size of {}", cancellations.size());
for (Cancellation cancellation : cancellations) {
log.info("Processing CSV for Partner {}", cancellation.getPartnerId());
try {
List<ArchiveCipherDtoV1> quicktests =
archiveService.getQuicktestsFromLongtermByTenantId(cancellation.getPartnerId());

StringWriter stringWriter = new StringWriter();
CSVWriter csvWriter =
new CSVWriter(stringWriter, '\t', CSVWriter.DEFAULT_QUOTE_CHARACTER,
'\\', CSVWriter.DEFAULT_LINE_END);
CSVWriter csvWriter = new CSVWriter(
stringWriter,
'\t',
CSVWriter.DEFAULT_QUOTE_CHARACTER,

'\\',
CSVWriter.DEFAULT_LINE_END);

StatefulBeanToCsv<ArchiveCipherDtoV1> beanToCsv =
new StatefulBeanToCsvBuilder<ArchiveCipherDtoV1>(csvWriter)
.build();
beanToCsv.write(quicktests);
byte[] csvBytes = stringWriter.toString().getBytes(StandardCharsets.UTF_8);
new StatefulBeanToCsvBuilder<ArchiveCipherDtoV1>(csvWriter).build();

int page = 0;
int pageSize = 500;
int totalEntityCount = 0;
List<ArchiveCipherDtoV1> quicktests;
do {
log.info("Loading Archive Chunk {} for Partner {}", page, cancellation.getPartnerId());
quicktests = archiveService.getQuicktestsFromLongtermByTenantId(
cancellation.getPartnerId(), page, pageSize);
totalEntityCount += quicktests.size();
log.info("Found {} Quicktests in Archive for Chunk {} for Partner {}",
quicktests.size(), page, cancellation.getPartnerId());
beanToCsv.write(quicktests);
page++;
} while (!quicktests.isEmpty());
log.info("Got {} Quicktests for Partner {}", totalEntityCount, cancellation.getPartnerId());

byte[] csvBytes = stringWriter.toString().getBytes(StandardCharsets.UTF_8);
String objectId = cancellation.getPartnerId() + ".csv";

ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(csvBytes.length);

s3Client.putObject(
s3Config.getBucketName(),
objectId,
new ByteArrayInputStream(csvBytes), metadata);
s3Config.getBucketName(),
objectId,
new ByteArrayInputStream(csvBytes), metadata);

log.info("File stored to S3 with id {}", objectId);
log.info("File stored to S3 with id: {}, size: {}, hash: {}",
objectId, csvBytes.length, getHash(csvBytes));

cancellationService.updateCsvCreated(cancellation, ZonedDateTime.now(), objectId,
getHash(csvBytes), quicktests.size(), csvBytes.length);
getHash(csvBytes), totalEntityCount, csvBytes.length);
} catch (Exception e) {
String errorMessage = e.getClass().getName() + ": " + e.getMessage();

Expand All @@ -144,7 +161,7 @@ private void processCsvUploadBatchRecursion(List<Cancellation> cancellations) {
*/
@Scheduled(cron = "${archive.cancellationSearchPortalDeleteJob.cron}")
@SchedulerLock(name = "CancellationSearchPortalDeleteJob", lockAtLeastFor = "PT0S",
lockAtMostFor = "${archive.cancellationSearchPortalDeleteJob.locklimit}")
lockAtMostFor = "${archive.cancellationSearchPortalDeleteJob.locklimit}")
public void cancellationSearchPortalDeleteJob() {
log.info("Starting Job: cancellationSearchPortalDeleteJob");
processCancellationDeleteSearchPortalBatch(cancellationService.getReadyToDeleteSearchPortal());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -94,7 +95,7 @@ void setUp() {
public static final String PARTNER_ID_HASH = "212e58b487b6d6b486b71c6ebb3fedc0db3c69114f125fb3cd2fbc72e6ffc25f";

@ParameterizedTest
@ValueSource(ints = {1, 5000})
@ValueSource(ints = {1, 5_000})
@Transactional
void testCsvExport(int n) throws IOException, NoSuchAlgorithmException, CsvException {
Cancellation cancellation = new Cancellation();
Expand All @@ -107,12 +108,12 @@ void testCsvExport(int n) throws IOException, NoSuchAlgorithmException, CsvExcep
}

Assertions.assertEquals(n, shortTermArchiveRepository.findAllByTenantId(PARTNER_ID, Pageable.unpaged()).count());
Assertions.assertEquals(0, longTermArchiveRepository.findAllByTenantId(PARTNER_ID_HASH).size());
Assertions.assertEquals(0, longTermArchiveRepository.findAllByTenantId(PARTNER_ID_HASH, PageRequest.of(0, Integer.MAX_VALUE)).size());

cancellationSchedulingService.cancellationArchiveJob();

Assertions.assertEquals(0, shortTermArchiveRepository.findAllByTenantId(PARTNER_ID, Pageable.unpaged()).count());
Assertions.assertEquals(n, longTermArchiveRepository.findAllByTenantId(PARTNER_ID_HASH).size());
Assertions.assertEquals(n, longTermArchiveRepository.findAllByTenantId(PARTNER_ID_HASH, PageRequest.of(0, Integer.MAX_VALUE)).size());

ArgumentCaptor<InputStream> inputStreamArgumentCaptor = ArgumentCaptor.forClass(InputStream.class);
String expectedFileName = PARTNER_ID + ".csv";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,16 +302,16 @@ void testFinalDeleteJob() {
final QuickTestArchive test = buildCancellationQuickTestArchive();
quickTestArchiveRepository.saveAndFlush(test);
archiveService.moveToArchiveByTenantId(PARTNER_ID);
var archiveEntries = archiveService.getQuicktestsFromLongtermByTenantId(PARTNER_ID);
var archiveEntries = archiveService.getQuicktestsFromLongtermByTenantId(PARTNER_ID, 0, Integer.MAX_VALUE);
assertFalse(archiveEntries.isEmpty());
Cancellation cancellation = cancellationService.createCancellation(PARTNER_ID, CANCELLATION_DATE);
cancellationService.finalDeleteJob();
archiveEntries = archiveService.getQuicktestsFromLongtermByTenantId(PARTNER_ID);
archiveEntries = archiveService.getQuicktestsFromLongtermByTenantId(PARTNER_ID, 0, Integer.MAX_VALUE);
assertFalse(archiveEntries.isEmpty());
cancellation.setCancellationDate(ZonedDateTime.now().minusWeeks(4));
cancellationRepository.save(cancellation);
cancellationService.finalDeleteJob();
archiveEntries = archiveService.getQuicktestsFromLongtermByTenantId(PARTNER_ID);
archiveEntries = archiveService.getQuicktestsFromLongtermByTenantId(PARTNER_ID, 0, Integer.MAX_VALUE);
assertTrue(archiveEntries.isEmpty());
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,4 @@ s3:
endpoint: http://localhost:9000
name: eu-de
secretKey: minioadmin
expiration: 3600
expiration: 3600

0 comments on commit 811e99d

Please sign in to comment.