Skip to content

Commit

Permalink
pubsub/all: add a constructor arg to disable dynamic batching (#1609)
Browse files Browse the repository at this point in the history
  • Loading branch information
vangent authored and jba committed Mar 20, 2019
1 parent c6f52ab commit 08a6b72
Show file tree
Hide file tree
Showing 32 changed files with 701 additions and 450 deletions.
154 changes: 154 additions & 0 deletions go.sum

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions pubsub/acks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestAckTriggersDriverSendAcksForOneMessage(t *testing.T) {
return nil
},
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)
m2, err := sub.Receive(ctx)
if err != nil {
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestMultipleAcksCanGoIntoASingleBatch(t *testing.T) {
return nil
},
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)

// Receive and ack the messages concurrently.
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestTooManyAcksForASingleBatchGoIntoMultipleBatches(t *testing.T) {
return nil
},
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)

// Receive and ack the messages concurrently.
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestAckDoesNotBlock(t *testing.T) {
return nil
},
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)
defer cancel()
mr, err := sub.Receive(ctx)
Expand All @@ -214,7 +214,7 @@ func TestDoubleAckCausesPanic(t *testing.T) {
return nil
},
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)
mr, err := sub.Receive(ctx)
if err != nil {
Expand All @@ -241,7 +241,7 @@ func TestConcurrentDoubleAckCausesPanic(t *testing.T) {
return nil
},
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)
mr, err := sub.Receive(ctx)
if err != nil {
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestSubShutdownCanBeCanceledEvenWithHangingSendAcks(t *testing.T) {
return ctx.Err()
},
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
mr, err := sub.Receive(ctx)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestReceiveReturnsErrorFromSendAcks(t *testing.T) {
return serr
},
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)
m, err := sub.Receive(ctx)
if err != nil {
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestReceiveReturnsAckErrorOnNoMoreMessages(t *testing.T) {
return serr
},
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)
m, err := sub.Receive(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/awssnssqs/awssnssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ type SubscriptionOptions struct{}
// The queue is assumed to be subscribed to some SNS topic, though there is no
// check for this.
func OpenSubscription(ctx context.Context, client *sqs.SQS, qURL string, opts *SubscriptionOptions) *pubsub.Subscription {
return pubsub.NewSubscription(openSubscription(ctx, client, qURL), nil)
return pubsub.NewSubscription(openSubscription(ctx, client, qURL), true, nil)
}

// openSubscription returns a driver.Subscription.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/awssnssqs/awssnssqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func BenchmarkAwsPubSub(b *testing.B) {
b.Fatal(err)
}
defer cleanup2()
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)
drivertest.RunBenchmarks(b, topic, sub)
}
Expand Down
98 changes: 49 additions & 49 deletions pubsub/awssnssqs/testdata/TestConformance/TestAs/aws_test.replay

Large diffs are not rendered by default.

Large diffs are not rendered by default.

106 changes: 53 additions & 53 deletions pubsub/awssnssqs/testdata/TestConformance/TestMetadata.replay

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
},
"Entries": [
{
"ID": "7e73b4bc5185ac58",
"ID": "aa08e8c9bc0ec258",
"Request": {
"Method": "POST",
"URL": "https://sqs.us-east-2.amazonaws.com/",
Expand Down Expand Up @@ -73,13 +73,13 @@
"text/xml"
],
"Date": [
"Wed, 06 Mar 2019 04:32:00 GMT"
"Wed, 20 Mar 2019 15:59:09 GMT"
],
"X-Amzn-Requestid": [
"048ad00d-a3bd-542d-9cf4-84f4ec3d60d4"
"b2ef7c2b-390c-5ca4-bfb9-29275c810e65"
]
},
"Body": "PD94bWwgdmVyc2lvbj0iMS4wIj8+PEVycm9yUmVzcG9uc2UgeG1sbnM9Imh0dHA6Ly9xdWV1ZS5hbWF6b25hd3MuY29tL2RvYy8yMDEyLTExLTA1LyI+PEVycm9yPjxUeXBlPlNlbmRlcjwvVHlwZT48Q29kZT5JbnZhbGlkQWRkcmVzczwvQ29kZT48TWVzc2FnZT5UaGUgYWRkcmVzcyBub25leGlzdGVudC1zdWJzY3JpcHRpb24gaXMgbm90IHZhbGlkIGZvciB0aGlzIGVuZHBvaW50LjwvTWVzc2FnZT48RGV0YWlsLz48L0Vycm9yPjxSZXF1ZXN0SWQ+MDQ4YWQwMGQtYTNiZC01NDJkLTljZjQtODRmNGVjM2Q2MGQ0PC9SZXF1ZXN0SWQ+PC9FcnJvclJlc3BvbnNlPg=="
"Body": "PD94bWwgdmVyc2lvbj0iMS4wIj8+PEVycm9yUmVzcG9uc2UgeG1sbnM9Imh0dHA6Ly9xdWV1ZS5hbWF6b25hd3MuY29tL2RvYy8yMDEyLTExLTA1LyI+PEVycm9yPjxUeXBlPlNlbmRlcjwvVHlwZT48Q29kZT5JbnZhbGlkQWRkcmVzczwvQ29kZT48TWVzc2FnZT5UaGUgYWRkcmVzcyBub25leGlzdGVudC1zdWJzY3JpcHRpb24gaXMgbm90IHZhbGlkIGZvciB0aGlzIGVuZHBvaW50LjwvTWVzc2FnZT48RGV0YWlsLz48L0Vycm9yPjxSZXF1ZXN0SWQ+YjJlZjdjMmItMzkwYy01Y2E0LWJmYjktMjkyNzVjODEwZTY1PC9SZXF1ZXN0SWQ+PC9FcnJvclJlc3BvbnNlPg=="
}
}
]
Expand Down

Large diffs are not rendered by default.

194 changes: 120 additions & 74 deletions pubsub/awssnssqs/testdata/TestConformance/TestSendReceive.replay

Large diffs are not rendered by default.

310 changes: 178 additions & 132 deletions pubsub/awssnssqs/testdata/TestConformance/TestSendReceiveTwo.replay

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pubsub/azuresb/azuresb.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func OpenSubscription(ctx context.Context, parentNamespace *servicebus.Namespace
if err != nil {
return nil, err
}
return pubsub.NewSubscription(ds, nil), nil
return pubsub.NewSubscription(ds, true, nil), nil
}

// openSubscription returns a driver.Subscription.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func runBenchmark(t *testing.T, description string, numGoRoutines int, receivePr
}

fake := &fakeSub{msgs: msgs, profile: receiveProfile}
sub := newSubscription(fake, nil)
sub := newSubscription(fake, true, nil)

// Configure our output in a hook called whenever ReceiveBatch returns.

Expand Down
8 changes: 4 additions & 4 deletions pubsub/drivertest/drivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func testNonExistentSubscriptionSucceedsOnOpenButFailsOnReceive(t *testing.T, ne
if err != nil {
t.Fatalf("failed to make non-existent subscription: %v", err)
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, false, nil)
defer sub.Shutdown(ctx)

_, err = sub.Receive(ctx)
Expand Down Expand Up @@ -272,7 +272,7 @@ func testSendReceiveTwo(t *testing.T, newHarness HarnessMaker) {
t.Fatal(err)
}
defer cleanup()
s := pubsub.NewSubscription(ds, nil)
s := pubsub.NewSubscription(ds, false, nil)
defer s.Shutdown(ctx)
ss = append(ss, s)
}
Expand Down Expand Up @@ -509,7 +509,7 @@ func makePair(ctx context.Context, h Harness, testName string) (*pubsub.Topic, *
return nil, nil, nil, err
}
t := pubsub.NewTopic(dt, nil)
s := pubsub.NewSubscription(ds, nil)
s := pubsub.NewSubscription(ds, false, nil)
cleanup := func() {
topicCleanup()
subCleanup()
Expand Down Expand Up @@ -566,7 +566,7 @@ func testAs(t *testing.T, newHarness HarnessMaker, st AsTest) {
if err != nil {
t.Fatal(err)
}
sub = pubsub.NewSubscription(ds, nil)
sub = pubsub.NewSubscription(ds, false, nil)
defer sub.Shutdown(ctx)
_, subErr := sub.Receive(ctx)
if subErr == nil {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/gcppubsub/gcppubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ type SubscriptionOptions struct{}
// documentation for an example.
func OpenSubscription(client *raw.SubscriberClient, proj gcp.ProjectID, subscriptionName string, opts *SubscriptionOptions) *pubsub.Subscription {
ds := openSubscription(client, proj, subscriptionName)
return pubsub.NewSubscription(ds, nil)
return pubsub.NewSubscription(ds, true, nil)
}

// openSubscription returns a driver.Subscription.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/gcppubsub/gcppubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func BenchmarkGcpPubSub(b *testing.B) {
b.Fatal(err)
}
defer cleanup2()
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)

drivertest.RunBenchmarks(b, topic, sub)
Expand Down
Binary file modified pubsub/gcppubsub/testdata/TestConformance/TestAs/gcp_test.replay
Binary file not shown.
Binary file not shown.
Binary file modified pubsub/gcppubsub/testdata/TestConformance/TestMetadata.replay
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified pubsub/gcppubsub/testdata/TestConformance/TestSendReceive.replay
Binary file not shown.
Binary file modified pubsub/gcppubsub/testdata/TestConformance/TestSendReceiveTwo.replay
Binary file not shown.
2 changes: 1 addition & 1 deletion pubsub/mempubsub/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func NewSubscription(top *pubsub.Topic, ackDeadline time.Duration) *pubsub.Subsc
if !top.As(&t) {
panic("mempubsub: NewSubscription passed a Topic not from mempubsub")
}
return pubsub.NewSubscription(newSubscription(t, ackDeadline), nil)
return pubsub.NewSubscription(newSubscription(t, ackDeadline), true, nil)
}

func newSubscription(t *topic, ackDeadline time.Duration) *subscription {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/natspubsub/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func CreateSubscription(nc *nats.Conn, subscriptionName string, ackFunc func(),
if err != nil {
return nil, err
}
return pubsub.NewSubscription(ds, nil), nil
return pubsub.NewSubscription(ds, true, nil), nil
}

func createSubscription(nc *nats.Conn, subscriptionName string, ackFunc func()) (driver.Subscription, error) {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/natspubsub/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func BenchmarkNatsPubSub(b *testing.B) {
b.Fatal(err)
}
defer cleanup()
drivertest.RunBenchmarks(b, pubsub.NewTopic(dt, nil), pubsub.NewSubscription(ds, nil))
drivertest.RunBenchmarks(b, pubsub.NewTopic(dt, nil), pubsub.NewSubscription(ds, true, nil))
}

func fakeConnectionStringInEnv() func() {
Expand Down
15 changes: 10 additions & 5 deletions pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ type Subscription struct {
ackFunc func() // if non-nil, used for Ack
cancel func() // for canceling all SendAcks calls

dynamicBatchSizes bool // if false, batch size is always 2

mu sync.Mutex // protects everything below
q []*Message // local queue of messages downloaded from server
err error // permanent error
Expand Down Expand Up @@ -407,7 +409,7 @@ func (s *Subscription) Receive(ctx context.Context) (_ *Message, err error) {
// Unless we don't have information about process time (at the beginning), in
// which case just get one message.
nMessages := 1
if s.avgProcessTime > 0 {
if s.dynamicBatchSizes && s.avgProcessTime > 0 {
// Using Ceil guarantees at least one message.
n := math.Ceil(desiredQueueDuration.Seconds() / s.avgProcessTime)
// Cap nMessages at some non-ridiculous value.
Expand Down Expand Up @@ -533,12 +535,15 @@ var NewSubscription = newSubscription
// newSubscription creates a Subscription from a driver.Subscription
// and a function to make a batcher that sends batches of acks to the provider.
// If newAckBatcher is nil, a default batcher implementation will be used.
func newSubscription(ds driver.Subscription, newAckBatcher func(context.Context, *Subscription, driver.Subscription) driver.Batcher) *Subscription {
// dynamicBatchSizes should be true except for tests, where stability is
// necessary for record/replay.
func newSubscription(ds driver.Subscription, dynamicBatchSizes bool, newAckBatcher func(context.Context, *Subscription, driver.Subscription) driver.Batcher) *Subscription {
ctx, cancel := context.WithCancel(context.Background())
s := &Subscription{
driver: ds,
tracer: newTracer(ds),
cancel: cancel,
driver: ds,
tracer: newTracer(ds),
cancel: cancel,
dynamicBatchSizes: dynamicBatchSizes,
}
if newAckBatcher == nil {
newAckBatcher = defaultAckBatcher
Expand Down
12 changes: 6 additions & 6 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestSendReceive(t *testing.T) {
t.Fatal(err)
}

sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)
m2, err := sub.Receive(ctx)
if err != nil {
Expand All @@ -146,7 +146,7 @@ func TestConcurrentReceivesGetAllTheMessages(t *testing.T) {
// Make a subscription.
ds := NewDriverSub()
dt.subs = append(dt.subs, ds)
s := pubsub.NewSubscription(ds, nil)
s := pubsub.NewSubscription(ds, true, nil)
defer s.Shutdown(ctx)

// Start 10 goroutines to receive from it.
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestCancelSend(t *testing.T) {
func TestCancelReceive(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ds := NewDriverSub()
s := pubsub.NewSubscription(ds, nil)
s := pubsub.NewSubscription(ds, true, nil)
defer s.Shutdown(ctx)
cancel()
// Without cancellation, this Receive would hang.
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestCancelTwoReceives(t *testing.T) {
// We expect Goroutine 2's Receive to exit immediately. That won't
// happen if Receive holds the lock during the call to ReceiveBatch.
inReceiveBatch := make(chan int, 1)
s := pubsub.NewSubscription(blockingDriverSub{inReceiveBatch: inReceiveBatch}, nil)
s := pubsub.NewSubscription(blockingDriverSub{inReceiveBatch: inReceiveBatch}, true, nil)
go func() {
s.Receive(context.Background())
t.Fatal("Receive should never return")
Expand Down Expand Up @@ -314,7 +314,7 @@ func (*failTopic) ErrorCode(error) gcerrors.ErrorCode { return gcerrors.Unknown

func TestRetryReceive(t *testing.T) {
fs := &failSub{}
sub := pubsub.NewSubscription(fs, nil)
sub := pubsub.NewSubscription(fs, true, nil)
_, err := sub.Receive(context.Background())
if err != nil {
t.Errorf("Receive: got %v, want nil", err)
Expand Down Expand Up @@ -371,7 +371,7 @@ func (erroringSubscription) AckFunc() func() { re
func TestErrorsAreWrapped(t *testing.T) {
ctx := context.Background()
top := pubsub.NewTopic(erroringTopic{}, nil)
sub := pubsub.NewSubscription(erroringSubscription{}, nil)
sub := pubsub.NewSubscription(erroringSubscription{}, true, nil)

verify := func(err error) {
t.Helper()
Expand Down
2 changes: 1 addition & 1 deletion pubsub/rabbitpubsub/rabbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func errorAs(err error, i interface{}) bool {
// The documentation of the amqp package recommends using separate connections for
// publishing and subscribing.
func OpenSubscription(conn *amqp.Connection, name string, opts *SubscriptionOptions) *pubsub.Subscription {
return pubsub.NewSubscription(newSubscription(&connection{conn}, name), nil)
return pubsub.NewSubscription(newSubscription(&connection{conn}, name), true, nil)
}

type subscription struct {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/rabbitpubsub/rabbit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func BenchmarkRabbit(b *testing.B) {
b.Fatal(err)
}
defer cleanup()
drivertest.RunBenchmarks(b, pubsub.NewTopic(dt, nil), pubsub.NewSubscription(ds, nil))
drivertest.RunBenchmarks(b, pubsub.NewTopic(dt, nil), pubsub.NewSubscription(ds, true, nil))
}

type harness struct {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/sub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestReceiveWithEmptyBatchReturnedFromDriver(t *testing.T) {
{&driver.Message{}},
},
}
sub := pubsub.NewSubscription(ds, nil)
sub := pubsub.NewSubscription(ds, true, nil)
defer sub.Shutdown(ctx)
_, err := sub.Receive(ctx)
if err != nil {
Expand Down

0 comments on commit 08a6b72

Please sign in to comment.