Skip to content

Commit

Permalink
Fix frozen stream when chunk exceeds temp errors (issue #15)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcschapp committed Apr 4, 2022
1 parent 5d76e38 commit 87514ee
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 20 deletions.
47 changes: 39 additions & 8 deletions incite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,28 @@ var scenarios = []queryScenario{
RangeFailed: defaultDuration,
},
},
{
note: "NoStart.ExceedMaxTemporaryErrors",
QuerySpec: QuerySpec{
Text: "a recipient of repeated temporary trauma",
Start: defaultStart,
End: defaultEnd,
Groups: []string{"/impaired/group"},
},
chunks: []chunkPlan{
{
startQueryInput: startQueryInput("a recipient of repeated temporary trauma", defaultStart, defaultEnd, DefaultLimit, "/impaired/group"),
startQueryErrs: repeatErr(maxTempStartingErrs, syscall.ETIMEDOUT),
startQuerySuccess: false,
},
},
err: &StartQueryError{"a recipient of repeated temporary trauma", defaultStart, defaultEnd, syscall.ETIMEDOUT},
stats: Stats{
RangeRequested: defaultDuration,
RangeStarted: defaultDuration,
RangeFailed: defaultDuration,
},
},

{
note: "OneChunk.OnePoll.Empty",
Expand Down Expand Up @@ -276,7 +298,7 @@ var scenarios = []queryScenario{
},
},
},
err: &TerminalQueryStatusError{"scenario:3|chunk:0|OneChunk.OnePoll.Status.Cancelled", cloudwatchlogs.QueryStatusCancelled, "destined for cancellation"},
err: &TerminalQueryStatusError{"scenario:4|chunk:0|OneChunk.OnePoll.Status.Cancelled", cloudwatchlogs.QueryStatusCancelled, "destined for cancellation"},
stats: Stats{
RangeRequested: defaultDuration,
RangeStarted: defaultDuration,
Expand All @@ -302,7 +324,7 @@ var scenarios = []queryScenario{
},
},
},
err: &TerminalQueryStatusError{"scenario:4|chunk:0|OneChunk.OnePoll.Status.Timeout", "Timeout", "tempting a timeout"},
err: &TerminalQueryStatusError{"scenario:5|chunk:0|OneChunk.OnePoll.Status.Timeout", "Timeout", "tempting a timeout"},
stats: Stats{
RangeRequested: defaultDuration,
RangeStarted: defaultDuration,
Expand All @@ -328,7 +350,7 @@ var scenarios = []queryScenario{
},
},
},
err: &TerminalQueryStatusError{"scenario:5|chunk:0|OneChunk.OnePoll.Status.Unexpected", "Did you see this coming?", "expecting the unexpected...status"},
err: &TerminalQueryStatusError{"scenario:6|chunk:0|OneChunk.OnePoll.Status.Unexpected", "Did you see this coming?", "expecting the unexpected...status"},
stats: Stats{
RangeRequested: defaultDuration,
RangeStarted: defaultDuration,
Expand All @@ -354,7 +376,7 @@ var scenarios = []queryScenario{
},
},
},
err: &UnexpectedQueryError{"scenario:6|chunk:0|OneChunk.OnePoll.Error.Unexpected", "expecting the unexpected...error", errors.New("very bad news")},
err: &UnexpectedQueryError{"scenario:7|chunk:0|OneChunk.OnePoll.Error.Unexpected", "expecting the unexpected...error", errors.New("very bad news")},
stats: Stats{
RangeRequested: defaultDuration,
RangeStarted: defaultDuration,
Expand Down Expand Up @@ -515,7 +537,7 @@ var scenarios = []queryScenario{
},
},
},
err: &TerminalQueryStatusError{"scenario:9|chunk:0|OneChunk.Preview.Status.Failed", cloudwatchlogs.QueryStatusFailed, "fated for failure"},
err: &TerminalQueryStatusError{"scenario:10|chunk:0|OneChunk.Preview.Status.Failed", cloudwatchlogs.QueryStatusFailed, "fated for failure"},
stats: Stats{
RangeRequested: defaultDuration,
RangeStarted: defaultDuration,
Expand All @@ -542,7 +564,7 @@ var scenarios = []queryScenario{
},
},
},
err: &TerminalQueryStatusError{"scenario:10|chunk:0|OneChunk.Preview.Status.Cancelled", cloudwatchlogs.QueryStatusCancelled, "preview of coming cancellations"},
err: &TerminalQueryStatusError{"scenario:11|chunk:0|OneChunk.Preview.Status.Cancelled", cloudwatchlogs.QueryStatusCancelled, "preview of coming cancellations"},
stats: Stats{
RangeRequested: defaultDuration,
RangeStarted: defaultDuration,
Expand All @@ -569,7 +591,7 @@ var scenarios = []queryScenario{
},
},
},
err: &TerminalQueryStatusError{"scenario:11|chunk:0|OneChunk.Preview.Status.Timeout", "Timeout", "preview of coming timeouts"},
err: &TerminalQueryStatusError{"scenario:12|chunk:0|OneChunk.Preview.Status.Timeout", "Timeout", "preview of coming timeouts"},
stats: Stats{
RangeRequested: defaultDuration,
RangeStarted: defaultDuration,
Expand All @@ -596,7 +618,7 @@ var scenarios = []queryScenario{
},
},
},
err: &TerminalQueryStatusError{"scenario:12|chunk:0|OneChunk.Preview.Status.Unexpected", "I did NOT see this coming!", "preview of coming surprises..."},
err: &TerminalQueryStatusError{"scenario:13|chunk:0|OneChunk.Preview.Status.Unexpected", "I did NOT see this coming!", "preview of coming surprises..."},
stats: Stats{
RangeRequested: defaultDuration,
RangeStarted: defaultDuration,
Expand Down Expand Up @@ -1976,6 +1998,14 @@ func result(ptr int) Result {
return Result{{"@ptr", strconv.Itoa(ptr)}}
}

// repeatErr returns a slice of length n in which every element is err.
// It is a test helper for scripting the same temporary error on n
// consecutive mock API calls.
func repeatErr(n int, err error) []error {
	errs := make([]error, 0, n)
	for j := 0; j < n; j++ {
		errs = append(errs, err)
	}
	return errs
}

var (
defaultDuration = 5 * time.Minute
defaultStart = time.Date(2020, 8, 25, 3, 30, 0, 0, time.UTC)
Expand All @@ -1990,5 +2020,6 @@ var (
})
anyStartQueryInput = mock.AnythingOfType("*cloudwatchlogs.StartQueryInput")
anyGetQueryResultsInput = mock.AnythingOfType("*cloudwatchlogs.GetQueryResultsInput")
anyStopQueryInput = mock.AnythingOfType("*cloudwatchlogs.StopQueryInput")
maxLimitResults = resultSeries(0, MaxLimit)
)
66 changes: 66 additions & 0 deletions mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1866,4 +1866,70 @@ func TestQueryManager_Query(t *testing.T) {
})
}
})

	// Regression test for the frozen-stream bug (issue #15): when a chunk
	// exhausts its budget of temporary errors, the stream must end with an
	// error rather than hang. Exercises both places a chunk can exceed the
	// budget: while starting the query and while polling for results.
	t.Run("Query Fails with Error if Chunk Exceeds Max Temporary Errors", func(t *testing.T) {
		text := "a query destined to exceed all maxima on temporary errors"
		groups := []string{"grpA", "grpB"}
		// LimitExceededException is treated as a temporary (retryable)
		// error — presumably via isTemporary; confirm in isTemporary's tests.
		expectedErr := cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "you been limited")

		testCases := []struct {
			name  string
			setup func(actions *mockActions)
		}{
			{
				// Starter path: StartQueryWithContext fails with the same
				// temporary error exactly maxTempStartingErrs times, so the
				// chunk never starts and the budget is exhausted.
				name: "InStarter",
				setup: func(actions *mockActions) {
					actions.
						On("StartQueryWithContext", anyContext, startQueryInput(text, defaultStart, defaultEnd, DefaultLimit, groups...)).
						Return(nil, expectedErr).
						Times(maxTempStartingErrs)
				},
			},
			{
				// Poller path: the query starts once, then every poll fails
				// with the temporary error until maxTempPollingErrs is hit.
				name: "InPoller",
				setup: func(actions *mockActions) {
					queryID := "u-r-doomed"
					stopSuccess := true
					actions.
						On("StartQueryWithContext", anyContext, startQueryInput(text, defaultStart, defaultEnd, DefaultLimit, groups...)).
						Return(&cloudwatchlogs.StartQueryOutput{QueryId: &queryID}, nil).
						Once()
					actions.
						On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{QueryId: &queryID}).
						Return(nil, expectedErr).
						Times(maxTempPollingErrs)
					// The manager may or may not stop the doomed query on
					// teardown, so the StopQuery expectation is optional.
					actions.
						On("StopQueryWithContext", anyContext, anyStopQueryInput).
						Return(&cloudwatchlogs.StopQueryOutput{Success: &stopSuccess}, nil).
						Maybe()
				},
			},
		}

		for _, testCase := range testCases {
			t.Run(testCase.name, func(t *testing.T) {
				actions := newMockActions(t)
				testCase.setup(actions)
				m := NewQueryManager(Config{
					Actions: actions,
					RPS:     lotsOfRPS,
				})
				t.Cleanup(func() {
					m.Close()
				})
				s, err := m.Query(QuerySpec{
					Text:   text,
					Groups: groups,
					Start:  defaultStart,
					End:    defaultEnd,
				})
				require.NoError(t, err)
				require.NotNil(t, s)

				// ReadAll must terminate — not freeze — and surface the
				// underlying temporary error in the stream's error chain.
				_, err = ReadAll(s)

				assert.ErrorIs(t, err, expectedErr)
			})
		}
	})
}
16 changes: 11 additions & 5 deletions poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type poller struct {
worker
}

// maxTempPollingErrs is the maximum number of consecutive temporary
// errors the poller tolerates while polling one chunk before the chunk
// is failed permanently.
const maxTempPollingErrs = 10

func newPoller(m *mgr) *poller {
p := &poller{
worker: worker{
Expand All @@ -23,7 +25,7 @@ func newPoller(m *mgr) *poller {
in: m.poll,
out: m.update,
name: "poller",
maxTemporaryError: 10,
maxTemporaryError: maxTempPollingErrs,
},
}
p.manipulator = p
Expand Down Expand Up @@ -53,11 +55,12 @@ func (p *poller) manipulate(c *chunk) outcome {
output, err := p.m.Actions.GetQueryResultsWithContext(c.ctx, &input)
p.lastReq = time.Now()

if err != nil && isTemporary(err) {
p.m.logChunk(c, "temporary failure to poll", err.Error())
return temporaryError
} else if err != nil {
if err != nil {
c.err = &UnexpectedQueryError{c.queryID, c.stream.Text, err}
if isTemporary(err) {
p.m.logChunk(c, "temporary failure to poll", err.Error())
return temporaryError
}
return finished
}

Expand All @@ -69,8 +72,10 @@ func (p *poller) manipulate(c *chunk) outcome {
status := *output.Status
switch status {
case cloudwatchlogs.QueryStatusScheduled, "Unknown":
c.err = nil
return inconclusive
case cloudwatchlogs.QueryStatusRunning:
c.err = nil
if c.ptr == nil {
return inconclusive // Ignore non-previewable results.
}
Expand All @@ -85,6 +90,7 @@ func (p *poller) manipulate(c *chunk) outcome {
c.err = errSplitChunk
return finished
}
c.err = nil
c.Stats.RangeDone += c.duration()
if sendChunkBlock(c, output.Results) {
c.state = complete
Expand Down
5 changes: 5 additions & 0 deletions poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ func TestPoller_manipulate(t *testing.T) {
},
err: syscall.ECONNRESET,
expectedOutcome: temporaryError,
expectedChunkErr: &UnexpectedQueryError{
QueryID: queryID,
Text: text,
Cause: syscall.ECONNRESET,
},
},
{
name: "Permanent Error",
Expand Down
19 changes: 12 additions & 7 deletions starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type starter struct {
worker
}

// maxTempStartingErrs is the maximum number of consecutive temporary
// errors the starter tolerates while trying to start one chunk before
// the chunk is failed permanently.
const maxTempStartingErrs = 10

func newStarter(m *mgr) *starter {
s := &starter{
worker: worker{
Expand All @@ -24,7 +26,7 @@ func newStarter(m *mgr) *starter {
in: m.start,
out: m.update,
name: "starter",
maxTemporaryError: 10,
maxTemporaryError: maxTempStartingErrs,
},
}
s.manipulator = s
Expand Down Expand Up @@ -55,13 +57,15 @@ func (s *starter) manipulate(c *chunk) outcome {
}
output, err := s.m.Actions.StartQueryWithContext(c.ctx, &input)
s.lastReq = time.Now()
if err != nil && isTemporary(err) {
s.m.logChunk(c, "temporary failure to start", err.Error())
return temporaryError
} else if err != nil {
if err != nil {
c.err = &StartQueryError{c.stream.Text, c.start, c.end, err}
s.m.logChunk(c, "permanent failure to start", "fatal error from CloudWatch Logs: "+err.Error())
return finished
if isTemporary(err) {
s.m.logChunk(c, "temporary failure to start", err.Error())
return temporaryError
} else {
s.m.logChunk(c, "permanent failure to start", "fatal error from CloudWatch Logs: "+err.Error())
return finished
}
}

// Save the current query ID into the chunk.
Expand All @@ -75,6 +79,7 @@ func (s *starter) manipulate(c *chunk) outcome {

// Chunk is started successfully.
c.state = started
c.err = nil
s.m.logChunk(c, "started", "")
return finished
}
Expand Down
3 changes: 3 additions & 0 deletions starter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ func TestStarter_manipulate(t *testing.T) {
assert.Equal(t, testCase.expected, actual)
assert.GreaterOrEqual(t, s.lastReq.Sub(before), time.Duration(0))
assert.GreaterOrEqual(t, after.Sub(s.lastReq), time.Duration(0))
if testCase.err != nil {
assert.Equal(t, &StartQueryError{text, start, end, testCase.err}, c.err)
}
actions.AssertExpectations(t)
logger.AssertExpectations(t)
})
Expand Down

0 comments on commit 87514ee

Please sign in to comment.