Skip to content

Commit

Permalink
fix: transfer type resolution on dp self registration (#4687)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelmag110 authored Jan 13, 2025
1 parent 06a0fab commit 036bdba
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class TransferTypeParserImpl implements TransferTypeParser {

/**
* Parses a compose transfer type string into a {@link TransferType}:
* {@code DESTTYPE-{PUSH|PULL}(-RESPONSETYPE)}, for example {@code HttpData-PULL/Websocket}
* {@code DESTTYPE-{PUSH|PULL}(-RESPONSETYPE)}, for example {@code HttpData-PULL-Websocket}
*
* @param transferType the transfer type string representation.
* @return a {@link TransferType}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void shutdown() {

private @NotNull Stream<String> toTransferTypes(FlowType pull, Set<String> types, Set<String> responseTypes) {
var transferTypes = types.stream().map(it -> "%s-%s".formatted(it, pull));
return Stream.concat(transferTypes, responseTypes.stream().flatMap(responseType -> types.stream().map(it -> "%s-%s/%s".formatted(it, pull, responseType))));
return Stream.concat(transferTypes, responseTypes.stream().flatMap(responseType -> types.stream().map(it -> "%s-%s-%s".formatted(it, pull, responseType))));
}

private class DataPlaneHealthCheck implements LivenessProvider, ReadinessProvider, StartupStatusProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ void shouldRegisterInstanceAtStartup(DataplaneSelfRegistrationExtension extensio
when(pipelineService.supportedSinkTypes()).thenReturn(Set.of("sinkType", "anotherSinkType"));
when(pipelineService.supportedSourceTypes()).thenReturn(Set.of("sourceType", "anotherSourceType"));
when(publicEndpointGeneratorService.supportedDestinationTypes()).thenReturn(Set.of("pullDestType", "anotherPullDestType"));
when(publicEndpointGeneratorService.supportedResponseTypes()).thenReturn(Set.of("responseType", "anotherResponseType"));
when(dataPlaneSelectorService.addInstance(any())).thenReturn(ServiceResult.success());

extension.initialize(context);
Expand All @@ -87,7 +88,18 @@ void shouldRegisterInstanceAtStartup(DataplaneSelfRegistrationExtension extensio
assertThat(dataPlaneInstance.getUrl()).isEqualTo(new URL("http://control/api/url/v1/dataflows"));
assertThat(dataPlaneInstance.getAllowedSourceTypes()).containsExactlyInAnyOrder("sourceType", "anotherSourceType");
assertThat(dataPlaneInstance.getAllowedTransferTypes())
.containsExactlyInAnyOrder("pullDestType-PULL", "anotherPullDestType-PULL", "sinkType-PUSH", "anotherSinkType-PUSH");
.containsExactlyInAnyOrder("anotherPullDestType-PULL-anotherResponseType",
"anotherSinkType-PUSH-anotherResponseType",
"anotherPullDestType-PULL",
"anotherSinkType-PUSH-responseType",
"anotherSinkType-PUSH",
"pullDestType-PULL",
"anotherPullDestType-PULL-responseType",
"pullDestType-PULL-anotherResponseType",
"sinkType-PUSH-anotherResponseType",
"pullDestType-PULL-responseType",
"sinkType-PUSH-responseType",
"sinkType-PUSH");

verify(healthCheckService).addStartupStatusProvider(any());
verify(healthCheckService).addLivenessProvider(any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public String stateAsString() {
public boolean canHandle(DataAddress sourceAddress, @Nullable String transferType) {
Objects.requireNonNull(sourceAddress, "source cannot be null!");
Objects.requireNonNull(transferType, "transferType cannot be null!");
// startsWith: the allowed transferType could be HttpData-PULL/someResponseChannel, and we only need to match the HttpData-PULL
// startsWith: the allowed transferType could be HttpData-PULL-someResponseChannel, and we only need to match the HttpData-PULL
return allowedSourceTypes.contains(sourceAddress.getType()) && allowedTransferTypes.contains(transferType);
}

Expand Down

0 comments on commit 036bdba

Please sign in to comment.