From 026657551d45066c3b73901099bdf21f30d87372 Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Fri, 1 Dec 2023 09:39:29 +0100 Subject: [PATCH] refactor: create distributions from `DataFlowManager` transfer types (#3669) refactor: create distributions from DataFlowManager transfer types --- .../catalog-core/build.gradle.kts | 2 +- .../CatalogDefaultServicesExtension.java | 6 +- .../catalog/DatasetResolverImpl.java | 2 +- .../catalog/DefaultDistributionResolver.java | 17 +- .../catalog/DatasetResolverImplTest.java | 2 +- .../DefaultDistributionResolverTest.java | 24 ++- .../ContractNegotiationEventDispatchTest.java | 1 - .../transfer/flow/DataFlowManagerImpl.java | 13 ++ .../flow/DataFlowManagerImplTest.java | 186 ++++++++++-------- ...onsumerPullTransferDataFlowController.java | 8 + ...roviderPushTransferDataFlowController.java | 16 ++ ...merPullTransferDataFlowControllerTest.java | 10 + ...derPushTransferDataFlowControllerTest.java | 23 ++- .../edc/catalog/spi/DistributionResolver.java | 5 +- .../transfer/spi/flow/DataFlowController.java | 34 +--- .../transfer/spi/flow/DataFlowManager.java | 10 + .../connector/transfer/spi/flow/FlowType.java | 23 +++ 17 files changed, 241 insertions(+), 141 deletions(-) create mode 100644 spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/FlowType.java diff --git a/core/control-plane/catalog-core/build.gradle.kts b/core/control-plane/catalog-core/build.gradle.kts index 8b1b5291c9a..4bb13954a2e 100644 --- a/core/control-plane/catalog-core/build.gradle.kts +++ b/core/control-plane/catalog-core/build.gradle.kts @@ -19,7 +19,7 @@ plugins { dependencies { api(project(":spi:common:catalog-spi")) api(project(":spi:control-plane:contract-spi")) - api(project(":spi:data-plane-selector:data-plane-selector-spi")) + api(project(":spi:control-plane:transfer-spi")) testImplementation(project(":core:common:junit")) testImplementation(project(":core:control-plane:control-plane-core")) diff --git a/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/CatalogDefaultServicesExtension.java b/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/CatalogDefaultServicesExtension.java index d697fa80bab..3ab641ee4de 100644 --- a/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/CatalogDefaultServicesExtension.java +++ b/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/CatalogDefaultServicesExtension.java @@ -16,7 +16,7 @@ import org.eclipse.edc.catalog.spi.DataServiceRegistry; import org.eclipse.edc.catalog.spi.DistributionResolver; -import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore; +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Provider; @@ -29,7 +29,7 @@ public class CatalogDefaultServicesExtension implements ServiceExtension { public static final String NAME = "Catalog Default Services"; @Inject - private DataPlaneInstanceStore dataPlaneInstanceStore; + private DataFlowManager dataFlowManager; private DataServiceRegistry dataServiceRegistry; @@ -50,6 +50,6 @@ public DataServiceRegistry dataServiceRegistry() { @Provider(isDefault = true) public DistributionResolver distributionResolver() { - return new DefaultDistributionResolver(dataServiceRegistry, dataPlaneInstanceStore); + return new DefaultDistributionResolver(dataServiceRegistry, dataFlowManager); } } diff --git a/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DatasetResolverImpl.java b/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DatasetResolverImpl.java index e7f85a790c7..945c920aaab 100644 --- a/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DatasetResolverImpl.java +++ b/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DatasetResolverImpl.java @@ -76,7 +76,7 @@ public Dataset getById(ParticipantAgent agent, String id) { private Dataset toDataset(List contractDefinitions, Asset asset) { - var distributions = distributionResolver.getDistributions(asset, null); // TODO: data addresses should be retrieved + var distributions = distributionResolver.getDistributions(asset); var datasetBuilder = Dataset.Builder.newInstance() .id(asset.getId()) .distributions(distributions) diff --git a/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DefaultDistributionResolver.java b/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DefaultDistributionResolver.java index a24941462ae..f669ebfef5c 100644 --- a/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DefaultDistributionResolver.java +++ b/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DefaultDistributionResolver.java @@ -18,29 +18,24 @@ import org.eclipse.edc.catalog.spi.DataServiceRegistry; import org.eclipse.edc.catalog.spi.Distribution; import org.eclipse.edc.catalog.spi.DistributionResolver; -import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore; -import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.spi.types.domain.asset.Asset; import java.util.List; -import java.util.stream.Collectors; public class DefaultDistributionResolver implements DistributionResolver { private final DataServiceRegistry dataServiceRegistry; - private final DataPlaneInstanceStore dataPlaneInstanceStore; + private final DataFlowManager dataFlowManager; - public DefaultDistributionResolver(DataServiceRegistry dataServiceRegistry, DataPlaneInstanceStore dataPlaneInstanceStore) { + public DefaultDistributionResolver(DataServiceRegistry dataServiceRegistry, DataFlowManager dataFlowManager) { this.dataServiceRegistry = dataServiceRegistry; - this.dataPlaneInstanceStore = dataPlaneInstanceStore; + this.dataFlowManager = dataFlowManager; } @Override - public List getDistributions(Asset asset, DataAddress dataAddress) { - return dataPlaneInstanceStore.getAll() - .flatMap(it -> it.getAllowedDestTypes().stream()) - .map(this::createDistribution) - .collect(Collectors.toList()); + public List getDistributions(Asset asset) { + return dataFlowManager.transferTypesFor(asset).stream().map(this::createDistribution).toList(); } private Distribution createDistribution(String format) { diff --git a/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplTest.java b/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplTest.java index 98fdcfeb2ea..70683cfd3d9 100644 --- a/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplTest.java +++ b/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplTest.java @@ -81,7 +81,7 @@ void query_shouldReturnOneDatasetPerAsset() { when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(contractDefinition)); when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(Stream.of(createAsset("assetId").property("key", "value").build())); when(policyStore.findById("contractPolicyId")).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); - when(distributionResolver.getDistributions(isA(Asset.class), any())).thenReturn(List.of(distribution)); + when(distributionResolver.getDistributions(isA(Asset.class))).thenReturn(List.of(distribution)); var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none()); diff --git a/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DefaultDistributionResolverTest.java b/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DefaultDistributionResolverTest.java index 0d6fd5e3afb..85cd87622ae 100644 --- a/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DefaultDistributionResolverTest.java +++ b/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DefaultDistributionResolverTest.java @@ -16,38 +16,36 @@ import org.eclipse.edc.catalog.spi.DataService; import org.eclipse.edc.catalog.spi.DataServiceRegistry; -import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; -import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore; +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.asset.Asset; import org.junit.jupiter.api.Test; import java.util.List; -import java.util.stream.Stream; +import java.util.Set; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; class DefaultDistributionResolverTest { private final DataService dataService = DataService.Builder.newInstance().build(); - private DataServiceRegistry dataServiceRegistry = mock(DataServiceRegistry.class); - private final DataPlaneInstanceStore dataPlaneInstanceStore = mock(DataPlaneInstanceStore.class); + private final DataServiceRegistry dataServiceRegistry = mock(); + private final DataFlowManager dataFlowManager = mock(); - private final DefaultDistributionResolver resolver = new DefaultDistributionResolver(dataServiceRegistry, dataPlaneInstanceStore); + private final DefaultDistributionResolver resolver = new DefaultDistributionResolver(dataServiceRegistry, dataFlowManager); @Test - void shouldReturnDistributionForEverySupportedDestType() { + void shouldReturnDistributionsForEveryTransferType() { when(dataServiceRegistry.getDataServices()).thenReturn(List.of(dataService)); - - var dataPlane1 = DataPlaneInstance.Builder.newInstance().url("http://data-plane-one").allowedDestType("type1").build(); - var dataPlane2 = DataPlaneInstance.Builder.newInstance().url("http://data-plane-two").allowedDestType("type2").build(); - when(dataPlaneInstanceStore.getAll()).thenReturn(Stream.of(dataPlane1, dataPlane2)); - var asset = Asset.Builder.newInstance().build(); + when(dataFlowManager.transferTypesFor(any())).thenReturn(Set.of("type1", "type2")); + var dataAddress = DataAddress.Builder.newInstance().type("any").build(); + var asset = Asset.Builder.newInstance().dataAddress(dataAddress).build(); - var distributions = resolver.getDistributions(asset, dataAddress); + var distributions = resolver.getDistributions(asset); assertThat(distributions).hasSize(2) .anySatisfy(distribution -> { diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationEventDispatchTest.java index 64cff7d1d95..41fd671e05c 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationEventDispatchTest.java @@ -66,7 +66,6 @@ @ExtendWith(EdcExtension.class) class ContractNegotiationEventDispatchTest { private static final String CONSUMER = "consumer"; - private static final String PROVIDER = "provider"; private final EventSubscriber eventSubscriber = mock(EventSubscriber.class); private final IdentityService identityService = mock(); diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java index 8a4908c599d..0bf244b12e4 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImpl.java @@ -21,14 +21,18 @@ import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.types.domain.asset.Asset; import org.jetbrains.annotations.NotNull; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.function.Function; import static java.lang.String.format; +import static java.util.stream.Collectors.toSet; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; /** @@ -63,6 +67,15 @@ public void register(int priority, DataFlowController controller) { return chooseControllerAndApply(transferProcess, controller -> controller.terminate(transferProcess)); } + @Override + public Set transferTypesFor(Asset asset) { + return controllers.stream() + .map(it -> it.controller) + .map(it -> it.transferTypesFor(asset)) + .flatMap(Collection::stream) + .collect(toSet()); + } + @NotNull private StatusResult chooseControllerAndApply(TransferProcess transferProcess, Function> function) { return controllers.stream() diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java index 1c4115e11de..09ad7c2a01e 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/flow/DataFlowManagerImplTest.java @@ -22,8 +22,12 @@ import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import java.util.Set; + import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; @@ -37,94 +41,120 @@ class DataFlowManagerImplTest { private final DataFlowManagerImpl manager = new DataFlowManagerImpl(); - @Test - void initiate_shouldInitiateFlowOnCorrectController() { - var controller = mock(DataFlowController.class); - var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); - var policy = Policy.Builder.newInstance().build(); - var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); - - when(controller.canHandle(any())).thenReturn(true); - when(controller.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); - manager.register(controller); - - var response = manager.initiate(transferProcess, policy); - - assertThat(response.succeeded()).isTrue(); - } - - @Test - void initiate_shouldReturnFatalError_whenNoControllerCanHandleTheRequest() { - var controller = mock(DataFlowController.class); - var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); - var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); - var policy = Policy.Builder.newInstance().build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); - - when(controller.canHandle(any())).thenReturn(false); - manager.register(controller); - - var response = manager.initiate(transferProcess, policy); - - assertThat(response.succeeded()).isFalse(); - assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR); + @Nested + class Initiate { + @Test + void shouldInitiateFlowOnCorrectController() { + var controller = mock(DataFlowController.class); + var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); + var policy = Policy.Builder.newInstance().build(); + var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); + var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + + when(controller.canHandle(any())).thenReturn(true); + when(controller.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); + manager.register(controller); + + var response = manager.initiate(transferProcess, policy); + + assertThat(response.succeeded()).isTrue(); + } + + @Test + void shouldReturnFatalError_whenNoControllerCanHandleTheRequest() { + var controller = mock(DataFlowController.class); + var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); + var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); + var policy = Policy.Builder.newInstance().build(); + var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + + when(controller.canHandle(any())).thenReturn(false); + manager.register(controller); + + var response = manager.initiate(transferProcess, policy); + + assertThat(response.succeeded()).isFalse(); + assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR); + } + + @Test + void shouldCatchExceptionsAndReturnFatalError() { + var controller = mock(DataFlowController.class); + var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); + var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); + var policy = Policy.Builder.newInstance().build(); + var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + + var errorMsg = "Test Error Message"; + when(controller.canHandle(any())).thenReturn(true); + when(controller.initiateFlow(any(), any())).thenThrow(new EdcException(errorMsg)); + manager.register(controller); + + var response = manager.initiate(transferProcess, policy); + + assertThat(response.succeeded()).isFalse(); + assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR); + assertThat(response.getFailureMessages()).hasSize(1).first().matches(message -> message.contains(errorMsg)); + } + + @Test + void shouldChooseHighestPriorityController() { + var highPriority = createDataFlowController(); + var lowPriority = createDataFlowController(); + manager.register(1, lowPriority); + manager.register(2, highPriority); + + manager.initiate(TransferProcess.Builder.newInstance().build(), Policy.Builder.newInstance().build()); + + verify(highPriority).initiateFlow(any(), any()); + verifyNoInteractions(lowPriority); + } + + private DataFlowController createDataFlowController() { + var dataFlowController = mock(DataFlowController.class); + when(dataFlowController.canHandle(any())).thenReturn(true); + when(dataFlowController.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); + return dataFlowController; + } } - @Test - void initiate_shouldCatchExceptionsAndReturnFatalError() { - var controller = mock(DataFlowController.class); - var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); - var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); - var policy = Policy.Builder.newInstance().build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + @Nested + class Terminate { + @Test + void shouldChooseControllerAndTerminate() { + var controller = mock(DataFlowController.class); + var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); + var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); + var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); - var errorMsg = "Test Error Message"; - when(controller.canHandle(any())).thenReturn(true); - when(controller.initiateFlow(any(), any())).thenThrow(new EdcException(errorMsg)); - manager.register(controller); + when(controller.canHandle(any())).thenReturn(true); + when(controller.terminate(any())).thenReturn(StatusResult.success()); + manager.register(controller); - var response = manager.initiate(transferProcess, policy); + var result = manager.terminate(transferProcess); - assertThat(response.succeeded()).isFalse(); - assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR); - assertThat(response.getFailureMessages()).hasSize(1).first().matches(message -> message.contains(errorMsg)); + assertThat(result).isSucceeded(); + verify(controller).terminate(transferProcess); + } } - @Test - void initiate_shouldChooseHighestPriorityController() { - var highPriority = createDataFlowController(); - var lowPriority = createDataFlowController(); - manager.register(1, lowPriority); - manager.register(2, highPriority); + @Nested + class TransferTypesFor { + @Test + void shouldReturnTransferTypesFromControllers() { + var controllerOne = mock(DataFlowController.class); + when(controllerOne.transferTypesFor(any())).thenReturn(Set.of("Type1")); + var controllerTwo = mock(DataFlowController.class); + when(controllerTwo.transferTypesFor(any())).thenReturn(Set.of("Type2", "Type3")); + manager.register(controllerOne); + manager.register(controllerTwo); + var asset = Asset.Builder.newInstance().build(); - manager.initiate(TransferProcess.Builder.newInstance().build(), Policy.Builder.newInstance().build()); - - verify(highPriority).initiateFlow(any(), any()); - verifyNoInteractions(lowPriority); - } - @Test - void terminate_shouldChooseControllerAndTerminate() { - var controller = mock(DataFlowController.class); - var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build(); - var dataAddress = DataAddress.Builder.newInstance().type("test-type").build(); - var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build(); + var result = manager.transferTypesFor(asset); - when(controller.canHandle(any())).thenReturn(true); - when(controller.terminate(any())).thenReturn(StatusResult.success()); - manager.register(controller); - - var result = manager.terminate(transferProcess); - - assertThat(result).isSucceeded(); - verify(controller).terminate(transferProcess); + assertThat(result).containsExactlyInAnyOrder("Type1", "Type2", "Type3"); + } } - private DataFlowController createDataFlowController() { - var dataFlowController = mock(DataFlowController.class); - when(dataFlowController.canHandle(any())).thenReturn(true); - when(dataFlowController.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build())); - return dataFlowController; - } } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java index 572d6efd387..202ce616bea 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java @@ -22,12 +22,15 @@ import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.asset.Asset; import org.jetbrains.annotations.NotNull; import java.util.Optional; +import java.util.Set; import static java.lang.String.format; import static org.eclipse.edc.connector.transfer.dataplane.spi.TransferDataPlaneConstants.HTTP_PROXY; +import static org.eclipse.edc.connector.transfer.spi.flow.FlowType.PULL; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; import static org.eclipse.edc.spi.response.StatusResult.failure; @@ -63,6 +66,11 @@ public StatusResult terminate(TransferProcess transferProcess) { return StatusResult.success(); } + @Override + public Set transferTypesFor(Asset asset) { + return Set.of("%s-%s".formatted("Http", PULL)); + } + private DataFlowResponse toResponse(DataAddress address) { return DataFlowResponse.Builder.newInstance().dataAddress(address).build(); } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java index a241c84e2f4..1f0e990fe09 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java @@ -16,18 +16,24 @@ import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl; import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.types.domain.asset.Asset; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; import org.jetbrains.annotations.NotNull; +import java.util.Collection; +import java.util.Set; import java.util.UUID; +import static java.util.stream.Collectors.toSet; import static org.eclipse.edc.connector.transfer.dataplane.spi.TransferDataPlaneConstants.HTTP_PROXY; +import static org.eclipse.edc.connector.transfer.spi.flow.FlowType.PUSH; public class ProviderPushTransferDataFlowController implements DataFlowController { @@ -71,4 +77,14 @@ public StatusResult terminate(TransferProcess transferProcess) { .orElse(StatusResult.success()); } + @Override + public Set transferTypesFor(Asset asset) { + return selectorClient.getAll().stream() + .filter(it -> it.getAllowedSourceTypes().contains(asset.getDataAddress().getType())) + .map(DataPlaneInstance::getAllowedDestTypes) + .flatMap(Collection::stream) + .map(it -> "%s-%s".formatted(it, PUSH)) + .collect(toSet()); + } + } diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java index 351cb40e4e9..3d592e6e300 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowControllerTest.java @@ -23,6 +23,7 @@ import org.eclipse.edc.spi.result.Failure; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.asset.Asset; import org.junit.jupiter.api.Test; import java.util.UUID; @@ -110,6 +111,15 @@ void terminate_shouldAlwaysReturnSuccess() { assertThat(result).isSucceeded(); } + @Test + void transferTypes_shouldReturnHttpPull() { + var asset = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("any").build()).build(); + + var transferTypes = flowController.transferTypesFor(asset); + + assertThat(transferTypes).hasSize(1).contains("Http-PULL"); + } + private TransferProcess transferProcess(String destinationType) { return TransferProcess.Builder.newInstance() .dataRequest(DataRequest.Builder.newInstance().destinationType(destinationType).build()) diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java index 3cba2c5bccc..a38f6c7a346 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowControllerTest.java @@ -24,7 +24,9 @@ import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.asset.Asset; import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -121,8 +123,27 @@ void terminate_shouldCallTerminate() { verify(dataPlaneClient).terminate("transferProcessId"); } + @Test + void transferTypes_shouldReturnTypesForSpecifiedAsset() { + when(selectorService.getAll()).thenReturn(List.of( + dataPlaneInstanceBuilder().allowedSourceType("TargetSrc").allowedDestType("TargetDest").build(), + dataPlaneInstanceBuilder().allowedSourceType("TargetSrc").allowedDestType("AnotherTargetDest").build(), + dataPlaneInstanceBuilder().allowedSourceType("AnotherSrc").allowedDestType("ThisWontBeListed").build() + )); + var asset = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()).build(); + + var transferTypes = flowController.transferTypesFor(asset); + + assertThat(transferTypes).containsExactly("TargetDest-PUSH", "AnotherTargetDest-PUSH"); + } + private DataPlaneInstance createDataPlaneInstance() { - return DataPlaneInstance.Builder.newInstance().url("http://any").build(); + return dataPlaneInstanceBuilder().build(); + } + + @NotNull + private static DataPlaneInstance.Builder dataPlaneInstanceBuilder() { + return DataPlaneInstance.Builder.newInstance().url("http://any"); } private DataAddress testDataAddress() { diff --git a/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/DistributionResolver.java b/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/DistributionResolver.java index be4076ed527..797e2727fba 100644 --- a/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/DistributionResolver.java +++ b/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/DistributionResolver.java @@ -14,7 +14,6 @@ package org.eclipse.edc.catalog.spi; -import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.asset.Asset; import java.util.List; @@ -25,9 +24,9 @@ public interface DistributionResolver { /** - * Return all the {@link Distribution}s for the given {@link Asset} and {@link DataAddress} + * Return all the {@link Distribution}s for the given {@link Asset}. * * @return a list of Distributions, always not null */ - List getDistributions(Asset asset, DataAddress dataAddress); + List getDistributions(Asset asset); } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java index 5407fc5bc31..1555f67fbbb 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowController.java @@ -15,14 +15,15 @@ package org.eclipse.edc.connector.transfer.spi.flow; import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; -import org.eclipse.edc.connector.transfer.spi.types.DataRequest; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.response.ResponseStatus; import org.eclipse.edc.spi.response.StatusResult; -import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.asset.Asset; import org.jetbrains.annotations.NotNull; +import java.util.Set; + /** * Handles a data flow. */ @@ -57,32 +58,9 @@ public interface DataFlowController { StatusResult terminate(TransferProcess transferProcess); /** - * Returns true if the manager can handle the data type. + * Returns transfer types that the controller can handle for the specified Asset. * - * @param dataRequest the request - * @param contentAddress the address to resolve the asset contents. This may be the original asset address or an address resolving to generated content. - * @deprecated please use {@link #canHandle(TransferProcess)} + * @return transfer type set. */ - @Deprecated(since = "0.2.1", forRemoval = true) - default boolean canHandle(DataRequest dataRequest, DataAddress contentAddress) { - return canHandle(TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(contentAddress).build()); - } - - /** - * Initiate a data flow. - * - *

Implementations should not throw exceptions. If an unexpected exception occurs and the flow should be re-attempted, set {@link ResponseStatus#ERROR_RETRY} in the - * response. If an exception occurs and re-tries should not be re-attempted, set {@link ResponseStatus#FATAL_ERROR} in the response.

- * - * @param dataRequest the request - * @param contentAddress the address to resolve the asset contents. This may be the original asset address or an address resolving to generated content. - * @param policy the contract agreement usage policy for the asset being transferred - * @deprecated please use {@link #initiateFlow(TransferProcess, Policy)} - */ - @NotNull - @Deprecated(since = "0.2.1", forRemoval = true) - default StatusResult initiateFlow(DataRequest dataRequest, DataAddress contentAddress, Policy policy) { - return initiateFlow(TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(contentAddress).build(), policy); - } - + Set transferTypesFor(Asset asset); } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java index deae9c487aa..6afc019dfde 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/DataFlowManager.java @@ -19,8 +19,11 @@ import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.types.domain.asset.Asset; import org.jetbrains.annotations.NotNull; +import java.util.Set; + /** * Manages data flows and dispatches to {@link DataFlowController}s. * Priority is used to decide which controller should be chosen first, higher priority values will make the controller @@ -61,4 +64,11 @@ public interface DataFlowManager { @NotNull StatusResult terminate(TransferProcess transferProcess); + /** + * Returns the transfer types available for a specific asset. + * + * @param asset the asset. + * @return tranfer types list. + */ + Set transferTypesFor(Asset asset); } diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/FlowType.java b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/FlowType.java new file mode 100644 index 00000000000..b2b86c0dd9a --- /dev/null +++ b/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/FlowType.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.spi.flow; + +/** + * Data Flow types, generally they can be Push (provider pushing data to the consumer) and Pull (consumer pulling data + * from the provider) + */ +public enum FlowType { + PUSH, PULL +}