-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdeduplication_storage.go
87 lines (75 loc) · 2.82 KB
/
deduplication_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package streams
import (
"context"
"errors"
"log"
"os"
"strings"
"github.com/allegro/bigcache/v3"
)
// DeduplicationStorage remembers which messages each worker has already
// processed, so that a data-in-motion system can detect duplicate deliveries.
//
// Prefer in-memory implementations, or at least ones backed by a store separate
// from the main database. They make lookups significantly faster and reduce
// backpressure on the main database.
type DeduplicationStorage interface {
	// IsDuplicated reports whether messageID has already been processed by workerID.
	IsDuplicated(ctx context.Context, workerID, messageID string) (bool, error)
	// Commit records and acknowledges that messageID was processed correctly by workerID.
	Commit(ctx context.Context, workerID, messageID string)
}
// EmbeddedDeduplicationStorageConfig holds the settings for an
// EmbeddedDeduplicationStorage.
type EmbeddedDeduplicationStorageConfig struct {
	// Logger receives informational messages and ErrorLogger receives failure
	// reports. NewEmbeddedDeduplicationStorage replaces whichever of them is nil
	// with a logger that writes to stdout using the "streams: " prefix.
	Logger, ErrorLogger *log.Logger
	// KeyDelimiter joins the message ID and the worker ID into a cache key.
	// NewEmbeddedDeduplicationStorage uses "#" when it is left empty.
	KeyDelimiter string
}
// An EmbeddedDeduplicationStorage is the in-memory implementation of
// DeduplicationStorage. It uses allegro/bigcache as a high-performance
// underlying store.
//
// Using this storage makes the computing instance stateful. If the node goes
// down, the record of deduplicated messages is lost with it.
type EmbeddedDeduplicationStorage struct {
	// cache is used as a hash set: one key per committed (message, worker) pair.
	cache *bigcache.BigCache
	// cfg carries the loggers and key delimiter; NewEmbeddedDeduplicationStorage
	// fills in defaults for any that are unset.
	cfg EmbeddedDeduplicationStorageConfig
}
const (
	// embeddedDeduplicationStorageKeyDelimiter is the default separator placed
	// between the message ID and the worker ID when building cache keys.
	embeddedDeduplicationStorageKeyDelimiter = "#"
)

// Compile-time check that the value type (not just a pointer to it) satisfies
// DeduplicationStorage.
var _ DeduplicationStorage = EmbeddedDeduplicationStorage{}
// NewEmbeddedDeduplicationStorage builds an in-memory DeduplicationStorage on
// top of the given bigcache instance.
//
// Unset config fields receive defaults. A nil Logger or ErrorLogger is replaced
// with a single shared logger writing to stdout with the "streams: " prefix and
// no flags. An empty KeyDelimiter becomes "#".
//
// NOTE(review): cache is stored as given and is not checked for nil here.
func NewEmbeddedDeduplicationStorage(cfg EmbeddedDeduplicationStorageConfig, cache *bigcache.BigCache) EmbeddedDeduplicationStorage {
	// Build the fallback logger at most once, so both slots share it when both are nil.
	var fallback *log.Logger
	if cfg.Logger == nil || cfg.ErrorLogger == nil {
		fallback = log.New(os.Stdout, "streams: ", 0)
	}
	if cfg.Logger == nil {
		cfg.Logger = fallback
	}
	if cfg.ErrorLogger == nil {
		cfg.ErrorLogger = fallback
	}

	if cfg.KeyDelimiter == "" {
		cfg.KeyDelimiter = embeddedDeduplicationStorageKeyDelimiter
	}

	return EmbeddedDeduplicationStorage{cfg: cfg, cache: cache}
}
// Commit records that workerID has processed messageID, so that a later
// IsDuplicated call for the same pair reports true.
//
// The cache key is messageID and workerID joined by cfg.KeyDelimiter.
// NOTE(review): IDs that themselves contain the delimiter can produce the same
// key as a different pair (e.g. "a#b"+"c" vs "a"+"b#c" with "#").
//
// The context is unused. Commit has no error return: when the cache write
// fails, the failure is logged through cfg.ErrorLogger and nothing is recorded.
func (e EmbeddedDeduplicationStorage) Commit(_ context.Context, workerID, messageID string) {
	key := strings.Join([]string{messageID, workerID}, e.cfg.KeyDelimiter)
	// The cache acts as a hash set: only key presence matters, so no value is stored.
	if err := e.cache.Set(key, nil); err != nil {
		e.cfg.ErrorLogger.Printf("failed to commit message, error %s", err.Error())
		return
	}
	// Arguments follow the placeholder order: message id first, then worker id.
	e.cfg.Logger.Printf("committed message with id <%s> and worker id <%s>", messageID, workerID)
}
// IsDuplicated reports whether workerID has already committed messageID.
//
// It looks up the key messageID+cfg.KeyDelimiter+workerID in the cache. A hit
// yields (true, nil) and bigcache.ErrEntryNotFound yields (false, nil). Any other
// lookup error is returned unchanged, together with false. The context is unused.
func (e EmbeddedDeduplicationStorage) IsDuplicated(_ context.Context, workerID, messageID string) (bool, error) {
	_, lookupErr := e.cache.Get(messageID + e.cfg.KeyDelimiter + workerID)
	switch {
	case lookupErr == nil:
		return true, nil
	case errors.Is(lookupErr, bigcache.ErrEntryNotFound):
		return false, nil
	default:
		return false, lookupErr
	}
}