Skip to content

Commit

Permalink
Merge pull request #14 from wolfrace/cache_limits_v2
Browse files Browse the repository at this point in the history
Cache limits with LRU v2
  • Loading branch information
djherbis authored Feb 2, 2019
2 parents 6681963 + db4a423 commit 287ea9b
Show file tree
Hide file tree
Showing 8 changed files with 473 additions and 52 deletions.
51 changes: 51 additions & 0 deletions fileinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package fscache

import (
"os"
"time"
)

type FileInfo struct {
os.FileInfo
Atime time.Time
}

type fileInfo struct {
name string
size int64
fileMode os.FileMode
isDir bool
sys interface{}
wt time.Time
}

func (f *fileInfo) Name() string {
return f.name
}

func (f *fileInfo) Size() int64 {
return f.size
}

func (f *fileInfo) Mode() os.FileMode {
return f.fileMode
}

func (f *fileInfo) ModTime() time.Time {
return f.wt
}

func (f *fileInfo) IsDir() bool {
return f.isDir
}

func (f *fileInfo) Sys() interface{} {
return f.sys
}

// AccessTime returns the last time the file was read.
// It will be used to check expiry of a file, and must be concurrent safe
// with modifications to the FileSystem (writes, reads etc.)
func (f *FileInfo) AccessTime() time.Time {
return f.Atime
}
24 changes: 17 additions & 7 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,24 @@ import (
"gopkg.in/djherbis/stream.v1"
)

type FileSystemStater interface {
// Stat takes a File.Name() and returns FileInfo interface
Stat(name string) (FileInfo, error)
}

// FileSystem is used as the source for a Cache.
type FileSystem interface {
// Stream FileSystem
stream.FileSystem

// Reload should look through the FileSystem and call the suplied fn
FileSystemStater

// Reload should look through the FileSystem and call the supplied fn
// with the key/filename pairs that are found.
Reload(func(key, name string)) error

// RemoveAll should empty the FileSystem of all files.
RemoveAll() error

// AccessTimes takes a File.Name() and returns the last time the file was read,
// and the last time it was written to.
// It will be used to check expiry of a file, and must be concurrent safe
// with modifications to the FileSystem (writes, reads etc.)
AccessTimes(name string) (rt, wt time.Time, err error)
}

type stdFs struct {
Expand Down Expand Up @@ -138,6 +139,15 @@ func (fs *stdFs) AccessTimes(name string) (rt, wt time.Time, err error) {
return atime.Get(fi), fi.ModTime(), nil
}

func (fs *stdFs) Stat(name string) (FileInfo, error) {
stat, err := os.Stat(name)
if err != nil {
return FileInfo{}, err
}

return FileInfo{FileInfo: stat, Atime: atime.Get(stat)}, nil
}

const (
saltSize = 8
maxShort = 20
Expand Down
86 changes: 51 additions & 35 deletions fscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (
"sync"
"sync/atomic"
"time"

"gopkg.in/djherbis/stream.v1"
)

// Cache works like a concurrent-safe map for streams.
type Cache interface {

// Get manages access to the streams in the cache.
// If the key does not exist, w != nil and you can start writing to the stream.
// If the key does exist, w == nil.
Expand All @@ -35,10 +34,10 @@ type Cache interface {
}

type cache struct {
mu sync.RWMutex
files map[string]fileStream
grim Reaper
fs FileSystem
mu sync.RWMutex
files map[string]fileStream
fs FileSystem
haunter Haunter
}

// ReadAtCloser is an io.ReadCloser, and an io.ReaderAt. It supports both so that Range
Expand All @@ -50,9 +49,9 @@ type ReadAtCloser interface {

type fileStream interface {
next() (ReadAtCloser, error)
inUse() bool
InUse() bool
io.WriteCloser
Remove() error
remove() error
Name() string
}

Expand All @@ -78,46 +77,43 @@ func New(dir string, perms os.FileMode, expiry time.Duration) (Cache, error) {
// fs.Files() are loaded using the name they were created with as a key.
// Reaper is used to determine when files expire, nil means never expire.
func NewCache(fs FileSystem, grim Reaper) (Cache, error) {
if grim != nil {
return NewCacheWithHaunter(fs, NewReaperHaunterStrategy(grim))
}

return NewCacheWithHaunter(fs, nil)
}

// NewCacheWithHaunter create a new Cache based on FileSystem fs.
// fs.Files() are loaded using the name they were created with as a key.
// Haunter is used to determine when files expire, nil means never expire.
func NewCacheWithHaunter(fs FileSystem, haunter Haunter) (Cache, error) {
c := &cache{
files: make(map[string]fileStream),
grim: grim,
fs: fs,
files: make(map[string]fileStream),
haunter: haunter,
fs: fs,
}
err := c.load()
if err != nil {
return nil, err
}
if grim != nil {
c.haunter()
if haunter != nil {
c.scheduleHaunt()
}

return c, nil
}

func (c *cache) haunter() {
func (c *cache) scheduleHaunt() {
c.haunt()
time.AfterFunc(c.grim.Next(), c.haunter)
time.AfterFunc(c.haunter.Next(), c.scheduleHaunt)
}

func (c *cache) haunt() {
c.mu.Lock()
defer c.mu.Unlock()

for key, f := range c.files {
if f.inUse() {
continue
}

lastRead, lastWrite, err := c.fs.AccessTimes(f.Name())
if err != nil {
continue
}

if c.grim.Reap(key, lastRead, lastWrite) {
delete(c.files, key)
c.fs.Remove(f.Name())
}
}
return
c.haunter.Haunt(c)
}

func (c *cache) load() error {
Expand Down Expand Up @@ -178,7 +174,7 @@ func (c *cache) Remove(key string) error {
c.mu.Unlock()

if ok {
return f.Remove()
return f.remove()
}
return nil
}
Expand All @@ -190,6 +186,26 @@ func (c *cache) Clean() error {
return c.fs.RemoveAll()
}

func (c *cache) Stat(name string) (FileInfo, error) {
return c.fs.Stat(name)
}

func (c *cache) EnumerateEntries(enumerator func(key string, e Entry) bool) {
for k, f := range c.files {
if !enumerator(k, Entry{name: f.Name(), inUse: f.InUse()}) {
break
}
}
}

func (c *cache) RemoveFile(key string) {
f, ok := c.files[key]
delete(c.files, key)
if ok {
c.fs.Remove(f.Name())
}
}

type cachedFile struct {
stream *stream.Stream
handleCounter
Expand Down Expand Up @@ -223,7 +239,7 @@ type reloadedFile struct {

func (f *reloadedFile) Name() string { return f.name }

func (f *reloadedFile) Remove() error {
func (f *reloadedFile) remove() error {
f.waitUntilFree()
return f.fs.Remove(f.name)
}
Expand All @@ -238,7 +254,7 @@ func (f *reloadedFile) next() (r ReadAtCloser, err error) {

func (f *cachedFile) Name() string { return f.stream.Name() }

func (f *cachedFile) Remove() error { return f.stream.Remove() }
func (f *cachedFile) remove() error { return f.stream.Remove() }

func (f *cachedFile) next() (r ReadAtCloser, err error) {
reader, err := f.stream.NextReader()
Expand Down Expand Up @@ -294,7 +310,7 @@ func (h *handleCounter) dec() {
h.grp.Done()
}

func (h *handleCounter) inUse() bool {
func (h *handleCounter) InUse() bool {
return atomic.LoadInt64(&h.cnt) > 0
}

Expand Down
110 changes: 109 additions & 1 deletion fscache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,114 @@ func TestReload(t *testing.T) {
}
}

func TestLRUHaunterMaxItems(t *testing.T) {

fs, err := NewFs("./cache1", 0700)
if err != nil {
t.Error(err.Error())
t.FailNow()
}

c, err := NewCacheWithHaunter(fs, NewLRUHaunterStrategy(NewLRUHaunter(3, 0, 400*time.Millisecond)))

if err != nil {
t.Error(err.Error())
return
}
defer c.Clean()

for i := 0; i < 5; i++ {
name := fmt.Sprintf("stream-%v", i)
r, w, _ := c.Get(name)
w.Write([]byte("hello"))
w.Close()
io.Copy(ioutil.Discard, r)

if !c.Exists(name) {
t.Errorf(name + " should exist")
}

<-time.After(10 * time.Millisecond)

err := r.Close()
if err != nil {
t.Error(err)
}
}

<-time.After(400 * time.Millisecond)

if c.Exists("stream-0") {
t.Errorf("stream-0 should have been scrubbed")
}

if c.Exists("stream-1") {
t.Errorf("stream-1 should have been scrubbed")
}

files, err := ioutil.ReadDir("./cache1")
if err != nil {
t.Error(err.Error())
return
}

if len(files) != 3 {
t.Errorf("expected 3 items in directory")
}
}

func TestLRUHaunterMaxSize(t *testing.T) {

fs, err := NewFs("./cache1", 0700)
if err != nil {
t.Error(err.Error())
t.FailNow()
}

c, err := NewCacheWithHaunter(fs, NewLRUHaunterStrategy(NewLRUHaunter(0, 24, 400*time.Millisecond)))

if err != nil {
t.Error(err.Error())
return
}
defer c.Clean()

for i := 0; i < 5; i++ {
name := fmt.Sprintf("stream-%v", i)
r, w, _ := c.Get(name)
w.Write([]byte("hello"))
w.Close()
io.Copy(ioutil.Discard, r)

if !c.Exists(name) {
t.Errorf(name + " should exist")
}

<-time.After(10 * time.Millisecond)

err := r.Close()
if err != nil {
t.Error(err)
}
}

<-time.After(400 * time.Millisecond)

if c.Exists("stream-0") {
t.Errorf("stream-0 should have been scrubbed")
}

files, err := ioutil.ReadDir("./cache1")
if err != nil {
t.Error(err.Error())
return
}

if len(files) != 4 {
t.Errorf("expected 4 items in directory")
}
}

func TestReaper(t *testing.T) {
fs, err := NewFs("./cache1", 0700)
if err != nil {
Expand Down Expand Up @@ -376,4 +484,4 @@ func check(t *testing.T, r io.Reader, data string) {
if !bytes.Equal(buf.Bytes(), []byte(data)) {
t.Errorf("unexpected output %s", buf.Bytes())
}
}
}
Loading

0 comments on commit 287ea9b

Please sign in to comment.