From f71e9734c1cdedad524da1520fb584601e210a24 Mon Sep 17 00:00:00 2001 From: Adrian Lungu Date: Tue, 2 Jan 2018 13:02:30 +0200 Subject: [PATCH 01/23] added Janitor to limit cache size and number of items with LRU disposal --- fs.go | 2 +- fscache.go | 64 +++++++++++++++++++---- fscache_test.go | 116 +++++++++++++++++++++++++++++++++++++++-- janitor.go | 133 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 299 insertions(+), 16 deletions(-) create mode 100644 janitor.go diff --git a/fs.go b/fs.go index 91aaae3..acc62e5 100644 --- a/fs.go +++ b/fs.go @@ -22,7 +22,7 @@ type FileSystem interface { // Stream FileSystem stream.FileSystem - // Reload should look through the FileSystem and call the suplied fn + // Reload should look through the FileSystem and call the supplied fn // with the key/filename pairs that are found. Reload(func(key, name string)) error diff --git a/fscache.go b/fscache.go index 2d8acb1..ea05e44 100644 --- a/fscache.go +++ b/fscache.go @@ -6,13 +6,12 @@ import ( "sync" "sync/atomic" "time" - + "gopkg.in/djherbis/stream.v1" ) // Cache works like a concurrent-safe map for streams. type Cache interface { - // Get manages access to the streams in the cache. // If the key does not exist, w != nil and you can start writing to the stream. // If the key does exist, w == nil. @@ -35,10 +34,13 @@ type Cache interface { } type cache struct { - mu sync.RWMutex - files map[string]fileStream - grim Reaper - fs FileSystem + maxFiles, maxSize int + + mu sync.RWMutex + files map[string]fileStream + grim Reaper + scrubs Janitor + fs FileSystem } // ReadAtCloser is an io.ReadCloser, and an io.ReaderAt. It supports both so that Range @@ -71,17 +73,49 @@ func New(dir string, perms os.FileMode, expiry time.Duration) (Cache, error) { period: expiry, } } - return NewCache(fs, grim) + return NewCache(fs, grim, nil) +} + +// New creates a new Cache using NewFs(dir, perms). 
+// +// expiry is the duration after which an un-accessed key will be removed from +// the cache, a zero value expiro means never expire. +// +// scrubbingPeriod is the duration after which the cache will be checked if +// it has more items than the maxItems or if its size is bigger than maxSize. +func NewWithLimits(dir string, perms os.FileMode, expiry, scrubbingPeriod time.Duration, maxItems int, maxSize int64) (Cache, error) { + fs, err := NewFs(dir, perms) + if err != nil { + return nil, err + } + var grim Reaper + if expiry > 0 { + grim = &reaper{ + expiry: expiry, + period: expiry, + } + } + + var scrubs Janitor + if maxItems > 0 || maxSize > 0 { + scrubs = &janitor{ + period: scrubbingPeriod, + maxItems: maxItems, + maxSize: maxSize, + } + } + return NewCache(fs, grim, scrubs) } // NewCache creates a new Cache based on FileSystem fs. // fs.Files() are loaded using the name they were created with as a key. // Reaper is used to determine when files expire, nil means never expire. -func NewCache(fs FileSystem, grim Reaper) (Cache, error) { +func NewCache(fs FileSystem, grim Reaper, scrubs Janitor) (Cache, error) { c := &cache{ - files: make(map[string]fileStream), - grim: grim, - fs: fs, + files: make(map[string]fileStream), + grim: grim, + scrubs: scrubs, + fs: fs, } err := c.load() if err != nil { @@ -90,9 +124,17 @@ func NewCache(fs FileSystem, grim Reaper) (Cache, error) { if grim != nil { c.haunter() } + if scrubs != nil { + c.scrubber() + } return c, nil } +func (c *cache) scrubber() { + c.scrubs.Scrub(c) + time.AfterFunc(c.scrubs.Next(), c.scrubber) +} + func (c *cache) haunter() { c.haunt() time.AfterFunc(c.grim.Next(), c.haunter) diff --git a/fscache_test.go b/fscache_test.go index bb65377..b5356c9 100644 --- a/fscache_test.go +++ b/fscache_test.go @@ -19,7 +19,7 @@ func createFile(name string) (*os.File, error) { } func init() { - c, _ := NewCache(NewMemFs(), nil) + c, _ := NewCache(NewMemFs(), nil, nil) go ListenAndServe(c, ":10000") } @@ -31,14 
+31,14 @@ func testCaches(t *testing.T, run func(c Cache)) { } run(c) - c, err = NewCache(NewMemFs(), NewReaper(time.Hour, time.Hour)) + c, err = NewCache(NewMemFs(), NewReaper(time.Hour, time.Hour), nil) if err != nil { t.Error(err.Error()) return } run(c) - c2, _ := NewCache(NewMemFs(), nil) + c2, _ := NewCache(NewMemFs(), nil, nil) run(NewPartition(NewDistributor(c, c2))) lc := NewLayered(c, c2) @@ -211,6 +211,114 @@ func TestReload(t *testing.T) { } } +func TestJanitorMaxItems(t *testing.T) { + + fs, err := NewFs("./cache1", 0700) + if err != nil { + t.Error(err.Error()) + t.FailNow() + } + + c, err := NewCache(fs, nil, NewJanitor(3, 0, 400*time.Millisecond)) + + if err != nil { + t.Error(err.Error()) + return + } + defer c.Clean() + + for i := 0; i < 5; i++ { + name := fmt.Sprintf("stream-%v", i) + r, w, _ := c.Get(name) + w.Write([]byte("hello")) + w.Close() + io.Copy(ioutil.Discard, r) + + if !c.Exists(name) { + t.Errorf(name + " should exist") + } + + <-time.After(10 * time.Millisecond) + + err := r.Close() + if err != nil { + t.Error(err) + } + } + + <-time.After(400 * time.Millisecond) + + if c.Exists("stream-0") { + t.Errorf("stream-0 should have been scrubbed") + } + + if c.Exists("stream-1") { + t.Errorf("stream-1 should have been scrubbed") + } + + files, err := ioutil.ReadDir("./cache1") + if err != nil { + t.Error(err.Error()) + return + } + + if len(files) != 3 { + t.Errorf("expected 3 items in directory") + } +} + +func TestJanitorMaxSize(t *testing.T) { + + fs, err := NewFs("./cache1", 0700) + if err != nil { + t.Error(err.Error()) + t.FailNow() + } + + c, err := NewCache(fs, nil, NewJanitor(0, 24, 400*time.Millisecond)) + + if err != nil { + t.Error(err.Error()) + return + } + defer c.Clean() + + for i := 0; i < 5; i++ { + name := fmt.Sprintf("stream-%v", i) + r, w, _ := c.Get(name) + w.Write([]byte("hello")) + w.Close() + io.Copy(ioutil.Discard, r) + + if !c.Exists(name) { + t.Errorf(name + " should exist") + } + + <-time.After(10 * 
time.Millisecond) + + err := r.Close() + if err != nil { + t.Error(err) + } + } + + <-time.After(400 * time.Millisecond) + + if c.Exists("stream-0") { + t.Errorf("stream-0 should have been scrubbed") + } + + files, err := ioutil.ReadDir("./cache1") + if err != nil { + t.Error(err.Error()) + return + } + + if len(files) != 4 { + t.Errorf("expected 4 items in directory") + } +} + func TestReaper(t *testing.T) { fs, err := NewFs("./cache1", 0700) if err != nil { @@ -218,7 +326,7 @@ func TestReaper(t *testing.T) { t.FailNow() } - c, err := NewCache(fs, NewReaper(0*time.Second, 100*time.Millisecond)) + c, err := NewCache(fs, NewReaper(0*time.Second, 100*time.Millisecond), nil) if err != nil { t.Error(err.Error()) diff --git a/janitor.go b/janitor.go new file mode 100644 index 0000000..ea8c612 --- /dev/null +++ b/janitor.go @@ -0,0 +1,133 @@ +package fscache + +import ( + "time" + "os" + "sort" + "fmt" +) + +type janitorKV struct { + Key string + Value fileStream +} + +// Janitor is used to control when there are too many streams +// or the size of the streams is too big. +// It is called once right after loading, and then it is run +// again after every Next() period of time. +type Janitor interface { + // Returns the amount of time to wait before the next scheduled Reaping. + Next() time.Duration + + // Given a key and the last r/w times of a file, return true + // to remove the file from the cache, false to keep it. + Scrub(c *cache) +} + +// NewJanitor returns a simple janitor which runs every "period" +// and scrubs older files when the total file size is over maxSize or +// total item count is over maxItems. 
+// If maxItems or maxSize are 0, they won't be checked +func NewJanitor(maxItems int, maxSize int64, period time.Duration) Janitor { + return &janitor{ + period: period, + maxItems: maxItems, + maxSize: maxSize, + } +} + +type janitor struct { + period time.Duration + maxItems int + maxSize int64 +} + +func (j *janitor) Next() time.Duration { + return j.period +} + +func (j *janitor) Scrub(c *cache) { + + if len(c.files) == 0 { + return + } + + var count int + var size int64 + var okFiles []janitorKV + + c.mu.Lock() + + for k, f := range c.files { + if f.inUse() { + continue + } + + stat, err := os.Stat(f.Name()) + if err != nil { + continue + } + + count++ + size = size + stat.Size() + okFiles = append(okFiles, janitorKV{ + Key: k, + Value: f, + }) + } + + c.mu.Unlock() + + sort.Slice(okFiles, func(i, l int) bool { + iLastRead, _, err := c.fs.AccessTimes(okFiles[i].Value.Name()) + if err != nil { + return false + } + + jLastRead, _, err := c.fs.AccessTimes(okFiles[l].Value.Name()) + if err != nil { + return false + } + + return iLastRead.Before(jLastRead) + }) + + if j.maxItems > 0 { + for count > j.maxItems { + count, size = j.removeFirst(c, &okFiles, count, size) + } + } + + if j.maxSize > 0 { + for size > j.maxSize { + count, size = j.removeFirst(c, &okFiles, count, size) + } + } + +} + +func (j *janitor) removeFirst(c *cache, items *[]janitorKV, count int, size int64) (int, int64) { + + var f janitorKV + + f, *items = (*items)[0], (*items)[1:] + + stat, err := os.Stat(f.Value.Name()) + if err != nil { + fmt.Println(err) + return count, size + } + + err = c.Remove(f.Key) + if err != nil { + fmt.Println(err) + return count, size + } + + count-- + size = size - stat.Size() + + return count, size + +} From 986034b4b8eb954d517ab04de3d7cb9d67d91dc7 Mon Sep 17 00:00:00 2001 From: Adrian Lungu Date: Tue, 2 Jan 2018 13:13:29 +0200 Subject: [PATCH 02/23] update Go version to 1.8 --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/.travis.yml b/.travis.yml index cd4414c..74013aa 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: go go: -- 1.6 +- 1.8 before_install: - go get github.com/golang/lint/golint - go get github.com/axw/gocov/gocov From 89b0b4dec1d95fff0c878e23826af0322736129e Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Thu, 24 Jan 2019 17:35:00 +0300 Subject: [PATCH 03/23] Fixed djherbis comments after pull request review --- fs.go | 14 +++++++ fscache.go | 72 +++++++++----------------------- fscache_test.go | 12 +++--- janitor.go | 109 ++++++++++++++++++++++-------------------------- memfs.go | 10 +++++ reaper.go | 4 +- 6 files changed, 103 insertions(+), 118 deletions(-) diff --git a/fs.go b/fs.go index a0f61d5..88d52f2 100644 --- a/fs.go +++ b/fs.go @@ -17,11 +17,17 @@ import ( "gopkg.in/djherbis/stream.v1" ) +type FileSystemStater interface { + Size(name string) (int64, error) +} + // FileSystem is used as the source for a Cache. type FileSystem interface { // Stream FileSystem stream.FileSystem + FileSystemStater + // Reload should look through the FileSystem and call the supplied fn // with the key/filename pairs that are found. Reload(func(key, name string)) error @@ -50,6 +56,14 @@ func NewFs(dir string, mode os.FileMode) (FileSystem, error) { return fs, fs.init() } +func (fs *stdFs) Size(name string) (int64, error) { + stat, err := os.Stat(name) + if err == nil { + return stat.Size(), nil + } + return 0, err +} + func (fs *stdFs) Reload(add func(key, name string)) error { files, err := ioutil.ReadDir(fs.root) if err != nil { diff --git a/fscache.go b/fscache.go index ea05e44..9cc0641 100644 --- a/fscache.go +++ b/fscache.go @@ -34,13 +34,10 @@ type Cache interface { } type cache struct { - maxFiles, maxSize int - - mu sync.RWMutex - files map[string]fileStream - grim Reaper - scrubs Janitor - fs FileSystem + mu sync.RWMutex + files map[string]fileStream + grim Reaper + fs FileSystem } // ReadAtCloser is an io.ReadCloser, and an io.ReaderAt. 
It supports both so that Range @@ -73,49 +70,17 @@ func New(dir string, perms os.FileMode, expiry time.Duration) (Cache, error) { period: expiry, } } - return NewCache(fs, grim, nil) -} - -// New creates a new Cache using NewFs(dir, perms). -// -// expiry is the duration after which an un-accessed key will be removed from -// the cache, a zero value expiro means never expire. -// -// scrubbingPeriod is the duration after which the cache will be checked if -// it has more items than the maxItems or if its size is bigger than maxSize. -func NewWithLimits(dir string, perms os.FileMode, expiry, scrubbingPeriod time.Duration, maxItems int, maxSize int64) (Cache, error) { - fs, err := NewFs(dir, perms) - if err != nil { - return nil, err - } - var grim Reaper - if expiry > 0 { - grim = &reaper{ - expiry: expiry, - period: expiry, - } - } - - var scrubs Janitor - if maxItems > 0 || maxSize > 0 { - scrubs = &janitor{ - period: scrubbingPeriod, - maxItems: maxItems, - maxSize: maxSize, - } - } - return NewCache(fs, grim, scrubs) + return NewCache(fs, grim) } // NewCache creates a new Cache based on FileSystem fs. // fs.Files() are loaded using the name they were created with as a key. // Reaper is used to determine when files expire, nil means never expire. 
-func NewCache(fs FileSystem, grim Reaper, scrubs Janitor) (Cache, error) { +func NewCache(fs FileSystem, grim Reaper) (Cache, error) { c := &cache{ - files: make(map[string]fileStream), - grim: grim, - scrubs: scrubs, - fs: fs, + files: make(map[string]fileStream), + grim: grim, + fs: fs, } err := c.load() if err != nil { @@ -124,17 +89,9 @@ func NewCache(fs FileSystem, grim Reaper, scrubs Janitor) (Cache, error) { if grim != nil { c.haunter() } - if scrubs != nil { - c.scrubber() - } return c, nil } -func (c *cache) scrubber() { - c.scrubs.Scrub(c) - time.AfterFunc(c.scrubs.Next(), c.scrubber) -} - func (c *cache) haunter() { c.haunt() time.AfterFunc(c.grim.Next(), c.haunter) @@ -144,6 +101,17 @@ func (c *cache) haunt() { c.mu.Lock() defer c.mu.Unlock() + if cr, ok := c.grim.(StatsBasedReaper); ok { + for _, key := range cr.ReapUsingStats(CacheStats{cache: c}) { + f, ok := c.files[key] + delete(c.files, key) + if ok { + c.fs.Remove(f.Name()) + } + } + return + } + for key, f := range c.files { if f.inUse() { continue diff --git a/fscache_test.go b/fscache_test.go index 5ffec01..06888b6 100644 --- a/fscache_test.go +++ b/fscache_test.go @@ -19,7 +19,7 @@ func createFile(name string) (*os.File, error) { } func init() { - c, _ := NewCache(NewMemFs(), nil, nil) + c, _ := NewCache(NewMemFs(), nil) go ListenAndServe(c, ":10000") } @@ -31,14 +31,14 @@ func testCaches(t *testing.T, run func(c Cache)) { } run(c) - c, err = NewCache(NewMemFs(), NewReaper(time.Hour, time.Hour), nil) + c, err = NewCache(NewMemFs(), NewReaper(time.Hour, time.Hour)) if err != nil { t.Error(err.Error()) return } run(c) - c2, _ := NewCache(NewMemFs(), nil, nil) + c2, _ := NewCache(NewMemFs(), nil) run(NewPartition(NewDistributor(c, c2))) lc := NewLayered(c, c2) @@ -219,7 +219,7 @@ func TestJanitorMaxItems(t *testing.T) { t.FailNow() } - c, err := NewCache(fs, nil, NewJanitor(3, 0, 400*time.Millisecond)) + c, err := NewCache(fs, &Janitor{Period: 400 * time.Millisecond, MaxItems: 3, 
MaxTotalFileSize: 0}) if err != nil { t.Error(err.Error()) @@ -275,7 +275,7 @@ func TestJanitorMaxSize(t *testing.T) { t.FailNow() } - c, err := NewCache(fs, nil, NewJanitor(0, 24, 400*time.Millisecond)) + c, err := NewCache(fs, &Janitor{Period: 400 * time.Millisecond, MaxItems: 0, MaxTotalFileSize: 24}) if err != nil { t.Error(err.Error()) @@ -326,7 +326,7 @@ func TestReaper(t *testing.T) { t.FailNow() } - c, err := NewCache(fs, NewReaper(0*time.Second, 100*time.Millisecond), nil) + c, err := NewCache(fs, NewReaper(0*time.Second, 100*time.Millisecond)) if err != nil { t.Error(err.Error()) diff --git a/janitor.go b/janitor.go index 51eaa80..c37571c 100644 --- a/janitor.go +++ b/janitor.go @@ -2,7 +2,6 @@ package fscache import ( "fmt" - "os" "sort" "time" ) @@ -12,44 +11,41 @@ type janitorKV struct { Value fileStream } -// Janitor is used to control when there are too many streams -// or the size of the streams is too big. -// It is called once right after loading, and then it is run -// again after every Next() period of time. -type Janitor interface { - // Returns the amount of time to wait before the next scheduled Reaping. - Next() time.Duration - - // Given a key and the last r/w times of a file, return true - // to remove the file from the cache, false to keep it. - Scrub(c *cache) +type CacheStats struct { + // public fields / methods to provide info about the Cache state + // this can expose the extra info needed by Janitor to decide what to reap + cache *cache } -// NewJanitor returns a simple janitor which runs every "period" -// and scrubs older files when the total file size is over maxSize or -// total item count is over maxItems. 
-// If maxItems or maxSize are 0, they won't be checked -func NewJanitor(maxItems int, maxSize int64, period time.Duration) Janitor { - return &janitor{ - period: period, - maxItems: maxItems, - maxSize: maxSize, - } +type StatsBasedReaper interface { + ReapUsingStats(ctx CacheStats) (keysToReap []string) } -type janitor struct { - period time.Duration - maxItems int - maxSize int64 +// Janitor is used to control when there are too many streams +// or the size of the streams is too big. +// It is called once right after loading, and then it is run +// again after every Next() Period of time. +// Janitor runs every "Period" and scrubs older files +// when the total file size is over MaxTotalFileSize or +// total item count is over MaxItems. +// If MaxItems or MaxTotalFileSize are 0, they won't be checked +type Janitor struct { + Period time.Duration + MaxItems int + MaxTotalFileSize int64 } -func (j *janitor) Next() time.Duration { - return j.period +func (j *Janitor) Next() time.Duration { + return j.Period } -func (j *janitor) Scrub(c *cache) { +func (j *Janitor) Reap(_ string, _, _ time.Time) bool { + // not implemented: should not get here + return false +} - if len(c.files) == 0 { +func (j *Janitor) ReapUsingStats(ctx CacheStats) (keysToReap []string) { + if len(ctx.cache.files) == 0 { return } @@ -57,35 +53,31 @@ func (j *janitor) Scrub(c *cache) { var size int64 var okFiles []janitorKV - c.mu.Lock() - - for k, f := range c.files { + for k, f := range ctx.cache.files { if f.inUse() { continue } - stat, err := os.Stat(f.Name()) + fileSize, err := ctx.cache.fs.Size(f.Name()) if err != nil { continue } count++ - size = size + stat.Size() + size = size + fileSize okFiles = append(okFiles, janitorKV{ Key: k, Value: f, }) } - c.mu.Unlock() - sort.Slice(okFiles, func(i, l int) bool { - iLastRead, _, err := c.fs.AccessTimes(okFiles[i].Value.Name()) + iLastRead, _, err := ctx.cache.fs.AccessTimes(okFiles[i].Value.Name()) if err != nil { return false } - jLastRead, _, 
err := c.fs.AccessTimes(okFiles[l].Value.Name()) + jLastRead, _, err := ctx.cache.fs.AccessTimes(okFiles[l].Value.Name()) if err != nil { return false } @@ -93,41 +85,42 @@ func (j *janitor) Scrub(c *cache) { return iLastRead.Before(jLastRead) }) - if j.maxItems > 0 { - for count > j.maxItems { - count, size = j.removeFirst(c, &okFiles, count, size) + if j.MaxItems > 0 { + for count > j.MaxItems { + var key *string + key, count, size = j.removeFirst(ctx.cache, &okFiles, count, size) + if key != nil { + keysToReap = append(keysToReap, *key) + } } } - if j.maxSize > 0 { - for size > j.maxSize { - count, size = j.removeFirst(c, &okFiles, count, size) + if j.MaxTotalFileSize > 0 { + for size > j.MaxTotalFileSize { + var key *string + key, count, size = j.removeFirst(ctx.cache, &okFiles, count, size) + if key != nil { + keysToReap = append(keysToReap, *key) + } } } + return keysToReap } -func (j *janitor) removeFirst(c *cache, items *[]janitorKV, count int, size int64) (int, int64) { - +func (j *Janitor) removeFirst(c *cache, items *[]janitorKV, count int, size int64) (*string, int, int64) { var f janitorKV f, *items = (*items)[0], (*items)[1:] - stat, err := os.Stat(f.Value.Name()) - if err != nil { - fmt.Println(err) - return count, size - } - - err = c.Remove(f.Key) + fileSize, err := c.fs.Size(f.Value.Name()) if err != nil { fmt.Println(err) - return count, size + return nil, count, size } count-- - size = size - stat.Size() - - return count, size + size = size - fileSize + return &f.Key, count, size } diff --git a/memfs.go b/memfs.go index cfe7e0d..dbf3070 100644 --- a/memfs.go +++ b/memfs.go @@ -23,6 +23,16 @@ func NewMemFs() FileSystem { } } +func (fs *memFS) Size(name string) (int64, error) { + fs.mu.RLock() + defer fs.mu.RUnlock() + f, ok := fs.files[name] + if ok { + return int64(len(f.Bytes())), nil + } + return 0, errors.New("file has not been read") +} + func (fs *memFS) Reload(add func(key, name string)) error { return nil } diff --git a/reaper.go 
b/reaper.go index 601e02c..2a6b7b0 100644 --- a/reaper.go +++ b/reaper.go @@ -4,7 +4,7 @@ import "time" // Reaper is used to control when streams expire from the cache. // It is called once right after loading, and then it is run -// again after every Next() period of time. +// again after every Next() Period of time. type Reaper interface { // Returns the amount of time to wait before the next scheduled Reaping. Next() time.Duration @@ -14,7 +14,7 @@ type Reaper interface { Reap(key string, lastRead, lastWrite time.Time) bool } -// NewReaper returns a simple reaper which runs every "period" +// NewReaper returns a simple reaper which runs every "Period" // and reaps files which are older than "expiry". func NewReaper(expiry, period time.Duration) Reaper { return &reaper{ From 9be121ea0b3c4f7d01a9ece7d5dda9e1b4958532 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Fri, 25 Jan 2019 14:25:35 +0300 Subject: [PATCH 04/23] Removed `fmt.Println`. Fixed possible infinite loop --- janitor.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/janitor.go b/janitor.go index c37571c..4c0edea 100644 --- a/janitor.go +++ b/janitor.go @@ -1,7 +1,6 @@ package fscache import ( - "fmt" "sort" "time" ) @@ -88,7 +87,11 @@ func (j *Janitor) ReapUsingStats(ctx CacheStats) (keysToReap []string) { if j.MaxItems > 0 { for count > j.MaxItems { var key *string - key, count, size = j.removeFirst(ctx.cache, &okFiles, count, size) + var err error + key, count, size, err = j.removeFirst(ctx.cache, &okFiles, count, size) + if err != nil { + break + } if key != nil { keysToReap = append(keysToReap, *key) } @@ -98,7 +101,11 @@ func (j *Janitor) ReapUsingStats(ctx CacheStats) (keysToReap []string) { if j.MaxTotalFileSize > 0 { for size > j.MaxTotalFileSize { var key *string - key, count, size = j.removeFirst(ctx.cache, &okFiles, count, size) + var err error + key, count, size, err = j.removeFirst(ctx.cache, &okFiles, count, size) + if err != nil { + break + } 
if key != nil { keysToReap = append(keysToReap, *key) } @@ -108,19 +115,18 @@ func (j *Janitor) ReapUsingStats(ctx CacheStats) (keysToReap []string) { return keysToReap } -func (j *Janitor) removeFirst(c *cache, items *[]janitorKV, count int, size int64) (*string, int, int64) { +func (j *Janitor) removeFirst(c *cache, items *[]janitorKV, count int, size int64) (*string, int, int64, error) { var f janitorKV f, *items = (*items)[0], (*items)[1:] fileSize, err := c.fs.Size(f.Value.Name()) if err != nil { - fmt.Println(err) - return nil, count, size + return nil, count, size, err } count-- size = size - fileSize - return &f.Key, count, size + return &f.Key, count, size, nil } From 1eb0d2fee5b969e35e456651fc77a5ff90cee81a Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Fri, 25 Jan 2019 15:45:08 +0300 Subject: [PATCH 05/23] Haunter strategy dependency injected --- fscache.go | 59 ++++++++++++----------------- fscache_test.go | 4 +- haunter.go | 98 +++++++++++++++++++++++++++++++++++++++++++++++++ janitor.go | 76 +++++++++++++++++++------------------- 4 files changed, 162 insertions(+), 75 deletions(-) create mode 100644 haunter.go diff --git a/fscache.go b/fscache.go index 9cc0641..65fe271 100644 --- a/fscache.go +++ b/fscache.go @@ -34,10 +34,10 @@ type Cache interface { } type cache struct { - mu sync.RWMutex - files map[string]fileStream - grim Reaper - fs FileSystem + mu sync.RWMutex + files map[string]fileStream + fs FileSystem + hauntProvider Haunter } // ReadAtCloser is an io.ReadCloser, and an io.ReaderAt. It supports both so that Range @@ -77,57 +77,44 @@ func New(dir string, perms os.FileMode, expiry time.Duration) (Cache, error) { // fs.Files() are loaded using the name they were created with as a key. // Reaper is used to determine when files expire, nil means never expire. 
func NewCache(fs FileSystem, grim Reaper) (Cache, error) { + var haunter Haunter + if grim != nil { + haunter = NewReaperHaunter(grim) + } + + return NewCacheWithHaunter(fs, haunter) +} + +// NewCacheWithHaunter create a new Cache based on FileSystem fs. +// fs.Files() are loaded using the name they were created with as a key. +// Haunter is used to determine when files expire, nil means never expire. +func NewCacheWithHaunter(fs FileSystem, haunter Haunter) (Cache, error) { c := &cache{ - files: make(map[string]fileStream), - grim: grim, - fs: fs, + files: make(map[string]fileStream), + hauntProvider: haunter, + fs: fs, } err := c.load() if err != nil { return nil, err } - if grim != nil { + if haunter != nil { c.haunter() } + return c, nil } func (c *cache) haunter() { c.haunt() - time.AfterFunc(c.grim.Next(), c.haunter) + time.AfterFunc(c.hauntProvider.Next(), c.haunter) } func (c *cache) haunt() { c.mu.Lock() defer c.mu.Unlock() - if cr, ok := c.grim.(StatsBasedReaper); ok { - for _, key := range cr.ReapUsingStats(CacheStats{cache: c}) { - f, ok := c.files[key] - delete(c.files, key) - if ok { - c.fs.Remove(f.Name()) - } - } - return - } - - for key, f := range c.files { - if f.inUse() { - continue - } - - lastRead, lastWrite, err := c.fs.AccessTimes(f.Name()) - if err != nil { - continue - } - - if c.grim.Reap(key, lastRead, lastWrite) { - delete(c.files, key) - c.fs.Remove(f.Name()) - } - } - return + c.hauntProvider.Haunt(c) } func (c *cache) load() error { diff --git a/fscache_test.go b/fscache_test.go index 06888b6..f5b9ee7 100644 --- a/fscache_test.go +++ b/fscache_test.go @@ -219,7 +219,7 @@ func TestJanitorMaxItems(t *testing.T) { t.FailNow() } - c, err := NewCache(fs, &Janitor{Period: 400 * time.Millisecond, MaxItems: 3, MaxTotalFileSize: 0}) + c, err := NewCacheWithHaunter(fs, NewJanitorHaunter(NewJanitor(3, 0, 400*time.Millisecond))) if err != nil { t.Error(err.Error()) @@ -275,7 +275,7 @@ func TestJanitorMaxSize(t *testing.T) { t.FailNow() } - c, 
err := NewCache(fs, &Janitor{Period: 400 * time.Millisecond, MaxItems: 0, MaxTotalFileSize: 24}) + c, err := NewCacheWithHaunter(fs, NewJanitorHaunter(NewJanitor(0, 24, 400*time.Millisecond))) if err != nil { t.Error(err.Error()) diff --git a/haunter.go b/haunter.go new file mode 100644 index 0000000..ba75ddc --- /dev/null +++ b/haunter.go @@ -0,0 +1,98 @@ +package fscache + +import ( + "math" + "time" +) + +type Haunter interface { + Haunt(c *cache) + Next() time.Duration +} + +type reaperHaunter struct { + reaper Reaper +} + +type janitorHaunter struct { + janitor Janitor +} + +type compoundHaunter struct { + haunters []Haunter +} + +// NewCompoundHaunter returns a compound haunter which provides a multi strategy implementation +func NewCompoundHaunter(haunters []Haunter) Haunter { + return &compoundHaunter{ + haunters: haunters, + } +} + +func (h *compoundHaunter) Haunt(c *cache) { + for _, haunter := range h.haunters { + haunter.Haunt(c) + } +} + +func (h *compoundHaunter) Next() time.Duration { + minPeriod := time.Duration(math.MaxInt64) + for _, haunter := range h.haunters { + if period := haunter.Next(); period < minPeriod { + minPeriod = period + } + } + + return minPeriod +} + +// NewJanitorHaunter returns a simple haunter which provides an implementation Janitor strategy +func NewJanitorHaunter(janitor Janitor) Haunter { + return &janitorHaunter{ + janitor: janitor, + } +} + +func (h *janitorHaunter) Haunt(c *cache) { + for _, key := range h.janitor.Scrub(c) { + f, ok := c.files[key] + delete(c.files, key) + if ok { + c.fs.Remove(f.Name()) + } + } + +} + +func (h *janitorHaunter) Next() time.Duration { + return h.janitor.Next() +} + +// NewReaperHaunter returns a simple haunter which provides an implementation Reaper strategy +func NewReaperHaunter(reaper Reaper) Haunter { + return &reaperHaunter{ + reaper: reaper, + } +} + +func (h *reaperHaunter) Haunt(c *cache) { + for key, f := range c.files { + if f.inUse() { + continue + } + + lastRead, lastWrite, 
err := c.fs.AccessTimes(f.Name()) + if err != nil { + continue + } + + if h.reaper.Reap(key, lastRead, lastWrite) { + delete(c.files, key) + c.fs.Remove(f.Name()) + } + } +} + +func (h *reaperHaunter) Next() time.Duration { + return h.reaper.Next() +} diff --git a/janitor.go b/janitor.go index 4c0edea..0be9219 100644 --- a/janitor.go +++ b/janitor.go @@ -10,41 +10,43 @@ type janitorKV struct { Value fileStream } -type CacheStats struct { - // public fields / methods to provide info about the Cache state - // this can expose the extra info needed by Janitor to decide what to reap - cache *cache -} - -type StatsBasedReaper interface { - ReapUsingStats(ctx CacheStats) (keysToReap []string) -} - // Janitor is used to control when there are too many streams // or the size of the streams is too big. // It is called once right after loading, and then it is run -// again after every Next() Period of time. -// Janitor runs every "Period" and scrubs older files -// when the total file size is over MaxTotalFileSize or -// total item count is over MaxItems. -// If MaxItems or MaxTotalFileSize are 0, they won't be checked -type Janitor struct { - Period time.Duration - MaxItems int - MaxTotalFileSize int64 +// again after every Next() period of time. +type Janitor interface { + // Returns the amount of time to wait before the next scheduled Reaping. + Next() time.Duration + + // Given a key and the last r/w times of a file, return true + // to remove the file from the cache, false to keep it. + Scrub(c *cache) []string +} + +// NewJanitor returns a simple janitor which runs every "period" +// and scrubs older files when the total file size is over maxSize or +// total item count is over maxItems. 
+// If maxItems or maxSize are 0, they won't be checked +func NewJanitor(maxItems int, maxSize int64, period time.Duration) Janitor { + return &janitor{ + period: period, + maxItems: maxItems, + maxSize: maxSize, + } } -func (j *Janitor) Next() time.Duration { - return j.Period +type janitor struct { + period time.Duration + maxItems int + maxSize int64 } -func (j *Janitor) Reap(_ string, _, _ time.Time) bool { - // not implemented: should not get here - return false +func (j *janitor) Next() time.Duration { + return j.period } -func (j *Janitor) ReapUsingStats(ctx CacheStats) (keysToReap []string) { - if len(ctx.cache.files) == 0 { +func (j *janitor) Scrub(cache *cache) (keysToReap []string) { + if len(cache.files) == 0 { return } @@ -52,12 +54,12 @@ func (j *Janitor) ReapUsingStats(ctx CacheStats) (keysToReap []string) { var size int64 var okFiles []janitorKV - for k, f := range ctx.cache.files { + for k, f := range cache.files { if f.inUse() { continue } - fileSize, err := ctx.cache.fs.Size(f.Name()) + fileSize, err := cache.fs.Size(f.Name()) if err != nil { continue } @@ -71,12 +73,12 @@ func (j *Janitor) ReapUsingStats(ctx CacheStats) (keysToReap []string) { } sort.Slice(okFiles, func(i, l int) bool { - iLastRead, _, err := ctx.cache.fs.AccessTimes(okFiles[i].Value.Name()) + iLastRead, _, err := cache.fs.AccessTimes(okFiles[i].Value.Name()) if err != nil { return false } - jLastRead, _, err := ctx.cache.fs.AccessTimes(okFiles[l].Value.Name()) + jLastRead, _, err := cache.fs.AccessTimes(okFiles[l].Value.Name()) if err != nil { return false } @@ -84,11 +86,11 @@ func (j *Janitor) ReapUsingStats(ctx CacheStats) (keysToReap []string) { return iLastRead.Before(jLastRead) }) - if j.MaxItems > 0 { - for count > j.MaxItems { + if j.maxItems > 0 { + for count > j.maxItems { var key *string var err error - key, count, size, err = j.removeFirst(ctx.cache, &okFiles, count, size) + key, count, size, err = j.removeFirst(cache, &okFiles, count, size) if err != nil { break } 
@@ -98,11 +100,11 @@ func (j *Janitor) ReapUsingStats(ctx CacheStats) (keysToReap []string) { } } - if j.MaxTotalFileSize > 0 { - for size > j.MaxTotalFileSize { + if j.maxSize > 0 { + for size > j.maxSize { var key *string var err error - key, count, size, err = j.removeFirst(ctx.cache, &okFiles, count, size) + key, count, size, err = j.removeFirst(cache, &okFiles, count, size) if err != nil { break } @@ -115,7 +117,7 @@ func (j *Janitor) ReapUsingStats(ctx CacheStats) (keysToReap []string) { return keysToReap } -func (j *Janitor) removeFirst(c *cache, items *[]janitorKV, count int, size int64) (*string, int, int64, error) { +func (j *janitor) removeFirst(c *cache, items *[]janitorKV, count int, size int64) (*string, int, int64, error) { var f janitorKV f, *items = (*items)[0], (*items)[1:] From 5cdc7107497329ce2625f428413aa134cb759ff5 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Fri, 25 Jan 2019 18:06:07 +0300 Subject: [PATCH 06/23] Haunter refactoring. `CacheStats` interface replaced `cache` struct in haunters implementation --- fscache.go | 54 ++++++++++++++++++++++++++++++--------------- haunter.go | 43 +++++++++++++++++++----------------- janitor.go | 64 +++++++++++++++++++++++++++--------------------------- 3 files changed, 92 insertions(+), 69 deletions(-) diff --git a/fscache.go b/fscache.go index 65fe271..84fd6e7 100644 --- a/fscache.go +++ b/fscache.go @@ -34,10 +34,10 @@ type Cache interface { } type cache struct { - mu sync.RWMutex - files map[string]fileStream - fs FileSystem - hauntProvider Haunter + mu sync.RWMutex + files map[string]FileStream + fs FileSystem + haunter Haunter } // ReadAtCloser is an io.ReadCloser, and an io.ReaderAt. 
It supports both so that Range @@ -47,9 +47,9 @@ type ReadAtCloser interface { io.ReaderAt } -type fileStream interface { +type FileStream interface { next() (ReadAtCloser, error) - inUse() bool + InUse() bool io.WriteCloser Remove() error Name() string @@ -90,31 +90,31 @@ func NewCache(fs FileSystem, grim Reaper) (Cache, error) { // Haunter is used to determine when files expire, nil means never expire. func NewCacheWithHaunter(fs FileSystem, haunter Haunter) (Cache, error) { c := &cache{ - files: make(map[string]fileStream), - hauntProvider: haunter, - fs: fs, + files: make(map[string]FileStream), + haunter: haunter, + fs: fs, } err := c.load() if err != nil { return nil, err } if haunter != nil { - c.haunter() + c.scheduleHaunt() } return c, nil } -func (c *cache) haunter() { +func (c *cache) scheduleHaunt() { c.haunt() - time.AfterFunc(c.hauntProvider.Next(), c.haunter) + time.AfterFunc(c.haunter.Next(), c.scheduleHaunt) } func (c *cache) haunt() { c.mu.Lock() defer c.mu.Unlock() - c.hauntProvider.Haunt(c) + c.haunter.Haunt(c) } func (c *cache) load() error { @@ -183,16 +183,36 @@ func (c *cache) Remove(key string) error { func (c *cache) Clean() error { c.mu.Lock() defer c.mu.Unlock() - c.files = make(map[string]fileStream) + c.files = make(map[string]FileStream) return c.fs.RemoveAll() } +func (c *cache) FileSystem() FileSystem { + return c.fs +} + +func (c *cache) EnumerateFiles(enumerator func(key string, f FileStream) bool) { + for k, f := range c.files { + if !enumerator(k, f) { + break + } + } +} + +func (c *cache) RemoveFile(key string) { + f, ok := c.files[key] + delete(c.files, key) + if ok { + c.fs.Remove(f.Name()) + } +} + type cachedFile struct { stream *stream.Stream handleCounter } -func (c *cache) newFile(name string) (fileStream, error) { +func (c *cache) newFile(name string) (FileStream, error) { s, err := stream.NewStream(name, c.fs) if err != nil { return nil, err @@ -204,7 +224,7 @@ func (c *cache) newFile(name string) (fileStream, error) { 
return cf, nil } -func (c *cache) oldFile(name string) fileStream { +func (c *cache) oldFile(name string) FileStream { return &reloadedFile{ fs: c.fs, name: name, @@ -291,7 +311,7 @@ func (h *handleCounter) dec() { h.grp.Done() } -func (h *handleCounter) inUse() bool { +func (h *handleCounter) InUse() bool { return atomic.LoadInt64(&h.cnt) > 0 } diff --git a/haunter.go b/haunter.go index ba75ddc..5cc315d 100644 --- a/haunter.go +++ b/haunter.go @@ -5,8 +5,14 @@ import ( "time" ) +type CacheStats interface { + FileSystem() FileSystem + EnumerateFiles(enumerator func(key string, f FileStream) bool) + RemoveFile(key string) +} + type Haunter interface { - Haunt(c *cache) + Haunt(c CacheStats) Next() time.Duration } @@ -22,14 +28,14 @@ type compoundHaunter struct { haunters []Haunter } -// NewCompoundHaunter returns a compound haunter which provides a multi strategy implementation +// NewCompoundHaunter returns a compound scheduleHaunt which provides a multi strategy implementation func NewCompoundHaunter(haunters []Haunter) Haunter { return &compoundHaunter{ haunters: haunters, } } -func (h *compoundHaunter) Haunt(c *cache) { +func (h *compoundHaunter) Haunt(c CacheStats) { for _, haunter := range h.haunters { haunter.Haunt(c) } @@ -46,20 +52,16 @@ func (h *compoundHaunter) Next() time.Duration { return minPeriod } -// NewJanitorHaunter returns a simple haunter which provides an implementation Janitor strategy +// NewJanitorHaunter returns a simple scheduleHaunt which provides an implementation Janitor strategy func NewJanitorHaunter(janitor Janitor) Haunter { return &janitorHaunter{ janitor: janitor, } } -func (h *janitorHaunter) Haunt(c *cache) { +func (h *janitorHaunter) Haunt(c CacheStats) { for _, key := range h.janitor.Scrub(c) { - f, ok := c.files[key] - delete(c.files, key) - if ok { - c.fs.Remove(f.Name()) - } + c.RemoveFile(key) } } @@ -68,29 +70,30 @@ func (h *janitorHaunter) Next() time.Duration { return h.janitor.Next() } -// NewReaperHaunter returns a 
simple haunter which provides an implementation Reaper strategy +// NewReaperHaunter returns a simple scheduleHaunt which provides an implementation Reaper strategy func NewReaperHaunter(reaper Reaper) Haunter { return &reaperHaunter{ reaper: reaper, } } -func (h *reaperHaunter) Haunt(c *cache) { - for key, f := range c.files { - if f.inUse() { - continue +func (h *reaperHaunter) Haunt(c CacheStats) { + c.EnumerateFiles(func(key string, f FileStream) bool { + if f.InUse() { + return true } - lastRead, lastWrite, err := c.fs.AccessTimes(f.Name()) + lastRead, lastWrite, err := c.FileSystem().AccessTimes(f.Name()) if err != nil { - continue + return true } if h.reaper.Reap(key, lastRead, lastWrite) { - delete(c.files, key) - c.fs.Remove(f.Name()) + c.RemoveFile(key) } - } + + return true + }) } func (h *reaperHaunter) Next() time.Duration { diff --git a/janitor.go b/janitor.go index 0be9219..8eba6c6 100644 --- a/janitor.go +++ b/janitor.go @@ -7,7 +7,7 @@ import ( type janitorKV struct { Key string - Value fileStream + Value FileStream } // Janitor is used to control when there are too many streams @@ -20,7 +20,7 @@ type Janitor interface { // Given a key and the last r/w times of a file, return true // to remove the file from the cache, false to keep it. 
- Scrub(c *cache) []string + Scrub(c CacheStats) []string } // NewJanitor returns a simple janitor which runs every "period" @@ -45,40 +45,38 @@ func (j *janitor) Next() time.Duration { return j.period } -func (j *janitor) Scrub(cache *cache) (keysToReap []string) { - if len(cache.files) == 0 { - return - } - +func (j *janitor) Scrub(c CacheStats) (keysToReap []string) { var count int var size int64 var okFiles []janitorKV - for k, f := range cache.files { - if f.inUse() { - continue + c.EnumerateFiles(func(key string, f FileStream) bool { + if f.InUse() { + return true } - fileSize, err := cache.fs.Size(f.Name()) + fileSize, err := c.FileSystem().Size(f.Name()) if err != nil { - continue + return true } count++ size = size + fileSize okFiles = append(okFiles, janitorKV{ - Key: k, + Key: key, Value: f, }) - } + + return true + }) sort.Slice(okFiles, func(i, l int) bool { - iLastRead, _, err := cache.fs.AccessTimes(okFiles[i].Value.Name()) + iLastRead, _, err := c.FileSystem().AccessTimes(okFiles[i].Value.Name()) if err != nil { return false } - jLastRead, _, err := cache.fs.AccessTimes(okFiles[l].Value.Name()) + jLastRead, _, err := c.FileSystem().AccessTimes(okFiles[l].Value.Name()) if err != nil { return false } @@ -86,43 +84,45 @@ func (j *janitor) Scrub(cache *cache) (keysToReap []string) { return iLastRead.Before(jLastRead) }) + collectKeysToReapFn := func() bool { + var key *string + var err error + key, count, size, err = j.removeFirst(c, &okFiles, count, size) + if err != nil { + return false + } + if key != nil { + keysToReap = append(keysToReap, *key) + } + + return true + } + if j.maxItems > 0 { for count > j.maxItems { - var key *string - var err error - key, count, size, err = j.removeFirst(cache, &okFiles, count, size) - if err != nil { + if !collectKeysToReapFn() { break } - if key != nil { - keysToReap = append(keysToReap, *key) - } } } if j.maxSize > 0 { for size > j.maxSize { - var key *string - var err error - key, count, size, err = 
j.removeFirst(cache, &okFiles, count, size) - if err != nil { + if !collectKeysToReapFn() { break } - if key != nil { - keysToReap = append(keysToReap, *key) - } } } return keysToReap } -func (j *janitor) removeFirst(c *cache, items *[]janitorKV, count int, size int64) (*string, int, int64, error) { +func (j *janitor) removeFirst(c CacheStats, items *[]janitorKV, count int, size int64) (*string, int, int64, error) { var f janitorKV f, *items = (*items)[0], (*items)[1:] - fileSize, err := c.fs.Size(f.Value.Name()) + fileSize, err := c.FileSystem().Size(f.Value.Name()) if err != nil { return nil, count, size, err } From 59d0a8ff1fb4e598dec26d3a0b35ec734dbbca50 Mon Sep 17 00:00:00 2001 From: Alexey Malov Date: Fri, 25 Jan 2019 18:55:13 +0300 Subject: [PATCH 07/23] Reaper/Janitor functionality was refactored FileStream.remove() method was made private AccessTimes method was moved from FileSystem to FileStater cache implements FileSystemStater interface CacheStats was renamed into CacheAccessor --- fs.go | 13 +++++++------ fscache.go | 20 ++++++++++++-------- haunter.go | 14 +++++++------- janitor.go | 14 +++++++------- 4 files changed, 33 insertions(+), 28 deletions(-) diff --git a/fs.go b/fs.go index 88d52f2..7f324eb 100644 --- a/fs.go +++ b/fs.go @@ -18,7 +18,14 @@ import ( ) type FileSystemStater interface { + // Size takes a File.Name() and returns size of the file Size(name string) (int64, error) + + // AccessTimes takes a File.Name() and returns the last time the file was read, + // and the last time it was written to. + // It will be used to check expiry of a file, and must be concurrent safe + // with modifications to the FileSystem (writes, reads etc.) + AccessTimes(name string) (rt, wt time.Time, err error) } // FileSystem is used as the source for a Cache. @@ -34,12 +41,6 @@ type FileSystem interface { // RemoveAll should empty the FileSystem of all files. 
RemoveAll() error - - // AccessTimes takes a File.Name() and returns the last time the file was read, - // and the last time it was written to. - // It will be used to check expiry of a file, and must be concurrent safe - // with modifications to the FileSystem (writes, reads etc.) - AccessTimes(name string) (rt, wt time.Time, err error) } type stdFs struct { diff --git a/fscache.go b/fscache.go index 84fd6e7..8a4a1c9 100644 --- a/fscache.go +++ b/fscache.go @@ -19,9 +19,9 @@ type Cache interface { // Get can be called concurrently, and writing and reading is concurrent safe. Get(key string) (ReadAtCloser, io.WriteCloser, error) - // Remove deletes the stream from the cache, blocking until the underlying + // remove deletes the stream from the cache, blocking until the underlying // file can be deleted (all active streams finish with it). - // It is safe to call Remove concurrently with Get. + // It is safe to call remove concurrently with Get. Remove(key string) error // Exists checks if a key is in the cache. 
@@ -51,7 +51,7 @@ type FileStream interface { next() (ReadAtCloser, error) InUse() bool io.WriteCloser - Remove() error + remove() error Name() string } @@ -175,7 +175,7 @@ func (c *cache) Remove(key string) error { c.mu.Unlock() if ok { - return f.Remove() + return f.remove() } return nil } @@ -187,8 +187,12 @@ func (c *cache) Clean() error { return c.fs.RemoveAll() } -func (c *cache) FileSystem() FileSystem { - return c.fs +func (c *cache) Size(name string) (int64, error) { + return c.fs.Size(name) +} + +func (c *cache) AccessTimes(name string) (rt, wt time.Time, err error) { + return c.fs.AccessTimes(name) } func (c *cache) EnumerateFiles(enumerator func(key string, f FileStream) bool) { @@ -240,7 +244,7 @@ type reloadedFile struct { func (f *reloadedFile) Name() string { return f.name } -func (f *reloadedFile) Remove() error { +func (f *reloadedFile) remove() error { f.waitUntilFree() return f.fs.Remove(f.name) } @@ -255,7 +259,7 @@ func (f *reloadedFile) next() (r ReadAtCloser, err error) { func (f *cachedFile) Name() string { return f.stream.Name() } -func (f *cachedFile) Remove() error { return f.stream.Remove() } +func (f *cachedFile) remove() error { return f.stream.Remove() } func (f *cachedFile) next() (r ReadAtCloser, err error) { reader, err := f.stream.NextReader() diff --git a/haunter.go b/haunter.go index 5cc315d..c6970ef 100644 --- a/haunter.go +++ b/haunter.go @@ -5,14 +5,14 @@ import ( "time" ) -type CacheStats interface { - FileSystem() FileSystem +type CacheAccessor interface { + FileSystemStater EnumerateFiles(enumerator func(key string, f FileStream) bool) RemoveFile(key string) } type Haunter interface { - Haunt(c CacheStats) + Haunt(c CacheAccessor) Next() time.Duration } @@ -35,7 +35,7 @@ func NewCompoundHaunter(haunters []Haunter) Haunter { } } -func (h *compoundHaunter) Haunt(c CacheStats) { +func (h *compoundHaunter) Haunt(c CacheAccessor) { for _, haunter := range h.haunters { haunter.Haunt(c) } @@ -59,7 +59,7 @@ func 
NewJanitorHaunter(janitor Janitor) Haunter { } } -func (h *janitorHaunter) Haunt(c CacheStats) { +func (h *janitorHaunter) Haunt(c CacheAccessor) { for _, key := range h.janitor.Scrub(c) { c.RemoveFile(key) } @@ -77,13 +77,13 @@ func NewReaperHaunter(reaper Reaper) Haunter { } } -func (h *reaperHaunter) Haunt(c CacheStats) { +func (h *reaperHaunter) Haunt(c CacheAccessor) { c.EnumerateFiles(func(key string, f FileStream) bool { if f.InUse() { return true } - lastRead, lastWrite, err := c.FileSystem().AccessTimes(f.Name()) + lastRead, lastWrite, err := c.AccessTimes(f.Name()) if err != nil { return true } diff --git a/janitor.go b/janitor.go index 8eba6c6..afbc108 100644 --- a/janitor.go +++ b/janitor.go @@ -20,7 +20,7 @@ type Janitor interface { // Given a key and the last r/w times of a file, return true // to remove the file from the cache, false to keep it. - Scrub(c CacheStats) []string + Scrub(c CacheAccessor) []string } // NewJanitor returns a simple janitor which runs every "period" @@ -45,7 +45,7 @@ func (j *janitor) Next() time.Duration { return j.period } -func (j *janitor) Scrub(c CacheStats) (keysToReap []string) { +func (j *janitor) Scrub(c CacheAccessor) (keysToReap []string) { var count int var size int64 var okFiles []janitorKV @@ -55,7 +55,7 @@ func (j *janitor) Scrub(c CacheStats) (keysToReap []string) { return true } - fileSize, err := c.FileSystem().Size(f.Name()) + fileSize, err := c.Size(f.Name()) if err != nil { return true } @@ -71,12 +71,12 @@ func (j *janitor) Scrub(c CacheStats) (keysToReap []string) { }) sort.Slice(okFiles, func(i, l int) bool { - iLastRead, _, err := c.FileSystem().AccessTimes(okFiles[i].Value.Name()) + iLastRead, _, err := c.AccessTimes(okFiles[i].Value.Name()) if err != nil { return false } - jLastRead, _, err := c.FileSystem().AccessTimes(okFiles[l].Value.Name()) + jLastRead, _, err := c.AccessTimes(okFiles[l].Value.Name()) if err != nil { return false } @@ -117,12 +117,12 @@ func (j *janitor) Scrub(c CacheStats) 
(keysToReap []string) { return keysToReap } -func (j *janitor) removeFirst(c CacheStats, items *[]janitorKV, count int, size int64) (*string, int, int64, error) { +func (j *janitor) removeFirst(fsStater FileSystemStater, items *[]janitorKV, count int, size int64) (*string, int, int64, error) { var f janitorKV f, *items = (*items)[0], (*items)[1:] - fileSize, err := c.FileSystem().Size(f.Value.Name()) + fileSize, err := fsStater.Size(f.Value.Name()) if err != nil { return nil, count, size, err } From 90b77ff5934ad4273737e8e548331a4f7b21fa8a Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Mon, 28 Jan 2019 10:32:15 +0300 Subject: [PATCH 08/23] Fixed some methods comments --- fscache.go | 2 +- janitor.go | 3 +-- reaper.go | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/fscache.go b/fscache.go index 8a4a1c9..63d44f4 100644 --- a/fscache.go +++ b/fscache.go @@ -19,7 +19,7 @@ type Cache interface { // Get can be called concurrently, and writing and reading is concurrent safe. Get(key string) (ReadAtCloser, io.WriteCloser, error) - // remove deletes the stream from the cache, blocking until the underlying + // Remove deletes the stream from the cache, blocking until the underlying // file can be deleted (all active streams finish with it). // It is safe to call remove concurrently with Get. Remove(key string) error diff --git a/janitor.go b/janitor.go index afbc108..95ac378 100644 --- a/janitor.go +++ b/janitor.go @@ -18,8 +18,7 @@ type Janitor interface { // Returns the amount of time to wait before the next scheduled Reaping. Next() time.Duration - // Given a key and the last r/w times of a file, return true - // to remove the file from the cache, false to keep it. + // Given a CacheAccessor, return keys to reap list. Scrub(c CacheAccessor) []string } diff --git a/reaper.go b/reaper.go index 2a6b7b0..d801202 100644 --- a/reaper.go +++ b/reaper.go @@ -4,7 +4,7 @@ import "time" // Reaper is used to control when streams expire from the cache. 
// It is called once right after loading, and then it is run -// again after every Next() Period of time. +// again after every Next() period of time. type Reaper interface { // Returns the amount of time to wait before the next scheduled Reaping. Next() time.Duration From cbb319197c5442ec6db9d9fe9de7b2581d1182d9 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Mon, 28 Jan 2019 11:09:21 +0300 Subject: [PATCH 09/23] CompoundHaunter was removed --- haunter.go | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/haunter.go b/haunter.go index c6970ef..711b2b8 100644 --- a/haunter.go +++ b/haunter.go @@ -1,7 +1,6 @@ package fscache import ( - "math" "time" ) @@ -24,34 +23,6 @@ type janitorHaunter struct { janitor Janitor } -type compoundHaunter struct { - haunters []Haunter -} - -// NewCompoundHaunter returns a compound scheduleHaunt which provides a multi strategy implementation -func NewCompoundHaunter(haunters []Haunter) Haunter { - return &compoundHaunter{ - haunters: haunters, - } -} - -func (h *compoundHaunter) Haunt(c CacheAccessor) { - for _, haunter := range h.haunters { - haunter.Haunt(c) - } -} - -func (h *compoundHaunter) Next() time.Duration { - minPeriod := time.Duration(math.MaxInt64) - for _, haunter := range h.haunters { - if period := haunter.Next(); period < minPeriod { - minPeriod = period - } - } - - return minPeriod -} - // NewJanitorHaunter returns a simple scheduleHaunt which provides an implementation Janitor strategy func NewJanitorHaunter(janitor Janitor) Haunter { return &janitorHaunter{ From 80175082d95458093d505cd52204b24c58a0db84 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Mon, 28 Jan 2019 11:33:55 +0300 Subject: [PATCH 10/23] `CacheAccessor.Enumerate...` now works with the abstract Entry --- fscache.go | 2 +- haunter.go | 13 +++++++++---- janitor.go | 10 +++++----- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/fscache.go b/fscache.go index 63d44f4..034f1b7 100644 --- a/fscache.go +++ 
b/fscache.go @@ -195,7 +195,7 @@ func (c *cache) AccessTimes(name string) (rt, wt time.Time, err error) { return c.fs.AccessTimes(name) } -func (c *cache) EnumerateFiles(enumerator func(key string, f FileStream) bool) { +func (c *cache) EnumerateEntries(enumerator func(key string, e Entry) bool) { for k, f := range c.files { if !enumerator(k, f) { break diff --git a/haunter.go b/haunter.go index 711b2b8..2e59a42 100644 --- a/haunter.go +++ b/haunter.go @@ -4,9 +4,14 @@ import ( "time" ) +type Entry interface { + InUse() bool + Name() string +} + type CacheAccessor interface { FileSystemStater - EnumerateFiles(enumerator func(key string, f FileStream) bool) + EnumerateEntries(enumerator func(key string, e Entry) bool) RemoveFile(key string) } @@ -49,12 +54,12 @@ func NewReaperHaunter(reaper Reaper) Haunter { } func (h *reaperHaunter) Haunt(c CacheAccessor) { - c.EnumerateFiles(func(key string, f FileStream) bool { - if f.InUse() { + c.EnumerateEntries(func(key string, e Entry) bool { + if e.InUse() { return true } - lastRead, lastWrite, err := c.AccessTimes(f.Name()) + lastRead, lastWrite, err := c.AccessTimes(e.Name()) if err != nil { return true } diff --git a/janitor.go b/janitor.go index 95ac378..477abb5 100644 --- a/janitor.go +++ b/janitor.go @@ -7,7 +7,7 @@ import ( type janitorKV struct { Key string - Value FileStream + Value Entry } // Janitor is used to control when there are too many streams @@ -49,12 +49,12 @@ func (j *janitor) Scrub(c CacheAccessor) (keysToReap []string) { var size int64 var okFiles []janitorKV - c.EnumerateFiles(func(key string, f FileStream) bool { - if f.InUse() { + c.EnumerateEntries(func(key string, e Entry) bool { + if e.InUse() { return true } - fileSize, err := c.Size(f.Name()) + fileSize, err := c.Size(e.Name()) if err != nil { return true } @@ -63,7 +63,7 @@ func (j *janitor) Scrub(c CacheAccessor) (keysToReap []string) { size = size + fileSize okFiles = append(okFiles, janitorKV{ Key: key, - Value: f, + Value: e, }) return 
true From 681503fbebb3b76b0f3805ab5cdc630e0107d8e6 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Mon, 28 Jan 2019 13:13:36 +0300 Subject: [PATCH 11/23] FileInfo interface implemented --- fs.go | 34 ++++++++++++++++++++++++++++++---- fscache.go | 8 ++------ haunter.go | 7 ++++++- janitor.go | 22 ++++++++++++++++------ memfs.go | 49 +++++++++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 101 insertions(+), 19 deletions(-) diff --git a/fs.go b/fs.go index 7f324eb..cbf32b9 100644 --- a/fs.go +++ b/fs.go @@ -18,14 +18,18 @@ import ( ) type FileSystemStater interface { - // Size takes a File.Name() and returns size of the file - Size(name string) (int64, error) + // Stat takes a File.Name() and returns FileInfo interface + Stat(name string) (FileInfo, error) +} + +type FileInfo interface { + os.FileInfo - // AccessTimes takes a File.Name() and returns the last time the file was read, + // AccessTimes returns the last time the file was read, // and the last time it was written to. // It will be used to check expiry of a file, and must be concurrent safe // with modifications to the FileSystem (writes, reads etc.) - AccessTimes(name string) (rt, wt time.Time, err error) + AccessTimes() (rt, wt time.Time, err error) } // FileSystem is used as the source for a Cache. @@ -48,6 +52,19 @@ type stdFs struct { init func() error } +type fileInfo struct { + name string + os.FileInfo +} + +func (f *fileInfo) AccessTimes() (rt, wt time.Time, err error) { + fi, err := os.Stat(f.name) + if err != nil { + return rt, wt, err + } + return atime.Get(fi), fi.ModTime(), nil +} + // NewFs returns a FileSystem rooted at directory dir. // Dir is created with perms if it doesn't exist. 
func NewFs(dir string, mode os.FileMode) (FileSystem, error) { @@ -153,6 +170,15 @@ func (fs *stdFs) AccessTimes(name string) (rt, wt time.Time, err error) { return atime.Get(fi), fi.ModTime(), nil } +func (fs *stdFs) Stat(name string) (FileInfo, error) { + stat, err := os.Stat(name) + if err != nil { + return nil, err + } + + return &fileInfo{name, stat}, nil +} + const ( saltSize = 8 maxShort = 20 diff --git a/fscache.go b/fscache.go index 034f1b7..f195202 100644 --- a/fscache.go +++ b/fscache.go @@ -187,12 +187,8 @@ func (c *cache) Clean() error { return c.fs.RemoveAll() } -func (c *cache) Size(name string) (int64, error) { - return c.fs.Size(name) -} - -func (c *cache) AccessTimes(name string) (rt, wt time.Time, err error) { - return c.fs.AccessTimes(name) +func (c *cache) Stat(name string) (FileInfo, error) { + return c.fs.Stat(name) } func (c *cache) EnumerateEntries(enumerator func(key string, e Entry) bool) { diff --git a/haunter.go b/haunter.go index 2e59a42..1a3688c 100644 --- a/haunter.go +++ b/haunter.go @@ -59,7 +59,12 @@ func (h *reaperHaunter) Haunt(c CacheAccessor) { return true } - lastRead, lastWrite, err := c.AccessTimes(e.Name()) + fileInfo, err := c.Stat(e.Name()) + if err != nil { + return true + } + + lastRead, lastWrite, err := fileInfo.AccessTimes() if err != nil { return true } diff --git a/janitor.go b/janitor.go index 477abb5..4fc7183 100644 --- a/janitor.go +++ b/janitor.go @@ -54,13 +54,13 @@ func (j *janitor) Scrub(c CacheAccessor) (keysToReap []string) { return true } - fileSize, err := c.Size(e.Name()) + fileInfo, err := c.Stat(e.Name()) if err != nil { return true } count++ - size = size + fileSize + size = size + fileInfo.Size() okFiles = append(okFiles, janitorKV{ Key: key, Value: e, @@ -70,12 +70,22 @@ func (j *janitor) Scrub(c CacheAccessor) (keysToReap []string) { }) sort.Slice(okFiles, func(i, l int) bool { - iLastRead, _, err := c.AccessTimes(okFiles[i].Value.Name()) + iFileInfo, err := c.Stat(okFiles[i].Value.Name()) if err 
!= nil { return false } - jLastRead, _, err := c.AccessTimes(okFiles[l].Value.Name()) + iLastRead, _, err := iFileInfo.AccessTimes() + if err != nil { + return false + } + + jFileInfo, err := c.Stat(okFiles[i].Value.Name()) + if err != nil { + return false + } + + jLastRead, _, err := jFileInfo.AccessTimes() if err != nil { return false } @@ -121,13 +131,13 @@ func (j *janitor) removeFirst(fsStater FileSystemStater, items *[]janitorKV, cou f, *items = (*items)[0], (*items)[1:] - fileSize, err := fsStater.Size(f.Value.Name()) + fileInfo, err := fsStater.Stat(f.Value.Name()) if err != nil { return nil, count, size, err } count-- - size = size - fileSize + size = size - fileInfo.Size() return &f.Key, count, size, nil } diff --git a/memfs.go b/memfs.go index dbf3070..8834eec 100644 --- a/memfs.go +++ b/memfs.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "io" + "os" "sync" "time" @@ -15,6 +16,41 @@ type memFS struct { files map[string]*memFile } +type memFileInfo struct { + name string + size int64 + fs *memFS +} + +func (f *memFileInfo) Name() string { + return f.name +} + +func (f *memFileInfo) Size() int64 { + return f.size +} + +func (f *memFileInfo) Mode() os.FileMode { + return os.ModeIrregular +} + +func (f *memFileInfo) ModTime() time.Time { + _, wt, _ := f.fs.accessTimes(f.Name()) + return wt +} + +func (f *memFileInfo) IsDir() bool { + return false +} + +func (f *memFileInfo) Sys() interface{} { + return nil +} + +func (f *memFileInfo) AccessTimes() (rt, wt time.Time, err error) { + return f.fs.accessTimes(f.Name()) +} + // NewMemFs creates an in-memory FileSystem. // It does not support persistence (Reload is a nop). 
func NewMemFs() FileSystem { @@ -23,7 +59,16 @@ func NewMemFs() FileSystem { } } -func (fs *memFS) Size(name string) (int64, error) { +func (fs *memFS) Stat(name string) (FileInfo, error) { + size, err := fs.size(name) + if err != nil { + return nil, err + } + + return &memFileInfo{name: name, size: size, fs: fs}, nil +} + +func (fs *memFS) size(name string) (int64, error) { fs.mu.RLock() defer fs.mu.RUnlock() f, ok := fs.files[name] @@ -37,7 +82,7 @@ func (fs *memFS) Reload(add func(key, name string)) error { return nil } -func (fs *memFS) AccessTimes(name string) (rt, wt time.Time, err error) { +func (fs *memFS) accessTimes(name string) (rt, wt time.Time, err error) { fs.mu.RLock() defer fs.mu.RUnlock() f, ok := fs.files[name] From 475be9eab5021febfdd23ce88c77e51f0ee60564 Mon Sep 17 00:00:00 2001 From: Alexey Malov Date: Mon, 28 Jan 2019 13:55:39 +0300 Subject: [PATCH 12/23] FileStream interface was made private --- fscache.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/fscache.go b/fscache.go index f195202..918e809 100644 --- a/fscache.go +++ b/fscache.go @@ -35,7 +35,7 @@ type Cache interface { type cache struct { mu sync.RWMutex - files map[string]FileStream + files map[string]fileStream fs FileSystem haunter Haunter } @@ -47,7 +47,7 @@ type ReadAtCloser interface { io.ReaderAt } -type FileStream interface { +type fileStream interface { next() (ReadAtCloser, error) InUse() bool io.WriteCloser @@ -90,7 +90,7 @@ func NewCache(fs FileSystem, grim Reaper) (Cache, error) { // Haunter is used to determine when files expire, nil means never expire. 
func NewCacheWithHaunter(fs FileSystem, haunter Haunter) (Cache, error) { c := &cache{ - files: make(map[string]FileStream), + files: make(map[string]fileStream), haunter: haunter, fs: fs, } @@ -183,7 +183,7 @@ func (c *cache) Remove(key string) error { func (c *cache) Clean() error { c.mu.Lock() defer c.mu.Unlock() - c.files = make(map[string]FileStream) + c.files = make(map[string]fileStream) return c.fs.RemoveAll() } @@ -212,7 +212,7 @@ type cachedFile struct { handleCounter } -func (c *cache) newFile(name string) (FileStream, error) { +func (c *cache) newFile(name string) (fileStream, error) { s, err := stream.NewStream(name, c.fs) if err != nil { return nil, err @@ -224,7 +224,7 @@ func (c *cache) newFile(name string) (FileStream, error) { return cf, nil } -func (c *cache) oldFile(name string) FileStream { +func (c *cache) oldFile(name string) fileStream { return &reloadedFile{ fs: c.fs, name: name, From 546d631dacae924fcf73653bbee50762e50b2f81 Mon Sep 17 00:00:00 2001 From: Alexey Malov Date: Mon, 28 Jan 2019 13:57:35 +0300 Subject: [PATCH 13/23] File entries sorting was fixed --- janitor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/janitor.go b/janitor.go index 4fc7183..154b2e9 100644 --- a/janitor.go +++ b/janitor.go @@ -69,7 +69,7 @@ func (j *janitor) Scrub(c CacheAccessor) (keysToReap []string) { return true }) - sort.Slice(okFiles, func(i, l int) bool { + sort.Slice(okFiles, func(i, j int) bool { iFileInfo, err := c.Stat(okFiles[i].Value.Name()) if err != nil { return false @@ -80,7 +80,7 @@ func (j *janitor) Scrub(c CacheAccessor) (keysToReap []string) { return false } - jFileInfo, err := c.Stat(okFiles[i].Value.Name()) + jFileInfo, err := c.Stat(okFiles[j].Value.Name()) if err != nil { return false } From b14383f4c41952ebcd1cb5209d0c5d00c4d1b863 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Wed, 30 Jan 2019 11:24:50 +0300 Subject: [PATCH 14/23] Janitor renamed to LRUHaunter --- fscache.go | 5 ++--- fscache_test.go | 8 
++++---- haunter.go | 32 ++++++++++++++++---------------- janitor.go => lruhaunter.go | 26 +++++++++++++------------- 4 files changed, 35 insertions(+), 36 deletions(-) rename janitor.go => lruhaunter.go (76%) diff --git a/fscache.go b/fscache.go index 918e809..7746da3 100644 --- a/fscache.go +++ b/fscache.go @@ -77,12 +77,11 @@ func New(dir string, perms os.FileMode, expiry time.Duration) (Cache, error) { // fs.Files() are loaded using the name they were created with as a key. // Reaper is used to determine when files expire, nil means never expire. func NewCache(fs FileSystem, grim Reaper) (Cache, error) { - var haunter Haunter if grim != nil { - haunter = NewReaperHaunter(grim) + return NewCacheWithHaunter(fs, NewReaperHaunterStrategy(grim)) } - return NewCacheWithHaunter(fs, haunter) + return NewCacheWithHaunter(fs, nil) } // NewCacheWithHaunter create a new Cache based on FileSystem fs. diff --git a/fscache_test.go b/fscache_test.go index f5b9ee7..c83bcfa 100644 --- a/fscache_test.go +++ b/fscache_test.go @@ -211,7 +211,7 @@ func TestReload(t *testing.T) { } } -func TestJanitorMaxItems(t *testing.T) { +func TestLRUHaunterMaxItems(t *testing.T) { fs, err := NewFs("./cache1", 0700) if err != nil { @@ -219,7 +219,7 @@ func TestJanitorMaxItems(t *testing.T) { t.FailNow() } - c, err := NewCacheWithHaunter(fs, NewJanitorHaunter(NewJanitor(3, 0, 400*time.Millisecond))) + c, err := NewCacheWithHaunter(fs, NewLRUHaunterStrategy(NewLRUHaunter(3, 0, 400*time.Millisecond))) if err != nil { t.Error(err.Error()) @@ -267,7 +267,7 @@ func TestJanitorMaxItems(t *testing.T) { } } -func TestJanitorMaxSize(t *testing.T) { +func TestLRUHaunterMaxSize(t *testing.T) { fs, err := NewFs("./cache1", 0700) if err != nil { @@ -275,7 +275,7 @@ func TestJanitorMaxSize(t *testing.T) { t.FailNow() } - c, err := NewCacheWithHaunter(fs, NewJanitorHaunter(NewJanitor(0, 24, 400*time.Millisecond))) + c, err := NewCacheWithHaunter(fs, NewLRUHaunterStrategy(NewLRUHaunter(0, 24, 
400*time.Millisecond))) if err != nil { t.Error(err.Error()) diff --git a/haunter.go b/haunter.go index 1a3688c..00d713f 100644 --- a/haunter.go +++ b/haunter.go @@ -20,40 +20,40 @@ type Haunter interface { Next() time.Duration } -type reaperHaunter struct { +type reaperHaunterStrategy struct { reaper Reaper } -type janitorHaunter struct { - janitor Janitor +type lruHaunterStrategy struct { + haunter LRUHaunter } -// NewJanitorHaunter returns a simple scheduleHaunt which provides an implementation Janitor strategy -func NewJanitorHaunter(janitor Janitor) Haunter { - return &janitorHaunter{ - janitor: janitor, +// NewLRUHaunterStrategy returns a simple scheduleHaunt which provides an implementation LRUHaunter strategy +func NewLRUHaunterStrategy(haunter LRUHaunter) Haunter { + return &lruHaunterStrategy{ + haunter: haunter, } } -func (h *janitorHaunter) Haunt(c CacheAccessor) { - for _, key := range h.janitor.Scrub(c) { +func (h *lruHaunterStrategy) Haunt(c CacheAccessor) { + for _, key := range h.haunter.Scrub(c) { c.RemoveFile(key) } } -func (h *janitorHaunter) Next() time.Duration { - return h.janitor.Next() +func (h *lruHaunterStrategy) Next() time.Duration { + return h.haunter.Next() } -// NewReaperHaunter returns a simple scheduleHaunt which provides an implementation Reaper strategy -func NewReaperHaunter(reaper Reaper) Haunter { - return &reaperHaunter{ +// NewReaperHaunterStrategy returns a simple scheduleHaunt which provides an implementation Reaper strategy +func NewReaperHaunterStrategy(reaper Reaper) Haunter { + return &reaperHaunterStrategy{ reaper: reaper, } } -func (h *reaperHaunter) Haunt(c CacheAccessor) { +func (h *reaperHaunterStrategy) Haunt(c CacheAccessor) { c.EnumerateEntries(func(key string, e Entry) bool { if e.InUse() { return true @@ -77,6 +77,6 @@ func (h *reaperHaunter) Haunt(c CacheAccessor) { }) } -func (h *reaperHaunter) Next() time.Duration { +func (h *reaperHaunterStrategy) Next() time.Duration { return h.reaper.Next() } diff --git 
a/janitor.go b/lruhaunter.go similarity index 76% rename from janitor.go rename to lruhaunter.go index 154b2e9..eeab3b3 100644 --- a/janitor.go +++ b/lruhaunter.go @@ -5,16 +5,16 @@ import ( "time" ) -type janitorKV struct { +type lruHaunterKV struct { Key string Value Entry } -// Janitor is used to control when there are too many streams +// LRUHaunter is used to control when there are too many streams // or the size of the streams is too big. // It is called once right after loading, and then it is run // again after every Next() period of time. -type Janitor interface { +type LRUHaunter interface { // Returns the amount of time to wait before the next scheduled Reaping. Next() time.Duration @@ -22,32 +22,32 @@ type Janitor interface { Scrub(c CacheAccessor) []string } -// NewJanitor returns a simple janitor which runs every "period" +// NewLRUHaunter returns a simple haunter which runs every "period" // and scrubs older files when the total file size is over maxSize or // total item count is over maxItems. 
// If maxItems or maxSize are 0, they won't be checked -func NewJanitor(maxItems int, maxSize int64, period time.Duration) Janitor { - return &janitor{ +func NewLRUHaunter(maxItems int, maxSize int64, period time.Duration) LRUHaunter { + return &lruHaunter{ period: period, maxItems: maxItems, maxSize: maxSize, } } -type janitor struct { +type lruHaunter struct { period time.Duration maxItems int maxSize int64 } -func (j *janitor) Next() time.Duration { +func (j *lruHaunter) Next() time.Duration { return j.period } -func (j *janitor) Scrub(c CacheAccessor) (keysToReap []string) { +func (j *lruHaunter) Scrub(c CacheAccessor) (keysToReap []string) { var count int var size int64 - var okFiles []janitorKV + var okFiles []lruHaunterKV c.EnumerateEntries(func(key string, e Entry) bool { if e.InUse() { @@ -61,7 +61,7 @@ func (j *janitor) Scrub(c CacheAccessor) (keysToReap []string) { count++ size = size + fileInfo.Size() - okFiles = append(okFiles, janitorKV{ + okFiles = append(okFiles, lruHaunterKV{ Key: key, Value: e, }) @@ -126,8 +126,8 @@ func (j *janitor) Scrub(c CacheAccessor) (keysToReap []string) { return keysToReap } -func (j *janitor) removeFirst(fsStater FileSystemStater, items *[]janitorKV, count int, size int64) (*string, int, int64, error) { - var f janitorKV +func (j *lruHaunter) removeFirst(fsStater FileSystemStater, items *[]lruHaunterKV, count int, size int64) (*string, int, int64, error) { + var f lruHaunterKV f, *items = (*items)[0], (*items)[1:] From 2adb1fa9bf8bdf694aaf296b6865ef5075b49212 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Wed, 30 Jan 2019 11:25:49 +0300 Subject: [PATCH 15/23] `Remove` method comment fixed --- fscache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fscache.go b/fscache.go index 7746da3..644c3f0 100644 --- a/fscache.go +++ b/fscache.go @@ -21,7 +21,7 @@ type Cache interface { // Remove deletes the stream from the cache, blocking until the underlying // file can be deleted (all active streams 
finish with it). - // It is safe to call remove concurrently with Get. + // It is safe to call Remove concurrently with Get. Remove(key string) error // Exists checks if a key is in the cache. From 9228c18ee372130857464bbd506263931cbcfc08 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Wed, 30 Jan 2019 11:27:24 +0300 Subject: [PATCH 16/23] `AccessTime` method of stdFs is simplified --- fs.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/fs.go b/fs.go index cbf32b9..762ef1d 100644 --- a/fs.go +++ b/fs.go @@ -58,11 +58,7 @@ type fileInfo struct { } func (f *fileInfo) AccessTimes() (rt, wt time.Time, err error) { - fi, err := os.Stat(f.name) - if err != nil { - return rt, wt, err - } - return atime.Get(fi), fi.ModTime(), nil + return atime.Get(f.FileInfo), f.FileInfo.ModTime(), nil } // NewFs returns a FileSystem rooted at directory dir. From 9fe7685e54aed469d1fff6ce509de26591d6ba9b Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Wed, 30 Jan 2019 11:29:22 +0300 Subject: [PATCH 17/23] Removed redundant `Size` method --- fs.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/fs.go b/fs.go index 762ef1d..92e15e8 100644 --- a/fs.go +++ b/fs.go @@ -70,14 +70,6 @@ func NewFs(dir string, mode os.FileMode) (FileSystem, error) { return fs, fs.init() } -func (fs *stdFs) Size(name string) (int64, error) { - stat, err := os.Stat(name) - if err == nil { - return stat.Size(), nil - } - return 0, err -} - func (fs *stdFs) Reload(add func(key, name string)) error { files, err := ioutil.ReadDir(fs.root) if err != nil { From 5327f6695a90489c5ccbb962a487695d9c26e004 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Wed, 30 Jan 2019 11:50:12 +0300 Subject: [PATCH 18/23] Removed `memFileInfo` dependency from `memFs` --- fs.go | 6 +++--- haunter.go | 5 +---- lruhaunter.go | 10 ++-------- memfs.go | 31 +++++++++++++------------------ 4 files changed, 19 insertions(+), 33 deletions(-) diff --git a/fs.go b/fs.go index 92e15e8..1d0529a 100644 --- 
a/fs.go +++ b/fs.go @@ -29,7 +29,7 @@ type FileInfo interface { // and the last time it was written to. // It will be used to check expiry of a file, and must be concurrent safe // with modifications to the FileSystem (writes, reads etc.) - AccessTimes() (rt, wt time.Time, err error) + AccessTimes() (rt, wt time.Time) } // FileSystem is used as the source for a Cache. @@ -57,8 +57,8 @@ type fileInfo struct { os.FileInfo } -func (f *fileInfo) AccessTimes() (rt, wt time.Time, err error) { - return atime.Get(f.FileInfo), f.FileInfo.ModTime(), nil +func (f *fileInfo) AccessTimes() (rt, wt time.Time) { + return atime.Get(f.FileInfo), f.FileInfo.ModTime() } // NewFs returns a FileSystem rooted at directory dir. diff --git a/haunter.go b/haunter.go index 00d713f..9dd3b09 100644 --- a/haunter.go +++ b/haunter.go @@ -64,10 +64,7 @@ func (h *reaperHaunterStrategy) Haunt(c CacheAccessor) { return true } - lastRead, lastWrite, err := fileInfo.AccessTimes() - if err != nil { - return true - } + lastRead, lastWrite := fileInfo.AccessTimes() if h.reaper.Reap(key, lastRead, lastWrite) { c.RemoveFile(key) diff --git a/lruhaunter.go b/lruhaunter.go index eeab3b3..1eb1c41 100644 --- a/lruhaunter.go +++ b/lruhaunter.go @@ -75,20 +75,14 @@ func (j *lruHaunter) Scrub(c CacheAccessor) (keysToReap []string) { return false } - iLastRead, _, err := iFileInfo.AccessTimes() - if err != nil { - return false - } + iLastRead, _ := iFileInfo.AccessTimes() jFileInfo, err := c.Stat(okFiles[j].Value.Name()) if err != nil { return false } - jLastRead, _, err := jFileInfo.AccessTimes() - if err != nil { - return false - } + jLastRead, _ := jFileInfo.AccessTimes() return iLastRead.Before(jLastRead) }) diff --git a/memfs.go b/memfs.go index 8834eec..f984ed3 100644 --- a/memfs.go +++ b/memfs.go @@ -19,7 +19,8 @@ type memFS struct { type memFileInfo struct { name string size int64 - fs *memFS + rt time.Time + wt time.Time } func (f *memFileInfo) Name() string { @@ -35,8 +36,7 @@ func (f *memFileInfo) 
Mode() os.FileMode { } func (f *memFileInfo) ModTime() time.Time { - _, wt, _ := f.fs.accessTimes(f.Name()) - return wt + return f.wt } func (f *memFileInfo) IsDir() bool { @@ -47,8 +47,8 @@ func (f *memFileInfo) Sys() interface{} { return nil } -func (f *memFileInfo) AccessTimes() (rt, wt time.Time, err error) { - return f.fs.accessTimes(f.Name()) +func (f *memFileInfo) AccessTimes() (rt, wt time.Time) { + return f.rt, f.wt } // NewMemFs creates an in-memory FileSystem. @@ -60,17 +60,22 @@ func NewMemFs() FileSystem { } func (fs *memFS) Stat(name string) (FileInfo, error) { + fs.mu.RLock() + defer fs.mu.RUnlock() + f, ok := fs.files[name] + if !ok { + return nil, errors.New("file has not been read") + } + size, err := fs.size(name) if err != nil { return nil, err } - return &memFileInfo{name: name, size: size, fs: fs}, nil + return &memFileInfo{name: name, size: size, rt: f.rt, wt: f.wt}, nil } func (fs *memFS) size(name string) (int64, error) { - fs.mu.RLock() - defer fs.mu.RUnlock() f, ok := fs.files[name] if ok { return int64(len(f.Bytes())), nil @@ -82,16 +87,6 @@ func (fs *memFS) Reload(add func(key, name string)) error { return nil } -func (fs *memFS) accessTimes(name string) (rt, wt time.Time, err error) { - fs.mu.RLock() - defer fs.mu.RUnlock() - f, ok := fs.files[name] - if ok { - return f.rt, f.wt, nil - } - return rt, wt, errors.New("file has not been read") -} - func (fs *memFS) Create(key string) (stream.File, error) { fs.mu.Lock() defer fs.mu.Unlock() From 048eb314cc9311015def557a2d0876c50d802f0e Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Wed, 30 Jan 2019 12:55:09 +0300 Subject: [PATCH 19/23] `FileInfo` became a static structure --- fileinfo.go | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ fs.go | 31 ++++++++++--------------------- memfs.go | 49 +++++++++++-------------------------------------- 3 files changed, 69 insertions(+), 59 deletions(-) create mode 100644 fileinfo.go diff --git a/fileinfo.go b/fileinfo.go new file mode 100644 
index 0000000..937d84e --- /dev/null +++ b/fileinfo.go @@ -0,0 +1,48 @@ +package fscache + +import ( + "os" + "time" +) + +type FileInfo struct { + name string + size int64 + fileMode os.FileMode + isDir bool + sys interface{} + rt time.Time + wt time.Time +} + +func (f *FileInfo) Name() string { + return f.name +} + +func (f *FileInfo) Size() int64 { + return f.size +} + +func (f *FileInfo) Mode() os.FileMode { + return f.fileMode +} + +func (f *FileInfo) ModTime() time.Time { + return f.wt +} + +func (f *FileInfo) IsDir() bool { + return f.isDir +} + +func (f *FileInfo) Sys() interface{} { + return f.sys +} + +// AccessTimes returns the last time the file was read, +// and the last time it was written to. +// It will be used to check expiry of a file, and must be concurrent safe +// with modifications to the FileSystem (writes, reads etc.) +func (f *FileInfo) AccessTimes() (rt, wt time.Time) { + return f.rt, f.wt +} diff --git a/fs.go b/fs.go index 1d0529a..0e0a5dc 100644 --- a/fs.go +++ b/fs.go @@ -22,16 +22,6 @@ type FileSystemStater interface { Stat(name string) (FileInfo, error) } -type FileInfo interface { - os.FileInfo - - // AccessTimes returns the last time the file was read, - // and the last time it was written to. - // It will be used to check expiry of a file, and must be concurrent safe - // with modifications to the FileSystem (writes, reads etc.) - AccessTimes() (rt, wt time.Time) -} - // FileSystem is used as the source for a Cache. type FileSystem interface { // Stream FileSystem @@ -52,15 +42,6 @@ type stdFs struct { init func() error } -type fileInfo struct { - name string - os.FileInfo -} - -func (f *fileInfo) AccessTimes() (rt, wt time.Time) { - return atime.Get(f.FileInfo), f.FileInfo.ModTime() -} - // NewFs returns a FileSystem rooted at directory dir. // Dir is created with perms if it doesn't exist. 
func NewFs(dir string, mode os.FileMode) (FileSystem, error) { @@ -161,10 +142,18 @@ func (fs *stdFs) AccessTimes(name string) (rt, wt time.Time, err error) { func (fs *stdFs) Stat(name string) (FileInfo, error) { stat, err := os.Stat(name) if err != nil { - return nil, err + return FileInfo{}, err } - return &fileInfo{name, stat}, nil + return FileInfo{ + name: name, + size: stat.Size(), + fileMode: stat.Mode(), + isDir: stat.IsDir(), + sys: stat.Sys(), + rt: atime.Get(stat), + wt: stat.ModTime(), + }, nil } const ( diff --git a/memfs.go b/memfs.go index f984ed3..26dbae9 100644 --- a/memfs.go +++ b/memfs.go @@ -16,41 +16,6 @@ type memFS struct { files map[string]*memFile } -type memFileInfo struct { - name string - size int64 - rt time.Time - wt time.Time -} - -func (f *memFileInfo) Name() string { - return f.name -} - -func (f *memFileInfo) Size() int64 { - return f.size -} - -func (f *memFileInfo) Mode() os.FileMode { - return os.ModeIrregular -} - -func (f *memFileInfo) ModTime() time.Time { - return f.wt -} - -func (f *memFileInfo) IsDir() bool { - return false -} - -func (f *memFileInfo) Sys() interface{} { - return nil -} - -func (f *memFileInfo) AccessTimes() (rt, wt time.Time) { - return f.rt, f.wt -} - // NewMemFs creates an in-memory FileSystem. // It does not support persistence (Reload is a nop). 
func NewMemFs() FileSystem { @@ -64,15 +29,23 @@ func (fs *memFS) Stat(name string) (FileInfo, error) { defer fs.mu.RUnlock() f, ok := fs.files[name] if !ok { - return nil, errors.New("file has not been read") + return FileInfo{}, errors.New("file has not been read") } size, err := fs.size(name) if err != nil { - return nil, err + return FileInfo{}, err } - return &memFileInfo{name: name, size: size, rt: f.rt, wt: f.wt}, nil + return FileInfo{ + name: name, + size: size, + fileMode: os.ModeIrregular, + isDir: false, + sys: nil, + rt: f.rt, + wt: f.wt, + }, nil } func (fs *memFS) size(name string) (int64, error) { From def7c928787b7abf54184a633ab5f4302d90dd65 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Wed, 30 Jan 2019 13:10:20 +0300 Subject: [PATCH 20/23] `Entry` became a static structure --- fscache.go | 2 +- haunter.go | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/fscache.go b/fscache.go index 644c3f0..b21874f 100644 --- a/fscache.go +++ b/fscache.go @@ -192,7 +192,7 @@ func (c *cache) Stat(name string) (FileInfo, error) { func (c *cache) EnumerateEntries(enumerator func(key string, e Entry) bool) { for k, f := range c.files { - if !enumerator(k, f) { + if !enumerator(k, Entry{name: f.Name(), inUse: f.InUse()}) { break } } diff --git a/haunter.go b/haunter.go index 9dd3b09..3d0b9b6 100644 --- a/haunter.go +++ b/haunter.go @@ -4,9 +4,17 @@ import ( "time" ) -type Entry interface { - InUse() bool - Name() string +type Entry struct { + name string + inUse bool +} + +func (e *Entry) InUse() bool { + return e.inUse +} + +func (e *Entry) Name() string { + return e.name } type CacheAccessor interface { From ae8dffc12dfbb3d682388e834f04b85624e0fb11 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Fri, 1 Feb 2019 09:51:33 +0300 Subject: [PATCH 21/23] Remove redundant `memFs.size` method --- memfs.go | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/memfs.go b/memfs.go index 26dbae9..1294669
100644 --- a/memfs.go +++ b/memfs.go @@ -32,10 +32,7 @@ func (fs *memFS) Stat(name string) (FileInfo, error) { return FileInfo{}, errors.New("file has not been read") } - size, err := fs.size(name) - if err != nil { - return FileInfo{}, err - } + size := int64(len(f.Bytes())) return FileInfo{ name: name, @@ -48,14 +45,6 @@ func (fs *memFS) Stat(name string) (FileInfo, error) { }, nil } -func (fs *memFS) size(name string) (int64, error) { - f, ok := fs.files[name] - if ok { - return int64(len(f.Bytes())), nil - } - return 0, errors.New("file has not been read") -} - func (fs *memFS) Reload(add func(key, name string)) error { return nil } From 3db2a43e29d485840c9ea55bd2256330e7bdeebe Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Fri, 1 Feb 2019 10:12:02 +0300 Subject: [PATCH 22/23] `FileInfo` made more friendly --- fileinfo.go | 25 ++++++++++++++----------- fs.go | 16 +++++++++------- haunter.go | 4 +--- lruhaunter.go | 4 ++-- memfs.go | 16 +++++++++------- 5 files changed, 35 insertions(+), 30 deletions(-) diff --git a/fileinfo.go b/fileinfo.go index 937d84e..3374123 100644 --- a/fileinfo.go +++ b/fileinfo.go @@ -6,43 +6,46 @@ import ( ) type FileInfo struct { + os.FileInfo + Atime time.Time +} + +type fileInfo struct { name string size int64 fileMode os.FileMode isDir bool sys interface{} - rt time.Time wt time.Time } -func (f *FileInfo) Name() string { +func (f *fileInfo) Name() string { return f.name } -func (f *FileInfo) Size() int64 { +func (f *fileInfo) Size() int64 { return f.size } -func (f *FileInfo) Mode() os.FileMode { +func (f *fileInfo) Mode() os.FileMode { return f.fileMode } -func (f *FileInfo) ModTime() time.Time { +func (f *fileInfo) ModTime() time.Time { return f.wt } -func (f *FileInfo) IsDir() bool { +func (f *fileInfo) IsDir() bool { return f.isDir } -func (f *FileInfo) Sys() interface{} { +func (f *fileInfo) Sys() interface{} { return f.sys } -// AccessTimes returns the last time the file was read, -// and the last time it was written to. 
+// AccessTime returns the last time the file was read. // It will be used to check expiry of a file, and must be concurrent safe // with modifications to the FileSystem (writes, reads etc.) -func (f *FileInfo) AccessTimes() (rt, wt time.Time) { - return f.rt, f.wt +func (f *FileInfo) AccessTime() time.Time { + return f.Atime } diff --git a/fs.go b/fs.go index 0e0a5dc..f7bd099 100644 --- a/fs.go +++ b/fs.go @@ -146,13 +146,15 @@ func (fs *stdFs) Stat(name string) (FileInfo, error) { } return FileInfo{ - name: name, - size: stat.Size(), - fileMode: stat.Mode(), - isDir: stat.IsDir(), - sys: stat.Sys(), - rt: atime.Get(stat), - wt: stat.ModTime(), + FileInfo: &fileInfo{ + name: name, + size: stat.Size(), + fileMode: stat.Mode(), + isDir: stat.IsDir(), + sys: stat.Sys(), + wt: stat.ModTime(), + }, + Atime: atime.Get(stat), }, nil } diff --git a/haunter.go b/haunter.go index 3d0b9b6..7d404fa 100644 --- a/haunter.go +++ b/haunter.go @@ -72,9 +72,7 @@ func (h *reaperHaunterStrategy) Haunt(c CacheAccessor) { return true } - lastRead, lastWrite := fileInfo.AccessTimes() - - if h.reaper.Reap(key, lastRead, lastWrite) { + if h.reaper.Reap(key, fileInfo.AccessTime(), fileInfo.ModTime()) { c.RemoveFile(key) } diff --git a/lruhaunter.go b/lruhaunter.go index 1eb1c41..7b90ef3 100644 --- a/lruhaunter.go +++ b/lruhaunter.go @@ -75,14 +75,14 @@ func (j *lruHaunter) Scrub(c CacheAccessor) (keysToReap []string) { return false } - iLastRead, _ := iFileInfo.AccessTimes() + iLastRead := iFileInfo.AccessTime() jFileInfo, err := c.Stat(okFiles[j].Value.Name()) if err != nil { return false } - jLastRead, _ := jFileInfo.AccessTimes() + jLastRead := jFileInfo.AccessTime() return iLastRead.Before(jLastRead) }) diff --git a/memfs.go b/memfs.go index 1294669..11c9d89 100644 --- a/memfs.go +++ b/memfs.go @@ -35,13 +35,15 @@ func (fs *memFS) Stat(name string) (FileInfo, error) { size := int64(len(f.Bytes())) return FileInfo{ - name: name, - size: size, - fileMode: os.ModeIrregular, - isDir: 
false, - sys: nil, - rt: f.rt, - wt: f.wt, + FileInfo: &fileInfo{ + name: name, + size: size, + fileMode: os.ModeIrregular, + isDir: false, + sys: nil, + wt: f.wt, + }, + Atime: f.rt, }, nil } From db4a42329152e33b698033e1d9b1198ac6031198 Mon Sep 17 00:00:00 2001 From: Egor Chernukha Date: Sat, 2 Feb 2019 12:48:55 +0300 Subject: [PATCH 23/23] `fileInfo` replaced on `os.Stat` in `stdFs` impl --- fs.go | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/fs.go b/fs.go index f7bd099..8e36785 100644 --- a/fs.go +++ b/fs.go @@ -145,17 +145,7 @@ func (fs *stdFs) Stat(name string) (FileInfo, error) { return FileInfo{}, err } - return FileInfo{ - FileInfo: &fileInfo{ - name: name, - size: stat.Size(), - fileMode: stat.Mode(), - isDir: stat.IsDir(), - sys: stat.Sys(), - wt: stat.ModTime(), - }, - Atime: atime.Get(stat), - }, nil + return FileInfo{FileInfo: stat, Atime: atime.Get(stat)}, nil } const (