diff --git a/cloudwatchlogs_actions_test.go b/cloudwatchlogs_actions_test.go index 2b65356..30c307f 100644 --- a/cloudwatchlogs_actions_test.go +++ b/cloudwatchlogs_actions_test.go @@ -2,6 +2,7 @@ package incite import ( "context" + "sync" "testing" "github.com/aws/aws-sdk-go/aws/request" @@ -12,6 +13,7 @@ import ( type mockActions struct { mock.Mock + lock sync.RWMutex } func newMockActions(t *testing.T) *mockActions { @@ -21,6 +23,9 @@ func newMockActions(t *testing.T) *mockActions { } func (m *mockActions) StartQueryWithContext(ctx context.Context, input *cloudwatchlogs.StartQueryInput, _ ...request.Option) (*cloudwatchlogs.StartQueryOutput, error) { + m.lock.RLock() + defer m.lock.RUnlock() + args := m.Called(ctx, input) if output, ok := args.Get(0).(*cloudwatchlogs.StartQueryOutput); ok { return output, args.Error(1) @@ -29,6 +34,9 @@ func (m *mockActions) StartQueryWithContext(ctx context.Context, input *cloudwat } func (m *mockActions) StopQueryWithContext(ctx context.Context, input *cloudwatchlogs.StopQueryInput, _ ...request.Option) (*cloudwatchlogs.StopQueryOutput, error) { + m.lock.RLock() + defer m.lock.RUnlock() + args := m.Called(ctx, input) if output, ok := args.Get(0).(*cloudwatchlogs.StopQueryOutput); ok { return output, args.Error(1) @@ -37,6 +45,9 @@ func (m *mockActions) StopQueryWithContext(ctx context.Context, input *cloudwatc } func (m *mockActions) GetQueryResultsWithContext(ctx context.Context, input *cloudwatchlogs.GetQueryResultsInput, _ ...request.Option) (*cloudwatchlogs.GetQueryResultsOutput, error) { + m.lock.RLock() + defer m.lock.RUnlock() + args := m.Called(ctx, input) if output, ok := args.Get(0).(*cloudwatchlogs.GetQueryResultsOutput); ok { return output, args.Error(1) diff --git a/errors.go b/errors.go index a090a66..c40ed91 100644 --- a/errors.go +++ b/errors.go @@ -3,6 +3,7 @@ package incite import ( "errors" "fmt" + "time" ) var ( @@ -11,24 +12,43 @@ var ( ErrClosed = errors.New("incite: operation on a closed object") ) -type wrappedErr struct { - cause error - msg string +type StartQueryError struct { + Text string + Start time.Time + End time.Time + Cause error } -func wrap(cause error, format string, a ...interface{}) error { - return &wrappedErr{ - cause: cause, - msg: fmt.Sprintf(format, a...), - } +func (err *StartQueryError) Error() string { + return fmt.Sprintf("incite: CloudWatch Logs failed to start query for chunk %q [%s..%s): %s", err.Text, err.Start, err.End, err.Cause) } -func (w *wrappedErr) Error() string { - return w.msg + ": " + w.cause.Error() +func (err *StartQueryError) Unwrap() error { + return err.Cause } -func (w *wrappedErr) Unwrap() error { - return w.cause +type TerminalQueryStatusError struct { + QueryID string + Status string + Text string +} + +func (err *TerminalQueryStatusError) Error() string { + return fmt.Sprintf("incite: query %q has terminal status %q (text %q)", err.QueryID, err.Status, err.Text) +} + +type UnexpectedQueryError struct { + QueryID string + Text string + Cause error +} + +func (err *UnexpectedQueryError) Error() string { + return fmt.Sprintf("incite: query %q had unexpected error (text %q): %s", err.QueryID, err.Text, err.Cause) +} + +func (err *UnexpectedQueryError) Unwrap() error { + return err.Cause } func errNoKey(id string) error { @@ -39,6 +59,10 @@ func errNoValue(id, key string) error { return fmt.Errorf("incite: query chunk %q: no value for key %q", id, key) } +func errNilStatus() error { + return errors.New("incite: nil status in GetQueryResults output from CloudWatch Logs") +} + const ( nilActionsMsg = "incite: nil actions" nilReaderMsg = "incite: nil reader" diff --git a/incite.go b/incite.go index 18ee419..0ce292a 100644 --- a/incite.go +++ b/incite.go @@ -5,7 +5,6 @@ import ( "container/ring" "context" "errors" - "fmt" "io" "strings" "sync" @@ -603,7 +602,13 @@ func (m *mgr) startNextChunk() error { m.Logger.Printf("incite: QueryManager(%p) temporary failure to start chunk %q [%s..%s): %s", m, s.Text, next, end, err.Error()) } else { - s.setErr(wrap(err, "incite: fatal error from CloudWatch Logs for chunk %q [%s..%s)", s.Text, next, end), true, Stats{}) + err = &StartQueryError{ + Text: s.Text, + Start: next, + End: end, + Cause: err, + } + s.setErr(err, true, Stats{}) m.stats.add(s.GetStats()) m.Logger.Printf("incite: QueryManager(%p) permanent failure to start %q [%s..%s) due to fatal error from CloudWatch Logs: %s", m, s.Text, next, end, err.Error()) @@ -725,13 +730,15 @@ func (m *mgr) pollChunk(c *chunk) error { output, err = m.Actions.GetQueryResultsWithContext(c.ctx, &input) m.lastReq[GetQueryResults] = time.Now() }) - if err != nil { - return wrap(err, "incite: query chunk %q: failed to poll", c.id) + if err != nil && isTemporary(err) { + return nil + } else if err != nil { + return &UnexpectedQueryError{c.id, c.stream.Text, err} } status := output.Status if status == nil { - return fmt.Errorf("incite: query chunk %q: nil status in GetQueryResults output from CloudWatch Logs", c.id) + return &UnexpectedQueryError{c.id, c.stream.Text, errNilStatus()} } c.status = *status @@ -745,10 +752,8 @@ func (m *mgr) pollChunk(c *chunk) error { return sendChunkBlock(c, output.Results, output.Statistics, false) case cloudwatchlogs.QueryStatusComplete: return sendChunkBlock(c, output.Results, output.Statistics, true) - case cloudwatchlogs.QueryStatusCancelled, cloudwatchlogs.QueryStatusFailed, "Timeout": - return fmt.Errorf("incite: query chunk %q: unexpected terminal status: %s", c.id, c.status) default: - return fmt.Errorf("incite: query chunk %q: unhandled status: %s", c.id, c.status) + return &TerminalQueryStatusError{c.id, c.status, c.stream.Text} } } diff --git a/incite_test.go b/incite_test.go index 8441da0..afe080c 100644 --- a/incite_test.go +++ b/incite_test.go @@ -4,10 +4,13 @@ import ( "context" "errors" "fmt" + "regexp" "sort" "testing" "time" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/stretchr/testify/mock" @@ -701,7 +704,7 @@ func TestQueryManager_Query(t *testing.T) { r := make([]Result, 1) n, err := s.Read(r) assert.Equal(t, 0, n) - assert.EqualError(t, err, `incite: fatal error from CloudWatch Logs for chunk "foo" [2020-08-25 03:30:00 +0000 UTC..2020-08-25 03:35:00 +0000 UTC): super fatal error`) + assert.EqualError(t, err, `incite: CloudWatch Logs failed to start query for chunk "foo" [2020-08-25 03:30:00 +0000 UTC..2020-08-25 03:35:00 +0000 UTC): super fatal error`) assert.ErrorIs(t, err, causeErr) assert.Equal(t, Stats{}, s.GetStats()) @@ -750,33 +753,24 @@ func TestQueryManager_Query(t *testing.T) { // look for additional issues. for p := 0; p < QueryConcurrencyQuotaLimit; p++ { t.Run(fmt.Sprintf("Parallel=%d", p), func(t *testing.T) { - for rps := 2; rps <= 5; rps++ { - t.Run(fmt.Sprintf("RPS=%d", rps), func(t *testing.T) { - actions := newMockActions(t) - m := NewQueryManager(Config{ - Actions: actions, - Parallel: p, - RPS: map[CloudWatchLogsAction]int{ - StartQuery: rps, - StopQuery: rps, - GetQueryResults: rps, - }, - }) - require.NotNil(t, m) - t.Cleanup(func() { - err := m.Close() - if err != nil { - t.Errorf("Cleanup: failed to close m: %s", err.Error()) - } - actions.AssertExpectations(t) - }) - - for i, s := range scenarios { - t.Run(fmt.Sprintf("Scenario=%d", i), func(t *testing.T) { - t.Parallel() // Run scenarios in parallel. - s.play(t, i, m, actions) - }) - } + actions := newMockActions(t) + m := NewQueryManager(Config{ + Actions: actions, + Parallel: p, + RPS: RPSQuotaLimits, + }) + require.NotNil(t, m) + t.Cleanup(func() { + err := m.Close() + if err != nil { + t.Errorf("Cleanup: failed to close m: %s", err.Error()) + } + }) + + for i, s := range scenarios { + t.Run(fmt.Sprintf("Scenario=%d", i), func(t *testing.T) { + t.Parallel() // Run scenarios in parallel. + s.play(t, i, m, actions) }) } }) @@ -796,8 +790,56 @@ func TestQueryManager_Query(t *testing.T) { } var scenarios = []queryScenario{ - // NoStart.InvalidQueryError - // NoStart.UnexpectedError + { + note: "NoStart.InvalidQueryError", + QuerySpec: QuerySpec{ + Text: "a poorly written query", + Start: defaultStart, + End: defaultEnd, + Limit: 50, + Groups: []string{"/my/group/1", "/my/group/2"}, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("a poorly written query"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: int64p(50), + LogGroupNames: []*string{sp("/my/group/1"), sp("/my/group/2")}, + }, + startQueryErrs: []error{ + cwlErr(cloudwatchlogs.ErrCodeInvalidParameterException, "terrible query writing there bud"), + }, + startQuerySuccess: false, + }, + }, + err: regexp.MustCompile(`incite: CloudWatch Logs failed to start query for chunk "a poorly written query" .*: InvalidParameterException: terrible query writing there bud$`), + }, + { + note: "NoStart.UnexpectedError", + QuerySpec: QuerySpec{ + Text: "an ill-fated query", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("an ill-fated query"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQueryErrs: []error{errors.New("pow exclamation point")}, + startQuerySuccess: false, + }, + }, + err: regexp.MustCompile(`incite: CloudWatch Logs failed to start query for chunk "an ill-fated query" .*: pow exclamation point$`), + }, + { note: "OneChunk.OnePoll.Empty", QuerySpec: QuerySpec{ @@ -826,10 +868,175 @@ var scenarios = []queryScenario{ }, closeAfter: true, }, - // OneChunk.OnePoll.[3 cases = Cancelled,Failed,Timeout] + { + note: "OneChunk.OnePoll.Status.Cancelled", + QuerySpec: QuerySpec{ + Text: "destined for cancellation", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + Priority: -5, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("destined for cancellation"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusCancelled, + }, + }, + }, + }, + err: `incite: query "scenario:3|chunk:0|OneChunk.OnePoll.Status.Cancelled" has terminal status "Cancelled" (text "destined for cancellation")`, + }, + { + note: "OneChunk.OnePoll.Status.Failed", + QuerySpec: QuerySpec{ + Text: "fated for failure", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + Preview: true, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("fated for failure"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusFailed, + }, + }, + }, + }, + err: `incite: query "scenario:4|chunk:0|OneChunk.OnePoll.Status.Failed" has terminal status "Failed" (text "fated for failure")`, + }, + { + note: "OneChunk.OnePoll.Status.Timeout", + QuerySpec: QuerySpec{ + Text: "tempting a timeout", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + Preview: true, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("tempting a timeout"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: "Timeout", + }, + }, + }, + }, + err: `incite: query "scenario:5|chunk:0|OneChunk.OnePoll.Status.Timeout" has terminal status "Timeout" (text "tempting a timeout")`, + }, + { + note: "OneChunk.OnePoll.Status.Unexpected", + QuerySpec: QuerySpec{ + Text: "expecting the unexpected", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + Preview: true, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("expecting the unexpected"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: "Did you see this coming?", + }, + }, + }, + }, + err: `incite: query "scenario:6|chunk:0|OneChunk.OnePoll.Status.Unexpected" has terminal status "Did you see this coming?" (text "expecting the unexpected")`, + }, + { + note: "OneChunk.OnePoll.WithResults", + QuerySpec: QuerySpec{ + Text: "deliver me some results", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + Preview: true, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("deliver me some results"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusComplete, + results: []Result{ + { + {"@ptr", "123"}, + {"@MyField", "hello"}, + }, + { + {"@ptr", "456"}, + {"@MyField", "goodbye"}, + }, + }, + stats: &Stats{1, 2, 3}, + }, + }, + }, + }, + results: []Result{ + { + {"@ptr", "123"}, + {"@MyField", "hello"}, + }, + { + {"@ptr", "456"}, + {"@MyField", "goodbye"}, + }, + }, + stats: Stats{1, 2, 3}, + }, // OneChunk.MultiPoll.(one case with limit exceeded, throttling, scheduled, unknown, and several running statuses with intermediate results that get ignored) // OneChunk.Preview.Stats (noPtr) // OneChunk.Preview.Normal (with @ptr) + // MultiChunk.Fractional.LessThanOne + // MultiChunk.Fractional.OneAligned + // MultiChunk.Fractional.TwoAligned + // MultiChunk.Fractional.TwoMisaligned + // MultiChunk.Fractional.ThreeMisaligned // MultiChunk.NoPreview // MultiChunk.Preview } @@ -839,7 +1046,7 @@ type queryScenario struct { note string // Optional note describing the scenario chunks []chunkPlan // Sub-scenario for each chunk closeEarly bool // Whether to prematurely close the stream. - err string // Final expected error + err interface{} // Final expected error. Must be nil, string, or *regexp.Regexp. results []Result // Final results in the expected order after optional sorting using less. less func(i, j int) bool // Optional less function for sorting results, needed for chunked scenarios. stats Stats // Final stats @@ -868,7 +1075,7 @@ func (qs *queryScenario) play(t *testing.T, i int, m QueryManager, actions *mock r, err := ReadAll(s) // Test against the expected results and/or errors. - if qs.err == "" { + if qs.err == nil { assert.NoError(t, err) if qs.less != nil { sort.Slice(r, qs.less) @@ -879,8 +1086,14 @@ func (qs *queryScenario) play(t *testing.T, i int, m QueryManager, actions *mock } assert.Equal(t, expectedResults, r) } else { - assert.EqualError(t, err, qs.err) - assert.Nil(t, r) + switch x := qs.err.(type) { + case string: + assert.EqualError(t, err, x) + case *regexp.Regexp: + assert.Error(t, err) + assert.Regexp(t, x, err.Error(), "expected Error string to match Regexp %s, but it did not: %s", x, err.Error()) + } + assert.Empty(t, r) } assert.Equal(t, qs.stats, s.GetStats()) @@ -911,6 +1124,9 @@ type chunkPollOutput struct { } func (cp *chunkPlan) setup(i, j int, note string, closeEarly bool, actions *mockActions) { + actions.lock.Lock() + defer actions.lock.Unlock() + for _, err := range cp.startQueryErrs { actions. On("StartQueryWithContext", anyContext, &cp.startQueryInput). @@ -924,7 +1140,7 @@ func (cp *chunkPlan) setup(i, j int, note string, closeEarly bool, actions *mock queryID := fmt.Sprintf("scenario:%d|chunk:%d", i, j) if note != "" { - queryID += note + queryID += "|" + note } actions. On("StartQueryWithContext", anyContext, &cp.startQueryInput). @@ -1011,6 +1227,10 @@ func endTimeSeconds(t time.Time) *int64 { return startTimeSeconds(t.Add(-time.Second)) } +func cwlErr(code, message string) error { + return awserr.New(code, message, nil) +} + var ( defaultStart = time.Date(2020, 8, 25, 3, 30, 0, 0, time.UTC) defaultEnd = defaultStart.Add(5 * time.Minute)