From 18f4dd417884e94eb22b10391a977d65546102a9 Mon Sep 17 00:00:00 2001 From: Emanuela Epure <67077116+emanuelaepure10@users.noreply.github.com> Date: Thu, 14 Dec 2023 22:56:22 +0100 Subject: [PATCH] feat: Add actual transformation functionality Add actual transformation functionality to hale-transformer-api service. SVC-1714 --- .gitignore | 1 + .../hale/transformer/RunContext.java | 28 ++ .../hale/transformer/Transformer.java | 434 ++++++++++++++++-- .../hale/transformer/TransformerConfig.java | 6 - .../hale/transformer/api/Init.java | 19 + .../api/internal/CountdownLatchConfig.java | 20 + .../CustomMetaClassCreationHandle.java | 76 +++ .../TransformationMessageConsumer.java | 16 +- src/main/resources/application.properties | 3 +- 9 files changed, 564 insertions(+), 39 deletions(-) create mode 100644 src/main/java/to/wetransform/hale/transformer/RunContext.java delete mode 100644 src/main/java/to/wetransform/hale/transformer/TransformerConfig.java create mode 100644 src/main/java/to/wetransform/hale/transformer/api/Init.java create mode 100644 src/main/java/to/wetransform/hale/transformer/api/internal/CountdownLatchConfig.java create mode 100644 src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java diff --git a/.gitignore b/.gitignore index bec0c6a..a4faa29 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,4 @@ out/ *.code-workspace # Local History for Visual Studio Code .history/ + diff --git a/src/main/java/to/wetransform/hale/transformer/RunContext.java b/src/main/java/to/wetransform/hale/transformer/RunContext.java new file mode 100644 index 0000000..0c5a817 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/RunContext.java @@ -0,0 +1,28 @@ +package to.wetransform.hale.transformer; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.io.FileUtils; + +public class RunContext { + + private final List tempFiles = new ArrayList<>(); + + public File createTempDir() throws IOException { + Path path = Files.createTempDirectory("hale-transformer"); + tempFiles.add(path); + return path.toFile(); + } + + public void cleanUp() throws IOException { + for (Path path : tempFiles) { + FileUtils.deleteDirectory(path.toFile()); + } + tempFiles.clear(); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/Transformer.java b/src/main/java/to/wetransform/hale/transformer/Transformer.java index 4f1465b..262d02b 100644 --- a/src/main/java/to/wetransform/hale/transformer/Transformer.java +++ b/src/main/java/to/wetransform/hale/transformer/Transformer.java @@ -1,66 +1,237 @@ package to.wetransform.hale.transformer; -import java.io.InputStream; +import java.io.*; import java.net.URI; -import java.text.MessageFormat; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.*; +import java.util.Map.Entry; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import com.google.common.base.Strings; import eu.esdihumboldt.hale.app.transform.ExecContext; +import eu.esdihumboldt.hale.app.transform.ExecTransformation; +import eu.esdihumboldt.hale.app.transform.ExecUtil; import eu.esdihumboldt.hale.common.core.HalePlatform; +import eu.esdihumboldt.hale.common.core.io.HaleIO; +import eu.esdihumboldt.hale.common.core.io.IOProvider; +import eu.esdihumboldt.hale.common.core.io.Value; +import eu.esdihumboldt.hale.common.core.io.extension.IOProviderDescriptor; +import eu.esdihumboldt.hale.common.core.io.extension.IOProviderExtension; +import eu.esdihumboldt.hale.common.core.io.project.model.IOConfiguration; import eu.esdihumboldt.hale.common.core.io.project.model.Project; import eu.esdihumboldt.hale.common.core.io.supplier.DefaultInputSupplier; +import eu.esdihumboldt.hale.common.core.report.Report; +import eu.esdihumboldt.hale.common.core.report.ReportSession; +import eu.esdihumboldt.hale.common.core.report.util.StatisticsHelper; +import eu.esdihumboldt.hale.common.core.report.writer.ReportReader; +import eu.esdihumboldt.hale.common.instance.io.InstanceIO; +import eu.esdihumboldt.hale.common.instance.io.InstanceWriter; +import eu.esdihumboldt.util.groovy.collector.StatsCollector; import eu.esdihumboldt.util.io.IOUtils; +import org.apache.commons.io.output.TeeOutputStream; +import org.eclipse.core.runtime.content.IContentType; +import org.json.JSONException; +import org.json.JSONObject; import org.osgi.framework.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import to.wetransform.halecli.internal.Init; +import to.wetransform.hale.transformer.api.Init; public class Transformer { private static final Logger LOG = LoggerFactory.getLogger(Transformer.class); - private CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch = new CountDownLatch(1); - public void transform(/* TODO add parameters for data and project sources */ ) { - // TODO setup log files for reports and transformation log + public void transform(String sourceDataURL, String projectURL, String targetURL) { + File transformationLogFile = null; - long heapMaxSize = Runtime.getRuntime().maxMemory(); - LOG.info("Maximum heap size configured as " + IOUtils.humanReadableByteCount(heapMaxSize, false)); + try { + Path tempDirectory = createTempDirectory(); + transformationLogFile = createTransformationLogFile(tempDirectory); - Init.init(); + setupLogging(tempDirectory, transformationLogFile); + File reportFile = createReportFile(tempDirectory); - Version version = HalePlatform.getCoreVersion(); - LOG.info(MessageFormat.format("Launching hale-transformer {0}...", version.toString())); + LOG.info("Startup..."); + logHeapSize(); - ExecContext context = new ExecContext(); + Init.init(); + logPlatformVersion(); - // URI projectUri = .... - // context.setProject(projectUri); - // Project project = loadProject(projectUri); + ExecContext context = new ExecContext(); - // context.setSources(...) - // context.setSourceProviderIds(...) - // context.setSourcesSettings(...) + // Set up project URI + URI projectUri = new URI(projectURL); + context.setProject(projectUri); + // Load project + Project project = loadProject(projectUri); - // Value sourceCrs = null; - // TODO determine source CRS + Value sourceCrs = initializeSourceConfig(context, sourceDataURL); - // TargetConfig targetConfig = configureTarget(project, sourceCrs); + TargetConfig targetConfig = configureTarget(project, sourceCrs); + configureTargetContext(context, tempDirectory, targetConfig, reportFile); - try { // run the transformation + LOG.info("Transforming started."); + new ExecTransformation().run(context); - LOG.info("Transforming..."); - TimeUnit.SECONDS.sleep(30); - // new ExecTransformation().run(context); - - LOG.info("Transformation complete."); + // evaluate results + boolean success = evaluateTransformationResults(reportFile); + LOG.info("Transformation complete with success = {}", success); } catch (Throwable t) { - LOG.error("Failed to execute transformation: " + t.getMessage(), t); + LOG.error("Failed to execute transformation: {}", t.getMessage(), t); } finally { latch.countDown(); + deleteDir(transformationLogFile.getParentFile()); + } + } + + // Extracted Methods + + private Path createTempDirectory() throws IOException { + return Files.createTempDirectory("hale-transformer"); + } + + private File createTransformationLogFile(Path tempDirectory) throws IOException { + File transformationLogFile = + Files.createTempFile(tempDirectory, "transformation", ".log").toFile(); + transformationLogFile.delete(); + transformationLogFile.createNewFile(); + return transformationLogFile; + } + + private void setupLogging(Path tempDirectory, File transformationLogFile) throws IOException { + FileOutputStream transformationLogOut = new FileOutputStream(transformationLogFile, true); + OutputStream tOut = new TeeOutputStream(System.out, transformationLogOut); + System.setOut(new PrintStream(tOut)); + } + + private File createReportFile(Path tempDirectory) throws IOException { + File reportFile = Files.createTempFile(tempDirectory, "reports", ".log").toFile(); + reportFile.delete(); + reportFile.createNewFile(); + return reportFile; + } + + private void logHeapSize() { + long heapMaxSize = Runtime.getRuntime().maxMemory(); + LOG.info("Maximum heap size configured as {}", IOUtils.humanReadableByteCount(heapMaxSize, false)); + } + + private void logPlatformVersion() { + Version version = HalePlatform.getCoreVersion(); + LOG.info("Launching hale-transformer {}...", version); + } + + private Value initializeSourceConfig(ExecContext context, String sourceDataURL) throws URISyntaxException { + Map defaultSrs = initializeDefaultSrs(); + SourceConfig sourceConfig = initializeSourceConfig(sourceDataURL, defaultSrs); + + List sourceConfigs = new ArrayList<>(); + sourceConfigs.add(sourceConfig); + + context.setSources(sourceConfigs.stream() + .filter(sourceConfigList -> sourceConfigList.transform()) + .map(sourceConfigList -> sourceConfig.location()) + .collect(Collectors.toList())); + context.setSourceProviderIds(sourceConfigs.stream() + .map(sourceConfigList -> sourceConfigList.providerId()) + .collect(Collectors.toList())); + context.setSourcesSettings(sourceConfigs.stream() + .map(sourceConfigList -> sourceConfigList.settings()) + .collect(Collectors.toList())); + + return extractedDetectedCRS(sourceConfigs); + } + + private Value extractedDetectedCRS(List sourceConfigs) { + // extract detected source data crs and use in target config + Value sourceCrs = null; + Throwable error = null; + // try each source config, in case it is not set for some + for (int i = 0; i < sourceConfigs.size() && (sourceCrs == null || sourceCrs.isEmpty()); i++) { + try { + sourceCrs = ((SourceConfig) sourceConfigs.get(i)).settings().get("defaultSrs"); + } catch (Throwable t) { + error = t; + } + } + + if (error != null) { + LOG.warn("Could not determine source data CRS", error); + } else if (sourceCrs == null || sourceCrs.isEmpty()) { + LOG.warn( + "Unable to determine source data CRS: None of {} sources is configured with a CRS", + sourceConfigs.size()); + } + return sourceCrs; + } + + private Map initializeDefaultSrs() { + Map defaultSrs = new HashMap<>(); + defaultSrs.put("defaultSrs", Value.of("EPSG:4326")); + defaultSrs.put("xml.pretty", Value.of(true)); + defaultSrs.put("crs.epsg.prefix", Value.of("http://www.opengis.net/def/crs/EPSG/0/")); + defaultSrs.put("crs", Value.of("code:EPSG:4326")); + return defaultSrs; + } + + private SourceConfig initializeSourceConfig(String sourceDataURL, Map defaultSrs) + throws URISyntaxException { + return new SourceConfig( + new URI(sourceDataURL), "eu.esdihumboldt.hale.io.gml.reader", defaultSrs, true, new ArrayList<>()); + } + + private void configureTargetContext( + ExecContext context, Path tempDirectory, TargetConfig targetConfig, File reportFile) + throws URISyntaxException { + File resultDir = new File(tempDirectory.toFile(), "result"); + resultDir.mkdir(); + + String targetFilename = targetConfig.filename(); + if (targetFilename == null) { + targetFilename = "result.out"; + } + + File targetFile = new File(resultDir, targetFilename); + context.setTarget(targetFile.toURI()); + + String preset = targetConfig.preset(); + CustomTarget customConfig = targetConfig.customTarget(); + if (preset != null) { + context.setPreset(preset); + } else { + if (customConfig == null || customConfig.providerId() == null) { + throw new IllegalStateException("No configuration on how to write transformed data available"); + } + context.setTargetProviderId(customConfig.providerId()); + } + + if (customConfig != null) { + context.setTargetSettings(customConfig.settings()); + } else { + context.setTargetSettings(new HashMap<>()); } + + context.setReportsOut(reportFile); + + // general configuration + context.setLogException(true); + } + + private boolean evaluateTransformationResults(File reportFile) throws IOException { + // evaluate results TODO to be uncommented + ReportReader reader = new ReportReader(); + ReportSession reports = reader.readFile(reportFile); + JSONObject stats = getStats(reports); + boolean success = evaluateReports(reports.getAllReports().values(), false); + + LOG.info("Transformation complete with success = " + success); + return success; } private Project loadProject(URI projectUri) { @@ -74,6 +245,211 @@ private Project loadProject(URI projectUri) { return result; } + private TargetConfig configureTarget(Project lp, Value sourceCrs) { + String filename; + String preset = null; + CustomTarget customTarget = null; + + Map presets = getPresets(lp); + + // Preset names + String defaultPreset = "default"; + String hcPreset = "hale-connect"; + + if (presets.containsKey(hcPreset)) { + // Project contains hale-connect preset + preset = hcPreset; + IOConfiguration ioConfiguration = presets.get(hcPreset); + filename = determineTargetFileName(ioConfiguration); + } else if (presets.containsKey(defaultPreset)) { + // Project contains default preset + preset = defaultPreset; + IOConfiguration ioConfiguration = presets.get(defaultPreset); + filename = determineTargetFileName(ioConfiguration); + } else { + // No specific presets found, creating a custom target configuration + + Map targetMap = new HashMap<>(); + + // Specify target provider for GML FeatureCollection + String targetProvider = "eu.esdihumboldt.hale.io.gml.xplan.writer"; // "eu.esdihumboldt.hale.io.gml.writer"; + + // Additional settings for testing + targetMap.put("xml.pretty", Value.of(true)); + targetMap.put("crs.epsg.prefix", Value.of("http://www.opengis.net/def/crs/EPSG/0/")); + + // Use CRS from source data analysis if available and a valid EPSG code, + // otherwise fallback to EPSG:4326 + Value targetCrs = + (sourceCrs != null && sourceCrs.getStringRepresentation().startsWith("code:EPSG")) + ? sourceCrs + : Value.of("code:EPSG:4326"); + + targetMap.put("crs", targetCrs); + LOG.info("Using {} as the transformation target CRS", targetCrs.getStringRepresentation()); + + // Create a custom target configuration + CustomTarget target = new CustomTarget(targetProvider, targetMap); + + filename = "inspire.gml"; + customTarget = target; + } + + // Create and return the target configuration + return new TargetConfig(filename, preset, customTarget); + } + + /** + * Determine the name of the target file based on an export preset. + * + * @param preset the export preset + * @return the file name for the target file + */ + public static String determineTargetFileName(IOConfiguration preset) { + // Default extension to "xml" to reflect old behavior + String extension = "xml"; + + IContentType contentType = determineContentType(preset); + if (contentType != null) { + // Derive extension from content type + String[] extensions = contentType.getFileSpecs(IContentType.FILE_EXTENSION_SPEC); + if (extensions != null && extensions.length > 0) { + extension = extensions[0]; // Choose the first one + } + } + + // If extension would be "gml," use "xml" instead for backward compatibility + extension = "gml".equalsIgnoreCase(extension) ? "xml" : extension; + + LOG.info("Chose .{} as the extension for the target file", extension); + + return "result." + extension; + } + + private static IContentType determineContentType(IOConfiguration preset) { + // Usually, the content type is part of the settings + Value value = preset.getProviderConfiguration().get(IOProvider.PARAM_CONTENT_TYPE); + if (value != null && !value.isEmpty()) { + return HalePlatform.getContentTypeManager().getContentType(value.as(String.class)); + } + + // Try to determine based on provider ID + String providerId = preset.getProviderId(); + if (providerId != null) { + IOProviderDescriptor providerDescriptor = + IOProviderExtension.getInstance().getFactory(providerId); + if (providerDescriptor != null) { + Set supportedTypes = providerDescriptor.getSupportedTypes(); + if (!supportedTypes.isEmpty()) { + IContentType contentType = supportedTypes.iterator().next(); + + if (supportedTypes.size() > 1) { + LOG.warn( + "Multiple content types as candidates ({}), chose {}", + supportedTypes.stream().map(IContentType::getId).collect(Collectors.joining(", ")), + contentType.getId()); + } + + return contentType; + } + } + } + + return null; + } + + /** + * Get all export presets from the project. + * + * @param project the hale project object + * @return the map of presets + */ + private Map getPresets(Project project) { + Map exportPresets = new HashMap<>(); + + if (project == null) { + return exportPresets; + } + + for (Entry entry : + project.getExportConfigurations().entrySet()) { + IOConfiguration originalConfiguration = entry.getValue(); + + if (InstanceIO.ACTION_SAVE_TRANSFORMED_DATA.equals(originalConfiguration.getActionId())) { + String presetName = entry.getKey(); + IOConfiguration clonedConfiguration = originalConfiguration.clone(); + + // Check and add the I/O provider to exportPresets + checkAndAddIOProvider(presetName, clonedConfiguration, exportPresets); + } + } + + return exportPresets; + } + + private void checkAndAddIOProvider( + String presetName, IOConfiguration configuration, Map exportPresets) { + String providerId = configuration.getProviderId(); + IOProviderDescriptor factory = HaleIO.findIOProviderFactory(InstanceWriter.class, null, providerId); + + if (factory != null) { + String name = Strings.isNullOrEmpty(presetName) ? factory.getDisplayName() : presetName; + exportPresets.computeIfAbsent(name, k -> configuration); + } else { + LOG.error("I/O provider {} for export preset {} not found", providerId, presetName); + } + } + + /** + * After transformation, assemble stats from reports. + * + * @param reports the reports + */ + private JSONObject getStats(ReportSession reports) { + StatsCollector root = + new StatisticsHelper().getStatistics(reports.getAllReports().values(), true); + + try { + return new JSONObject(root.saveToJson(false)); + } catch (JSONException e) { + LOG.error("Error assembling stats JSON representation", e); + return null; + } + } + + private boolean evaluateReports(Collection> reports, boolean detailed) { + boolean ok = true; + ExecUtil.info("Transformation tasks summaries:"); + + for (Report report : reports) { + if (!report.isSuccess() || !report.getErrors().isEmpty()) { + ok = false; + + ExecUtil.error(report.getTaskName() + ": " + report.getSummary()); + if (detailed) { + report.getErrors().forEach(e -> { + ExecUtil.error(e.getStackTrace()); + }); + } + } else { + ExecUtil.info(report.getTaskName() + ": " + report.getSummary()); + } + // TODO process information, provide in a usable way? + } + + return ok; + } + + void deleteDir(File file) { + File[] contents = file.listFiles(); + if (contents != null) { + for (File f : contents) { + deleteDir(f); + } + } + file.delete(); + } + public CountDownLatch getLatch() { return latch; } diff --git a/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java b/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java deleted file mode 100644 index e0b76a4..0000000 --- a/src/main/java/to/wetransform/hale/transformer/TransformerConfig.java +++ /dev/null @@ -1,6 +0,0 @@ -package to.wetransform.hale.transformer; - -public class TransformerConfig { - - // empty for now -} diff --git a/src/main/java/to/wetransform/hale/transformer/api/Init.java b/src/main/java/to/wetransform/hale/transformer/api/Init.java new file mode 100644 index 0000000..03f02f2 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/Init.java @@ -0,0 +1,19 @@ +package to.wetransform.hale.transformer.api; + +import groovy.lang.GroovySystem; +import org.eclipse.equinox.nonosgi.registry.RegistryFactoryHelper; +import org.slf4j.bridge.SLF4JBridgeHandler; +import to.wetransform.hale.transformer.api.internal.CustomMetaClassCreationHandle; + +public class Init { + + public static void init() { + SLF4JBridgeHandler.install(); + + // initialize registry + RegistryFactoryHelper.getRegistry(); + + // initialize meta extensions + GroovySystem.getMetaClassRegistry().setMetaClassCreationHandle(new CustomMetaClassCreationHandle()); + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/internal/CountdownLatchConfig.java b/src/main/java/to/wetransform/hale/transformer/api/internal/CountdownLatchConfig.java new file mode 100644 index 0000000..480a203 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/internal/CountdownLatchConfig.java @@ -0,0 +1,20 @@ +package to.wetransform.hale.transformer.api.internal; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ComponentScan +public class CountdownLatchConfig { + @Value("${countdownLatch.waiting-time}") + private long waitingTime; + + public long getWaitingTime() { + return waitingTime; + } + + public void setWaitingTime(long waitingTime) { + this.waitingTime = waitingTime; + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java b/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java new file mode 100644 index 0000000..6b6c055 --- /dev/null +++ b/src/main/java/to/wetransform/hale/transformer/api/internal/CustomMetaClassCreationHandle.java @@ -0,0 +1,76 @@ +package to.wetransform.hale.transformer.api.internal; + +import java.lang.reflect.Constructor; + +import eu.esdihumboldt.util.groovy.meta.extension.MetaClassDescriptor; +import eu.esdihumboldt.util.groovy.meta.extension.MetaClassExtension; +import groovy.lang.MetaClass; +import groovy.lang.MetaClassRegistry; +import groovy.lang.MetaClassRegistry.MetaClassCreationHandle; +import org.eclipse.equinox.nonosgi.registry.RegistryFactoryHelper; + +/** + * Adapts created meta classes with delegating meta classes registered in the + * {@link MetaClassExtension}. + */ +public class CustomMetaClassCreationHandle extends MetaClassCreationHandle { + + private final MetaClassExtension ext; + + public CustomMetaClassCreationHandle() { + // initialize registry + RegistryFactoryHelper.getRegistry(); + + ext = new MetaClassExtension(); + } + + @Override + protected MetaClass createNormalMetaClass( + @SuppressWarnings("rawtypes") Class theClass, MetaClassRegistry registry) { + MetaClass metaClass = super.createNormalMetaClass(theClass, registry); + + for (MetaClassDescriptor descriptor : ext.getElements()) { + if (descriptorApplies(descriptor, theClass)) { + // create meta class + Class delegatingMetaClass = descriptor.getMetaClass(); + try { + Constructor constructor = delegatingMetaClass.getConstructor(MetaClass.class); + metaClass = (MetaClass) constructor.newInstance(metaClass); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + return metaClass; + } + + /** + * Check if a meta class descriptor applies to a given class. + * + * @param descriptor the meta class descriptor + * @param theClass the class for which should be determined if the descriptor + * applies + * @return true if the descriptor applies to the class, + * false otherwise + */ + private boolean descriptorApplies(MetaClassDescriptor descriptor, @SuppressWarnings("rawtypes") Class theClass) { + Class forClass = descriptor.getForClass(); + if (descriptor.isForArray()) { + if (theClass.isArray()) { + Class componentClass = theClass.getComponentType(); + if (componentClass != null) { + return forClass.equals(componentClass) || forClass.isAssignableFrom(componentClass); + } else { + // should actually not happen + return false; + } + } else { + // no array + return false; + } + } else { + return forClass.equals(theClass) || forClass.isAssignableFrom(theClass); + } + } +} diff --git a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java index 1f5d524..5e713ef 100644 --- a/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java +++ b/src/main/java/to/wetransform/hale/transformer/api/messaging/TransformationMessageConsumer.java @@ -7,9 +7,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import to.wetransform.hale.transformer.Transformer; import to.wetransform.hale.transformer.api.TransformerApiApplication; +import to.wetransform.hale.transformer.api.internal.CountdownLatchConfig; @Service public class TransformationMessageConsumer { @@ -22,23 +24,31 @@ public record TransformationMessage( private static final Logger LOG = LoggerFactory.getLogger(TransformationMessageConsumer.class); + private final CountdownLatchConfig countdownLatchConfig; + + @Autowired + public TransformationMessageConsumer(CountdownLatchConfig countdownLatchConfig) { + this.countdownLatchConfig = countdownLatchConfig; + } + @RabbitListener(queues = TransformerApiApplication.QUEUE_NAME) public void receiveMessage(final TransformationMessage message) { LOG.info("Received projectUrl = " + message.projectUrl + " sourceDataUrl = " + message.sourceDataUrl); // TODO Implement mechanism to only accept a message from the queue if no // transformation is currently running - if (message.projectUrl != null && message.sourceDataUrl() != null) { Transformer tx = new Transformer(); try { - tx.transform(); - tx.getLatch().await(10, TimeUnit.MINUTES); // TODO make configurable + LOG.info("Transformation started"); + tx.transform(message.sourceDataUrl(), message.projectUrl, null); + tx.getLatch().await(countdownLatchConfig.getWaitingTime(), TimeUnit.MINUTES); } catch (InterruptedException e) { // TODO What should be done when the transformation fails or times out? // - Simply requeuing the message is probably not helpful // - Send a message back so that the producer can react? + Thread.currentThread().interrupt(); LOG.error("Transformation process timed out: " + e.getMessage(), e); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 8b13789..695bf6f 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1 +1,2 @@ - +# Configuration for CountDownLatch waiting time in milliseconds +countdownLatch.waiting-time=10 \ No newline at end of file