Skip to content

Commit

Permalink
Finish basic happy path query test scenarios
Browse files Browse the repository at this point in the history
There are still a lot of TODO scenarios, but the ones remaining are
the more advanced cases, e.g. where there we need to poll the chunk
more than once, or where there's multiple chunks, plus preview mode.

As part of this commit, I also refactored the non-Unmarshal related
error wrapping code in `errors.go`. This was kind of a last minute
panic thing, but it occurred to me that customers might want to be
able to differentiate "job status" type errors like "Failed" or
"Cancelled" from other fatals like "internal server error", in order
to give their customers better error messaging.

Things are in a pretty good state now. Tests passing on Linux and
Windows.
  • Loading branch information
vcschapp committed Jul 16, 2021
1 parent 2aa7424 commit 820d683
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 56 deletions.
11 changes: 11 additions & 0 deletions cloudwatchlogs_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package incite

import (
"context"
"sync"
"testing"

"github.com/aws/aws-sdk-go/aws/request"
Expand All @@ -12,6 +13,7 @@ import (

type mockActions struct {
mock.Mock
lock sync.RWMutex
}

func newMockActions(t *testing.T) *mockActions {
Expand All @@ -21,6 +23,9 @@ func newMockActions(t *testing.T) *mockActions {
}

func (m *mockActions) StartQueryWithContext(ctx context.Context, input *cloudwatchlogs.StartQueryInput, _ ...request.Option) (*cloudwatchlogs.StartQueryOutput, error) {
m.lock.RLock()
defer m.lock.RUnlock()

args := m.Called(ctx, input)
if output, ok := args.Get(0).(*cloudwatchlogs.StartQueryOutput); ok {
return output, args.Error(1)
Expand All @@ -29,6 +34,9 @@ func (m *mockActions) StartQueryWithContext(ctx context.Context, input *cloudwat
}

func (m *mockActions) StopQueryWithContext(ctx context.Context, input *cloudwatchlogs.StopQueryInput, _ ...request.Option) (*cloudwatchlogs.StopQueryOutput, error) {
m.lock.RLock()
defer m.lock.RUnlock()

args := m.Called(ctx, input)
if output, ok := args.Get(0).(*cloudwatchlogs.StopQueryOutput); ok {
return output, args.Error(1)
Expand All @@ -37,6 +45,9 @@ func (m *mockActions) StopQueryWithContext(ctx context.Context, input *cloudwatc
}

func (m *mockActions) GetQueryResultsWithContext(ctx context.Context, input *cloudwatchlogs.GetQueryResultsInput, _ ...request.Option) (*cloudwatchlogs.GetQueryResultsOutput, error) {
m.lock.RLock()
defer m.lock.RUnlock()

args := m.Called(ctx, input)
if output, ok := args.Get(0).(*cloudwatchlogs.GetQueryResultsOutput); ok {
return output, args.Error(1)
Expand Down
48 changes: 36 additions & 12 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package incite
import (
"errors"
"fmt"
"time"
)

var (
Expand All @@ -11,24 +12,43 @@ var (
ErrClosed = errors.New("incite: operation on a closed object")
)

type wrappedErr struct {
cause error
msg string
type StartQueryError struct {
Text string
Start time.Time
End time.Time
Cause error
}

func wrap(cause error, format string, a ...interface{}) error {
return &wrappedErr{
cause: cause,
msg: fmt.Sprintf(format, a...),
}
func (err *StartQueryError) Error() string {
return fmt.Sprintf("incite: CloudWatch Logs failed to start query for chunk %q [%s..%s): %s", err.Text, err.Start, err.End, err.Cause)
}

func (w *wrappedErr) Error() string {
return w.msg + ": " + w.cause.Error()
func (err *StartQueryError) Unwrap() error {
return err.Cause
}

func (w *wrappedErr) Unwrap() error {
return w.cause
type TerminalQueryStatusError struct {
QueryID string
Status string
Text string
}

func (err *TerminalQueryStatusError) Error() string {
return fmt.Sprintf("incite: query %q has terminal status %q (text %q)", err.QueryID, err.Status, err.Text)
}

type UnexpectedQueryError struct {
QueryID string
Text string
Cause error
}

func (err *UnexpectedQueryError) Error() string {
return fmt.Sprintf("incite: query %q had unexpected error (text %q): %s", err.QueryID, err.Text, err.Cause)
}

func (err *UnexpectedQueryError) Unwrap() error {
return err.Cause
}

func errNoKey(id string) error {
Expand All @@ -39,6 +59,10 @@ func errNoValue(id, key string) error {
return fmt.Errorf("incite: query chunk %q: no value for key %q", id, key)
}

func errNilStatus() error {
return errors.New("incite: nil status in GetQueryResults output from CloudWatch Logs")
}

const (
nilActionsMsg = "incite: nil actions"
nilReaderMsg = "incite: nil reader"
Expand Down
21 changes: 13 additions & 8 deletions incite.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"container/ring"
"context"
"errors"
"fmt"
"io"
"strings"
"sync"
Expand Down Expand Up @@ -603,7 +602,13 @@ func (m *mgr) startNextChunk() error {
m.Logger.Printf("incite: QueryManager(%p) temporary failure to start chunk %q [%s..%s): %s",
m, s.Text, next, end, err.Error())
} else {
s.setErr(wrap(err, "incite: fatal error from CloudWatch Logs for chunk %q [%s..%s)", s.Text, next, end), true, Stats{})
err = &StartQueryError{
Text: s.Text,
Start: next,
End: end,
Cause: err,
}
s.setErr(err, true, Stats{})
m.stats.add(s.GetStats())
m.Logger.Printf("incite: QueryManager(%p) permanent failure to start %q [%s..%s) due to fatal error from CloudWatch Logs: %s",
m, s.Text, next, end, err.Error())
Expand Down Expand Up @@ -725,13 +730,15 @@ func (m *mgr) pollChunk(c *chunk) error {
output, err = m.Actions.GetQueryResultsWithContext(c.ctx, &input)
m.lastReq[GetQueryResults] = time.Now()
})
if err != nil {
return wrap(err, "incite: query chunk %q: failed to poll", c.id)
if err != nil && isTemporary(err) {
return nil
} else if err != nil {
return &UnexpectedQueryError{c.id, c.stream.Text, err}
}

status := output.Status
if status == nil {
return fmt.Errorf("incite: query chunk %q: nil status in GetQueryResults output from CloudWatch Logs", c.id)
return &UnexpectedQueryError{c.id, c.stream.Text, errNilStatus()}
}

c.status = *status
Expand All @@ -745,10 +752,8 @@ func (m *mgr) pollChunk(c *chunk) error {
return sendChunkBlock(c, output.Results, output.Statistics, false)
case cloudwatchlogs.QueryStatusComplete:
return sendChunkBlock(c, output.Results, output.Statistics, true)
case cloudwatchlogs.QueryStatusCancelled, cloudwatchlogs.QueryStatusFailed, "Timeout":
return fmt.Errorf("incite: query chunk %q: unexpected terminal status: %s", c.id, c.status)
default:
return fmt.Errorf("incite: query chunk %q: unhandled status: %s", c.id, c.status)
return &TerminalQueryStatusError{c.id, c.status, c.stream.Text}
}
}

Expand Down
Loading

0 comments on commit 820d683

Please sign in to comment.