Skip to content

Commit

Permalink
Replicate end-before-start bug for issue #25
Browse files Browse the repository at this point in the history
#25

This commit adds a unit test and a scenario test which can replicate the
bug, so the build is broken by this commit. The fix is coming in a
separate subsequent commit.

See also discussion on: #24.
  • Loading branch information
vcschapp committed Jan 30, 2023
1 parent c688b9c commit a0abf53
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 0 deletions.
153 changes: 153 additions & 0 deletions incite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1499,6 +1499,134 @@ var scenarios = []queryScenario{
},
},

	// This scenario aims to regression test https://github.com/gogama/incite/issues/25.
	// There are four generation zero chunks. Three chunks (0, 2, 3) have empty
	// results. One chunk (1) comes back with MaxLimit results, necessitating
	// splitting.
	{
		note: "MultiChunk.Issue25Regression",
		QuerySpec: QuerySpec{
			Text: "lineage",
			Groups: []string{"/menger/böhm-bawerk/mises/hakek/rothbard"},
			Start: defaultStart,
			End: defaultStart.Add(40 * time.Minute),
			Limit: MaxLimit,
			Chunk: 10 * time.Minute, // Four 10-minute generation zero chunks cover the 40-minute range.
			SplitUntil: time.Millisecond, // Allows maxed-out chunks to be split down to 1 ms granularity.
		},
		chunks: []chunkPlan{
			// CHUNK 0. Completes with no results.
			{
				startQueryInput: startQueryInput("lineage", defaultStart, defaultStart.Add(10*time.Minute), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
				startQuerySuccess: true,
				pollOutputs: []chunkPollOutput{
					{
						status: cloudwatchlogs.QueryStatusComplete,
						stats: &Stats{100, 0, 10, 0, 0, 0, 0, 0},
					},
				},
			},
			// CHUNK 1. Completes with exactly MaxLimit results, so it must be
			// split into the four SUB-CHUNK entries further below.
			{
				startQueryInput: startQueryInput("lineage", defaultStart.Add(10*time.Minute), defaultStart.Add(20*time.Minute), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
				startQuerySuccess: true,
				pollOutputs: []chunkPollOutput{
					{
						status: cloudwatchlogs.QueryStatusComplete,
						results: maxLimitResults,
						stats: &Stats{1_000_000, MaxLimit, 2 * MaxLimit, 0, 0, 0, 0, 0},
					},
				},
			},
			// CHUNK 2. Completes with no results.
			{
				startQueryInput: startQueryInput("lineage", defaultStart.Add(20*time.Minute), defaultStart.Add(30*time.Minute), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
				startQuerySuccess: true,
				pollOutputs: []chunkPollOutput{
					{
						status: cloudwatchlogs.QueryStatusComplete,
						stats: &Stats{100, 0, 10, 0, 0, 0, 0, 0},
					},
				},
			},
			// CHUNK 3. Completes with no results.
			{
				startQueryInput: startQueryInput("lineage", defaultStart.Add(30*time.Minute), defaultStart.Add(40*time.Minute), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
				startQuerySuccess: true,
				pollOutputs: []chunkPollOutput{
					{
						status: cloudwatchlogs.QueryStatusComplete,
						stats: &Stats{100, 0, 10, 0, 0, 0, 0, 0},
					},
				},
			},
			// SUB-CHUNK 1/0. First quarter (150 s) of chunk 1's 10-minute
			// range; returns the first quarter of maxLimitResults.
			{
				startQueryInput: startQueryInput("lineage", defaultStart.Add(10*time.Minute), defaultStart.Add(10*time.Minute+150*time.Second), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
				startQuerySuccess: true,
				pollOutputs: []chunkPollOutput{
					{
						status: cloudwatchlogs.QueryStatusComplete,
						results: maxLimitResults[0 : MaxLimit/4],
						stats: &Stats{250_000, MaxLimit / 4, MaxLimit / 2, 0, 0, 0, 0, 0},
					},
				},
			},
			// SUB-CHUNK 1/1. Second quarter of chunk 1's range and results.
			{
				startQueryInput: startQueryInput("lineage", defaultStart.Add(10*time.Minute+150*time.Second), defaultStart.Add(10*time.Minute+300*time.Second), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
				startQuerySuccess: true,
				pollOutputs: []chunkPollOutput{
					{
						status: cloudwatchlogs.QueryStatusComplete,
						results: maxLimitResults[MaxLimit/4 : MaxLimit/2],
						stats: &Stats{250_000, MaxLimit / 4, MaxLimit / 2, 0, 0, 0, 0, 0},
					},
				},
			},
			// SUB-CHUNK 1/2. Third quarter of chunk 1's range and results.
			{
				startQueryInput: startQueryInput("lineage", defaultStart.Add(10*time.Minute+300*time.Second), defaultStart.Add(10*time.Minute+450*time.Second), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
				startQuerySuccess: true,
				pollOutputs: []chunkPollOutput{
					{
						status: cloudwatchlogs.QueryStatusComplete,
						results: maxLimitResults[MaxLimit/2 : 3*MaxLimit/4],
						stats: &Stats{250_000, MaxLimit / 4, MaxLimit / 2, 0, 0, 0, 0, 0},
					},
				},
			},
			// SUB-CHUNK 1/3. Final quarter of chunk 1's range and results.
			{
				startQueryInput: startQueryInput("lineage", defaultStart.Add(10*time.Minute+450*time.Second), defaultStart.Add(20*time.Minute), MaxLimit, "/menger/böhm-bawerk/mises/hakek/rothbard"),
				startQuerySuccess: true,
				pollOutputs: []chunkPollOutput{
					{
						status: cloudwatchlogs.QueryStatusComplete,
						results: maxLimitResults[3*MaxLimit/4:],
						stats: &Stats{250_000, MaxLimit / 4, MaxLimit / 2, 0, 0, 0, 0, 0},
					},
				},
			},
		},
		results: maxLimitResults,
		// Sub-chunk results may be delivered in any order, so sort by @ptr
		// (assumed to be a numeric string in the maxLimitResults fixture —
		// the Atoi comparison depends on that) before comparing.
		postprocess: func(r []Result) {
			sort.Slice(r, func(i, j int) bool {
				a, _ := strconv.Atoi(r[i].get("@ptr"))
				b, _ := strconv.Atoi(r[j].get("@ptr"))
				return a < b
			})
		},
		// Aggregate stats over all eight polls above:
		// BytesScanned = 1_000_000 + 3*100 + 4*250_000 = 2_000_300;
		// RecordsScanned = 2*MaxLimit + 3*10 + 4*(MaxLimit/2) = 4*MaxLimit + 30.
		stats: Stats{
			BytesScanned: 2_000_300,
			RecordsMatched: 2 * MaxLimit,
			RecordsScanned: 4*MaxLimit + 30,
			RangeRequested: 40 * time.Minute,
			RangeStarted: 40 * time.Minute,
			RangeDone: 40 * time.Minute,
		},
	},

// This is the last scenario, and it is meant to be a super test case that,
// by itself, runs more chunks than the QueryManager can run in parallel.
//
Expand Down Expand Up @@ -2121,6 +2249,31 @@ func startQueryInput(text string, start, end time.Time, limit int64, groups ...s
}
}

// startQueryOutput constructs a CloudWatch Logs Insights StartQuery
// response carrying the given query ID.
func startQueryOutput(queryID string) *cloudwatchlogs.StartQueryOutput {
	output := new(cloudwatchlogs.StartQueryOutput)
	output.QueryId = &queryID
	return output
}

// getQueryResultsInput constructs a CloudWatch Logs Insights
// GetQueryResults request for the given query ID.
func getQueryResultsInput(queryID string) *cloudwatchlogs.GetQueryResultsInput {
	input := new(cloudwatchlogs.GetQueryResultsInput)
	input.QueryId = &queryID
	return input
}

// getQueryResultsOutput constructs a CloudWatch Logs Insights
// GetQueryResults response containing the given results. The status and
// statistics fields are optional: pass "" or nil, respectively, to omit
// them from the response.
func getQueryResultsOutput(r []Result, status string, stats *Stats) *cloudwatchlogs.GetQueryResultsOutput {
	var output cloudwatchlogs.GetQueryResultsOutput
	output.Results = backOut(r)
	if status != "" {
		output.Status = &status
	}
	if stats != nil {
		output.Statistics = stats.backOut()
	}
	return &output
}

func (r Result) get(k string) (v string) {
for _, f := range r {
if f.Field == k {
Expand Down
114 changes: 114 additions & 0 deletions mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1820,6 +1820,120 @@ func TestQueryManager_Query(t *testing.T) {
}
})

	t.Run("Issue #25 - Chunk Splitting Does Not Cause Empty Time Range", func(t *testing.T) {
		// Regression test for: https://github.com/gogama/incite/issues/25
		//
		// This test creates the following scenario:
		// 1. Parallelism of 2 so mgr can run maximum two chunks at a time.
		// 2. Chunk splitting on for the query.
		// 3. Three original generation 0 chunks.
		// 4. First chunk gets split into only two sub-chunks, both of which
		//    are started before the third generation 0 chunk can be
		//    started.
		// 5. Third generation 0 chunk starts and ends before one of the
		//    split sub-chunks ends.
		//
		// The above scenario tests that the mgr correctly detects that there
		// are no more generation 0 chunks to create and does not end up
		// creating an empty chunk whose start and end are the same.
		//
		// The two channels below pin down the interleaving: a mock
		// GetQueryResults call blocks (WaitUntil) until a later StartQuery
		// call signals (Run) that it has begun.
		firstGen1ChunkStarting := make(chan time.Time)
		thirdGen0ChunkStarting := make(chan time.Time)
		actions := newMockActions(t)
		m := NewQueryManager(Config{
			Actions: actions,
			Parallel: 2,
			RPS: lotsOfRPS,
		})
		t.Cleanup(func() {
			// Restore the package-level maxLimit overridden below, close the
			// QueryManager, and unblock anything still waiting on the
			// synchronization channels.
			maxLimit = MaxLimit
			_ = m.Close()
			close(firstGen1ChunkStarting)
			close(thirdGen0ChunkStarting)
		})
		// Shrink maxLimit so a chunk maxes out, and is therefore split,
		// after only two results.
		maxLimit = 2
		text := "It's the ABC train, papa!"
		groups := []string{"regression/test/for", "https://github.com/gogama/incite/issues/25"}
		start := time.Date(2023, 1, 28, 16, 48, 13, 0, time.UTC)
		numChunks := 3
		chunkSize := 2 * time.Millisecond
		end := start.Add(time.Duration(numChunks) * chunkSize)

		// First chunk (generation 0). Returns maxLimit results, so it will
		// be split into the two generation 1 sub-chunks mocked below.
		actions.
			On("StartQueryWithContext", anyContext, startQueryInput(text, start, start.Add(1*chunkSize), maxLimit, groups...)).
			Return(startQueryOutput("0"), nil).
			Once()
		actions.
			On("GetQueryResultsWithContext", anyContext, getQueryResultsInput("0")).
			Return(getQueryResultsOutput([]Result{{{"@ptr", "0/0"}}, {{"@ptr", "0/1"}}}, cloudwatchlogs.QueryStatusComplete, nil), nil).
			Once()
		// Second chunk (generation 0). Its poll blocks until the first
		// generation 1 sub-chunk has started.
		actions.
			On("StartQueryWithContext", anyContext, startQueryInput(text, start.Add(1*chunkSize), start.Add(2*chunkSize), maxLimit, groups...)).
			Return(startQueryOutput("1"), nil).
			Once()
		actions.
			On("GetQueryResultsWithContext", anyContext, getQueryResultsInput("1")).
			WaitUntil(firstGen1ChunkStarting).
			Return(getQueryResultsOutput([]Result{{{"@ptr", "1/0"}}}, cloudwatchlogs.QueryStatusComplete, nil), nil).
			Once()
		// First chunk half 1 (generation 1). Signals the second chunk's
		// blocked poll as soon as it starts.
		actions.
			On("StartQueryWithContext", anyContext, startQueryInput(text, start, start.Add(chunkSize/2), maxLimit, groups...)).
			Run(func(_ mock.Arguments) {
				firstGen1ChunkStarting <- time.Now()
			}).
			Return(startQueryOutput("0/0"), nil).
			Once()
		actions.
			On("GetQueryResultsWithContext", anyContext, getQueryResultsInput("0/0")).
			Return(getQueryResultsOutput([]Result{{{"@ptr", "0/0"}}}, cloudwatchlogs.QueryStatusComplete, nil), nil).
			Once()
		// First chunk half 2 (generation 1). Its poll blocks until the third
		// generation 0 chunk has started, so this sub-chunk finishes last.
		actions.
			On("StartQueryWithContext", anyContext, startQueryInput(text, start.Add(chunkSize/2), start.Add(chunkSize), maxLimit, groups...)).
			Return(startQueryOutput("0/1"), nil).
			Once()
		actions.
			On("GetQueryResultsWithContext", anyContext, getQueryResultsInput("0/1")).
			WaitUntil(thirdGen0ChunkStarting).
			Return(getQueryResultsOutput([]Result{{{"@ptr", "1/1"}}}, cloudwatchlogs.QueryStatusComplete, nil), nil).
			Once()
		// Third chunk (generation 0). Signals the blocked sub-chunk poll as
		// soon as it starts.
		actions.
			On("StartQueryWithContext", anyContext, startQueryInput(text, start.Add(2*chunkSize), end, maxLimit, groups...)).
			Run(func(_ mock.Arguments) {
				thirdGen0ChunkStarting <- time.Now()
			}).
			Return(startQueryOutput("2"), nil).
			Once()
		actions.
			On("GetQueryResultsWithContext", anyContext, getQueryResultsInput("2")).
			Return(getQueryResultsOutput([]Result{{{"@ptr", "2/0"}}}, cloudwatchlogs.QueryStatusComplete, nil), nil).
			Once()
		s, err := m.Query(QuerySpec{
			Text: text,
			Groups: groups,
			Start: start,
			End: end,
			Limit: maxLimit,
			Chunk: chunkSize,
			SplitUntil: time.Millisecond,
		})
		require.NoError(t, err)

		var actualResults []Result
		actualResults, err = ReadAll(s)

		require.NoError(t, err)
		actions.AssertExpectations(t)
		// Chunk completion order is nondeterministic, so sort by @ptr before
		// comparing against the expected set.
		sort.Slice(actualResults, func(i, j int) bool {
			return actualResults[i].get("@ptr") < actualResults[j].get("@ptr")
		})
		expectedResults := []Result{{{"@ptr", "0/0"}}, {{"@ptr", "1/0"}}, {{"@ptr", "1/1"}}, {{"@ptr", "2/0"}}}
		assert.Equal(t, expectedResults, actualResults)
	})

t.Run("Maxed Chunks are Correctly Recorded", func(t *testing.T) {
testCases := []struct {
name string
Expand Down

0 comments on commit a0abf53

Please sign in to comment.