diff --git a/README.md b/README.md index 1ac8802..92e5096 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,8 @@ Incite! - CloudWatch Insights queries made (very) easy TODO list in order: -1. Finish other TODO test cases in incite.go. -2. Finish other TODO test cases in unmarshal_test.go. +1. Finish other TODO test cases in unmarshal_test.go. +2. Remove hint support as it overcomplicates to little benefit. 3. Finish any other lingering TODO or FIXME. 4. Audit the logging code to ensure we log enough worthwhile status info. 5. Write README. diff --git a/incite.go b/incite.go index 373abb0..3f189a6 100644 --- a/incite.go +++ b/incite.go @@ -1133,9 +1133,9 @@ func (s *stream) Read(r []Result) (int, error) { defer s.lock.Unlock() for { - n := s.read(r) - if n > 0 || s.err != nil || len(r) == 0 { - return n, s.err + n, err := s.read(r) + if n > 0 || err != nil || len(r) == 0 { + return n, err } s.more.Wait() } @@ -1148,13 +1148,13 @@ func (s *stream) GetStats() Stats { return s.stats } -func (s *stream) read(r []Result) int { +func (s *stream) read(r []Result) (int, error) { n := 0 for s.i < len(s.blocks) { block := s.blocks[s.i] for s.j < len(block) { if n == len(r) { - return n + return n, nil } r[n] = block[s.j] n++ @@ -1163,7 +1163,7 @@ func (s *stream) read(r []Result) int { s.i++ s.j = 0 } - return n + return n, s.err } func (s *stream) setErr(err error, lock bool, stats Stats) bool { diff --git a/incite_test.go b/incite_test.go index 7942a1f..11ba87b 100644 --- a/incite_test.go +++ b/incite_test.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "io" "sort" "strconv" "sync" @@ -1230,6 +1231,72 @@ func TestQueryManager_Query(t *testing.T) { }) } +func TestStream_Read(t *testing.T) { + t.Run("Buffer is Shorter than Available Results", func(t *testing.T) { + actions := newMockActions(t) + actions. + On("StartQueryWithContext", anyContext, anyStartQueryInput). + Return(&cloudwatchlogs.StartQueryOutput{QueryId: sp("foo")}, nil). + Once() + actions. + On("GetQueryResultsWithContext", anyContext, anyGetQueryResultsInput). + Return(&cloudwatchlogs.GetQueryResultsOutput{ + Status: sp(cloudwatchlogs.QueryStatusComplete), + Results: [][]*cloudwatchlogs.ResultField{ + {{Field: sp("@ptr"), Value: sp("1")}}, + {{Field: sp("@ptr"), Value: sp("2")}}, + }, + }, nil). + Once() + m := NewQueryManager(Config{ + Actions: actions, + }) + t.Cleanup(func() { + //err := m.Close() + //assert.NoError(t, err) + }) + s, err := m.Query(QuerySpec{ + Text: "bar", + Groups: []string{"baz"}, + Start: defaultStart, + End: defaultEnd, + }) + require.NotNil(t, s) + require.NoError(t, err) + + t.Run("Read into Length Zero Buffer Succeeds with Zero Results", func(t *testing.T) { + var p []Result + n, err := s.Read(p) + assert.Equal(t, 0, n) + assert.NoError(t, err) + }) + + t.Run("Read Into Length One Buffer Succeeds with First of Two Results", func(t *testing.T) { + p := make([]Result, 1) + n, err := s.Read(p) + assert.Equal(t, 1, n) + assert.NoError(t, err) + assert.Equal(t, []Result{{{"@ptr", "1"}}}, p) + }) + + t.Run("Read Into Length One Buffer Succeeds with Second of Two Results and EOF", func(t *testing.T) { + p := make([]Result, 1) + n, err := s.Read(p) + assert.Equal(t, 1, n) + assert.Same(t, io.EOF, err) + assert.Equal(t, []Result{{{"@ptr", "2"}}}, p) + }) + + t.Run("Read Into Length One Buffer Returns EOF", func(t *testing.T) { + p := make([]Result, 1) + n, err := s.Read(p) + assert.Equal(t, 0, n) + assert.Same(t, io.EOF, err) + assert.Equal(t, make([]Result, 1), p) + }) + }) +} + func TestScenariosSerial(t *testing.T) { actions := newMockActions(t) m := NewQueryManager(Config{ @@ -2435,5 +2502,6 @@ var ( anyContext = mock.MatchedBy(func(ctx context.Context) bool { return ctx != nil }) - anyStartQueryInput = mock.AnythingOfType("*cloudwatchlogs.StartQueryInput") + anyStartQueryInput = mock.AnythingOfType("*cloudwatchlogs.StartQueryInput") + anyGetQueryResultsInput = mock.AnythingOfType("*cloudwatchlogs.GetQueryResultsInput") )