diff --git a/mgr.go b/mgr.go index bb06a70..eaa7b74 100644 --- a/mgr.go +++ b/mgr.go @@ -199,8 +199,9 @@ func (m *mgr) Query(q QuerySpec) (s Stream, err error) { ctx: ctx, cancel: cancel, - n: n, + n0: n, groups: groups, + n: n, stats: Stats{ RangeRequested: d, }, @@ -409,7 +410,7 @@ func (m *mgr) getReadyChunk() *chunk { start, end := s.nextChunkRange() chunkID := strconv.Itoa(int(s.next)) s.next++ - if s.next < s.n { + if s.next < s.n0 { heap.Push(&m.pq, s) } diff --git a/stream.go b/stream.go index 5ac68ab..fd2aae1 100644 --- a/stream.go +++ b/stream.go @@ -16,12 +16,13 @@ type stream struct { // Immutable fields. ctx context.Context // Stream context used to parent chunk contexts cancel context.CancelFunc // Cancels ctx when the stream is closed - n int64 // Number of total chunks + n0 int64 // Number of generation 0 chunks groups []*string // Preprocessed slice for StartQuery // Mutable fields only read/written by mgr loop goroutine. - next int64 // Next chunk to create - m int64 // Number of chunks completed + next int64 // Next generation 0 chunk to create + m int64 // Number of chunks completed so far + n int64 // Number of chunks (may grow beyond n0 due to splitting) // Lock controlling access to the below mutable fields. lock sync.RWMutex @@ -121,7 +122,7 @@ func (s *stream) alive() bool { func (s *stream) nextChunkRange() (start, end time.Time) { // For a single-chunk query, the chunk range is always the query // range. - if s.n == 1 && s.Chunk == s.End.Sub(s.Start) { + if s.n0 == 1 && s.Chunk == s.End.Sub(s.Start) { start, end = s.Start, s.End return } diff --git a/stream_test.go b/stream_test.go index 2745812..4a9532e 100644 --- a/stream_test.go +++ b/stream_test.go @@ -228,7 +228,7 @@ func TestStream_NextChunkRange(t *testing.T) { End: defaultEnd.Add(time.Hour), Chunk: 2*time.Hour + defaultDuration, }, - n: 1, + n0: 1, next: 0, }, c: chunk{