Skip to content

Commit

Permalink
Merge pull request #35 from rewardStyle/TryToSend
Browse files Browse the repository at this point in the history
A new Send function for when you want to lose data rather than crash.
  • Loading branch information
docmerlin authored Nov 30, 2016
2 parents eec34ce + a7eae51 commit 855787f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
15 changes: 15 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
ThroughputExceededError = errors.New("Configured AWS Kinesis throughput has been exceeded!")
KinesisFailureError = errors.New("AWS Kinesis internal failure.")
BadConcurrencyError = errors.New("Concurrency must be greater than zero.")
DroppedMessageError = errors.New("Channel is full, dropped message.")
)

// Producer keeps a queue of messages on a channel and continually attempts
Expand Down Expand Up @@ -282,6 +283,20 @@ func (p *Producer) Send(msg *Message) {
}()
}

// TryToSend tries to send the message, but if the channel is full it drops the message, and returns an error.
func (p *Producer) TryToSend(msg *Message) error {
// Add the terminating record indicator
if p.getProducerType() == firehoseType {
msg.SetValue(append(msg.Value(), truncatedRecordTerminator...))
}
select {
case p.messages <- msg:
return nil
default:
return DroppedMessageError
}
}

// If our payload is larger than allowed Kinesis will write as much as
// possible and fail the rest. We can then put them back on the queue
// to re-send
Expand Down
21 changes: 21 additions & 0 deletions producer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kinetic

import (
"encoding/binary"
"errors"
"syscall"
"testing"
Expand Down Expand Up @@ -77,3 +78,23 @@ func TestProducerMessage(t *testing.T) {
listener.Close()
producer.Close()
}

func TestProducerTryToSend(t *testing.T) {
producer, _ := new(Producer).InitC("your-stream", "0", "LATEST", "accesskey", "secretkey", "us-east-1", 4)
producer.NewEndpoint(testEndpoint, "your-stream")
producer.Close() // This is to make the test deterministic. It stops producer from sending messages.
var totDropped int
for i := 0; i < 5000; i++ {
b := make([]byte, 2)
binary.LittleEndian.PutUint16(b, uint16(i))
if err := producer.TryToSend(new(Message).Init(b, "foo")); nil != err {
totDropped++
}
}
Convey("Given a producer", t, func() {
Convey("TryToSend should drop messages when the queue is full", func() {
So(totDropped, ShouldEqual, 1000)
So(len(producer.messages), ShouldEqual, 4000)
})
})
}

0 comments on commit 855787f

Please sign in to comment.