Dynamically adapt query parallelism to noisy neighbors (issue#23)

This change modifies the mgr to dynamically adjust the amount of
attempted parallelism based on whether it is able to start chunks
without receiving a service quota limit exceeded error from the
CloudWatch Logs Insights service's `StartQuery` API operation.

As with the earlier commit `8416966a`, the hope is that this commit
will make `QueryManager.Query()` more resilient. Previously, if a
chunk collected more than 10 (i.e. `maxTempStartingErrors`) temporary
errors while the starter was trying to start it, the error killed the
entire stream, causing the end user's whole `Query()` operation to
fail. This could occur when other users were heavily querying the
CloudWatch Logs Insights service in the same AWS region, producing
many "LimitExceededException" errors because the service's query
concurrency quota was exceeded.

With this change, a mgr's effective parallelism can temporarily go
below the value given in `Config.Parallel`, but will increase back
up as chunks are successfully started. It will never go above the
value given in `Config.Parallel`. Temporary reductions are made in
response to "LimitExceededException" errors and should cause the mgr
to adjust its attempted chunk parallelism to the amount of free
concurrency that is actually available, accounting for other "noisy
neighbors" who are simultaneously using the Insights service,
including other instances of the Incite QueryManager. This should lead
to fewer "LimitExceededException" errors being received, reducing the
probability of a `Query()` failure.
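
To make the mechanism concrete, here is a minimal sketch of what such an
adapter could look like. The `standardAdapter` implementation itself is not
part of this diff; only its field names (`val`, `max`, `upPerS`, `min`,
`downStep`, `last`) appear below in the `NewQueryManager` literal, so the
method bodies here are inferred assumptions, not the committed code.

```go
// Hypothetical sketch only: the real standardAdapter is not shown in
// this commit. Field names come from the NewQueryManager diff below;
// the method bodies are inferred behavior.
package incite

import (
	"math"
	"time"
)

type adapter interface {
	value() float64
	increase() bool // reports whether the value actually rose
	decrease() bool // reports whether the value actually fell
}

type standardAdapter struct {
	val, min, max    float64
	upPerS, downStep float64
	last             time.Time // time of the last adjustment
}

func (a *standardAdapter) value() float64 { return a.val }

// increase ramps the value back up at upPerS units per second of
// elapsed time, capped at max. Called after each successful start.
func (a *standardAdapter) increase() bool {
	now := time.Now()
	delta := a.upPerS * now.Sub(a.last).Seconds()
	a.last = now
	if a.val >= a.max || delta <= 0 {
		return false
	}
	a.val = math.Min(a.val+delta, a.max)
	return true
}

// decrease steps the value down by downStep, floored at min. Called
// when StartQuery fails with LimitExceededException.
func (a *standardAdapter) decrease() bool {
	a.last = time.Now()
	if a.val <= a.min {
		return false
	}
	a.val = math.Max(a.val-a.downStep, a.min)
	return true
}
```

Under these assumptions, with `parallelDownStep = 1.0` and
`parallelUpPerS = 0.5`, each "LimitExceededException" immediately costs a
full unit of parallelism, while earning one back takes roughly two seconds
of error-free starts: back off quickly, recover gradually.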
vcschapp committed Feb 15, 2023
1 parent 30ce985 commit 9bac15c
Showing 6 changed files with 136 additions and 26 deletions.
9 changes: 5 additions & 4 deletions errors.go
@@ -203,8 +203,9 @@ const (
)

var (
- errClosing = errors.New("incite: closing")
- errStopChunk = errors.New("incite: owning stream died, cancel chunk")
- errRestartChunk = errors.New("incite: transient chunk failure, restart chunk")
- errSplitChunk = errors.New("incite: chunk maxed, split chunk")
+ errClosing = errors.New("incite: closing")
+ errReduceParallel = errors.New("incite: exceeded concurrency limit, reduce parallelism")
+ errStopChunk = errors.New("incite: owning stream died, cancel chunk")
+ errRestartChunk = errors.New("incite: transient chunk failure, restart chunk")
+ errSplitChunk = errors.New("incite: chunk maxed, split chunk")
)
66 changes: 66 additions & 0 deletions incite_test.go
@@ -1628,6 +1628,71 @@ var scenarios = []queryScenario{
},
},

// This scenario involves two chunks, both of which ultimately
// succeed, but which both hit the query concurrency quota limit
// several times before being successfully started.
{
note: "MultiChunk.RepeatedConcurrencyLimitError",
QuerySpec: QuerySpec{
Text: "truckin'",
Start: defaultStart,
End: defaultEnd,
Groups: []string{"/grateful/dead", "/american/beauty"},
Chunk: defaultDuration / 2,
},
chunks: []chunkPlan{
{
startQueryInput: startQueryInput("truckin'", defaultStart, defaultStart.Add(defaultDuration/2), DefaultLimit, "/grateful/dead", "/american/beauty"),
startQuerySuccess: true,
startQueryErrs: []error{
cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "truckin', got my chips cashed in", nil),
cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "like the do-dah man", nil),
cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "together, more or less in line", nil),
cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "just keep truckin' on", nil),
},
pollOutputs: []chunkPollOutput{
{
status: cloudwatchlogs.QueryStatusComplete,
stats: &Stats{0, 0, 0, 0, 0, 0, 0, 0},
},
},
},
{
startQueryInput: startQueryInput("truckin'", defaultStart.Add(defaultDuration/2), defaultEnd, DefaultLimit, "/grateful/dead", "/american/beauty"),
startQuerySuccess: true,
startQueryErrs: []error{
cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "arrows of neon and flashing marquees out on main street", nil),
cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "chicago, new york, detroit", nil),
cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "and it's all on the same street", nil),
cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "your typical city involved in a typical day dream", nil),
cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "hang it up and see what tomorrow brings", nil),
},
pollOutputs: []chunkPollOutput{
{
results: []Result{
{{"@ptr", "dallas, got a soft machine"}, {"@message", "houston, too close to new orleans"}},
{{"@ptr", "new york's got the ways and means"}, {"@message", "but it just won't let you go, oh no"}},
},
status: cloudwatchlogs.QueryStatusComplete,
stats: &Stats{10, 2, 3, 0, 0, 0, 0, 0},
},
},
},
},
results: []Result{
{{"@ptr", "dallas, got a soft machine"}, {"@message", "houston, too close to new orleans"}},
{{"@ptr", "new york's got the ways and means"}, {"@message", "but it just won't let you go, oh no"}},
},
stats: Stats{
BytesScanned: 10,
RecordsMatched: 2,
RecordsScanned: 3,
RangeRequested: defaultDuration,
RangeStarted: defaultDuration,
RangeDone: defaultDuration,
},
},

// This is the last scenario, and it is meant to be a super test case that,
// by itself, runs more chunks than the QueryManager can run in parallel.
//
@@ -1953,6 +2018,7 @@ var scenarios = []queryScenario{
// CHUNK 20 [split sub-chunk 1/4].
{
startQueryInput: startQueryInput("vikings", defaultStart.Add(1_170*time.Minute), defaultStart.Add(1_170*time.Minute+450*time.Second), MaxLimit, "/frihed/februar/første"),
startQueryErrs: []error{cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "y'all have too many dang queries goin' on...", nil)},
startQuerySuccess: true,
pollOutputs: []chunkPollOutput{
{
50 changes: 42 additions & 8 deletions mgr.go
@@ -24,6 +24,7 @@ type mgr struct {
close chan struct{} // Receives notification on Close()
pq streamHeap // Written by Query, written by mgr loop goroutine
ready ring.Ring // Chunks ready to start
parallel adapter // Parallelism: parallelMin <= parallel.value() <= Config.Parallel
numReady int // Number of chunks ready to start
numStarting int // Number of chunks handed off to starter
numPolling int // Number of chunks handed off to poller
@@ -50,6 +51,12 @@ type mgr struct {
stopper *stopper
}

const (
parallelUpPerS = 0.5
parallelDownStep = 1.0
parallelMin = 1.0
)

// NewQueryManager returns a new query manager with the given
// configuration.
func NewQueryManager(cfg Config) QueryManager {
@@ -74,6 +81,15 @@ func NewQueryManager(cfg Config) QueryManager {
m := &mgr{
Config: cfg,

parallel: &standardAdapter{
val: float64(cfg.Parallel),
max: float64(cfg.Parallel),
upPerS: parallelUpPerS,
min: parallelMin,
downStep: parallelDownStep,
last: time.Now(),
},

close: make(chan struct{}),
query: make(chan *stream),

@@ -239,9 +255,7 @@ func (m *mgr) loop() {
return
}

- // TODO: We need to find a way to feed back start query errors
- // from starter and dynamically change m.Parallel.
- for m.numStarting+m.numPolling+m.numStopping < m.Parallel {
+ for m.numStarting+m.numPolling+m.numStopping < int(m.parallel.value()) {
c := m.getReadyChunk()
if c == nil {
break
@@ -314,14 +328,11 @@ func (m *mgr) handleChunk(c *chunk) {
switch c.state {
case starting:
m.numStarting--
- c.started()
- m.killStream(c)
+ m.handleStartingError(c)
case started:
m.numStarting--
- m.numPolling++
- c.started()
- c.state = polling
- m.poll <- c
+ m.handleChunkStarted(c)
case polling:
m.numPolling--
m.handlePollingError(c)
@@ -342,6 +353,29 @@ func (m *mgr) makeReady(c *chunk) {
m.numReady++
}

func (m *mgr) handleStartingError(c *chunk) {
if c.err == errReduceParallel {
if m.parallel.decrease() {
m.logChunk(c, fmt.Sprintf("reduced parallelism to: %.4f", m.parallel.value()), "")
}
m.makeReady(c)
return
}

c.started()
m.killStream(c)
}

func (m *mgr) handleChunkStarted(c *chunk) {
c.started()
c.state = polling
m.poll <- c

if m.parallel.increase() {
m.logChunk(c, fmt.Sprintf("increased parallelism to: %.4f", m.parallel.value()), "")
}
}

func (m *mgr) handlePollingError(c *chunk) {
if c.err == errRestartChunk {
c.chunkID += "R"
2 changes: 1 addition & 1 deletion mgr_test.go
@@ -2019,7 +2019,7 @@ func TestQueryManager_Query(t *testing.T) {
t.Run("Query Fails with Error if Chunk Exceeds Max Temporary Errors", func(t *testing.T) {
text := "a query destined to exceed all maxima on temporary errors"
groups := []string{"grpA", "grpB"}
- expectedErr := cwlErr(cloudwatchlogs.ErrCodeLimitExceededException, "you been limited")
+ expectedErr := cwlErr(cloudwatchlogs.ErrCodeServiceUnavailableException, "we lacking service")

testCases := []struct {
name string
6 changes: 3 additions & 3 deletions starter.go
@@ -64,9 +64,9 @@ func (s *starter) manipulate(c *chunk) outcome {
case throttlingClass:
return throttlingError
case limitExceededClass:
- // TODO: Pass this information up to mgr.
- // TODO: Log.
- fallthrough
+ s.m.logChunk(c, "exceeded query concurrency limit", "temporary error from CloudWatch Logs: "+err.Error())
+ c.err = errReduceParallel
+ return finished
case temporaryClass:
return temporaryError
default:
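
The `limitExceededClass` arm above is selected by an error classifier that is
not part of this diff. Below is a plausible sketch of how `StartQuery` errors
might be mapped onto classes, assuming aws-sdk-go v1's `awserr` package; the
helper name `classify` and the `permanentClass` default are hypothetical,
while the other class names come from the switch above.

```go
// Hypothetical sketch; the real classifier and errClass definition are
// not in this commit. permanentClass is an invented default.
package incite

import (
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)

type errClass int

const (
	permanentClass errClass = iota // hypothetical default class
	throttlingClass
	limitExceededClass
	temporaryClass
)

func classify(err error) errClass {
	awsErr, ok := err.(awserr.Error)
	if !ok {
		return permanentClass
	}
	switch awsErr.Code() {
	case cloudwatchlogs.ErrCodeLimitExceededException:
		return limitExceededClass
	case cloudwatchlogs.ErrCodeServiceUnavailableException:
		return temporaryClass
	case "ThrottlingException":
		return throttlingClass
	default:
		return permanentClass
	}
}
```

Whatever the classifier looks like, the flow in this commit is clear from the
diffs: the starter stores `errReduceParallel` on the chunk and returns
`finished`, and the mgr's `handleStartingError` shrinks parallelism and
re-queues the chunk rather than killing the stream.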
29 changes: 19 additions & 10 deletions starter_test.go
@@ -72,21 +72,27 @@ func TestStarter_manipulate(t *testing.T) {
queryID := "eggs"

testCases := []struct {
- name string
- setup func(t *testing.T, logger *mockLogger)
- output *cloudwatchlogs.StartQueryOutput
- err error
- expected outcome
+ name string
+ setup func(t *testing.T, logger *mockLogger)
+ output *cloudwatchlogs.StartQueryOutput
+ err error
+ expectedErr error
+ expected outcome
}{
{
name: "Throttling Error",
err: awserr.New("foo", "rate exceeded", nil),
expected: throttlingError,
},
{
name: "Limit Exceeded Error",
err: awserr.New(cloudwatchlogs.ErrCodeLimitExceededException, "too many queries!", nil),
expected: temporaryError,
name: "Limit Exceeded Error",
setup: func(t *testing.T, logger *mockLogger) {
logger.expectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s): %s", t.Name(),
"exceeded query concurrency limit", chunkID, text, start, end, "temporary error from CloudWatch Logs: LimitExceededException: too many queries!")
},
err: awserr.New(cloudwatchlogs.ErrCodeLimitExceededException, "too many queries!", nil),
expectedErr: errReduceParallel,
expected: finished,
},
{
name: "Temporary Error",
@@ -142,6 +148,9 @@
if testCase.setup != nil {
testCase.setup(t, logger)
}
if testCase.err != nil && testCase.expectedErr == nil {
testCase.expectedErr = &StartQueryError{text, start, end, testCase.err}
}
c := &chunk{
stream: &stream{
QuerySpec: QuerySpec{
@@ -163,8 +172,8 @@
assert.Equal(t, testCase.expected, actual)
assert.GreaterOrEqual(t, s.lastReq.Sub(before), time.Duration(0))
assert.GreaterOrEqual(t, after.Sub(s.lastReq), time.Duration(0))
- if testCase.err != nil {
- assert.Equal(t, &StartQueryError{text, start, end, testCase.err}, c.err)
+ if testCase.expectedErr != nil {
+ assert.Equal(t, testCase.expectedErr, c.err)
}
actions.AssertExpectations(t)
logger.AssertExpectations(t)
