Skip to content

Commit

Permalink
Fix bug where stream read would prematurely return error
Browse files Browse the repository at this point in the history
  • Loading branch information
vcschapp committed Aug 14, 2021
1 parent 21d595d commit e56773b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 9 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ Incite! - CloudWatch Insights queries made (very) easy

TODO list in order:

1. Finish other TODO test cases in incite.go.
2. Finish other TODO test cases in unmarshal_test.go.
1. Finish other TODO test cases in unmarshal_test.go.
2. Remove hint support as it overcomplicates to little benefit.
3. Finish any other lingering TODO or FIXME.
4. Audit the logging code to ensure we log enough worthwhile status info.
5. Write README.
Expand Down
12 changes: 6 additions & 6 deletions incite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,9 +1133,9 @@ func (s *stream) Read(r []Result) (int, error) {
defer s.lock.Unlock()

for {
n := s.read(r)
if n > 0 || s.err != nil || len(r) == 0 {
return n, s.err
n, err := s.read(r)
if n > 0 || err != nil || len(r) == 0 {
return n, err
}
s.more.Wait()
}
Expand All @@ -1148,13 +1148,13 @@ func (s *stream) GetStats() Stats {
return s.stats
}

func (s *stream) read(r []Result) int {
func (s *stream) read(r []Result) (int, error) {
n := 0
for s.i < len(s.blocks) {
block := s.blocks[s.i]
for s.j < len(block) {
if n == len(r) {
return n
return n, nil
}
r[n] = block[s.j]
n++
Expand All @@ -1163,7 +1163,7 @@ func (s *stream) read(r []Result) int {
s.i++
s.j = 0
}
return n
return n, s.err
}

func (s *stream) setErr(err error, lock bool, stats Stats) bool {
Expand Down
70 changes: 69 additions & 1 deletion incite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"errors"
"fmt"
"io"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -1230,6 +1231,72 @@ func TestQueryManager_Query(t *testing.T) {
})
}

func TestStream_Read(t *testing.T) {
t.Run("Buffer is Shorter than Available Results", func(t *testing.T) {
actions := newMockActions(t)
actions.
On("StartQueryWithContext", anyContext, anyStartQueryInput).
Return(&cloudwatchlogs.StartQueryOutput{QueryId: sp("foo")}, nil).
Once()
actions.
On("GetQueryResultsWithContext", anyContext, anyGetQueryResultsInput).
Return(&cloudwatchlogs.GetQueryResultsOutput{
Status: sp(cloudwatchlogs.QueryStatusComplete),
Results: [][]*cloudwatchlogs.ResultField{
{{Field: sp("@ptr"), Value: sp("1")}},
{{Field: sp("@ptr"), Value: sp("2")}},
},
}, nil).
Once()
m := NewQueryManager(Config{
Actions: actions,
})
t.Cleanup(func() {
//err := m.Close()
//assert.NoError(t, err)
})
s, err := m.Query(QuerySpec{
Text: "bar",
Groups: []string{"baz"},
Start: defaultStart,
End: defaultEnd,
})
require.NotNil(t, s)
require.NoError(t, err)

t.Run("Read into Length Zero Buffer Succeeds with Zero Results", func(t *testing.T) {
var p []Result
n, err := s.Read(p)
assert.Equal(t, 0, n)
assert.NoError(t, err)
})

t.Run("Read Into Length One Buffer Succeeds with First of Two Results", func(t *testing.T) {
p := make([]Result, 1)
n, err := s.Read(p)
assert.Equal(t, 1, n)
assert.NoError(t, err)
assert.Equal(t, []Result{{{"@ptr", "1"}}}, p)
})

t.Run("Read Into Length One Buffer Succeeds with Second of Two Results and EOF", func(t *testing.T) {
p := make([]Result, 1)
n, err := s.Read(p)
assert.Equal(t, 1, n)
assert.Same(t, io.EOF, err)
assert.Equal(t, []Result{{{"@ptr", "2"}}}, p)
})

t.Run("Read Into Length One Buffer Returns EOF", func(t *testing.T) {
p := make([]Result, 1)
n, err := s.Read(p)
assert.Equal(t, 0, n)
assert.Same(t, io.EOF, err)
assert.Equal(t, make([]Result, 1), p)
})
})
}

func TestScenariosSerial(t *testing.T) {
actions := newMockActions(t)
m := NewQueryManager(Config{
Expand Down Expand Up @@ -2435,5 +2502,6 @@ var (
anyContext = mock.MatchedBy(func(ctx context.Context) bool {
return ctx != nil
})
anyStartQueryInput = mock.AnythingOfType("*cloudwatchlogs.StartQueryInput")
anyStartQueryInput = mock.AnythingOfType("*cloudwatchlogs.StartQueryInput")
anyGetQueryResultsInput = mock.AnythingOfType("*cloudwatchlogs.GetQueryResultsInput")
)

0 comments on commit e56773b

Please sign in to comment.