-
Notifications
You must be signed in to change notification settings - Fork 416
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
123 additions
and
54 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
+++ | ||
title = "Requeuing After Error" | ||
description = "How to requeue a message after it fails to process" | ||
weight = -20 | ||
draft = false | ||
bref = "How to requeue a message after it fails to process" | ||
+++ | ||
|
||
When a message fails to process (a nack is sent), it usually blocks other messages on the same topic (within the same consumer group or partition). | ||
|
||
Depending on your setup, it may be useful to requeue the failed message back to the tail of the queue. | ||
|
||
Consider this if: | ||
* You don't care about the order of messages. | ||
* Your system isn't resilient to blocked messages. | ||
|
||
## Requeuer | ||
|
||
The `Requeuer` component is a wrapper on the `Router` that moves messages from one topic to another. | ||
|
||
{{% load-snippet-partial file="src-link/components/requeuer/requeuer.go" first_line_contains="type Config" last_line_contains="}" %}} | ||
|
||
A trivial usage can look like this. It requeues messages from one topic to the same topic after a delay. | ||
|
||
{{< callout "danger" >}} | ||
Using the delay this way is not recommended, as it blocks the entire requeue process for the given time. | ||
{{< /callout >}} | ||
|
||
```go | ||
req, err := requeuer.NewRequeuer(requeuer.Config{ | ||
Subscriber: sub, | ||
SubscribeTopic: "topic", | ||
Publisher: pub, | ||
GeneratePublishTopic: func(params requeuer.GeneratePublishTopicParams) (string, error) { | ||
return "topic", nil | ||
}, | ||
Delay: time.Millisecond * 200, | ||
}, logger) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err := req.Run(context.Background()) | ||
if err != nil { | ||
return err | ||
} | ||
``` | ||
|
||
A better way to use the `Requeuer` is to combine it with the `Poison` middleware. | ||
The middleware moves messages to a separate "poison" topic. | ||
Then, the requeuer moves them back to the original topic based on the metadata. | ||
|
||
You combine this with a Pub/Sub that supports delayed messages. | ||
See the [full example based on PostgreSQL](https://github.com/ThreeDotsLabs/watermill/blob/master/_examples/real-world-examples/delayed-requeue/main.go). | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -258,7 +258,6 @@ if err != nil { | |
} | ||
``` | ||
|
||
|
||
{{< tabs "publishing" >}} | ||
|
||
{{< tab "Go Channel" "go-channel" >}} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file added
BIN
+15.9 KB
...en/images/_hu70523d59fb738bec5c06336403a46531_39940_c0fe83760c1bebd5a39d4ddb7fce622e.webp
Binary file not shown.
Binary file added
BIN
+13.2 KB
...en/images/_hu89002a090cbbd5e6897ae6b591dddabc_33739_cb9243f2e37f830fb14160ae4284ce39.webp
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters