From 820d6836c17d5f4de169a4adbe412726ce1ce025 Mon Sep 17 00:00:00 2001 From: Victor Schappert Date: Fri, 16 Jul 2021 15:56:56 -0700 Subject: [PATCH] Finish basic happy path query test scenarios There are still a lot of TODO scenarios, but the ones remaining are the more advanced cases, e.g. where we need to poll the chunk more than once, or where there are multiple chunks, plus preview mode. As part of this commit, I also refactored the non-Unmarshal related error wrapping code in `errors.go`. This was kind of a last minute panic thing, but it occurred to me that customers might want to be able to differentiate "job status" type errors like "Failed" or "Cancelled" from other fatals like "internal server error", in order to give their customers better error messaging. Things are in a pretty good state now. Tests passing on Linux and Windows. --- cloudwatchlogs_actions_test.go | 11 ++ errors.go | 48 ++++-- incite.go | 21 ++- incite_test.go | 292 +++++++++++++++++++++++++++++---- 4 files changed, 316 insertions(+), 56 deletions(-) diff --git a/cloudwatchlogs_actions_test.go b/cloudwatchlogs_actions_test.go index 2b65356..30c307f 100644 --- a/cloudwatchlogs_actions_test.go +++ b/cloudwatchlogs_actions_test.go @@ -2,6 +2,7 @@ package incite import ( "context" + "sync" "testing" "github.com/aws/aws-sdk-go/aws/request" @@ -12,6 +13,7 @@ import ( type mockActions struct { mock.Mock + lock sync.RWMutex } func newMockActions(t *testing.T) *mockActions { @@ -21,6 +23,9 @@ func newMockActions(t *testing.T) *mockActions { } func (m *mockActions) StartQueryWithContext(ctx context.Context, input *cloudwatchlogs.StartQueryInput, _ ...request.Option) (*cloudwatchlogs.StartQueryOutput, error) { + m.lock.RLock() + defer m.lock.RUnlock() + args := m.Called(ctx, input) if output, ok := args.Get(0).(*cloudwatchlogs.StartQueryOutput); ok { return output, args.Error(1) @@ -29,6 +34,9 @@ func (m *mockActions) StartQueryWithContext(ctx context.Context, input *cloudwat } func (m
*mockActions) StopQueryWithContext(ctx context.Context, input *cloudwatchlogs.StopQueryInput, _ ...request.Option) (*cloudwatchlogs.StopQueryOutput, error) { + m.lock.RLock() + defer m.lock.RUnlock() + args := m.Called(ctx, input) if output, ok := args.Get(0).(*cloudwatchlogs.StopQueryOutput); ok { return output, args.Error(1) @@ -37,6 +45,9 @@ func (m *mockActions) StopQueryWithContext(ctx context.Context, input *cloudwatc } func (m *mockActions) GetQueryResultsWithContext(ctx context.Context, input *cloudwatchlogs.GetQueryResultsInput, _ ...request.Option) (*cloudwatchlogs.GetQueryResultsOutput, error) { + m.lock.RLock() + defer m.lock.RUnlock() + args := m.Called(ctx, input) if output, ok := args.Get(0).(*cloudwatchlogs.GetQueryResultsOutput); ok { return output, args.Error(1) diff --git a/errors.go b/errors.go index a090a66..c40ed91 100644 --- a/errors.go +++ b/errors.go @@ -3,6 +3,7 @@ package incite import ( "errors" "fmt" + "time" ) var ( @@ -11,24 +12,43 @@ var ( ErrClosed = errors.New("incite: operation on a closed object") ) -type wrappedErr struct { - cause error - msg string +type StartQueryError struct { + Text string + Start time.Time + End time.Time + Cause error } -func wrap(cause error, format string, a ...interface{}) error { - return &wrappedErr{ - cause: cause, - msg: fmt.Sprintf(format, a...), - } +func (err *StartQueryError) Error() string { + return fmt.Sprintf("incite: CloudWatch Logs failed to start query for chunk %q [%s..%s): %s", err.Text, err.Start, err.End, err.Cause) } -func (w *wrappedErr) Error() string { - return w.msg + ": " + w.cause.Error() +func (err *StartQueryError) Unwrap() error { + return err.Cause } -func (w *wrappedErr) Unwrap() error { - return w.cause +type TerminalQueryStatusError struct { + QueryID string + Status string + Text string +} + +func (err *TerminalQueryStatusError) Error() string { + return fmt.Sprintf("incite: query %q has terminal status %q (text %q)", err.QueryID, err.Status, err.Text) +} + +type 
UnexpectedQueryError struct { + QueryID string + Text string + Cause error +} + +func (err *UnexpectedQueryError) Error() string { + return fmt.Sprintf("incite: query %q had unexpected error (text %q): %s", err.QueryID, err.Text, err.Cause) +} + +func (err *UnexpectedQueryError) Unwrap() error { + return err.Cause } func errNoKey(id string) error { @@ -39,6 +59,10 @@ func errNoValue(id, key string) error { return fmt.Errorf("incite: query chunk %q: no value for key %q", id, key) } +func errNilStatus() error { + return errors.New("incite: nil status in GetQueryResults output from CloudWatch Logs") +} + const ( nilActionsMsg = "incite: nil actions" nilReaderMsg = "incite: nil reader" diff --git a/incite.go b/incite.go index 18ee419..0ce292a 100644 --- a/incite.go +++ b/incite.go @@ -5,7 +5,6 @@ import ( "container/ring" "context" "errors" - "fmt" "io" "strings" "sync" @@ -603,7 +602,13 @@ func (m *mgr) startNextChunk() error { m.Logger.Printf("incite: QueryManager(%p) temporary failure to start chunk %q [%s..%s): %s", m, s.Text, next, end, err.Error()) } else { - s.setErr(wrap(err, "incite: fatal error from CloudWatch Logs for chunk %q [%s..%s)", s.Text, next, end), true, Stats{}) + err = &StartQueryError{ + Text: s.Text, + Start: next, + End: end, + Cause: err, + } + s.setErr(err, true, Stats{}) m.stats.add(s.GetStats()) m.Logger.Printf("incite: QueryManager(%p) permanent failure to start %q [%s..%s) due to fatal error from CloudWatch Logs: %s", m, s.Text, next, end, err.Error()) @@ -725,13 +730,15 @@ func (m *mgr) pollChunk(c *chunk) error { output, err = m.Actions.GetQueryResultsWithContext(c.ctx, &input) m.lastReq[GetQueryResults] = time.Now() }) - if err != nil { - return wrap(err, "incite: query chunk %q: failed to poll", c.id) + if err != nil && isTemporary(err) { + return nil + } else if err != nil { + return &UnexpectedQueryError{c.id, c.stream.Text, err} } status := output.Status if status == nil { - return fmt.Errorf("incite: query chunk %q: nil status in 
GetQueryResults output from CloudWatch Logs", c.id) + return &UnexpectedQueryError{c.id, c.stream.Text, errNilStatus()} } c.status = *status @@ -745,10 +752,8 @@ func (m *mgr) pollChunk(c *chunk) error { return sendChunkBlock(c, output.Results, output.Statistics, false) case cloudwatchlogs.QueryStatusComplete: return sendChunkBlock(c, output.Results, output.Statistics, true) - case cloudwatchlogs.QueryStatusCancelled, cloudwatchlogs.QueryStatusFailed, "Timeout": - return fmt.Errorf("incite: query chunk %q: unexpected terminal status: %s", c.id, c.status) default: - return fmt.Errorf("incite: query chunk %q: unhandled status: %s", c.id, c.status) + return &TerminalQueryStatusError{c.id, c.status, c.stream.Text} } } diff --git a/incite_test.go b/incite_test.go index 8441da0..afe080c 100644 --- a/incite_test.go +++ b/incite_test.go @@ -4,10 +4,13 @@ import ( "context" "errors" "fmt" + "regexp" "sort" "testing" "time" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" "github.com/stretchr/testify/mock" @@ -701,7 +704,7 @@ func TestQueryManager_Query(t *testing.T) { r := make([]Result, 1) n, err := s.Read(r) assert.Equal(t, 0, n) - assert.EqualError(t, err, `incite: fatal error from CloudWatch Logs for chunk "foo" [2020-08-25 03:30:00 +0000 UTC..2020-08-25 03:35:00 +0000 UTC): super fatal error`) + assert.EqualError(t, err, `incite: CloudWatch Logs failed to start query for chunk "foo" [2020-08-25 03:30:00 +0000 UTC..2020-08-25 03:35:00 +0000 UTC): super fatal error`) assert.ErrorIs(t, err, causeErr) assert.Equal(t, Stats{}, s.GetStats()) @@ -750,33 +753,24 @@ func TestQueryManager_Query(t *testing.T) { // look for additional issues. 
for p := 0; p < QueryConcurrencyQuotaLimit; p++ { t.Run(fmt.Sprintf("Parallel=%d", p), func(t *testing.T) { - for rps := 2; rps <= 5; rps++ { - t.Run(fmt.Sprintf("RPS=%d", rps), func(t *testing.T) { - actions := newMockActions(t) - m := NewQueryManager(Config{ - Actions: actions, - Parallel: p, - RPS: map[CloudWatchLogsAction]int{ - StartQuery: rps, - StopQuery: rps, - GetQueryResults: rps, - }, - }) - require.NotNil(t, m) - t.Cleanup(func() { - err := m.Close() - if err != nil { - t.Errorf("Cleanup: failed to close m: %s", err.Error()) - } - actions.AssertExpectations(t) - }) - - for i, s := range scenarios { - t.Run(fmt.Sprintf("Scenario=%d", i), func(t *testing.T) { - t.Parallel() // Run scenarios in parallel. - s.play(t, i, m, actions) - }) - } + actions := newMockActions(t) + m := NewQueryManager(Config{ + Actions: actions, + Parallel: p, + RPS: RPSQuotaLimits, + }) + require.NotNil(t, m) + t.Cleanup(func() { + err := m.Close() + if err != nil { + t.Errorf("Cleanup: failed to close m: %s", err.Error()) + } + }) + + for i, s := range scenarios { + t.Run(fmt.Sprintf("Scenario=%d", i), func(t *testing.T) { + t.Parallel() // Run scenarios in parallel. 
+ s.play(t, i, m, actions) }) } }) @@ -796,8 +790,56 @@ func TestQueryManager_Query(t *testing.T) { } var scenarios = []queryScenario{ - // NoStart.InvalidQueryError - // NoStart.UnexpectedError + { + note: "NoStart.InvalidQueryError", + QuerySpec: QuerySpec{ + Text: "a poorly written query", + Start: defaultStart, + End: defaultEnd, + Limit: 50, + Groups: []string{"/my/group/1", "/my/group/2"}, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("a poorly written query"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: int64p(50), + LogGroupNames: []*string{sp("/my/group/1"), sp("/my/group/2")}, + }, + startQueryErrs: []error{ + cwlErr(cloudwatchlogs.ErrCodeInvalidParameterException, "terrible query writing there bud"), + }, + startQuerySuccess: false, + }, + }, + err: regexp.MustCompile(`incite: CloudWatch Logs failed to start query for chunk "a poorly written query" .*: InvalidParameterException: terrible query writing there bud$`), + }, + { + note: "NoStart.UnexpectedError", + QuerySpec: QuerySpec{ + Text: "an ill-fated query", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("an ill-fated query"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQueryErrs: []error{errors.New("pow exclamation point")}, + startQuerySuccess: false, + }, + }, + err: regexp.MustCompile(`incite: CloudWatch Logs failed to start query for chunk "an ill-fated query" .*: pow exclamation point$`), + }, + { note: "OneChunk.OnePoll.Empty", QuerySpec: QuerySpec{ @@ -826,10 +868,175 @@ var scenarios = []queryScenario{ }, closeAfter: true, }, - // OneChunk.OnePoll.[3 cases = Cancelled,Failed,Timeout] + { + note: "OneChunk.OnePoll.Status.Cancelled", + QuerySpec: 
QuerySpec{ + Text: "destined for cancellation", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + Priority: -5, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("destined for cancellation"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusCancelled, + }, + }, + }, + }, + err: `incite: query "scenario:3|chunk:0|OneChunk.OnePoll.Status.Cancelled" has terminal status "Cancelled" (text "destined for cancellation")`, + }, + { + note: "OneChunk.OnePoll.Status.Failed", + QuerySpec: QuerySpec{ + Text: "fated for failure", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + Preview: true, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("fated for failure"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusFailed, + }, + }, + }, + }, + err: `incite: query "scenario:4|chunk:0|OneChunk.OnePoll.Status.Failed" has terminal status "Failed" (text "fated for failure")`, + }, + { + note: "OneChunk.OnePoll.Status.Timeout", + QuerySpec: QuerySpec{ + Text: "tempting a timeout", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + Preview: true, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("tempting a timeout"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + 
status: "Timeout", + }, + }, + }, + }, + err: `incite: query "scenario:5|chunk:0|OneChunk.OnePoll.Status.Timeout" has terminal status "Timeout" (text "tempting a timeout")`, + }, + { + note: "OneChunk.OnePoll.Status.Unexpected", + QuerySpec: QuerySpec{ + Text: "expecting the unexpected", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + Preview: true, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("expecting the unexpected"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: "Did you see this coming?", + }, + }, + }, + }, + err: `incite: query "scenario:6|chunk:0|OneChunk.OnePoll.Status.Unexpected" has terminal status "Did you see this coming?" (text "expecting the unexpected")`, + }, + { + note: "OneChunk.OnePoll.WithResults", + QuerySpec: QuerySpec{ + Text: "deliver me some results", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/any/group"}, + Preview: true, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("deliver me some results"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("/any/group")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusComplete, + results: []Result{ + { + {"@ptr", "123"}, + {"@MyField", "hello"}, + }, + { + {"@ptr", "456"}, + {"@MyField", "goodbye"}, + }, + }, + stats: &Stats{1, 2, 3}, + }, + }, + }, + }, + results: []Result{ + { + {"@ptr", "123"}, + {"@MyField", "hello"}, + }, + { + {"@ptr", "456"}, + {"@MyField", "goodbye"}, + }, + }, + stats: Stats{1, 2, 3}, + }, // OneChunk.MultiPoll.(one case with limit exceeded, throttling, scheduled, unknown, and several 
running statuses with intermediate results that get ignored) // OneChunk.Preview.Stats (noPtr) // OneChunk.Preview.Normal (with @ptr) + // MultiChunk.Fractional.LessThanOne + // MultiChunk.Fractional.OneAligned + // MultiChunk.Fractional.TwoAligned + // MultiChunk.Fractional.TwoMisaligned + // MultiChunk.Fractional.ThreeMisaligned // MultiChunk.NoPreview // MultiChunk.Preview } @@ -839,7 +1046,7 @@ type queryScenario struct { note string // Optional note describing the scenario chunks []chunkPlan // Sub-scenario for each chunk closeEarly bool // Whether to prematurely close the stream. - err string // Final expected error + err interface{} // Final expected error. Must be nil, string, or *regexp.Regexp. results []Result // Final results in the expected order after optional sorting using less. less func(i, j int) bool // Optional less function for sorting results, needed for chunked scenarios. stats Stats // Final stats @@ -868,7 +1075,7 @@ func (qs *queryScenario) play(t *testing.T, i int, m QueryManager, actions *mock r, err := ReadAll(s) // Test against the expected results and/or errors. - if qs.err == "" { + if qs.err == nil { assert.NoError(t, err) if qs.less != nil { sort.Slice(r, qs.less) @@ -879,8 +1086,14 @@ func (qs *queryScenario) play(t *testing.T, i int, m QueryManager, actions *mock } assert.Equal(t, expectedResults, r) } else { - assert.EqualError(t, err, qs.err) - assert.Nil(t, r) + switch x := qs.err.(type) { + case string: + assert.EqualError(t, err, x) + case *regexp.Regexp: + assert.Error(t, err) + assert.Regexp(t, x, err.Error(), "expected Error string to match Regexp %s, but it did not: %s", x, err.Error()) + } + assert.Empty(t, r) } assert.Equal(t, qs.stats, s.GetStats()) @@ -911,6 +1124,9 @@ type chunkPollOutput struct { } func (cp *chunkPlan) setup(i, j int, note string, closeEarly bool, actions *mockActions) { + actions.lock.Lock() + defer actions.lock.Unlock() + for _, err := range cp.startQueryErrs { actions. 
On("StartQueryWithContext", anyContext, &cp.startQueryInput). @@ -924,7 +1140,7 @@ func (cp *chunkPlan) setup(i, j int, note string, closeEarly bool, actions *mock queryID := fmt.Sprintf("scenario:%d|chunk:%d", i, j) if note != "" { - queryID += note + queryID += "|" + note } actions. On("StartQueryWithContext", anyContext, &cp.startQueryInput). @@ -1011,6 +1227,10 @@ func endTimeSeconds(t time.Time) *int64 { return startTimeSeconds(t.Add(-time.Second)) } +func cwlErr(code, message string) error { + return awserr.New(code, message, nil) +} + var ( defaultStart = time.Date(2020, 8, 25, 3, 30, 0, 0, time.UTC) defaultEnd = defaultStart.Add(5 * time.Minute)