Skip to content

Commit

Permalink
Emit progress statistics in Stats (issue #1)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcschapp committed Dec 22, 2021
1 parent f8b8013 commit b91eac1
Show file tree
Hide file tree
Showing 2 changed files with 297 additions and 46 deletions.
130 changes: 100 additions & 30 deletions incite.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,19 @@ type QuerySpec struct {
// returned from a QueryManager, Stats contains accumulated metadata
// about all queries executed by the query manager.
//
// The Stats structure contains two types of metadata. The first type of
// metadata are returned from the CloudWatch Logs Insights web service
// and consist of metrics about the amount of data scanned by the query
// or queries. The second type of metadata are collected by Incite and
// The Stats structure contains two types of metadata.
//
// The first type of metadata in Stats are returned from the CloudWatch
// Logs Insights web service and consist of metrics about the amount of
// data scanned by the query or queries. These metadata are contained
// within the fields BytesScanned, RecordsMatched, and RecordsScanned.
//
// The second type of metadata in Stats are collected by Incite and
// consist of metrics about the size of the time range or ranges queried
// and how much progress has been made on the queries.
// and how much progress has been made on the queries. These metadata can
// be useful, for example, for showing progress bars or other
// work-in-progress indicators. They are contained within the fields
// RangeRequested, RangeStarted, RangeDone, and RangeFailed.
type Stats struct {
// BytesScanned is a metric returned by CloudWatch Logs Insights
// which represents the total number of bytes of log events
Expand Down Expand Up @@ -619,6 +626,13 @@ func (m *mgr) setTimerRPS(action CloudWatchLogsAction) bool {
return m.setTimer(delayRem)
}

// addQuery pushes the stream s onto the manager's m.pq heap and then
// adds the stream's RangeRequested to the manager's accumulated
// RangeRequested statistic.
func (m *mgr) addQuery(s *stream) {
	heap.Push(&m.pq, s)

	// The manager-wide total is updated through doStats so that the
	// write happens while the manager's stats lock is held.
	m.doStats(func(total *Stats) {
		total.RangeRequested = total.RangeRequested + s.stats.RangeRequested
	})
}

func (m *mgr) startNextChunks() int {
var numStarted int
for len(m.pq)+m.numReady > 0 && m.chunks.Len() <= m.Parallel {
Expand Down Expand Up @@ -651,7 +665,7 @@ func (m *mgr) startNextChunk() error {
case <-m.close:
return errClosing
case s := <-m.query:
heap.Push(&m.pq, s)
m.addQuery(s)
return errInterrupted
case <-m.timer.C:
m.ding = true
Expand Down Expand Up @@ -685,7 +699,16 @@ func (m *mgr) startNextChunk() error {
m.logChunk(c, "temporary failure to start", err.Error())
} else {
err = &StartQueryError{c.stream.Text, c.start, c.end, err}
c.stream.setErr(err, true, Stats{})
s := Stats{
RangeFailed: c.end.Sub(c.start),
}
if c.gen == 0 {
s.RangeStarted = s.RangeFailed
}
c.stream.setErr(err, true, s)
m.doStats(func(ms *Stats) {
ms.add(&s)
})
m.logChunk(c, "permanent failure to start", "fatal error from CloudWatch Logs: "+err.Error())
}
return err
Expand All @@ -695,12 +718,31 @@ func (m *mgr) startNextChunk() error {
queryID := output.QueryId
if queryID == nil {
err = &StartQueryError{c.stream.Text, c.start, c.end, errors.New(outputMissingQueryIDMsg)}
c.stream.setErr(err, true, Stats{})
s := Stats{
RangeFailed: c.end.Sub(c.start),
}
if c.gen == 0 {
s.RangeStarted = s.RangeFailed
}
c.stream.setErr(err, true, s)
m.doStats(func(ms *Stats) {
ms.add(&s)
})
m.logChunk(c, "nil query ID from CloudWatch Logs for", "")
return err
}
c.queryID = *queryID

// For initial chunks (not resulting from splits), count the chunks'
// time range toward the range started statistic.
if c.gen == 0 {
d := c.end.Sub(c.start)
m.coStats(c.stream, func(ms, ss *Stats) {
ms.RangeStarted += d
ss.RangeStarted += d
})
}

// Put the chunk at the end of the running chunks list.
m.chunks.Prev().Link(r)
m.numChunks++
Expand Down Expand Up @@ -806,22 +848,22 @@ func (m *mgr) pollNextChunk() int {
return -1
} else if err == errInterrupted {
return 0
} else if err == errChunkFailed {
} else if err == errRestartChunk {
c.chunkID += "R"
c.queryID = ""
m.numChunks--
m.chunks.Unlink(1)
m.numReady++
m.ready.Prev().Link(r)
return 1
} else if err != nil && err != errEndOfChunk {
} else if err != nil && err != errEndOfChunk && err != errSplitChunk {
m.numChunks--
m.chunks.Unlink(1)
m.finalizeChunk(c, err)
m.doStats(func(ms *Stats) {
ms.add(&c.Stats)
})
c.stream.setErr(err, true, c.Stats)
m.statsLock.Lock()
m.stats.add(&c.Stats)
m.statsLock.Unlock()
m.cancelChunkMaybe(c, err)
continue
}

Expand All @@ -831,13 +873,18 @@ func (m *mgr) pollNextChunk() int {
m.chunks.Unlink(1)
if err == errEndOfChunk {
m.numChunks--
m.statsLock.Lock()
c.stream.lock.Lock()
m.stats.add(&c.Stats)
c.stream.stats.add(&c.Stats)
m.statsLock.Unlock()
c.stream.lock.Unlock()
c.Stats.RangeDone += c.end.Sub(c.start)
m.coStats(c.stream, func(ms, ss *Stats) {
ms.add(&c.Stats)
ss.add(&c.Stats)
})
m.logChunk(c, "finished", "end of chunk")
} else if err == errSplitChunk {
m.numChunks--
m.coStats(c.stream, func(ms, ss *Stats) {
ms.add(&c.Stats)
ss.add(&c.Stats)
})
} else {
m.chunks.Prev().Link(r)
}
Expand All @@ -856,7 +903,7 @@ func (m *mgr) pollChunk(c *chunk) error {
case <-m.close:
return errClosing
case s := <-m.query:
heap.Push(&m.pq, s)
m.addQuery(s)
return errInterrupted
case <-m.timer.C:
m.ding = true
Expand Down Expand Up @@ -893,13 +940,13 @@ func (m *mgr) pollChunk(c *chunk) error {
case cloudwatchlogs.QueryStatusComplete:
translateStats(output.Statistics, &c.Stats)
if m.splitChunk(c, len(output.Results)) {
return errEndOfChunk
return errSplitChunk
}
return sendChunkBlock(c, output.Results, true)
case cloudwatchlogs.QueryStatusFailed:
if c.ptr == nil {
translateStats(output.Statistics, &c.Stats)
return errChunkFailed // Retry transient failures if stream isn't previewable.
return errRestartChunk // Retry transient failures if stream isn't previewable.
}
fallthrough
default:
Expand Down Expand Up @@ -940,6 +987,7 @@ func (m *mgr) splitChunk(c *chunk, n int) bool {
return &chunk{
stream: parent.stream,
ctx: context.WithValue(parent.stream.ctx, chunkIDKey, chunkID),
gen: parent.gen + 1,
chunkID: chunkID,
start: start,
end: end,
Expand Down Expand Up @@ -995,6 +1043,7 @@ func (m *mgr) cancelChunk(c *chunk, err error) {
QueryId: &c.queryID,
})
m.lastReq[StopQuery] = time.Now()
c.Stats.RangeFailed += c.end.Sub(c.start)
if err != nil {
m.logChunk(c, "failed to cancel", "error from CloudWatch Logs: "+err.Error())
} else if output.Success == nil || !*output.Success {
Expand All @@ -1004,19 +1053,21 @@ func (m *mgr) cancelChunk(c *chunk, err error) {
}
}

func (m *mgr) cancelChunkMaybe(c *chunk, err error) {
func (m *mgr) finalizeChunk(c *chunk, err error) {
if err == io.EOF {
c.Stats.RangeDone += c.end.Sub(c.start)
m.logChunk(c, "finished", "end of stream")
return
}
if terminalErr, ok := err.(*TerminalQueryStatusError); ok {
switch terminalErr.Status {
case cloudwatchlogs.QueryStatusFailed, cloudwatchlogs.QueryStatusCancelled, "Timeout":
c.Stats.RangeFailed += c.end.Sub(c.start)
m.logChunk(c, "unexpected terminal status", terminalErr.Status)
return
}
}
m.cancelChunk(c, err)
m.cancelChunk(c, nil)
}

func (m *mgr) waitForWork() int {
Expand All @@ -1030,7 +1081,7 @@ func (m *mgr) waitForWork() int {
case <-m.close:
return -1
case s := <-m.query:
heap.Push(&m.pq, s)
m.addQuery(s)
return 0
case <-m.timer.C:
m.ding = true
Expand All @@ -1050,6 +1101,20 @@ func (m *mgr) logChunk(c *chunk, msg, detail string) {
}
}

// doStats calls f with a pointer to the manager's accumulated stats
// while holding m.statsLock. The lock is released by a deferred unlock,
// so it is released when f returns and also if f panics.
func (m *mgr) doStats(f func(ms *Stats)) {
	m.statsLock.Lock()
	defer m.statsLock.Unlock()

	managerStats := &m.stats
	f(managerStats)
}

// coStats calls f with pointers to both the manager's accumulated stats
// and the stats of stream s, with both protecting locks held.
//
// Lock order is manager first (m.statsLock, taken by doStats), then the
// stream (s.lock). The locks are released in the reverse order once f
// returns.
func (m *mgr) coStats(s *stream, f func(ms, ss *Stats)) {
	m.doStats(func(managerStats *Stats) {
		s.lock.Lock()
		defer s.lock.Unlock()

		f(managerStats, &s.stats)
	})
}

func sendChunkBlock(c *chunk, results [][]*cloudwatchlogs.ResultField, eof bool) error {
var block []Result
var err error
Expand Down Expand Up @@ -1275,6 +1340,9 @@ func (m *mgr) Query(q QuerySpec) (s Stream, err error) {
cancel: cancel,
n: n,
groups: groups,
stats: Stats{
RangeRequested: d,
},
}
ss.more = sync.NewCond(&ss.lock)

Expand Down Expand Up @@ -1444,6 +1512,7 @@ type chunk struct {
Stats
stream *stream // Owning stream which receives results of the chunk
ctx context.Context // Child of the stream's context owned by this chunk
gen int // Generation, zero if an initial chunk, positive if it came from a split
chunkID string // Incite chunk ID
queryID string // Insights query ID
status string // Insights query status
Expand All @@ -1453,8 +1522,9 @@ type chunk struct {
}

var (
errClosing = errors.New("incite: closed")
errEndOfChunk = errors.New("incite: end of chunk")
errInterrupted = errors.New("incite: timer wait interrupted")
errChunkFailed = errors.New("incite: transient chunk failure")
errClosing = errors.New("incite: closed")
errEndOfChunk = errors.New("incite: end of chunk")
errInterrupted = errors.New("incite: timer wait interrupted")
errSplitChunk = errors.New("incite: chunk split")
errRestartChunk = errors.New("incite: transient chunk failure, restart chunk")
)
Loading

0 comments on commit b91eac1

Please sign in to comment.