Merge pull request #29 from rewardStyle/fix_refresh_iterator_rate_limit
Fix refresh iterator rate limit

In short: shard-iterator refresh retries in the listener now go through the same counter-based throttle as reads, each with its own counter/timer pair, replacing a fixed one-second sleep; the message and error counters become atomic int64s instead of mutex-guarded ints (so strconv.Itoa calls become strconv.FormatInt); and the shouldConsume check moves to the top of the consume loop.
fjordan authored Nov 22, 2016
2 parents 3c4f36f + 7a28a70 commit ed63d5f
Showing 7 changed files with 43 additions and 42 deletions.
14 changes: 14 additions & 0 deletions .gitignore
@@ -23,3 +23,17 @@ _testmain.go
 *.exe
 *.test
 *.prof
+
+# Created by https://www.gitignore.io/api/vim
+
+### Vim ###
+# swap
+[._]*.s[a-w][a-z]
+[._]s[a-w][a-z]
+# session
+Session.vim
+# temporary
+.netrwhist
+*~
+# auto-generated tag files
+tags
2 changes: 1 addition & 1 deletion firehose.go
@@ -143,6 +143,6 @@ func (p *Producer) sendFirehoseRecords(args *gokinesis.RequestArgs) {
 	}

 	if conf.Debug.Verbose && p.getMsgCount()%100 == 0 {
-		log.Println("Messages sent so far: " + strconv.Itoa(p.getMsgCount()))
+		log.Println("Messages sent so far: " + strconv.FormatInt(p.getMsgCount(), 10))
 	}
 }
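Context for this hunk: getMsgCount now returns an int64 (see the kinesis.go diff below), and strconv.Itoa accepts only an int, so the old call would no longer compile. strconv.FormatInt is the int64 counterpart; its second argument is the numeric base. A minimal standalone sketch of the distinction:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	var n int64 = 12345
	// strconv.Itoa(n) would not compile here: Itoa takes an int, not an int64.
	s := strconv.FormatInt(n, 10) // format the int64 in base 10
	fmt.Println("Messages sent so far: " + s)
}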
36 changes: 11 additions & 25 deletions kinesis.go
@@ -3,6 +3,7 @@ package kinetic
 import (
 	"errors"
 	"sync"
+	"sync/atomic"
 	"time"

 	gokinesis "github.com/rewardStyle/go-kinesis"
@@ -57,11 +58,8 @@ type kinesis struct {

 	client gokinesis.KinesisClient

-	msgCount   int
-	msgCountMu sync.Mutex
-
-	errCount   int
-	errCountMu sync.Mutex
+	msgCount int64
+	errCount int64
 }
@@ -157,39 +155,27 @@ func (k *kinesis) refreshClient(accessKey, secretKey, region string) {
 }

 func (k *kinesis) decMsgCount() {
-	k.msgCountMu.Lock()
-	k.msgCount--
-	k.msgCountMu.Unlock()
+	atomic.AddInt64(&k.msgCount, -1)
 }

 func (k *kinesis) incMsgCount() {
-	k.msgCountMu.Lock()
-	k.msgCount++
-	k.msgCountMu.Unlock()
+	atomic.AddInt64(&k.msgCount, 1)
 }

-func (k *kinesis) getMsgCount() int {
-	k.msgCountMu.Lock()
-	defer k.msgCountMu.Unlock()
-	return k.msgCount
+func (k *kinesis) getMsgCount() int64 {
+	return atomic.LoadInt64(&k.msgCount)
 }

 func (k *kinesis) decErrCount() {
-	k.errCountMu.Lock()
-	k.errCount--
-	k.errCountMu.Unlock()
+	atomic.AddInt64(&k.errCount, -1)
 }

 func (k *kinesis) incErrCount() {
-	k.errCountMu.Lock()
-	k.errCount++
-	k.errCountMu.Unlock()
+	atomic.AddInt64(&k.errCount, 1)
 }

-func (k *kinesis) getErrCount() int {
-	k.errCountMu.Lock()
-	defer k.errCountMu.Unlock()
-	return k.errCount
+func (k *kinesis) getErrCount() int64 {
+	return atomic.LoadInt64(&k.errCount)
 }

 func getLock(sem chan bool) {
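The pattern behind this file's change: when the only operations on a shared counter are increment, decrement, and read, a mutex-guarded int can be replaced with sync/atomic operations on an int64, dropping both the lock overhead and the two mutex fields. A minimal, self-contained sketch of the same pattern (the counter type here is hypothetical, not part of this repository):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter is a hypothetical stand-in for the msgCount/errCount fields above.
type counter struct {
	n int64
}

func (c *counter) inc()       { atomic.AddInt64(&c.n, 1) }
func (c *counter) dec()       { atomic.AddInt64(&c.n, -1) }
func (c *counter) get() int64 { return atomic.LoadInt64(&c.n) }

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.inc() // safe from many goroutines: AddInt64 is atomic
		}()
	}
	wg.Wait()
	fmt.Println(c.get()) // always 100
}

Note that atomic access has to be used everywhere the field is touched, which is why the getters' return types were changed to int64 along with the fields rather than mixing atomic writes with plain reads.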
23 changes: 12 additions & 11 deletions listener.go
@@ -206,11 +206,19 @@ retry:
 func (l *Listener) consume() {
 	l.setConsuming(true)

-	counter := 0
-	timer := time.Now()
+	readCounter := 0
+	readTimer := time.Now()
+
+	GsiCounter := 0
+	GsiTimer := time.Now()

 	for {
-		l.throttle(&counter, &timer)
+		if !l.shouldConsume() {
+			l.setConsuming(false)
+			break
+		}
+
+		l.throttle(&readCounter, &readTimer)

 		// args() will give us the shard iterator and type as well as the shard id
 		response, err := l.client.GetRecords(l.args())
@@ -241,9 +249,7 @@ func (l *Listener) consume() {

 			// If we received an error we should wait and attempt to
 			// refresh the shard iterator again
-			<-time.After(1 * time.Second)
-
-			log.Println("Retrying after waiting one second.")
+			l.throttle(&GsiCounter, &GsiTimer)

 			goto refresh_iterator
 		}
@@ -259,11 +265,6 @@ func (l *Listener) consume() {
 				}
 			}
 		}
-
-		if !l.shouldConsume() {
-			l.setConsuming(false)
-			break
-		}
 	}
 }
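Two things happen in this file: the shouldConsume check moves to the top of the loop so a stopped listener exits before issuing another read, and the fixed one-second sleep before each GetShardIterator retry is replaced by the same throttle helper already used for GetRecords, with a separate GsiCounter/GsiTimer pair so reads and iterator refreshes are rate-limited independently. The body of throttle is not part of this diff; the sketch below is a hypothetical counter-per-window implementation, shown only to illustrate how a (counter, timer) pair can cap calls per second:

package main

import (
	"fmt"
	"time"
)

// throttle is a hypothetical sketch, not the repository's implementation:
// it allows up to limit calls per one-second window per counter/timer pair,
// and once the limit is hit, sleeps out the remainder of the window.
func throttle(counter *int, timer *time.Time, limit int) {
	if time.Since(*timer) > time.Second {
		*counter = 0
		*timer = time.Now()
	}
	*counter++
	if *counter > limit {
		time.Sleep(time.Second - time.Since(*timer))
		*counter = 0
		*timer = time.Now()
	}
}

func main() {
	counter, timer := 0, time.Now()
	start := time.Now()
	for i := 0; i < 10; i++ {
		throttle(&counter, &timer, 5) // at most 5 calls per second
	}
	fmt.Printf("elapsed: %v\n", time.Since(start)) // roughly one second
}

Under this reading, giving the refresh path its own counter/timer pair is the rate-limit fix the PR title names: iterator-refresh retries no longer hammer the API in a tight loop, and they no longer consume the read path's budget.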
2 changes: 1 addition & 1 deletion listener_test.go
@@ -49,7 +49,7 @@ func TestListenerError(t *testing.T) {
 		// Let the error propagate
 		<-time.After(1 * time.Second)

-		So(listener.errCount, ShouldNotEqual, 0)
+		So(listener.getErrCount(), ShouldNotEqual, 0)
 		So(listener.IsListening(), ShouldEqual, true)
 	})
 })
4 changes: 2 additions & 2 deletions producer.go
@@ -218,7 +218,7 @@ stop:
 	p.incMsgCount()

 	if conf.Debug.Verbose && p.getMsgCount()%100 == 0 {
-		log.Println("Received message to send. Total messages received: " + strconv.Itoa(p.getMsgCount()))
+		log.Println("Received message to send. Total messages received: " + strconv.FormatInt(p.getMsgCount(), 10))
 	}

 	kargs := p.args()
@@ -320,7 +320,7 @@ func (p *Producer) sendRecords(args *gokinesis.RequestArgs) {
 	}

 	if conf.Debug.Verbose && p.getMsgCount()%100 == 0 {
-		log.Println("Messages sent so far: " + strconv.Itoa(p.getMsgCount()))
+		log.Println("Messages sent so far: " + strconv.FormatInt(p.getMsgCount(), 10))
 	}
 }
4 changes: 2 additions & 2 deletions producer_test.go
@@ -43,8 +43,8 @@ func TestProducerError(t *testing.T) {
 	Convey("It should handle errors successfully", func() {
 		producer.errors <- errors.New("All your base are belong to us!")
 		// Let the error propagate
-		<-time.After(1 * time.Second)
-		So(producer.errCount, ShouldEqual, 1)
+		<-time.After(3 * time.Second)
+		So(producer.getErrCount(), ShouldEqual, 1)
 		So(producer.IsProducing(), ShouldEqual, true)
 	})
 })
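The propagation wait here grows from one second to three, presumably to give the error time to arrive under the new throttled code paths (the diff itself does not state the reason). A hypothetical alternative to a fixed sleep, which keeps the test fast when the error arrives early, is to poll the counter up to a deadline; this helper is a sketch, not part of the change, and assumes the test file's existing time import:

// eventually polls cond until it holds or the deadline expires, instead of
// sleeping a fixed duration and hoping the error has propagated by then.
func eventually(cond func() bool, deadline time.Duration) bool {
	stop := time.Now().Add(deadline)
	for time.Now().Before(stop) {
		if cond() {
			return true
		}
		time.Sleep(50 * time.Millisecond)
	}
	return cond()
}

// Usage inside the Convey block, with the same getErrCount accessor:
//   So(eventually(func() bool { return producer.getErrCount() == 1 }, 3*time.Second), ShouldBeTrue)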
