Skip to content

Commit

Permalink
Merge pull request #6 from rewardStyle/feature/consume-produce
Browse files Browse the repository at this point in the history
Start produce and consume after reconfiguring endpoints
  • Loading branch information
fjordan committed Dec 9, 2015
2 parents 7bb94ee + 4283c1a commit 83f7bcd
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 25 deletions.
6 changes: 5 additions & 1 deletion listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ func (l *Listener) Init(stream, shard string) (*Listener, error) {
return l, err
}

func (l *Listener) newEndpoint(endpoint string) {
func (l *Listener) NewEndpoint(endpoint string) {
// Re-initialize kinesis client for testing
l.kinesis.client = l.kinesis.newClient(endpoint)
l.initShardIterator()

if !l.IsConsuming() {
go l.consume()
}
}

func (l *Listener) setListening(listening bool) {
Expand Down
17 changes: 5 additions & 12 deletions listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const testEndpoint = "http://127.0.0.1:4567"

func TestListenerStop(t *testing.T) {
listener, _ := new(Listener).Init(conf.Kinesis.Stream, conf.Kinesis.Shard)
listener.newEndpoint(testEndpoint)
listener.NewEndpoint(testEndpoint)

Convey("Given a running listener", t, func() {
go listener.Listen(func(msg []byte, wg *sync.WaitGroup) {
Expand All @@ -36,7 +36,7 @@ func TestListenerStop(t *testing.T) {

func TestListenerError(t *testing.T) {
listener, _ := new(Listener).Init(conf.Kinesis.Stream, conf.Kinesis.Shard)
listener.newEndpoint(testEndpoint)
listener.NewEndpoint(testEndpoint)

Convey("Given a running listener", t, func() {
go listener.Listen(func(msg []byte, wg *sync.WaitGroup) {
Expand All @@ -59,7 +59,7 @@ func TestListenerError(t *testing.T) {

func TestListenerMessage(t *testing.T) {
listener, _ := new(Listener).Init(conf.Kinesis.Stream, conf.Kinesis.Shard)
listener.newEndpoint(testEndpoint)
listener.NewEndpoint(testEndpoint)

go listener.Listen(func(msg []byte, wg *sync.WaitGroup) {
wg.Done()
Expand All @@ -83,21 +83,14 @@ func TestRetrieveMessage(t *testing.T) {
listener, _ := new(Listener).Init(conf.Kinesis.Stream, conf.Kinesis.Shard)
producer, _ := new(Producer).Init(conf.Kinesis.Stream, conf.Kinesis.Shard)

listener.newEndpoint(testEndpoint)
producer.newEndpoint(testEndpoint)
listener.NewEndpoint(testEndpoint)
producer.NewEndpoint(testEndpoint)

for _, c := range cases {
Convey("Given a valid message", t, func() {
producer.Send(new(Message).Init(c.message, "test"))
if !producer.IsProducing() {
go producer.produce()
}

Convey("It should be passed on the queue without error", func() {
if !listener.IsConsuming() {
go listener.consume()
}

msg, err := listener.Retrieve()
if err != nil {
t.Fatalf(err.Error())
Expand Down
6 changes: 5 additions & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,14 @@ func (p *Producer) Init(stream, shard string) (*Producer, error) {
return p, err
}

func (p *Producer) newEndpoint(endpoint string) {
func (p *Producer) NewEndpoint(endpoint string) {
// Re-initialize kinesis client for testing
p.kinesis.client = p.kinesis.newClient(endpoint)
p.initShardIterator()

if !p.IsProducing() {
go p.produce()
}
}

// Each shard can support up to 1,000 records per second for writes, up to a maximum total
Expand Down
15 changes: 4 additions & 11 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

func TestProducerStop(t *testing.T) {
producer, _ := new(Producer).Init(conf.Kinesis.Stream, conf.Kinesis.Shard)
producer.newEndpoint(testEndpoint)
producer.NewEndpoint(testEndpoint)

Convey("Given a running producer", t, func() {
go producer.produce()
Expand All @@ -35,7 +35,7 @@ func TestProducerStop(t *testing.T) {

func TestProducerError(t *testing.T) {
producer, _ := new(Producer).Init(conf.Kinesis.Stream, conf.Kinesis.Shard)
producer.newEndpoint(testEndpoint)
producer.NewEndpoint(testEndpoint)

Convey("Given a running producer", t, func() {
go producer.produce()
Expand All @@ -56,21 +56,14 @@ func TestProducerMessage(t *testing.T) {
listener, _ := new(Listener).Init(conf.Kinesis.Stream, conf.Kinesis.Shard)
producer, _ := new(Producer).Init(conf.Kinesis.Stream, conf.Kinesis.Shard)

listener.newEndpoint(testEndpoint)
producer.newEndpoint(testEndpoint)
listener.NewEndpoint(testEndpoint)
producer.NewEndpoint(testEndpoint)

for _, c := range cases {
Convey("Given a valid message", t, func() {
producer.Send(new(Message).Init(c.message, "test"))
if !producer.IsProducing() {
go producer.produce()
}

Convey("It should be passed on the queue without error", func() {
if !listener.IsConsuming() {
go listener.consume()
}

msg, err := listener.Retrieve()
if err != nil {
t.Fatalf(err.Error())
Expand Down

0 comments on commit 83f7bcd

Please sign in to comment.