Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slice Callback Function with Batch Interval Functionality #129

Open
wants to merge 9 commits into
base: master
Choose a base branch
from

Conversation

epociask
Copy link

@epociask epociask commented Dec 17, 2020

Addressing issue #128

Added additional functionality for batch interval messages with a separate callback function that returns a slice of kinesis.Record than a single reference to a consumer.Record....

  • Added WithBatchSecondInterval function in Options to assign second interval for reading
  • Added ScanShardWithIntervalBatching function to send a slice of records upon a conditional evaluation of elapsed time
  • Added batchInterval data member to Consumer struct in Consumer.go, DEFAULT=0

To use batch interval consumption, just specify your callback function as func([]*kinesis.Record) error and pass it as a parameter to ScanBatch

// for each record and checkpoints the progress of scan.
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not break the public API if possible. lemme put some thought into this one

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good : )

type ScanFunc func(*Record) error

//ScanFuncBatch is the type of function called for read on a slice of records
//from the steram. The Record argument contains the batch of the last unseen records
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

steram -> stream

@harlow
Copy link
Owner

harlow commented Dec 20, 2020

Hi thanks for submitting the PR; I'm struggling a little with the amt of duplication between the two function; my fear is "drift" between the two makes this code path harder to maintain over time.

Taking a step back I'm curious if something like this is possible:

// shape of the code
func Scan(ctx, fn ScanFn)
	items = ScanBatch(ctx, func(items) {
		for each items 
			fn(item)
	})

func ScanBatch(ctx, fn ScanBatchFn)
       // 

so the Scan func passes off to the ScanBatch func (which would eliminate the duplication of the two loops). I'll play around with this a little and see if anything emerges.

Or potentially the inverse where we add a convenience function ScanBatch which uses Scan

func Scan(ctx, fn ScanFn)
	// original function

func ScanBatch(ctx, fn ScanBatchFn)
       c.Scan()
       // do the batching here




@@ -138,7 +148,49 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
return <-errc
}

// ScanShard loops over records on a specific shard, calls the callback func
//ScanBatch performs scan function using intereval batching for invoking callback function
func (c *Consumer) ScanBatch(ctx context.Context, fn ScanFuncBatch) error {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be ScanBatchFunc

time_start := time.Now()
for {

if time_elapsed := time.Since(time_start); int64(time_elapsed.Seconds()) <= c.batchInterval {
Copy link
Owner

@harlow harlow Dec 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this doing? is it different from the scan interval?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Effectively just a poorer duplication of scan interval, when I wrote this initially I overlooked that component of the code... thank you for pointing this out

}()
scanTicker := time.NewTicker(c.scanInterval)
defer scanTicker.Stop()
time_start := time.Now()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you run a linter on this code you'll see a bunch of warnings around CamelCase vs snake_case

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants