From a66a68d4fee7f336f2a44cc9fd5c65a59b8b460a Mon Sep 17 00:00:00 2001 From: Andriy Biletsky Date: Wed, 22 Jan 2025 14:02:32 +0700 Subject: [PATCH] Add extra context and a retry delay for cache retrieval --- app/query/cache.go | 23 +++++++++++++++-------- app/query/cache_test.go | 24 ++++++++++++++++-------- app/query/processors.go | 10 ++++++---- apps/lbrytv/config/config.go | 5 +++++ 4 files changed, 42 insertions(+), 20 deletions(-) diff --git a/app/query/cache.go b/app/query/cache.go index f7e67ff4..2e603a05 100644 --- a/app/query/cache.go +++ b/app/query/cache.go @@ -27,8 +27,9 @@ const ( ) type CacheRequest struct { - Method string - Params any + Method string + Params any + metaKey string } type CachedResponse struct { @@ -75,12 +76,18 @@ func NewQueryCacheWithInvalidator(baseCache cache.CacheInterface[any]) (*QueryCa return qc, nil } -func (c *QueryCache) Retrieve(query *Query, getter func() (any, error)) (*CachedResponse, error) { - log := logger.Log() - cacheReq := CacheRequest{ - Method: query.Method(), - Params: query.Params(), +func NewCacheRequest(method string, params any, metaKey string) CacheRequest { + return CacheRequest{ + Method: method, + Params: params, + metaKey: metaKey, } +} + +func (c *QueryCache) Retrieve(query *Query, metaKey string, getter func() (any, error)) (*CachedResponse, error) { + log := logger.Log() + + cacheReq := NewCacheRequest(query.Method(), query.Params(), metaKey) ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond) defer cancel() @@ -207,7 +214,7 @@ func (r CacheRequest) GetCacheKey() string { params = string(p) } } - fmt.Fprintf(digester, "%s:%s:%s", "request", r.Method, params) + fmt.Fprintf(digester, "[%s]%s:%s:%s", r.metaKey, "request", r.Method, params) hash := digester.Sum(nil) return fmt.Sprintf("%x", hash) } diff --git a/app/query/cache_test.go b/app/query/cache_test.go index f63346f8..f47fe4cc 100644 --- a/app/query/cache_test.go +++ b/app/query/cache_test.go @@ -13,26 +13,34 @@ func TestGetCacheKey(t *testing.T) { assert := assert.New(t) require := require.New(t) seen := map[string]bool{} - params := []map[string]any{{}, {"uri": "what"}, {"uri": "odysee"}, nil} - genCacheKey := func(params map[string]any) string { + params := []map[string]any{ + {}, + {"uri": "what"}, + {"uri": "odysee"}, + nil, + } + genCacheKey := func(params map[string]any, metaKey string) string { req := jsonrpc.NewRequest(MethodResolve, params) query, err := NewQuery(req, "") require.NoError(err) - cacheReq := CacheRequest{ - Method: query.Method(), - Params: query.Params(), - } + cacheReq := NewCacheRequest(query.Method(), query.Params(), metaKey) return cacheReq.GetCacheKey() } for _, p := range params { t.Run(fmt.Sprintf("%+v", p), func(t *testing.T) { - cacheKey := genCacheKey(p) + cacheKey := genCacheKey(p, "") + assert.Len(cacheKey, 32) + assert.NotContains(seen, cacheKey) + seen[cacheKey] = true + }) + t.Run(fmt.Sprintf("%+v", p), func(t *testing.T) { + cacheKey := genCacheKey(p, "user@endpoint") assert.Len(cacheKey, 32) assert.NotContains(seen, cacheKey) seen[cacheKey] = true }) } - assert.Contains(seen, genCacheKey(params[1])) + assert.Contains(seen, genCacheKey(params[1], "user@endpoint")) } func TestCachedResponseMarshal(t *testing.T) { diff --git a/app/query/processors.go b/app/query/processors.go index 5bc2aa90..49a81417 100644 --- a/app/query/processors.go +++ b/app/query/processors.go @@ -646,11 +646,12 @@ func preflightCacheHook(caller *Caller, ctx context.Context) (*jsonrpc.RPCRespon } query := QueryFromContext(ctx) - retrieverRetries := 3 - retriever := func() (any, error) { + getterRetries := config.GetCacheGetterRetries() + getter := func() (any, error) { var resp *jsonrpc.RPCResponse var err error - for attempt := range retrieverRetries { + for attempt := range getterRetries { + time.Sleep(time.Duration(attempt) * time.Second) start := time.Now() resp, err = caller.SendQuery(ctx, query) duration := time.Since(start).Seconds() @@ -674,7 +675,8 @@ func preflightCacheHook(caller *Caller, ctx context.Context) (*jsonrpc.RPCRespon return resp, err } - cachedResp, err := caller.Cache.Retrieve(query, retriever) + metaKey := fmt.Sprintf("%d@%s", caller.userID, caller.Endpoint()) + cachedResp, err := caller.Cache.Retrieve(query, metaKey, getter) if err != nil { return nil, rpcerrors.NewSDKError(err) } diff --git a/apps/lbrytv/config/config.go b/apps/lbrytv/config/config.go index 405f30ea..433d9f0b 100644 --- a/apps/lbrytv/config/config.go +++ b/apps/lbrytv/config/config.go @@ -196,6 +196,10 @@ func GetTokenCacheTimeout() time.Duration { return Config.Viper.GetDuration("TokenCacheTimeout") * time.Second } +func GetCacheGetterRetries() int { + return Config.Viper.GetInt("CacheGetterRetries") +} + func GetCORSDomains() []string { return Config.Viper.GetStringSlice("CORSDomains") } @@ -254,4 +258,5 @@ func init() { c.Viper.SetDefault("Address", ":8080") c.Viper.SetDefault("Host", "http://localhost:8080") c.Viper.SetDefault("Logging", map[string]string{"level": "debug", "format": "console"}) + c.Viper.SetDefault("CacheGetterRetries", 3) }