From 49993d5cc5bb4926f80a8b055344a91a25853512 Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Wed, 17 Apr 2024 11:52:59 +0200 Subject: [PATCH] refactor: switch e2e tests to data plane signaling (#4116) * refactor: switch E2E tests to data plane signaling * cleanup runtimes * dependencies --- DEPENDENCIES | 2 +- .../test/system/utils/Participant.java | 3 +- .../org/eclipse/edc/test/e2e/Runtimes.java | 95 ++++--- ....java => TransferEndToEndParticipant.java} | 191 +++++-------- .../test/e2e/TransferEndToEndTestBase.java | 20 +- .../test/e2e/TransferPullEndToEndTest.java | 248 ++++++++++++---- .../test/e2e/TransferPushEndToEndTest.java | 91 +++--- .../e2e/TransferStreamingEndToEndTest.java | 12 +- .../e2e/annotations/KafkaIntegrationTest.java | 33 --- .../EndToEndTransferParticipant.java | 44 --- .../e2e/participant/SignalingParticipant.java | 92 ------ .../EmbeddedDataPlaneSignalingRuntimes.java | 82 ------ .../signaling/InMemorySignalingRuntimes.java | 84 ------ .../signaling/PostgresSignalingRuntimes.java | 87 ------ .../signaling/SignalingEndToEndTestBase.java | 48 ---- .../TransferSignalingPullEndToEndTest.java | 264 ------------------ .../TransferSignalingPushEndToEndTest.java | 110 -------- 17 files changed, 387 insertions(+), 1119 deletions(-) rename system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/{participant/BaseEndToEndParticipant.java => TransferEndToEndParticipant.java} (72%) delete mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/annotations/KafkaIntegrationTest.java delete mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java delete mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/SignalingParticipant.java delete mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedDataPlaneSignalingRuntimes.java delete mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/InMemorySignalingRuntimes.java delete mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/PostgresSignalingRuntimes.java delete mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTestBase.java delete mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingPullEndToEndTest.java delete mode 100644 system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingPushEndToEndTest.java diff --git a/DEPENDENCIES b/DEPENDENCIES index cb32afcc7e8..1e4e2ca6c67 100644 --- a/DEPENDENCIES +++ b/DEPENDENCIES @@ -77,7 +77,7 @@ maven/mavencentral/com.lmax/disruptor/3.4.4, Apache-2.0, approved, clearlydefine maven/mavencentral/com.networknt/json-schema-validator/1.0.76, Apache-2.0, approved, CQ22638 maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.28, Apache-2.0, approved, clearlydefined maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.37.3, Apache-2.0, approved, #11701 -maven/mavencentral/com.puppycrawl.tools/checkstyle/10.15.0, , restricted, clearlydefined +maven/mavencentral/com.puppycrawl.tools/checkstyle/10.15.0, LGPL-2.1-or-later, restricted, clearlydefined maven/mavencentral/com.samskivert/jmustache/1.15, BSD-2-Clause, approved, clearlydefined maven/mavencentral/com.squareup.okhttp3/okhttp-dnsoverhttps/4.12.0, Apache-2.0, approved, #11159 maven/mavencentral/com.squareup.okhttp3/okhttp/4.12.0, Apache-2.0, approved, #11156 diff --git a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java index 21987e3773d..fed38071335 100644 --- a/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java +++ b/extensions/control-plane/api/management-api/management-api-test-fixtures/src/testFixtures/java/org/eclipse/edc/connector/controlplane/test/system/utils/Participant.java @@ -299,7 +299,6 @@ public String initContractNegotiation(Participant provider, JsonObject policy) { var requestBody = createObjectBuilder() .add(CONTEXT, createObjectBuilder().add(VOCAB, EDC_NAMESPACE)) .add(TYPE, "ContractRequest") - .add("providerId", provider.id) .add("counterPartyAddress", provider.protocolEndpoint.getUrl().toString()) .add("protocol", protocol) .add("policy", jsonLd.compact(policy).getContent()) @@ -437,7 +436,9 @@ public JsonArray getTransferProcesses(JsonObject query) { * @param privateProperties private properties of the data request * @param destination data destination * @return transfer process id. + * @deprecated transferType will become mandatory, please use {@link #requestAsset(Participant, String, JsonObject, JsonObject, String)} */ + @Deprecated(since = "0.6.1") public String requestAsset(Participant provider, String assetId, JsonObject privateProperties, JsonObject destination) { return requestAsset(provider, assetId, privateProperties, destination, null); } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java index a74c9c6082a..8d4253f4c8c 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/Runtimes.java @@ -20,55 +20,72 @@ public interface Runtimes { - static EdcRuntimeExtension controlPlane(String name, Map configuration) { - return new EdcRuntimeExtension(name, configuration, - ":system-tests:e2e-transfer-test:control-plane", - ":extensions:control-plane:transfer:transfer-data-plane", - ":extensions:data-plane:data-plane-client" - ); - } + interface InMemory { - static EdcRuntimeExtension controlPlaneSignaling(String name, Map configuration) { - return new EdcRuntimeExtension(name, configuration, + static EdcRuntimeExtension controlPlane(String name, Map configuration) { + return new EdcRuntimeExtension(name, configuration, + ":system-tests:e2e-transfer-test:control-plane", + ":core:common:edr-store-core", + ":extensions:control-plane:transfer:transfer-data-plane-signaling", + ":extensions:control-plane:api:management-api:edr-cache-api", + ":extensions:control-plane:edr:edr-store-receiver", + ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client", + ":extensions:control-plane:callback:callback-event-dispatcher", + ":extensions:control-plane:callback:callback-http-dispatcher" + ); + } + + static EdcRuntimeExtension controlPlaneEmbeddedDataPlane(String name, Map configuration) { + return new EdcRuntimeExtension(name, configuration, ":system-tests:e2e-transfer-test:control-plane", ":extensions:control-plane:transfer:transfer-data-plane-signaling", - ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client" - ); - } - - static EdcRuntimeExtension dataPlane(String name, Map configuration) { - return new EdcRuntimeExtension(name, configuration, ":system-tests:e2e-transfer-test:data-plane", - ":extensions:data-plane:data-plane-public-api" - ); - } + ":extensions:data-plane:data-plane-public-api-v2" + ); + } - static EdcRuntimeExtension backendService(String name, Map configuration) { - return new EdcRuntimeExtension(name, configuration, - ":system-tests:e2e-transfer-test:backend-service" - ); + static EdcRuntimeExtension dataPlane(String name, Map configuration) { + return new EdcRuntimeExtension(name, configuration, + ":system-tests:e2e-transfer-test:data-plane", + ":extensions:data-plane:data-plane-public-api-v2" + ); + } } - static EdcRuntimeExtension postgresControlPlane(String name, Map configuration) { - return new EdcRuntimeExtension(name, configuration, - ":system-tests:e2e-transfer-test:control-plane", - ":extensions:control-plane:transfer:transfer-data-plane", - ":extensions:data-plane:data-plane-client", - ":extensions:control-plane:store:sql:control-plane-sql", - ":extensions:common:sql:sql-pool:sql-pool-apache-commons", - ":extensions:common:transaction:transaction-local", - ":extensions:common:api:management-api-configuration", - ":extensions:policy-monitor:store:sql:policy-monitor-store-sql" - ); + interface Postgres { + + static EdcRuntimeExtension controlPlane(String name, Map configuration) { + return new EdcRuntimeExtension(name, configuration, + ":system-tests:e2e-transfer-test:control-plane", + ":core:common:edr-store-core", + ":extensions:common:store:sql:edr-index-sql", + ":extensions:common:sql:sql-pool:sql-pool-apache-commons", + ":extensions:common:transaction:transaction-local", + ":extensions:control-plane:transfer:transfer-data-plane-signaling", + ":extensions:control-plane:api:management-api:edr-cache-api", + ":extensions:control-plane:edr:edr-store-receiver", + ":extensions:control-plane:store:sql:control-plane-sql", + ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client", + ":extensions:control-plane:callback:callback-event-dispatcher", + ":extensions:control-plane:callback:callback-http-dispatcher" + ); + } + + static EdcRuntimeExtension dataPlane(String name, Map configuration) { + return new EdcRuntimeExtension(name, configuration, + ":system-tests:e2e-transfer-test:data-plane", + ":extensions:data-plane:store:sql:data-plane-store-sql", + ":extensions:common:sql:sql-pool:sql-pool-apache-commons", + ":extensions:common:transaction:transaction-local", + ":extensions:data-plane:data-plane-public-api-v2" + ); + } + } - static EdcRuntimeExtension postgresDataPlane(String name, Map configuration) { + static EdcRuntimeExtension backendService(String name, Map configuration) { return new EdcRuntimeExtension(name, configuration, - ":system-tests:e2e-transfer-test:data-plane", - ":extensions:data-plane:data-plane-public-api", - ":extensions:data-plane:store:sql:data-plane-store-sql", - ":extensions:common:sql:sql-pool:sql-pool-apache-commons", - ":extensions:common:transaction:transaction-local" + ":system-tests:e2e-transfer-test:backend-service" ); } } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/BaseEndToEndParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java similarity index 72% rename from system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/BaseEndToEndParticipant.java rename to system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java index ca49b2998bf..0dea2238bcd 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/BaseEndToEndParticipant.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndParticipant.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -12,31 +12,27 @@ * */ -package org.eclipse.edc.test.e2e.participant; +package org.eclipse.edc.test.e2e; import io.restassured.common.mapper.TypeRef; import jakarta.json.Json; import jakarta.json.JsonObject; import org.eclipse.edc.connector.controlplane.test.system.utils.Participant; -import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.eclipse.edc.spi.types.domain.DataAddress; import org.hamcrest.Matcher; import org.jetbrains.annotations.NotNull; import java.net.URI; -import java.time.Duration; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; import static io.restassured.RestAssured.given; import static io.restassured.http.ContentType.JSON; import static jakarta.json.Json.createArrayBuilder; import static jakarta.json.Json.createObjectBuilder; import static java.io.File.separator; -import static org.awaitility.Awaitility.await; import static org.eclipse.edc.boot.BootServicesExtension.PARTICIPANT_ID; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.CONTEXT; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.ID; @@ -45,9 +41,7 @@ import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.defaultDatasourceConfiguration; import static org.eclipse.edc.util.io.Ports.getFreePort; -public class BaseEndToEndParticipant extends Participant { - - private final Duration timeout = Duration.ofSeconds(30); +public class TransferEndToEndParticipant extends Participant { private final URI controlPlaneDefault = URI.create("http://localhost:" + getFreePort()); private final URI controlPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); @@ -57,7 +51,7 @@ public class BaseEndToEndParticipant extends Participant { private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public"); private final URI backendService = URI.create("http://localhost:" + getFreePort()); - protected BaseEndToEndParticipant() { + protected TransferEndToEndParticipant() { super(); } @@ -72,94 +66,10 @@ public JsonObject dynamicReceiverPrivateProperties() { .build(); } - /** - * Get the latest EDR received by the backend service. - * - * @param id EDR id - * @return endpoint data reference. - */ - public EndpointDataReference getDataReference(String id) { - var dataReference = new AtomicReference(); - - await().atMost(timeout).untilAsserted(() -> { - var result = given() - .baseUri(backendService.toString()) - .when() - .get("/api/consumer/dataReference/{id}", id) - .then() - .statusCode(200) - .extract() - .body() - .as(EndpointDataReference.class); - dataReference.set(result); - }); - - return dataReference.get(); - } - - /** - * Get all EDR received by the backend service. - * - * @param id transfer process id. - * @return list of endpoint data references. - */ - public List getAllDataReferences(String id) { - var dataReference = new AtomicReference>(); - - var listType = new TypeRef>() { - }; - - await().atMost(timeout).untilAsserted(() -> { - var result = given() - .baseUri(backendService.toString()) - .when() - .get("/api/consumer/dataReference/{id}/all", id) - .then() - .statusCode(200) - .extract() - .body() - .as(listType); - dataReference.set(result); - }); - - return dataReference.get(); - } - - /** - * Pull data from provider using EDR. - * - * @param edr endpoint data reference - * @param queryParams query parameters - * @param bodyMatcher matcher for response body - */ - public void pullData(EndpointDataReference edr, Map queryParams, Matcher bodyMatcher) { - given() - .baseUri(edr.getEndpoint()) - .header(edr.getAuthKey(), edr.getAuthCode()) - .queryParams(queryParams) - .when() - .get() - .then() - .log().ifError() - .statusCode(200) - .body("message", bodyMatcher); - } - public URI backendService() { return backendService; } - public URI publicDataPlane() { - return dataPlanePublic; - } - - /** - * Register a data plane using the old data plane control API URL and no transfer types - */ - public void registerDataPlane() { - registerDataPlane(dataPlaneControl + "/transfer", Set.of()); - } - /** * Register a data plane using with input transfer type using the data plane signaling API url */ @@ -167,26 +77,6 @@ public void registerDataPlane(Set transferTypes) { registerDataPlane(dataPlaneSignaling + "/v1/dataflows", Set.of("HttpData", "HttpProvision", "Kafka"), Set.of("HttpData", "HttpProvision", "HttpProxy", "Kafka"), transferTypes); } - /** - * Register a data plane - * - * @param url The data plane url - * @param transferTypes supported transfer types - */ - public void registerDataPlane(String url, Set transferTypes) { - registerDataPlane(url, Set.of("HttpData", "HttpProvision", "Kafka"), Set.of("HttpData", "HttpProvision", "HttpProxy", "Kafka"), transferTypes); - } - - /** - * Register a data plane with the old data plane control API url - * - * @param sources The allowed source types - * @param destinations The allowed destination types - */ - public void registerDataPlane(Set sources, Set destinations) { - registerDataPlane(dataPlaneControl + "/transfer", sources, destinations, Set.of()); - } - /** * Register a data plane * @@ -278,25 +168,88 @@ public Map dataPlaneConfiguration() { }; } + public Map controlPlaneEmbeddedDataPlaneConfiguration() { + var cfg = dataPlaneConfiguration(); + cfg.putAll(controlPlaneConfiguration()); + return cfg; + } + public Map dataPlanePostgresConfiguration() { var baseConfiguration = dataPlaneConfiguration(); baseConfiguration.putAll(defaultDatasourceConfiguration(getName())); return baseConfiguration; } + public Map backendServiceConfiguration() { + return new HashMap<>() { + { + put("web.http.port", String.valueOf(backendService.getPort())); + } + }; + } + + /** + * Get the EDR from the EDR cache by transfer process id. + * + * @param transferProcessId The transfer process id + * @return The cached {@link DataAddress} + */ + public DataAddress getEdr(String transferProcessId) { + var dataAddressRaw = managementEndpoint.baseRequest() + .contentType(JSON) + .when() + .get("/v1/edrs/{id}/dataaddress", transferProcessId) + .then() + .log().ifError() + .statusCode(200) + .contentType(JSON) + .extract().body().as(new TypeRef>() { + }); + + + var builder = DataAddress.Builder.newInstance(); + dataAddressRaw.forEach(builder::property); + return builder.build(); + + } + + /** + * Pull data from provider using EDR. + * + * @param edr endpoint data reference + * @param queryParams query parameters + * @param bodyMatcher matcher for response body + */ + public void pullData(DataAddress edr, Map queryParams, Matcher bodyMatcher) { + given() + .baseUri(edr.getStringProperty("endpoint")) + .header("Authorization", edr.getStringProperty("authorization")) + .queryParams(queryParams) + .when() + .get() + .then() + .log().ifError() + .statusCode(200) + .body("message", bodyMatcher); + } + @NotNull private String resourceAbsolutePath(String filename) { return System.getProperty("user.dir") + separator + "build" + separator + "resources" + separator + "test" + separator + filename; } - public static class Builder

> extends Participant.Builder { + public static class Builder extends Participant.Builder { + + public static Builder newInstance() { + return new Builder(); + } - protected Builder(P participant) { - super(participant); + protected Builder() { + super(new TransferEndToEndParticipant()); } @Override - public BaseEndToEndParticipant build() { + public TransferEndToEndParticipant build() { super.managementEndpoint(new Endpoint(URI.create("http://localhost:" + getFreePort() + "/api/management"))); super.protocolEndpoint(new Endpoint(URI.create("http://localhost:" + getFreePort() + "/protocol"))); super.build(); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java index 49f35948dbc..b5d0f54d65c 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferEndToEndTestBase.java @@ -15,28 +15,36 @@ package org.eclipse.edc.test.e2e; import jakarta.json.JsonObject; -import org.eclipse.edc.test.e2e.participant.EndToEndTransferParticipant; import java.time.Duration; import java.util.Map; +import java.util.Set; import java.util.UUID; +import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; + public abstract class TransferEndToEndTestBase { protected final Duration timeout = Duration.ofSeconds(60); - protected static final EndToEndTransferParticipant CONSUMER = EndToEndTransferParticipant.Builder.newInstance() + protected static final TransferEndToEndParticipant CONSUMER = TransferEndToEndParticipant.Builder.newInstance() .name("consumer") .id("urn:connector:consumer") .build(); - protected static final EndToEndTransferParticipant PROVIDER = EndToEndTransferParticipant.Builder.newInstance() + protected static final TransferEndToEndParticipant PROVIDER = TransferEndToEndParticipant.Builder.newInstance() .name("provider") .id("urn:connector:provider") .build(); - protected void createResourcesOnProvider(String assetId, JsonObject policy, Map dataAddressProperties) { + protected void createResourcesOnProvider(String assetId, JsonObject contractPolicy, Map dataAddressProperties) { PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); - var policyDefinition = PROVIDER.createPolicyDefinition(policy); - PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), policyDefinition, policyDefinition); + var accessPolicyId = PROVIDER.createPolicyDefinition(noConstraintPolicy()); + var contractPolicyId = PROVIDER.createPolicyDefinition(contractPolicy); + PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), accessPolicyId, contractPolicyId); + } + + protected void registerDataPlanes() { + PROVIDER.registerDataPlane(Set.of("HttpData-PUSH", "HttpData-PULL")); } + } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index a2d9022b40a..68b28364da2 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -14,41 +14,64 @@ package org.eclipse.edc.test.e2e; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; import jakarta.json.Json; +import jakarta.json.JsonArrayBuilder; import jakarta.json.JsonObject; +import org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures; +import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessStarted; +import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest; import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; +import org.eclipse.edc.spi.event.EventEnvelope; import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; +import org.mockserver.model.HttpStatusCode; +import org.mockserver.model.MediaType; import java.time.Instant; import java.util.Map; +import java.util.Objects; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import static jakarta.json.Json.createObjectBuilder; import static java.time.Duration.ofDays; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.inForceDatePolicy; import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDED; import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATED; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.createDatabase; import static org.eclipse.edc.test.e2e.Runtimes.backendService; -import static org.eclipse.edc.test.e2e.Runtimes.controlPlane; -import static org.eclipse.edc.test.e2e.Runtimes.dataPlane; -import static org.eclipse.edc.test.e2e.Runtimes.postgresControlPlane; -import static org.eclipse.edc.test.e2e.Runtimes.postgresDataPlane; +import static org.eclipse.edc.util.io.Ports.getFreePort; import static org.hamcrest.CoreMatchers.equalTo; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; +import static org.mockserver.stop.Stop.stopQuietly; -public class TransferPullEndToEndTest { + +class TransferPullEndToEndTest { @Nested @EndToEndTest @@ -56,11 +79,25 @@ class InMemory extends Tests { @RegisterExtension static final EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( - controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration()), - backendService("consumer-backend-service", Map.of("web.http.port", String.valueOf(CONSUMER.backendService().getPort()))), - controlPlane("provider-control-plane", PROVIDER.controlPlaneConfiguration()), - dataPlane("provider-data-plane", PROVIDER.dataPlaneConfiguration()), - backendService("provider-backend-service", Map.of("web.http.port", String.valueOf(PROVIDER.backendService().getPort()))) + Runtimes.InMemory.controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration()), + backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration()), + Runtimes.InMemory.dataPlane("provider-data-plane", PROVIDER.dataPlaneConfiguration()), + Runtimes.InMemory.controlPlane("provider-control-plane", PROVIDER.controlPlaneConfiguration()), + backendService("provider-backend-service", PROVIDER.backendServiceConfiguration()) + ); + + } + + @Nested + @EndToEndTest + class EmbeddedDataPlane extends Tests { + + @RegisterExtension + static final EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( + Runtimes.InMemory.controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration()), + backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration()), + Runtimes.InMemory.controlPlaneEmbeddedDataPlane("provider-control-plane", PROVIDER.controlPlaneEmbeddedDataPlaneConfiguration()), + backendService("provider-backend-service", PROVIDER.backendServiceConfiguration()) ); } @@ -77,71 +114,146 @@ class Postgres extends Tests { @RegisterExtension static final EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( - postgresControlPlane("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration()), - backendService("consumer-backend-service", Map.of("web.http.port", String.valueOf(CONSUMER.backendService().getPort()))), - postgresDataPlane("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration()), - postgresControlPlane("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration()), - backendService("provider-backend-service", Map.of("web.http.port", String.valueOf(PROVIDER.backendService().getPort()))) + Runtimes.Postgres.controlPlane("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration()), + backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration()), + Runtimes.Postgres.dataPlane("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration()), + Runtimes.Postgres.controlPlane("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration()), + backendService("provider-backend-service", PROVIDER.backendServiceConfiguration()) ); } abstract static class Tests extends TransferEndToEndTestBase { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String CALLBACK_PATH = "hooks"; + private static final int CALLBACK_PORT = getFreePort(); + private static ClientAndServer callbacksEndpoint; @BeforeEach - void setUp() { - PROVIDER.registerDataPlane(); + void beforeEach() { + registerDataPlanes(); + callbacksEndpoint = startClientAndServer(CALLBACK_PORT); + } + + @AfterEach + void tearDown() { + stopQuietly(callbacksEndpoint); } @Test - void pullFromHttp() { + void httpPull_dataTransfer_withCallbacks() { var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress()); + var callbacks = Json.createArrayBuilder() + .add(createCallback(callbackUrl(), true, Set.of("transfer.process.started"))) + .build(); + + var request = request().withPath("/" + CALLBACK_PATH) + .withMethod(HttpMethod.POST.name()); + + var events = new ConcurrentHashMap(); + + callbacksEndpoint.when(request).respond(req -> this.cacheEdr(req, events)); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL", callbacks); + await().atMost(timeout).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); assertThat(state).isEqualTo(STARTED.name()); }); - // retrieve the data reference - var edr = CONSUMER.getDataReference(transferProcessId); - - // pull the data without query parameter - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of(), equalTo("some information"))); + await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull()); - // pull the data with additional query parameter + var event = events.get(transferProcessId); var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg))); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), equalTo(msg))); - // We expect two EDR because in the test runtime we have two receiver extensions static and dynamic one - assertThat(CONSUMER.getAllDataReferences(transferProcessId)).hasSize(2); } @Test - void pullFromHttp_withTransferType() { + void httpPull_dataTransfer_withEdrCache() { var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); + createResourcesOnProvider(assetId, PolicyFixtures.contractExpiresIn("10s"), httpDataAddressProperties()); var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL"); + await().atMost(timeout).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); assertThat(state).isEqualTo(STARTED.name()); }); - // retrieve the data reference - var edr = CONSUMER.getDataReference(transferProcessId); + var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); + + // Do the transfer + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg))); + + // checks that the EDR is gone once the contract expires + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); + + // checks that transfer fails + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg)))); + } + + @Test + void suspendAndResume_httpPull_dataTransfer_withEdrCache() { + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, CONSUMER.dynamicReceiverPrivateProperties(), + syncDataAddress(), "HttpData-PULL"); + + awaitTransferToBeInState(transferProcessId, STARTED); - // pull the data without query parameter - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of(), equalTo("some information"))); + var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); - // pull the data with additional query parameter var msg = UUID.randomUUID().toString(); await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg))); - // We expect two EDR because in the test runtime we have two receiver extensions static and dynamic one - assertThat(CONSUMER.getAllDataReferences(transferProcessId)).hasSize(2); + CONSUMER.suspendTransfer(transferProcessId, "supension"); + + awaitTransferToBeInState(transferProcessId, SUSPENDED); + + // checks that the EDR is gone once the transfer has been suspended + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); + // checks that transfer fails + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg)))); + + CONSUMER.resumeTransfer(transferProcessId); + + // check that transfer is available again + awaitTransferToBeInState(transferProcessId, STARTED); + var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); + var secondMessage = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), equalTo(secondMessage))); + + } + + @Test + void pullFromHttp_httpProvision() { + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), Map.of( + "name", "transfer-test", + "baseUrl", PROVIDER.backendService() + "/api/provider/data", + "type", "HttpProvision", + "proxyQueryParams", "true" + )); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, CONSUMER.dynamicReceiverPrivateProperties(), + syncDataAddress(), "HttpData-PULL"); + + awaitTransferToBeInState(transferProcessId, STARTED); + + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(STARTED.name()); + + var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); + CONSUMER.pullData(edr, Map.of("message", "some information"), equalTo("some information")); + }); } @Test @@ -153,7 +265,7 @@ void shouldTerminateTransfer_whenContractExpires_fixedInForcePeriod() { var contractPolicy = inForceDatePolicy("gteq", now.minus(ofDays(10)), "lteq", now.minus(ofDays(5))); createResourcesOnProvider(assetId, contractPolicy, httpDataAddressProperties()); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress()); + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress(), "HttpData-PULL"); await().atMost(timeout).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); assertThat(state).isEqualTo(TERMINATED.name()); @@ -168,31 +280,18 @@ void shouldTerminateTransfer_whenContractExpires_durationInForcePeriod() { var contractPolicy = inForceDatePolicy("gteq", now.minus(ofDays(10)), "lteq", "contractAgreement+1s"); createResourcesOnProvider(assetId, contractPolicy, httpDataAddressProperties()); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress()); + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress(), "HttpData-PULL"); await().atMost(timeout).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); assertThat(state).isEqualTo(TERMINATED.name()); }); } - @Test - void pullFromHttp_httpProvision() { - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), Map.of( - "name", "transfer-test", - "baseUrl", PROVIDER.backendService() + "/api/provider/data", - "type", "HttpProvision", - "proxyQueryParams", "true" - )); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), syncDataAddress()); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(STARTED.name()); - - var edr = CONSUMER.getDataReference(transferProcessId); - CONSUMER.pullData(edr, Map.of(), equalTo("some information")); - }); + private void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) { + await().atMost(timeout).until( + () -> CONSUMER.getTransferProcessState(transferProcessId), + it -> Objects.equals(it, state.name()) + ); } private JsonObject syncDataAddress() { @@ -202,6 +301,18 @@ private JsonObject syncDataAddress() { .build(); } + public JsonObject createCallback(String url, boolean transactional, Set events) { + return Json.createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "CallbackAddress") + .add(EDC_NAMESPACE + "transactional", transactional) + .add(EDC_NAMESPACE + "uri", url) + .add(EDC_NAMESPACE + "events", events + .stream() + .collect(Json::createArrayBuilder, JsonArrayBuilder::add, JsonArrayBuilder::add) + .build()) + .build(); + } + @NotNull private Map httpDataAddressProperties() { return Map.of( @@ -212,8 +323,31 @@ private Map httpDataAddressProperties() { ); } + private HttpResponse cacheEdr(HttpRequest request, Map events) { + + try { + var event = MAPPER.readValue(request.getBody().toString(), new TypeReference>() { + }); + events.put(event.getPayload().getTransferProcessId(), event.getPayload()); + return response() + .withStatusCode(HttpStatusCode.OK_200.code()) + .withHeader(HttpHeaderNames.CONTENT_TYPE.toString(), MediaType.PLAIN_TEXT_UTF_8.toString()) + .withBody("{}"); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + } + private JsonObject noPrivateProperty() { return Json.createObjectBuilder().build(); } + + private String callbackUrl() { + return String.format("http://localhost:%d/%s", callbacksEndpoint.getLocalPort(), CALLBACK_PATH); + } + } + } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java index a0f36ea0bfe..529d4eff866 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPushEndToEndTest.java @@ -19,8 +19,8 @@ import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest; import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.BeforeAllCallback; @@ -38,16 +38,16 @@ import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.createDatabase; +import static org.eclipse.edc.test.e2e.Runtimes.InMemory.controlPlane; +import static org.eclipse.edc.test.e2e.Runtimes.InMemory.controlPlaneEmbeddedDataPlane; +import static org.eclipse.edc.test.e2e.Runtimes.InMemory.dataPlane; import static org.eclipse.edc.test.e2e.Runtimes.backendService; -import static org.eclipse.edc.test.e2e.Runtimes.controlPlane; -import static org.eclipse.edc.test.e2e.Runtimes.dataPlane; -import static org.eclipse.edc.test.e2e.Runtimes.postgresControlPlane; -import static org.eclipse.edc.test.e2e.Runtimes.postgresDataPlane; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -public class TransferPushEndToEndTest { + +class TransferPushEndToEndTest { @Nested @EndToEndTest @@ -56,10 +56,24 @@ class InMemory extends Tests { @RegisterExtension static final EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration()), - backendService("consumer-backend-service", Map.of("web.http.port", String.valueOf(CONSUMER.backendService().getPort()))), + backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration()), dataPlane("provider-data-plane", PROVIDER.dataPlaneConfiguration()), controlPlane("provider-control-plane", PROVIDER.controlPlaneConfiguration()), - backendService("provider-backend-service", Map.of("web.http.port", String.valueOf(PROVIDER.backendService().getPort()))) + backendService("provider-backend-service", PROVIDER.backendServiceConfiguration()) + ); + + } + + @Nested + @EndToEndTest + class EmbeddedDataPlane extends Tests { + + @RegisterExtension + static final EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( + controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration()), + backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration()), + controlPlaneEmbeddedDataPlane("provider-control-plane", PROVIDER.controlPlaneEmbeddedDataPlaneConfiguration()), + backendService("provider-backend-service", PROVIDER.backendServiceConfiguration()) ); } @@ -76,50 +90,25 @@ class Postgres extends Tests { @RegisterExtension static final EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( - postgresControlPlane("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration()), - backendService("consumer-backend-service", Map.of("web.http.port", String.valueOf(CONSUMER.backendService().getPort()))), - postgresDataPlane("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration()), - postgresControlPlane("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration()), - backendService("provider-backend-service", Map.of("web.http.port", String.valueOf(PROVIDER.backendService().getPort()))) + Runtimes.Postgres.controlPlane("consumer-control-plane", CONSUMER.controlPlanePostgresConfiguration()), + backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration()), + Runtimes.Postgres.dataPlane("provider-data-plane", PROVIDER.dataPlanePostgresConfiguration()), + Runtimes.Postgres.controlPlane("provider-control-plane", PROVIDER.controlPlanePostgresConfiguration()), + backendService("provider-backend-service", PROVIDER.backendServiceConfiguration()) ); } abstract static class Tests extends TransferEndToEndTestBase { - private final String assetId = UUID.randomUUID().toString(); - @BeforeEach - void setUp() { - PROVIDER.registerDataPlane(); - } - - @Test - void httpToHttp() { - var url = PROVIDER.backendService() + "/api/provider/data"; - Map dataAddressProperties = Map.of("type", "HttpData", "baseUrl", url); - createResourcesOnProvider(assetId, noConstraintPolicy(), dataAddressProperties); - var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(COMPLETED.name()); - - given() - .baseUri(CONSUMER.backendService().toString()) - .when() - .get("/api/consumer/data") - .then() - .statusCode(anyOf(is(200), is(204))) - .body(is(notNullValue())); - }); + void beforeEach() { + registerDataPlanes(); } @Test - void httpToHttp_withTransferType() { - var url = PROVIDER.backendService() + "/api/provider/data"; - Map dataAddressProperties = Map.of("type", "HttpData", "baseUrl", url); - createResourcesOnProvider(assetId, noConstraintPolicy(), dataAddressProperties); + void httpPushDataTransfer() { + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); @@ -138,8 +127,8 @@ void httpToHttp_withTransferType() { } @Test - @DisplayName("Provider pushes data to Consumer, Provider needs to authenticate the data request through an oauth2 server") void httpToHttp_oauth2Provisioning() { + var assetId = UUID.randomUUID().toString(); var sourceDataAddressProperties = Map.of( "type", "HttpData", "baseUrl", PROVIDER.backendService() + "/api/provider/oauth2data", @@ -151,7 +140,7 @@ void httpToHttp_oauth2Provisioning() { createResourcesOnProvider(assetId, noConstraintPolicy(), sourceDataAddressProperties); var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination); + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); await().atMost(timeout).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); @@ -175,9 +164,19 @@ private JsonObject httpDataAddress(String baseUrl) { .build(); } + @NotNull + private Map httpDataAddressProperties() { + return Map.of( + "name", "transfer-test", + "baseUrl", PROVIDER.backendService() + "/api/provider/data", + "type", "HttpData", + "proxyQueryParams", "true" + ); + } + private JsonObject noPrivateProperty() { return Json.createObjectBuilder().build(); } - } + } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java index 01135babd58..22dbbbc5be1 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferStreamingEndToEndTest.java @@ -61,9 +61,9 @@ import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.TERMINATED; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.test.e2e.Runtimes.InMemory.controlPlane; +import static org.eclipse.edc.test.e2e.Runtimes.InMemory.dataPlane; import static org.eclipse.edc.test.e2e.Runtimes.backendService; -import static org.eclipse.edc.test.e2e.Runtimes.controlPlaneSignaling; -import static org.eclipse.edc.test.e2e.Runtimes.dataPlane; import static org.eclipse.edc.util.io.Ports.getFreePort; import static org.mockserver.integration.ClientAndServer.startClientAndServer; import static org.mockserver.model.HttpRequest.request; @@ -80,11 +80,11 @@ class InMemory extends Tests { @RegisterExtension static final EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( - controlPlaneSignaling("consumer-control-plane", CONSUMER.controlPlaneConfiguration()), - backendService("consumer-backend-service", Map.of("web.http.port", String.valueOf(CONSUMER.backendService().getPort()))), - controlPlaneSignaling("provider-control-plane", PROVIDER.controlPlaneConfiguration()), + controlPlane("consumer-control-plane", CONSUMER.controlPlaneConfiguration()), + backendService("consumer-backend-service", CONSUMER.backendServiceConfiguration()), dataPlane("provider-data-plane", PROVIDER.dataPlaneConfiguration()), - backendService("provider-backend-service", Map.of("web.http.port", String.valueOf(PROVIDER.backendService().getPort()))) + controlPlane("provider-control-plane", PROVIDER.controlPlaneConfiguration()), + backendService("provider-backend-service", PROVIDER.backendServiceConfiguration()) ); } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/annotations/KafkaIntegrationTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/annotations/KafkaIntegrationTest.java deleted file mode 100644 index d021bfe204c..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/annotations/KafkaIntegrationTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (c) 2023 Amadeus - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Amadeus - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e.annotations; - -import org.eclipse.edc.junit.annotations.IntegrationTest; -import org.junit.jupiter.api.Tag; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Annotation for End to End integration testing based on Kafka. - */ -@Target({ ElementType.TYPE }) -@Retention(RetentionPolicy.RUNTIME) -@IntegrationTest -@Tag("KafkaIntegrationTest") -public @interface KafkaIntegrationTest { -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java deleted file mode 100644 index c7e99d306a8..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e.participant; - -import com.fasterxml.jackson.annotation.JsonCreator; - -import java.net.URI; - -import static org.eclipse.edc.util.io.Ports.getFreePort; - -public class EndToEndTransferParticipant extends BaseEndToEndParticipant { - - public static final class Builder extends BaseEndToEndParticipant.Builder { - - private Builder() { - super(new EndToEndTransferParticipant()); - } - - @JsonCreator - public static Builder newInstance() { - return new Builder(); - } - - @Override - public EndToEndTransferParticipant build() { - super.managementEndpoint(new Endpoint(URI.create("http://localhost:" + getFreePort() + "/api/management"))); - super.protocolEndpoint(new Endpoint(URI.create("http://localhost:" + getFreePort() + "/protocol"))); - super.build(); - return participant; - } - } -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/SignalingParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/SignalingParticipant.java deleted file mode 100644 index f79ed8efb01..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/SignalingParticipant.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e.participant; - -import com.fasterxml.jackson.annotation.JsonCreator; -import io.restassured.common.mapper.TypeRef; -import org.eclipse.edc.spi.types.domain.DataAddress; -import org.hamcrest.Matcher; - -import java.util.Map; - -import static io.restassured.RestAssured.given; -import static io.restassured.http.ContentType.JSON; - -public class SignalingParticipant extends BaseEndToEndParticipant { - - /** - * Get the EDR from the EDR cache by transfer process id. - * - * @param transferProcessId The transfer process id - * @return The cached {@link DataAddress} - */ - public DataAddress getEdr(String transferProcessId) { - var dataAddressRaw = managementEndpoint.baseRequest() - .contentType(JSON) - .when() - .get("/v1/edrs/{id}/dataaddress", transferProcessId) - .then() - .log().ifError() - .statusCode(200) - .contentType(JSON) - .extract().body().as(new TypeRef>() { - }); - - - var builder = DataAddress.Builder.newInstance(); - dataAddressRaw.forEach(builder::property); - return builder.build(); - - } - - /** - * Pull data from provider using EDR. - * - * @param edr endpoint data reference - * @param queryParams query parameters - * @param bodyMatcher matcher for response body - */ - public void pullData(DataAddress edr, Map queryParams, Matcher bodyMatcher) { - given() - .baseUri(edr.getStringProperty("endpoint")) - .header("Authorization", edr.getStringProperty("authorization")) - .queryParams(queryParams) - .when() - .get() - .then() - .log().ifError() - .statusCode(200) - .body("message", bodyMatcher); - } - - public static class Builder extends BaseEndToEndParticipant.Builder { - - protected Builder() { - super(new SignalingParticipant()); - } - - @JsonCreator - public static Builder newInstance() { - return new Builder(); - } - - @Override - public SignalingParticipant build() { - super.build(); - return participant; - } - - } -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedDataPlaneSignalingRuntimes.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedDataPlaneSignalingRuntimes.java deleted file mode 100644 index 4f21ca417fd..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/EmbeddedDataPlaneSignalingRuntimes.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e.signaling; - -import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; -import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; -import org.junit.jupiter.api.extension.RegisterExtension; - -import java.util.HashMap; -import java.util.Map; - -import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.CONSUMER; -import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.PROVIDER; - -public interface EmbeddedDataPlaneSignalingRuntimes { - - String[] PROVIDER_MODULES = new String[]{ - ":system-tests:e2e-transfer-test:control-plane", - ":extensions:control-plane:transfer:transfer-data-plane-signaling", - ":system-tests:e2e-transfer-test:data-plane", - ":extensions:data-plane:data-plane-public-api-v2" - }; - String[] CONSUMER_MODULES = new String[]{ - ":system-tests:e2e-transfer-test:control-plane", - ":core:common:edr-store-core", - ":extensions:control-plane:api:management-api:edr-cache-api", - ":extensions:control-plane:edr:edr-store-receiver", - ":extensions:control-plane:callback:callback-event-dispatcher", - ":extensions:control-plane:callback:callback-http-dispatcher" - }; - - @RegisterExtension - EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( - new EdcRuntimeExtension( - "consumer-control-plane", - CONSUMER.controlPlaneConfiguration(), - CONSUMER_MODULES - ), - new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:backend-service", - "consumer-backend-service", - new HashMap<>() { - { - put("web.http.port", String.valueOf(CONSUMER.backendService().getPort())); - } - } - ), - new EdcRuntimeExtension( - "provider-control-plane", - providerConfig(), - PROVIDER_MODULES - ), - new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:backend-service", - "provider-backend-service", - new HashMap<>() { - { - put("web.http.port", String.valueOf(PROVIDER.backendService().getPort())); - } - } - ) - ); - - private static Map providerConfig() { - var cfg = PROVIDER.dataPlaneConfiguration(); - cfg.putAll(PROVIDER.controlPlaneConfiguration()); - return cfg; - } - -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/InMemorySignalingRuntimes.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/InMemorySignalingRuntimes.java deleted file mode 100644 index 0432ed3edb6..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/InMemorySignalingRuntimes.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e.signaling; - -import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; -import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; -import org.junit.jupiter.api.extension.RegisterExtension; - -import java.util.HashMap; - -import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.CONSUMER; -import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.PROVIDER; - -public interface InMemorySignalingRuntimes { - - String[] CONTROL_PLANE_MODULES = new String[]{ - ":system-tests:e2e-transfer-test:control-plane", - ":core:common:edr-store-core", - ":extensions:control-plane:transfer:transfer-data-plane-signaling", - ":extensions:control-plane:api:management-api:edr-cache-api", - ":extensions:control-plane:edr:edr-store-receiver", - ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client", - ":extensions:control-plane:callback:callback-event-dispatcher", - ":extensions:control-plane:callback:callback-http-dispatcher" - }; - - String[] DATA_PLANE_MODULES = new String[]{ - ":system-tests:e2e-transfer-test:data-plane", - ":extensions:data-plane:data-plane-public-api-v2" - }; - - EdcRuntimeExtension DATA_PLANE = new EdcRuntimeExtension( - "provider-data-plane", - PROVIDER.dataPlaneConfiguration(), - DATA_PLANE_MODULES - ); - - @RegisterExtension - EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( - new EdcRuntimeExtension( - "consumer-control-plane", - CONSUMER.controlPlaneConfiguration(), - CONTROL_PLANE_MODULES - ), - new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:backend-service", - "consumer-backend-service", - new HashMap<>() { - { - put("web.http.port", String.valueOf(CONSUMER.backendService().getPort())); - } - } - ), - DATA_PLANE, - new EdcRuntimeExtension( - "provider-control-plane", - PROVIDER.controlPlaneConfiguration(), - CONTROL_PLANE_MODULES - ), - new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:backend-service", - "provider-backend-service", - new HashMap<>() { - { - put("web.http.port", String.valueOf(PROVIDER.backendService().getPort())); - } - } - ) - ); - - -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/PostgresSignalingRuntimes.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/PostgresSignalingRuntimes.java deleted file mode 100644 index 9744a379dbe..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/PostgresSignalingRuntimes.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e.signaling; - -import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; -import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; -import org.junit.jupiter.api.extension.RegisterExtension; - -import java.util.HashMap; - -import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.CONSUMER; -import static org.eclipse.edc.test.e2e.signaling.SignalingEndToEndTestBase.PROVIDER; - -public interface PostgresSignalingRuntimes { - - String[] CONTROL_PLANE_MODULES = new String[]{ - ":system-tests:e2e-transfer-test:control-plane", - ":core:common:edr-store-core", - ":extensions:common:store:sql:edr-index-sql", - ":extensions:common:sql:sql-pool:sql-pool-apache-commons", - ":extensions:common:transaction:transaction-local", - ":extensions:control-plane:transfer:transfer-data-plane-signaling", - ":extensions:control-plane:api:management-api:edr-cache-api", - ":extensions:control-plane:edr:edr-store-receiver", - ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client", - ":extensions:control-plane:callback:callback-event-dispatcher", - ":extensions:control-plane:callback:callback-http-dispatcher" - }; - - String[] DATA_PLANE_MODULES = new String[]{ - ":system-tests:e2e-transfer-test:data-plane", - ":extensions:data-plane:data-plane-public-api-v2" - }; - - EdcRuntimeExtension DATA_PLANE = new EdcRuntimeExtension( - "provider-data-plane", - PROVIDER.dataPlaneConfiguration(), - DATA_PLANE_MODULES - ); - - @RegisterExtension - EdcClassRuntimesExtension RUNTIMES = new EdcClassRuntimesExtension( - new EdcRuntimeExtension( - "consumer-control-plane", - CONSUMER.controlPlanePostgresConfiguration(), - CONTROL_PLANE_MODULES - ), - new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:backend-service", - "consumer-backend-service", - new HashMap<>() { - { - put("web.http.port", String.valueOf(CONSUMER.backendService().getPort())); - } - } - ), - DATA_PLANE, - new EdcRuntimeExtension( - "provider-control-plane", - PROVIDER.controlPlanePostgresConfiguration(), - CONTROL_PLANE_MODULES - ), - new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:backend-service", - "provider-backend-service", - new HashMap<>() { - { - put("web.http.port", String.valueOf(PROVIDER.backendService().getPort())); - } - } - ) - ); - - -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTestBase.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTestBase.java deleted file mode 100644 index fbbc66687e5..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTestBase.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e.signaling; - -import jakarta.json.JsonObject; -import org.eclipse.edc.test.e2e.participant.SignalingParticipant; - -import java.util.Map; -import java.util.Set; -import java.util.UUID; - -import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; - -public abstract class SignalingEndToEndTestBase { - - protected static final SignalingParticipant CONSUMER = SignalingParticipant.Builder.newInstance() - .name("consumer") - .id("urn:connector:consumer") - .build(); - protected static final SignalingParticipant PROVIDER = SignalingParticipant.Builder.newInstance() - .name("provider") - .id("urn:connector:provider") - .build(); - - protected void createResourcesOnProvider(String assetId, JsonObject contractPolicy, Map dataAddressProperties) { - PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); - var accessPolicyId = PROVIDER.createPolicyDefinition(noConstraintPolicy()); - var contractPolicyId = PROVIDER.createPolicyDefinition(contractPolicy); - PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), accessPolicyId, contractPolicyId); - } - - protected void registerDataPlanes() { - PROVIDER.registerDataPlane(Set.of("HttpData-PUSH", "HttpData-PULL")); - } - -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingPullEndToEndTest.java deleted file mode 100644 index 2eb80a0a401..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingPullEndToEndTest.java +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e.signaling; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpMethod; -import jakarta.json.Json; -import jakarta.json.JsonArrayBuilder; -import jakarta.json.JsonObject; -import org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures; -import org.eclipse.edc.connector.controlplane.transfer.spi.event.TransferProcessStarted; -import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates; -import org.eclipse.edc.junit.annotations.EndToEndTest; -import org.eclipse.edc.junit.annotations.PostgresqlIntegrationTest; -import org.eclipse.edc.spi.event.EventEnvelope; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.BeforeAllCallback; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.mockserver.integration.ClientAndServer; -import org.mockserver.model.HttpRequest; -import org.mockserver.model.HttpResponse; -import org.mockserver.model.HttpStatusCode; -import org.mockserver.model.MediaType; - -import java.time.Duration; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import static jakarta.json.Json.createObjectBuilder; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; -import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.STARTED; -import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.SUSPENDED; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; -import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; -import static org.eclipse.edc.sql.testfixtures.PostgresqlEndToEndInstance.createDatabase; -import static org.eclipse.edc.util.io.Ports.getFreePort; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.mockserver.integration.ClientAndServer.startClientAndServer; -import static org.mockserver.model.HttpRequest.request; -import static org.mockserver.model.HttpResponse.response; -import static org.mockserver.stop.Stop.stopQuietly; - - -class TransferSignalingPullEndToEndTest { - - @Nested - @EndToEndTest - class InMemory extends Tests implements InMemorySignalingRuntimes { - - } - - @Nested - @EndToEndTest - class EmbeddedDataPlane extends Tests implements EmbeddedDataPlaneSignalingRuntimes { - - } - - @Nested - @PostgresqlIntegrationTest - class Postgres extends Tests implements PostgresSignalingRuntimes { - - @RegisterExtension - static final BeforeAllCallback CREATE_DATABASES = context -> { - createDatabase(CONSUMER.getName()); - createDatabase(PROVIDER.getName()); - }; - } - - abstract static class Tests extends SignalingEndToEndTestBase { - private static final ObjectMapper MAPPER = new ObjectMapper(); - private static final String CALLBACK_PATH = "hooks"; - private static final int CALLBACK_PORT = getFreePort(); - private static ClientAndServer callbacksEndpoint; - protected final Duration timeout = Duration.ofSeconds(60); - - @BeforeEach - void beforeEach() { - registerDataPlanes(); - callbacksEndpoint = startClientAndServer(CALLBACK_PORT); - } - - @AfterEach - void tearDown() { - stopQuietly(callbacksEndpoint); - } - - @Test - void httpPull_dataTransfer_withCallbacks() { - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); - var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); - - var callbacks = Json.createArrayBuilder() - .add(createCallback(callbackUrl(), true, Set.of("transfer.process.started"))) - .build(); - - var request = request().withPath("/" + CALLBACK_PATH) - .withMethod(HttpMethod.POST.name()); - - var events = new ConcurrentHashMap(); - - callbacksEndpoint.when(request).respond(req -> this.cacheEdr(req, events)); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL", callbacks); - - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(STARTED.name()); - }); - - await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull()); - - var event = events.get(transferProcessId); - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), equalTo(msg))); - - } - - @Test - void httpPull_dataTransfer_withEdrCache() { - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, PolicyFixtures.contractExpiresIn("10s"), httpDataAddressProperties()); - var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); - - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL"); - - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(STARTED.name()); - }); - - var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); - - // Do the transfer - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg))); - - // checks that the EDR is gone once the contract expires - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); - - // checks that transfer fails - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg)))); - } - - @Test - void suspendAndResume_httpPull_dataTransfer_withEdrCache() { - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, CONSUMER.dynamicReceiverPrivateProperties(), - syncDataAddress(), "HttpData-PULL"); - - awaitTransferToBeInState(transferProcessId, STARTED); - - var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); - - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg))); - - CONSUMER.suspendTransfer(transferProcessId, "supension"); - - awaitTransferToBeInState(transferProcessId, SUSPENDED); - - // checks that the EDR is gone once the transfer has been suspended - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); - // checks that transfer fails - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg)))); - - CONSUMER.resumeTransfer(transferProcessId); - - // check that transfer is available again - awaitTransferToBeInState(transferProcessId, STARTED); - var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); - var secondMessage = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), equalTo(secondMessage))); - - } - - private void awaitTransferToBeInState(String transferProcessId, TransferProcessStates state) { - await().atMost(timeout).until( - () -> CONSUMER.getTransferProcessState(transferProcessId), - it -> Objects.equals(it, state.name()) - ); - } - - private JsonObject syncDataAddress() { - return createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "DataAddress") - .add(EDC_NAMESPACE + "type", "HttpProxy") - .build(); - } - - public JsonObject createCallback(String url, boolean transactional, Set events) { - return Json.createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "CallbackAddress") - .add(EDC_NAMESPACE + "transactional", transactional) - .add(EDC_NAMESPACE + "uri", url) - .add(EDC_NAMESPACE + "events", events - .stream() - .collect(Json::createArrayBuilder, JsonArrayBuilder::add, JsonArrayBuilder::add) - .build()) - .build(); - } - - @NotNull - private Map httpDataAddressProperties() { - return Map.of( - "name", "transfer-test", - "baseUrl", PROVIDER.backendService() + "/api/provider/data", - "type", "HttpData", - "proxyQueryParams", "true" - ); - } - - private HttpResponse cacheEdr(HttpRequest request, Map events) { - - try { - var event = MAPPER.readValue(request.getBody().toString(), new TypeReference>() { - }); - events.put(event.getPayload().getTransferProcessId(), event.getPayload()); - return response() - .withStatusCode(HttpStatusCode.OK_200.code()) - .withHeader(HttpHeaderNames.CONTENT_TYPE.toString(), MediaType.PLAIN_TEXT_UTF_8.toString()) - .withBody("{}"); - - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - - } - - private String callbackUrl() { - return String.format("http://localhost:%d/%s", callbacksEndpoint.getLocalPort(), CALLBACK_PATH); - } - - } - -} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingPushEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingPushEndToEndTest.java deleted file mode 100644 index 88973459d82..00000000000 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/TransferSignalingPushEndToEndTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - -package org.eclipse.edc.test.e2e.signaling; - -import jakarta.json.Json; -import jakarta.json.JsonObject; -import org.eclipse.edc.junit.annotations.EndToEndTest; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; - -import java.time.Duration; -import java.util.Map; -import java.util.UUID; - -import static io.restassured.RestAssured.given; -import static jakarta.json.Json.createObjectBuilder; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.eclipse.edc.connector.controlplane.test.system.utils.PolicyFixtures.noConstraintPolicy; -import static org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates.COMPLETED; -import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; -import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; - - -class TransferSignalingPushEndToEndTest { - - @Nested - @EndToEndTest - class InMemory extends Tests implements InMemorySignalingRuntimes { - - } - - @Nested - @EndToEndTest - class EmbeddedDataPlane extends Tests implements EmbeddedDataPlaneSignalingRuntimes { - - } - - abstract static class Tests extends SignalingEndToEndTestBase { - protected final Duration timeout = Duration.ofSeconds(60); - - @BeforeEach - void beforeEach() { - registerDataPlanes(); - } - - @Test - void httpPushDataTransfer() { - var assetId = UUID.randomUUID().toString(); - createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); - var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); - - var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); - await().atMost(timeout).untilAsserted(() -> { - var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(COMPLETED.name()); - - given() - .baseUri(CONSUMER.backendService().toString()) - .when() - .get("/api/consumer/data") - .then() - .statusCode(anyOf(is(200), is(204))) - .body(is(notNullValue())); - }); - } - - private JsonObject httpDataAddress(String baseUrl) { - return createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "DataAddress") - .add(EDC_NAMESPACE + "type", "HttpData") - .add(EDC_NAMESPACE + "properties", createObjectBuilder() - .add(EDC_NAMESPACE + "baseUrl", baseUrl) - .build()) - .build(); - } - - @NotNull - private Map httpDataAddressProperties() { - return Map.of( - "name", "transfer-test", - "baseUrl", PROVIDER.backendService() + "/api/provider/data", - "type", "HttpData", - "proxyQueryParams", "true" - ); - } - - private JsonObject noPrivateProperty() { - return Json.createObjectBuilder().build(); - } - } - -}