Skip to content

Commit

Permalink
feat: data plane client for signaling protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Mar 1, 2024
1 parent ab9338b commit 63a16e3
Show file tree
Hide file tree
Showing 9 changed files with 751 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/


plugins {
`java-library`
}

dependencies {
api(project(":spi:common:http-spi"))
api(project(":spi:common:core-spi"))
api(project(":spi:data-plane-selector:data-plane-selector-spi"))
implementation(project(":core:common:transform-core")) // for the transformer registry impl
implementation(project(":core:common:util"))
implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-transform"))

implementation(libs.opentelemetry.instrumentation.annotations)

testImplementation(project(":core:common:junit"))
testImplementation(project(":extensions:common:json-ld"))
testImplementation(libs.restAssured)
testImplementation(libs.mockserver.netty)
testImplementation(libs.mockserver.client)
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import jakarta.json.JsonObject;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.http.EdcHttpClient;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;

import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;

import static java.lang.String.format;
import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR;

/**
* Implementation of a {@link DataPlaneClient} that uses a remote {@link DataPlaneManager} accessible from a REST API using
* the data plane signaling protocol.
*/
public class DataPlaneSignalingClient implements DataPlaneClient {
public static final MediaType TYPE_JSON = MediaType.parse("application/json");
private final EdcHttpClient httpClient;
private final DataPlaneInstance dataPlane;
private final TypeTransformerRegistry transformerRegistry;
private final JsonLd jsonLd;

private final ObjectMapper mapper;

public DataPlaneSignalingClient(EdcHttpClient httpClient, TypeTransformerRegistry transformerRegistry, JsonLd jsonLd, ObjectMapper mapper, DataPlaneInstance dataPlane) {
this.httpClient = httpClient;
this.transformerRegistry = transformerRegistry;
this.jsonLd = jsonLd;
this.mapper = mapper;
this.dataPlane = dataPlane;
}

@WithSpan
@Override
public StatusResult<DataFlowResponseMessage> start(DataFlowStartMessage message) {
var requestBuilder = transformerRegistry.transform(message, JsonObject.class)
.compose(jsonLd::compact)
.map(body -> RequestBody.create(serialize(body), TYPE_JSON))
.map(body -> new Request.Builder().post(body).url(dataPlane.getUrl()).build());

if (requestBuilder.succeeded()) {
return sendRequest(requestBuilder.getContent(), message.getProcessId(), this::handleStartResponse);
} else {
return StatusResult.failure(FATAL_ERROR, requestBuilder.getFailureDetail());
}
}

@Override
public StatusResult<Void> terminate(String transferProcessId) {
var url = "%s/%s/terminate".formatted(dataPlane.getUrl(), transferProcessId);
var message = DataFlowTerminateMessage.Builder.newInstance().build();
var requestBuilder = transformerRegistry.transform(message, JsonObject.class)
.compose(jsonLd::compact)
.map(body -> RequestBody.create(serialize(body), TYPE_JSON))
.map(body -> new Request.Builder().post(body).url(url).build());

if (requestBuilder.succeeded()) {
return sendRequest(requestBuilder.getContent(), transferProcessId, (r) -> StatusResult.success());
} else {
return StatusResult.failure(FATAL_ERROR, requestBuilder.getFailureDetail());
}
}

private StatusResult<DataFlowResponseMessage> handleStartResponse(Response response) {
try (var body = response.body()) {
return Optional.ofNullable(body)
.map(this::deserializeStartMessage)
.orElseGet(() -> StatusResult.failure(FATAL_ERROR, "Body missing"));
}
}

private StatusResult<DataFlowResponseMessage> deserializeStartMessage(ResponseBody body) {
try {
var jsonObject = mapper.readValue(body.string(), JsonObject.class);
var result = jsonLd.expand(jsonObject)
.compose(expanded -> transformerRegistry.transform(expanded, DataFlowResponseMessage.class));
if (result.succeeded()) {
return StatusResult.success(result.getContent());
} else {
return StatusResult.failure(FATAL_ERROR, result.getFailureDetail());
}
} catch (IOException e) {
return StatusResult.failure(FATAL_ERROR, e.getMessage());

}
}

private String serialize(Object message) {
try {
return mapper.writeValueAsString(message);
} catch (JsonProcessingException e) {
throw new EdcException(e);
}
}

private <T> StatusResult<T> sendRequest(Request request, String transferProcessId, Function<Response, StatusResult<T>> bodyMapper) {
try (var response = httpClient.execute(request)) {
return handleResponse(response, transferProcessId, bodyMapper);
} catch (IOException e) {
return StatusResult.failure(FATAL_ERROR, e.getMessage());
}
}

private <T> StatusResult<T> handleResponse(Response response, String requestId, Function<Response, StatusResult<T>> bodyMapper) {
if (response.isSuccessful()) {
return bodyMapper.apply(response);
} else {
return handleError(response, requestId);
}
}

private <T> StatusResult<T> handleError(Response response, String requestId) {
return StatusResult.failure(FATAL_ERROR, format("Transfer request failed with status code %s for request %s", response.code(), requestId));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.client;

import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.http.EdcHttpClient;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.types.TypeManager;

import static org.eclipse.edc.spi.CoreConstants.JSON_LD;

/**
* This extension provides an implementation of {@link DataPlaneClient} compliant with the data plane signaling protocol
*/
@Extension(value = DataPlaneSignalingClientExtension.NAME)
public class DataPlaneSignalingClientExtension implements ServiceExtension {
public static final String NAME = "Data Plane Signaling Client";


@Inject(required = false)
private EdcHttpClient httpClient;

@Inject
private TypeManager typeManager;

@Inject
private SignalingApiTransformerRegistry transformerRegistry;

private JsonLd jsonLd;

@Override
public String name() {
return NAME;
}

@Provider
public DataPlaneClientFactory dataPlaneClientFactory() {
var mapper = typeManager.getMapper(JSON_LD);
return instance -> new DataPlaneSignalingClient(httpClient, transformerRegistry, jsonLd, mapper, instance);
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.client;

import jakarta.json.Json;
import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry;
import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistryImpl;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowStartMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowSuspendMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowTerminateMessageTransformer;
import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowResponseMessageTransformer;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;

import java.util.Map;

import static org.eclipse.edc.spi.CoreConstants.JSON_LD;

/**
* This extension provides an implementation of {@link SignalingApiTransformerRegistry}
* with all transformers relevant for the data plane signaling protocol
*/
@Extension(value = DataPlaneSignalingClientTransformExtension.NAME)
public class DataPlaneSignalingClientTransformExtension implements ServiceExtension {

public static final String NAME = "Data Plane Signaling transform Client";

@Inject
private TypeTransformerRegistry transformerRegistry;

@Inject
private TypeManager typeManager;

@Override
public String name() {
return NAME;
}

@Provider
public SignalingApiTransformerRegistry signalingApiTransformerRegistry() {
var mapper = typeManager.getMapper(JSON_LD);
var factory = Json.createBuilderFactory(Map.of());

var registry = new SignalingApiTransformerRegistryImpl(this.transformerRegistry);
registry.register(new JsonObjectFromDataFlowStartMessageTransformer(factory, mapper));
registry.register(new JsonObjectFromDataFlowSuspendMessageTransformer(factory));
registry.register(new JsonObjectFromDataFlowTerminateMessageTransformer(factory));
registry.register(new JsonObjectToDataFlowResponseMessageTransformer());
return registry;
}
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0
#
# SPDX-License-Identifier: Apache-2.0
#
# Contributors:
# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
#
#

org.eclipse.edc.connector.dataplane.client.DataPlaneSignalingClientExtension
org.eclipse.edc.connector.dataplane.client.DataPlaneSignalingClientTransformExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.client;

import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;

@ExtendWith(DependencyInjectionExtension.class)
class DataPlaneSignalingClientExtensionTest {

@Test
void verifyDataPlaneClientFactory(DataPlaneSignalingClientExtension extension) {

var client = extension.dataPlaneClientFactory().createClient(createDataPlaneInstance());

assertThat(client).isInstanceOf(DataPlaneSignalingClient.class);
}


private DataPlaneInstance createDataPlaneInstance() {
return DataPlaneInstance.Builder.newInstance().url("http://any").build();
}
}
Loading

0 comments on commit 63a16e3

Please sign in to comment.