diff --git a/errors.go b/errors.go index 9227092..383042f 100644 --- a/errors.go +++ b/errors.go @@ -203,8 +203,9 @@ const ( ) var ( - errClosing = errors.New("incite: closing") - errStopChunk = errors.New("incite: owning stream died, cancel chunk") - errRestartChunk = errors.New("incite: transient chunk failure, restart chunk") - errSplitChunk = errors.New("incite: chunk maxed, split chunk") + errClosing = errors.New("incite: closing") + errReduceParallel = errors.New("incite: exceeded concurrency limit, reduce parallelism") + errStopChunk = errors.New("incite: owning stream died, cancel chunk") + errRestartChunk = errors.New("incite: transient chunk failure, restart chunk") + errSplitChunk = errors.New("incite: chunk maxed, split chunk") ) diff --git a/incite_test.go b/incite_test.go index 8fe657c..db03952 100644 --- a/incite_test.go +++ b/incite_test.go @@ -1628,6 +1628,71 @@ var scenarios = []queryScenario{ }, }, + // This scenario involves two chunks, both of which ultimately + // succeed, but which both hit the query concurrency quota limit + // several times before being successfully started. 
+ { + note: "MultiChunk.RepeatedConcurrencyLimitError", + QuerySpec: QuerySpec{ + Text: "truckin'", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/grateful/dead", "/american/beauty"}, + Chunk: defaultDuration / 2, + }, + chunks: []chunkPlan{ + { + startQueryInput: startQueryInput("truckin'", defaultStart, defaultStart.Add(defaultDuration/2), DefaultLimit, "/grateful/dead", "/american/beauty"), + startQuerySuccess: true, + startQueryErrs: []error{ + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "truckin', got my chips cashed in", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "like the do-dah man", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "together, more or less in line", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "just keep truckin' on", nil), + }, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusComplete, + stats: &Stats{0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + }, + { + startQueryInput: startQueryInput("truckin'", defaultStart.Add(defaultDuration/2), defaultEnd, DefaultLimit, "/grateful/dead", "/american/beauty"), + startQuerySuccess: true, + startQueryErrs: []error{ + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "arrows of neon and flashing marquees out on main street", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "chicago, new york, detroit", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "and it's all on the same street", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "your typical city involved in a typical day dream", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "hang it up and see what tomorrow brings", nil), + }, + pollOutputs: []chunkPollOutput{ + { + results: []Result{ + {{"@ptr", "dallas, got a soft machine"}, {"@message", "houston, too close to new orleans"}}, + {{"@ptr", "new york's got the ways and means"}, {"@message", "but it just won't let you go, oh no"}}, + }, + status: 
cloudwatchlogs.QueryStatusComplete, + stats: &Stats{10, 2, 3, 0, 0, 0, 0, 0}, + }, + }, + }, + }, + results: []Result{ + {{"@ptr", "dallas, got a soft machine"}, {"@message", "houston, too close to new orleans"}}, + {{"@ptr", "new york's got the ways and means"}, {"@message", "but it just won't let you go, oh no"}}, + }, + stats: Stats{ + BytesScanned: 10, + RecordsMatched: 2, + RecordsScanned: 3, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, + }, + // This is the last scenario, and it is meant to be a super test case that, // by itself, runs more chunks than the QueryManager can run in parallel. // @@ -1953,6 +2018,7 @@ var scenarios = []queryScenario{ // CHUNK 20 [split sub-chunk 1/4]. { startQueryInput: startQueryInput("vikings", defaultStart.Add(1_170*time.Minute), defaultStart.Add(1_170*time.Minute+450*time.Second), MaxLimit, "/frihed/februar/første"), + startQueryErrs: []error{cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "y'all have too many dang queries goin' on...", nil)}, startQuerySuccess: true, pollOutputs: []chunkPollOutput{ { diff --git a/mgr.go b/mgr.go index eaa7b74..5500ece 100644 --- a/mgr.go +++ b/mgr.go @@ -24,6 +24,7 @@ type mgr struct { close chan struct{} // Receives notification on Close() pq streamHeap // Written by Query, written by mgr loop goroutine ready ring.Ring // Chunks ready to start + parallel adapter // Parallelism: parallelMin <= parallel.value() <= Config.Parallel numReady int // Number of chunks ready to start numStarting int // Number of chunks handed off to starter numPolling int // Number of chunks handed off to poller @@ -50,6 +51,12 @@ type mgr struct { stopper *stopper } +const ( + parallelUpPerS = 0.5 + parallelDownStep = 1.0 + parallelMin = 1.0 +) + // NewQueryManager returns a new query manager with the given // configuration.
func NewQueryManager(cfg Config) QueryManager { @@ -74,6 +81,15 @@ func NewQueryManager(cfg Config) QueryManager { m := &mgr{ Config: cfg, + parallel: &standardAdapter{ + val: float64(cfg.Parallel), + max: float64(cfg.Parallel), + upPerS: parallelUpPerS, + min: parallelMin, + downStep: parallelDownStep, + last: time.Now(), + }, + close: make(chan struct{}), query: make(chan *stream), @@ -239,9 +255,7 @@ func (m *mgr) loop() { return } - // TODO: We need to find a way to feed back start query errors - // from starter and dynamically change m.Parallel. - for m.numStarting+m.numPolling+m.numStopping < m.Parallel { + for m.numStarting+m.numPolling+m.numStopping < int(m.parallel.value()) { c := m.getReadyChunk() if c == nil { break @@ -314,14 +328,11 @@ func (m *mgr) handleChunk(c *chunk) { switch c.state { case starting: m.numStarting-- - c.started() - m.killStream(c) + m.handleStartingError(c) case started: m.numStarting-- m.numPolling++ - c.started() - c.state = polling - m.poll <- c + m.handleChunkStarted(c) case polling: m.numPolling-- m.handlePollingError(c) @@ -342,6 +353,29 @@ func (m *mgr) makeReady(c *chunk) { m.numReady++ } +func (m *mgr) handleStartingError(c *chunk) { + if c.err == errReduceParallel { + if m.parallel.decrease() { + m.logChunk(c, fmt.Sprintf("reduced parallelism to: %.4f", m.parallel.value()), "") + } + m.makeReady(c) + return + } + + c.started() + m.killStream(c) +} + +func (m *mgr) handleChunkStarted(c *chunk) { + c.started() + c.state = polling + m.poll <- c + + if m.parallel.increase() { + m.logChunk(c, fmt.Sprintf("increased parallelism to: %.4f", m.parallel.value()), "") + } +} + func (m *mgr) handlePollingError(c *chunk) { if c.err == errRestartChunk { c.chunkID += "R" diff --git a/mgr_test.go b/mgr_test.go index c78abbc..d9816a9 100644 --- a/mgr_test.go +++ b/mgr_test.go @@ -2019,7 +2019,7 @@ func TestQueryManager_Query(t *testing.T) { t.Run("Query Fails with Error if Chunk Exceeds Max Temporary Errors", func(t *testing.T) { text := 
"a query destined to exceed all maxima on temporary errors" groups := []string{"grpA", "grpB"} - expectedErr := cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "you been limited") + expectedErr := cwlErr(cloudwatchlogs.ErrCodeServiceUnavailableException, "we lacking service") testCases := []struct { name string diff --git a/starter.go b/starter.go index 8c73e6b..f23f8e4 100644 --- a/starter.go +++ b/starter.go @@ -64,9 +64,9 @@ func (s *starter) manipulate(c *chunk) outcome { case throttlingClass: return throttlingError case limitExceededClass: - // TODO: Pass this information up to mgr. - // TODO: Log. - fallthrough + s.m.logChunk(c, "exceeded query concurrency limit", "temporary error from CloudWatch Logs: "+err.Error()) + c.err = errReduceParallel + return finished case temporaryClass: return temporaryError default: diff --git a/starter_test.go b/starter_test.go index 61833a1..b3339ed 100644 --- a/starter_test.go +++ b/starter_test.go @@ -72,11 +72,12 @@ func TestStarter_manipulate(t *testing.T) { queryID := "eggs" testCases := []struct { - name string - setup func(t *testing.T, logger *mockLogger) - output *cloudwatchlogs.StartQueryOutput - err error - expected outcome + name string + setup func(t *testing.T, logger *mockLogger) + output *cloudwatchlogs.StartQueryOutput + err error + expectedErr error + expected outcome }{ { name: "Throttling Error", @@ -84,9 +85,14 @@ func TestStarter_manipulate(t *testing.T) { expected: throttlingError, }, { - name: "Limit Exceeded Error", - err: awserr.New(cloudwatchlogs.ErrCodeLimitExceededException, "too many queries!", nil), - expected: temporaryError, + name: "Limit Exceeded Error", + setup: func(t *testing.T, logger *mockLogger) { + logger.expectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s): %s", t.Name(), + "exceeded query concurrency limit", chunkID, text, start, end, "temporary error from CloudWatch Logs: LimitExceededException: too many queries!") + }, + err: 
awserr.New(cloudwatchlogs.ErrCodeLimitExceededException, "too many queries!", nil), + expectedErr: errReduceParallel, + expected: finished, }, { name: "Temporary Error", @@ -142,6 +148,9 @@ func TestStarter_manipulate(t *testing.T) { if testCase.setup != nil { testCase.setup(t, logger) } + if testCase.err != nil && testCase.expectedErr == nil { + testCase.expectedErr = &StartQueryError{text, start, end, testCase.err} + } c := &chunk{ stream: &stream{ QuerySpec: QuerySpec{ @@ -163,8 +172,8 @@ func TestStarter_manipulate(t *testing.T) { assert.Equal(t, testCase.expected, actual) assert.GreaterOrEqual(t, s.lastReq.Sub(before), time.Duration(0)) assert.GreaterOrEqual(t, after.Sub(s.lastReq), time.Duration(0)) - if testCase.err != nil { - assert.Equal(t, &StartQueryError{text, start, end, testCase.err}, c.err) + if testCase.expectedErr != nil { + assert.Equal(t, testCase.expectedErr, c.err) } actions.AssertExpectations(t) logger.AssertExpectations(t)