-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsync.go
112 lines (86 loc) · 2.34 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package hibp
import (
"context"
"errors"
"fmt"
"github.com/alitto/pond"
mapset "github.com/deckarep/golang-set/v2"
"math"
syncPkg "sync"
"sync/atomic"
)
func sync(ctx context.Context, from, to int64, client *hibpClient, store storage, pool *pond.WorkerPool, onProgress ProgressFunc) error {
var (
mErr error
errLock syncPkg.Mutex
processed atomic.Int64
inFlightSet = mapset.NewSet[int64]()
onProgressLock syncPkg.Mutex
)
processed.Store(from)
for i := from; i < to; i++ {
current := i
// Pool is configured to be non-buffering, i.e., when the context gets canceled, we will finish the jobs
// that are currently being processed, but we will not start new ones.
if err := ctx.Err(); err != nil {
return err
}
pool.Submit(func() {
rangePrefix := toRangeString(current)
err := func() (innerErr error) {
defer func() {
if r := recover(); r != nil {
innerErr = fmt.Errorf("recovered panic: %v", r)
}
}()
inFlightSet.Add(current)
// We basically ignore any error here because we can still process the range even if we can't load the etag
etag, err := store.LoadETag(rangePrefix)
if err != nil {
etag = ""
}
resp, err := client.RequestRange(rangePrefix, etag)
if err != nil {
return err
}
if !resp.NotModified {
if err := store.Save(rangePrefix, resp.ETag, resp.Data); err != nil {
return fmt.Errorf("saving range: %w", err)
}
}
p := processed.Add(1)
inFlightSet.Remove(current)
lowest := lowestInFlight(inFlightSet, to)
remaining := to - p
if p%10 == 0 || remaining == 0 {
onProgressLock.Lock()
defer onProgressLock.Unlock()
if err := onProgress(lowest, current, to, p, remaining); err != nil {
return fmt.Errorf("reporting progress: %w", err)
}
}
return nil
}()
if err != nil {
errLock.Lock()
defer errLock.Unlock()
mErr = errors.Join(mErr, fmt.Errorf("processing range %q: %w", rangePrefix, err))
}
})
}
pool.StopAndWait()
return mErr
}
func toRangeString(i int64) string {
return fmt.Sprintf("%05X", i)
}
func lowestInFlight(inFlight mapset.Set[int64], to int64) int64 {
lowest := int64(math.MaxInt64)
for _, a := range inFlight.ToSlice() {
lowest = min(lowest, a)
}
if lowest == math.MaxInt64 {
return to - 1
}
return lowest
}