From 22ad5ab66cb01015c478eba2f16ccd2273b4f977 Mon Sep 17 00:00:00 2001 From: Honnix Date: Mon, 8 Aug 2022 11:49:09 +0200 Subject: [PATCH] Use ExecutionCreatedAt in filter (#1004) * Use ExecutionCreatedAt in filter --- .../spotify/styx/flyte/client/RpcHelper.java | 6 +-- .../styx/flyte/client/RpcHelperTest.java | 5 ++- .../styx/flyte/FlyteAdminClientRunner.java | 6 --- ...dminClientRunnerTerminateDanglingTest.java | 37 ++++++------------- 4 files changed, 17 insertions(+), 37 deletions(-) diff --git a/styx-flyte-client/src/main/java/com/spotify/styx/flyte/client/RpcHelper.java b/styx-flyte-client/src/main/java/com/spotify/styx/flyte/client/RpcHelper.java index 5ea9547a06..91a29b5323 100644 --- a/styx-flyte-client/src/main/java/com/spotify/styx/flyte/client/RpcHelper.java +++ b/styx-flyte-client/src/main/java/com/spotify/styx/flyte/client/RpcHelper.java @@ -37,12 +37,12 @@ public static String getExecutionsListFilter(Instant timeNow, Duration since, Du .toLocalDateTime() .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")); - - String dateTo = timeNow.minus(to) + final String dateTo = timeNow.minus(to) .atZone(ZoneId.of("UTC")) .toLocalDateTime() .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")); - return String.format("value_in(phase,RUNNING)+gte(started_at,%s)+lte(started_at,%s)",dateSince,dateTo); + return String.format("value_in(phase,RUNNING)+gte(execution_created_at,%s)+lte(execution_created_at,%s)", dateSince, + dateTo); } } diff --git a/styx-flyte-client/src/test/java/com/spotify/styx/flyte/client/RpcHelperTest.java b/styx-flyte-client/src/test/java/com/spotify/styx/flyte/client/RpcHelperTest.java index b8e185317f..1343d96982 100644 --- a/styx-flyte-client/src/test/java/com/spotify/styx/flyte/client/RpcHelperTest.java +++ b/styx-flyte-client/src/test/java/com/spotify/styx/flyte/client/RpcHelperTest.java @@ -36,7 +36,7 @@ public class RpcHelperTest { @Test public void testGtExecutionsListFilter() { - final Instant someTime = LocalDateTime.of(2022, 2, 2, 12, 12, 05) + final Instant someTime = LocalDateTime.of(2022, 2, 2, 12, 12, 5) .atZone(ZoneOffset.UTC) .toInstant(); @@ -46,7 +46,8 @@ public void testGtExecutionsListFilter() { Duration.of(3, ChronoUnit.MINUTES)); assertThat(executionsListFilter, - equalTo("value_in(phase,RUNNING)+gte(started_at,2022-02-01T12:12:05)+lte(started_at,2022-02-02T12:09:05)")); + equalTo("value_in(phase,RUNNING)+gte(execution_created_at,2022-02-01T12:12:05)+lte(execution_created_at," + + "2022-02-02T12:09:05)")); } } diff --git a/styx-scheduler-service/src/main/java/com/spotify/styx/flyte/FlyteAdminClientRunner.java b/styx-scheduler-service/src/main/java/com/spotify/styx/flyte/FlyteAdminClientRunner.java index 27a92b49d5..61a626ca36 100644 --- a/styx-scheduler-service/src/main/java/com/spotify/styx/flyte/FlyteAdminClientRunner.java +++ b/styx-scheduler-service/src/main/java/com/spotify/styx/flyte/FlyteAdminClientRunner.java @@ -54,7 +54,6 @@ import com.spotify.styx.util.TriggerUtil; import flyteidl.admin.ExecutionOuterClass; import flyteidl.admin.ExecutionOuterClass.ExecutionMetadata.ExecutionMode; -import flyteidl.core.Execution; import flyteidl.core.IdentifierOuterClass; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -313,11 +312,6 @@ private void tryTerminateDanglingFlyteExecutions() { } private boolean haveBeenRunningForAWhile(ExecutionOuterClass.Execution exec) { - var isRunning = exec.getClosure().getPhase() == Execution.WorkflowExecution.Phase.RUNNING; - if (!isRunning) { - return false; - } - var startedAt = exec.getClosure().getStartedAt(); var startedAtInstant = Instant.ofEpochSecond(startedAt.getSeconds(), startedAt.getNanos()); var age = Duration.between(startedAtInstant, time.get()); diff --git a/styx-scheduler-service/src/test/java/com/spotify/styx/flyte/FlyteAdminClientRunnerTerminateDanglingTest.java b/styx-scheduler-service/src/test/java/com/spotify/styx/flyte/FlyteAdminClientRunnerTerminateDanglingTest.java index 129cc9b200..eb97b580d1 100644 --- a/styx-scheduler-service/src/test/java/com/spotify/styx/flyte/FlyteAdminClientRunnerTerminateDanglingTest.java +++ b/styx-scheduler-service/src/test/java/com/spotify/styx/flyte/FlyteAdminClientRunnerTerminateDanglingTest.java @@ -83,8 +83,6 @@ public class FlyteAdminClientRunnerTerminateDanglingTest { private static final String RUNNING_1 = "running-1"; private static final String RUNNING_2 = "running-2"; - private static final String NON_RUNNING_1 = "non-running-1"; - private static final String NON_RUNNING_2 = "non-running-2"; private static final String NON_STYX_1 = "non-styx-1"; private static final String NON_STYX_2 = "non-styx-2"; private static final String NA_DANGLING_1 = "na-dangling-1"; @@ -131,12 +129,11 @@ public void setUp() { var oldIdInAnnotationDangling = executions(this::oldIdInAnnotationDanglingExecution, OA_DANGLING_1, OA_DANGLING_2); var running = executions(this::runningExecution, RUNNING_1, RUNNING_2); var nonStyx = executions(this::runningNonStyxExecution, NON_STYX_1, NON_STYX_2); - var nonRunning = executions(this::nonRunningExecutions, NON_RUNNING_1, NON_RUNNING_2); time.setOffset(BACK_ENOUGH_TO_MAKE_EXECUTIONS_YOUNG); var nonActiveYoungDangling = executions(this::nonActiveDanglingExecution, NA_DANGLING_YOUNG_1, NA_DANGLING_YOUNG_2); stubListExecutions(nonActiveYoungDangling, nonActiveDangling, noIdInStateDangling, noIdInAnnotationDangling, - oldIdInAnnotationDangling, running, nonStyx, nonRunning); + oldIdInAnnotationDangling, running, nonStyx); time.reset(); } @@ -155,8 +152,8 @@ public void shouldCallListExecutionsWithFilter() { var filters = filterCatcher.getValue().split("\\+"); assertThat(filters, arrayContainingInAnyOrder( equalTo("value_in(phase,RUNNING)"), - startsWith("gte(started_at,"), - startsWith("lte(started_at,")) + startsWith("gte(execution_created_at,"), + startsWith("lte(execution_created_at,")) ); } @@ -216,14 +213,6 @@ public void shouldNotTerminateNonStyxFlyteExecutions() { verifyTerminateExecution(never(), NON_STYX_2); } - @Test - public void shouldNotTerminateFlyteExecutionsInTerminalPhase() { - runner.terminateDanglingFlyteExecutions(); - - verifyTerminateExecution(never(), NON_RUNNING_1); - verifyTerminateExecution(never(), NON_RUNNING_2); - } - @Test public void shouldScheduleTerminationOnInit() { runner.init(); @@ -256,32 +245,28 @@ private void stubListExecutions(List... executions) { when(stateManager.listActiveInstances()).thenReturn(activeInstances); } - private Execution nonRunningExecutions(String name) { - return execution(name, styxAnnotations(name, workflowInstance(name)), Phase.SUCCEEDED); - } - private Execution noIdInStateDanglingExecution(String name) { final var workflowInstance = workflowInstance(name); addToActiveStates(workflowInstance, StateData.newBuilder().build()); - return execution(name, styxAnnotations(name, workflowInstance), Phase.RUNNING); + return execution(name, styxAnnotations(name, workflowInstance)); } private Execution oldIdInAnnotationDanglingExecution(String name) { final var workflowInstance = workflowInstance(name); addToActiveStates(workflowInstance, StateData.newBuilder().executionId(name).build()); - return execution(name, oldRunIdStyxAnnotation(name, workflowInstance), Phase.RUNNING); + return execution(name, oldRunIdStyxAnnotation(name, workflowInstance)); } private Execution noIdInAnnotationDanglingExecution(String name) { final var workflowInstance = workflowInstance(name); addToActiveStates(workflowInstance, StateData.newBuilder().executionId(name).build()); - return execution(name, noRunIdStyxAnnotation(workflowInstance), Phase.RUNNING); + return execution(name, noRunIdStyxAnnotation(workflowInstance)); } private Execution runningExecution(String name) { final var workflowInstance = workflowInstance(name); addToActiveStates(workflowInstance, StateData.newBuilder().executionId(name).build()); - return execution(name, styxAnnotations(name, workflowInstance), Phase.RUNNING); + return execution(name, styxAnnotations(name, workflowInstance)); } private void addToActiveStates(WorkflowInstance workflowInstance, StateData state) { @@ -295,14 +280,14 @@ private void addToActiveStates(WorkflowInstance workflowInstance, StateData stat } private Execution nonActiveDanglingExecution(String name) { - return execution(name, styxAnnotations(name, workflowInstance(name)), Phase.RUNNING); + return execution(name, styxAnnotations(name, workflowInstance(name))); } private Execution runningNonStyxExecution(String name) { - return execution(name, emptyAnnotations(), Phase.RUNNING); + return execution(name, emptyAnnotations()); } - private Execution execution(String name, Common.Annotations annotations, Phase phase) { + private Execution execution(String name, Common.Annotations annotations) { var timestamp = nowTimestamp(); var identifier = IdentifierOuterClass.WorkflowExecutionIdentifier.newBuilder() .setProject(PROJECT) @@ -315,7 +300,7 @@ private Execution execution(String name, Common.Annotations annotations, Phase p .setAnnotations(annotations) .build()) .setClosure(ExecutionOuterClass.ExecutionClosure.newBuilder() - .setPhase(phase) + .setPhase(Phase.RUNNING) .setCreatedAt(timestamp) .setStartedAt(timestamp) .build())