Skip to content

Commit

Permalink
refactor: inline DataRequest value object into TransferProcess (#…
Browse files Browse the repository at this point in the history
…4009)

* refactor: inline data request

* simplify sql schema
  • Loading branch information
ndr-brt authored Mar 20, 2024
1 parent 3cde990 commit 169eecc
Show file tree
Hide file tree
Showing 39 changed files with 348 additions and 823 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable;
import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessStartedData;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage;
Expand Down Expand Up @@ -154,22 +153,19 @@ public ServiceResult<TransferProcess> findById(String id, TokenRepresentation to
private ServiceResult<TransferProcess> requestedAction(TransferRequestMessage message, String assetId) {
var destination = message.getDataDestination() != null
? message.getDataDestination() : DataAddress.Builder.newInstance().type(HTTP_PROXY).build();
var dataRequest = DataRequest.Builder.newInstance()
.id(message.getConsumerPid())
.protocol(message.getProtocol())
.connectorAddress(message.getCallbackAddress())
.dataDestination(destination)
.assetId(assetId)
.contractId(message.getContractId())
.build();

var existingTransferProcess = transferProcessStore.findForCorrelationId(dataRequest.getId());
var existingTransferProcess = transferProcessStore.findForCorrelationId(message.getConsumerPid());
if (existingTransferProcess != null) {
return ServiceResult.success(existingTransferProcess);
}
var process = TransferProcess.Builder.newInstance()
.id(randomUUID().toString())
.dataRequest(dataRequest)
.protocol(message.getProtocol())
.correlationId(message.getConsumerPid())
.counterPartyAddress(message.getCallbackAddress())
.dataDestination(destination)
.assetId(assetId)
.contractId(message.getContractId())
.transferType(message.getTransferType())
.type(PROVIDER)
.clock(clock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessListener;
import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessStartedData;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferCompletionMessage;
Expand Down Expand Up @@ -139,7 +138,7 @@ void notifyRequested_validAgreement_shouldInitiateTransfer() {

assertThat(result).isSucceeded().satisfies(tp -> {
assertThat(tp.getCorrelationId()).isEqualTo("consumerPid");
assertThat(tp.getConnectorAddress()).isEqualTo("http://any");
assertThat(tp.getCounterPartyAddress()).isEqualTo("http://any");
assertThat(tp.getAssetId()).isEqualTo("assetId");
});
verify(listener).preCreated(any());
Expand Down Expand Up @@ -241,7 +240,7 @@ void notifyRequested_missingDestination_shouldInitiateTransfer() {

assertThat(result).isSucceeded().satisfies(transferProcess -> {
assertThat(transferProcess.getCorrelationId()).isEqualTo("consumerPid");
assertThat(transferProcess.getConnectorAddress()).isEqualTo("http://any");
assertThat(transferProcess.getCounterPartyAddress()).isEqualTo("http://any");
assertThat(transferProcess.getAssetId()).isEqualTo("assetId");
assertThat(transferProcess.getDataDestination().getType()).isEqualTo(HTTP_PROXY);
});
Expand Down Expand Up @@ -782,7 +781,8 @@ private TransferProcess transferProcess(TransferProcessStates state, String id)

private TransferProcess.Builder transferProcessBuilder() {
return TransferProcess.Builder.newInstance()
.dataRequest(dataRequest());
.contractId("contractId")
.dataDestination(DataAddress.Builder.newInstance().type("type").build());
}

private ParticipantAgent participantAgent() {
Expand All @@ -805,13 +805,6 @@ private ContractAgreement contractAgreement() {
.build();
}

private DataRequest dataRequest() {
return DataRequest.Builder.newInstance()
.contractId("contractId")
.destinationType("type")
.build();
}

@FunctionalInterface
private interface MethodCall<M extends RemoteMessage> {
ServiceResult<?> call(TransferProcessProtocolService service, M message, TokenRepresentation token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.connector.transfer.spi.types.TransferRequest;
Expand Down Expand Up @@ -262,7 +261,7 @@ private TransferProcess transferProcess(TransferProcessStates state, String id)
return TransferProcess.Builder.newInstance()
.state(state.code())
.id(id)
.dataRequest(DataRequest.Builder.newInstance().dataDestination(DataAddress.Builder.newInstance().type("any").build()).build())
.dataDestination(DataAddress.Builder.newInstance().type("any").build())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public InMemoryTransferProcessStore(String leaserId, Clock clock, CriterionOpera

@Override
public @Nullable TransferProcess findForCorrelationId(String correlationId) {
var querySpec = QuerySpec.Builder.newInstance().filter(criterion("dataRequest.id", "=", correlationId)).build();
var querySpec = QuerySpec.Builder.newInstance().filter(criterion("correlationId", "=", correlationId)).build();

return super.findAll(querySpec).findFirst().orElse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.eclipse.edc.connector.transfer.spi.provision.ResourceManifestGenerator;
import org.eclipse.edc.connector.transfer.spi.retry.TransferWaitStrategy;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.DeprovisionedResource;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionedContentResource;
import org.eclipse.edc.runtime.metamodel.annotation.CoreExtension;
Expand Down Expand Up @@ -219,7 +218,6 @@ private EntityRetryProcessConfiguration getEntityRetryProcessConfiguration(Servi
}

private void registerTypes(TypeManager typeManager) {
typeManager.registerTypes(DataRequest.class);
typeManager.registerTypes(ProvisionedContentResource.class);
typeManager.registerTypes(DeprovisionedResource.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.eclipse.edc.connector.transfer.spi.provision.ResourceManifestGenerator;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.ResourceManifest;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
Expand Down Expand Up @@ -133,20 +132,15 @@ public StatusResult<TransferProcess> initiateConsumerRequest(TransferRequest tra
if (existingTransferProcess != null) {
return StatusResult.success(existingTransferProcess);
}
var dataRequest = DataRequest.Builder.newInstance()

var process = TransferProcess.Builder.newInstance()
.id(id)
.assetId(transferRequest.getAssetId())
.dataDestination(transferRequest.getDataDestination())
.connectorAddress(transferRequest.getCounterPartyAddress())
.counterPartyAddress(transferRequest.getCounterPartyAddress())
.contractId(transferRequest.getContractId())
.destinationType(transferRequest.getDataDestination().getType())
.protocol(transferRequest.getProtocol())
.dataDestination(transferRequest.getDataDestination())
.build();

var process = TransferProcess.Builder.newInstance()
.id(id)
.dataRequest(dataRequest)
.type(CONSUMER)
.clock(clock)
.transferType(transferRequest.getTransferType())
Expand Down Expand Up @@ -297,7 +291,7 @@ private boolean processRequesting(TransferProcess process) {
.onRetryExhausted(this::transitionToTerminated)
.onFailure((t, throwable) -> transitionToRequesting(t))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.execute("send transfer request to " + process.getConnectorAddress());
.execute("send transfer request to " + process.getCounterPartyAddress());
}

/**
Expand Down Expand Up @@ -338,7 +332,7 @@ private boolean processCompleting(TransferProcess process) {
.onFailure((t, throwable) -> transitionToCompleting(t))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
.execute("send transfer completion to " + process.getConnectorAddress());
.execute("send transfer completion to " + process.getCounterPartyAddress());
}

/**
Expand Down Expand Up @@ -412,7 +406,7 @@ private void sendTransferStartMessage(TransferProcess process, DataFlowResponse
.onFailure((t, throwable) -> transitionToStarting(t))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
.execute("send transfer start to " + process.getConnectorAddress());
.execute("send transfer start to " + process.getCounterPartyAddress());
}

@NotNull
Expand Down Expand Up @@ -442,7 +436,7 @@ private boolean sendTransferSuspensionMessage(TransferProcess process) {
.onFailure((t, throwable) -> transitionToSuspending(t, throwable.getMessage()))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
.execute("send transfer suspension to " + process.getConnectorAddress());
.execute("send transfer suspension to " + process.getCounterPartyAddress());
}

private boolean sendTransferTerminationMessage(TransferProcess process) {
Expand All @@ -459,15 +453,15 @@ private boolean sendTransferTerminationMessage(TransferProcess process) {
.onFailure((t, throwable) -> transitionToTerminating(t, throwable.getMessage(), throwable))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted(this::transitionToTerminated)
.execute("send transfer termination to " + process.getConnectorAddress());
.execute("send transfer termination to " + process.getCounterPartyAddress());
}

private <T, M extends TransferRemoteMessage, B extends TransferRemoteMessage.Builder<M, B>> AsyncStatusResultRetryProcess<TransferProcess, T, ?>
dispatch(B messageBuilder, TransferProcess process, Policy policy, Class<T> responseType) {

messageBuilder.protocol(process.getProtocol())
.counterPartyAddress(process.getConnectorAddress())
.processId(process.getCorrelationId())
.counterPartyAddress(process.getCounterPartyAddress())
.processId(Optional.ofNullable(process.getCorrelationId()).orElse(process.getId()))
.policy(policy);

if (process.lastSentProtocolMessage() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.EdcException;
Expand Down Expand Up @@ -46,10 +45,11 @@ class Initiate {
@Test
void shouldInitiateFlowOnCorrectController() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var policy = Policy.Builder.newInstance().build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();
var transferProcess = TransferProcess.Builder.newInstance()
.dataDestination(DataAddress.Builder.newInstance().type("test-dest-type").build())
.contentDataAddress(dataAddress).build();

when(controller.canHandle(any())).thenReturn(true);
when(controller.start(any(), any())).thenReturn(StatusResult.success(DataFlowResponse.Builder.newInstance().build()));
Expand All @@ -63,10 +63,10 @@ void shouldInitiateFlowOnCorrectController() {
@Test
void shouldReturnFatalError_whenNoControllerCanHandleTheRequest() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var dataDestination = DataAddress.Builder.newInstance().type("test-dest-type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var policy = Policy.Builder.newInstance().build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();
var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination).contentDataAddress(dataAddress).build();

when(controller.canHandle(any())).thenReturn(false);
manager.register(controller);
Expand All @@ -80,10 +80,10 @@ void shouldReturnFatalError_whenNoControllerCanHandleTheRequest() {
@Test
void shouldCatchExceptionsAndReturnFatalError() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var dataDestination = DataAddress.Builder.newInstance().type("test-dest-type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var policy = Policy.Builder.newInstance().build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();
var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination).contentDataAddress(dataAddress).build();

var errorMsg = "Test Error Message";
when(controller.canHandle(any())).thenReturn(true);
Expand Down Expand Up @@ -123,9 +123,9 @@ class Suspend {
@Test
void shouldChooseControllerAndSuspend() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var dataDestination = DataAddress.Builder.newInstance().type("test-dest-type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();
var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination).contentDataAddress(dataAddress).build();

when(controller.canHandle(any())).thenReturn(true);
when(controller.suspend(any())).thenReturn(StatusResult.success());
Expand All @@ -143,9 +143,9 @@ class Terminate {
@Test
void shouldChooseControllerAndTerminate() {
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("test-dest-type").build();
var dataDestination = DataAddress.Builder.newInstance().type("test-dest-type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test-type").build();
var transferProcess = TransferProcess.Builder.newInstance().dataRequest(dataRequest).contentDataAddress(dataAddress).build();
var transferProcess = TransferProcess.Builder.newInstance().dataDestination(dataDestination).contentDataAddress(dataAddress).build();

when(controller.canHandle(any())).thenReturn(true);
when(controller.terminate(any())).thenReturn(StatusResult.success());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.eclipse.edc.connector.transfer.spi.provision.ResourceManifestGenerator;
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionResponse;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionedResourceSet;
import org.eclipse.edc.connector.transfer.spi.types.ResourceManifest;
Expand All @@ -45,6 +44,7 @@
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -173,18 +173,13 @@ private ProvisionedResourceSet provisionedResourceSet() {
}

private TransferProcess.Builder transferProcessBuilder() {
var processId = UUID.randomUUID().toString();
var dataRequest = DataRequest.Builder.newInstance()
.id(processId)
.destinationType("test-type")
.contractId(UUID.randomUUID().toString())
.build();

return TransferProcess.Builder.newInstance()
.provisionedResourceSet(ProvisionedResourceSet.Builder.newInstance().build())
.type(CONSUMER)
.id("test-process-" + processId)
.dataRequest(dataRequest);
.id("test-process-" + UUID.randomUUID())
.correlationId(UUID.randomUUID().toString())
.dataDestination(DataAddress.Builder.newInstance().type("test-type").build())
.contractId(UUID.randomUUID().toString());
}

@Nested
Expand Down
Loading

0 comments on commit 169eecc

Please sign in to comment.