Skip to content

Commit

Permalink
Log when chunk finishes normally in all cases [issue#8]
Browse files Browse the repository at this point in the history
#8

Previously it was only logging when a chunk finished the stream also
finished, which worked in the simple case of 1 stream, 1 chunk but
failed to log correctly for chunked streams. This commit fixes it.
  • Loading branch information
vcschapp committed Oct 18, 2021
1 parent 2cfb281 commit b374de7
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 6 deletions.
7 changes: 4 additions & 3 deletions incite.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,7 @@ func (m *mgr) pollNextChunk() int {
c.stream.stats.add(c.Stats)
m.statsLock.Unlock()
c.stream.lock.Unlock()
m.logChunk(c, "finished", "")
} else {
m.chunks.Prev().Link(r)
}
Expand Down Expand Up @@ -835,7 +836,7 @@ func (m *mgr) cancelChunk(c *chunk, err error) {

func (m *mgr) cancelChunkMaybe(c *chunk, err error) {
if err == io.EOF {
m.logChunk(c, "finished", "")
m.logChunk(c, "finished", "end of stream")
return
}
if terminalErr, ok := err.(*TerminalQueryStatusError); ok {
Expand Down Expand Up @@ -873,9 +874,9 @@ func (m *mgr) logChunk(c *chunk, msg, detail string) {
id += "(" + c.queryID + ")"
}
if detail == "" {
m.Logger.Printf("incite: QueryManager(%s) %s chunk%s %q [%s..%s)", m.Name, msg, id, c.stream.Text, c.start, c.end)
m.Logger.Printf("incite: QueryManager(%s) %s chunk %s %q [%s..%s)", m.Name, msg, id, c.stream.Text, c.start, c.end)
} else {
m.Logger.Printf("incite: QueryManager(%s) %s chunk%s %q [%s..%s): %s", m.Name, msg, id, c.stream.Text, c.start, c.end, detail)
m.Logger.Printf("incite: QueryManager(%s) %s chunk %s %q [%s..%s): %s", m.Name, msg, id, c.stream.Text, c.start, c.end, detail)
}
}

Expand Down
137 changes: 137 additions & 0 deletions incite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,143 @@ func TestQueryManager_Query(t *testing.T) {
assert.Equal(t, starts2, starts)
assert.Equal(t, gets2, gets)
})

t.Run("Logger Receives Expected Messages", func(t *testing.T) {
t.Run("Successful Query", func(t *testing.T) {
// This test case creates a two-chunk query which runs into
// a few problems:
// - The first chunk encounters an error starting.
// - The second chunk fails the first time.

// ARRANGE.
logger := newMockLogger(t)
logger.
ExpectPrintf("incite: QueryManager(%s) started", t.Name()).
Once()
logger.
ExpectPrintf("incite: QueryManager(%s) stopping...", t.Name()).
Maybe()
logger.
ExpectPrintf("incite: QueryManager(%s) stopped", t.Name()).
Maybe()
actions := newMockActions(t)
text := "a query in two chunks which generates logs"
// CHUNK 1.
queryIDChunk1 := "foo"
actions.
On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{
StartTime: startTimeSeconds(defaultStart),
EndTime: endTimeSeconds(defaultStart.Add(time.Second)),
Limit: defaultLimit,
LogGroupNames: []*string{sp("grp")},
QueryString: &text,
}).
Return(nil, cwlErr(cloudwatchlogs.ErrCodeServiceUnavailableException, "foo")).
Once()
logger.
ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s): %s", t.Name(), "temporary failure to start", "0", text, defaultStart, defaultStart.Add(time.Second), "ServiceUnavailableException: foo").
Once()
actions.
On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{
StartTime: startTimeSeconds(defaultStart),
EndTime: endTimeSeconds(defaultStart.Add(time.Second)),
Limit: defaultLimit,
LogGroupNames: []*string{sp("grp")},
QueryString: &text,
}).
Return(&cloudwatchlogs.StartQueryOutput{QueryId: &queryIDChunk1}, nil).
Once()
logger.
ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s)", t.Name(), "started", "0(foo)").
Once()
actions.
On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{QueryId: &queryIDChunk1}).
Return(&cloudwatchlogs.GetQueryResultsOutput{
Results: [][]*cloudwatchlogs.ResultField{},
Status: sp(cloudwatchlogs.QueryStatusComplete),
}, nil).
Once()
logger.
ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s)", t.Name(), "finished", "0(foo)").
Once()
// CHUNK 2.
queryIDChunk2 := []string{"bar.try1", "bar.try2"}
actions.
On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{
StartTime: startTimeSeconds(defaultStart.Add(time.Second)),
EndTime: endTimeSeconds(defaultStart.Add(2 * time.Second)),
Limit: defaultLimit,
LogGroupNames: []*string{sp("grp")},
QueryString: &text,
}).
Return(&cloudwatchlogs.StartQueryOutput{QueryId: &queryIDChunk2[0]}, nil).
Once()
logger.
ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s)", t.Name(), "started", "1(bar.try1)").
Once()
actions.
On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{QueryId: &queryIDChunk2[0]}).
Return(&cloudwatchlogs.GetQueryResultsOutput{
Results: [][]*cloudwatchlogs.ResultField{},
Status: sp(cloudwatchlogs.QueryStatusFailed),
}, nil).
Once()
actions.
On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{
StartTime: startTimeSeconds(defaultStart.Add(time.Second)),
EndTime: endTimeSeconds(defaultStart.Add(2 * time.Second)),
Limit: defaultLimit,
LogGroupNames: []*string{sp("grp")},
QueryString: &text,
}).
Return(&cloudwatchlogs.StartQueryOutput{QueryId: &queryIDChunk2[1]}, nil).
Once()
logger.
ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s)", t.Name(), "started", "1R(bar.try2)").
Once()
actions.
On("GetQueryResultsWithContext", anyContext, &cloudwatchlogs.GetQueryResultsInput{QueryId: &queryIDChunk2[1]}).
Return(&cloudwatchlogs.GetQueryResultsOutput{
Results: [][]*cloudwatchlogs.ResultField{},
Status: sp(cloudwatchlogs.QueryStatusComplete),
}, nil).
Once()
logger.
ExpectPrintf("incite: QueryManager(%s) %s chunk %s %q [%s..%s): %s", t.Name(), "finished", "1R(bar.try2)", text, mock.Anything, mock.Anything, "end of stream").
Once()
// START QUERY.
m := NewQueryManager(Config{
Actions: actions,
RPS: lotsOfRPS,
Logger: logger,
Name: t.Name(),
})
require.NotNil(t, m)
t.Cleanup(func() {
_ = m.Close()
})
s, err := m.Query(QuerySpec{
Text: text,
Groups: []string{"grp"},
Start: defaultStart,
End: defaultStart.Add(2 * time.Second),
Chunk: time.Second,
})
require.NoError(t, err)
require.NotNil(t, s)

// ACT.
r, err1 := ReadAll(s)
err2 := m.Close()

// ASSERT.
assert.NoError(t, err1)
assert.Empty(t, r)
assert.NoError(t, err2)
actions.AssertExpectations(t)
logger.AssertExpectations(t)
})
})
}

func TestStream_Read(t *testing.T) {
Expand Down
19 changes: 16 additions & 3 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,22 @@ func (m *mockLogger) Printf(format string, v ...interface{}) {
m.Called(format, v)
}

func (m *mockLogger) ExpectPrintf(format string) *mock.Call {
func (m *mockLogger) ExpectPrintf(format string, v ...interface{}) *mock.Call {
nPct := strings.Count(format, "%")
return m.On("Printf", format, mock.MatchedBy(func(v []interface{}) bool {
return len(v) == nPct
if len(v) > nPct {
panic("more arguments given than percent signs in the format string!")
}
w := make([]interface{}, len(v))
copy(w, v)
return m.On("Printf", format, mock.MatchedBy(func(x []interface{}) bool {
if len(x) != nPct {
return false
}
for i := range w {
if x[i] != w[i] && w[i] != mock.Anything {
return false
}
}
return true
})).Return()
}

0 comments on commit b374de7

Please sign in to comment.