From b374de7e4bf843346cb3279c85242d675648c557 Mon Sep 17 00:00:00 2001 From: Victor Schappert Date: Sun, 17 Oct 2021 16:54:51 -0700 Subject: [PATCH] Log when chunk finishes normally in all cases [issue#8] https://github.com/gogama/incite/issues/8 Previously it was only logging when a chunk finished the stream also finished, which worked in the simple case of 1 stream, 1 chunk but failed to log correctly for chunked streams. This commit fixes it. --- incite.go | 7 +-- incite_test.go | 137 +++++++++++++++++++++++++++++++++++++++++++++++++ log_test.go | 19 +++++-- 3 files changed, 157 insertions(+), 6 deletions(-) diff --git a/incite.go b/incite.go index 13efab4..ef54545 100644 --- a/incite.go +++ b/incite.go @@ -743,6 +743,7 @@ func (m *mgr) pollNextChunk() int { c.stream.stats.add(c.Stats) m.statsLock.Unlock() c.stream.lock.Unlock() + m.logChunk(c, "finished", "") } else { m.chunks.Prev().Link(r) } @@ -835,7 +836,7 @@ func (m *mgr) cancelChunk(c *chunk, err error) { func (m *mgr) cancelChunkMaybe(c *chunk, err error) { if err == io.EOF { - m.logChunk(c, "finished", "") + m.logChunk(c, "finished", "end of stream") return } if terminalErr, ok := err.(*TerminalQueryStatusError); ok { @@ -873,9 +874,9 @@ func (m *mgr) logChunk(c *chunk, msg, detail string) { id += "(" + c.queryID + ")" } if detail == "" { - m.Logger.Printf("incite: QueryManager(%s) %s chunk%s %q [%s..%s)", m.Name, msg, id, c.stream.Text, c.start, c.end) + m.Logger.Printf("incite: QueryManager(%s) %s chunk %s %q [%s..%s)", m.Name, msg, id, c.stream.Text, c.start, c.end) } else { - m.Logger.Printf("incite: QueryManager(%s) %s chunk%s %q [%s..%s): %s", m.Name, msg, id, c.stream.Text, c.start, c.end, detail) + m.Logger.Printf("incite: QueryManager(%s) %s chunk %s %q [%s..%s): %s", m.Name, msg, id, c.stream.Text, c.start, c.end, detail) } } diff --git a/incite_test.go b/incite_test.go index 284aad9..3710cd6 100644 --- a/incite_test.go +++ b/incite_test.go @@ -1438,6 +1438,143 @@ func TestQueryManager_Query(t *testing.T) { assert.Equal(t, starts2, starts) assert.Equal(t, gets2, gets) }) + + t.Run("Logger Receives Expected Messages", func(t *testing.T) { + t.Run("Successful Query", func(t *testing.T) { + // This test case creates a two-chunk query which runs into + // a few problems: + // - The first chunk encounters an error starting. + // - The second chunk fails the first time. + + // ARRANGE. + logger := newMockLogger(t) + logger. + ExpectPrintf("incite: QueryManager(%s) started", t.Name()). + Once() + logger. + ExpectPrintf("incite: QueryManager(%s) stopping...", t.Name()). + Maybe() + logger. + ExpectPrintf("incite: QueryManager(%s) stopped", t.Name()). + Maybe() + actions := newMockActions(t) + text := "a query in two chunks which generates logs" + // CHUNK 1. + queryIDChunk1 := "foo" + actions. + On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{ + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultStart.Add(time.Second)), + Limit: defaultLimit, + LogGroupNames: []*string{sp("grp")}, + QueryString: &text, + }). + Return(nil, cwlErr(cloudwatchlogs.ErrCodeServiceUnavailableException, "foo")). + Once() + logger. + ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s): %s", t.Name(), "temporary failure to start", "0", text, defaultStart, defaultStart.Add(time.Second), "ServiceUnavailableException: foo"). + Once() + actions. + On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{ + StartTime: startTimeSeconds(defaultStart), + EndTime: endTimeSeconds(defaultStart.Add(time.Second)), + Limit: defaultLimit, + LogGroupNames: []*string{sp("grp")}, + QueryString: &text, + }). + Return(&cloudwatchlogs.StartQueryOutput{QueryId: &queryIDChunk1}, nil). + Once() + logger. + ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s)", t.Name(), "started", "0(foo)"). + Once() + actions. + On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{QueryId: &queryIDChunk1}). + Return(&cloudwatchlogs.GetQueryResultsOutput{ + Results: [][]*cloudwatchlogs.ResultField{}, + Status: sp(cloudwatchlogs.QueryStatusComplete), + }, nil). + Once() + logger. + ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s)", t.Name(), "finished", "0(foo)"). + Once() + // CHUNK 2. + queryIDChunk2 := []string{"bar.try1", "bar.try2"} + actions. + On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{ + StartTime: startTimeSeconds(defaultStart.Add(time.Second)), + EndTime: endTimeSeconds(defaultStart.Add(2 * time.Second)), + Limit: defaultLimit, + LogGroupNames: []*string{sp("grp")}, + QueryString: &text, + }). + Return(&cloudwatchlogs.StartQueryOutput{QueryId: &queryIDChunk2[0]}, nil). + Once() + logger. + ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s)", t.Name(), "started", "1(bar.try1)"). + Once() + actions. + On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{QueryId: &queryIDChunk2[0]}). + Return(&cloudwatchlogs.GetQueryResultsOutput{ + Results: [][]*cloudwatchlogs.ResultField{}, + Status: sp(cloudwatchlogs.QueryStatusFailed), + }, nil). + Once() + actions. + On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{ + StartTime: startTimeSeconds(defaultStart.Add(time.Second)), + EndTime: endTimeSeconds(defaultStart.Add(2 * time.Second)), + Limit: defaultLimit, + LogGroupNames: []*string{sp("grp")}, + QueryString: &text, + }). + Return(&cloudwatchlogs.StartQueryOutput{QueryId: &queryIDChunk2[1]}, nil). + Once() + logger. + ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s)", t.Name(), "started", "1R(bar.try2)"). + Once() + actions. + On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{QueryId: &queryIDChunk2[1]}). + Return(&cloudwatchlogs.GetQueryResultsOutput{ + Results: [][]*cloudwatchlogs.ResultField{}, + Status: sp(cloudwatchlogs.QueryStatusComplete), + }, nil). + Once() + logger. + ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s): %s", t.Name(), "finished", "1R(bar.try2)", text, mock.Anything, mock.Anything, "end of stream"). + Once() + // START QUERY. + m := NewQueryManager(Config{ + Actions: actions, + RPS: lotsOfRPS, + Logger: logger, + Name: t.Name(), + }) + require.NotNil(t, m) + t.Cleanup(func() { + _ = m.Close() + }) + s, err := m.Query(QuerySpec{ + Text: text, + Groups: []string{"grp"}, + Start: defaultStart, + End: defaultStart.Add(2 * time.Second), + Chunk: time.Second, + }) + require.NoError(t, err) + require.NotNil(t, s) + + // ACT. + r, err1 := ReadAll(s) + err2 := m.Close() + + // ASSERT. + assert.NoError(t, err1) + assert.Empty(t, r) + assert.NoError(t, err2) + actions.AssertExpectations(t) + logger.AssertExpectations(t) + }) + }) } func TestStream_Read(t *testing.T) { diff --git a/log_test.go b/log_test.go index 23c44fd..6ca425b 100644 --- a/log_test.go +++ b/log_test.go @@ -39,9 +39,22 @@ func (m *mockLogger) Printf(format string, v ...interface{}) { m.Called(format, v) } -func (m *mockLogger) ExpectPrintf(format string) *mock.Call { +func (m *mockLogger) ExpectPrintf(format string, v ...interface{}) *mock.Call { nPct := strings.Count(format, "%") - return m.On("Printf", format, mock.MatchedBy(func(v []interface{}) bool { - return len(v) == nPct + if len(v) > nPct { + panic("more arguments given than percent signs in the format string!") + } + w := make([]interface{}, len(v)) + copy(w, v) + return m.On("Printf", format, mock.MatchedBy(func(x []interface{}) bool { + if len(x) != nPct { + return false + } + for i := range w { + if x[i] != w[i] && w[i] != mock.Anything { + return false + } + } + return true })).Return() }