diff --git a/README.md b/README.md index 249bbc2..1ac8802 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ TODO list in order: 2. Finish other TODO test cases in unmarshal_test.go. 3. Finish any other lingering TODO or FIXME. 4. Audit the logging code to ensure we log enough worthwhile status info. +5. Write README. diff --git a/errors.go b/errors.go index 1113e4d..f6e1fa7 100644 --- a/errors.go +++ b/errors.go @@ -113,7 +113,8 @@ func errNoValue(key string) error { const ( nilActionsMsg = "incite: nil actions" - nilStream = "incite: nil stream" + nilStreamMsg = "incite: nil stream" + nilContextMsg = "incite: nil context" textBlankMsg = "incite: blank query text" startSubSecondMsg = "incite: start has sub-second granularity" diff --git a/example_test.go b/example_test.go index 939d303..ddbb76c 100644 --- a/example_test.go +++ b/example_test.go @@ -5,6 +5,7 @@ package incite_test import ( + "context" "fmt" "time" @@ -19,7 +20,7 @@ func ExampleQuery() { s := session.Must(session.NewSession()) a := cloudwatchlogs.New(s) end := time.Now() - data, err := incite.Query(a, incite.QuerySpec{ + data, err := incite.Query(context.Background(), a, incite.QuerySpec{ Text: "fields @timestamp, @message | filter @message =~ /foo/ | sort @timestamp desc", Start: end.Add(-15 * time.Minute), End: end, diff --git a/query.go b/query.go index b0ec0c3..e2a66c1 100644 --- a/query.go +++ b/query.go @@ -4,9 +4,16 @@ package incite +import "context" + // Query is sweet sweet sugar to perform a synchronous CloudWatch Logs // Insights query and get back all the results without needing to -// construct a QueryManager. +// construct a QueryManager. Query runs the query indicated by q, using +// the CloudWatch Logs actions provided by a, and returns all the +// query results (or the error if the query failed). +// +// The context ctx controls the lifetime of the query. If ctx expires or +// is cancelled, the query is cancelled and an error is returned. // // Unlike NewQueryManager, which defaults to DefaultParallel, Query uses // a parallelism factor of 1. This means that if q represents a chunked @@ -17,14 +24,37 @@ package incite // especially applications running concurrent queries against the same // region from multiple goroutines, should construct and configure a // QueryManager explicitly. -func Query(a CloudWatchLogsActions, q QuerySpec) ([]Result, error) { +func Query(ctx context.Context, a CloudWatchLogsActions, q QuerySpec) ([]Result, error) { + if ctx == nil { + panic(nilContextMsg) + } m := NewQueryManager(Config{ Actions: a, Parallel: 1, }) - s, err := m.Query(q) - if err != nil { - return nil, err + defer func() { + _ = m.Close() + }() + c := make(chan struct { + r []Result + err error + }) + go func() { + var s Stream + var p struct { + r []Result + err error + } + s, p.err = m.Query(q) + if p.err == nil { + p.r, p.err = ReadAll(s) + } + c <- p + }() + select { + case p := <-c: + return p.r, p.err + case <-ctx.Done(): + return nil, ctx.Err() } - return ReadAll(s) } diff --git a/query_test.go b/query_test.go index 5863083..18e6a16 100644 --- a/query_test.go +++ b/query_test.go @@ -5,43 +5,95 @@ package incite import ( + "context" "testing" + "time" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/stretchr/testify/assert" ) func TestQuery(t *testing.T) { - // ARRANGE. - actions := newMockActions(t) - actions. - On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{ - QueryString: sp("x"), - LogGroupNames: []*string{sp("y")}, - StartTime: startTimeSeconds(defaultStart), - EndTime: endTimeSeconds(defaultEnd), - Limit: int64p(DefaultLimit), - }). - Return(&cloudwatchlogs.StartQueryOutput{QueryId: sp("ham")}, nil). - Once() - actions.On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{ - QueryId: sp("ham"), - }).Return(&cloudwatchlogs.GetQueryResultsOutput{ - Status: sp(cloudwatchlogs.QueryStatusComplete), - Results: [][]*cloudwatchlogs.ResultField{}, - }, nil).Once() - - // ACT. - r, err := Query(actions, QuerySpec{ - Text: "x", - Groups: []string{"y"}, - Start: defaultStart, - End: defaultEnd, + t.Run("Nil Context", func(t *testing.T) { + assert.PanicsWithValue(t, nilContextMsg, func() { + _, _ = Query(nil, newMockActions(t), QuerySpec{}) + }) + }) + + t.Run("Bad Query", func(t *testing.T) { + // ACT. + r, err := Query(context.Background(), newMockActions(t), QuerySpec{}) + + // ASSERT. + assert.Nil(t, r) + assert.EqualError(t, err, "incite: blank query text") + }) + + t.Run("Run to Completion", func(t *testing.T) { + // ARRANGE. + actions := newMockActions(t) + actions. + On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{ + QueryString: sp("x"), + LogGroupNames: []*string{sp("y")}, + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: int64p(DefaultLimit), + }). + Return(&cloudwatchlogs.StartQueryOutput{QueryId: sp("ham")}, nil). + Once() + actions.On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{ + QueryId: sp("ham"), + }).Return(&cloudwatchlogs.GetQueryResultsOutput{ + Status: sp(cloudwatchlogs.QueryStatusComplete), + Results: [][]*cloudwatchlogs.ResultField{}, + }, nil).Once() + + // ACT. + r, err := Query(context.Background(), actions, QuerySpec{ + Text: "x", + Groups: []string{"y"}, + Start: defaultStart, + End: defaultEnd, + }) + + // ASSERT. + assert.Equal(t, []Result{}, r) + assert.NoError(t, err) + actions.AssertExpectations(t) }) - // ASSERT. - assert.Equal(t, []Result{}, r) - assert.NoError(t, err) - actions.AssertExpectations(t) + t.Run("Context Cancelled", func(t *testing.T) { + // ARRANGE. + timer := time.NewTimer(50 * time.Millisecond) + actions := newMockActions(t) + actions. + On("StartQueryWithContext", anyContext, anyStartQueryInput). + WaitUntil(timer.C). + Return(&cloudwatchlogs.StartQueryOutput{QueryId: sp("eggs")}). + Maybe() + actions. + On("GetQueryResultsWithContext", anyContext, mock.Anything). + Return(&cloudwatchlogs.GetQueryResultsOutput{ + Status: sp(cloudwatchlogs.QueryStatusComplete), + }). + Maybe() + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // ACT. + r, err := Query(ctx, actions, QuerySpec{ + Text: "x", + Groups: []string{"y"}, + Start: defaultStart, + End: defaultEnd, + }) + + // ASSERT. + assert.Nil(t, r) + assert.Same(t, context.Canceled, err) + actions.AssertExpectations(t) + }) } diff --git a/readall.go b/readall.go index bf0f47b..991a58b 100644 --- a/readall.go +++ b/readall.go @@ -14,7 +14,7 @@ import ( // from Read as an error to be reported. func ReadAll(s Stream) ([]Result, error) { if s == nil { - panic(nilStream) + panic(nilStreamMsg) } // For interests' sake, this implementation is borrowed almost diff --git a/readall_test.go b/readall_test.go index 26f2fe5..1f5facc 100644 --- a/readall_test.go +++ b/readall_test.go @@ -17,7 +17,7 @@ import ( func TestReadAll(t *testing.T) { t.Run("Panic", func(t *testing.T) { - assert.PanicsWithValue(t, nilStream, func() { + assert.PanicsWithValue(t, nilStreamMsg, func() { _, _ = ReadAll(nil) }) })