-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Slice Callback Function with Batch Interval Functionality #129
base: master
Are you sure you want to change the base?
Conversation
…callback function types
// for each record and checkpoints the progress of scan. | ||
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd rather not break the public API if possible. lemme put some thought into this one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good : )
type ScanFunc func(*Record) error | ||
|
||
//ScanFuncBatch is the type of function called for read on a slice of records | ||
//from the steram. The Record argument contains the batch of the last unseen records |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
steram -> stream
Hi thanks for submitting the PR; I'm struggling a little with the amt of duplication between the two function; my fear is "drift" between the two makes this code path harder to maintain over time. Taking a step back I'm curious if something like this is possible:
so the Or potentially the inverse where we add a convenience function
|
@@ -138,7 +148,49 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error { | |||
return <-errc | |||
} | |||
|
|||
// ScanShard loops over records on a specific shard, calls the callback func | |||
//ScanBatch performs scan function using intereval batching for invoking callback function | |||
func (c *Consumer) ScanBatch(ctx context.Context, fn ScanFuncBatch) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be ScanBatchFunc
time_start := time.Now() | ||
for { | ||
|
||
if time_elapsed := time.Since(time_start); int64(time_elapsed.Seconds()) <= c.batchInterval { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this doing? is it different from the scan interval?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Effectively just a poorer duplication of scan interval, when I wrote this initially I overlooked that component of the code... thank you for pointing this out
}() | ||
scanTicker := time.NewTicker(c.scanInterval) | ||
defer scanTicker.Stop() | ||
time_start := time.Now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you run a linter on this code you'll see a bunch of warnings around CamelCase vs snake_case
Addressing issue #128
Added additional functionality for batch interval messages with a separate callback function that returns a slice of kinesis.Record than a single reference to a consumer.Record....
Consumer.go
, DEFAULT=0To use batch interval consumption, just specify your callback function as
func([]*kinesis.Record) error
and pass it as a parameter toScanBatch