Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: fix flaky testcontainers tests #4712

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@

import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static java.util.concurrent.TimeUnit.SECONDS;

Expand All @@ -44,37 +47,63 @@
public class EmbeddedRuntime extends BaseRuntime {

private final String name;
private final Map<String, String> properties;
private final LinkedHashMap<Class<?>, Object> serviceMocks = new LinkedHashMap<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final MultiSourceServiceLocator serviceLocator;
private final URL[] classPathEntries;
private Future<?> runtimeThread;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private final List<Supplier<Config>> configurationProviders = new ArrayList<>();

public EmbeddedRuntime(String name, String... additionalModules) {
this(new MultiSourceServiceLocator(), name, ClasspathReader.classpathFor(additionalModules));
}

public EmbeddedRuntime(String name, URL[] classpathEntries) {
this(new MultiSourceServiceLocator(), name, classpathEntries);
}

/**
* Deprecated, configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily
*
* @deprecated configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily
*/
@Deprecated(since = "0.11.0")
public EmbeddedRuntime(String name, Map<String, String> properties, String... additionalModules) {
this(new MultiSourceServiceLocator(), name, properties, ClasspathReader.classpathFor(additionalModules));
}

/**
* Deprecated, configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily
*
* @deprecated configuration properties should be passed through {@link #configurationProvider(Supplier)} to being evaluated lazily
*/
@Deprecated(since = "0.11.0")
public EmbeddedRuntime(String name, Map<String, String> properties, URL[] classpathEntries) {
this(new MultiSourceServiceLocator(), name, properties, classpathEntries);
}

private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, Map<String, String> properties, URL[] classPathEntries) {
private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, URL[] classPathEntries) {
super(serviceLocator);
this.serviceLocator = serviceLocator;
this.name = name;
this.properties = properties;
this.classPathEntries = classPathEntries;
}

@Deprecated(since = "0.11.0")
private EmbeddedRuntime(MultiSourceServiceLocator serviceLocator, String name, Map<String, String> properties, URL[] classPathEntries) {
this(serviceLocator, name, classPathEntries);
this.configurationProviders.add(() -> ConfigFactory.fromMap(properties));
}

@Override
public void boot(boolean addShutdownHook) {
var monitor = super.createMonitor();

monitor.info("Starting runtime %s".formatted(name));

serviceLocator.registerSystemExtension(ConfigurationExtension.class, (ConfigurationExtension) () -> ConfigFactory.fromMap(properties));
configurationProviders.forEach(provider -> serviceLocator
.registerSystemExtension(ConfigurationExtension.class, (ConfigurationExtension) provider::get));

var runtimeThrowable = new AtomicReference<Throwable>();
var latch = new CountDownLatch(1);
Expand Down Expand Up @@ -155,4 +184,15 @@ public ServiceExtensionContext getContext() {
public boolean isRunning() {
return isRunning.get();
}

/**
* Adds a configuration provider, that will be invoked during connector startup.
*
* @param configurationProvider the configuration provider.
* @return self.
*/
public EmbeddedRuntime configurationProvider(Supplier<Config> configurationProvider) {
configurationProviders.add(configurationProvider);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.eclipse.edc.sql.testfixtures;

import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.eclipse.edc.sql.DriverManagerConnectionFactory;
import org.eclipse.edc.sql.QueryExecutor;
import org.eclipse.edc.sql.SqlQueryExecutor;
Expand Down Expand Up @@ -135,6 +137,10 @@ public Object resolveParameter(ParameterContext parameterContext, ExtensionConte
return null;
}

public Config getDatasourceConfig() {
return ConfigFactory.fromMap(getDatasourceConfiguration());
}

public Map<String, String> getDatasourceConfiguration() {
return postgres.createDefaultDatasourceConfiguration(postgreSqlContainer.getDatabaseName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2025 Cofinity-X
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Cofinity-X - initial API and implementation
*
*/

package org.eclipse.edc.connector.controlplane.test.system.utils;

import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* Lazy implementation of the {@link Supplier} interface.
*/
public class LazySupplier<T> implements Supplier<T> {

private final Supplier<T> dataSupplier;
private final AtomicReference<T> data = new AtomicReference<>();

public LazySupplier(Supplier<T> dataSupplier) {
this.dataSupplier = dataSupplier;
}

@Override
public T get() {
var currentValue = data.get();
if (currentValue == null) {
var newValue = dataSupplier.get();
data.compareAndExchange(null, newValue);
return newValue;
}
return currentValue;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ public abstract class AbstractDataPlaneTest {
protected static RuntimeExtension runtime =
new RuntimePerClassExtension(new EmbeddedRuntime(
"data-plane",
DATAPLANE.dataPlaneConfiguration(),
":system-tests:e2e-dataplane-tests:runtimes:data-plane"
));
).configurationProvider(DATAPLANE::dataPlaneConfig));

protected void seedVault() {
var vault = runtime.getService(Vault.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.junit.extensions.EmbeddedRuntime;
import org.eclipse.edc.junit.extensions.RuntimeExtension;
import org.eclipse.edc.junit.extensions.RuntimePerClassExtension;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
Expand Down Expand Up @@ -63,19 +64,17 @@ public class ClusteredDataPlaneEndToEndTest {
.id("urn:connector:provider")
.build();

private static final BiFunction<String, DataPlaneParticipant, EmbeddedRuntime> RUNTIME_SUPPLIER = (name, dataPlaneParticipant) -> {
var config = dataPlaneParticipant.dataPlaneConfiguration();
config.put("edc.runtime.id", name);
config.put("edc.sql.schema.autocreate", "true");
config.put("edc.core.retry.retries.max", "0");
config.putAll(POSTGRESQL.getDatasourceConfiguration());
return new EmbeddedRuntime(
name,
config,
":system-tests:e2e-dataplane-tests:runtimes:data-plane",
":dist:bom:dataplane-feature-sql-bom"
);
};
private static final BiFunction<String, DataPlaneParticipant, EmbeddedRuntime> RUNTIME_SUPPLIER =
(name, dataPlaneParticipant) -> new EmbeddedRuntime(
name,
":system-tests:e2e-dataplane-tests:runtimes:data-plane",
":dist:bom:dataplane-feature-sql-bom")
.configurationProvider(dataPlaneParticipant::dataPlaneConfig)
.configurationProvider(POSTGRESQL::getDatasourceConfig)
.configurationProvider(() -> ConfigFactory.fromMap(Map.of(
"edc.runtime.id", name,
"edc.sql.schema.autocreate", "true"
)));

private static final EmbeddedRuntime FOO_RUNTIME = RUNTIME_SUPPLIER.apply("foo", FOO_DATAPLANE);
private static final EmbeddedRuntime BAR_RUNTIME = RUNTIME_SUPPLIER.apply("bar", BAR_DATAPLANE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
package org.eclipse.edc.test.e2e.participant;

import com.fasterxml.jackson.annotation.JsonCreator;
import org.eclipse.edc.connector.controlplane.test.system.utils.LazySupplier;
import org.eclipse.edc.connector.controlplane.test.system.utils.Participant;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
Expand All @@ -28,42 +31,42 @@

public class DataPlaneParticipant extends Participant {

private final URI controlPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control");
private final URI dataPlaneDefault = URI.create("http://localhost:" + getFreePort());
private final URI dataPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control");
private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public");
private final URI dataplanePublicResponse = dataPlanePublic.resolve("/public/responseChannel");
private final String componentId = UUID.randomUUID().toString();
private final LazySupplier<URI> dataPlaneControl = new LazySupplier<>(() -> URI.create("http://localhost:" + getFreePort() + "/control"));
private final LazySupplier<URI> dataPlanePublic = new LazySupplier<>(() -> URI.create("http://localhost:" + getFreePort() + "/public"));

private DataPlaneParticipant() {
super();
}

public Endpoint getDataPlaneControlEndpoint() {
return new Endpoint(dataPlaneControl);
return new Endpoint(dataPlaneControl.get());
}

public Endpoint getDataPlanePublicEndpoint() {
return new Endpoint(dataPlanePublic);
return new Endpoint(dataPlanePublic.get());
}

public Config dataPlaneConfig() {
return ConfigFactory.fromMap(dataPlaneConfiguration());
}

public Map<String, String> dataPlaneConfiguration() {
return new HashMap<>() {
{
put("edc.component.id", componentId);
put("web.http.port", String.valueOf(dataPlaneDefault.getPort()));
put("edc.component.id", UUID.randomUUID().toString());
put("web.http.port", String.valueOf(getFreePort()));
put("web.http.path", "/api");
put("web.http.public.port", String.valueOf(dataPlanePublic.getPort()));
put("web.http.public.port", String.valueOf(dataPlanePublic.get().getPort()));
put("web.http.public.path", "/public");
put("web.http.control.port", String.valueOf(dataPlaneControl.getPort()));
put("web.http.control.path", dataPlaneControl.getPath());
put("web.http.control.port", String.valueOf(dataPlaneControl.get().getPort()));
put("web.http.control.path", dataPlaneControl.get().getPath());
put("edc.keystore", resourceAbsolutePath("certs/cert.pfx"));
put("edc.keystore.password", "123456");
put("edc.dataplane.token.validation.endpoint", controlPlaneControl + "/token");
put("edc.dataplane.http.sink.partition.size", "1");
put("edc.transfer.proxy.token.signer.privatekey.alias", "1");
put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key");
put("edc.dataplane.api.public.response.baseurl", dataplanePublicResponse.toString());
put("edc.dataplane.api.public.response.baseurl", dataPlanePublic.get().resolve("/public/responseChannel").toString());
put("edc.core.retry.retries.max", "0");
}
};
}
Expand Down Expand Up @@ -92,4 +95,5 @@ public DataPlaneParticipant build() {
return participant;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2025 Cofinity-X
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Cofinity-X - initial API and implementation
*
*/

package org.eclipse.edc.test.e2e;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ParameterContext;
import org.junit.jupiter.api.extension.ParameterResolutionException;
import org.junit.jupiter.api.extension.ParameterResolver;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Properties;

public class KafkaExtension implements BeforeAllCallback, AfterAllCallback, ParameterResolver {

private final ConfluentKafkaContainer kafka = new ConfluentKafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.7.1"));

@Override
public void beforeAll(ExtensionContext context) {
kafka.start();
}

@Override
public void afterAll(ExtensionContext context) {
kafka.stop();
kafka.close();
}

@Override
public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
var type = parameterContext.getParameter().getParameterizedType();
return type instanceof KafkaExtension;
}

@Override
public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
var type = parameterContext.getParameter().getParameterizedType();
if (type instanceof KafkaExtension) {
return this;
}
return null;
}

public Producer<String, String> createKafkaProducer() {
var props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}

public Consumer<String, String> createKafkaConsumer() {
var props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "runner");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(props);
}

public String getBootstrapServers() {
return kafka.getBootstrapServers();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.eclipse.edc.junit.extensions.EmbeddedRuntime;

import java.net.URL;
import java.util.Map;

/**
* Runtimes for E2E transfer test.
Expand Down Expand Up @@ -66,10 +65,11 @@ public enum Runtimes {
this.modules = modules;
}

public EmbeddedRuntime create(String name, Map<String, String> configuration) {
public EmbeddedRuntime create(String name) {
if (classpathEntries == null) {
classpathEntries = ClasspathReader.classpathFor(modules);
}
return new EmbeddedRuntime(name, configuration, classpathEntries);
return new EmbeddedRuntime(name, classpathEntries);
}

}
Loading
Loading