Skip to content

Commit

Permalink
Remove throttle function from KinesisReader as it is now redundant wi…
Browse files Browse the repository at this point in the history
…th the count/size rate limiter.
  • Loading branch information
Jason Yu committed Aug 30, 2017
1 parent 6a5bfe2 commit 0176c83
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 29 deletions.
28 changes: 0 additions & 28 deletions kinesis_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@ import (

const (
kinesisReaderMaxBatchSize = 10000
kinesisReaderDefaultConcurrency = 5
kinesisReaderDefaultTransactionCountLimit = 5
kinesisReaderDefaultTransmissionSizeLimit = 2000000
)

// kinesisReaderOptions a struct that holds all of the KinesisReader's configurable parameters.
type kinesisReaderOptions struct {
batchSize int // maximum records per GetRecordsRequest call
concurrency int // maximum number of concurrent GetRecord or GetRecords calls allowed
transactionCountLimit int // maximum transactions per second for GetRecords calls
transmissionSizeLimit int // maximum transmission size per second for GetRecords calls
shardIterator *ShardIterator // shard iterator for Kinesis GetRecords API calls
Expand All @@ -38,7 +36,6 @@ type kinesisReaderOptions struct {
func defaultKinesisReaderOptions() *kinesisReaderOptions {
return &kinesisReaderOptions{
batchSize: kinesisReaderMaxBatchSize,
concurrency: kinesisReaderDefaultConcurrency,
transactionCountLimit: kinesisReaderDefaultTransactionCountLimit,
transmissionSizeLimit: kinesisReaderDefaultTransmissionSizeLimit,
shardIterator: NewShardIterator(),
Expand All @@ -63,18 +60,6 @@ func KinesisReaderBatchSize(size int) KinesisReaderOptionsFn {
}
}

// KinesisReaderConcurrency is a functional option method for configuring the KinesisReader's
// concurrency.
func KinesisReaderConcurrency(count int) KinesisReaderOptionsFn {
return func(o *KinesisReader) error {
if count > 0 {
o.concurrency = count
return nil
}
return ErrInvalidConcurrency
}
}

// KinesisReaderTransactionCountLimit is a functional option method for configuring the
// KinesisReader's transaction count limit
func KinesisReaderTransactionCountLimit(count int) KinesisReaderOptionsFn {
Expand Down Expand Up @@ -139,7 +124,6 @@ type KinesisReader struct {
*LogHelper
stream string // name of AWS Kinesis Stream to stream from
shard string // shardID of AWS Kinesis Stream to stream from
throttleSem chan empty // channel used to throttle concurrent GetRecord calls
txnCountRateLimiter *rate.Limiter // rate limiter to limit the number of transactions per second
txSizeRateLimiter *rate.Limiter // rate limiter to limit the transmission size per seccond
nextShardIterator string // shardIterator to start with with GetRecord request
Expand All @@ -164,7 +148,6 @@ func NewKinesisReader(c *aws.Config, stream string, shard string, optionFns ...K
optionFn(kinesisReader)
}

kinesisReader.throttleSem = make(chan empty, kinesisReader.concurrency)
kinesisReader.txnCountRateLimiter = rate.NewLimiter(rate.Limit(kinesisReader.transactionCountLimit), 1)
kinesisReader.txSizeRateLimiter = rate.NewLimiter(rate.Limit(kinesisReader.transmissionSizeLimit), kinesisReader.transmissionSizeLimit)
kinesisReader.LogHelper = &LogHelper{
Expand Down Expand Up @@ -232,23 +215,12 @@ func (r *KinesisReader) setSequenceNumber(sequenceNumber string) error {
return nil
}

// Kinesis allows five read ops per second per shard.
// http://docs.aws.amazon.com/kinesis/latest/dev/service-sizes-and-limits.html
func (r *KinesisReader) throttle(sem chan empty) {
sem <- empty{}
time.AfterFunc(1*time.Second, func() {
<-sem
})
}

func (r *KinesisReader) getRecords(ctx context.Context, fn messageHandler, batchSize int) (count int, size int, err error) {
if err = r.ensureShardIterator(); err != nil {
r.LogError("Error calling ensureShardIterator(): ", err)
return count, size, err
}

r.throttle(r.throttleSem)

// We use the GetRecordsRequest method of creating requests to allow for registering custom handlers for better
// control over the API request.
var startReadTime time.Time
Expand Down
1 change: 0 additions & 1 deletion testexec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ func newKineticConsumer(k *kinetic.Kinetic, streamName string) *kinetic.Consumer
csc := kinetic.NewDefaultConsumerStatsCollector(registry)
r, err := kinetic.NewKinesisReader(k.Session.Config, streamName, shards[0],
kinetic.KinesisReaderBatchSize(10000),
kinetic.KinesisReaderConcurrency(5),
kinetic.KinesisReaderTransactionCountLimit(5),
kinetic.KinesisReaderTransmissionSizeLimit(2000000),
//kinetic.KinesisReaderShardIterator(),
Expand Down

0 comments on commit 0176c83

Please sign in to comment.