Skip to content

Commit

Permalink
refactor: create distributions from DataFlowManager transfer types (#…
Browse files Browse the repository at this point in the history
…3669)

refactor: create distributions from DataFlowManager transfer types
  • Loading branch information
ndr-brt authored Dec 1, 2023
1 parent 96734fc commit 0266575
Show file tree
Hide file tree
Showing 17 changed files with 241 additions and 141 deletions.
2 changes: 1 addition & 1 deletion core/control-plane/catalog-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ plugins {
dependencies {
api(project(":spi:common:catalog-spi"))
api(project(":spi:control-plane:contract-spi"))
api(project(":spi:data-plane-selector:data-plane-selector-spi"))
api(project(":spi:control-plane:transfer-spi"))

testImplementation(project(":core:common:junit"))
testImplementation(project(":core:control-plane:control-plane-core"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import org.eclipse.edc.catalog.spi.DataServiceRegistry;
import org.eclipse.edc.catalog.spi.DistributionResolver;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
Expand All @@ -29,7 +29,7 @@ public class CatalogDefaultServicesExtension implements ServiceExtension {
public static final String NAME = "Catalog Default Services";

@Inject
private DataPlaneInstanceStore dataPlaneInstanceStore;
private DataFlowManager dataFlowManager;

private DataServiceRegistry dataServiceRegistry;

Expand All @@ -50,6 +50,6 @@ public DataServiceRegistry dataServiceRegistry() {

@Provider(isDefault = true)
public DistributionResolver distributionResolver() {
return new DefaultDistributionResolver(dataServiceRegistry, dataPlaneInstanceStore);
return new DefaultDistributionResolver(dataServiceRegistry, dataFlowManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Dataset getById(ParticipantAgent agent, String id) {

private Dataset toDataset(List<ContractDefinition> contractDefinitions, Asset asset) {

var distributions = distributionResolver.getDistributions(asset, null); // TODO: data addresses should be retrieved
var distributions = distributionResolver.getDistributions(asset);
var datasetBuilder = Dataset.Builder.newInstance()
.id(asset.getId())
.distributions(distributions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,24 @@
import org.eclipse.edc.catalog.spi.DataServiceRegistry;
import org.eclipse.edc.catalog.spi.Distribution;
import org.eclipse.edc.catalog.spi.DistributionResolver;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.spi.types.domain.asset.Asset;

import java.util.List;
import java.util.stream.Collectors;

public class DefaultDistributionResolver implements DistributionResolver {

private final DataServiceRegistry dataServiceRegistry;
private final DataPlaneInstanceStore dataPlaneInstanceStore;
private final DataFlowManager dataFlowManager;

public DefaultDistributionResolver(DataServiceRegistry dataServiceRegistry, DataPlaneInstanceStore dataPlaneInstanceStore) {
public DefaultDistributionResolver(DataServiceRegistry dataServiceRegistry, DataFlowManager dataFlowManager) {
this.dataServiceRegistry = dataServiceRegistry;
this.dataPlaneInstanceStore = dataPlaneInstanceStore;
this.dataFlowManager = dataFlowManager;
}

@Override
public List<Distribution> getDistributions(Asset asset, DataAddress dataAddress) {
return dataPlaneInstanceStore.getAll()
.flatMap(it -> it.getAllowedDestTypes().stream())
.map(this::createDistribution)
.collect(Collectors.toList());
public List<Distribution> getDistributions(Asset asset) {
return dataFlowManager.transferTypesFor(asset).stream().map(this::createDistribution).toList();
}

private Distribution createDistribution(String format) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void query_shouldReturnOneDatasetPerAsset() {
when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(contractDefinition));
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(Stream.of(createAsset("assetId").property("key", "value").build()));
when(policyStore.findById("contractPolicyId")).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build());
when(distributionResolver.getDistributions(isA(Asset.class), any())).thenReturn(List.of(distribution));
when(distributionResolver.getDistributions(isA(Asset.class))).thenReturn(List.of(distribution));

var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,36 @@

import org.eclipse.edc.catalog.spi.DataService;
import org.eclipse.edc.catalog.spi.DataServiceRegistry;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.selector.spi.store.DataPlaneInstanceStore;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.asset.Asset;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.stream.Stream;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class DefaultDistributionResolverTest {

private final DataService dataService = DataService.Builder.newInstance().build();
private DataServiceRegistry dataServiceRegistry = mock(DataServiceRegistry.class);
private final DataPlaneInstanceStore dataPlaneInstanceStore = mock(DataPlaneInstanceStore.class);
private final DataServiceRegistry dataServiceRegistry = mock();
private final DataFlowManager dataFlowManager = mock();

private final DefaultDistributionResolver resolver = new DefaultDistributionResolver(dataServiceRegistry, dataPlaneInstanceStore);
private final DefaultDistributionResolver resolver = new DefaultDistributionResolver(dataServiceRegistry, dataFlowManager);

@Test
void shouldReturnDistributionForEverySupportedDestType() {
void shouldReturnDistributionsForEveryTransferType() {
when(dataServiceRegistry.getDataServices()).thenReturn(List.of(dataService));

var dataPlane1 = DataPlaneInstance.Builder.newInstance().url("http://data-plane-one").allowedDestType("type1").build();
var dataPlane2 = DataPlaneInstance.Builder.newInstance().url("http://data-plane-two").allowedDestType("type2").build();
when(dataPlaneInstanceStore.getAll()).thenReturn(Stream.of(dataPlane1, dataPlane2));
var asset = Asset.Builder.newInstance().build();
when(dataFlowManager.transferTypesFor(any())).thenReturn(Set.of("type1", "type2"));

var dataAddress = DataAddress.Builder.newInstance().type("any").build();
var asset = Asset.Builder.newInstance().dataAddress(dataAddress).build();

var distributions = resolver.getDistributions(asset, dataAddress);
var distributions = resolver.getDistributions(asset);

assertThat(distributions).hasSize(2)
.anySatisfy(distribution -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
@ExtendWith(EdcExtension.class)
class ContractNegotiationEventDispatchTest {
private static final String CONSUMER = "consumer";
private static final String PROVIDER = "provider";

private final EventSubscriber eventSubscriber = mock(EventSubscriber.class);
private final IdentityService identityService = mock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.asset.Asset;
import org.jetbrains.annotations.NotNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

import static java.lang.String.format;
import static java.util.stream.Collectors.toSet;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;

/**
Expand Down Expand Up @@ -63,6 +67,15 @@ public void register(int priority, DataFlowController controller) {
return chooseControllerAndApply(transferProcess, controller -> controller.terminate(transferProcess));
}

@Override
public Set<String> transferTypesFor(Asset asset) {
return controllers.stream()
.map(it -> it.controller)
.map(it -> it.transferTypesFor(asset))
.flatMap(Collection::stream)
.collect(toSet());
}

@NotNull
private <T> StatusResult<T> chooseControllerAndApply(TransferProcess transferProcess, Function<DataFlowController, StatusResult<T>> function) {
return controllers.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.asset.Asset;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;
Expand All @@ -37,94 +41,120 @@ class DataFlowManagerImplTest {

private final DataFlowManagerImpl manager = new DataFlowManagerImpl();

@Test
void initiate_shouldInitiateFlowOnCorrectController() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var policy = Policy.Builder.newInstance().build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();

when(controller.canHandle(any())).thenReturn(true);
when(controller.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
manager.register(controller);

var response = manager.initiate(transferProcess, policy);

assertThat(response.succeeded()).isTrue();
}

@Test
void initiate_shouldReturnFatalError_whenNoControllerCanHandleTheRequest() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var policy = Policy.Builder.newInstance().build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();

when(controller.canHandle(any())).thenReturn(false);
manager.register(controller);

var response = manager.initiate(transferProcess, policy);

assertThat(response.succeeded()).isFalse();
assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR);
@Nested
class Initiate {
@Test
void shouldInitiateFlowOnCorrectController() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var policy = Policy.Builder.newInstance().build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();

when(controller.canHandle(any())).thenReturn(true);
when(controller.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
manager.register(controller);

var response = manager.initiate(transferProcess, policy);

assertThat(response.succeeded()).isTrue();
}

@Test
void shouldReturnFatalError_whenNoControllerCanHandleTheRequest() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var policy = Policy.Builder.newInstance().build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();

when(controller.canHandle(any())).thenReturn(false);
manager.register(controller);

var response = manager.initiate(transferProcess, policy);

assertThat(response.succeeded()).isFalse();
assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR);
}

@Test
void shouldCatchExceptionsAndReturnFatalError() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var policy = Policy.Builder.newInstance().build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();

var errorMsg = "Test Error Message";
when(controller.canHandle(any())).thenReturn(true);
when(controller.initiateFlow(any(), any())).thenThrow(new EdcException(errorMsg));
manager.register(controller);

var response = manager.initiate(transferProcess, policy);

assertThat(response.succeeded()).isFalse();
assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR);
assertThat(response.getFailureMessages()).hasSize(1).first().matches(message -> message.contains(errorMsg));
}

@Test
void shouldChooseHighestPriorityController() {
var highPriority = createDataFlowController();
var lowPriority = createDataFlowController();
manager.register(1, lowPriority);
manager.register(2, highPriority);

manager.initiate(TransferProcess.Builder.newInstance().build(), Policy.Builder.newInstance().build());

verify(highPriority).initiateFlow(any(), any());
verifyNoInteractions(lowPriority);
}

private DataFlowController createDataFlowController() {
var dataFlowController = mock(DataFlowController.class);
when(dataFlowController.canHandle(any())).thenReturn(true);
when(dataFlowController.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
return dataFlowController;
}
}

@Test
void initiate_shouldCatchExceptionsAndReturnFatalError() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var policy = Policy.Builder.newInstance().build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();
@Nested
class Terminate {
@Test
void shouldChooseControllerAndTerminate() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();

var errorMsg = "Test Error Message";
when(controller.canHandle(any())).thenReturn(true);
when(controller.initiateFlow(any(), any())).thenThrow(new EdcException(errorMsg));
manager.register(controller);
when(controller.canHandle(any())).thenReturn(true);
when(controller.terminate(any())).thenReturn(StatusResult.success());
manager.register(controller);

var response = manager.initiate(transferProcess, policy);
var result = manager.terminate(transferProcess);

assertThat(response.succeeded()).isFalse();
assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR);
assertThat(response.getFailureMessages()).hasSize(1).first().matches(message -> message.contains(errorMsg));
assertThat(result).isSucceeded();
verify(controller).terminate(transferProcess);
}
}

@Test
void initiate_shouldChooseHighestPriorityController() {
var highPriority = createDataFlowController();
var lowPriority = createDataFlowController();
manager.register(1, lowPriority);
manager.register(2, highPriority);
@Nested
class TransferTypesFor {
@Test
void shouldReturnTransferTypesFromControllers() {
var controllerOne = mock(DataFlowController.class);
when(controllerOne.transferTypesFor(any())).thenReturn(Set.of("Type1"));
var controllerTwo = mock(DataFlowController.class);
when(controllerTwo.transferTypesFor(any())).thenReturn(Set.of("Type2", "Type3"));
manager.register(controllerOne);
manager.register(controllerTwo);
var asset = Asset.Builder.newInstance().build();

manager.initiate(TransferProcess.Builder.newInstance().build(), Policy.Builder.newInstance().build());

verify(highPriority).initiateFlow(any(), any());
verifyNoInteractions(lowPriority);
}

@Test
void terminate_shouldChooseControllerAndTerminate() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();
var result = manager.transferTypesFor(asset);

when(controller.canHandle(any())).thenReturn(true);
when(controller.terminate(any())).thenReturn(StatusResult.success());
manager.register(controller);

var result = manager.terminate(transferProcess);

assertThat(result).isSucceeded();
verify(controller).terminate(transferProcess);
assertThat(result).containsExactlyInAnyOrder("Type1", "Type2", "Type3");
}
}

private DataFlowController createDataFlowController() {
var dataFlowController = mock(DataFlowController.class);
when(dataFlowController.canHandle(any())).thenReturn(true);
when(dataFlowController.initiateFlow(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
return dataFlowController;
}
}
Loading

0 comments on commit 0266575

Please sign in to comment.