Skip to content

Commit

Permalink
feat: implements revocation in DataPlaneAuthorizationService (#4019)
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood authored Mar 19, 2024
1 parent 1055dac commit 964e40b
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,17 @@ public void completed(TransferProcess process) {
}

@Override
public void suspended(TransferProcess process) {
var event = TransferProcessSuspended.Builder.newInstance()
public void terminated(TransferProcess process) {
var event = withBaseProperties(TransferProcessTerminated.Builder.newInstance(), process)
.reason(process.getErrorDetail())
.transferProcessId(process.getId())
.callbackAddresses(process.getCallbackAddresses())
.build();

publish(event);
}

@Override
public void terminated(TransferProcess process) {
var event = withBaseProperties(TransferProcessTerminated.Builder.newInstance(), process)
public void suspended(TransferProcess process) {
var event = withBaseProperties(TransferProcessSuspended.Builder.newInstance(), process)
.reason(process.getErrorDetail())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public Result<DataAddress> authorize(String token, Map<String, Object> requestDa
.map(u -> accessTokenDataResult.getContent().dataAddress());
}

@Override
public Result<Void> revokeEndpointDataReference(String transferProcessId, String reason) {
return accessTokenService.revoke(transferProcessId, reason);
}

private Result<DataAddress> createDataAddress(TokenRepresentation tokenRepresentation, Endpoint publicEndpoint) {
var address = DataAddress.Builder.newInstance()
.type(publicEndpoint.endpointType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.eclipse.edc.spi.iam.TokenParameters;
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.token.spi.KeyIdDecorator;
Expand Down Expand Up @@ -133,4 +135,27 @@ public Result<AccessTokenData> resolve(String token) {
Result.failure("AccessTokenData with ID '%s' does not exist.".formatted(tokenId)) :
Result.success(existingAccessToken);
}

@Override
public Result<Void> revoke(String transferProcessId, String reason) {

var query = QuerySpec.Builder.newInstance()
.filter(new Criterion("additionalProperties.process_id", "=", transferProcessId))
.build();

var tokens = accessTokenDataStore.query(query);
return tokens.stream().map(this::deleteTokenData)
.reduce(Result::merge)
.orElseGet(() -> Result.failure("AccessTokenData associated to the transfer with ID '%s' does not exist.".formatted(transferProcessId)));

}

private Result<Void> deleteTokenData(AccessTokenData tokenData) {
var result = accessTokenDataStore.deleteById(tokenData.id());
if (result.failed()) {
return Result.failure(result.getFailureDetail());
} else {
return Result.success();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public Result<DataFlowResponseMessage> start(DataFlowStartMessage startMessage)
var response = DataFlowResponseMessage.Builder.newInstance()
.dataAddress(result.getContent().orElse(null))
.build();

update(dataFlowBuilder.build());

return Result.success(response);
Expand All @@ -106,20 +106,20 @@ public DataFlowStates getTransferState(String processId) {
}

@Override
public StatusResult<Void> terminate(String dataFlowId, @Nullable String reason) {
public StatusResult<Void> suspend(String dataFlowId) {
return stop(dataFlowId)
.map(dataFlow -> {
dataFlow.transitToTerminated(reason);
dataFlow.transitToSuspended();
store.save(dataFlow);
return null;
});
}

@Override
public StatusResult<Void> suspend(String dataFlowId) {
return stop(dataFlowId)
public StatusResult<Void> terminate(String dataFlowId, @Nullable String reason) {
return stop(dataFlowId, reason)
.map(dataFlow -> {
dataFlow.transitToSuspended();
dataFlow.transitToTerminated(reason);
store.save(dataFlow);
return null;
});
Expand All @@ -134,6 +134,10 @@ protected StateMachineManager.Builder configureStateMachineManager(StateMachineM
}

private StatusResult<DataFlow> stop(String dataFlowId) {
return stop(dataFlowId, null);
}

private StatusResult<DataFlow> stop(String dataFlowId, String reason) {
var result = store.findByIdAndLease(dataFlowId);
if (result.failed()) {
return StatusResult.from(result).map(it -> null);
Expand All @@ -156,6 +160,11 @@ private StatusResult<DataFlow> stop(String dataFlowId) {
return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail()));
}
}
} else {
var revokeResult = authorizationService.revokeEndpointDataReference(dataFlowId, reason);
if (revokeResult.failed()) {
return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, revokeResult.getFailureDetail()));
}
}

return StatusResult.success(dataFlow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,27 @@ void authorize_accessNotGranted() {
verifyNoMoreInteractions(accessTokenService, accessControlService);
}

@Test
void revoke() {
when(accessTokenService.revoke(eq("id"), eq("reason"))).thenReturn(Result.success());

assertThat(authorizationService.revokeEndpointDataReference("id", "reason")).isSucceeded();

verify(accessTokenService).revoke(eq("id"), eq("reason"));
verifyNoMoreInteractions(accessTokenService, accessControlService);
}

@Test
void revoke_error() {
when(accessTokenService.revoke(eq("id"), eq("reason"))).thenReturn(Result.failure("failure"));

assertThat(authorizationService.revokeEndpointDataReference("id", "reason")).isFailed()
.detail().contains("failure");

verify(accessTokenService).revoke(eq("id"), eq("reason"));
verifyNoMoreInteractions(accessTokenService, accessControlService);
}

private DataFlowStartMessage.Builder createStartMessage() {
return DataFlowStartMessage.Builder.newInstance()
.processId("test-processid")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.eclipse.edc.spi.iam.ClaimToken;
import org.eclipse.edc.spi.iam.TokenParameters;
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.DataAddress;
Expand All @@ -28,6 +30,7 @@
import org.eclipse.edc.token.spi.TokenValidationService;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

Expand All @@ -39,6 +42,7 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
Expand Down Expand Up @@ -180,4 +184,58 @@ void resolve_whenTokenIdNotFound() {
verify(tokenValidationService).validate(eq("some-jwt"), any(), anyList());
verify(store).getById(eq(tokenId));
}


@Test
void revoke() {
var tokenId = "test-id";
var processId = "tp-id";

var querySpec = QuerySpec.Builder.newInstance().filter(new Criterion("additionalProperties.process_id", "=", processId)).build();

var tokenData = new AccessTokenData(tokenId, ClaimToken.Builder.newInstance().build(),
DataAddress.Builder.newInstance().type("test-type").build());

when(store.query(querySpec)).thenReturn(List.of(tokenData));
when(store.deleteById(tokenId)).thenReturn(StoreResult.success());

var result = accessTokenService.revoke("tp-id", "reason");
assertThat(result).isSucceeded();

verify(store).deleteById(eq(tokenId));

}

@Test
void revoke_storeError() {
var tokenId = "test-id";
var processId = "tp-id";

var querySpec = QuerySpec.Builder.newInstance().filter(new Criterion("additionalProperties.process_id", "=", processId)).build();

var tokenData = new AccessTokenData(tokenId, ClaimToken.Builder.newInstance().build(),
DataAddress.Builder.newInstance().type("test-type").build());

when(store.query(querySpec)).thenReturn(List.of(tokenData));
when(store.deleteById(tokenId)).thenReturn(StoreResult.generalError("storeError"));

var result = accessTokenService.revoke("tp-id", "reason");
assertThat(result).isFailed().detail().contains("storeError");

verify(store).deleteById(eq(tokenId));
}

@Test
void revoke_notTokensFound() {
var processId = "tp-id";
var querySpec = QuerySpec.Builder.newInstance().filter(new Criterion("additionalProperties.process_id", "=", processId)).build();

when(store.query(querySpec)).thenReturn(List.of());

var result = accessTokenService.revoke("tp-id", "reason");
assertThat(result).isFailed().detail().contains("AccessTokenData associated to the transfer with ID");

verify(store, never()).deleteById(any());

}
}
Loading

0 comments on commit 964e40b

Please sign in to comment.