Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Add endpoint with enabled state (#1056)
Browse files Browse the repository at this point in the history
  • Loading branch information
sonjaer authored Mar 13, 2023
1 parent 8a758f1 commit 4b878ba
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 0 deletions.
41 changes: 41 additions & 0 deletions doc/api.apib
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,47 @@ Resources related to the Workflows registered in Styx.
}
}]


## Workflows with State [/{version}/workflows/full]

+ Parameters
+ version: `v3` (enum[string]) - API version
+ Members
+ `v3`

### Get Workflows with State [GET]

+ Response 200 (application/json)

[{
"workflow": {
"component_id": "styx-canary",
"workflow_id": "StyxCanary",
"component_uri": "file:///etc/styx/schedule.yaml",
"configuration": {
"id": "StyxCanary",
"schedule": "hours",
"offset": null,
"docker_image": null,
"docker_args": [
"luigi",
"--module",
"canary_job",
"CanaryJob"
],
"docker_termination_logging": false,
"env": {"FOO": "bar"},
"resources": [],
"running_timeout": "PT2H"
}
},
"state": {
"enabled": "true",
"next_natural_trigger": "2017-01-01T01:00:00Z",
"next_natural_offset_trigger": "2017-01-01T02:00:00Z"
}
}]

## Workflows [/{version}/workflows/{component_id}]

+ Parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public Stream<Route<AsyncHandler<Response<ByteString>>>> routes(
Route.with(
json(), "GET", BASE + "/<cid>/<wfid>/full",
rc -> workflowWithState(arg("cid", rc), arg("wfid", rc))),
Route.with(
json(), "GET", BASE + "/full",
rc -> workflowsWithState()),
Route.with(
json(), "GET", BASE,
rc -> workflows(rc.request())),
Expand Down Expand Up @@ -216,6 +219,15 @@ private WorkflowConfiguration readFromJsonWithDefaults(ByteString payload)
return WorkflowConfigurationBuilder.from(workflowConfig).deploymentTime(time.get()).build();
}

/**
 * Lists every stored workflow together with its state.
 *
 * @return a 200 response whose payload is the collection of {@link WorkflowWithState}
 * @throws RuntimeException wrapping the {@link IOException} if the storage read fails
 */
private Response<Collection<WorkflowWithState>> workflowsWithState() {
  try {
    return Response.forPayload(storage.workflowsWithState().values());
  } catch (IOException e) {
    throw new RuntimeException("Failed to get workflows", e);
  }
}

private Response<Collection<Workflow>> workflows(Request request) {
try {
var paramFilters = Stream.of(QueryParams.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static com.spotify.styx.testdata.TestData.QUERY_THRESHOLD_BEFORE;
import static com.spotify.styx.testdata.TestData.TEST_DEPLOYMENT_TIME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -74,6 +75,7 @@
import com.spotify.styx.model.WorkflowId;
import com.spotify.styx.model.WorkflowInstance;
import com.spotify.styx.model.WorkflowState;
import com.spotify.styx.model.WorkflowWithState;
import com.spotify.styx.state.Trigger;
import com.spotify.styx.storage.AggregateStorage;
import com.spotify.styx.storage.BigtableMocker;
Expand All @@ -87,6 +89,7 @@
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -822,6 +825,27 @@ public void shouldReturnWorkflows() throws Exception {
assertJson(response, "[*]", hasSize(2));
}

@Test
public void shouldReturnWorkflowsWithState() throws Exception {
  sinceVersion(Api.Version.V3);

  Response<ByteString> response = awaitResponse(
      serviceHelper.request("GET", path("/full")));

  // Both stored workflows start out disabled, so each pairs with a default-disabled state.
  assertThat(response, hasStatus(withCode(Status.OK)));
  assertJson(response, "[*]", hasSize(2));

  var disabled = WorkflowState.builder().enabled(false).build();
  var actual = List.of(deserialize(response.payload().orElseThrow(), WorkflowWithState[].class));
  assertThat(actual,
      containsInAnyOrder(
          WorkflowWithState.create(FLYTE_EXEC_WORKFLOW, disabled),
          WorkflowWithState.create(WORKFLOW, disabled)));
}

@Test
public void shouldReturnFilteredDeploymentTypeWorkflow() throws Exception {
sinceVersion(Api.Version.V3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.HashMap;
import org.apache.hadoop.hbase.client.Connection;

/**
Expand Down Expand Up @@ -196,6 +197,11 @@ public Map<WorkflowId, Workflow> workflows() throws IOException {
return datastoreStorage.workflows();
}

@Override
public HashMap<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException {
  // Workflow definitions and their state are read from Datastore only, so this
  // is a straight delegation to the Datastore-backed storage.
  return datastoreStorage.workflowsWithState();
}

@Override
public Map<WorkflowId, Workflow> workflows(Set<WorkflowId> workflowIds) {
return datastoreStorage.workflows(workflowIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,26 @@ Optional<WorkflowWithState> workflowWithState(WorkflowId workflowId) throws IOEx
return Optional.of(WorkflowWithState.create(workflow, workflowState));
}

/**
 * Reads every stored workflow entity together with its {@link WorkflowState}.
 *
 * <p>Entities whose JSON payload cannot be deserialized are logged and skipped rather
 * than failing the whole listing.
 *
 * @return a map of {@link WorkflowId} to the workflow paired with its state
 * @throws IOException if the Datastore query fails
 */
HashMap<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException {
  HashMap<WorkflowId, WorkflowWithState> workflows = new HashMap<>();
  var query = Query.newEntityQueryBuilder().setKind(KIND_WORKFLOW).build();
  datastore.query(query, entity -> {
    try {
      // Declare at point of use inside the try; a parse failure skips just this entity.
      var workflow = OBJECT_MAPPER.readValue(entity.getString(PROPERTY_WORKFLOW_JSON), Workflow.class);
      var workflowState = workflowState(Optional.of(entity));
      workflows.put(workflow.id(), WorkflowWithState.create(workflow, workflowState));
    } catch (IOException e) {
      log.warn("Failed to read workflow {}.", entity.getKey(), e);
    }
  });
  return workflows;
}

private WorkflowState workflowState(Optional<Entity> workflowEntity) {
var builder = WorkflowState.builder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.HashMap;

/**
* The interface to the persistence layer.
Expand Down Expand Up @@ -129,6 +130,13 @@ public interface Storage extends Closeable {
*/
Map<WorkflowId, Workflow> workflows() throws IOException;


/**
 * Get all {@link WorkflowWithState}s.
 *
 * @return a map of {@link WorkflowId} to each stored workflow paired with its state
 *
 * @throws IOException if the underlying storage read fails
 */
HashMap<WorkflowId, WorkflowWithState> workflowsWithState() throws IOException;

/** Get all {@link Workflow}s by doing strongly consistent batch fetch.
*
* @param workflowIds set of {@link WorkflowId}s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import com.google.cloud.datastore.StringValue;
import com.google.common.collect.ImmutableSet;
import com.spotify.styx.model.Backfill;
import com.spotify.styx.model.WorkflowWithState;
import com.spotify.styx.model.BackfillBuilder;
import com.spotify.styx.model.ExecutionDescription;
import com.spotify.styx.model.Resource;
Expand Down Expand Up @@ -556,6 +557,39 @@ public void allFieldsAreSetWhenRetrievingWorkflowState() throws Exception {
assertThat(retrieved, is(state));
}

@Test
public void shouldReturnWorkflowsWithState() throws Exception {
  // No workflows stored yet -> empty result.
  assertThat(storage.workflowsWithState().isEmpty(), is(true));

  var wf1 = workflow(WORKFLOW_ID1);
  var wf2 = workflow(WORKFLOW_ID2);
  var wf3 = workflow(WORKFLOW_ID3);
  storage.store(wf1);
  storage.store(wf2);
  storage.store(wf3);

  // Three distinct states derived from a common base: toggled enabled flag and shifted offset trigger.
  var base = Instant.parse("2016-03-14T14:00:00Z");
  var state1 = WorkflowState.builder()
      .enabled(true)
      .nextNaturalTrigger(base)
      .nextNaturalOffsetTrigger(base.plus(1, ChronoUnit.DAYS))
      .build();
  var state2 = state1.toBuilder().enabled(false).build();
  var state3 = state1.toBuilder().nextNaturalOffsetTrigger(base.plus(2, ChronoUnit.DAYS)).build();

  storage.patchState(WORKFLOW_ID1, state1);
  storage.patchState(WORKFLOW_ID2, state2);
  storage.patchState(WORKFLOW_ID3, state3);

  var result = storage.workflowsWithState();
  assertThat(result.size(), is(3));
  assertThat(result, hasEntry(WORKFLOW_ID1, WorkflowWithState.create(wf1, state1)));
  assertThat(result, hasEntry(WORKFLOW_ID2, WorkflowWithState.create(wf2, state2)));
  assertThat(result, hasEntry(WORKFLOW_ID3, WorkflowWithState.create(wf3, state3)));
}

@Test
public void shouldReturnWorkflowWithState() throws Exception {
storage.store(WORKFLOW);
Expand Down

0 comments on commit 4b878ba

Please sign in to comment.