From 13e29851f2682cb1fb0b8ec705b52e610ba13861 Mon Sep 17 00:00:00 2001 From: Victor Schappert Date: Sun, 29 Jan 2023 08:31:43 -0800 Subject: [PATCH] Fix end-before-start bug (issue#25) https://github.com/gogama/incite/issues/25 As suggested, the fix was to separate `mgr.n` into two fields to reflect the two competing jobs it was trying to perform. (Tracking the number of generation 0 chunks on one hand, for the purpose of determing whether to remove a stream from `mgr.pq`; and tracking the total number of known chunks for the stream, for the purpose of knowing whether the chunk is EOF.) In implementing this commit, I realized that `mgr.n` was incorrectly listed in the "immutable" section of `type mgr struct`, so that has also been fixed. This incorrect placement would have occurred stealthily during implementation of dynamic chunk splitting. --- mgr.go | 5 +++-- stream.go | 9 +++++---- stream_test.go | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/mgr.go b/mgr.go index 016b113..9d47980 100644 --- a/mgr.go +++ b/mgr.go @@ -191,8 +191,9 @@ func (m *mgr) Query(q QuerySpec) (s Stream, err error) { ctx: ctx, cancel: cancel, - n: n, + n0: n, groups: groups, + n: n, stats: Stats{ RangeRequested: d, }, @@ -399,7 +400,7 @@ func (m *mgr) getReadyChunk() *chunk { start, end := s.nextChunkRange() chunkID := strconv.Itoa(int(s.next)) s.next++ - if s.next < s.n { + if s.next < s.n0 { heap.Push(&m.pq, s) } diff --git a/stream.go b/stream.go index 5ac68ab..fd2aae1 100644 --- a/stream.go +++ b/stream.go @@ -16,12 +16,13 @@ type stream struct { // Immutable fields. ctx context.Context // Stream context used to parent chunk contexts cancel context.CancelFunc // Cancels ctx when the stream is closed - n int64 // Number of total chunks + n0 int64 // Number of generation 0 chunks groups []*string // Preprocessed slice for StartQuery // Mutable fields only read/written by mgr loop goroutine. - next int64 // Next chunk to create - m int64 // Number of chunks completed + next int64 // Next generation 0 chunk to create + m int64 // Number of chunks completed so far + n int64 // Number of chunks (may grow beyond n0 due to splitting) // Lock controlling access to the below mutable fields. lock sync.RWMutex @@ -121,7 +122,7 @@ func (s *stream) alive() bool { func (s *stream) nextChunkRange() (start, end time.Time) { // For a single-chunk query, the chunk range is always the query // range. - if s.n == 1 && s.Chunk == s.End.Sub(s.Start) { + if s.n0 == 1 && s.Chunk == s.End.Sub(s.Start) { start, end = s.Start, s.End return } diff --git a/stream_test.go b/stream_test.go index 2745812..4a9532e 100644 --- a/stream_test.go +++ b/stream_test.go @@ -228,7 +228,7 @@ func TestStream_NextChunkRange(t *testing.T) { End: defaultEnd.Add(time.Hour), Chunk: 2*time.Hour + defaultDuration, }, - n: 1, + n0: 1, next: 0, }, c: chunk{