diff --git a/styx-scheduler-service/src/main/java/com/spotify/styx/state/PersistentStateManager.java b/styx-scheduler-service/src/main/java/com/spotify/styx/state/PersistentStateManager.java index 325abfd40b..2057107e89 100644 --- a/styx-scheduler-service/src/main/java/com/spotify/styx/state/PersistentStateManager.java +++ b/styx-scheduler-service/src/main/java/com/spotify/styx/state/PersistentStateManager.java @@ -210,11 +210,11 @@ public void receive(Event event, long expectedCounter) throws IsClosedException ensureRunning(); log.info("Received event {}", event); - var nextRunState = transition(event, expectedCounter); - postTransition(event, nextRunState); + var maybeNextRunState = transition(event, expectedCounter); + maybeNextRunState.ifPresent(nextRunState -> postTransition(event, nextRunState)); } - private RunState transition(Event event, long expectedCounter) { + private Optional transition(Event event, long expectedCounter) { try { return storage.runInTransactionWithRetries(tx -> transition0(tx, event, expectedCounter)); } catch (Throwable e) { @@ -224,7 +224,16 @@ private RunState transition(Event event, long expectedCounter) { } } - private RunState transition0(StorageTransaction tx, Event event, long expectedCounter) + /** + * Transition the workflow instance state in the storage based on the {@code event} passed. + * @param tx the current open transaction in the storage + * @param event the event causing the transition. It contains the workflow id inside. + * @param expectedCounter expected counter used for event sorting + * @return If the workflow instance is no longer active, then an @{link Optional::empty} is returned, otherwise + * the transition will be applied and the new {@link RunState} will be returned wrapped in an {@link Optional::of} + * @throws IOException if problems reading the sctive state or updating the new state + */ + private Optional transition0(StorageTransaction tx, Event event, long expectedCounter) throws IOException { // Read active state from datastore @@ -233,7 +242,7 @@ private RunState transition0(StorageTransaction tx, Event event, long expectedCo if (currentRunStateOpt.isEmpty()) { var message = "Received event for unknown workflow instance: " + event; log.warn(message); - throw new IllegalArgumentException(message); + return Optional.empty(); } var currentRunState = currentRunStateOpt.orElseThrow(); @@ -255,7 +264,7 @@ private RunState transition0(StorageTransaction tx, Event event, long expectedCo tx.updateActiveState(event.workflowInstance(), nextRunState); } - return nextRunState; + return Optional.of(nextRunState); } private RunState nextRunState(Event event, RunState runState) { diff --git a/styx-scheduler-service/src/test/java/com/spotify/styx/state/PersistentStateManagerTest.java b/styx-scheduler-service/src/test/java/com/spotify/styx/state/PersistentStateManagerTest.java index dd8bd9cff7..8069d3b720 100644 --- a/styx-scheduler-service/src/test/java/com/spotify/styx/state/PersistentStateManagerTest.java +++ b/styx-scheduler-service/src/test/java/com/spotify/styx/state/PersistentStateManagerTest.java @@ -22,12 +22,11 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.startsWith; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -43,7 +42,6 @@ import static org.mockito.Mockito.when; import com.google.cloud.datastore.DatastoreException; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.spotify.styx.model.Event; @@ -74,11 +72,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.BiConsumer; + import org.junit.After; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; @@ -115,8 +112,6 @@ public class PersistentStateManagerTest { private PersistentStateManager stateManager; - @Rule public ExpectedException exception = ExpectedException.none(); - @Captor private ArgumentCaptor runStateCaptor; @Mock private Storage storage; @@ -305,6 +300,21 @@ public void shouldNotFailWhenMissingResourceIdsWhenTransitionFromPrepareToError( verify(storage).writeEvent(SequenceEvent.create(event, 18, NOW.toEpochMilli())); } + @Test + public void shouldSkipProcessingEventWhenInactiveWFIds() + throws Exception { + Optional runState = Optional.empty(); + when(transaction.readActiveState(INSTANCE)).thenReturn(runState); + + Event event = Event.halt(INSTANCE); + stateManager.receive(event); + + verify(storage, never()).writeEvent(any()); + verify(transaction, never()).updateCounter(any(), any(), anyInt()); + verify(transaction, never()).deleteActiveState(any()); + verify(transaction, never()).updateActiveState(any(), any()); + } + @Test public void shouldFailTriggerWFIfAlreadyActive() throws Exception { reset(storage); @@ -316,54 +326,54 @@ public void shouldFailTriggerWFIfAlreadyActive() throws Exception { when(transactionException.isAlreadyExists()).thenReturn(true); doThrow(transactionException).when(transaction).writeActiveState(any(), any()); when(storage.runInTransactionWithRetries(any())).thenAnswer(a -> - a.getArgument(0).apply(transaction)); + a.getArgument(0).apply(transaction)); - try { - stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS); - fail(); - } catch (AlreadyInitializedException ignore) { - } + assertThrows(AlreadyInitializedException.class, () -> stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS)); } @Test public void shouldFailTriggerIfGetLatestCounterFails() throws Exception { - var cause = new IOException(); - when(storage.getLatestStoredCounter(any())).thenThrow(cause); - exception.expectCause(is(cause)); - stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS); + when(storage.getLatestStoredCounter(any())).thenThrow(new IOException()); + + assertThrows(RuntimeException.class, + () -> stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS)); } @Test public void shouldFailTriggerIfWorkflowNotFound() throws Exception { when(storage.getLatestStoredCounter(any())).thenReturn(Optional.empty()); when(transaction.workflow(INSTANCE.workflowId())).thenReturn(Optional.empty()); - exception.expect(instanceOf(IllegalArgumentException.class)); - exception.expectMessage("Workflow not found: " + INSTANCE.workflowId().toKey()); - stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS); + + var thrown = assertThrows(IllegalArgumentException.class, + () -> stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS)); + + assertEquals("Workflow not found: " + INSTANCE.workflowId().toKey(), + thrown.getMessage()); } @Test public void shouldFailTriggerOnExceptionFromTransaction() throws Exception { reset(storage); when(storage.getLatestStoredCounter(any())).thenReturn(Optional.empty()); - var cause = new IOException("fail!"); - when(storage.runInTransactionWithRetries(any())).thenThrow(cause); - exception.expectCause(is(cause)); - stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS); + when(storage.runInTransactionWithRetries(any())).thenThrow(new IOException("fail!")); + + var thrown = assertThrows(RuntimeException.class, + ()-> stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS)); + assertEquals("java.io.IOException: fail!", thrown.getMessage()); } @Test public void shouldRejectTriggerIfIsClosed() throws Exception { stateManager.close(); - exception.expect(IsClosedException.class); - stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS); + + assertThrows(IsClosedException.class, () -> stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS)); } @Test public void shouldRejectEventIfClosed() throws Exception { stateManager.close(); - exception.expect(IsClosedException.class); - stateManager.receive(Event.timeTrigger(INSTANCE)); + + assertThrows(IsClosedException.class, () -> stateManager.receive(Event.timeTrigger(INSTANCE))); } @Test @@ -385,12 +395,8 @@ public void shouldFailReceiveEventWithHigherCounter() throws Exception { when(transaction.readActiveState(INSTANCE)).thenReturn( Optional.of(RunState.create(INSTANCE, State.SUBMITTED, StateData.zero(), NOW, 17))); - try { - stateManager.receive(event, 16); - fail(); - } catch (StateTransitionConflictException ignore) { - } - + assertThrows(StateTransitionConflictException.class, + ()-> stateManager.receive(event, 16)); verify(storage, never()).writeEvent(any()); } @@ -401,12 +407,8 @@ public void shouldFailReceiveEventWithLowerCounter() throws Exception { RunState.create(INSTANCE, State.SUBMITTED, StateData.zero(), NOW.minusMillis(1), 17)); when(transaction.readActiveState(INSTANCE)).thenReturn(runState); - try { - stateManager.receive(event, 18); - fail(); - } catch (RuntimeException e) { - assertThat(e.getMessage(), startsWith("Unexpected current counter is less than last observed one for")); - } + var thrown = assertThrows(RuntimeException.class, + ()-> stateManager.receive(event, 18)); verify(storage, never()).writeEvent(any()); } @@ -443,25 +445,8 @@ public void shouldPreventIllegalStateTransition() throws Exception { RunState.create(INSTANCE, State.QUEUED, StateData.zero(), NOW.minusMillis(1), 17)); when(transaction.readActiveState(INSTANCE)).thenReturn(runState); - - try { - stateManager.receive(Event.terminate(INSTANCE, Optional.empty())); - fail(); - } catch (IllegalStateException ignore) { - } - - verify(transaction, never()).updateActiveState(any(), any()); - } - - @Test - public void shouldFailReceiveForUnknownActiveWFInstance() throws Exception { - when(transaction.readActiveState(INSTANCE)).thenReturn(Optional.empty()); - - try { - stateManager.receive(Event.terminate(INSTANCE, Optional.empty())); - fail(); - } catch (IllegalArgumentException ignore) { - } + assertThrows(IllegalStateException.class, + ()-> stateManager.receive(Event.terminate(INSTANCE, Optional.empty()))); verify(transaction, never()).updateActiveState(any(), any()); } @@ -512,12 +497,9 @@ public void triggerShouldHandleThrowingOutputHandler() throws Exception { when(transaction.workflow(INSTANCE.workflowId())).thenReturn(Optional.of(WORKFLOW)); final RuntimeException rootCause = new RuntimeException("foo!"); doThrow(rootCause).when(outputHandler).transitionInto(any(), any()); - try { - stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS); - fail(); - } catch (Exception e) { - assertThat(Throwables.getRootCause(e), is(rootCause)); - } + + var thrown = assertThrows(Exception.class, () -> stateManager.trigger(INSTANCE, TRIGGER1, PARAMETERS)); + assertEquals(rootCause, thrown); } @Test @@ -528,12 +510,10 @@ public void receiveShouldPropagateOutputHandlerException() throws Exception { final RuntimeException rootCause = new RuntimeException("foo!"); doThrow(rootCause).when(outputHandler).transitionInto(any(), any()); - try { - stateManager.receive(Event.dequeue(INSTANCE, ImmutableSet.of())); - fail(); - } catch (Exception e) { - assertThat(Throwables.getRootCause(e), is(rootCause)); - } + + var thrown = assertThrows(Exception.class, () -> stateManager.receive(Event.dequeue(INSTANCE, ImmutableSet.of()))); + + assertEquals(rootCause, thrown); } @Test @@ -554,11 +534,10 @@ public void receiveShouldLogFailure() throws Exception { reset(storage); when(storage.runInTransactionWithRetries(any())).thenThrow(cause); final Event event = Event.started(INSTANCE); - try { - stateManager.receive(event, 4711L); - fail(); - } catch (Exception ignore) { - } + + assertThrows(Exception.class, + () -> stateManager.receive(event, 4711L)); + verify(logger).debug( "Failed workflow instance transition: {}, counter={}", event, 4711L, cause); @@ -571,11 +550,9 @@ public void triggerShouldLogTransactionConflict() throws Exception { reset(storage); when(storage.getLatestStoredCounter(INSTANCE)).thenReturn(Optional.empty()); when(storage.runInTransactionWithRetries(any())).thenThrow(cause); - try { - stateManager.trigger(INSTANCE, Trigger.natural(), PARAMETERS); - fail(); - } catch (Exception ignore) { - } + + assertThrows(Exception.class, () -> stateManager.trigger(INSTANCE, Trigger.natural(), PARAMETERS)); + verify(logger).debug("Transaction conflict when triggering workflow instance. Aborted: {}", INSTANCE); } @@ -587,11 +564,10 @@ public void triggerShouldLogTransactionFailure() throws Exception { reset(storage); when(storage.getLatestStoredCounter(INSTANCE)).thenReturn(Optional.empty()); when(storage.runInTransactionWithRetries(any())).thenThrow(cause); - try { - stateManager.trigger(INSTANCE, Trigger.natural(), PARAMETERS); - fail(); - } catch (Exception ignore) { - } + + assertThrows(Exception.class, + ()-> stateManager.trigger(INSTANCE, Trigger.natural(), PARAMETERS)); + verify(logger).debug("Transaction failure when triggering workflow instance: {}: {}", INSTANCE, cause.getMessage(), cause); } @@ -602,11 +578,8 @@ public void triggerShouldLogFailure() throws Exception { reset(storage); when(storage.getLatestStoredCounter(INSTANCE)).thenReturn(Optional.empty()); when(storage.runInTransactionWithRetries(any())).thenThrow(cause); - try { - stateManager.trigger(INSTANCE, Trigger.natural(), PARAMETERS); - fail(); - } catch (Exception ignore) { - } + + assertThrows(Exception.class, () -> stateManager.trigger(INSTANCE, Trigger.natural(), PARAMETERS)); verify(logger).debug("Failure when triggering workflow instance: {}: {}", INSTANCE, cause.getMessage(), cause); } @@ -615,12 +588,10 @@ public void triggerShouldLogFailure() throws Exception { public void shouldThrowRuntimeException() throws Exception { final IOException exception = new IOException(); doThrow(exception).when(storage).runInTransactionWithRetries(any()); - try { - stateManager.receive(Event.dequeue(INSTANCE, ImmutableSet.of())); - fail(); - } catch (Exception e) { - assertThat(Throwables.getRootCause(e), is(exception)); - } + + var thrown = assertThrows(Exception.class, + ()-> stateManager.receive(Event.dequeue(INSTANCE, ImmutableSet.of()))); + assertThat(thrown.getCause(), is(exception)); } @Test @@ -644,12 +615,8 @@ public void shouldFailToUpdateResourceCountersOnDequeueDueToCapacity() throws Ex final PersistentStateManager spied = spy(stateManager); doNothing().when(spied).receiveIgnoreClosed(eq(infoEvent), anyLong()); - try { - spied.receive(dequeueEvent); - fail(); - } catch (Exception e) { - // expected exception - } + assertThrows(Exception.class, + ()-> spied.receive(dequeueEvent)); verify(spied).receiveIgnoreClosed(eq(infoEvent), anyLong()); } @@ -671,13 +638,8 @@ public void shouldFailToUpdateResourceCountersOnDequeueDueToCapacityAndNoInfoEve resources.stream().map(Resource::id).collect(toSet())); final PersistentStateManager spied = spy(stateManager); - try { - spied.receive(dequeueEvent); - fail(); - } catch (Exception e) { - // expected exception - } - + assertThrows(Exception.class, + ()-> spied.receive(dequeueEvent)); verify(spied, never()).receiveIgnoreClosed(eq(Event.info(INSTANCE, message)), anyLong()); } @@ -686,15 +648,13 @@ public void shouldFailToUpdateResourceCountersOnDequeueDueToConflict() throws Ex givenState(INSTANCE, State.QUEUED); var rootCause = new DatastoreIOException(new DatastoreException(10, "conflict!", "conflict!")); - doThrow(rootCause).when(transaction).updateCounter(shardedCounter, "resource1", 1); - var dequeueEvent = Event.dequeue(INSTANCE, Set.of("resource1")); - exception.expect(RuntimeException.class); - exception.expectCause(is(rootCause)); + var thrown = assertThrows(RuntimeException.class, + () -> stateManager.receive(dequeueEvent)); - stateManager.receive(dequeueEvent); + assertEquals(rootCause, thrown.getCause()); } @Test