diff --git a/incite.go b/incite.go index 309086a..9452c92 100644 --- a/incite.go +++ b/incite.go @@ -678,8 +678,11 @@ func (m *mgr) pollNextChunk() int { if err == errEndOfChunk { m.numChunks-- m.statsLock.Lock() + c.stream.lock.Lock() m.stats.add(c.Stats) + c.stream.stats.add(c.Stats) m.statsLock.Unlock() + c.stream.lock.Unlock() } else { m.chunks.Prev().Link(r) } diff --git a/incite_test.go b/incite_test.go index d6c4d37..6427a8e 100644 --- a/incite_test.go +++ b/incite_test.go @@ -1494,7 +1494,7 @@ var scenarios = []queryScenario{ }, { - note: "MultiChunk.Fractional.LessThanOne", + note: "MultiChunk.LessThanOne", QuerySpec: QuerySpec{ Text: "stats count_distinct(Eggs) as EggCount By Spam", Start: defaultStart, @@ -1531,12 +1531,119 @@ var scenarios = []queryScenario{ stats: Stats{77, 777, 7}, }, - //{ - // note: "MultiChunk.Fractional.OneAligned", - //}, - //{ - // note: "MultiChunk.Fractional.TwoAligned", - //}, + { + note: "MultiChunk.OneAligned", + QuerySpec: QuerySpec{ + Text: "QuerySpec indicates chunking but chunk size is fully aligned with start/end to produce one real chunk", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"primo", "secondo"}, + Chunk: 5 * time.Minute, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("QuerySpec indicates chunking but chunk size is fully aligned with start/end to produce one real chunk"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("primo"), sp("secondo")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusScheduled, + }, + { + status: cloudwatchlogs.QueryStatusRunning, + results: []Result{ + {{"ignore", "me"}}, + }, + stats: &Stats{-1, -2, -12}, + }, + { + status: cloudwatchlogs.QueryStatusComplete, + results: []Result{ + {{"@ptr", "1111"}, {"Something", "wicked this way comes"}}, + {{"@ptr", "2222"}, {"Something", "else"}}, + }, + stats: &Stats{13, 8, 3}, + }, + }, + }, + }, + results: []Result{ + {{"@ptr", "1111"}, {"Something", "wicked this way comes"}}, + {{"@ptr", "2222"}, {"Something", "else"}}, + }, + stats: Stats{13, 8, 3}, + }, + + { + note: "MultiChunk.TwoAligned", + QuerySpec: QuerySpec{ + Text: "QuerySpec indicates chunking and [start, end) defines exactly two chunks", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"ein", "zwei"}, + Chunk: 150 * time.Second, + }, + chunks: []chunkPlan{ + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("QuerySpec indicates chunking and [start, end) defines exactly two chunks"), + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultStart.Add(150 * time.Second)), + Limit: defaultLimit, + LogGroupNames: []*string{sp("ein"), sp("zwei")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusComplete, + results: []Result{ + {{"@ptr", "aaaa"}, {"@timestamp", "2021-08-05 15:26:000.123"}}, + {{"@ptr", "bbbb"}, {"@timestamp", "2021-08-05 15:26:000.125"}}, + }, + stats: &Stats{1, 1, 1}, + }, + }, + }, + { + startQueryInput: cloudwatchlogs.StartQueryInput{ + QueryString: sp("QuerySpec indicates chunking and [start, end) defines exactly two chunks"), + StartTime: startTimeSeconds(defaultStart.Add(150 * time.Second)), + EndTime: endTimeSeconds(defaultEnd), + Limit: defaultLimit, + LogGroupNames: []*string{sp("ein"), sp("zwei")}, + }, + startQuerySuccess: true, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusComplete, + results: []Result{ + {{"@ptr", "dddd"}, {"@timestamp", "2021-08-05 15:26:000.126"}}, + {{"@ptr", "cccc"}, {"@timestamp", "2021-08-05 15:26:000.124"}}, + }, + stats: &Stats{2, 2, 1}, + }, + }, + }, + }, + results: []Result{ + {{"@ptr", "aaaa"}, {"@timestamp", "2021-08-05 15:26:000.123"}}, + {{"@ptr", "cccc"}, {"@timestamp", "2021-08-05 15:26:000.124"}}, + {{"@ptr", "bbbb"}, {"@timestamp", "2021-08-05 15:26:000.125"}}, + {{"@ptr", "dddd"}, {"@timestamp", "2021-08-05 15:26:000.126"}}, + }, + postprocess: func(r []Result) { + sort.Slice(r, func(i, j int) bool { + return r[i].get("@timestamp") < r[j].get("@timestamp") + }) + }, + stats: Stats{3, 3, 2}, + }, + //{ // note: "MultiChunk.Fractional.TwoMisaligned", //}, @@ -1553,15 +1660,15 @@ var scenarios = []queryScenario{ type queryScenario struct { QuerySpec - note string // Optional note describing the scenario - chunks []chunkPlan // Sub-scenario for each chunk - closeEarly bool // Whether to prematurely close the stream. - err error // Final expected error. - results []Result // Final results in the expected order after optional sorting using less. - less func(i, j int) bool // Optional less function for sorting results, needed for chunked scenarios. - stats Stats // Final stats - closeAfter bool // Whether to close the stream after the scenario. - expectStop bool // Whether to expect a StopQuery call + note string // Optional note describing the scenario + chunks []chunkPlan // Sub-scenario for each chunk + closeEarly bool // Whether to prematurely close the stream. + err error // Final expected error. + results []Result // Final results in the expected order after optional sorting using less. + postprocess func([]Result) // Optional function to post-process (e.g. sort) results in-place. + stats Stats // Final stats + closeAfter bool // Whether to close the stream after the scenario. + expectStop bool // Whether to expect a StopQuery call } func (qs *queryScenario) test(t *testing.T, i int, m QueryManager, actions *mockActions, parallel bool) { @@ -1575,8 +1682,8 @@ func (qs *queryScenario) test(t *testing.T, i int, m QueryManager, actions *mock func (qs *queryScenario) play(t *testing.T, i int, m QueryManager, actions *mockActions) { // Set up the chunk polling scenarios. - for j, cp := range qs.chunks { - cp.setup(i, j, qs.note, qs.closeEarly, qs.expectStop, actions) + for j := range qs.chunks { + qs.chunks[j].setup(i, j, qs.note, qs.closeEarly, qs.expectStop, actions) } // Start the scenario query. @@ -1597,8 +1704,8 @@ func (qs *queryScenario) play(t *testing.T, i int, m QueryManager, actions *mock // Test against the expected results and/or errors. if qs.err == nil { assert.NoError(t, err) - if qs.less != nil { - sort.Slice(r, qs.less) + if qs.postprocess != nil { + qs.postprocess(r) } expectedResults := qs.results if expectedResults == nil { @@ -1750,6 +1857,16 @@ func cwlErr(code, message string) error { return awserr.New(code, message, nil) } +func (r Result) get(k string) (v string) { + for _, f := range r { + if f.Field == k { + v = f.Value + break + } + } + return +} + var ( defaultStart = time.Date(2020, 8, 25, 3, 30, 0, 0, time.UTC) defaultEnd = defaultStart.Add(5 * time.Minute) @@ -1762,6 +1879,5 @@ var ( anyContext = mock.MatchedBy(func(ctx context.Context) bool { return ctx != nil }) - anyStartQueryInput = mock.AnythingOfType("*cloudwatchlogs.StartQueryInput") - anyGetQueryResultsInput = mock.AnythingOfType("*cloudwatchlogs.GetQueryResultsInput") + anyStartQueryInput = mock.AnythingOfType("*cloudwatchlogs.StartQueryInput") )