diff --git a/.gitignore b/.gitignore index 5dca467..c6d755d 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,17 @@ _testmain.go *.exe *.test *.prof + +# Created by https://www.gitignore.io/api/vim + +### Vim ### +# swap +[._]*.s[a-w][a-z] +[._]s[a-w][a-z] +# session +Session.vim +# temporary +.netrwhist +*~ +# auto-generated tag files +tags diff --git a/firehose.go b/firehose.go index 40fb9af..5b7a1ca 100644 --- a/firehose.go +++ b/firehose.go @@ -143,6 +143,6 @@ func (p *Producer) sendFirehoseRecords(args *gokinesis.RequestArgs) { } if conf.Debug.Verbose && p.getMsgCount()%100 == 0 { - log.Println("Messages sent so far: " + strconv.Itoa(p.getMsgCount())) + log.Println("Messages sent so far: " + strconv.FormatInt(p.getMsgCount(), 10)) } } diff --git a/kinesis.go b/kinesis.go index a31ff22..5410947 100644 --- a/kinesis.go +++ b/kinesis.go @@ -3,6 +3,7 @@ package kinetic import ( "errors" "sync" + "sync/atomic" "time" gokinesis "github.com/rewardStyle/go-kinesis" @@ -57,11 +58,8 @@ type kinesis struct { client gokinesis.KinesisClient - msgCount int - msgCountMu sync.Mutex - - errCount int - errCountMu sync.Mutex + msgCount int64 + errCount int64 } func (k *kinesis) init(stream, shard, shardIteratorType, accessKey, secretKey, region string) (*kinesis, error) { @@ -157,39 +155,27 @@ func (k *kinesis) refreshClient(accessKey, secretKey, region string) { } func (k *kinesis) decMsgCount() { - k.msgCountMu.Lock() - k.msgCount-- - k.msgCountMu.Unlock() + atomic.AddInt64(&k.msgCount, -1) } func (k *kinesis) incMsgCount() { - k.msgCountMu.Lock() - k.msgCount++ - k.msgCountMu.Unlock() + atomic.AddInt64(&k.msgCount, 1) } -func (k *kinesis) getMsgCount() int { - k.msgCountMu.Lock() - defer k.msgCountMu.Unlock() - return k.msgCount +func (k *kinesis) getMsgCount() int64 { + return atomic.LoadInt64(&k.msgCount) } func (k *kinesis) decErrCount() { - k.errCountMu.Lock() - k.errCount-- - k.errCountMu.Unlock() + atomic.AddInt64(&k.errCount, -1) } func (k *kinesis) incErrCount() { - k.errCountMu.Lock() - k.errCount++ - k.errCountMu.Unlock() + atomic.AddInt64(&k.errCount, 1) } -func (k *kinesis) getErrCount() int { - k.errCountMu.Lock() - defer k.errCountMu.Unlock() - return k.errCount +func (k *kinesis) getErrCount() int64 { + return atomic.LoadInt64(&k.errCount) } func getLock(sem chan bool) { diff --git a/listener.go b/listener.go index f29962e..ffa016e 100644 --- a/listener.go +++ b/listener.go @@ -206,11 +206,19 @@ retry: func (l *Listener) consume() { l.setConsuming(true) - counter := 0 - timer := time.Now() + readCounter := 0 + readTimer := time.Now() + + GsiCounter := 0 + GsiTimer := time.Now() for { - l.throttle(&counter, &timer) + if !l.shouldConsume() { + l.setConsuming(false) + break + } + + l.throttle(&readCounter, &readTimer) // args() will give us the shard iterator and type as well as the shard id response, err := l.client.GetRecords(l.args()) @@ -241,9 +249,7 @@ func (l *Listener) consume() { // If we received an error we should wait and attempt to // refresh the shard iterator again - <-time.After(1 * time.Second) - - log.Println("Retrying after waiting one second.") + l.throttle(&GsiCounter, &GsiTimer) goto refresh_iterator } @@ -259,11 +265,6 @@ func (l *Listener) consume() { } } } - - if !l.shouldConsume() { - l.setConsuming(false) - break - } } } diff --git a/listener_test.go b/listener_test.go index 8f3e3b6..a667276 100644 --- a/listener_test.go +++ b/listener_test.go @@ -49,7 +49,7 @@ func TestListenerError(t *testing.T) { // Let the error propagate <-time.After(1 * time.Second) - So(listener.errCount, ShouldNotEqual, 0) + So(listener.getErrCount(), ShouldNotEqual, 0) So(listener.IsListening(), ShouldEqual, true) }) }) diff --git a/producer.go b/producer.go index 36c757b..8beccba 100644 --- a/producer.go +++ b/producer.go @@ -218,7 +218,7 @@ stop: p.incMsgCount() if conf.Debug.Verbose && p.getMsgCount()%100 == 0 { - log.Println("Received message to send. Total messages received: " + strconv.Itoa(p.getMsgCount())) + log.Println("Received message to send. Total messages received: " + strconv.FormatInt(p.getMsgCount(), 10)) } kargs := p.args() @@ -320,7 +320,7 @@ func (p *Producer) sendRecords(args *gokinesis.RequestArgs) { } if conf.Debug.Verbose && p.getMsgCount()%100 == 0 { - log.Println("Messages sent so far: " + strconv.Itoa(p.getMsgCount())) + log.Println("Messages sent so far: " + strconv.FormatInt(p.getMsgCount(), 10)) } } diff --git a/producer_test.go b/producer_test.go index a017922..a9d60be 100644 --- a/producer_test.go +++ b/producer_test.go @@ -43,8 +43,8 @@ func TestProducerError(t *testing.T) { Convey("It should handle errors successfully", func() { producer.errors <- errors.New("All your base are belong to us!") // Let the error propagate - <-time.After(1 * time.Second) - So(producer.errCount, ShouldEqual, 1) + <-time.After(3 * time.Second) + So(producer.getErrCount(), ShouldEqual, 1) So(producer.IsProducing(), ShouldEqual, true) }) })