Skip to content

Commit

Permalink
feat: DPS API controller (#3955)
Browse files Browse the repository at this point in the history
  • Loading branch information
paullatzelsperger authored Mar 4, 2024
1 parent 77636db commit 818e1e0
Show file tree
Hide file tree
Showing 24 changed files with 669 additions and 147 deletions.
1 change: 0 additions & 1 deletion .github/workflows/verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: eclipse-edc/.github/.github/actions/setup-build@main

- name: End to End Integration Tests
uses: eclipse-edc/.github/.github/actions/run-tests@main
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.connector.dataplane.spi.DataFlow;
import org.eclipse.edc.connector.dataplane.spi.DataFlowStates;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.spi.entity.StatefulEntity;
Expand All @@ -29,6 +30,7 @@
import org.eclipse.edc.statemachine.Processor;
import org.eclipse.edc.statemachine.ProcessorImpl;
import org.eclipse.edc.statemachine.StateMachineManager;
import org.jetbrains.annotations.Nullable;

import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -79,32 +81,36 @@ public void initiate(DataFlowStartMessage dataRequest) {
}

@Override
public DataFlowStates transferState(String processId) {
public DataFlowStates getTransferState(String processId) {
return Optional.ofNullable(store.findById(processId)).map(StatefulEntity::getState)
.map(DataFlowStates::from).orElse(null);
}

@Override
public StatusResult<Void> terminate(String dataFlowId) {
public StatusResult<Void> terminate(String dataFlowId, @Nullable String reason) {
var result = store.findByIdAndLease(dataFlowId);
if (result.succeeded()) {
var dataFlow = result.getContent();
var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest());
if (result.failed()) {
return StatusResult.from(result).map(it -> null);
}

if (transferService == null) {
return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId));
}
var dataFlow = result.getContent();
var transferService = transferServiceRegistry.resolveTransferService(dataFlow.toRequest());

var terminateResult = transferService.terminate(dataFlow);
if (terminateResult.failed()) {
if (transferService == null) {
return StatusResult.failure(FATAL_ERROR, "TransferService cannot be resolved for DataFlow %s".formatted(dataFlowId));
}

var terminateResult = transferService.terminate(dataFlow);
if (terminateResult.failed()) {
if (terminateResult.reason().equals(StreamFailure.Reason.NOT_FOUND)) {
monitor.warning("No source was found for DataFlow '%s'. This may indicate an inconsistent state.".formatted(dataFlowId));
} else {
return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail()));
}
dataFlow.transitToTerminated();
store.save(dataFlow);
return StatusResult.success();
} else {
return StatusResult.from(result).map(it -> null);
}
dataFlow.transitToTerminated(reason);
store.save(dataFlow);
return StatusResult.success();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;


Expand All @@ -29,7 +27,7 @@ class PublicEndpointGeneratorServiceImplTest {

@Test
void generateFor() {
var endpoint = new Endpoint(Map.of("fizz", "buzz"), "bar-type");
var endpoint = new Endpoint("fizz", "bar-type");
generatorService.addGeneratorFunction("testtype", dataAddress -> endpoint);

assertThat(generatorService.generateFor(DataAddress.Builder.newInstance().type("testtype").build())).isSucceeded()
Expand All @@ -43,5 +41,5 @@ void generateFor_noFunction() {
.detail()
.isEqualTo("No Endpoint generator function registered for source data type 'testtype'");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class DataPlaneAuthorizationServiceImplTest {

@BeforeEach
void setup() {
when(endpointGenerator.generateFor(any())).thenReturn(Result.success(new Endpoint(Map.of("url", "http://example.com"), "https://w3id.org/idsa/v4.1/HTTP")));
when(endpointGenerator.generateFor(any())).thenReturn(Result.success(Endpoint.url("http://example.com")));
}

@Test
Expand All @@ -70,7 +70,7 @@ void createEndpointDataReference() {
assertThat(result).isSucceeded()
.satisfies(da -> {
assertThat(da.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP");
assertThat(da.getProperties().get("endpoint")).isEqualTo(Map.of("url", "http://example.com"));
assertThat(da.getProperties().get("endpoint")).isEqualTo("http://example.com");
assertThat(da.getProperties().get("endpointType")).isEqualTo(da.getType());
assertThat(da.getStringProperty("authorization")).isEqualTo("footoken");
});
Expand Down Expand Up @@ -98,7 +98,7 @@ void createEndpointDataReference_withAuthType() {
assertThat(result).isSucceeded()
.satisfies(da -> {
assertThat(da.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP");
assertThat(da.getProperties().get("endpoint")).isEqualTo(Map.of("url", "http://example.com"));
assertThat(da.getProperties().get("endpoint")).isEqualTo("http://example.com");
assertThat(da.getStringProperty("authorization")).isEqualTo("footoken");
assertThat(da.getStringProperty("authType")).isEqualTo("bearer");
assertThat(da.getStringProperty("fizz")).isEqualTo("buzz");
Expand Down Expand Up @@ -165,4 +165,4 @@ private DataFlowStartMessage.Builder createStartMessage() {
.destinationDataAddress(DataAddress.Builder.newInstance().type("test-dest").build())
.properties(Map.of("foo", "bar", "fizz", "buzz"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.dataplane.spi.DataFlow.TERMINATION_REASON;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.COMPLETED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.FAILED;
import static org.eclipse.edc.connector.dataplane.spi.DataFlowStates.NOTIFIED;
Expand Down Expand Up @@ -124,6 +125,20 @@ void terminate_shouldTerminateDataFlow() {
verify(transferService).terminate(dataFlow);
}

@Test
void terminate_shouldTerminateDataFlow_withReason() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.terminate(any())).thenReturn(StreamResult.success());

var result = manager.terminate("dataFlowId", "test-reason");

assertThat(result).isSucceeded();
verify(store).save(argThat(d -> d.getState() == TERMINATED.code() && d.getProperties().get(TERMINATION_REASON).equals("test-reason")));
verify(transferService).terminate(dataFlow);
}

@Test
void terminate_shouldReturnFatalError_whenDataFlowDoesNotExist() {
when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.notFound("not found"));
Expand Down Expand Up @@ -172,6 +187,19 @@ void terminate_shouldReturnFatalError_whenDataFlowCannotBeTerminated() {
verify(store, never()).save(any());
}

@Test
void terminate_shouldStillTerminate_whenDataFlowHasNoSource() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow));
when(registry.resolveTransferService(any())).thenReturn(transferService);
when(transferService.terminate(any())).thenReturn(StreamResult.notFound());

var result = manager.terminate("dataFlowId", "test-reason");

assertThat(result).isSucceeded();
verify(store).save(argThat(f -> f.getProperties().containsKey(TERMINATION_REASON)));
}

@Test
void received_shouldStartTransferTransitionAndTransitionToStarted() {
var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build();
Expand Down
2 changes: 2 additions & 0 deletions extensions/common/crypto/crypto-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ dependencies {
implementation(libs.bouncyCastle.bcprovJdk18on)
testImplementation(testFixtures(project(":core:common:junit")))
testImplementation(testFixtures(project(":spi:common:identity-trust-spi")))
testRuntimeOnly(libs.tink)

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import static java.lang.String.format;

@Path("/transfer")
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
@Consumes({ MediaType.APPLICATION_JSON })
@Produces({ MediaType.APPLICATION_JSON })
public class DataPlaneControlApiController implements DataPlaneControlApi {
private final DataPlaneManager dataPlaneManager;

Expand Down Expand Up @@ -65,7 +65,7 @@ public void initiateTransfer(DataFlowStartMessage request, @Suspended AsyncRespo
@Override
@Path("/{transferProcessId}")
public DataFlowStates getTransferState(@PathParam("transferProcessId") String transferProcessId) {
return dataPlaneManager.transferState(transferProcessId);
return dataPlaneManager.getTransferState(transferProcessId);
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

package org.eclipse.edc.connector.api.signaling.configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.json.Json;
import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry;
import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistryImpl;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataAddressTransformer;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowResponseMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataAddressTransformer;
import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowStartMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowSuspendMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowTerminateMessageTransformer;
import org.eclipse.edc.jsonld.spi.JsonLd;
Expand All @@ -36,6 +39,7 @@
import org.eclipse.edc.web.spi.WebService;
import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer;
import org.eclipse.edc.web.spi.configuration.WebServiceSettings;
import org.jetbrains.annotations.NotNull;

import java.util.Map;

Expand Down Expand Up @@ -87,7 +91,7 @@ public void initialize(ServiceExtensionContext context) {
context.registerService(SignalingApiConfiguration.class, new SignalingApiConfiguration(webServiceConfiguration));

jsonLd.registerNamespace(ODRL_PREFIX, ODRL_SCHEMA, SIGNALING_SCOPE);
var jsonLdMapper = typeManager.getMapper(JSON_LD);
var jsonLdMapper = getJsonLdMapper();
webService.registerResource(webServiceConfiguration.getContextAlias(), new ObjectMapperProvider(jsonLdMapper));
webService.registerResource(webServiceConfiguration.getContextAlias(), new JerseyJsonLdInterceptor(jsonLd, jsonLdMapper, SIGNALING_SCOPE));
}
Expand All @@ -97,10 +101,17 @@ public SignalingApiTransformerRegistry managementApiTypeTransformerRegistry() {
var factory = Json.createBuilderFactory(Map.of());

var registry = new SignalingApiTransformerRegistryImpl(this.transformerRegistry);
registry.register(new JsonObjectToDataFlowStartMessageTransformer());
registry.register(new JsonObjectToDataFlowSuspendMessageTransformer());
registry.register(new JsonObjectToDataFlowTerminateMessageTransformer());
registry.register(new JsonObjectToDataAddressTransformer());
registry.register(new JsonObjectFromDataFlowResponseMessageTransformer(factory));
registry.register(new JsonObjectFromDataAddressTransformer(factory, getJsonLdMapper()));
return registry;
}

@NotNull
private ObjectMapper getJsonLdMapper() {
return typeManager.getMapper(JSON_LD);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ dependencies {
api(project(":spi:data-plane:data-plane-spi"))

implementation(project(":extensions:common:api:control-api-configuration"))
implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-transform"))
implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-api-configuration"))
implementation(libs.jakarta.rsApi)
testImplementation(libs.restAssured)
testImplementation(testFixtures(project(":extensions:common:http:jersey-core")))

}
edcBuild {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
package org.eclipse.edc.connector.dataplane.api;

import org.eclipse.edc.connector.api.signaling.configuration.SignalingApiConfiguration;
import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry;
import org.eclipse.edc.connector.dataplane.api.controller.v1.DataPlaneSignalingApiController;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.system.ServiceExtension;
Expand All @@ -34,13 +37,21 @@ public class DataPlaneSignalingApiExtension implements ServiceExtension {
@Inject
private SignalingApiConfiguration controlApiConfiguration;

@Inject
private SignalingApiTransformerRegistry transformerRegistry;
@Inject
private DataPlaneAuthorizationService authorizationService;
@Inject
private DataPlaneManager dataPlaneManager;

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
webService.registerResource(controlApiConfiguration.getContextAlias(), new DataPlaneSignalingApiController());
var controller = new DataPlaneSignalingApiController(transformerRegistry, authorizationService, dataPlaneManager, context.getMonitor().withPrefix("SignalingAPI"));
webService.registerResource(controlApiConfiguration.getContextAlias(), controller);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.json.JsonObject;
import jakarta.ws.rs.container.AsyncResponse;
import org.eclipse.edc.connector.dataplane.api.model.DataFlowState;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage;
Expand Down Expand Up @@ -51,7 +50,7 @@ public interface DataPlaneSignalingApi {
content = @Content(schema = @Schema(implementation = DataFlowResponseMessageSchema.class))),
}
)
JsonObject start(JsonObject dataFlowStartMessage, AsyncResponse response);
JsonObject start(JsonObject dataFlowStartMessage);

@Operation(description = "Get the current state of a data transfer.",
responses = {
Expand Down
Loading

0 comments on commit 818e1e0

Please sign in to comment.