Skip to content

Commit

Permalink
Allow cancel of package scope Query via context
Browse files Browse the repository at this point in the history
  • Loading branch information
vcschapp committed Aug 9, 2021
1 parent 0f3dabf commit 453fb60
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 40 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ TODO list in order:
2. Finish other TODO test cases in unmarshal_test.go.
3. Finish any other lingering TODO or FIXME.
4. Audit the logging code to ensure we log enough worthwhile status info.
5. Write README.



Expand Down
3 changes: 2 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func errNoValue(key string) error {

const (
nilActionsMsg = "incite: nil actions"
nilStream = "incite: nil stream"
nilStreamMsg = "incite: nil stream"
nilContextMsg = "incite: nil context"

textBlankMsg = "incite: blank query text"
startSubSecondMsg = "incite: start has sub-second granularity"
Expand Down
3 changes: 2 additions & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package incite_test

import (
"context"
"fmt"
"time"

Expand All @@ -19,7 +20,7 @@ func ExampleQuery() {
s := session.Must(session.NewSession())
a := cloudwatchlogs.New(s)
end := time.Now()
data, err := incite.Query(a, incite.QuerySpec{
data, err := incite.Query(context.Background(), a, incite.QuerySpec{
Text: "fields @timestamp, @message | filter @message =~ /foo/ | sort @timestamp desc",
Start: end.Add(-15 * time.Minute),
End: end,
Expand Down
42 changes: 36 additions & 6 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@

package incite

import "context"

// Query is sweet sweet sugar to perform a synchronous CloudWatch Logs
// Insights query and get back all the results without needing to
// construct a QueryManager.
// construct a QueryManager. Query runs the query indicated by q, using
// the CloudWatch Logs actions provided by a, and returns all the
// query results (or the error if the query failed).
//
// The context ctx controls the lifetime of the query. If ctx expires or
// is cancelled, the query is cancelled and an error is returned.
//
// Unlike NewQueryManager, which defaults to DefaultParallel, Query uses
// a parallelism factor of 1. This means that if q represents a chunked
Expand All @@ -17,14 +24,37 @@ package incite
// especially applications running concurrent queries against the same
// region from multiple goroutines, should construct and configure a
// QueryManager explicitly.
func Query(a CloudWatchLogsActions, q QuerySpec) ([]Result, error) {
func Query(ctx context.Context, a CloudWatchLogsActions, q QuerySpec) ([]Result, error) {
if ctx == nil {
panic(nilContextMsg)
}
m := NewQueryManager(Config{
Actions: a,
Parallel: 1,
})
s, err := m.Query(q)
if err != nil {
return nil, err
defer func() {
_ = m.Close()
}()
c := make(chan struct {
r []Result
err error
})
go func() {
var s Stream
var p struct {
r []Result
err error
}
s, p.err = m.Query(q)
if p.err == nil {
p.r, p.err = ReadAll(s)
}
c <- p
}()
select {
case p := <-c:
return p.r, p.err
case <-ctx.Done():
return nil, ctx.Err()
}
return ReadAll(s)
}
112 changes: 82 additions & 30 deletions query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,95 @@
package incite

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/stretchr/testify/assert"
)

func TestQuery(t *testing.T) {
// ARRANGE.
actions := newMockActions(t)
actions.
On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{
QueryString: sp("x"),
LogGroupNames: []*string{sp("y")},
StartTime: startTimeSeconds(defaultStart),
EndTime: endTimeSeconds(defaultEnd),
Limit: int64p(DefaultLimit),
}).
Return(&cloudwatchlogs.StartQueryOutput{QueryId: sp("ham")}, nil).
Once()
actions.On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{
QueryId: sp("ham"),
}).Return(&cloudwatchlogs.GetQueryResultsOutput{
Status: sp(cloudwatchlogs.QueryStatusComplete),
Results: [][]*cloudwatchlogs.ResultField{},
}, nil).Once()

// ACT.
r, err := Query(actions, QuerySpec{
Text: "x",
Groups: []string{"y"},
Start: defaultStart,
End: defaultEnd,
t.Run("Nil Context", func(t *testing.T) {
assert.PanicsWithValue(t, nilContextMsg, func() {
_, _ = Query(nil, newMockActions(t), QuerySpec{})
})
})

t.Run("Bad Query", func(t *testing.T) {
// ACT.
r, err := Query(context.Background(), newMockActions(t), QuerySpec{})

// ASSERT.
assert.Nil(t, r)
assert.EqualError(t, err, "incite: blank query text")
})

t.Run("Run to Completion", func(t *testing.T) {
// ARRANGE.
actions := newMockActions(t)
actions.
On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{
QueryString: sp("x"),
LogGroupNames: []*string{sp("y")},
StartTime: startTimeSeconds(defaultStart),
EndTime: endTimeSeconds(defaultEnd),
Limit: int64p(DefaultLimit),
}).
Return(&cloudwatchlogs.StartQueryOutput{QueryId: sp("ham")}, nil).
Once()
actions.On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{
QueryId: sp("ham"),
}).Return(&cloudwatchlogs.GetQueryResultsOutput{
Status: sp(cloudwatchlogs.QueryStatusComplete),
Results: [][]*cloudwatchlogs.ResultField{},
}, nil).Once()

// ACT.
r, err := Query(context.Background(), actions, QuerySpec{
Text: "x",
Groups: []string{"y"},
Start: defaultStart,
End: defaultEnd,
})

// ASSERT.
assert.Equal(t, []Result{}, r)
assert.NoError(t, err)
actions.AssertExpectations(t)
})

// ASSERT.
assert.Equal(t, []Result{}, r)
assert.NoError(t, err)
actions.AssertExpectations(t)
t.Run("Context Cancelled", func(t *testing.T) {
// ARRANGE.
timer := time.NewTimer(50 * time.Millisecond)
actions := newMockActions(t)
actions.
On("StartQueryWithContext", anyContext, anyStartQueryInput).
WaitUntil(timer.C).
Return(&cloudwatchlogs.StartQueryOutput{QueryId: sp("eggs")}).
Maybe()
actions.
On("GetQueryResultsWithContext", anyContext, mock.Anything).
Return(&cloudwatchlogs.GetQueryResultsOutput{
Status: sp(cloudwatchlogs.QueryStatusComplete),
}).
Maybe()
ctx, cancel := context.WithCancel(context.Background())
cancel()

// ACT.
r, err := Query(ctx, actions, QuerySpec{
Text: "x",
Groups: []string{"y"},
Start: defaultStart,
End: defaultEnd,
})

// ASSERT.
assert.Nil(t, r)
assert.Same(t, context.Canceled, err)
actions.AssertExpectations(t)
})
}
2 changes: 1 addition & 1 deletion readall.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// from Read as an error to be reported.
func ReadAll(s Stream) ([]Result, error) {
if s == nil {
panic(nilStream)
panic(nilStreamMsg)
}

// For interests' sake, this implementation is borrowed almost
Expand Down
2 changes: 1 addition & 1 deletion readall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

func TestReadAll(t *testing.T) {
t.Run("Panic", func(t *testing.T) {
assert.PanicsWithValue(t, nilStream, func() {
assert.PanicsWithValue(t, nilStreamMsg, func() {
_, _ = ReadAll(nil)
})
})
Expand Down

0 comments on commit 453fb60

Please sign in to comment.