From 9c36b5f0f5b2750a2847707dd06f53c513bd43d2 Mon Sep 17 00:00:00 2001 From: stroomdev66 Date: Wed, 15 Jan 2025 22:51:08 +0000 Subject: [PATCH] #4682 PlanB State Store --- .../pipeline/task/TestStateLookupTask.java | 28 +++++++++++-------- .../pipeline/refdata/ReferenceData.java | 17 ++++++++--- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/stroom-app/src/test/java/stroom/pipeline/task/TestStateLookupTask.java b/stroom-app/src/test/java/stroom/pipeline/task/TestStateLookupTask.java index ca74b2365b..7e8350e6fd 100644 --- a/stroom-app/src/test/java/stroom/pipeline/task/TestStateLookupTask.java +++ b/stroom-app/src/test/java/stroom/pipeline/task/TestStateLookupTask.java @@ -54,7 +54,7 @@ import java.util.Collections; import java.util.List; import java.util.Map.Entry; -import java.util.UUID; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -95,10 +95,10 @@ void test() throws IOException { scyllaDbDoc.setConnection(ScyllaDbUtil.getDefaultConnection()); scyllaDbDocStore.writeDocument(scyllaDbDoc); - createStateDoc(scyllaDbDoc, "hostname_to_location_map"); - createStateDoc(scyllaDbDoc, "hostname_to_ip_map"); - createStateDoc(scyllaDbDoc, "id_to_user_map"); - createStateDoc(scyllaDbDoc, "number_to_id"); + final DocRef stateDoc1 = createStateDoc(scyllaDbDoc, "hostname_to_location_map"); + final DocRef stateDoc2 = createStateDoc(scyllaDbDoc, "hostname_to_ip_map"); + final DocRef stateDoc3 = createStateDoc(scyllaDbDoc, "id_to_user_map"); + final DocRef stateDoc4 = createStateDoc(scyllaDbDoc, "number_to_id"); // Add reference data to state store. // Setup the pipeline. @@ -125,12 +125,15 @@ void test() throws IOException { assertThat(refDataProcessResults).hasSize(3); // Add event data and processor filters. - final List pipelineReferences = Collections.singletonList(PipelineDataUtil.createReference( - "translationFilter", - "pipelineReference", - new DocRef(StateDoc.TYPE, UUID.randomUUID().toString()), - null, - null)); + final List pipelineReferences = Stream + .of(stateDoc1, stateDoc2, stateDoc3, stateDoc4) + .map(docRef -> PipelineDataUtil.createReference( + "translationFilter", + "pipelineReference", + docRef, + null, + null)) + .toList(); commonTranslationTestHelper.setupStateProcess( CommonTranslationTestHelper.FEED_NAME, Collections.singletonList(CommonTranslationTestHelper.VALID_RESOURCE_NAME), @@ -181,11 +184,12 @@ private void process(final int expectedProcessCount) { } } - private void createStateDoc(final ScyllaDbDoc scyllaDbDoc, final String name) { + private DocRef createStateDoc(final ScyllaDbDoc scyllaDbDoc, final String name) { final DocRef docRef = stateDocStore.createDocument(name); final StateDoc doc = stateDocStore.readDocument(docRef); doc.setScyllaDbRef(scyllaDbDoc.asDocRef()); doc.setStateType(StateType.TEMPORAL_STATE); stateDocStore.writeDocument(doc); + return docRef; } } diff --git a/stroom-pipeline/src/main/java/stroom/pipeline/refdata/ReferenceData.java b/stroom-pipeline/src/main/java/stroom/pipeline/refdata/ReferenceData.java index 1d097d1cd6..07b229549b 100644 --- a/stroom-pipeline/src/main/java/stroom/pipeline/refdata/ReferenceData.java +++ b/stroom-pipeline/src/main/java/stroom/pipeline/refdata/ReferenceData.java @@ -49,6 +49,7 @@ import stroom.util.logging.LogUtil; import stroom.util.shared.Severity; +import jakarta.annotation.Nullable; import jakarta.inject.Inject; import java.time.Instant; @@ -98,8 +99,8 @@ public class ReferenceData { final PipelineStore pipelineStore, final SecurityContext securityContext, final TaskContextFactory taskContextFactory, - final StateLookup stateLookup, - final PlanBLookup planBLookup) { + @Nullable final StateLookup stateLookup, + @Nullable final PlanBLookup planBLookup) { this.effectiveStreamService = effectiveStreamService; this.feedHolder = feedHolder; this.metaHolder = metaHolder; @@ -244,13 +245,21 @@ private void doGetValue(final List pipelineReferences, final String mapName = lookupIdentifier.getPrimaryMapName(); if (PlanBDoc.TYPE.equals(pipeline.getType())) { // TODO : @66 TEMPORARY INTEGRATION OF STATE LOOKUP USING PIPELINE AS STATE DOC REFERENCE. - if (planBLookup != null && mapName.equalsIgnoreCase(pipeline.getName())) { + Objects.requireNonNull(planBLookup, + "Attempt to perform Plan B state lookup but Plan B service is not present"); + Objects.requireNonNull(pipeline.getName(), "Null name for Plan B doc ref in lookup"); + + if (mapName.equalsIgnoreCase(pipeline.getName())) { planBLookup.lookup(lookupIdentifier, referenceDataResult); } } else if (StateDoc.TYPE.equals(pipeline.getType())) { // TODO : @66 TEMPORARY INTEGRATION OF STATE LOOKUP USING PIPELINE AS STATE DOC REFERENCE. - if (planBLookup != null && mapName.equalsIgnoreCase(pipeline.getName())) { + Objects.requireNonNull(stateLookup, + "Attempt to perform state lookup but state lookup service is not present"); + Objects.requireNonNull(pipeline.getName(), "Null name for state doc ref in lookup"); + + if (mapName.equalsIgnoreCase(pipeline.getName())) { stateLookup.lookup(lookupIdentifier, referenceDataResult); }