Skip to content

Commit

Permalink
start invokes dataplanemanager
Browse files Browse the repository at this point in the history
  • Loading branch information
paullatzelsperger committed Mar 1, 2024
1 parent 25baebf commit 54dd8e2
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry;
import org.eclipse.edc.connector.dataplane.api.controller.v1.DataPlaneSignalingApiController;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.system.ServiceExtension;
Expand All @@ -40,6 +41,8 @@ public class DataPlaneSignalingApiExtension implements ServiceExtension {
private SignalingApiTransformerRegistry transformerRegistry;
@Inject
private DataPlaneAuthorizationService authorizationService;
@Inject
private DataPlaneManager dataPlaneManager;

@Override
public String name() {
Expand All @@ -48,6 +51,7 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {
webService.registerResource(controlApiConfiguration.getContextAlias(), new DataPlaneSignalingApiController(transformerRegistry, authorizationService));
var controller = new DataPlaneSignalingApiController(transformerRegistry, authorizationService, dataPlaneManager, context.getMonitor().withPrefix("SignalingAPI"));
webService.registerResource(controlApiConfiguration.getContextAlias(), controller);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.json.JsonObject;
import jakarta.ws.rs.container.AsyncResponse;
import org.eclipse.edc.connector.dataplane.api.model.DataFlowState;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage;
Expand Down Expand Up @@ -51,7 +50,7 @@ public interface DataPlaneSignalingApi {
content = @Content(schema = @Schema(implementation = DataFlowResponseMessageSchema.class))),
}
)
void start(JsonObject dataFlowStartMessage, AsyncResponse response);
JsonObject start(JsonObject dataFlowStartMessage);

@Operation(description = "Get the current state of a data transfer.",
responses = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.web.spi.exception.InvalidRequestException;

Expand All @@ -38,47 +38,61 @@ public class DataPlaneSignalingApiController implements DataPlaneSignalingApi {

private final TypeTransformerRegistry typeTransformerRegistry;
private final DataPlaneAuthorizationService dataPlaneAuthorizationService;
private final DataPlaneManager dataPlaneManager;
private final Monitor monitor;

public DataPlaneSignalingApiController(TypeTransformerRegistry typeTransformerRegistry, DataPlaneAuthorizationService dataPlaneAuthorizationService, Monitor monitor) {
public DataPlaneSignalingApiController(TypeTransformerRegistry typeTransformerRegistry, DataPlaneAuthorizationService dataPlaneAuthorizationService, DataPlaneManager dataPlaneManager, Monitor monitor) {
this.typeTransformerRegistry = typeTransformerRegistry;
this.dataPlaneAuthorizationService = dataPlaneAuthorizationService;
this.dataPlaneManager = dataPlaneManager;
this.monitor = monitor;
}

@POST
@Override
public void start(JsonObject dataFlowStartMessage, @Suspended AsyncResponse response) {
public JsonObject start(JsonObject dataFlowStartMessage) {
var startMsg = typeTransformerRegistry.transform(dataFlowStartMessage, DataFlowStartMessage.class)
.onFailure(f -> monitor.warning("Error transforming %s: %s".formatted(DataFlowStartMessage.class, f.getFailureDetail())))
.orElseThrow(InvalidRequestException::new);

var dataAddress = dataPlaneAuthorizationService.createEndpointDataReference(startMsg);
if (dataAddress.failed()) {
monitor.warning("Error obtaining EDR DataAddress: " + dataAddress.getFailureDetail());
response.resume(new InvalidRequestException(dataAddress.getFailure()));
}
dataPlaneManager.validate(startMsg)
.onFailure(f -> monitor.warning("Failed to validate request: " + f.getFailureDetail()))
.orElseThrow(f -> f.getMessages().isEmpty() ?
new InvalidRequestException("Failed to validate request: %s".formatted(startMsg.getId())) :
new InvalidRequestException(f.getMessages()));

var result = typeTransformerRegistry.transform(dataAddress.getContent(), JsonObject.class);
result
.onFailure(f -> {
monitor.warning("Error obtaining EDR DataAddress: " + result.getFailureDetail());
response.resume(new EdcException(f.getFailureDetail()));
})
.onSuccess(response::resume);
monitor.debug("Create EDR");
var dataAddress = dataPlaneAuthorizationService.createEndpointDataReference(startMsg)
.onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: " + f.getFailureDetail()))
.orElseThrow(InvalidRequestException::new);

dataPlaneManager.initiate(startMsg);

return typeTransformerRegistry.transform(dataAddress, JsonObject.class)
.onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: " + f.getFailureDetail()))
.orElseThrow(f -> new EdcException(f.getFailureDetail()));
}

@GET
@Path("/{id}/state")
@Override
public JsonObject getTransferState(@PathParam("id") String transferProcessId) {
throw new UnsupportedOperationException();
var state = dataPlaneManager.transferState(transferProcessId);

Check notice

Code scanning / CodeQL

Unread local variable Note

Variable 'DataFlowStates state' is never read.
return null;
}

@POST
@Path("/{id}/terminate")
@Override
public void terminate(@PathParam("id") String transferProcessId, JsonObject terminationMessage) {
throw new UnsupportedOperationException();
public void terminate(@PathParam("id") String dataFlowId, JsonObject terminationMessage) {

var msg = typeTransformerRegistry.transform(terminationMessage, DataFlowTerminateMessage.class)
.onFailure(f -> monitor.warning("Error transforming %s: %s".formatted(DataFlowTerminateMessage.class, f.getFailureDetail())))
.orElseThrow(InvalidRequestException::new);

Check notice

Code scanning / CodeQL

Unread local variable Note

Variable 'DataFlowTerminateMessage msg' is never read.

// todo: add msg.reason() to the method signature:
dataPlaneManager.terminate(dataFlowId)
.orElseThrow(InvalidRequestException::new);
}

@POST
Expand Down

0 comments on commit 54dd8e2

Please sign in to comment.