From 87514ee1d43328756a76c36b8142995acc494c45 Mon Sep 17 00:00:00 2001 From: Victor Schappert Date: Mon, 4 Apr 2022 08:01:20 -0700 Subject: [PATCH] Fix frozen stream when chunk exceeds temp errors (issue #15) https://github.com/gogama/incite/issues/15 --- incite_test.go | 47 +++++++++++++++++++++++++++++------ mgr_test.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++++ poller.go | 16 ++++++++---- poller_test.go | 5 ++++ starter.go | 19 ++++++++------ starter_test.go | 3 +++ 6 files changed, 136 insertions(+), 20 deletions(-) diff --git a/incite_test.go b/incite_test.go index 39e14eb..0b72f94 100644 --- a/incite_test.go +++ b/incite_test.go @@ -229,6 +229,28 @@ var scenarios = []queryScenario{ RangeFailed: defaultDuration, }, }, + { + note: "NoStart.ExceedMaxTemporaryErrors", + QuerySpec: QuerySpec{ + Text: "a recipient of repeated temporary trauma", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/impaired/group"}, + }, + chunks: []chunkPlan{ + { + startQueryInput: startQueryInput("a recipient of repeated temporary trauma", defaultStart, defaultEnd, DefaultLimit, "/impaired/group"), + startQueryErrs: repeatErr(maxTempStartingErrs, syscall.ETIMEDOUT), + startQuerySuccess: false, + }, + }, + err: &StartQueryError{"a recipient of repeated temporary trauma", defaultStart, defaultEnd, syscall.ETIMEDOUT}, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, + }, { note: "OneChunk.OnePoll.Empty", @@ -276,7 +298,7 @@ var scenarios = []queryScenario{ }, }, }, - err: &TerminalQueryStatusError{"scenario:3|chunk:0|OneChunk.OnePoll.Status.Cancelled", cloudwatchlogs.QueryStatusCancelled, "destined for cancellation"}, + err: &TerminalQueryStatusError{"scenario:4|chunk:0|OneChunk.OnePoll.Status.Cancelled", cloudwatchlogs.QueryStatusCancelled, "destined for cancellation"}, stats: Stats{ RangeRequested: defaultDuration, RangeStarted: defaultDuration, @@ -302,7 +324,7 @@ var scenarios = []queryScenario{ }, }, }, - err: &TerminalQueryStatusError{"scenario:4|chunk:0|OneChunk.OnePoll.Status.Timeout", "Timeout", "tempting a timeout"}, + err: &TerminalQueryStatusError{"scenario:5|chunk:0|OneChunk.OnePoll.Status.Timeout", "Timeout", "tempting a timeout"}, stats: Stats{ RangeRequested: defaultDuration, RangeStarted: defaultDuration, @@ -328,7 +350,7 @@ var scenarios = []queryScenario{ }, }, }, - err: &TerminalQueryStatusError{"scenario:5|chunk:0|OneChunk.OnePoll.Status.Unexpected", "Did you see this coming?", "expecting the unexpected...status"}, + err: &TerminalQueryStatusError{"scenario:6|chunk:0|OneChunk.OnePoll.Status.Unexpected", "Did you see this coming?", "expecting the unexpected...status"}, stats: Stats{ RangeRequested: defaultDuration, RangeStarted: defaultDuration, @@ -354,7 +376,7 @@ var scenarios = []queryScenario{ }, }, }, - err: &UnexpectedQueryError{"scenario:6|chunk:0|OneChunk.OnePoll.Error.Unexpected", "expecting the unexpected...error", errors.New("very bad news")}, + err: &UnexpectedQueryError{"scenario:7|chunk:0|OneChunk.OnePoll.Error.Unexpected", "expecting the unexpected...error", errors.New("very bad news")}, stats: Stats{ RangeRequested: defaultDuration, RangeStarted: defaultDuration, @@ -515,7 +537,7 @@ var scenarios = []queryScenario{ }, }, }, - err: &TerminalQueryStatusError{"scenario:9|chunk:0|OneChunk.Preview.Status.Failed", cloudwatchlogs.QueryStatusFailed, "fated for failure"}, + err: &TerminalQueryStatusError{"scenario:10|chunk:0|OneChunk.Preview.Status.Failed", cloudwatchlogs.QueryStatusFailed, "fated for failure"}, stats: Stats{ RangeRequested: defaultDuration, RangeStarted: defaultDuration, @@ -542,7 +564,7 @@ var scenarios = []queryScenario{ }, }, }, - err: &TerminalQueryStatusError{"scenario:10|chunk:0|OneChunk.Preview.Status.Cancelled", cloudwatchlogs.QueryStatusCancelled, "preview of coming cancellations"}, + err: &TerminalQueryStatusError{"scenario:11|chunk:0|OneChunk.Preview.Status.Cancelled", cloudwatchlogs.QueryStatusCancelled, "preview of coming cancellations"}, stats: Stats{ RangeRequested: defaultDuration, RangeStarted: defaultDuration, @@ -569,7 +591,7 @@ var scenarios = []queryScenario{ }, }, }, - err: &TerminalQueryStatusError{"scenario:11|chunk:0|OneChunk.Preview.Status.Timeout", "Timeout", "preview of coming timeouts"}, + err: &TerminalQueryStatusError{"scenario:12|chunk:0|OneChunk.Preview.Status.Timeout", "Timeout", "preview of coming timeouts"}, stats: Stats{ RangeRequested: defaultDuration, RangeStarted: defaultDuration, @@ -596,7 +618,7 @@ var scenarios = []queryScenario{ }, }, }, - err: &TerminalQueryStatusError{"scenario:12|chunk:0|OneChunk.Preview.Status.Unexpected", "I did NOT see this coming!", "preview of coming surprises..."}, + err: &TerminalQueryStatusError{"scenario:13|chunk:0|OneChunk.Preview.Status.Unexpected", "I did NOT see this coming!", "preview of coming surprises..."}, stats: Stats{ RangeRequested: defaultDuration, RangeStarted: defaultDuration, @@ -1976,6 +1998,14 @@ func result(ptr int) Result { return Result{{"@ptr", strconv.Itoa(ptr)}} } +func repeatErr(n int, err error) []error { + errs := make([]error, n) + for i := range errs { + errs[i] = err + } + return errs +} + var ( defaultDuration = 5 * time.Minute defaultStart = time.Date(2020, 8, 25, 3, 30, 0, 0, time.UTC) @@ -1990,5 +2020,6 @@ var ( }) anyStartQueryInput = mock.AnythingOfType("*cloudwatchlogs.StartQueryInput") anyGetQueryResultsInput = mock.AnythingOfType("*cloudwatchlogs.GetQueryResultsInput") + anyStopQueryInput = mock.AnythingOfType("*cloudwatchlogs.StopQueryInput") maxLimitResults = resultSeries(0, MaxLimit) ) diff --git a/mgr_test.go b/mgr_test.go index 50a6bc8..eb2fd9e 100644 --- a/mgr_test.go +++ b/mgr_test.go @@ -1866,4 +1866,70 @@ func TestQueryManager_Query(t *testing.T) { }) } }) + + t.Run("Query Fails with Error if Chunk Exceeds Max Temporary Errors", func(t *testing.T) { + text := "a query destined to exceed all maxima on temporary errors" + groups := []string{"grpA", "grpB"} + expectedErr := cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "you been limited") + + testCases := []struct { + name string + setup func(actions *mockActions) + }{ + { + name: "InStarter", + setup: func(actions *mockActions) { + actions. + On("StartQueryWithContext", anyContext, startQueryInput(text, defaultStart, defaultEnd, DefaultLimit, groups...)). + Return(nil, expectedErr). + Times(maxTempStartingErrs) + }, + }, + { + name: "InPoller", + setup: func(actions *mockActions) { + queryID := "u-r-doomed" + stopSuccess := true + actions. + On("StartQueryWithContext", anyContext, startQueryInput(text, defaultStart, defaultEnd, DefaultLimit, groups...)). + Return(&cloudwatchlogs.StartQueryOutput{QueryId: &queryID}, nil). + Once() + actions. + On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{QueryId: &queryID}). + Return(nil, expectedErr). + Times(maxTempPollingErrs) + actions. + On("StopQueryWithContext", anyContext, anyStopQueryInput). + Return(&cloudwatchlogs.StopQueryOutput{Success: &stopSuccess}, nil). + Maybe() + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + actions := newMockActions(t) + testCase.setup(actions) + m := NewQueryManager(Config{ + Actions: actions, + RPS: lotsOfRPS, + }) + t.Cleanup(func() { + m.Close() + }) + s, err := m.Query(QuerySpec{ + Text: text, + Groups: groups, + Start: defaultStart, + End: defaultEnd, + }) + require.NoError(t, err) + require.NotNil(t, s) + + _, err = ReadAll(s) + + assert.ErrorIs(t, err, expectedErr) + }) + } + }) } diff --git a/poller.go b/poller.go index b9baa23..4901d88 100644 --- a/poller.go +++ b/poller.go @@ -15,6 +15,8 @@ type poller struct { worker } +const maxTempPollingErrs = 10 + func newPoller(m *mgr) *poller { p := &poller{ worker: worker{ @@ -23,7 +25,7 @@ func newPoller(m *mgr) *poller { in: m.poll, out: m.update, name: "poller", - maxTemporaryError: 10, + maxTemporaryError: maxTempPollingErrs, }, } p.manipulator = p @@ -53,11 +55,12 @@ func (p *poller) manipulate(c *chunk) outcome { output, err := p.m.Actions.GetQueryResultsWithContext(c.ctx, &input) p.lastReq = time.Now() - if err != nil && isTemporary(err) { - p.m.logChunk(c, "temporary failure to poll", err.Error()) - return temporaryError - } else if err != nil { + if err != nil { c.err = &UnexpectedQueryError{c.queryID, c.stream.Text, err} + if isTemporary(err) { + p.m.logChunk(c, "temporary failure to poll", err.Error()) + return temporaryError + } return finished } @@ -69,8 +72,10 @@ func (p *poller) manipulate(c *chunk) outcome { status := *output.Status switch status { case cloudwatchlogs.QueryStatusScheduled, "Unknown": + c.err = nil return inconclusive case cloudwatchlogs.QueryStatusRunning: + c.err = nil if c.ptr == nil { return inconclusive // Ignore non-previewable results. } @@ -85,6 +90,7 @@ func (p *poller) manipulate(c *chunk) outcome { c.err = errSplitChunk return finished } + c.err = nil c.Stats.RangeDone += c.duration() if sendChunkBlock(c, output.Results) { c.state = complete diff --git a/poller_test.go b/poller_test.go index 6fbc566..bc1ce0e 100644 --- a/poller_test.go +++ b/poller_test.go @@ -91,6 +91,11 @@ func TestPoller_manipulate(t *testing.T) { }, err: syscall.ECONNRESET, expectedOutcome: temporaryError, + expectedChunkErr: &UnexpectedQueryError{ + QueryID: queryID, + Text: text, + Cause: syscall.ECONNRESET, + }, }, { name: "Permanent Error", diff --git a/starter.go b/starter.go index 934c36f..4234841 100644 --- a/starter.go +++ b/starter.go @@ -16,6 +16,8 @@ type starter struct { worker } +const maxTempStartingErrs = 10 + func newStarter(m *mgr) *starter { s := &starter{ worker: worker{ @@ -24,7 +26,7 @@ func newStarter(m *mgr) *starter { in: m.start, out: m.update, name: "starter", - maxTemporaryError: 10, + maxTemporaryError: maxTempStartingErrs, }, } s.manipulator = s @@ -55,13 +57,15 @@ func (s *starter) manipulate(c *chunk) outcome { } output, err := s.m.Actions.StartQueryWithContext(c.ctx, &input) s.lastReq = time.Now() - if err != nil && isTemporary(err) { - s.m.logChunk(c, "temporary failure to start", err.Error()) - return temporaryError - } else if err != nil { + if err != nil { c.err = &StartQueryError{c.stream.Text, c.start, c.end, err} - s.m.logChunk(c, "permanent failure to start", "fatal error from CloudWatch Logs: "+err.Error()) - return finished + if isTemporary(err) { + s.m.logChunk(c, "temporary failure to start", err.Error()) + return temporaryError + } else { + s.m.logChunk(c, "permanent failure to start", "fatal error from CloudWatch Logs: "+err.Error()) + return finished + } } // Save the current query ID into the chunk. @@ -75,6 +79,7 @@ func (s *starter) manipulate(c *chunk) outcome { // Chunk is started successfully. c.state = started + c.err = nil s.m.logChunk(c, "started", "") return finished } diff --git a/starter_test.go b/starter_test.go index 16d423f..20b4588 100644 --- a/starter_test.go +++ b/starter_test.go @@ -151,6 +151,9 @@ func TestStarter_manipulate(t *testing.T) { assert.Equal(t, testCase.expected, actual) assert.GreaterOrEqual(t, s.lastReq.Sub(before), time.Duration(0)) assert.GreaterOrEqual(t, after.Sub(s.lastReq), time.Duration(0)) + if testCase.err != nil { + assert.Equal(t, &StartQueryError{text, start, end, testCase.err}, c.err) + } actions.AssertExpectations(t) logger.AssertExpectations(t) })