From c88fee7fc691a65ecee9e0165639f4b0e59865e8 Mon Sep 17 00:00:00 2001 From: AndrYurk Date: Fri, 17 Jan 2025 12:43:38 +0100 Subject: [PATCH] refactor: e2e pull transfer tests --- .../test/e2e/TransferPullEndToEndTest.java | 190 ++++++++---------- 1 file changed, 82 insertions(+), 108 deletions(-) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java index ad6a662940..193db5dde9 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/TransferPullEndToEndTest.java @@ -20,6 +20,7 @@ import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; import jakarta.json.Json; +import jakarta.json.JsonArray; import jakarta.json.JsonArrayBuilder; import jakarta.json.JsonObject; import okhttp3.Request; @@ -33,6 +34,7 @@ import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.edc.spi.security.Vault; +import org.eclipse.edc.spi.types.domain.DataAddress; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -115,26 +117,13 @@ void httpPull_dataTransfer_withCallbacks() { var callbacks = Json.createArrayBuilder() .add(createCallback(callbackUrl, true, Set.of("transfer.process.started"))) .build(); - var request = request().withPath("/hooks") .withMethod(HttpMethod.POST.name()); - var events = new ConcurrentHashMap(); - callbacksEndpoint.when(request).respond(req -> this.cacheEdr(req, events)); - var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) - .withTransferType("HttpData-PULL") - .withCallbacks(callbacks) - .execute(); - - CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); - - await().atMost(timeout).untilAsserted(() -> assertThat(events.get(transferProcessId)).isNotNull()); - - var event = events.get(transferProcessId); - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); + StartedTransferContext startedTransferContext = startTransferProcess(assetId, callbacks); + assertDataIsAccessible(startedTransferContext.consumerTransferProcessId, events); providerDataSource.verify(request("/source").withMethod("GET")); stopQuietly(callbacksEndpoint); @@ -143,27 +132,10 @@ void httpPull_dataTransfer_withCallbacks() { @Test void httpPull_dataTransfer_withEdrCache() { var assetId = UUID.randomUUID().toString(); - var sourceDataAddress = httpSourceDataAddress(); - createResourcesOnProvider(assetId, PolicyFixtures.contractExpiresIn("10s"), sourceDataAddress); - - var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) - .withTransferType("HttpData-PULL") - .execute(); - - CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); - - var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); - - // Do the transfer - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); - - // checks that the EDR is gone once the contract expires - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); - - // checks that transfer fails - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); - + createResourcesOnProvider(assetId, PolicyFixtures.contractExpiresIn("10s"), httpSourceDataAddress()); + StartedTransferContext startedTransferContext = startTransferProcess(assetId); + EdrMessageContext edrMessageContext = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); + assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessageContext); providerDataSource.verify(request("/source").withMethod("GET")); } @@ -171,34 +143,18 @@ void httpPull_dataTransfer_withEdrCache() { void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, httpSourceDataAddress()); + StartedTransferContext startedTransferContext = startTransferProcess(assetId); + EdrMessageContext edrMessageContext = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); - var transferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) - .withTransferType("HttpData-PULL") - .execute(); - - CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); - - var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); - - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); - - CONSUMER.suspendTransfer(transferProcessId, "supension"); + CONSUMER.suspendTransfer(startedTransferContext.consumerTransferProcessId, "suspension"); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED); + assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessageContext); - CONSUMER.awaitTransferToBeInState(transferProcessId, SUSPENDED); - - // checks that the EDR is gone once the transfer has been suspended - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); - // checks that transfer fails - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); - - CONSUMER.resumeTransfer(transferProcessId); - - // check that transfer is available again - CONSUMER.awaitTransferToBeInState(transferProcessId, STARTED); - var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(transferProcessId), Objects::nonNull); - var secondMessage = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); + CONSUMER.resumeTransfer(startedTransferContext.consumerTransferProcessId); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED); + assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); providerDataSource.verify(request("/source").withMethod("GET")); } @@ -207,38 +163,18 @@ void suspendAndResumeByConsumer_httpPull_dataTransfer_withEdrCache() { void suspendAndResumeByProvider_httpPull_dataTransfer_withEdrCache() { var assetId = UUID.randomUUID().toString(); createResourcesOnProvider(assetId, httpSourceDataAddress()); + StartedTransferContext startedTransferContext = startTransferProcess(assetId); + EdrMessageContext edrMessageContext = assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); - var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) - .withTransferType("HttpData-PULL") - .execute(); - - CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); - - var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); - - var msg = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); - - var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() - .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) - .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); + PROVIDER.suspendTransfer(startedTransferContext.providerTransferProcessId, "suspension"); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, SUSPENDED); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, SUSPENDED); + assertDataIsNotAccessible(startedTransferContext.consumerTransferProcessId, edrMessageContext); - PROVIDER.suspendTransfer(providerTransferProcessId, "supension"); - - PROVIDER.awaitTransferToBeInState(providerTransferProcessId, SUSPENDED); - - // checks that the EDR is gone once the transfer has been suspended - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId))); - // checks that transfer fails - await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data")))); - - PROVIDER.resumeTransfer(providerTransferProcessId); - - // check that transfer is available again - PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED); - var secondEdr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); - var secondMessage = UUID.randomUUID().toString(); - await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(secondEdr, Map.of("message", secondMessage), body -> assertThat(body).isEqualTo("data"))); + PROVIDER.resumeTransfer(startedTransferContext.providerTransferProcessId); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, STARTED); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, STARTED); + assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); providerDataSource.verify(request("/source").withMethod("GET")); } @@ -256,26 +192,15 @@ void pullFromHttp_httpProvision() { "proxyQueryParams", "true" )); - var consumerTransferProcessId = CONSUMER.requestAssetFrom(assetId, PROVIDER) - .withTransferType("HttpData-PULL") - .execute(); - - CONSUMER.awaitTransferToBeInState(consumerTransferProcessId, STARTED); - - await().atMost(timeout).untilAsserted(() -> { - var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); - CONSUMER.pullData(edr, Map.of("message", "some information"), body -> assertThat(body).isEqualTo("data")); - }); + StartedTransferContext startedTransferContext = startTransferProcess(assetId); + assertDataIsAccessible(startedTransferContext.consumerTransferProcessId); provisionServer.verify(request("/provision")); provisionServer.clear(request("provision")); - var providerTransferProcessId = PROVIDER.getTransferProcesses().stream() - .filter(filter -> filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) - .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); - - PROVIDER.terminateTransfer(providerTransferProcessId); - PROVIDER.awaitTransferToBeInState(providerTransferProcessId, DEPROVISIONED); + PROVIDER.terminateTransfer(startedTransferContext.providerTransferProcessId); + PROVIDER.awaitTransferToBeInState(startedTransferContext.providerTransferProcessId, DEPROVISIONED); + CONSUMER.awaitTransferToBeInState(startedTransferContext.consumerTransferProcessId, TERMINATED); provisionServer.verify(request("/provision")); @@ -340,6 +265,55 @@ private HttpResponse cacheEdr(HttpRequest request, Map filter.asJsonObject().getString("correlationId").equals(consumerTransferProcessId)) + .map(id -> id.asJsonObject().getString("@id")).findFirst().orElseThrow(); + PROVIDER.awaitTransferToBeInState(providerTransferProcessId, STARTED); + + return new StartedTransferContext(consumerTransferProcessId, providerTransferProcessId); + } + + private EdrMessageContext assertDataIsAccessible(String consumerTransferProcessId) { + var edr = await().atMost(timeout).until(() -> CONSUMER.getEdr(consumerTransferProcessId), Objects::nonNull); + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); + + return new EdrMessageContext(edr, msg); + } + + private EdrMessageContext assertDataIsAccessible(String consumerTransferProcessId, ConcurrentHashMap events) { + await().atMost(timeout).untilAsserted(() -> assertThat(events.get(consumerTransferProcessId)).isNotNull()); + + var event = events.get(consumerTransferProcessId); + var msg = UUID.randomUUID().toString(); + var edr = event.getDataAddress(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(event.getDataAddress(), Map.of("message", msg), body -> assertThat(body).isEqualTo("data"))); + + return new EdrMessageContext(edr, msg); + } + + private void assertDataIsNotAccessible(String consumerTransferProcessId, EdrMessageContext edrMessageContext) { + // checks that the EDR is gone once the transfer has been suspended + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(consumerTransferProcessId))); + // checks that transfer fails + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edrMessageContext.edr, Map.of("message", edrMessageContext.message), body -> assertThat(body).isEqualTo("data")))); + } + + private record StartedTransferContext(String consumerTransferProcessId, String providerTransferProcessId) { } + + private record EdrMessageContext(DataAddress edr, String message) { } + /** * Mocked http provisioner */