Skip to content

Commit

Permalink
Ensure stream stats are added in all cases, improve test coverage
Browse files Browse the repository at this point in the history
This commit fixes a bug where, when a non-terminal chunk of a stream
finishes (i.e. the stream will not be EOF even after consuming all
the chunks' results), the chunk stats aren't added to the stream stats.

Additionally, a few more test cases are added and some small, subtle
problems with the testing framework are corrected.

Testing:

```
$ go test -count=1000 -timeout=20m
PASS
ok      github.com/gogama/incite        761.134s

$ go version
go version go1.15.6 windows/amd64
```
  • Loading branch information
vcschapp committed Aug 5, 2021
1 parent c1228c0 commit 6d22cbf
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 22 deletions.
3 changes: 3 additions & 0 deletions incite.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,11 @@ func (m *mgr) pollNextChunk() int {
if err == errEndOfChunk {
m.numChunks--
m.statsLock.Lock()
c.stream.lock.Lock()
m.stats.add(c.Stats)
c.stream.stats.add(c.Stats)
m.statsLock.Unlock()
c.stream.lock.Unlock()
} else {
m.chunks.Prev().Link(r)
}
Expand Down
160 changes: 138 additions & 22 deletions incite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,7 +1494,7 @@ var scenarios = []queryScenario{
},

{
note: "MultiChunk.Fractional.LessThanOne",
note: "MultiChunk.LessThanOne",
QuerySpec: QuerySpec{
Text: "stats count_distinct(Eggs) as EggCount By Spam",
Start: defaultStart,
Expand Down Expand Up @@ -1531,12 +1531,119 @@ var scenarios = []queryScenario{
stats: Stats{77, 777, 7},
},

//{
// note: "MultiChunk.Fractional.OneAligned",
//},
//{
// note: "MultiChunk.Fractional.TwoAligned",
//},
{
note: "MultiChunk.OneAligned",
QuerySpec: QuerySpec{
Text: "QuerySpec indicates chunking but chunk size is fully aligned with start/end to produce one real chunk",
Start: defaultStart,
End: defaultEnd,
Groups: []string{"primo", "secondo"},
Chunk: 5 * time.Minute,
},
chunks: []chunkPlan{
{
startQueryInput: cloudwatchlogs.StartQueryInput{
QueryString: sp("QuerySpec indicates chunking but chunk size is fully aligned with start/end to produce one real chunk"),
StartTime: startTimeSeconds(defaultStart),
EndTime: endTimeSeconds(defaultEnd),
Limit: defaultLimit,
LogGroupNames: []*string{sp("primo"), sp("secondo")},
},
startQuerySuccess: true,
pollOutputs: []chunkPollOutput{
{
status: cloudwatchlogs.QueryStatusScheduled,
},
{
status: cloudwatchlogs.QueryStatusRunning,
results: []Result{
{{"ignore", "me"}},
},
stats: &Stats{-1, -2, -12},
},
{
status: cloudwatchlogs.QueryStatusComplete,
results: []Result{
{{"@ptr", "1111"}, {"Something", "wicked this way comes"}},
{{"@ptr", "2222"}, {"Something", "else"}},
},
stats: &Stats{13, 8, 3},
},
},
},
},
results: []Result{
{{"@ptr", "1111"}, {"Something", "wicked this way comes"}},
{{"@ptr", "2222"}, {"Something", "else"}},
},
stats: Stats{13, 8, 3},
},

{
note: "MultiChunk.TwoAligned",
QuerySpec: QuerySpec{
Text: "QuerySpec indicates chunking and [start, end) defines exactly two chunks",
Start: defaultStart,
End: defaultEnd,
Groups: []string{"ein", "zwei"},
Chunk: 150 * time.Second,
},
chunks: []chunkPlan{
{
startQueryInput: cloudwatchlogs.StartQueryInput{
QueryString: sp("QuerySpec indicates chunking and [start, end) defines exactly two chunks"),
StartTime: startTimeSeconds(defaultStart),
EndTime: endTimeSeconds(defaultStart.Add(150 * time.Second)),
Limit: defaultLimit,
LogGroupNames: []*string{sp("ein"), sp("zwei")},
},
startQuerySuccess: true,
pollOutputs: []chunkPollOutput{
{
status: cloudwatchlogs.QueryStatusComplete,
results: []Result{
{{"@ptr", "aaaa"}, {"@timestamp", "2021-08-05 15:26:000.123"}},
{{"@ptr", "bbbb"}, {"@timestamp", "2021-08-05 15:26:000.125"}},
},
stats: &Stats{1, 1, 1},
},
},
},
{
startQueryInput: cloudwatchlogs.StartQueryInput{
QueryString: sp("QuerySpec indicates chunking and [start, end) defines exactly two chunks"),
StartTime: startTimeSeconds(defaultStart.Add(150 * time.Second)),
EndTime: endTimeSeconds(defaultEnd),
Limit: defaultLimit,
LogGroupNames: []*string{sp("ein"), sp("zwei")},
},
startQuerySuccess: true,
pollOutputs: []chunkPollOutput{
{
status: cloudwatchlogs.QueryStatusComplete,
results: []Result{
{{"@ptr", "dddd"}, {"@timestamp", "2021-08-05 15:26:000.126"}},
{{"@ptr", "cccc"}, {"@timestamp", "2021-08-05 15:26:000.124"}},
},
stats: &Stats{2, 2, 1},
},
},
},
},
results: []Result{
{{"@ptr", "aaaa"}, {"@timestamp", "2021-08-05 15:26:000.123"}},
{{"@ptr", "cccc"}, {"@timestamp", "2021-08-05 15:26:000.124"}},
{{"@ptr", "bbbb"}, {"@timestamp", "2021-08-05 15:26:000.125"}},
{{"@ptr", "dddd"}, {"@timestamp", "2021-08-05 15:26:000.126"}},
},
postprocess: func(r []Result) {
sort.Slice(r, func(i, j int) bool {
return r[i].get("@timestamp") < r[j].get("@timestamp")
})
},
stats: Stats{3, 3, 2},
},

//{
// note: "MultiChunk.Fractional.TwoMisaligned",
//},
Expand All @@ -1553,15 +1660,15 @@ var scenarios = []queryScenario{

// queryScenario describes a single query test scenario: the embedded
// QuerySpec to run, the per-chunk plans that drive it, and the final
// results, stats, and error the scenario is expected to produce.
type queryScenario struct {
QuerySpec
note string // Optional note describing the scenario
chunks []chunkPlan // Sub-scenario for each chunk
closeEarly bool // Whether to prematurely close the stream.
err error // Final expected error.
results []Result // Final results in the expected order after optional sorting using less.
less func(i, j int) bool // Optional less function for sorting results, needed for chunked scenarios.
stats Stats // Final stats
closeAfter bool // Whether to close the stream after the scenario.
expectStop bool // Whether to expect a StopQuery call
note string // Optional note describing the scenario
chunks []chunkPlan // Sub-scenario for each chunk
closeEarly bool // Whether to prematurely close the stream.
err error // Final expected error.
results []Result // Final results in the expected order after optional post-processing using postprocess.
postprocess func([]Result) // Optional function to post-process (e.g. sort) results in-place.
stats Stats // Final stats
closeAfter bool // Whether to close the stream after the scenario.
expectStop bool // Whether to expect a StopQuery call
}

func (qs *queryScenario) test(t *testing.T, i int, m QueryManager, actions *mockActions, parallel bool) {
Expand All @@ -1575,8 +1682,8 @@ func (qs *queryScenario) test(t *testing.T, i int, m QueryManager, actions *mock

func (qs *queryScenario) play(t *testing.T, i int, m QueryManager, actions *mockActions) {
// Set up the chunk polling scenarios.
for j, cp := range qs.chunks {
cp.setup(i, j, qs.note, qs.closeEarly, qs.expectStop, actions)
for j := range qs.chunks {
qs.chunks[j].setup(i, j, qs.note, qs.closeEarly, qs.expectStop, actions)
}

// Start the scenario query.
Expand All @@ -1597,8 +1704,8 @@ func (qs *queryScenario) play(t *testing.T, i int, m QueryManager, actions *mock
// Test against the expected results and/or errors.
if qs.err == nil {
assert.NoError(t, err)
if qs.less != nil {
sort.Slice(r, qs.less)
if qs.postprocess != nil {
qs.postprocess(r)
}
expectedResults := qs.results
if expectedResults == nil {
Expand Down Expand Up @@ -1750,6 +1857,16 @@ func cwlErr(code, message string) error {
return awserr.New(code, message, nil)
}

// get looks up a field of r by name. It returns the value of the first
// field whose Field equals k, or the empty string when no field matches.
func (r Result) get(k string) (v string) {
	for _, field := range r {
		if field.Field != k {
			continue
		}
		// First match wins; later fields with the same name are ignored.
		return field.Value
	}
	return ""
}

var (
defaultStart = time.Date(2020, 8, 25, 3, 30, 0, 0, time.UTC)
defaultEnd = defaultStart.Add(5 * time.Minute)
Expand All @@ -1762,6 +1879,5 @@ var (
anyContext = mock.MatchedBy(func(ctx context.Context) bool {
return ctx != nil
})
anyStartQueryInput = mock.AnythingOfType("*cloudwatchlogs.StartQueryInput")
anyGetQueryResultsInput = mock.AnythingOfType("*cloudwatchlogs.GetQueryResultsInput")
anyStartQueryInput = mock.AnythingOfType("*cloudwatchlogs.StartQueryInput")
)

0 comments on commit 6d22cbf

Please sign in to comment.