Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

redis pubsub: add retries limit and option to disable retries #2803

Draft
wants to merge 5 commits into
base: pubsub_Error
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 61 additions & 26 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,33 @@ type ConsumerConfig struct {
ResponseEntryTimeout time.Duration `koanf:"response-entry-timeout"`
// Minimum idle time after which messages will be autoclaimed
IdletimeToAutoclaim time.Duration `koanf:"idletime-to-autoclaim"`
// Enables retrying messages that have been pending for too long
Retry bool `koanf:"retry"`
// Number of message retries after which we set error response
MaxRetryCount int64 `koanf:"max-retry-count"`
}

// DefaultConsumerConfig is the baseline consumer configuration; its fields
// supply the default values of the flags registered by
// ConsumerConfigAddOptions. Response entries time out after one hour, pending
// messages become eligible for autoclaim after five idle minutes, retries are
// enabled, and MaxRetryCount of -1 means the number of retries is not limited.
var DefaultConsumerConfig = ConsumerConfig{
	Retry:                true,
	MaxRetryCount:        -1,
	IdletimeToAutoclaim:  5 * time.Minute,
	ResponseEntryTimeout: time.Hour,
}

// TestConsumerConfig is a consumer configuration with short timings: response
// entries time out after one minute and pending messages become eligible for
// autoclaim after 30 idle milliseconds. Retries are enabled and MaxRetryCount
// of -1 means the number of retries is not limited.
// NOTE(review): presumably intended for use in tests only; confirm against callers.
var TestConsumerConfig = ConsumerConfig{
	Retry:                true,
	MaxRetryCount:        -1,
	IdletimeToAutoclaim:  30 * time.Millisecond,
	ResponseEntryTimeout: time.Minute,
}

// AlreadySetError is a sentinel error value whose message is "redis key already set".
// NOTE(review): presumably returned when a response key already exists in redis
// and is meant to be matched with errors.Is; confirm against callers.
var AlreadySetError = errors.New("redis key already set")

// ConsumerConfigAddOptions registers the consumer options on the flag set f.
// Every flag name is prefix followed by ".<option-name>", and every default is
// taken from the corresponding field of DefaultConsumerConfig.
//
// Each flag is registered exactly once: defining the same flag name twice on
// a pflag.FlagSet panics with a "flag redefined" message.
func ConsumerConfigAddOptions(prefix string, f *pflag.FlagSet) {
	f.Duration(prefix+".response-entry-timeout", DefaultConsumerConfig.ResponseEntryTimeout, "timeout for response entry")
	f.Duration(prefix+".idletime-to-autoclaim", DefaultConsumerConfig.IdletimeToAutoclaim, "After a message spends this amount of time in PEL (Pending Entries List i.e claimed by another consumer but not Acknowledged) it will be allowed to be autoclaimed by other consumers. This option should be set to the same value for all consumers and producers.")
	f.Bool(prefix+".retry", DefaultConsumerConfig.Retry, "enables autoclaim for this consumer, if set to false this consumer will not check messages from PEL (Pending Entries List)")
	// -1 disables the retry limit, as stated in the help text below.
	f.Int64(prefix+".max-retry-count", DefaultConsumerConfig.MaxRetryCount, "number of message retries after which this consumer will set an error response and Acknowledge the message (-1 = no limit)")
}

// Consumer implements a consumer for redis stream provides heartbeat to
Expand Down Expand Up @@ -111,32 +121,55 @@ func decrementMsgIdByOne(msgId string) string {
// Consumer first checks if there exists a pending message that is claimed by an
// unresponsive consumer; if not, it reads from the stream.
func (c *Consumer[Request, Response]) Consume(ctx context.Context) (*Message[Request], error) {
// First try to XAUTOCLAIM, with start as a random messageID from PEL with MinIdle as IdletimeToAutoclaim
// this prioritizes processing PEL messages that have been waiting for more than IdletimeToAutoclaim duration
var messages []redis.XMessage
if pendingMsgs, err := c.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: c.redisStream,
Group: c.redisGroup,
Start: "-",
End: "+",
Count: 50,
Idle: c.cfg.IdletimeToAutoclaim,
}).Result(); err != nil {
if !errors.Is(err, redis.Nil) {
log.Error("Error from XpendingExt in getting PEL for auto claim", "err", err, "penindlen", len(pendingMsgs))
}
} else if len(pendingMsgs) > 0 {
idx := rand.Intn(len(pendingMsgs))
messages, _, err = c.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{
Group: c.redisGroup,
Consumer: c.id,
MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds)
Stream: c.redisStream,
Start: decrementMsgIdByOne(pendingMsgs[idx].ID),
Count: 1,
}).Result()
if err != nil {
log.Info("error from xautoclaim", "err", err)
if c.cfg.Retry {
// First try to XAUTOCLAIM, with start as a random messageID from PEL with MinIdle as IdletimeToAutoclaim
// this prioritizes processing PEL messages that have been waiting for more than IdletimeToAutoclaim duration
if pendingMsgs, err := c.client.XPendingExt(ctx, &redis.XPendingExtArgs{
Stream: c.redisStream,
Group: c.redisGroup,
Start: "-",
End: "+",
Count: 50,
Idle: c.cfg.IdletimeToAutoclaim,
}).Result(); err != nil {
if !errors.Is(err, redis.Nil) {
log.Error("Error from XpendingExt in getting PEL for auto claim", "err", err, "pendingLen", len(pendingMsgs))
}
} else if len(pendingMsgs) > 0 {
if c.cfg.MaxRetryCount != -1 {
var exceededRetries []redis.XPendingExt
var filtered []redis.XPendingExt
for _, msg := range pendingMsgs {
if msg.RetryCount > c.cfg.MaxRetryCount {
exceededRetries = append(exceededRetries, msg)
} else {
filtered = append(filtered, msg)
}
}
if len(exceededRetries) > 0 {
idx := rand.Intn(len(exceededRetries))
if err := c.SetError(ctx, exceededRetries[idx].ID, "too many retries"); err != nil {
// TODO(magic): don't log error when other consumer set the error before us
log.Error("Failed to set error response for a message that exceeded retries limit", "err", err, "retryCount", exceededRetries[idx].RetryCount)
}
}
pendingMsgs = filtered
}
if len(pendingMsgs) > 0 {
idx := rand.Intn(len(pendingMsgs))
messages, _, err = c.client.XAutoClaim(ctx, &redis.XAutoClaimArgs{
Group: c.redisGroup,
Consumer: c.id,
MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds)
Stream: c.redisStream,
Start: decrementMsgIdByOne(pendingMsgs[idx].ID),
Count: 1,
}).Result()
if err != nil {
log.Info("error from xautoclaim", "err", err)
}
}
}
}
if len(messages) == 0 {
Expand Down Expand Up @@ -233,6 +266,7 @@ func (c *Consumer[Request, Response]) SetResult(ctx context.Context, messageID s
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
log.Debug("consumer: xdel", "cid", c.id, "messageId", messageID)
if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil {
return fmt.Errorf("deleting message: %v, error: %w", messageID, err)
}
Expand All @@ -257,6 +291,7 @@ func (c *Consumer[Request, Response]) SetError(ctx context.Context, messageID st
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
log.Debug("consumer: xdel", "cid", c.id, "messageId", messageID)
if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil {
return fmt.Errorf("deleting message: %v, error: %w", messageID, err)
}
Expand Down
Loading
Loading