Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for multiple protocol webooks #4730

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.connector.core.event.EventExecutorServiceContainer;
import org.eclipse.edc.connector.core.event.EventRouterImpl;
import org.eclipse.edc.connector.core.message.RemoteMessageDispatcherRegistryImpl;
import org.eclipse.edc.connector.core.protocol.ProtocolWebhookRegistryImpl;
import org.eclipse.edc.connector.core.validator.DataAddressValidatorRegistryImpl;
import org.eclipse.edc.connector.core.validator.JsonObjectValidatorRegistryImpl;
import org.eclipse.edc.http.client.ControlApiHttpClientImpl;
Expand All @@ -42,6 +43,7 @@
import org.eclipse.edc.spi.command.CommandHandlerRegistry;
import org.eclipse.edc.spi.event.EventRouter;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry;
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
import org.eclipse.edc.spi.system.Hostname;
import org.eclipse.edc.spi.system.ServiceExtension;
Expand All @@ -61,11 +63,8 @@ public class CoreServicesExtension implements ServiceExtension {

@Setting(description = "The name of the claim key used to determine the participant identity", defaultValue = DEFAULT_IDENTITY_CLAIM_KEY)
public static final String EDC_AGENT_IDENTITY_KEY = "edc.agent.identity.key";

private static final String DEFAULT_EDC_HOSTNAME = "localhost";

public static final String EDC_HOSTNAME = "edc.hostname";

private static final String DEFAULT_EDC_HOSTNAME = "localhost";
@Setting(description = "Connector hostname, which e.g. is used in referer urls", defaultValue = DEFAULT_EDC_HOSTNAME, key = EDC_HOSTNAME, warnOnMissingConfig = true)
public static String hostname;

Expand Down Expand Up @@ -173,6 +172,11 @@ public CriterionOperatorRegistry criterionOperatorRegistry() {
public ControlApiHttpClient controlApiHttpClient() {
return new ControlApiHttpClientImpl(edcHttpClient, controlClientAuthenticationProvider);
}

@Provider
public ProtocolWebhookRegistry protocolWebhookRegistry() {
return new ProtocolWebhookRegistryImpl();
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2025 Metaform Systems, Inc.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Metaform Systems, Inc. - initial API and implementation
*
*/

package org.eclipse.edc.connector.core.protocol;

import org.eclipse.edc.spi.protocol.ProtocolWebhook;
import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry;
import org.jetbrains.annotations.Nullable;

import java.util.HashMap;
import java.util.Map;

public class ProtocolWebhookRegistryImpl implements ProtocolWebhookRegistry {
private final Map<String, ProtocolWebhook> webhooks = new HashMap<>();

@Override
public void registerWebhook(String protocol, ProtocolWebhook webhook) {
webhooks.put(protocol, webhook);
}

@Override
public @Nullable ProtocolWebhook resolve(String protocol) {
return webhooks.get(protocol);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public CatalogProtocolServiceImpl(DatasetResolver datasetResolver,
public ServiceResult<Catalog> getCatalog(CatalogRequestMessage message, TokenRepresentation tokenRepresentation) {
return transactionContext.execute(() -> protocolTokenValidator.verify(tokenRepresentation, RequestCatalogPolicyContext::new, message)
.map(agent -> {
try (var datasets = datasetResolver.query(agent, message.getQuerySpec())) {
var dataServices = dataServiceRegistry.getDataServices();
try (var datasets = datasetResolver.query(agent, message.getQuerySpec(), message.getProtocol())) {
var dataServices = dataServiceRegistry.getDataServices(message.getProtocol());

return Catalog.Builder.newInstance()
.dataServices(dataServices)
Expand All @@ -69,9 +69,9 @@ public ServiceResult<Catalog> getCatalog(CatalogRequestMessage message, TokenRep
}

@Override
public @NotNull ServiceResult<Dataset> getDataset(String datasetId, TokenRepresentation tokenRepresentation) {
public @NotNull ServiceResult<Dataset> getDataset(String datasetId, TokenRepresentation tokenRepresentation, String protocol) {
return transactionContext.execute(() -> protocolTokenValidator.verify(tokenRepresentation, RequestCatalogPolicyContext::new)
.map(agent -> datasetResolver.getById(agent, datasetId))
.map(agent -> datasetResolver.getById(agent, datasetId, protocol))
.compose(dataset -> {
if (dataset == null) {
return ServiceResult.notFound(format("Dataset %s does not exist", datasetId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ void shouldReturnCatalogWithConnectorDataServiceAndItsDataset() {
var dataService = DataService.Builder.newInstance().build();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any(), eq(message))).thenReturn(ServiceResult.success(participantAgent));
when(dataServiceRegistry.getDataServices()).thenReturn(List.of(dataService));
when(datasetResolver.query(any(), any())).thenReturn(Stream.of(createDataset()));
when(dataServiceRegistry.getDataServices(any())).thenReturn(List.of(dataService));
when(datasetResolver.query(any(), any(), any())).thenReturn(Stream.of(createDataset()));

var result = service.getCatalog(message, tokenRepresentation);

Expand All @@ -97,7 +97,7 @@ void shouldReturnCatalogWithConnectorDataServiceAndItsDataset() {
assertThat(catalog.getDatasets()).hasSize(1);
assertThat(catalog.getParticipantId()).isEqualTo("participantId");
});
verify(datasetResolver).query(eq(participantAgent), eq(querySpec));
verify(datasetResolver).query(eq(participantAgent), eq(querySpec), eq("protocol"));
verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class));
}

Expand Down Expand Up @@ -125,12 +125,12 @@ void shouldReturnDataset() {
var dataset = createDataset();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any())).thenReturn(ServiceResult.success(participantAgent));
when(datasetResolver.getById(any(), any())).thenReturn(dataset);
when(datasetResolver.getById(any(), any(), any())).thenReturn(dataset);

var result = service.getDataset("datasetId", tokenRepresentation);
var result = service.getDataset("datasetId", tokenRepresentation, "protocol");

assertThat(result).isSucceeded().isEqualTo(dataset);
verify(datasetResolver).getById(participantAgent, "datasetId");
verify(datasetResolver).getById(participantAgent, "datasetId", "protocol");
verify(transactionContext).execute(any(TransactionContext.ResultTransactionBlock.class));
}

Expand All @@ -140,9 +140,9 @@ void shouldFail_whenDatasetIsNull() {
var tokenRepresentation = createTokenRepresentation();

when(protocolTokenValidator.verify(eq(tokenRepresentation), any())).thenReturn(ServiceResult.success(participantAgent));
when(datasetResolver.getById(any(), any())).thenReturn(null);
when(datasetResolver.getById(any(), any(), any())).thenReturn(null);

var result = service.getDataset("datasetId", tokenRepresentation);
var result = service.getDataset("datasetId", tokenRepresentation, "protocol");

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND);
}
Expand All @@ -153,7 +153,7 @@ void shouldFail_whenTokenValidationFails() {

when(protocolTokenValidator.verify(eq(tokenRepresentation), any())).thenReturn(ServiceResult.unauthorized("unauthorized"));

var result = service.getDataset("datasetId", tokenRepresentation);
var result = service.getDataset("datasetId", tokenRepresentation, "protocol");

assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(UNAUTHORIZED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.message.RemoteMessageDispatcher;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.protocol.ProtocolWebhook;
import org.eclipse.edc.spi.protocol.ProtocolWebhookRegistry;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -84,6 +85,8 @@
public class TransferProcessEventDispatchTest {

public static final Duration TIMEOUT = Duration.ofSeconds(30);
private static final ProtocolWebhookRegistry PROTOCOL_WEBHOOK_REGISTRY = mock(ProtocolWebhookRegistry.class);

@RegisterExtension
static final RuntimeExtension RUNTIME = new RuntimePerClassExtension()
.setConfiguration(Map.of(
Expand All @@ -93,13 +96,20 @@ public class TransferProcessEventDispatchTest {
.registerServiceMock(TransferWaitStrategy.class, () -> 1)
.registerServiceMock(EventExecutorServiceContainer.class, new EventExecutorServiceContainer(newSingleThreadExecutor()))
.registerServiceMock(IdentityService.class, mock())
.registerServiceMock(ProtocolWebhook.class, () -> "http://dummy")
.registerServiceMock(ProtocolWebhookRegistry.class, PROTOCOL_WEBHOOK_REGISTRY)
.registerServiceMock(PolicyArchive.class, mock())
.registerServiceMock(ContractNegotiationStore.class, mock())
.registerServiceMock(ParticipantAgentService.class, mock())
.registerServiceMock(DataPlaneClientFactory.class, mock());

private final EventSubscriber eventSubscriber = mock();

@BeforeEach
void setup() {
// setup
when(PROTOCOL_WEBHOOK_REGISTRY.resolve(any())).thenReturn(() -> "http://dummy");
}

@Test
void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService service,
TransferProcessProtocolService protocolService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DataServiceRegistryImpl implements DataServiceRegistry {

private final List<DataService> dataServices = new ArrayList<>();
private final Map<String, List<DataService>> dataServices = new ConcurrentHashMap<>();

@Override
public void register(DataService dataService) {
dataServices.add(dataService);
public void register(String protocol, DataService dataService) {
dataServices.computeIfAbsent(protocol, k -> new ArrayList<>()).add(dataService);
}

@Override
public List<DataService> getDataServices() {
return dataServices;
public List<DataService> getDataServices(String protocol) {
return dataServices.computeIfAbsent(protocol, k -> new ArrayList<>());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,32 @@ public DatasetResolverImpl(ContractDefinitionResolver contractDefinitionResolver

@Override
@NotNull
public Stream<Dataset> query(ParticipantAgent agent, QuerySpec querySpec) {
public Stream<Dataset> query(ParticipantAgent agent, QuerySpec querySpec, String protocol) {
var resolved = contractDefinitionResolver.resolveFor(agent);
var contractDefinitions = resolved.contractDefinitions();
if (contractDefinitions.isEmpty()) {
return Stream.empty();
}

var assetsQuery = QuerySpec.Builder.newInstance().offset(0).limit(MAX_VALUE).filter(querySpec.getFilterExpression()).build();
return assetIndex.queryAssets(assetsQuery)
.map(asset -> toDataset(contractDefinitions, asset, resolved.policies()))
.map(asset -> toDataset(contractDefinitions, asset, resolved.policies(), protocol))
.filter(Dataset::hasOffers)
.skip(querySpec.getOffset())
.limit(querySpec.getLimit());
}

@Override
public Dataset getById(ParticipantAgent agent, String id) {
public Dataset getById(ParticipantAgent agent, String id, String protocol) {
var resolved = contractDefinitionResolver.resolveFor(agent);
var contractDefinitions = resolved.contractDefinitions();
if (contractDefinitions.isEmpty()) {
return null;
}

return Optional.of(id)
.map(assetIndex::findById)
.map(asset -> toDataset(contractDefinitions, asset, resolved.policies()))
.map(asset -> toDataset(contractDefinitions, asset, resolved.policies(), protocol))
.filter(Dataset::hasOffers)
.orElse(null);
}
Expand All @@ -106,9 +106,9 @@ public Dataset getById(ParticipantAgent agent, String id) {
.build());
}

private Dataset toDataset(List<ContractDefinition> contractDefinitions, Asset asset, Map<String, Policy> policies) {
private Dataset toDataset(List<ContractDefinition> contractDefinitions, Asset asset, Map<String, Policy> policies, String protocol) {

var distributions = distributionResolver.getDistributions(asset);
var distributions = distributionResolver.getDistributions(protocol, asset);
var datasetBuilder = buildDataset(asset)
.id(asset.getId())
.distributions(distributions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public DefaultDistributionResolver(DataServiceRegistry dataServiceRegistry, Data
}

@Override
public List<Distribution> getDistributions(Asset asset) {
public List<Distribution> getDistributions(String protocol, Asset asset) {
if (asset.isCatalog()) {
return List.of(Distribution.Builder.newInstance()
.format(asset.getDataAddress().getType())
Expand All @@ -45,12 +45,12 @@ public List<Distribution> getDistributions(Asset asset) {
.build())
.build());
}
return dataFlowManager.transferTypesFor(asset).stream().map(this::createDistribution).toList();
return dataFlowManager.transferTypesFor(asset).stream().map((format) -> createDistribution(protocol, format)).toList();
}

private Distribution createDistribution(String format) {
private Distribution createDistribution(String protocol, String format) {
var builder = Distribution.Builder.newInstance().format(format);
dataServiceRegistry.getDataServices().forEach(builder::dataService);
dataServiceRegistry.getDataServices(protocol).forEach(builder::dataService);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,24 @@ class DataServiceRegistryImplTest {
void shouldReturnRegisteredDataService() {
var dataService = DataService.Builder.newInstance().build();

registry.register(dataService);
var dataServices = registry.getDataServices();
var protocol = "protocol";

registry.register(protocol, dataService);
var dataServices = registry.getDataServices(protocol);

assertThat(dataServices).containsExactly(dataService);
}

@Test
void shouldReturnEmptyDataServices() {
var dataService = DataService.Builder.newInstance().build();

var protocol = "protocol";

registry.register(protocol, dataService);
var dataServices = registry.getDataServices("unknownProtocol");

assertThat(dataServices).isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void shouldLimitResult_withHeterogeneousChunks() {
var to = 50;
var querySpec = QuerySpec.Builder.newInstance().range(new Range(from, to)).build();

var datasets = resolver.query(createAgent(), querySpec);
var datasets = resolver.query(createAgent(), querySpec, "protocol");

assertThat(datasets).hasSize(to - from);
}
Expand All @@ -126,7 +126,7 @@ void should_return_offers_subset_when_across_multiple_contract_definitions(int f
when(contractDefinitionResolver.resolveFor(isA(ParticipantAgent.class))).thenReturn(new ResolvedContractDefinitions(List.of(contractDefinition1, contractDefinition2)));
var querySpec = QuerySpec.Builder.newInstance().range(new Range(from, to)).build();

var datasets = resolver.query(createAgent(), querySpec);
var datasets = resolver.query(createAgent(), querySpec, "protocol");

assertThat(datasets).hasSize(min(requestedRange, maximumRange));
}
Expand All @@ -149,7 +149,7 @@ void shouldLimitResult_insufficientAssets() {
var querySpec = QuerySpec.Builder.newInstance().range(new Range(from, to)).build();

// 4 definitions, 10 assets each = 40 offers total -> offset 20 ==> result = 20
var dataset = resolver.query(createAgent(), querySpec);
var dataset = resolver.query(createAgent(), querySpec, "protocol");

assertThat(dataset).hasSize(4);
}
Expand All @@ -163,7 +163,7 @@ void shouldLimitResult_pageOffsetLargerThanNumAssets() {
var to = 50;
var querySpec = QuerySpec.Builder.newInstance().range(new Range(from, to)).build();
// 2 definitions, 10 assets each = 20 offers total -> offset of 25 is outside
var datasets = resolver.query(createAgent(), querySpec);
var datasets = resolver.query(createAgent(), querySpec, "protocol");

assertThat(datasets).isEmpty();
}
Expand Down
Loading
Loading