Skip to content

Commit

Permalink
Enable millisecond granularity for time stamps and durations
Browse files Browse the repository at this point in the history
Previously, the query `Start` and `End` fields were not allowed to be
sub-second, and that limitation carried over into the duration fields
`Chunk` and `SplitUntil`. However, since CloudWatch Logs' Insights
team has confirmed that they allow `Start` and `End` to have
millisecond granularity (despite inaccurate documentation stating
seconds) we might as well support millisecond granularity, as it
makes Incite slightly more powerful for dense data extracts.
  • Loading branch information
vcschapp committed Jul 22, 2022
1 parent 7a80c01 commit 65ae9cb
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 94 deletions.
8 changes: 4 additions & 4 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ const (
nilContextMsg = "incite: nil context"

textBlankMsg = "incite: blank query text"
startSubSecondMsg = "incite: start has sub-second granularity"
endSubSecondMsg = "incite: end has sub-second granularity"
startSubMillisecondMsg = "incite: start has sub-millisecond granularity"
endSubMillisecondMsg = "incite: end has sub-millisecond granularity"
endNotBeforeStartMsg = "incite: end not before start"
noGroupsMsg = "incite: no log groups"
exceededMaxLimitMsg = "incite: exceeded MaxLimit"
chunkSubSecondMsg = "incite: chunk has sub-second granularity"
splitUntilSubSecondMsg = "incite: split-until has sub-second granularity"
chunkSubMillisecondMsg = "incite: chunk has sub-millisecond granularity"
splitUntilSubMillisecondMsg = "incite: split-until has sub-millisecond granularity"
splitUntilWithPreviewMsg = "incite: split-until incompatible with preview"
splitUntilWithoutMaxLimitMsg = "incite: split-until requires MaxLimit"

Expand Down
14 changes: 8 additions & 6 deletions incite.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ type QuerySpec struct {
// inclusive of Start itself.
//
// Start must be strictly before End, and must represent a whole
// number of seconds (it cannot have sub-second granularity).
// number of milliseconds (it cannot have sub-millisecond
// granularity).
Start time.Time

// End specifies the end of the time range to query, exclusive of
// End itself.
//
// End must be strictly after Start, and must represent a whole
// number of seconds (it cannot have sub-second granularity).
// number of milliseconds (it cannot have sub-millisecond
// granularity).
End time.Time

// Limit optionally specifies the maximum number of results to be
Expand All @@ -73,8 +75,8 @@ type QuerySpec struct {
// positive and less than the difference between End and Start, the
// query is broken into n chunks, where n is (End-Start)/Chunk,
// rounded up to the nearest integer value. If Chunk is positive, it
// must represent a whole number of seconds (cannot have sub-second
// granularity).
// must represent a whole number of milliseconds (cannot have
// sub-millisecond granularity).
//
// In a chunked query, each chunk is sent to the CloudWatch Logs
// service as a separate Insights query. This can help large queries
Expand Down Expand Up @@ -157,8 +159,8 @@ type QuerySpec struct {
//
// If SplitUntil is zero or negative, then splitting is disabled.
// If positive, then splitting is enabled and SplitUntil must
// represent a whole number of seconds (cannot have sub-second
// granularity).
// represent a whole number of milliseconds (cannot have
// sub-millisecond granularity).
//
// To use splitting, you must also set Limit to MaxLimit and
// Preview must be false.
Expand Down
18 changes: 9 additions & 9 deletions incite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,15 +865,15 @@ var scenarios = []queryScenario{
QuerySpec: QuerySpec{
Text: "fields maximum, items",
Start: defaultStart,
End: defaultStart.Add(2 * time.Second),
End: defaultStart.Add(time.Second),
Limit: MaxLimit,
Groups: []string{"/a/plethora/of/logs"},
SplitUntil: time.Second,
SplitUntil: 500 * time.Millisecond,
},
chunks: []chunkPlan{
// Original chunk, length two seconds.
{
startQueryInput: startQueryInput("fields maximum, items", defaultStart, defaultStart.Add(2*time.Second), MaxLimit, "/a/plethora/of/logs"),
startQueryInput: startQueryInput("fields maximum, items", defaultStart, defaultStart.Add(time.Second), MaxLimit, "/a/plethora/of/logs"),
startQuerySuccess: true,
pollOutputs: []chunkPollOutput{
{
Expand All @@ -885,7 +885,7 @@ var scenarios = []queryScenario{
},
// Split chunk 1/2.
{
startQueryInput: startQueryInput("fields maximum, items", defaultStart, defaultStart.Add(time.Second), MaxLimit, "/a/plethora/of/logs"),
startQueryInput: startQueryInput("fields maximum, items", defaultStart, defaultStart.Add(500*time.Millisecond), MaxLimit, "/a/plethora/of/logs"),
startQuerySuccess: true,
pollOutputs: []chunkPollOutput{
{
Expand All @@ -897,7 +897,7 @@ var scenarios = []queryScenario{
},
// Split chunk 2/2.
{
startQueryInput: startQueryInput("fields maximum, items", defaultStart.Add(time.Second), defaultStart.Add(2*time.Second), MaxLimit, "/a/plethora/of/logs"),
startQueryInput: startQueryInput("fields maximum, items", defaultStart.Add(500*time.Millisecond), defaultStart.Add(time.Second), MaxLimit, "/a/plethora/of/logs"),
startQuerySuccess: true,
pollOutputs: []chunkPollOutput{
{
Expand All @@ -912,10 +912,10 @@ var scenarios = []queryScenario{
BytesScanned: 6,
RecordsMatched: 6,
RecordsScanned: 6,
RangeRequested: 2 * time.Second,
RangeStarted: 2 * time.Second,
RangeDone: 2 * time.Second,
RangeMaxed: time.Second,
RangeRequested: time.Second,
RangeStarted: time.Second,
RangeDone: time.Second,
RangeMaxed: 500 * time.Millisecond,
},
},

Expand Down
28 changes: 14 additions & 14 deletions mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ func (m *mgr) Query(q QuerySpec) (s Stream, err error) {
}

q.Start = q.Start.UTC()
if hasSubSecond(q.Start) {
return nil, errors.New(startSubSecondMsg)
if hasSubMillisecond(q.Start) {
return nil, errors.New(startSubMillisecondMsg)
}
q.End = q.End.UTC()
if hasSubSecond(q.End) {
return nil, errors.New(endSubSecondMsg)
if hasSubMillisecond(q.End) {
return nil, errors.New(endSubMillisecondMsg)
}
if !q.End.After(q.Start) {
return nil, errors.New(endNotBeforeStartMsg)
Expand All @@ -155,8 +155,8 @@ func (m *mgr) Query(q QuerySpec) (s Stream, err error) {
d := q.End.Sub(q.Start)
if q.Chunk <= 0 {
q.Chunk = d
} else if hasSubSecondD(q.Chunk) {
return nil, errors.New(chunkSubSecondMsg)
} else if hasSubMillisecondD(q.Chunk) {
return nil, errors.New(chunkSubMillisecondMsg)
} else if q.Chunk > d {
q.Chunk = d
}
Expand All @@ -176,8 +176,8 @@ func (m *mgr) Query(q QuerySpec) (s Stream, err error) {

if q.SplitUntil <= 0 {
q.SplitUntil = q.Chunk
} else if hasSubSecondD(q.SplitUntil) {
return nil, errors.New(splitUntilSubSecondMsg)
} else if hasSubMillisecondD(q.SplitUntil) {
return nil, errors.New(splitUntilSubMillisecondMsg)
} else if q.Preview {
return nil, errors.New(splitUntilWithPreviewMsg)
} else if q.Limit < maxLimit {
Expand Down Expand Up @@ -438,18 +438,18 @@ func (m *mgr) stopChunk(c *chunk) {

// splitBits is the number of child chunks into which a parent chunk
// will be split, assuming the parent chunk range is at least splitBits
// seconds long. The minimum chunk size is one second, so a 4-second
// parent chunk will be split into four chunks, but a two-second child
// chunk will only be split into two child chunks.
// milliseconds long. The minimum chunk size is one millisecond, so a
// four-millisecond parent chunk will be split into four chunks, but a
// two-millisecond child chunk will only be split into two child chunks.
const splitBits = 4

func (m *mgr) splitChunk(c *chunk) {
frac := c.duration() / splitBits
if frac < c.stream.SplitUntil {
frac = c.stream.SplitUntil
} else if hasSubSecondD(frac) {
frac = frac + time.Second/2
frac = frac.Round(time.Second)
} else if hasSubMillisecondD(frac) {
frac = frac + time.Millisecond/2
frac = frac.Round(time.Millisecond)
}

children := make([]*chunk, 1, splitBits)
Expand Down
82 changes: 41 additions & 41 deletions mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,24 +676,24 @@ func TestQueryManager_Query(t *testing.T) {
err: noGroupsMsg,
},
{
name: "Start.SubSecond",
name: "Start.SubMillisecond",
QuerySpec: QuerySpec{
Text: "baz",
Start: time.Date(2021, 7, 15, 3, 37, 25, 123, time.UTC),
End: defaultEnd,
Groups: []string{"baz"},
},
err: startSubSecondMsg,
err: startSubMillisecondMsg,
},
{
name: "End.SubSecond",
name: "End.SubMillisecond",
QuerySpec: QuerySpec{
Text: "qux",
Start: defaultStart,
End: time.Date(2021, 7, 15, 3, 37, 25, 123, time.UTC),
Groups: []string{"qux", "jilly"},
},
err: endSubSecondMsg,
err: endSubMillisecondMsg,
},
{
name: "End.NotAfter.Start",
Expand All @@ -717,18 +717,18 @@ func TestQueryManager_Query(t *testing.T) {
err: exceededMaxLimitMsg,
},
{
name: "Chunk.SubSecond",
name: "Chunk.SubMillisecond",
QuerySpec: QuerySpec{
Text: "ham",
Start: defaultStart,
End: defaultEnd,
Groups: []string{"ham"},
Chunk: 15 * time.Millisecond,
Chunk: 15 * time.Microsecond,
},
err: chunkSubSecondMsg,
err: chunkSubMillisecondMsg,
},
{
name: "SplitUntil.SubSecond",
name: "SplitUntil.SubMillisecond",
QuerySpec: QuerySpec{
Text: "Whose woods are these?\nI think I know",
Start: defaultStart,
Expand All @@ -737,7 +737,7 @@ func TestQueryManager_Query(t *testing.T) {
Limit: MaxLimit - 1,
SplitUntil: time.Minute + 10*time.Microsecond,
},
err: splitUntilSubSecondMsg,
err: splitUntilSubMillisecondMsg,
},
{
name: "SplitUntil.With.Preview",
Expand Down Expand Up @@ -1627,76 +1627,76 @@ func TestQueryManager_Query(t *testing.T) {
}{
{
name: "Already Minimum Chunk Size",
splitUntil: time.Second,
chunks: []expectedChunk{{time.Second, 0, 2, nil}},
splitUntil: time.Millisecond,
chunks: []expectedChunk{{time.Millisecond, 0, 2, nil}},
},
{
name: "One Split in Half",
splitUntil: time.Second,
splitUntil: time.Millisecond,
chunks: []expectedChunk{
{
size: 2 * time.Second,
size: 2 * time.Millisecond,
start: 0,
end: 2,
chunks: []expectedChunk{
{time.Second, 0, 2, nil},
{time.Second, 2, 3, nil},
{time.Millisecond, 0, 2, nil},
{time.Millisecond, 2, 3, nil},
},
},
},
},
{
name: "One Split in Thirds",
splitUntil: time.Second,
splitUntil: time.Millisecond,
chunks: []expectedChunk{
{
size: 3 * time.Second,
size: 3 * time.Millisecond,
start: 0,
end: 2,
chunks: []expectedChunk{
{time.Second, 0, 1, nil},
{time.Second, 1, 3, nil},
{time.Second, 3, 5, nil},
{time.Millisecond, 0, 1, nil},
{time.Millisecond, 1, 3, nil},
{time.Millisecond, 3, 5, nil},
},
},
},
},
{
name: "One Split in Quarters",
splitUntil: time.Second,
splitUntil: time.Millisecond,
chunks: []expectedChunk{
{
size: 4 * time.Second,
size: 4 * time.Millisecond,
start: 0,
end: 2,
chunks: []expectedChunk{
{time.Second, 0, 1, nil},
{time.Second, 1, 3, nil},
{time.Second, 3, 5, nil},
{time.Second, 5, 6, nil},
{time.Millisecond, 0, 1, nil},
{time.Millisecond, 1, 3, nil},
{time.Millisecond, 3, 5, nil},
{time.Millisecond, 5, 6, nil},
},
},
},
},
{
name: "Odd Splits",
splitUntil: time.Second,
splitUntil: time.Millisecond,
chunks: []expectedChunk{
{
size: 5 * time.Second,
size: 5 * time.Millisecond,
start: 0,
end: 2,
chunks: []expectedChunk{
{2 * time.Second, 0, 1, nil},
{2 * time.Millisecond, 0, 1, nil},
{
size: 2 * time.Second,
size: 2 * time.Millisecond,
start: 1, end: 3,
chunks: []expectedChunk{
{time.Second, 1, 2, nil},
{time.Second, 2, 3, nil},
{time.Millisecond, 1, 2, nil},
{time.Millisecond, 2, 3, nil},
},
},
{time.Second, 3, 5, nil},
{time.Millisecond, 3, 5, nil},
},
},
},
Expand Down Expand Up @@ -1728,13 +1728,13 @@ func TestQueryManager_Query(t *testing.T) {
var f func(string, time.Duration, expectedChunk)
f = func(chunkID string, offset time.Duration, chunk expectedChunk) {
actions.
On("StartQueryWithContext", anyContext, &cloudwatchlogs.StartQueryInput{
QueryString: sp("foo"),
LogGroupNames: []*string{sp("bar")},
Limit: int64p(maxLimit),
StartTime: startTimeMilliseconds(defaultStart.Add(offset)),
EndTime: endTimeMilliseconds(defaultStart.Add(offset).Add(chunk.size)),
}).
On("StartQueryWithContext", anyContext, startQueryInput(
"foo",
defaultStart.Add(offset),
defaultStart.Add(offset).Add(chunk.size),
maxLimit,
"bar",
)).
Return(&cloudwatchlogs.StartQueryOutput{
QueryId: sp(chunkID),
}, nil).
Expand Down Expand Up @@ -1936,7 +1936,7 @@ func TestQueryManager_Query(t *testing.T) {
RPS: lotsOfRPS,
})
t.Cleanup(func() {
m.Close()
_ = m.Close()
})
s, err := m.Query(QuerySpec{
Text: text,
Expand Down
8 changes: 4 additions & 4 deletions poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,12 +261,12 @@ func deleteResult(ptr string) Result {
var maxLimit int64 = MaxLimit

func (p *poller) splittable(c *chunk, n int) bool {
// Short circuit if the chunk isn't maxed expected.
// Short circuit if the chunk isn't maxed out.
if int64(n) < c.stream.Limit {
return false
}

// This chunk is maxed expected so record that.
// This chunk is maxed out so record that.
c.Stats.RangeMaxed += c.duration()

// Short circuit if splitting isn't required.
Expand All @@ -281,8 +281,8 @@ func (p *poller) splittable(c *chunk, n int) bool {
}

// At this point we know this chunk will be split. Thus, we should
// stop counting it as maxed expected. If the sub-chunks are later
// determined to be maxed expected that will be recorded later.
// stop counting it as maxed out. If the sub-chunks are later
// determined to be maxed out that will be recorded later.
c.Stats.RangeMaxed -= c.duration()
return true
}
Loading

0 comments on commit 65ae9cb

Please sign in to comment.