From b91eac1390a302f03f82ac4773db78035a0becf5 Mon Sep 17 00:00:00 2001
From: Victor Schappert
Date: Tue, 14 Dec 2021 00:41:01 -0800
Subject: [PATCH] Emit progress statistics in Stats (issue #1)

https://github.com/gogama/incite/issues/1
---
 incite.go      | 130 +++++++++++++++++++++++-------
 incite_test.go | 213 +++++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 297 insertions(+), 46 deletions(-)

diff --git a/incite.go b/incite.go
index 1950334..d4bec93 100644
--- a/incite.go
+++ b/incite.go
@@ -192,12 +192,19 @@ type QuerySpec struct {
 // returned from a QueryManager, Stats contains accumulated metadata
 // about all queries executed by the query manager.
 //
-// The Stats structure contains two types of metadata. The first type of
-// metadata are returned from the CloudWatch Logs Insights web service
-// and consist of metrics about the amount of data scanned by the query
-// or queries. The second type of metadata are collected by Incite and
+// The Stats structure contains two types of metadata.
+//
+// The first type of metadata in Stats are returned from the CloudWatch
+// Logs Insights web service and consist of metrics about the amount of
+// data scanned by the query or queries. These metadata are contained
+// within the fields BytesScanned, RecordsMatched, and RecordsScanned.
+//
+// The second type of metadata in Stats are collected by Incite and
 // consist of metrics about the size of the time range or ranges queried
-// and how much progress has been on the queries.
+// and how much progress has been made on the queries. These metadata
+// can be useful, for example, for showing progress bars or other
+// work-in-progress indicators. They are contained within the fields
+// RangeRequested, RangeStarted, RangeDone, and RangeFailed.
type Stats struct { // BytesScanned is a metric returned by CloudWatch Logs Insights // which represents the total number of bytes of log events @@ -619,6 +626,13 @@ func (m *mgr) setTimerRPS(action CloudWatchLogsAction) bool { return m.setTimer(delayRem) } +func (m *mgr) addQuery(s *stream) { + heap.Push(&m.pq, s) + m.doStats(func(ms *Stats) { + ms.RangeRequested += s.stats.RangeRequested + }) +} + func (m *mgr) startNextChunks() int { var numStarted int for len(m.pq)+m.numReady > 0 && m.chunks.Len() <= m.Parallel { @@ -651,7 +665,7 @@ func (m *mgr) startNextChunk() error { case <-m.close: return errClosing case s := <-m.query: - heap.Push(&m.pq, s) + m.addQuery(s) return errInterrupted case <-m.timer.C: m.ding = true @@ -685,7 +699,16 @@ func (m *mgr) startNextChunk() error { m.logChunk(c, "temporary failure to start", err.Error()) } else { err = &StartQueryError{c.stream.Text, c.start, c.end, err} - c.stream.setErr(err, true, Stats{}) + s := Stats{ + RangeFailed: c.end.Sub(c.start), + } + if c.gen == 0 { + s.RangeStarted = s.RangeFailed + } + c.stream.setErr(err, true, s) + m.doStats(func(ms *Stats) { + ms.add(&s) + }) m.logChunk(c, "permanent failure to start", "fatal error from CloudWatch Logs: "+err.Error()) } return err @@ -695,12 +718,31 @@ func (m *mgr) startNextChunk() error { queryID := output.QueryId if queryID == nil { err = &StartQueryError{c.stream.Text, c.start, c.end, errors.New(outputMissingQueryIDMsg)} - c.stream.setErr(err, true, Stats{}) + s := Stats{ + RangeFailed: c.end.Sub(c.start), + } + if c.gen == 0 { + s.RangeStarted = s.RangeFailed + } + c.stream.setErr(err, true, s) + m.doStats(func(ms *Stats) { + ms.add(&s) + }) m.logChunk(c, "nil query ID from CloudWatch Logs for", "") return err } c.queryID = *queryID + // For initial chunks (not resulting from splits), count the chunks' + // time range toward the range started statistic. + if c.gen == 0 { + d := c.end.Sub(c.start) + m.coStats(c.stream, func(ms, ss *Stats) { + ms.RangeStarted += d + ss.RangeStarted += d + }) + } + // Put the chunk at the end of the running chunks list. 
m.chunks.Prev().Link(r) m.numChunks++ @@ -806,7 +848,7 @@ func (m *mgr) pollNextChunk() int { return -1 } else if err == errInterrupted { return 0 - } else if err == errChunkFailed { + } else if err == errRestartChunk { c.chunkID += "R" c.queryID = "" m.numChunks-- @@ -814,14 +856,14 @@ func (m *mgr) pollNextChunk() int { m.numReady++ m.ready.Prev().Link(r) return 1 - } else if err != nil && err != errEndOfChunk { + } else if err != nil && err != errEndOfChunk && err != errSplitChunk { m.numChunks-- m.chunks.Unlink(1) + m.finalizeChunk(c, err) + m.doStats(func(ms *Stats) { + ms.add(&c.Stats) + }) c.stream.setErr(err, true, c.Stats) - m.statsLock.Lock() - m.stats.add(&c.Stats) - m.statsLock.Unlock() - m.cancelChunkMaybe(c, err) continue } @@ -831,13 +873,18 @@ func (m *mgr) pollNextChunk() int { m.chunks.Unlink(1) if err == errEndOfChunk { m.numChunks-- - m.statsLock.Lock() - c.stream.lock.Lock() - m.stats.add(&c.Stats) - c.stream.stats.add(&c.Stats) - m.statsLock.Unlock() - c.stream.lock.Unlock() + c.Stats.RangeDone += c.end.Sub(c.start) + m.coStats(c.stream, func(ms, ss *Stats) { + ms.add(&c.Stats) + ss.add(&c.Stats) + }) m.logChunk(c, "finished", "end of chunk") + } else if err == errSplitChunk { + m.numChunks-- + m.coStats(c.stream, func(ms, ss *Stats) { + ms.add(&c.Stats) + ss.add(&c.Stats) + }) } else { m.chunks.Prev().Link(r) } @@ -856,7 +903,7 @@ func (m *mgr) pollChunk(c *chunk) error { case <-m.close: return errClosing case s := <-m.query: - heap.Push(&m.pq, s) + m.addQuery(s) return errInterrupted case <-m.timer.C: m.ding = true @@ -893,13 +940,13 @@ func (m *mgr) pollChunk(c *chunk) error { case cloudwatchlogs.QueryStatusComplete: translateStats(output.Statistics, &c.Stats) if m.splitChunk(c, len(output.Results)) { - return errEndOfChunk + return errSplitChunk } return sendChunkBlock(c, output.Results, true) case cloudwatchlogs.QueryStatusFailed: if c.ptr == nil { translateStats(output.Statistics, &c.Stats) - return errChunkFailed // Retry transient failures if stream isn't previewable. + return errRestartChunk // Retry transient failures if stream isn't previewable. 
} fallthrough default: @@ -940,6 +987,7 @@ func (m *mgr) splitChunk(c *chunk, n int) bool { return &chunk{ stream: parent.stream, ctx: context.WithValue(parent.stream.ctx, chunkIDKey, chunkID), + gen: parent.gen + 1, chunkID: chunkID, start: start, end: end, @@ -995,6 +1043,7 @@ func (m *mgr) cancelChunk(c *chunk, err error) { QueryId: &c.queryID, }) m.lastReq[StopQuery] = time.Now() + c.Stats.RangeFailed += c.end.Sub(c.start) if err != nil { m.logChunk(c, "failed to cancel", "error from CloudWatch Logs: "+err.Error()) } else if output.Success == nil || !*output.Success { @@ -1004,19 +1053,21 @@ func (m *mgr) cancelChunk(c *chunk, err error) { } } -func (m *mgr) cancelChunkMaybe(c *chunk, err error) { +func (m *mgr) finalizeChunk(c *chunk, err error) { if err == io.EOF { + c.Stats.RangeDone += c.end.Sub(c.start) m.logChunk(c, "finished", "end of stream") return } if terminalErr, ok := err.(*TerminalQueryStatusError); ok { switch terminalErr.Status { case cloudwatchlogs.QueryStatusFailed, cloudwatchlogs.QueryStatusCancelled, "Timeout": + c.Stats.RangeFailed += c.end.Sub(c.start) m.logChunk(c, "unexpected terminal status", terminalErr.Status) return } } - m.cancelChunk(c, err) + m.cancelChunk(c, nil) } func (m *mgr) waitForWork() int { @@ -1030,7 +1081,7 @@ func (m *mgr) waitForWork() int { case <-m.close: return -1 case s := <-m.query: - heap.Push(&m.pq, s) + m.addQuery(s) return 0 case <-m.timer.C: m.ding = true @@ -1050,6 +1101,20 @@ func (m *mgr) logChunk(c *chunk, msg, detail string) { } } +func (m *mgr) doStats(f func(ms *Stats)) { + m.statsLock.Lock() + defer m.statsLock.Unlock() + f(&m.stats) +} + +func (m *mgr) coStats(s *stream, f func(ms, ss *Stats)) { + m.statsLock.Lock() + defer m.statsLock.Unlock() + s.lock.Lock() + defer s.lock.Unlock() + f(&m.stats, &s.stats) +} + func sendChunkBlock(c *chunk, results [][]*cloudwatchlogs.ResultField, eof bool) error { var block []Result var err error @@ -1275,6 +1340,9 @@ func (m *mgr) Query(q QuerySpec) (s Stream, err error) { cancel: cancel, n: n, groups: groups, + stats: Stats{ + RangeRequested: d, + }, } ss.more = sync.NewCond(&ss.lock) @@ -1444,6 +1512,7 @@ type chunk struct { Stats stream *stream // Owning stream which receives results of the chunk ctx context.Context // Child of the stream's context owned by this chunk + gen int // Generation, zero if an initial chunk, positive if it came from a split chunkID string // Incite chunk ID queryID string // Insights query ID status string // Insights query status @@ -1453,8 +1522,9 @@ type chunk struct { } var ( - errClosing = errors.New("incite: closed") - errEndOfChunk = errors.New("incite: end of chunk") - errInterrupted = errors.New("incite: timer wait interrupted") - errChunkFailed = errors.New("incite: transient chunk failure") + errClosing = errors.New("incite: closed") + errEndOfChunk = errors.New("incite: end of chunk") + errInterrupted = errors.New("incite: timer wait interrupted") + errSplitChunk = errors.New("incite: chunk split") + errRestartChunk = errors.New("incite: transient chunk failure, restart chunk") ) diff --git a/incite_test.go b/incite_test.go index e469215..7a37cca 100644 --- a/incite_test.go +++ b/incite_test.go @@ -973,6 +973,7 @@ func TestQueryManager_Query(t *testing.T) { expectedGroups []*string expectedNext int64 expectedCauseErr error + expectedStats Stats }{ { name: "Zero", @@ -996,6 +997,11 @@ func TestQueryManager_Query(t *testing.T) { expectedGroups: []*string{sp("bar"), sp("Baz")}, expectedNext: 1, expectedCauseErr: causeErr, + expectedStats: Stats{ + 
RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, }, { name: "ChunkExceedsRange", @@ -1020,6 +1026,11 @@ func TestQueryManager_Query(t *testing.T) { expectedGroups: []*string{sp("bar"), sp("Baz")}, expectedNext: 1, expectedCauseErr: causeErr, + expectedStats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, }, { name: "PartialChunk", @@ -1046,6 +1057,11 @@ func TestQueryManager_Query(t *testing.T) { expectedGroups: []*string{sp("bar"), sp("Baz")}, expectedNext: 1, expectedCauseErr: causeErr, + expectedStats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: 3 * time.Minute, + RangeFailed: 3 * time.Minute, + }, }, { name: "MissingQueryID", @@ -1070,6 +1086,11 @@ func TestQueryManager_Query(t *testing.T) { expectedGroups: []*string{sp("eggs"), sp("Spam")}, expectedNext: 1, expectedCauseErr: errors.New(outputMissingQueryIDMsg), + expectedStats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, }, } @@ -1108,13 +1129,15 @@ func TestQueryManager_Query(t *testing.T) { var sqe *StartQueryError assert.ErrorAs(t, err, &sqe) assert.Equal(t, sqe.Cause, testCase.expectedCauseErr) - assert.Equal(t, Stats{}, s.GetStats()) + assert.Equal(t, testCase.expectedStats, s.GetStats()) + s.GetStats().checkInvariants(t, true, false) err = s.Close() assert.NoError(t, err) err = s.Close() assert.Same(t, ErrClosed, err) - assert.Equal(t, Stats{}, s.GetStats()) + assert.Equal(t, testCase.expectedStats, s.GetStats()) + s.GetStats().checkInvariants(t, true, false) actions.AssertExpectations(t) }) @@ -2222,6 +2245,11 @@ var scenarios = []queryScenario{ }, }, err: &StartQueryError{"a poorly written query", defaultStart, defaultEnd, cwlErr(cloudwatchlogs.ErrCodeInvalidParameterException, "terrible query writing there bud")}, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, }, { note: "NoStart.UnexpectedError", @@ -2245,6 +2273,11 @@ var scenarios = []queryScenario{ }, }, err: &StartQueryError{"an ill-fated query", defaultStart, defaultEnd, errors.New("pow exclamation point")}, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, }, { @@ -2272,6 +2305,11 @@ var scenarios = []queryScenario{ }, }, }, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, closeAfter: true, }, { @@ -2301,6 +2339,11 @@ var scenarios = []queryScenario{ }, }, err: &TerminalQueryStatusError{"scenario:3|chunk:0|OneChunk.OnePoll.Status.Cancelled", cloudwatchlogs.QueryStatusCancelled, "destined for cancellation"}, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, }, { note: "OneChunk.OnePoll.Status.Timeout", @@ -2328,6 +2371,11 @@ var scenarios = []queryScenario{ }, }, err: &TerminalQueryStatusError{"scenario:4|chunk:0|OneChunk.OnePoll.Status.Timeout", "Timeout", "tempting a timeout"}, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, }, { note: "OneChunk.OnePoll.Status.Unexpected", @@ -2354,8 +2402,13 @@ var scenarios = []queryScenario{ }, }, }, + err: &TerminalQueryStatusError{"scenario:5|chunk:0|OneChunk.OnePoll.Status.Unexpected", "Did you see this coming?", "expecting the unexpected...status"}, + stats: Stats{ + 
RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, expectStop: true, - err: &TerminalQueryStatusError{"scenario:5|chunk:0|OneChunk.OnePoll.Status.Unexpected", "Did you see this coming?", "expecting the unexpected...status"}, }, { note: "OneChunk.OnePoll.Error.Unexpected", @@ -2382,8 +2435,13 @@ var scenarios = []queryScenario{ }, }, }, + err: &UnexpectedQueryError{"scenario:6|chunk:0|OneChunk.OnePoll.Error.Unexpected", "expecting the unexpected...error", errors.New("very bad news")}, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, expectStop: true, - err: &UnexpectedQueryError{"scenario:6|chunk:0|OneChunk.OnePoll.Error.Unexpected", "expecting the unexpected...error", errors.New("very bad news")}, }, { note: "OneChunk.OnePoll.WithResults", @@ -2432,7 +2490,14 @@ var scenarios = []queryScenario{ {"@MyField", "goodbye"}, }, }, - stats: Stats{1, 2, 3, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 1, + RecordsMatched: 2, + RecordsScanned: 3, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, }, { @@ -2513,7 +2578,14 @@ var scenarios = []queryScenario{ {"MyField", "world"}, }, }, - stats: Stats{100, 99, 98, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 100, + RecordsMatched: 99, + RecordsScanned: 98, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, }, { @@ -2543,6 +2615,11 @@ var scenarios = []queryScenario{ }, }, err: &TerminalQueryStatusError{"scenario:9|chunk:0|OneChunk.Preview.Status.Failed", cloudwatchlogs.QueryStatusFailed, "fated for failure"}, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, }, { note: "OneChunk.Preview.Status.Cancelled", @@ -2571,6 +2648,11 @@ var scenarios = []queryScenario{ }, }, err: &TerminalQueryStatusError{"scenario:10|chunk:0|OneChunk.Preview.Status.Cancelled", cloudwatchlogs.QueryStatusCancelled, "preview of coming cancellations"}, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, }, { note: "OneChunk.Preview.Status.Timeout", @@ -2599,6 +2681,11 @@ var scenarios = []queryScenario{ }, }, err: &TerminalQueryStatusError{"scenario:11|chunk:0|OneChunk.Preview.Status.Timeout", "Timeout", "preview of coming timeouts"}, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, }, { note: "OneChunk.Preview.Status.Unexpected", @@ -2626,8 +2713,13 @@ var scenarios = []queryScenario{ }, }, }, + err: &TerminalQueryStatusError{"scenario:12|chunk:0|OneChunk.Preview.Status.Unexpected", "I did NOT see this coming!", "preview of coming surprises..."}, + stats: Stats{ + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeFailed: defaultDuration, + }, expectStop: true, - err: &TerminalQueryStatusError{"scenario:12|chunk:0|OneChunk.Preview.Status.Unexpected", "I did NOT see this coming!", "preview of coming surprises..."}, }, { @@ -2716,7 +2808,14 @@ var scenarios = []queryScenario{ {{"Foo", "Foo.4.0"}, {"Bar", "Bar.4.0"}, {"@ptr", "4"}}, {{"@ptr", "2"}, {"@deleted", "true"}}, }, - stats: Stats{6, 12, 18, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 6, + RecordsMatched: 12, + RecordsScanned: 18, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, }, { note: 
"OneChunk.Preview.SimulateStatsCommand.NoPtr", @@ -2775,7 +2874,14 @@ var scenarios = []queryScenario{ {{"count_distinct(Foo)", "41"}, {"bar", "eggs"}}, {{"count_distinct(Foo)", "10"}, {"bar", "spam"}}, }, - stats: Stats{4, 5, 8, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 4, + RecordsMatched: 5, + RecordsScanned: 8, + RangeRequested: defaultDuration + 2*time.Hour, + RangeStarted: defaultDuration + 2*time.Hour, + RangeDone: defaultDuration + 2*time.Hour, + }, }, { @@ -2882,7 +2988,14 @@ var scenarios = []queryScenario{ }, }, results: maxLimitResults, - stats: Stats{15, 15, 15, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 15, + RecordsMatched: 15, + RecordsScanned: 15, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, }, { @@ -2920,7 +3033,14 @@ var scenarios = []queryScenario{ {{"EggCount", "1"}, {"Spam", "true"}}, {{"EggCount", "2"}, {"Span", "false"}}, }, - stats: Stats{77, 777, 7, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 77, + RecordsMatched: 777, + RecordsScanned: 7, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, }, { @@ -2968,7 +3088,14 @@ var scenarios = []queryScenario{ {{"@ptr", "1111"}, {"Something", "wicked this way comes"}}, {{"@ptr", "2222"}, {"Something", "else"}}, }, - stats: Stats{13, 8, 3, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 13, + RecordsMatched: 8, + RecordsScanned: 3, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, }, { @@ -3033,7 +3160,14 @@ var scenarios = []queryScenario{ return r[i].get("@timestamp") < r[j].get("@timestamp") }) }, - stats: Stats{3, 3, 2, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 3, + RecordsMatched: 3, + RecordsScanned: 2, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, }, { @@ -3108,7 +3242,14 @@ var scenarios = []queryScenario{ return r[i].get("@ptr") < r[j].get("@ptr") }) }, - stats: Stats{100, 100, 100, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 100, + RecordsMatched: 100, + RecordsScanned: 100, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, }, { @@ -3191,7 +3332,14 @@ var scenarios = []queryScenario{ return r[i].get("@ptr") < r[j].get("@ptr") }) }, - stats: Stats{132, 165, 198, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 132, + RecordsMatched: 165, + RecordsScanned: 198, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, }, { @@ -3321,7 +3469,14 @@ var scenarios = []queryScenario{ return pi < pj }) }, - stats: Stats{2, 2, 2, 0, 0, 0, 0}, + stats: Stats{ + BytesScanned: 2, + RecordsMatched: 2, + RecordsScanned: 2, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, }, } @@ -3344,10 +3499,14 @@ func (qs *queryScenario) test(t *testing.T, i int, m QueryManager, actions *mock t.Parallel() } qs.play(t, i, m, actions) + m.GetStats().checkInvariants(t, false, false) }) } func (qs *queryScenario) play(t *testing.T, i int, m QueryManager, actions *mockActions) { + // Validate the expected status in advance. + qs.stats.checkInvariants(t, true, false) + // Set up the chunk polling scenarios. 
 	for j := range qs.chunks {
 		qs.chunks[j].setup(i, j, qs.note, qs.closeEarly, qs.expectStop, actions)
 	}
@@ -3512,6 +3671,28 @@ func (s *Stats) backOut() *cloudwatchlogs.QueryStatistics {
 	}
 }
 
+func (s Stats) checkInvariants(t *testing.T, done bool, success bool) {
+	// The following invariants must always be true regardless of whether
+	// the Stream or QueryManager finished all requested work.
+	assert.GreaterOrEqual(t, s.RangeRequested, s.RangeStarted, "RangeRequested must be greater than or equal to RangeStarted")
+	assert.GreaterOrEqual(t, s.RangeStarted, s.RangeDone, "RangeStarted must be greater than or equal to RangeDone")
+	assert.GreaterOrEqual(t, s.RangeStarted, s.RangeFailed, "RangeStarted must be greater than or equal to RangeFailed")
+	assert.GreaterOrEqual(t, s.RangeStarted, s.RangeDone+s.RangeFailed, "RangeStarted must be greater than or equal to the sum of RangeDone + RangeFailed")
+	// The following invariants are always true when the Stream or
+	// QueryManager has finished all requested work, even if some of it
+	// failed.
+	if done {
+		assert.Equal(t, s.RangeDone+s.RangeFailed, s.RangeStarted, "the sum of RangeDone + RangeFailed must equal RangeStarted if all work is done")
+	}
+	// The following invariants are always true when the Stream or
+	// QueryManager has finished all requested work successfully.
+	if success {
+		assert.Equal(t, s.RangeStarted, s.RangeRequested, "RangeStarted must equal RangeRequested if all work finished successfully")
+		assert.Equal(t, s.RangeDone, s.RangeStarted, "RangeDone must equal RangeStarted if all work finished successfully")
+		assert.Equal(t, time.Duration(0), s.RangeFailed, "RangeFailed must equal zero if all work finished successfully")
+	}
+}
+
 func int64p(i int64) *int64 {
 	return &i
 }
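
A usage sketch for the new progress metadata. The updated Stats
documentation above says the RangeRequested, RangeStarted, RangeDone,
and RangeFailed fields can drive progress bars and other
work-in-progress indicators, and the tests above read them via GetStats
on both the QueryManager and the Stream. The sketch below shows one way
a caller might poll a Stream for progress. The package name, function
name, ticker interval, done channel, and output format are illustrative
assumptions, not part of the incite API.

	package progress

	import (
		"fmt"
		"time"

		"github.com/gogama/incite"
	)

	// reportProgress periodically prints the fraction of the requested
	// time range that is finished, successfully or not. It stops when
	// the done channel is closed.
	func reportProgress(s incite.Stream, done <-chan struct{}) {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				stats := s.GetStats()
				if stats.RangeRequested <= 0 {
					continue // Nothing requested yet; avoid dividing by zero.
				}
				// RangeDone and RangeFailed together cover the part of the
				// queried range that is no longer in flight. Per the
				// invariants tested above, their sum never exceeds
				// RangeRequested, so pct stays within [0, 100].
				finished := stats.RangeDone + stats.RangeFailed
				pct := 100 * float64(finished) / float64(stats.RangeRequested)
				fmt.Printf("%5.1f%% of requested time range finished\n", pct)
			}
		}
	}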
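
Relatedly, the ordering that checkInvariants asserts is also the
contract a consumer can rely on when interpreting Stats: work cannot
start before it is requested, cannot finish (successfully or not)
before it starts, and once a stream is done the started range is fully
accounted for by RangeDone plus RangeFailed. A caller-side mirror of
the unconditional checks, belonging to the same illustrative package as
the sketch above (validStats is a hypothetical helper, not part of the
incite API):

	// validStats reports whether a Stats value satisfies the ordering
	// asserted unconditionally by checkInvariants in the tests above.
	func validStats(s incite.Stats) bool {
		return s.RangeRequested >= s.RangeStarted &&
			s.RangeStarted >= s.RangeDone+s.RangeFailed &&
			s.RangeDone >= 0 && s.RangeFailed >= 0
	}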