diff --git a/incite_test.go b/incite_test.go
index 0acca4e..c279b5f 100644
--- a/incite_test.go
+++ b/incite_test.go
@@ -1499,6 +1499,134 @@ var scenarios = []queryScenario{
 		},
 	},
 
+	// This scenario aims to regression test https://github.com/gogama/incite/issues/25.
+	// There are four generation zero chunks. Three chunks (0, 2, 3) have empty
+	// results. One chunk (1) comes back with MaxLimit results, necessitating
+	// splitting.
+	{
+		note: "MultiChunk.Issue25Regression",
+		QuerySpec: QuerySpec{
+			Text:       "lineage",
+			Groups:     []string{"/menger/böhm-bawerk/mises/hakek/rothbard"},
+			Start:      defaultStart,
+			End:        defaultStart.Add(40 * time.Minute),
+			Limit:      MaxLimit,
+			Chunk:      10 * time.Minute,
+			SplitUntil: time.Millisecond,
+		},
+		chunks: []chunkPlan{
+			// CHUNK 0.
+			{
+				startQueryInput:   startQueryInput("lineage", defaultStart, defaultStart.Add(10*time.Minute), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
+				startQuerySuccess: true,
+				pollOutputs: []chunkPollOutput{
+					{
+						status: cloudwatchlogs.QueryStatusComplete,
+						stats:  &Stats{100, 0, 10, 0, 0, 0, 0, 0},
+					},
+				},
+			},
+			// CHUNK 1.
+			{
+				startQueryInput:   startQueryInput("lineage", defaultStart.Add(10*time.Minute), defaultStart.Add(20*time.Minute), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
+				startQuerySuccess: true,
+				pollOutputs: []chunkPollOutput{
+					{
+						status:  cloudwatchlogs.QueryStatusComplete,
+						results: maxLimitResults,
+						stats:   &Stats{1_000_000, MaxLimit, 2 * MaxLimit, 0, 0, 0, 0, 0},
+					},
+				},
+			},
+			// CHUNK 2.
+			{
+				startQueryInput:   startQueryInput("lineage", defaultStart.Add(20*time.Minute), defaultStart.Add(30*time.Minute), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
+				startQuerySuccess: true,
+				pollOutputs: []chunkPollOutput{
+					{
+						status: cloudwatchlogs.QueryStatusComplete,
+						stats:  &Stats{100, 0, 10, 0, 0, 0, 0, 0},
+					},
+				},
+			},
+			// CHUNK 3.
+			{
+				startQueryInput:   startQueryInput("lineage", defaultStart.Add(30*time.Minute), defaultStart.Add(40*time.Minute), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
+				startQuerySuccess: true,
+				pollOutputs: []chunkPollOutput{
+					{
+						status: cloudwatchlogs.QueryStatusComplete,
+						stats:  &Stats{100, 0, 10, 0, 0, 0, 0, 0},
+					},
+				},
+			},
+			// SUB-CHUNK 1/0.
+			{
+				startQueryInput:   startQueryInput("lineage", defaultStart.Add(10*time.Minute), defaultStart.Add(10*time.Minute+150*time.Second), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
+				startQuerySuccess: true,
+				pollOutputs: []chunkPollOutput{
+					{
+						status:  cloudwatchlogs.QueryStatusComplete,
+						results: maxLimitResults[0 : MaxLimit/4],
+						stats:   &Stats{250_000, MaxLimit / 4, MaxLimit / 2, 0, 0, 0, 0, 0},
+					},
+				},
+			},
+			// SUB-CHUNK 1/1.
+			{
+				startQueryInput:   startQueryInput("lineage", defaultStart.Add(10*time.Minute+150*time.Second), defaultStart.Add(10*time.Minute+300*time.Second), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
+				startQuerySuccess: true,
+				pollOutputs: []chunkPollOutput{
+					{
+						status:  cloudwatchlogs.QueryStatusComplete,
+						results: maxLimitResults[MaxLimit/4 : MaxLimit/2],
+						stats:   &Stats{250_000, MaxLimit / 4, MaxLimit / 2, 0, 0, 0, 0, 0},
+					},
+				},
+			},
+			// SUB-CHUNK 1/2.
+			{
+				startQueryInput:   startQueryInput("lineage", defaultStart.Add(10*time.Minute+300*time.Second), defaultStart.Add(10*time.Minute+450*time.Second), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
+				startQuerySuccess: true,
+				pollOutputs: []chunkPollOutput{
+					{
+						status:  cloudwatchlogs.QueryStatusComplete,
+						results: maxLimitResults[MaxLimit/2 : 3*MaxLimit/4],
+						stats:   &Stats{250_000, MaxLimit / 4, MaxLimit / 2, 0, 0, 0, 0, 0},
+					},
+				},
+			},
+			// SUB-CHUNK 1/3.
+			{
+				startQueryInput:   startQueryInput("lineage", defaultStart.Add(10*time.Minute+450*time.Second), defaultStart.Add(20*time.Minute), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
+				startQuerySuccess: true,
+				pollOutputs: []chunkPollOutput{
+					{
+						status:  cloudwatchlogs.QueryStatusComplete,
+						results: maxLimitResults[3*MaxLimit/4:],
+						stats:   &Stats{250_000, MaxLimit / 4, MaxLimit / 2, 0, 0, 0, 0, 0},
+					},
+				},
+			},
+		},
+		results: maxLimitResults,
+		postprocess: func(r []Result) {
+			sort.Slice(r, func(i, j int) bool {
+				a, _ := strconv.Atoi(r[i].get("@ptr"))
+				b, _ := strconv.Atoi(r[j].get("@ptr"))
+				return a < b
+			})
+		},
+		stats: Stats{
+			BytesScanned:   2_000_300,
+			RecordsMatched: 2 * MaxLimit,
+			RecordsScanned: 4*MaxLimit + 30,
+			RangeRequested: 40 * time.Minute,
+			RangeStarted:   40 * time.Minute,
+			RangeDone:      40 * time.Minute,
+		},
+	},
+
 	// This is the last scenario, and it is meant to be a super test case that,
 	// by itself, runs more chunks than the QueryManager can run in parallel.
 	//
@@ -2121,6 +2249,31 @@ func startQueryInput(text string, start, end time.Time, limit int64, groups ...s
 	}
 }
 
+func startQueryOutput(queryID string) *cloudwatchlogs.StartQueryOutput {
+	return &cloudwatchlogs.StartQueryOutput{
+		QueryId: &queryID,
+	}
+}
+
+func getQueryResultsInput(queryID string) *cloudwatchlogs.GetQueryResultsInput {
+	return &cloudwatchlogs.GetQueryResultsInput{
+		QueryId: &queryID,
+	}
+}
+
+func getQueryResultsOutput(r []Result, status string, stats *Stats) *cloudwatchlogs.GetQueryResultsOutput {
+	o := &cloudwatchlogs.GetQueryResultsOutput{
+		Results: backOut(r),
+	}
+	if status != "" {
+		o.Status = &status
+	}
+	if stats != nil {
+		o.Statistics = stats.backOut()
+	}
+	return o
+}
+
 func (r Result) get(k string) (v string) {
 	for _, f := range r {
 		if f.Field == k {
diff --git a/mgr_test.go b/mgr_test.go
index 97252a0..75bc116 100644
--- a/mgr_test.go
+++ b/mgr_test.go
@@ -1820,6 +1820,120 @@ func TestQueryManager_Query(t *testing.T) {
 		}
 	})
 
+	t.Run("Issue #25 - Chunk Splitting Does Not Cause Empty Time Range", func(t *testing.T) {
+		// Regression test for: https://github.com/gogama/incite/issues/25
+		//
+		// This test creates the following scenario:
+		// 1. Parallelism of 2 so mgr can run at most two chunks at a time.
+		// 2. Chunk splitting enabled for the query.
+		// 3. Three original generation 0 chunks.
+		// 4. First chunk gets split into only two sub-chunks, both of which
+		//    are started before the third generation 0 chunk can be
+		//    started.
+		// 5. Third generation 0 chunk starts and ends before one of the
+		//    split sub-chunks ends.
+		//
+		// The above scenario tests that the mgr correctly detects that there
+		// are no more generation 0 chunks to create and does not end up
+		// creating an empty chunk whose start and end are the same.
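+		//
+		// The two channels below force the interleaving described in the
+		// numbered steps above: the second generation 0 chunk's results
+		// are held back (WaitUntil) until the first generation 1
+		// sub-chunk has started, and the second generation 1 sub-chunk's
+		// results are held back until the third generation 0 chunk has
+		// started.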
+		firstGen1ChunkStarting := make(chan time.Time)
+		thirdGen0ChunkStarting := make(chan time.Time)
+		actions := newMockActions(t)
+		m := NewQueryManager(Config{
+			Actions:  actions,
+			Parallel: 2,
+			RPS:      lotsOfRPS,
+		})
+		t.Cleanup(func() {
+			maxLimit = MaxLimit
+			_ = m.Close()
+			close(firstGen1ChunkStarting)
+			close(thirdGen0ChunkStarting)
+		})
+		maxLimit = 2
+		text := "It's the ABC train, papa!"
+		groups := []string{"regression/test/for", "https://github.com/gogama/incite/issues/25"}
+		start := time.Date(2023, 1, 28, 16, 48, 13, 0, time.UTC)
+		numChunks := 3
+		chunkSize := 2 * time.Millisecond
+		end := start.Add(time.Duration(numChunks) * chunkSize)
+
+		// First chunk (generation 0).
+		actions.
+			On("StartQueryWithContext", anyContext, startQueryInput(text, start, start.Add(1*chunkSize), maxLimit, groups...)).
+			Return(startQueryOutput("0"), nil).
+			Once()
+		actions.
+			On("GetQueryResultsWithContext", anyContext, getQueryResultsInput("0")).
+			Return(getQueryResultsOutput([]Result{{{"@ptr", "0/0"}}, {{"@ptr", "0/1"}}}, cloudwatchlogs.QueryStatusComplete, nil), nil).
+			Once()
+		// Second chunk (generation 0).
+		actions.
+			On("StartQueryWithContext", anyContext, startQueryInput(text, start.Add(1*chunkSize), start.Add(2*chunkSize), maxLimit, groups...)).
+			Return(startQueryOutput("1"), nil).
+			Once()
+		actions.
+			On("GetQueryResultsWithContext", anyContext, getQueryResultsInput("1")).
+			WaitUntil(firstGen1ChunkStarting).
+			Return(getQueryResultsOutput([]Result{{{"@ptr", "1/0"}}}, cloudwatchlogs.QueryStatusComplete, nil), nil).
+			Once()
+		// First chunk half 1 (generation 1).
+		actions.
+			On("StartQueryWithContext", anyContext, startQueryInput(text, start, start.Add(chunkSize/2), maxLimit, groups...)).
+			Run(func(_ mock.Arguments) {
+				firstGen1ChunkStarting <- time.Now()
+			}).
+			Return(startQueryOutput("0/0"), nil).
+			Once()
+		actions.
+			On("GetQueryResultsWithContext", anyContext, getQueryResultsInput("0/0")).
+			Return(getQueryResultsOutput([]Result{{{"@ptr", "0/0"}}}, cloudwatchlogs.QueryStatusComplete, nil), nil).
+			Once()
+		// First chunk half 2 (generation 1).
+		actions.
+			On("StartQueryWithContext", anyContext, startQueryInput(text, start.Add(chunkSize/2), start.Add(chunkSize), maxLimit, groups...)).
+			Return(startQueryOutput("0/1"), nil).
+			Once()
+		actions.
+			On("GetQueryResultsWithContext", anyContext, getQueryResultsInput("0/1")).
+			WaitUntil(thirdGen0ChunkStarting).
+			Return(getQueryResultsOutput([]Result{{{"@ptr", "1/1"}}}, cloudwatchlogs.QueryStatusComplete, nil), nil).
+			Once()
+		// Third chunk (generation 0).
+		actions.
+			On("StartQueryWithContext", anyContext, startQueryInput(text, start.Add(2*chunkSize), end, maxLimit, groups...)).
+			Run(func(_ mock.Arguments) {
+				thirdGen0ChunkStarting <- time.Now()
+			}).
+			Return(startQueryOutput("2"), nil).
+			Once()
+		actions.
+			On("GetQueryResultsWithContext", anyContext, getQueryResultsInput("2")).
+			Return(getQueryResultsOutput([]Result{{{"@ptr", "2/0"}}}, cloudwatchlogs.QueryStatusComplete, nil), nil).
+			Once()
+		s, err := m.Query(QuerySpec{
+			Text:       text,
+			Groups:     groups,
+			Start:      start,
+			End:        end,
+			Limit:      maxLimit,
+			Chunk:      chunkSize,
+			SplitUntil: time.Millisecond,
+		})
+		require.NoError(t, err)
+
+		var actualResults []Result
+		actualResults, err = ReadAll(s)
+
+		require.NoError(t, err)
+		actions.AssertExpectations(t)
+		sort.Slice(actualResults, func(i, j int) bool {
+			return actualResults[i].get("@ptr") < actualResults[j].get("@ptr")
+		})
+		expectedResults := []Result{{{"@ptr", "0/0"}}, {{"@ptr", "1/0"}}, {{"@ptr", "1/1"}}, {{"@ptr", "2/0"}}}
+		assert.Equal(t, expectedResults, actualResults)
+	})
+
 	t.Run("Maxed Chunks are Correctly Recorded", func(t *testing.T) {
 		testCases := []struct {
 			name string