Skip to content

Commit

Permalink
Fix end-before-start bug (issue#25)
Browse files Browse the repository at this point in the history
#25

As suggested, the fix was to separate `mgr.n` into two fields to
reflect the two competing jobs it was trying to perform. (Tracking
the number of generation 0 chunks on one hand, for the purpose of
determing whether to remove a stream from `mgr.pq`; and tracking
the total number of known chunks for the stream, for the purpose of
knowing whether the chunk is EOF.)

In implementing this commit, I realized that `mgr.n` was incorrectly
listed in the "immutable" section of `type mgr struct`, so that has
also been fixed. This incorrect placement would have occurred
stealthily during implementation of dynamic chunk splitting.
  • Loading branch information
vcschapp committed Jan 30, 2023
1 parent a0abf53 commit 13e2985
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
5 changes: 3 additions & 2 deletions mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,9 @@ func (m *mgr) Query(q QuerySpec) (s Stream, err error) {

ctx: ctx,
cancel: cancel,
n: n,
n0: n,
groups: groups,
n: n,
stats: Stats{
RangeRequested: d,
},
Expand Down Expand Up @@ -399,7 +400,7 @@ func (m *mgr) getReadyChunk() *chunk {
start, end := s.nextChunkRange()
chunkID := strconv.Itoa(int(s.next))
s.next++
if s.next < s.n {
if s.next < s.n0 {
heap.Push(&m.pq, s)
}

Expand Down
9 changes: 5 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ type stream struct {
// Immutable fields.
ctx context.Context // Stream context used to parent chunk contexts
cancel context.CancelFunc // Cancels ctx when the stream is closed
n int64 // Number of total chunks
n0 int64 // Number of generation 0 chunks
groups []*string // Preprocessed slice for StartQuery

// Mutable fields only read/written by mgr loop goroutine.
next int64 // Next chunk to create
m int64 // Number of chunks completed
next int64 // Next generation 0 chunk to create
m int64 // Number of chunks completed so far
n int64 // Number of chunks (may grow beyond n0 due to splitting)

// Lock controlling access to the below mutable fields.
lock sync.RWMutex
Expand Down Expand Up @@ -121,7 +122,7 @@ func (s *stream) alive() bool {
func (s *stream) nextChunkRange() (start, end time.Time) {
// For a single-chunk query, the chunk range is always the query
// range.
if s.n == 1 && s.Chunk == s.End.Sub(s.Start) {
if s.n0 == 1 && s.Chunk == s.End.Sub(s.Start) {
start, end = s.Start, s.End
return
}
Expand Down
2 changes: 1 addition & 1 deletion stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestStream_NextChunkRange(t *testing.T) {
End: defaultEnd.Add(time.Hour),
Chunk: 2*time.Hour + defaultDuration,
},
n: 1,
n0: 1,
next: 0,
},
c: chunk{
Expand Down

0 comments on commit 13e2985

Please sign in to comment.