From dfce853067ec574be5311207da43d485711e0ca7 Mon Sep 17 00:00:00 2001 From: Tiago Peczenyj Date: Sun, 5 Nov 2023 19:31:04 +0100 Subject: [PATCH] Skip fs cache based on config (#1644) * add cache manager struce * refactor cache by adding interface * generalize ctor * implement feature add unit tests * fix code * rename fs field as filesystem --- fs.go | 287 +++++++++++++++++++++++++++++++++++------------------ fs_test.go | 113 +++++++++++++++++++-- 2 files changed, 298 insertions(+), 102 deletions(-) diff --git a/fs.go b/fs.go index 4d8c1c1ebf..9ada758843 100644 --- a/fs.go +++ b/fs.go @@ -326,6 +326,11 @@ type FS struct { // "Cannot open requested path" PathNotFound RequestHandler + // SkipCache if true, will cache no file handler. + // + // By default is false. + SkipCache bool + // Expiration duration for inactive file handlers. // // FSHandlerCacheDuration is used by default. @@ -453,11 +458,6 @@ func (fs *FS) initRequestHandler() { compressRoot = fs.normalizeRoot(compressRoot) } - cacheDuration := fs.CacheDuration - if cacheDuration <= 0 { - cacheDuration = FSHandlerCacheDuration - } - compressedFileSuffixes := fs.CompressedFileSuffixes if len(compressedFileSuffixes["br"]) == 0 || len(compressedFileSuffixes["gzip"]) == 0 || compressedFileSuffixes["br"] == compressedFileSuffixes["gzip"] { @@ -474,7 +474,7 @@ func (fs *FS) initRequestHandler() { } h := &fsHandler{ - fs: fs.FS, + filesystem: fs.FS, root: root, indexNames: fs.IndexNames, pathRewrite: fs.PathRewrite, @@ -484,50 +484,20 @@ func (fs *FS) initRequestHandler() { compressRoot: compressRoot, pathNotFound: fs.PathNotFound, acceptByteRange: fs.AcceptByteRange, - cacheDuration: cacheDuration, compressedFileSuffixes: compressedFileSuffixes, - cache: make(map[string]*fsFile), - cacheBrotli: make(map[string]*fsFile), - cacheGzip: make(map[string]*fsFile), } - if h.fs == nil { - h.fs = &osFS{} // It provides os.Open and os.Stat - } - - go func() { - var pendingFiles []*fsFile - - clean := func() { - pendingFiles = h.cleanCache(pendingFiles) - } + h.cacheManager = newCacheManager(fs) - if fs.CleanStop != nil { - t := time.NewTicker(cacheDuration / 2) - for { - select { - case <-t.C: - clean() - case _, stillOpen := <-fs.CleanStop: - // Ignore values send on the channel, only stop when it is closed. - if !stillOpen { - t.Stop() - return - } - } - } - } - for { - time.Sleep(cacheDuration / 2) - clean() - } - }() + if h.filesystem == nil { + h.filesystem = &osFS{} // It provides os.Open and os.Stat + } fs.h = h.handleRequest } type fsHandler struct { - fs fs.FS + filesystem fs.FS root string indexNames []string pathRewrite PathRewriteFunc @@ -537,13 +507,9 @@ type fsHandler struct { compressBrotli bool compressRoot string acceptByteRange bool - cacheDuration time.Duration compressedFileSuffixes map[string]string - cache map[string]*fsFile - cacheBrotli map[string]*fsFile - cacheGzip map[string]*fsFile - cacheLock sync.Mutex + cacheManager cacheManager smallFileReaderPool sync.Pool } @@ -596,7 +562,7 @@ func (ff *fsFile) smallFileReader() (io.Reader, error) { const maxSmallFileSize = 2 * 4096 func (ff *fsFile) isBig() bool { - if _, ok := ff.h.fs.(*osFS); !ok { // fs.FS only uses bigFileReader, memory cache uses fsSmallFileReader + if _, ok := ff.h.filesystem.(*osFS); !ok { // fs.FS only uses bigFileReader, memory cache uses fsSmallFileReader return ff.f != nil } return ff.contentLength > maxSmallFileSize && len(ff.dirIndex) == 0 @@ -621,7 +587,7 @@ func (ff *fsFile) bigFileReader() (io.Reader, error) { return r, nil } - f, err := ff.h.fs.Open(ff.filename) + f, err := ff.h.filesystem.Open(ff.filename) if err != nil { return nil, fmt.Errorf("cannot open already opened file: %w", err) } @@ -647,12 +613,12 @@ func (ff *fsFile) Release() { } func (ff *fsFile) decReadersCount() { - ff.h.cacheLock.Lock() - ff.readersCount-- - if ff.readersCount < 0 { - ff.readersCount = 0 - } - ff.h.cacheLock.Unlock() + ff.h.cacheManager.WithLock(func() { + ff.readersCount-- + if ff.readersCount < 0 { + ff.readersCount = 0 + } + }) } // bigFileReader attempts to trigger sendfile @@ -811,10 +777,164 @@ func (r *fsSmallFileReader) WriteTo(w io.Writer) (int64, error) { return int64(curPos - r.startPos), err } -func (h *fsHandler) cleanCache(pendingFiles []*fsFile) []*fsFile { +type cacheManager interface { + WithLock(work func()) + GetFileFromCache(cacheKind CacheKind, path string) (*fsFile, bool) + SetFileToCache(cacheKind CacheKind, path string, ff *fsFile) *fsFile +} + +var ( + _ cacheManager = (*inMemoryCacheManager)(nil) + _ cacheManager = (*noopCacheManager)(nil) +) + +type CacheKind uint8 + +const ( + defaultCacheKind CacheKind = iota + brotliCacheKind + gzipCacheKind +) + +func newCacheManager(fs *FS) cacheManager { + if fs.SkipCache { + return &noopCacheManager{} + } + + cacheDuration := fs.CacheDuration + if cacheDuration <= 0 { + cacheDuration = FSHandlerCacheDuration + } + + instance := &inMemoryCacheManager{ + cacheDuration: cacheDuration, + cache: make(map[string]*fsFile), + cacheBrotli: make(map[string]*fsFile), + cacheGzip: make(map[string]*fsFile), + } + + go instance.handleCleanCache(fs.CleanStop) + + return instance +} + +type noopCacheManager struct { + cacheLock sync.Mutex +} + +func (n *noopCacheManager) WithLock(work func()) { + n.cacheLock.Lock() + + work() + + n.cacheLock.Unlock() +} + +func (*noopCacheManager) GetFileFromCache(cacheKind CacheKind, path string) (*fsFile, bool) { + return nil, false +} + +func (*noopCacheManager) SetFileToCache(cacheKind CacheKind, path string, ff *fsFile) *fsFile { + return ff +} + +type inMemoryCacheManager struct { + cacheDuration time.Duration + cache map[string]*fsFile + cacheBrotli map[string]*fsFile + cacheGzip map[string]*fsFile + cacheLock sync.Mutex +} + +func (cm *inMemoryCacheManager) WithLock(work func()) { + cm.cacheLock.Lock() + + work() + + cm.cacheLock.Unlock() +} + +func (cm *inMemoryCacheManager) getFsCache(cacheKind CacheKind) map[string]*fsFile { + fileCache := cm.cache + switch cacheKind { + case brotliCacheKind: + fileCache = cm.cacheBrotli + case gzipCacheKind: + fileCache = cm.cacheGzip + } + + return fileCache +} + +func (cm *inMemoryCacheManager) GetFileFromCache(cacheKind CacheKind, path string) (*fsFile, bool) { + fileCache := cm.getFsCache(cacheKind) + + cm.cacheLock.Lock() + ff, ok := fileCache[string(path)] + if ok { + ff.readersCount++ + } + cm.cacheLock.Unlock() + + return ff, ok +} + +func (cm *inMemoryCacheManager) SetFileToCache(cacheKind CacheKind, path string, ff *fsFile) *fsFile { + fileCache := cm.getFsCache(cacheKind) + + cm.cacheLock.Lock() + ff1, ok := fileCache[path] + if !ok { + fileCache[path] = ff + ff.readersCount++ + } else { + ff1.readersCount++ + } + cm.cacheLock.Unlock() + + if ok { + // The file has been already opened by another + // goroutine, so close the current file and use + // the file opened by another goroutine instead. + ff.Release() + ff = ff1 + } + + return ff +} + +func (cm *inMemoryCacheManager) handleCleanCache(cleanStop chan struct{}) { + var pendingFiles []*fsFile + + clean := func() { + pendingFiles = cm.cleanCache(pendingFiles) + } + + if cleanStop != nil { + t := time.NewTicker(cm.cacheDuration / 2) + for { + select { + case <-t.C: + clean() + case _, stillOpen := <-cleanStop: + // Ignore values send on the channel, only stop when it is closed. + if !stillOpen { + t.Stop() + return + } + } + } + } + for { + time.Sleep(cm.cacheDuration / 2) + clean() + } +} + +func (cm *inMemoryCacheManager) cleanCache(pendingFiles []*fsFile) []*fsFile { var filesToRelease []*fsFile - h.cacheLock.Lock() + cm.cacheLock.Lock() // Close files which couldn't be closed before due to non-zero // readers count on the previous run. @@ -828,11 +948,11 @@ func (h *fsHandler) cleanCache(pendingFiles []*fsFile) []*fsFile { } pendingFiles = remainingFiles - pendingFiles, filesToRelease = cleanCacheNolock(h.cache, pendingFiles, filesToRelease, h.cacheDuration) - pendingFiles, filesToRelease = cleanCacheNolock(h.cacheBrotli, pendingFiles, filesToRelease, h.cacheDuration) - pendingFiles, filesToRelease = cleanCacheNolock(h.cacheGzip, pendingFiles, filesToRelease, h.cacheDuration) + pendingFiles, filesToRelease = cleanCacheNolock(cm.cache, pendingFiles, filesToRelease, cm.cacheDuration) + pendingFiles, filesToRelease = cleanCacheNolock(cm.cacheBrotli, pendingFiles, filesToRelease, cm.cacheDuration) + pendingFiles, filesToRelease = cleanCacheNolock(cm.cacheGzip, pendingFiles, filesToRelease, cm.cacheDuration) - h.cacheLock.Unlock() + cm.cacheLock.Unlock() for _, ff := range filesToRelease { ff.Release() @@ -860,7 +980,7 @@ func cleanCacheNolock(cache map[string]*fsFile, pendingFiles, filesToRelease []* } func (h *fsHandler) pathToFilePath(path string) string { - if _, ok := h.fs.(*osFS); !ok { + if _, ok := h.filesystem.(*osFS); !ok { if len(path) < 1 { return path } @@ -906,30 +1026,25 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { } mustCompress := false - fileCache := h.cache + fileCacheKind := defaultCacheKind fileEncoding := "" byteRange := ctx.Request.Header.peek(strRange) if len(byteRange) == 0 && h.compress { if h.compressBrotli && ctx.Request.Header.HasAcceptEncodingBytes(strBr) { mustCompress = true - fileCache = h.cacheBrotli + fileCacheKind = brotliCacheKind fileEncoding = "br" } else if ctx.Request.Header.HasAcceptEncodingBytes(strGzip) { mustCompress = true - fileCache = h.cacheGzip + fileCacheKind = gzipCacheKind fileEncoding = "gzip" } } - h.cacheLock.Lock() - ff, ok := fileCache[string(path)] - if ok { - ff.readersCount++ - } - h.cacheLock.Unlock() + pathStr := string(path) + ff, ok := h.cacheManager.GetFileFromCache(fileCacheKind, pathStr) if !ok { - pathStr := string(path) filePath := h.pathToFilePath(pathStr) var err error @@ -962,23 +1077,7 @@ func (h *fsHandler) handleRequest(ctx *RequestCtx) { return } - h.cacheLock.Lock() - ff1, ok := fileCache[pathStr] - if !ok { - fileCache[pathStr] = ff - ff.readersCount++ - } else { - ff1.readersCount++ - } - h.cacheLock.Unlock() - - if ok { - // The file has been already opened by another - // goroutine, so close the current file and use - // the file opened by another goroutine instead. - ff.Release() - ff = ff1 - } + ff = h.cacheManager.SetFileToCache(fileCacheKind, pathStr, ff) } if !ctx.IfModifiedSince(ff.lastModified) { @@ -1153,7 +1252,7 @@ func (h *fsHandler) createDirIndex(ctx *RequestCtx, dirPath string, mustCompress _, _ = fmt.Fprintf(w, `
  • ..
  • `, parentPathEscaped) } - dirEntries, err := fs.ReadDir(h.fs, dirPath) + dirEntries, err := fs.ReadDir(h.filesystem, dirPath) if err != nil { return nil, err } @@ -1233,7 +1332,7 @@ const ( ) func (h *fsHandler) compressAndOpenFSFile(filePath string, fileEncoding string) (*fsFile, error) { - f, err := h.fs.Open(filePath) + f, err := h.filesystem.Open(filePath) if err != nil { return nil, err } @@ -1257,7 +1356,7 @@ func (h *fsHandler) compressAndOpenFSFile(filePath string, fileEncoding string) compressedFilePath := h.filePathToCompressed(filePath) - if _, ok := h.fs.(*osFS); !ok { + if _, ok := h.filesystem.(*osFS); !ok { return h.newCompressedFSFileCache(f, fileInfo, compressedFilePath, fileEncoding) } @@ -1397,7 +1496,7 @@ func (h *fsHandler) newCompressedFSFileCache(f fs.File, fileInfo fs.FileInfo, fi } func (h *fsHandler) newCompressedFSFile(filePath string, fileEncoding string) (*fsFile, error) { - f, err := h.fs.Open(filePath) + f, err := h.filesystem.Open(filePath) if err != nil { return nil, fmt.Errorf("cannot open compressed file %q: %w", filePath, err) } @@ -1415,7 +1514,7 @@ func (h *fsHandler) openFSFile(filePath string, mustCompress bool, fileEncoding filePath += h.compressedFileSuffixes[fileEncoding] } - f, err := h.fs.Open(filePath) + f, err := h.filesystem.Open(filePath) if err != nil { if mustCompress && errors.Is(err, fs.ErrNotExist) { return h.compressAndOpenFSFile(filePathOriginal, fileEncoding) @@ -1439,7 +1538,7 @@ func (h *fsHandler) openFSFile(filePath string, mustCompress bool, fileEncoding } if mustCompress { - fileInfoOriginal, err := fs.Stat(h.fs, filePathOriginal) + fileInfoOriginal, err := fs.Stat(h.filesystem, filePathOriginal) if err != nil { _ = f.Close() return nil, fmt.Errorf("cannot obtain info for original file %q: %w", filePathOriginal, err) diff --git a/fs_test.go b/fs_test.go index e38b1401eb..12cf8a956c 100644 --- a/fs_test.go +++ b/fs_test.go @@ -66,6 +66,8 @@ func TestNewVHostPathRewriterMaliciousHost(t *testing.T) { } func testPathNotFound(t *testing.T, pathNotFoundFunc RequestHandler) { + t.Helper() + var ctx RequestCtx var req Request req.SetRequestURI("http//some.url/file") @@ -302,11 +304,30 @@ func TestFSByteRangeConcurrent(t *testing.T) { stop := make(chan struct{}) defer close(stop) - fs := &FS{ + runFSByteRangeConcurrent(t, &FS{ Root: ".", AcceptByteRange: true, CleanStop: stop, - } + }) +} + +func TestFSByteRangeConcurrentSkipCache(t *testing.T) { + // This test can't run parallel as files in / might be changed by other tests. + + stop := make(chan struct{}) + defer close(stop) + + runFSByteRangeConcurrent(t, &FS{ + Root: ".", + SkipCache: true, + AcceptByteRange: true, + CleanStop: stop, + }) +} + +func runFSByteRangeConcurrent(t *testing.T, fs *FS) { + t.Helper() + h := fs.NewRequestHandler() concurrency := 10 @@ -336,11 +357,30 @@ func TestFSByteRangeSingleThread(t *testing.T) { stop := make(chan struct{}) defer close(stop) - fs := &FS{ + runFSByteRangeSingleThread(t, &FS{ Root: ".", AcceptByteRange: true, CleanStop: stop, - } + }) +} + +func TestFSByteRangeSingleThreadSkipCache(t *testing.T) { + // This test can't run parallel as files in / might be changed by other tests. + + stop := make(chan struct{}) + defer close(stop) + + runFSByteRangeSingleThread(t, &FS{ + Root: ".", + AcceptByteRange: true, + SkipCache: true, + CleanStop: stop, + }) +} + +func runFSByteRangeSingleThread(t *testing.T, fs *FS) { + t.Helper() + h := fs.NewRequestHandler() testFSByteRange(t, h, "/fs.go") @@ -348,6 +388,8 @@ func TestFSByteRangeSingleThread(t *testing.T) { } func testFSByteRange(t *testing.T, h RequestHandler, filePath string) { + t.Helper() + var ctx RequestCtx ctx.Init(&Request{}, nil, nil) @@ -427,6 +469,8 @@ func TestParseByteRangeSuccess(t *testing.T) { } func testParseByteRangeSuccess(t *testing.T, v string, contentLength, startPos, endPos int) { + t.Helper() + startPos1, endPos1, err := ParseByteRange([]byte(v), contentLength) if err != nil { t.Fatalf("unexpected error: %v. v=%q, contentLength=%d", err, v, contentLength) @@ -467,6 +511,8 @@ func TestParseByteRangeError(t *testing.T) { } func testParseByteRangeError(t *testing.T, v string, contentLength int) { + t.Helper() + _, _, err := ParseByteRange([]byte(v), contentLength) if err == nil { t.Fatalf("expecting error when parsing byte range %q", v) @@ -480,17 +526,41 @@ func TestFSCompressConcurrent(t *testing.T) { } // This test can't run parallel as files in / might be changed by other tests. - stop := make(chan struct{}) defer close(stop) - fs := &FS{ + runFSCompressConcurrent(t, &FS{ Root: ".", GenerateIndexPages: true, Compress: true, CompressBrotli: true, CleanStop: stop, + }) +} + +func TestFSCompressConcurrentSkipCache(t *testing.T) { + // Don't run this test on Windows, the Windows GitHub actions are too slow and timeout too often. + if runtime.GOOS == "windows" { + t.SkipNow() } + + // This test can't run parallel as files in / might be changed by other tests. + stop := make(chan struct{}) + defer close(stop) + + runFSCompressConcurrent(t, &FS{ + Root: ".", + GenerateIndexPages: true, + SkipCache: true, + Compress: true, + CompressBrotli: true, + CleanStop: stop, + }) +} + +func runFSCompressConcurrent(t *testing.T, fs *FS) { + t.Helper() + h := fs.NewRequestHandler() concurrency := 4 @@ -521,13 +591,34 @@ func TestFSCompressSingleThread(t *testing.T) { stop := make(chan struct{}) defer close(stop) - fs := &FS{ + runFSCompressSingleThread(t, &FS{ Root: ".", GenerateIndexPages: true, Compress: true, CompressBrotli: true, CleanStop: stop, - } + }) +} + +func TestFSCompressSingleThreadSkipCache(t *testing.T) { + // This test can't run parallel as files in / might be changed by other tests. + + stop := make(chan struct{}) + defer close(stop) + + runFSCompressSingleThread(t, &FS{ + Root: ".", + GenerateIndexPages: true, + SkipCache: true, + Compress: true, + CompressBrotli: true, + CleanStop: stop, + }) +} + +func runFSCompressSingleThread(t *testing.T, fs *FS) { + t.Helper() + h := fs.NewRequestHandler() testFSCompress(t, h, "/fs.go") @@ -536,6 +627,8 @@ func TestFSCompressSingleThread(t *testing.T) { } func testFSCompress(t *testing.T, h RequestHandler, filePath string) { + t.Helper() + // File locking is flaky on Windows. if runtime.GOOS == "windows" { t.SkipNow() @@ -755,6 +848,8 @@ func TestStripPathSlashes(t *testing.T) { } func testStripPathSlashes(t *testing.T, path string, stripSlashes int, expectedPath string) { + t.Helper() + s := stripLeadingSlashes([]byte(path), stripSlashes) s = stripTrailingSlashes(s) if string(s) != expectedPath { @@ -779,6 +874,8 @@ func TestFileExtension(t *testing.T) { } func testFileExtension(t *testing.T, path string, compressed bool, compressedFileSuffix, expectedExt string) { + t.Helper() + ext := fileExtension(path, compressed, compressedFileSuffix) if ext != expectedExt { t.Fatalf("unexpected file extension for file %q: %q. Expecting %q", path, ext, expectedExt)