From 9bac15c5e50512b03de902f4fae1b0099d803480 Mon Sep 17 00:00:00 2001
From: Victor Schappert
Date: Wed, 1 Feb 2023 05:24:33 -0800
Subject: [PATCH] Dynamically adapt query parallelism to noisy neighbors
 (issue#23)

https://github.com/gogama/incite/issues/23

This change modifies the mgr to dynamically adjust the amount of
attempted parallelism based on whether it is able to start chunks
without receiving a service quota limit exceeded error from the
CloudWatch Logs Insights service's `StartQuery` API operation.

As with the earlier commit `8416966a`, the hope is that this commit
will make `QueryManager.Query()` more resilient. Previously, if a
chunk collected more than 10 (i.e. `maxTempStartingErrors`) temporary
errors while the starter was trying to start it, the whole stream was
killed and the end user's entire `Query()` operation failed. This
could happen when other users were heavily querying the CloudWatch
Logs Insights service in the same AWS region, producing many
"LimitExceededException" errors because the service's query
concurrency quota was exceeded.

With this change, a mgr's effective parallelism can temporarily drop
below the value given in `Config.Parallel`, but it climbs back up as
chunks are successfully started; it never goes above
`Config.Parallel`. Temporary reductions are made in response to
"LimitExceededException" errors and should cause the mgr to adjust
its attempted chunk parallelism to the amount of free concurrency
actually available, accounting for other "noisy neighbors" who are
simultaneously using the Insights service, including other instances
of the Incite QueryManager. This should lead to fewer
"LimitExceededException" errors being received, reducing the
probability of a `Query()` failure.
---
 errors.go       |  9 ++++---
 incite_test.go  | 66 +++++++++++++++++++++++++++++++++++++++++++++++++
 mgr.go          | 50 +++++++++++++++++++++++++++++++------
 mgr_test.go     |  2 +-
 starter.go      |  6 ++---
 starter_test.go | 29 ++++++++++++++--------
 6 files changed, 136 insertions(+), 26 deletions(-)

diff --git a/errors.go b/errors.go
index 9227092..383042f 100644
--- a/errors.go
+++ b/errors.go
@@ -203,8 +203,9 @@ const (
 )
 
 var (
- errClosing = errors.New("incite: closing")
- errStopChunk = errors.New("incite: owning stream died, cancel chunk")
- errRestartChunk = errors.New("incite: transient chunk failure, restart chunk")
- errSplitChunk = errors.New("incite: chunk maxed, split chunk")
+ errClosing = errors.New("incite: closing")
+ errReduceParallel = errors.New("incite: exceeded concurrency limit, reduce parallelism")
+ errStopChunk = errors.New("incite: owning stream died, cancel chunk")
+ errRestartChunk = errors.New("incite: transient chunk failure, restart chunk")
+ errSplitChunk = errors.New("incite: chunk maxed, split chunk")
 )
diff --git a/incite_test.go b/incite_test.go
index 8fe657c..db03952 100644
--- a/incite_test.go
+++ b/incite_test.go
@@ -1628,6 +1628,71 @@ var scenarios = []queryScenario{
 },
 },
 
+ // This scenario involves two chunks, both of which ultimately
+ // succeed, but which both hit the query concurrency quota limit
+ // several times before being successfully started.
+ { + note: "MultiChunk.RepeatedConcurrencyLimitError", + QuerySpec: QuerySpec{ + Text: "truckin'", + Start: defaultStart, + End: defaultEnd, + Groups: []string{"/grateful/dead", "/american/beauty"}, + Chunk: defaultDuration / 2, + }, + chunks: []chunkPlan{ + { + startQueryInput: startQueryInput("truckin'", defaultStart, defaultStart.Add(defaultDuration/2), DefaultLimit, "/grateful/dead", "/american/beauty"), + startQuerySuccess: true, + startQueryErrs: []error{ + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "truckin', got my chips cashed in", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "like the do-dah man", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "together, more or less in line", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "just keep truckin' on", nil), + }, + pollOutputs: []chunkPollOutput{ + { + status: cloudwatchlogs.QueryStatusComplete, + stats: &Stats{0, 0, 0, 0, 0, 0, 0, 0}, + }, + }, + }, + { + startQueryInput: startQueryInput("truckin'", defaultStart.Add(defaultDuration/2), defaultEnd, DefaultLimit, "/grateful/dead", "/american/beauty"), + startQuerySuccess: true, + startQueryErrs: []error{ + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "arrows of neon and flashing marquees out on main street", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "chicago, new york, detroit", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "and it's all on the same street", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "your typical city involved in a typical day dream", nil), + cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "hang it up and see what tomorrow brings", nil), + }, + pollOutputs: []chunkPollOutput{ + { + results: []Result{ + {{"@ptr", "dallas, got a soft machine"}, {"@message", "houston, too close to new orleans"}}, + {{"@ptr", "new york's got the ways and means"}, {"@message", "but it just won't let you go, oh no"}}, + }, + status: cloudwatchlogs.QueryStatusComplete, + stats: &Stats{10, 2, 3, 0, 0, 0, 0, 0}, + }, + }, + }, + }, + results: []Result{ + {{"@ptr", "dallas, got a soft machine"}, {"@message", "houston, too close to new orleans"}}, + {{"@ptr", "new york's got the ways and means"}, {"@message", "but it just won't let you go, oh no"}}, + }, + stats: Stats{ + BytesScanned: 10, + RecordsMatched: 2, + RecordsScanned: 3, + RangeRequested: defaultDuration, + RangeStarted: defaultDuration, + RangeDone: defaultDuration, + }, + }, + // This is the last scenario, and it is meant to be a super test case that, // by itself, runs more chunks than the QueryManager can run in parallel. // @@ -1953,6 +2018,7 @@ var scenarios = []queryScenario{ // CHUNK 20 [split sub-chunk 1/4]. 
 {
 startQueryInput: startQueryInput("vikings", defaultStart.Add(1_170*time.Minute), defaultStart.Add(1_170*time.Minute+450*time.Second), MaxLimit, "/frihed/februar/første"),
+ startQueryErrs: []error{cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "y'all have too many dang queries goin' on...", nil)},
 startQuerySuccess: true,
 pollOutputs: []chunkPollOutput{
 {
diff --git a/mgr.go b/mgr.go
index eaa7b74..5500ece 100644
--- a/mgr.go
+++ b/mgr.go
@@ -24,6 +24,7 @@ type mgr struct {
 close chan struct{} // Receives notification on Close()
 pq streamHeap // Written by Query, written by mgr loop goroutine
 ready ring.Ring // Chunks ready to start
+ parallel adapter // Parallelism: parallelMin <= parallel.value() <= Config.Parallel
 numReady int // Number of chunks ready to start
 numStarting int // Number of chunks handed off to starter
 numPolling int // Number of chunks handed off to poller
@@ -50,6 +51,12 @@ type mgr struct {
 stopper *stopper
 }
 
+const (
+ parallelUpPerS = 0.5
+ parallelDownStep = 1.0
+ parallelMin = 1.0
+)
+
 // NewQueryManager returns a new query manager with the given
 // configuration.
 func NewQueryManager(cfg Config) QueryManager {
@@ -74,6 +81,15 @@ func NewQueryManager(cfg Config) QueryManager {
 
 m := &mgr{
 Config: cfg,
+ parallel: &standardAdapter{
+ val: float64(cfg.Parallel),
+ max: float64(cfg.Parallel),
+ upPerS: parallelUpPerS,
+ min: parallelMin,
+ downStep: parallelDownStep,
+ last: time.Now(),
+ },
+
 close: make(chan struct{}),
 
 query: make(chan *stream),
@@ -239,9 +255,7 @@ func (m *mgr) loop() {
 return
 }
 
- // TODO: We need to find a way to feed back start query errors
- // from starter and dynamically change m.Parallel.
- for m.numStarting+m.numPolling+m.numStopping < m.Parallel {
+ for m.numStarting+m.numPolling+m.numStopping < int(m.parallel.value()) {
 c := m.getReadyChunk()
 if c == nil {
 break
@@ -314,14 +328,11 @@ func (m *mgr) handleChunk(c *chunk) {
 switch c.state {
 case starting:
 m.numStarting--
- c.started()
- m.killStream(c)
+ m.handleStartingError(c)
 case started:
 m.numStarting--
 m.numPolling++
- c.started()
- c.state = polling
- m.poll <- c
+ m.handleChunkStarted(c)
 case polling:
 m.numPolling--
 m.handlePollingError(c)
@@ -342,6 +353,29 @@ func (m *mgr) makeReady(c *chunk) {
 m.numReady++
 }
 
+func (m *mgr) handleStartingError(c *chunk) {
+ if c.err == errReduceParallel {
+ if m.parallel.decrease() {
+ m.logChunk(c, fmt.Sprintf("reduced parallelism to: %.4f", m.parallel.value()), "")
+ }
+ m.makeReady(c)
+ return
+ }
+
+ c.started()
+ m.killStream(c)
+}
+
+func (m *mgr) handleChunkStarted(c *chunk) {
+ c.started()
+ c.state = polling
+ m.poll <- c
+
+ if m.parallel.increase() {
+ m.logChunk(c, fmt.Sprintf("increased parallelism to: %.4f", m.parallel.value()), "")
+ }
+}
+
 func (m *mgr) handlePollingError(c *chunk) {
 if c.err == errRestartChunk {
 c.chunkID += "R"
diff --git a/mgr_test.go b/mgr_test.go
index c78abbc..d9816a9 100644
--- a/mgr_test.go
+++ b/mgr_test.go
@@ -2019,7 +2019,7 @@ func TestQueryManager_Query(t *testing.T) {
 t.Run("Query Fails with Error if Chunk Exceeds Max Temporary Errors", func(t *testing.T) {
 text := "a query destined to exceed all maxima on temporary errors"
 groups := []string{"grpA", "grpB"}
- expectedErr := cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "you been limited")
+ expectedErr := cwlErr(cloudwatchlogs.ErrCodeServiceUnavailableException, "we lacking service")
 
 testCases := []struct {
 name string
diff --git a/starter.go b/starter.go
index 8c73e6b..f23f8e4 100644
--- a/starter.go
+++ b/starter.go
@@ -64,9 +64,9 @@ 
func (s *starter) manipulate(c *chunk) outcome { case throttlingClass: return throttlingError case limitExceededClass: - // TODO: Pass this information up to mgr. - // TODO: Log. - fallthrough + s.m.logChunk(c, "exceeded query concurrency limit", "temporary error from CloudWatch Logs: "+err.Error()) + c.err = errReduceParallel + return finished case temporaryClass: return temporaryError default: diff --git a/starter_test.go b/starter_test.go index 61833a1..b3339ed 100644 --- a/starter_test.go +++ b/starter_test.go @@ -72,11 +72,12 @@ func TestStarter_manipulate(t *testing.T) { queryID := "eggs" testCases := []struct { - name string - setup func(t *testing.T, logger *mockLogger) - output *cloudwatchlogs.StartQueryOutput - err error - expected outcome + name string + setup func(t *testing.T, logger *mockLogger) + output *cloudwatchlogs.StartQueryOutput + err error + expectedErr error + expected outcome }{ { name: "Throttling Error", @@ -84,9 +85,14 @@ func TestStarter_manipulate(t *testing.T) { expected: throttlingError, }, { - name: "Limit Exceeded Error", - err: awserr.New(cloudwatchlogs.ErrCodeLimitExceededException, "too many queries!", nil), - expected: temporaryError, + name: "Limit Exceeded Error", + setup: func(t *testing.T, logger *mockLogger) { + logger.expectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s): %s", t.Name(), + "exceeded query concurrency limit", chunkID, text, start, end, "temporary error from CloudWatch Logs: LimitExceededException: too many queries!") + }, + err: awserr.New(cloudwatchlogs.ErrCodeLimitExceededException, "too many queries!", nil), + expectedErr: errReduceParallel, + expected: finished, }, { name: "Temporary Error", @@ -142,6 +148,9 @@ func TestStarter_manipulate(t *testing.T) { if testCase.setup != nil { testCase.setup(t, logger) } + if testCase.err != nil && testCase.expectedErr == nil { + testCase.expectedErr = &StartQueryError{text, start, end, testCase.err} + } c := &chunk{ stream: &stream{ QuerySpec: QuerySpec{ @@ -163,8 +172,8 @@ func TestStarter_manipulate(t *testing.T) { assert.Equal(t, testCase.expected, actual) assert.GreaterOrEqual(t, s.lastReq.Sub(before), time.Duration(0)) assert.GreaterOrEqual(t, after.Sub(s.lastReq), time.Duration(0)) - if testCase.err != nil { - assert.Equal(t, &StartQueryError{text, start, end, testCase.err}, c.err) + if testCase.expectedErr != nil { + assert.Equal(t, testCase.expectedErr, c.err) } actions.AssertExpectations(t) logger.AssertExpectations(t)
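
Note on the adapter dependency: the mgr changes above assume an adapter
value exposing value(), increase(), and decrease(), but the adapter
implementation itself is not part of this diff. The sketch below is a
hypothetical illustration of the semantics the new mgr code appears to
rely on: increase() ramps the value back toward its maximum at upPerS
units per second of elapsed time, decrease() backs off by a fixed
downStep, and both clamp the value to the [min, max] range and report
whether it actually changed. The standardAdapter already in the
repository may be implemented differently.

// Hypothetical sketch only; not part of this patch, and not necessarily
// identical to the adapter implementation already in the repository.
package incite

import "time"

// adapter models a value that ramps up gradually over time and backs
// off in fixed steps, staying within a [min, max] range.
type adapter interface {
	value() float64
	increase() bool // reports whether the value changed
	decrease() bool // reports whether the value changed
}

type standardAdapter struct {
	val, min, max    float64
	upPerS, downStep float64
	last             time.Time
}

func (a *standardAdapter) value() float64 { return a.val }

// increase moves val toward max at upPerS units per second of time
// elapsed since the previous adjustment.
func (a *standardAdapter) increase() bool {
	now := time.Now()
	elapsed := now.Sub(a.last).Seconds()
	a.last = now
	if a.val >= a.max {
		return false
	}
	before := a.val
	a.val += a.upPerS * elapsed
	if a.val > a.max {
		a.val = a.max
	}
	return a.val > before
}

// decrease backs val off by one downStep, never dropping below min.
func (a *standardAdapter) decrease() bool {
	a.last = time.Now()
	if a.val <= a.min {
		return false
	}
	a.val -= a.downStep
	if a.val < a.min {
		a.val = a.min
	}
	return true
}

Under these assumed semantics, with the constants introduced in mgr.go
(parallelUpPerS = 0.5, parallelDownStep = 1.0, parallelMin = 1.0), each
"LimitExceededException" costs one full unit of attempted parallelism,
and roughly two seconds of successfully starting chunks are needed to
earn that unit back.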