From 5fd6b4b417bb3a75a3ab4b2acf240f5cc716a47d Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Fri, 10 Jan 2025 14:09:10 +0100 Subject: [PATCH] ci: fix flaky testcontainers tests --- .../edc/junit/extensions/EmbeddedRuntime.java | 48 +++++++- .../PostgresqlStoreSetupExtension.java | 6 + .../test/system/utils/LazySupplier.java | 43 +++++++ .../edc/test/e2e/AbstractDataPlaneTest.java | 3 +- .../e2e/ClusteredDataPlaneEndToEndTest.java | 25 ++--- .../e2e/participant/DataPlaneParticipant.java | 34 +++--- .../eclipse/edc/test/e2e/KafkaExtension.java | 86 ++++++++++++++ .../org/eclipse/edc/test/e2e/Runtimes.java | 6 +- .../test/e2e/TransferEndToEndParticipant.java | 87 ++++++++------- .../test/e2e/TransferPullEndToEndTest.java | 46 ++++++-- .../test/e2e/TransferPushEndToEndTest.java | 32 ++++-- .../e2e/TransferStreamingEndToEndTest.java | 105 +++++++++--------- 12 files changed, 369 insertions(+), 152 deletions(-) create mode 100644 extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/LazySupplier.java create mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/KafkaExtension.java diff --git a/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java b/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java index 9bc313f7cfa..5fa49968688 100644 --- a/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java +++ b/core/common/junit/src/main/java/org/eclipse/edc/junit/extensions/EmbeddedRuntime.java @@ -27,7 +27,9 @@ import java.net.URL; import java.net.URLClassLoader; +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -35,6 +37,7 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import static java.util.concurrent.TimeUnit.SECONDS; @@ -44,37 +47,63 @@ public class EmbeddedRuntime extends BaseRuntime { private final String name; - private final Map properties; private final LinkedHashMap, Object> serviceMocks = new LinkedHashMap<>(); private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final MultiSourceServiceLocator serviceLocator; private final URL[] classPathEntries; private Future runtimeThread; private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final List> configurationProviders = new ArrayList<>(); + public EmbeddedRuntime(String name, String... additionalModules) { + this(new MultiSourceServiceLocator(), name, ClasspathReader.classpathFor(additionalModules)); + } + + public EmbeddedRuntime(String name, URL[] classpathEntries) { + this(new MultiSourceServiceLocator(), name, classpathEntries); + } + + /** + * Deprecated, configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily + * + * @deprecated configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily + */ + @Deprecated(since = "0.11.0") public EmbeddedRuntime(String name, Map properties, String... additionalModules) { this(new MultiSourceServiceLocator(), name, properties, ClasspathReader.classpathFor(additionalModules)); } + /** + * Deprecated, configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily + * + * @deprecated configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily + */ + @Deprecated(since = "0.11.0") public EmbeddedRuntime(String name, Map properties, URL[] classpathEntries) { this(new MultiSourceServiceLocator(), name, properties, classpathEntries); } - private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, Map properties, URL[] classPathEntries) { + private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, URL[] classPathEntries) { super(serviceLocator); this.serviceLocator = serviceLocator; this.name = name; - this.properties = properties; this.classPathEntries = classPathEntries; } + @Deprecated(since = "0.11.0") + private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, Map properties, URL[] classPathEntries) { + this(serviceLocator, name, classPathEntries); + this.configurationProviders.add(() -> ConfigFactory.fromMap(properties)); + } + @Override public void boot(boolean addShutdownHook) { var monitor = super.createMonitor(); monitor.info("Starting runtime %s".formatted(name)); - serviceLocator.registerSystemExtension(ConfigurationExtension.class, (ConfigurationExtension) () -> ConfigFactory.fromMap(properties)); + configurationProviders.forEach(provider -> serviceLocator + .registerSystemExtension(ConfigurationExtension.class, (ConfigurationExtension) provider::get)); var runtimeThrowable = new AtomicReference(); var latch = new CountDownLatch(1); @@ -155,4 +184,15 @@ public ServiceExtensionContext getContext() { public boolean isRunning() { return isRunning.get(); } + + /** + * Adds a configuration provider, that will be invoked during connector startup. + * + * @param configurationProvider the configuration provider. + * @return self. + */ + public EmbeddedRuntime configurationProvider(Supplier configurationProvider) { + configurationProviders.add(configurationProvider); + return this; + } } diff --git a/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlStoreSetupExtension.java b/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlStoreSetupExtension.java index f2ef93d6b75..d03e923d0d4 100644 --- a/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlStoreSetupExtension.java +++ b/extensions/common/sql/sql-test-fixtures/src/testFixtures/java/org/eclipse/edc/sql/testfixtures/PostgresqlStoreSetupExtension.java @@ -14,6 +14,8 @@ package org.eclipse.edc.sql.testfixtures; +import org.eclipse.edc.spi.system.configuration.Config; +import org.eclipse.edc.spi.system.configuration.ConfigFactory; import org.eclipse.edc.sql.DriverManagerConnectionFactory; import org.eclipse.edc.sql.QueryExecutor; import org.eclipse.edc.sql.SqlQueryExecutor; @@ -135,6 +137,10 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte return null; } + public Config getDatasourceConfig() { + return ConfigFactory.fromMap(getDatasourceConfiguration()); + } + public Map getDatasourceConfiguration() { return postgres.createDefaultDatasourceConfiguration(postgreSqlContainer.getDatabaseName()); } diff --git a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/LazySupplier.java b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/LazySupplier.java new file mode 100644 index 00000000000..6d31b9d96cf --- /dev/null +++ b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/LazySupplier.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.connector.controlplane.test.system.utils; + +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +/** + * Lazy implementation of the {@link Supplier} interface. + */ +public class LazySupplier implements Supplier { + + private final Supplier dataSupplier; + private final AtomicReference data = new AtomicReference<>(); + + public LazySupplier(Supplier dataSupplier) { + this.dataSupplier = dataSupplier; + } + + @Override + public T get() { + var currentValue = data.get(); + if (currentValue == null) { + var newValue = dataSupplier.get(); + data.compareAndExchange(null, newValue); + return newValue; + } + return currentValue; + } + +} diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java index 40542180bc5..527a0b4209b 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java @@ -33,9 +33,8 @@ public abstract class AbstractDataPlaneTest { protected static RuntimeExtension runtime = new RuntimePerClassExtension(new EmbeddedRuntime( "data-plane", - DATAPLANE.dataPlaneConfiguration(), ":system-tests:e2e-dataplane-tests:runtimes:data-plane" - )); + ).configurationProvider(DATAPLANE::dataPlaneConfig)); protected void seedVault() { var vault = runtime.getService(Vault.class); diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/ClusteredDataPlaneEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/ClusteredDataPlaneEndToEndTest.java index 67281facbe3..37f236c3299 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/ClusteredDataPlaneEndToEndTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/ClusteredDataPlaneEndToEndTest.java @@ -20,6 +20,7 @@ import org.eclipse.edc.junit.extensions.EmbeddedRuntime; import org.eclipse.edc.junit.extensions.RuntimeExtension; import org.eclipse.edc.junit.extensions.RuntimePerClassExtension; +import org.eclipse.edc.spi.system.configuration.ConfigFactory; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; @@ -63,19 +64,17 @@ public class ClusteredDataPlaneEndToEndTest { .id("urn:connector:provider") .build(); - private static final BiFunction RUNTIME_SUPPLIER = (name, dataPlaneParticipant) -> { - var config = dataPlaneParticipant.dataPlaneConfiguration(); - config.put("edc.runtime.id", name); - config.put("edc.sql.schema.autocreate", "true"); - config.put("edc.core.retry.retries.max", "0"); - config.putAll(POSTGRESQL.getDatasourceConfiguration()); - return new EmbeddedRuntime( - name, - config, - ":system-tests:e2e-dataplane-tests:runtimes:data-plane", - ":dist:bom:dataplane-feature-sql-bom" - ); - }; + private static final BiFunction RUNTIME_SUPPLIER = + (name, dataPlaneParticipant) -> new EmbeddedRuntime( + name, + ":system-tests:e2e-dataplane-tests:runtimes:data-plane", + ":dist:bom:dataplane-feature-sql-bom") + .configurationProvider(dataPlaneParticipant::dataPlaneConfig) + .configurationProvider(POSTGRESQL::getDatasourceConfig) + .configurationProvider(() -> ConfigFactory.fromMap(Map.of( + "edc.runtime.id", name, + "edc.sql.schema.autocreate", "true" + ))); private static final EmbeddedRuntime FOO_RUNTIME = RUNTIME_SUPPLIER.apply("foo", FOO_DATAPLANE); private static final EmbeddedRuntime BAR_RUNTIME = RUNTIME_SUPPLIER.apply("bar", BAR_DATAPLANE); diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java index edecb88edee..91b6918e6f4 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java @@ -15,7 +15,10 @@ package org.eclipse.edc.test.e2e.participant; import com.fasterxml.jackson.annotation.JsonCreator; +import org.eclipse.edc.connector.controlplane.test.system.utils.LazySupplier; import org.eclipse.edc.connector.controlplane.test.system.utils.Participant; +import org.eclipse.edc.spi.system.configuration.Config; +import org.eclipse.edc.spi.system.configuration.ConfigFactory; import org.jetbrains.annotations.NotNull; import java.net.URI; @@ -28,42 +31,42 @@ public class DataPlaneParticipant extends Participant { - private final URI controlPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); - private final URI dataPlaneDefault = URI.create("http://localhost:" + getFreePort()); - private final URI dataPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); - private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public"); - private final URI dataplanePublicResponse = dataPlanePublic.resolve("/public/responseChannel"); - private final String componentId = UUID.randomUUID().toString(); + private final LazySupplier dataPlaneControl = new LazySupplier<>(() -> URI.create("http://localhost:" + getFreePort() + "/control")); + private final LazySupplier dataPlanePublic = new LazySupplier<>(() -> URI.create("http://localhost:" + getFreePort() + "/public")); private DataPlaneParticipant() { super(); } public Endpoint getDataPlaneControlEndpoint() { - return new Endpoint(dataPlaneControl); + return new Endpoint(dataPlaneControl.get()); } public Endpoint getDataPlanePublicEndpoint() { - return new Endpoint(dataPlanePublic); + return new Endpoint(dataPlanePublic.get()); + } + + public Config dataPlaneConfig() { + return ConfigFactory.fromMap(dataPlaneConfiguration()); } public Map dataPlaneConfiguration() { return new HashMap<>() { { - put("edc.component.id", componentId); - put("web.http.port", String.valueOf(dataPlaneDefault.getPort())); + put("edc.component.id", UUID.randomUUID().toString()); + put("web.http.port", String.valueOf(getFreePort())); put("web.http.path", "/api"); - put("web.http.public.port", String.valueOf(dataPlanePublic.getPort())); + put("web.http.public.port", String.valueOf(dataPlanePublic.get().getPort())); put("web.http.public.path", "/public"); - put("web.http.control.port", String.valueOf(dataPlaneControl.getPort())); - put("web.http.control.path", dataPlaneControl.getPath()); + put("web.http.control.port", String.valueOf(dataPlaneControl.get().getPort())); + put("web.http.control.path", dataPlaneControl.get().getPath()); put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); put("edc.keystore.password", "123456"); - put("edc.dataplane.token.validation.endpoint", controlPlaneControl + "/token"); put("edc.dataplane.http.sink.partition.size", "1"); put("edc.transfer.proxy.token.signer.privatekey.alias", "1"); put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key"); - put("edc.dataplane.api.public.response.baseurl", dataplanePublicResponse.toString()); + put("edc.dataplane.api.public.response.baseurl", dataPlanePublic.get().resolve("/public/responseChannel").toString()); + put("edc.core.retry.retries.max", "0"); } }; } @@ -92,4 +95,5 @@ public DataPlaneParticipant build() { return participant; } } + } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/KafkaExtension.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/KafkaExtension.java new file mode 100644 index 00000000000..c8cfeba3f54 --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/KafkaExtension.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2025 Cofinity-X + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Cofinity-X - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; +import org.testcontainers.kafka.ConfluentKafkaContainer; +import org.testcontainers.utility.DockerImageName; + +import java.util.Properties; + +public class KafkaExtension implements BeforeAllCallback, AfterAllCallback, ParameterResolver { + + private final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.7.1")); + + @Override + public void beforeAll(ExtensionContext context) { + kafka.start(); + } + + @Override + public void afterAll(ExtensionContext context) { + kafka.stop(); + kafka.close(); + } + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + var type = parameterContext.getParameter().getParameterizedType(); + return type instanceof KafkaExtension; + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + var type = parameterContext.getParameter().getParameterizedType(); + if (type instanceof KafkaExtension) { + return this; + } + return null; + } + + public Producer createKafkaProducer() { + var props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return new KafkaProducer<>(props); + } + + public Consumer createKafkaConsumer() { + var props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "runner"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + return new KafkaConsumer<>(props); + } + + public String getBootstrapServers() { + return kafka.getBootstrapServers(); + } +} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java index f7071f480a2..6804717310b 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java @@ -18,7 +18,6 @@ import org.eclipse.edc.junit.extensions.EmbeddedRuntime; import java.net.URL; -import java.util.Map; /** * Runtimes for E2E transfer test. @@ -66,10 +65,11 @@ public enum Runtimes { this.modules = modules; } - public EmbeddedRuntime create(String name, Map configuration) { + public EmbeddedRuntime create(String name) { if (classpathEntries == null) { classpathEntries = ClasspathReader.classpathFor(modules); } - return new EmbeddedRuntime(name, configuration, classpathEntries); + return new EmbeddedRuntime(name, classpathEntries); } + } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java index 16314490662..b64085e48e8 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java @@ -16,8 +16,12 @@ import io.restassured.common.mapper.TypeRef; import org.assertj.core.api.ThrowingConsumer; +import org.eclipse.edc.connector.controlplane.test.system.utils.LazySupplier; import org.eclipse.edc.connector.controlplane.test.system.utils.Participant; +import org.eclipse.edc.spi.system.configuration.Config; +import org.eclipse.edc.spi.system.configuration.ConfigFactory; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.util.io.Ports; import org.jetbrains.annotations.NotNull; import java.net.URI; @@ -34,37 +38,31 @@ public class TransferEndToEndParticipant extends Participant { - private final URI controlPlaneDefault = URI.create("http://localhost:" + getFreePort()); - private final URI controlPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); - private final URI dataPlaneDefault = URI.create("http://localhost:" + getFreePort()); - private final URI dataPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); - private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public"); - private final int httpProvisionerPort = getFreePort(); + private final LazySupplier controlPlaneControl = new LazySupplier<>(() -> URI.create("http://localhost:" + getFreePort() + "/control")); + private final LazySupplier dataPlaneControl = new LazySupplier<>(() -> URI.create("http://localhost:" + getFreePort() + "/control")); + private final LazySupplier dataPlanePublic = new LazySupplier<>(() -> URI.create("http://localhost:" + getFreePort() + "/public")); + private final LazySupplier httpProvisionerPort = new LazySupplier<>(Ports::getFreePort); protected TransferEndToEndParticipant() { super(); } - public int getHttpProvisionerPort() { - return httpProvisionerPort; - } - - public Map controlPlaneConfiguration() { - return new HashMap<>() { + public Config controlPlaneConfig() { + var settings = new HashMap() { { put(PARTICIPANT_ID, id); - put("web.http.port", String.valueOf(controlPlaneDefault.getPort())); + put("web.http.port", String.valueOf(getFreePort())); put("web.http.path", "/api"); put("web.http.protocol.port", String.valueOf(protocolEndpoint.getUrl().getPort())); put("web.http.protocol.path", protocolEndpoint.getUrl().getPath()); put("web.http.management.port", String.valueOf(managementEndpoint.getUrl().getPort())); put("web.http.management.path", managementEndpoint.getUrl().getPath()); - put("web.http.control.port", String.valueOf(controlPlaneControl.getPort())); - put("web.http.control.path", controlPlaneControl.getPath()); + put("web.http.control.port", String.valueOf(controlPlaneControl.get().getPort())); + put("web.http.control.path", controlPlaneControl.get().getPath()); put("edc.dsp.callback.address", protocolEndpoint.getUrl().toString()); put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); put("edc.keystore.password", "123456"); - put("edc.transfer.proxy.endpoint", dataPlanePublic.toString()); + put("edc.transfer.proxy.endpoint", dataPlanePublic.get().toString()); put("edc.transfer.send.retry.limit", "1"); put("edc.transfer.send.retry.base-delay.ms", "100"); put("edc.negotiation.consumer.send.retry.limit", "1"); @@ -77,52 +75,50 @@ public Map controlPlaneConfiguration() { put("edc.transfer.state-machine.iteration-wait-millis", "50"); put("provisioner.http.entries.default.provisioner.type", "provider"); - put("provisioner.http.entries.default.endpoint", "http://localhost:%d/provision".formatted(httpProvisionerPort)); + put("provisioner.http.entries.default.endpoint", "http://localhost:%d/provision".formatted(httpProvisionerPort.get())); put("provisioner.http.entries.default.data.address.type", "HttpProvision"); } }; - } - public Map controlPlanePostgresConfiguration() { - var baseConfiguration = controlPlaneConfiguration(); - baseConfiguration.putAll(defaultDatasourceConfiguration(getName())); - baseConfiguration.put("edc.sql.schema.autocreate", "true"); - return baseConfiguration; + return ConfigFactory.fromMap(settings); } - public Map dataPlaneConfiguration() { - return new HashMap<>() { + public Config dataPlaneConfig() { + var settings = new HashMap() { { - put("web.http.port", String.valueOf(dataPlaneDefault.getPort())); + put("web.http.port", String.valueOf(getFreePort())); put("web.http.path", "/api"); - put("web.http.public.port", String.valueOf(dataPlanePublic.getPort())); + put("web.http.public.port", String.valueOf(dataPlanePublic.get().getPort())); put("web.http.public.path", "/public"); - put("web.http.control.port", String.valueOf(dataPlaneControl.getPort())); - put("web.http.control.path", dataPlaneControl.getPath()); + put("web.http.control.port", String.valueOf(dataPlaneControl.get().getPort())); + put("web.http.control.path", dataPlaneControl.get().getPath()); put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); put("edc.keystore.password", "123456"); - put("edc.dataplane.api.public.baseurl", dataPlanePublic + "/v2/"); - put("edc.dataplane.token.validation.endpoint", controlPlaneControl + "/token"); + put("edc.dataplane.api.public.baseurl", dataPlanePublic.get() + "/v2/"); put("edc.transfer.proxy.token.signer.privatekey.alias", "private-key"); put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key"); put("edc.dataplane.http.sink.partition.size", "1"); put("edc.dataplane.state-machine.iteration-wait-millis", "50"); - put("edc.dpf.selector.url", controlPlaneControl + "/v1/dataplanes"); + put("edc.dpf.selector.url", controlPlaneControl.get() + "/v1/dataplanes"); } }; + return ConfigFactory.fromMap(settings); } - public Map controlPlaneEmbeddedDataPlaneConfiguration() { - var cfg = dataPlaneConfiguration(); - cfg.putAll(controlPlaneConfiguration()); - return cfg; + public Config controlPlanePostgresConfig() { + return controlPlaneConfig().merge(postgresConfig()); } - public Map dataPlanePostgresConfiguration() { - var baseConfiguration = dataPlaneConfiguration(); - baseConfiguration.putAll(defaultDatasourceConfiguration(getName())); - baseConfiguration.put("edc.sql.schema.autocreate", "true"); - return baseConfiguration; + public Config dataPlanePostgresConfig() { + return dataPlaneConfig().merge(postgresConfig()); + } + + public Config controlPlaneEmbeddedDataPlaneConfig() { + return controlPlaneConfig().merge(dataPlaneConfig()); + } + + public int getHttpProvisionerPort() { + return httpProvisionerPort.get(); } /** @@ -147,7 +143,6 @@ public DataAddress getEdr(String transferProcessId) { var builder = DataAddress.Builder.newInstance(); dataAddressRaw.forEach(builder::property); return builder.build(); - } /** @@ -177,6 +172,13 @@ private String resourceAbsolutePath(String filename) { return System.getProperty("user.dir") + separator + "build" + separator + "resources" + separator + "test" + separator + filename; } + private Config postgresConfig() { + var settings = new HashMap(); + settings.put("edc.sql.schema.autocreate", "true"); + settings.putAll(defaultDatasourceConfiguration(getName())); + return ConfigFactory.fromMap(settings); + } + public static class Builder extends Participant.Builder { protected Builder() { @@ -195,4 +197,5 @@ public TransferEndToEndParticipant build() { return participant; } } + } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index ad6a6629402..021db41a1ee 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -385,15 +385,21 @@ class InMemory extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane") + .configurationProvider(CONSUMER::controlPlaneConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane") + .configurationProvider(PROVIDER::controlPlaneConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlaneConfiguration())); + Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane") + .configurationProvider(PROVIDER::dataPlaneConfig) + ); @Override protected Vault getDataplaneVault() { @@ -408,13 +414,21 @@ class InMemoryV2024Rev1 extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane") + .configurationProvider(CONSUMER::controlPlaneConfig) + ); + @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane") + .configurationProvider(PROVIDER::controlPlaneConfig) + ); + @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlaneConfiguration())); + Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane") + .configurationProvider(PROVIDER::dataPlaneConfig) + ); // TODO: replace with something better. Temporary hack @BeforeAll @@ -439,11 +453,15 @@ class EmbeddedDataPlane extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneEmbeddedDataPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE.create("consumer-control-plane") + .configurationProvider(CONSUMER::controlPlaneEmbeddedDataPlaneConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE.create("provider-control-plane", PROVIDER.controlPlaneEmbeddedDataPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE.create("provider-control-plane") + .configurationProvider(PROVIDER::controlPlaneEmbeddedDataPlaneConfig) + ); @Override protected Vault getDataplaneVault() { @@ -464,15 +482,21 @@ class Postgres extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.POSTGRES_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration())); + Runtimes.POSTGRES_CONTROL_PLANE.create("consumer-control-plane") + .configurationProvider(CONSUMER::controlPlanePostgresConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.POSTGRES_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration())); + Runtimes.POSTGRES_CONTROL_PLANE.create("provider-control-plane") + .configurationProvider(PROVIDER::controlPlanePostgresConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.POSTGRES_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration())); + Runtimes.POSTGRES_DATA_PLANE.create("provider-data-plane") + .configurationProvider(PROVIDER::dataPlanePostgresConfig) + ); @Override protected Vault getDataplaneVault() { diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java index ae400fe707d..e88bf768715 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java @@ -132,15 +132,21 @@ class InMemory extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane") + .configurationProvider(CONSUMER::controlPlaneConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane") + .configurationProvider(PROVIDER::controlPlaneConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlaneConfiguration())); + Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane") + .configurationProvider(PROVIDER::dataPlaneConfig) + ); @Override protected Vault getDataplaneVault() { @@ -154,11 +160,15 @@ class EmbeddedDataPlane extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane") + .configurationProvider(CONSUMER::controlPlaneConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE.create("provider-control-plane", PROVIDER.controlPlaneEmbeddedDataPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE_EMBEDDED_DATA_PLANE.create("provider-control-plane") + .configurationProvider(PROVIDER::controlPlaneEmbeddedDataPlaneConfig) + ); @Override protected Vault getDataplaneVault() { @@ -179,15 +189,21 @@ class Postgres extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.POSTGRES_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration())); + Runtimes.POSTGRES_CONTROL_PLANE.create("consumer-control-plane") + .configurationProvider(CONSUMER::controlPlanePostgresConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.POSTGRES_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration())); + Runtimes.POSTGRES_CONTROL_PLANE.create("provider-control-plane") + .configurationProvider(PROVIDER::controlPlanePostgresConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.POSTGRES_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration())); + Runtimes.POSTGRES_DATA_PLANE.create("provider-data-plane") + .configurationProvider(PROVIDER::dataPlanePostgresConfig) + ); @Override protected Vault getDataplaneVault() { diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java index d4c48988aab..d88134570e7 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -18,14 +18,7 @@ import jakarta.json.Json; import jakarta.json.JsonObject; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest; import org.eclipse.edc.junit.extensions.EmbeddedRuntime; @@ -38,15 +31,11 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.RegisterExtension; -import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; -import org.testcontainers.kafka.ConfluentKafkaContainer; -import org.testcontainers.utility.DockerImageName; import java.time.Duration; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.UUID; import javax.validation.constraints.NotNull; @@ -74,29 +63,41 @@ public class TransferStreamingEndToEndTest { - private static final DockerImageName KAFKA_CONTAINER_VERSION = DockerImageName.parse("confluentinc/cp-kafka:7.7.1"); - @Nested @EndToEndTest class InMemory extends Tests { + @Order(0) + @RegisterExtension + static final KafkaExtension KAFKA_EXTENSION = new KafkaExtension(); + @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("consumer-control-plane") + .configurationProvider(CONSUMER::controlPlaneConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlaneConfiguration())); + Runtimes.IN_MEMORY_CONTROL_PLANE.create("provider-control-plane") + .configurationProvider(PROVIDER::controlPlaneConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension( - Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlaneConfiguration())); + Runtimes.IN_MEMORY_DATA_PLANE.create("provider-data-plane") + .configurationProvider(PROVIDER::dataPlaneConfig) + ); @Override protected Vault getDataplaneVault() { return PROVIDER_DATA_PLANE.getService(Vault.class); } + @Override + protected KafkaExtension getKafkaExtension() { + return KAFKA_EXTENSION; + } } @Nested @@ -105,6 +106,10 @@ class Postgres extends Tests { @Order(0) @RegisterExtension + static final KafkaExtension KAFKA_EXTENSION = new KafkaExtension(); + + @Order(1) + @RegisterExtension static final BeforeAllCallback CREATE_DATABASES = context -> { createDatabase(CONSUMER.getName()); createDatabase(PROVIDER.getName()); @@ -112,13 +117,18 @@ class Postgres extends Tests { @RegisterExtension static final RuntimeExtension CONSUMER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.POSTGRES_CONTROL_PLANE.create("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration())); + Runtimes.POSTGRES_CONTROL_PLANE.create("consumer-control-plane") + .configurationProvider(CONSUMER::controlPlanePostgresConfig) + ); @RegisterExtension static final RuntimeExtension PROVIDER_CONTROL_PLANE = new RuntimePerClassExtension( - Runtimes.POSTGRES_CONTROL_PLANE.create("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration())); + Runtimes.POSTGRES_CONTROL_PLANE.create("provider-control-plane") + .configurationProvider(PROVIDER::controlPlanePostgresConfig) + ); - private static final EmbeddedRuntime PROVIDER_DATA_PLANE_RUNTIME = Runtimes.POSTGRES_DATA_PLANE.create("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration()); + private static final EmbeddedRuntime PROVIDER_DATA_PLANE_RUNTIME = Runtimes.POSTGRES_DATA_PLANE.create("provider-data-plane") + .configurationProvider(PROVIDER::dataPlanePostgresConfig); @RegisterExtension static final RuntimeExtension PROVIDER_DATA_PLANE = new RuntimePerClassExtension(PROVIDER_DATA_PLANE_RUNTIME); @@ -128,16 +138,21 @@ protected Vault getDataplaneVault() { return PROVIDER_DATA_PLANE.getService(Vault.class); } + @Override + protected KafkaExtension getKafkaExtension() { + return KAFKA_EXTENSION; + } + @Test void shouldResumeTransfer_whenDataPlaneRestarts() { - try (var consumer = createKafkaConsumer()) { + try (var consumer = getKafkaExtension().createKafkaConsumer()) { consumer.subscribe(List.of(sinkTopic)); var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, kafkaSourceProperty()); + createResourcesOnProvider(assetId, kafkaSourceProperty(getKafkaExtension().getBootstrapServers())); var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) - .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); + .withDestination(kafkaSink(getKafkaExtension().getBootstrapServers())).withTransferType("Kafka-PUSH").execute(); assertMessagesAreSentTo(consumer); PROVIDER_DATA_PLANE_RUNTIME.shutdown(); @@ -155,15 +170,14 @@ void shouldResumeTransfer_whenDataPlaneRestarts() { @Testcontainers abstract static class Tests extends TransferEndToEndTestBase { - @Container - private static final ConfluentKafkaContainer KAFKA = new ConfluentKafkaContainer(KAFKA_CONTAINER_VERSION); - private final String sourceTopic = "source_topic_" + UUID.randomUUID(); protected final String sinkTopic = "sink_topic_" + UUID.randomUUID(); + protected abstract KafkaExtension getKafkaExtension(); + @BeforeEach void setUp() { - var producer = createKafkaProducer(); + var producer = getKafkaExtension().createKafkaProducer(); newSingleThreadScheduledExecutor().scheduleAtFixedRate( () -> producer.send(new ProducerRecord<>(sourceTopic, sampleMessage())), @@ -179,7 +193,7 @@ void kafkaToHttpTransfer() { destinationServer.when(request).respond(response()); var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, contractExpiresIn("10s"), kafkaSourceProperty()); + createResourcesOnProvider(assetId, contractExpiresIn("10s"), kafkaSourceProperty(getKafkaExtension().getBootstrapServers())); var destination = httpSink(destinationServer.getLocalPort(), "/api/service"); var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) @@ -208,14 +222,14 @@ void kafkaToHttpTransfer() { @Test void kafkaToKafkaTransfer() { - try (var consumer = createKafkaConsumer()) { + try (var consumer = getKafkaExtension().createKafkaConsumer()) { consumer.subscribe(List.of(sinkTopic)); var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, contractExpiresIn("10s"), kafkaSourceProperty()); + createResourcesOnProvider(assetId, contractExpiresIn("10s"), kafkaSourceProperty(getKafkaExtension().getBootstrapServers())); var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) - .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); + .withDestination(kafkaSink(getKafkaExtension().getBootstrapServers())).withTransferType("Kafka-PUSH").execute(); assertMessagesAreSentTo(consumer); CONSUMER.awaitTransferToBeInState(transferProcessId, TERMINATED); @@ -225,14 +239,14 @@ void kafkaToKafkaTransfer() { @Test void shouldSuspendAndResumeTransfer() { - try (var consumer = createKafkaConsumer()) { + try (var consumer = getKafkaExtension().createKafkaConsumer()) { consumer.subscribe(List.of(sinkTopic)); var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, kafkaSourceProperty()); + createResourcesOnProvider(assetId, kafkaSourceProperty(getKafkaExtension().getBootstrapServers())); var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) - .withDestination(kafkaSink()).withTransferType("Kafka-PUSH").execute(); + .withDestination(kafkaSink(getKafkaExtension().getBootstrapServers())).withTransferType("Kafka-PUSH").execute(); assertMessagesAreSentTo(consumer); CONSUMER.suspendTransfer(transferProcessId, "any kind of reason"); @@ -274,45 +288,28 @@ private JsonObject httpSink(Integer port, String path) { } @NotNull - protected JsonObject kafkaSink() { + protected JsonObject kafkaSink(String bootstrapServers) { return Json.createObjectBuilder() .add(TYPE, EDC_NAMESPACE + "DataAddress") .add(EDC_NAMESPACE + "type", "Kafka") .add(EDC_NAMESPACE + "properties", Json.createObjectBuilder() .add(EDC_NAMESPACE + "topic", sinkTopic) - .add(EDC_NAMESPACE + kafkaProperty("bootstrap.servers"), KAFKA.getBootstrapServers()) + .add(EDC_NAMESPACE + kafkaProperty("bootstrap.servers"), bootstrapServers) .build()) .build(); } @NotNull - protected Map kafkaSourceProperty() { + protected Map kafkaSourceProperty(String bootstrapServers) { return Map.of( "name", "data", "type", "Kafka", "topic", sourceTopic, - kafkaProperty("bootstrap.servers"), KAFKA.getBootstrapServers(), + kafkaProperty("bootstrap.servers"), bootstrapServers, kafkaProperty("max.poll.records"), "100" ); } - protected Consumer createKafkaConsumer() { - var props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "runner"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - return new KafkaConsumer<>(props); - } - - private Producer createKafkaProducer() { - var props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - return new KafkaProducer<>(props); - } - private String kafkaProperty(String property) { return "kafka." + property; }