worker_pool_manager.go

package pool

import (
	"context"
	"sync"
	"time"

	"github.com/jellydator/ttlcache/v3"
)

// WorkerPoolManager is a self-expiring, lazily constructed map of fixed-size worker pools,
// safe for concurrent use.
type WorkerPoolManager struct {
	workerPoolCache     *ttlcache.Cache[string, WorkerPool]
	workerPoolMaxSize   int
	poolReservationLock *sync.Mutex
	stalePoolExpiration time.Duration
	maxPoolLifetime     time.Duration
}

// NewWorkerPoolManager is the factory constructor for WorkerPoolManager.
//
//   - poolSize - the maximum number of workers for each key
//   - stalePoolExpiration - how long to cache unused pools
//   - maxPoolLifetime - the maximum time a pool is allowed to live
func NewWorkerPoolManager(
	poolSize int, stalePoolExpiration time.Duration, maxPoolLifetime time.Duration,
) *WorkerPoolManager {
	workerPoolCache := ttlcache.New(
		ttlcache.WithTTL[string, WorkerPool](stalePoolExpiration),
	)
	// Dispose of a pool whenever it expires or is deleted from the cache.
	workerPoolCache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[string, WorkerPool]) {
		item.Value().Dispose()
	})
	// Start the cache's expiration loop; it is stopped again by Dispose.
	go workerPoolCache.Start()
	return &WorkerPoolManager{
		workerPoolCache:     workerPoolCache,
		workerPoolMaxSize:   poolSize,
		poolReservationLock: &sync.Mutex{},
		stalePoolExpiration: stalePoolExpiration,
		maxPoolLifetime:     maxPoolLifetime,
	}
}
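
// A minimal construction-and-teardown sketch (the parameter values here are illustrative
// assumptions, not recommendations from this package):
//
//	manager := NewWorkerPoolManager(8, 5*time.Minute, time.Hour)
//	defer manager.Dispose()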

// GetPool returns the WorkerPool for this key, building a BaseWorkerPool and caching it if necessary.
// It spawns sendSize workers, up to a maximum of the manager's poolSize.
//
// The pool is returned in an "unexpirable" state - the caller should signal the returned done channel
// when it no longer requires the pool.
func (m *WorkerPoolManager) GetPool(key string, sendSize int) (WorkerPool, chan<- bool) {
	// The default factory, NewWorkerPool, cannot return an error.
	pool, doneUsing, _ := m.GetPoolWithFactory(key, sendSize, NewWorkerPool)
	return pool, doneUsing
}
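
// A minimal sketch of the checkout protocol (the key, sendSize, and the Submit call are
// hypothetical; only GetPool and the done channel come from this file):
//
//	pool, doneUsing := manager.GetPool("tenant-42", 4)
//	pool.Submit(task) // hypothetical WorkerPool method
//	doneUsing <- true // let the pool become expirable again once we're finished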

// GetPoolWithFactory returns the WorkerPool for this key, allowing you to specify a custom pool.Factory
// when you want to build a custom WorkerPool implementation that embeds a BaseWorkerPool and attaches
// supplementary shared data to the pool.
func (m *WorkerPoolManager) GetPoolWithFactory(
	key string, sendSize int, factory Factory,
) (WorkerPool, chan<- bool, error) {
	var pool WorkerPool
	var err error
	m.poolReservationLock.Lock()
	cachedPoolItem := m.workerPoolCache.Get(key)
	if cachedPoolItem != nil {
		pool = cachedPoolItem.Value()
	} else {
		pool, err = factory(m.workerPoolMaxSize)
		if err != nil {
			m.poolReservationLock.Unlock()
			return nil, nil, err
		}
		m.workerPoolCache.Set(key, pool, ttlcache.DefaultTTL)
	}
	// Prevent the pool from being deleted until we're done using it. If reserve returns false, the
	// pool was closed before we obtained control; otherwise we hold a read lock and know it won't be
	// closed until we're done with it.
	goodForUse := pool.reserve()
	if !goodForUse {
		m.poolReservationLock.Unlock()
		return m.GetPoolWithFactory(key, sendSize, factory)
	}
	pool.spawnWorkers(sendSize)
	// If the pool is older than maxPoolLifetime, remove it from the cache and schedule it for disposal.
	// Disposal won't actually occur until every caller has released the pool.
	if pool.age() > m.maxPoolLifetime {
		m.workerPoolCache.Delete(key)
		go pool.Dispose()
	}
	doneUsing := make(chan bool)
	go func() {
		<-doneUsing
		pool.release()
	}()
	m.poolReservationLock.Unlock()
	return pool, doneUsing, nil
}
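
// A minimal sketch of a custom factory. Inferred from the factory(m.workerPoolMaxSize) call
// above, Factory is assumed to be func(int) (WorkerPool, error); the metricsPool type and its
// counter field are hypothetical:
//
//	type metricsPool struct {
//		WorkerPool        // embeds the pool built by NewWorkerPool (a BaseWorkerPool)
//		submitted *uint64 // supplementary shared data for this pool
//	}
//
//	pool, doneUsing, err := m.GetPoolWithFactory(key, sendSize, func(size int) (WorkerPool, error) {
//		base, err := NewWorkerPool(size)
//		if err != nil {
//			return nil, err
//		}
//		return &metricsPool{WorkerPool: base, submitted: new(uint64)}, nil
//	})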

// Dispose clears the underlying cache, disposing of every cached pool, and stops the cache's
// expiration goroutine.
func (m *WorkerPoolManager) Dispose() {
	m.workerPoolCache.DeleteAll()
	m.workerPoolCache.Stop()
}