fix(#191): Routing namespace --> Storage #195

Merged · 6 commits · Oct 31, 2024
6 changes: 3 additions & 3 deletions README.md
@@ -63,9 +63,9 @@ In order to disperse to the EigenDA network in production, or at high throughput
| `--s3.path` | | `$EIGENDA_PROXY_S3_PATH` | Bucket path for S3 storage. |
| `--s3.endpoint` | | `$EIGENDA_PROXY_S3_ENDPOINT` | Endpoint for S3 storage. |
| `--s3.enable-tls` | | `$EIGENDA_PROXY_S3_ENABLE_TLS` | Enable TLS connection to S3 endpoint. |
| `--routing.fallback-targets` | `[]` | `$EIGENDA_PROXY_FALLBACK_TARGETS` | Fall back backend targets. Supports S3. | Backup storage locations to read from in the event of eigenda retrieval failure. |
| `--routing.cache-targets` | `[]` | `$EIGENDA_PROXY_CACHE_TARGETS` | Caching targets. Supports S3. | Caches data to backend targets after dispersing to DA, retrieved from before trying read from EigenDA. |
| `--routing.concurrent-write-threads` | `0` | `$EIGENDA_PROXY_CONCURRENT_WRITE_THREADS` | Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes. |
| `--storage.fallback-targets` | `[]` | `$EIGENDA_PROXY_STORAGE_FALLBACK_TARGETS` | Fall back backend targets. Supports S3. | Backup storage locations to read from in the event of eigenda retrieval failure. |
| `--storage.cache-targets` | `[]` | `$EIGENDA_PROXY_STORAGE_CACHE_TARGETS` | Caching targets. Supports S3. | Caches data to backend targets after dispersing to DA, retrieved from before trying read from EigenDA. |
| `--storage.concurrent-write-threads` | `0` | `$EIGENDA_PROXY_STORAGE_CONCURRENT_WRITE_THREADS` | Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes. |
| `--s3.timeout` | `5s` | `$EIGENDA_PROXY_S3_TIMEOUT` | timeout for S3 storage operations (e.g. get, put) |
| `--redis.db` | `0` | `$EIGENDA_PROXY_REDIS_DB` | redis database to use after connecting to server |
| `--redis.endpoint` | `""` | `$EIGENDA_PROXY_REDIS_ENDPOINT` | redis endpoint url |
2 changes: 1 addition & 1 deletion utils/utils.go → common/common.go
@@ -1,4 +1,4 @@
package utils
package common

import (
"fmt"
6 changes: 3 additions & 3 deletions utils/utils_test.go → common/common_test.go
@@ -1,10 +1,10 @@
package utils_test
package common_test

import (
"fmt"
"testing"

"github.com/Layr-Labs/eigenda-proxy/utils"
"github.com/Layr-Labs/eigenda-proxy/common"
)

func TestParseByteAmount(t *testing.T) {
@@ -46,7 +46,7 @@ func TestParseByteAmount(t *testing.T) {
t.Run(fmt.Sprintf("Input: %s", tc.input), func(t *testing.T) {
t.Parallel()

got, err := utils.ParseBytesAmount(tc.input)
got, err := common.ParseBytesAmount(tc.input)
if (err != nil) != tc.wantErr {
t.Errorf("wantErr: %v, got error: %v", tc.wantErr, err)
}
83 changes: 83 additions & 0 deletions common/store.go
@@ -0,0 +1,83 @@
package common

import (
"context"
"fmt"
"strings"
)

// BackendType ... Storage backend type
type BackendType uint8

const (
EigenDABackendType BackendType = iota
MemoryBackendType
S3BackendType
RedisBackendType

UnknownBackendType
)

var (
ErrProxyOversizedBlob = fmt.Errorf("encoded blob is larger than max blob size")
ErrEigenDAOversizedBlob = fmt.Errorf("blob size cannot exceed")
)

func (b BackendType) String() string {
switch b {
case EigenDABackendType:
return "EigenDA"
case MemoryBackendType:
return "Memory"
case S3BackendType:
return "S3"
case RedisBackendType:
return "Redis"
case UnknownBackendType:
fallthrough
default:
return "Unknown"
}
}

func StringToBackendType(s string) BackendType {
lower := strings.ToLower(s)

switch lower {
case "eigenda":
return EigenDABackendType
case "memory":
return MemoryBackendType
case "s3":
return S3BackendType
case "redis":
return RedisBackendType
case "unknown":
fallthrough
default:
return UnknownBackendType
}
}

type Store interface {
// Backend returns the backend type provider of the store.
BackendType() BackendType
// Verify verifies the given key-value pair.
Verify(ctx context.Context, key []byte, value []byte) error
}

type GeneratedKeyStore interface {
Store
// Get retrieves the given key if it's present in the key-value data store.
Get(ctx context.Context, key []byte) ([]byte, error)
// Put inserts the given value into the key-value data store.
Put(ctx context.Context, value []byte) (key []byte, err error)
}

type PrecomputedKeyStore interface {
Store
// Get retrieves the given key if it's present in the key-value data store.
Get(ctx context.Context, key []byte) ([]byte, error)
// Put inserts the given value into the key-value data store.
Put(ctx context.Context, key []byte, value []byte) error
}
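
To make the relocation concrete, here is a minimal usage sketch of the new `common` package, exercising only the symbols added in this file (the import path follows the module path used elsewhere in this PR):

```go
package main

import (
	"fmt"

	"github.com/Layr-Labs/eigenda-proxy/common"
)

func main() {
	// Round-trip: parsing is case-insensitive, and anything unrecognized
	// maps to UnknownBackendType, which prints as "Unknown".
	for _, name := range []string{"EigenDA", "memory", "S3", "redis", "filesystem"} {
		bt := common.StringToBackendType(name)
		fmt.Printf("%-10s -> %s\n", name, bt)
	}
}
```

The split between `GeneratedKeyStore` (the backend derives the key on `Put`) and `PrecomputedKeyStore` (the caller supplies the key) mirrors the `store/generated_key` and `store/precomputed_key` package layout imported elsewhere in this PR.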
3 changes: 2 additions & 1 deletion e2e/main_test.go
@@ -7,6 +7,7 @@ import (

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigenda-proxy/e2e"
"github.com/Layr-Labs/eigenda-proxy/store"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
@@ -54,7 +55,7 @@ func requireDispersalRetrievalEigenDA(t *testing.T, cm *metrics.CountMap, mode c
}

// requireWriteReadSecondary ... ensure that secondary backend was successfully written/read to/from
func requireWriteReadSecondary(t *testing.T, cm *metrics.CountMap, bt store.BackendType) {
func requireWriteReadSecondary(t *testing.T, cm *metrics.CountMap, bt common.BackendType) {
writeCount, err := cm.Get(http.MethodPut, store.Success, bt.String())
require.NoError(t, err)
require.True(t, writeCount > 0)
2 changes: 1 addition & 1 deletion e2e/safety_checks_test.go
@@ -145,7 +145,7 @@ func TestKeccak256CommitmentRequestErrorsWhenS3NotSet(t *testing.T) {
testCfg.UseKeccak256ModeS3 = true

tsConfig := e2e.TestSuiteConfig(testCfg)
tsConfig.EigenDAConfig.S3Config.Endpoint = ""
tsConfig.EigenDAConfig.StorageConfig.S3Config.Endpoint = ""
ts, kill := e2e.CreateTestSuite(tsConfig)
defer kill()

8 changes: 4 additions & 4 deletions e2e/server_test.go
@@ -6,7 +6,7 @@ import (

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/commitments"
"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/stretchr/testify/require"

"github.com/Layr-Labs/eigenda-proxy/e2e"
@@ -100,7 +100,7 @@ func TestProxyCaching(t *testing.T) {
defer kill()

requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000))
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.S3BackendType)
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode)
}

@@ -119,7 +119,7 @@ func TestProxyCachingWithRedis(t *testing.T) {
defer kill()

requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000))
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.RedisBackendType)
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.RedisBackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode)
}

@@ -163,6 +163,6 @@ func TestProxyReadFallback(t *testing.T) {
require.Equal(t, expectedBlob, actualBlob)

requireSimpleClientSetGet(t, ts, e2e.RandBytes(1_000_000))
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, store.S3BackendType)
requireWriteReadSecondary(t, ts.Metrics.SecondaryRequestsTotal, common.S3BackendType)
requireDispersalRetrievalEigenDA(t, ts.Metrics.HTTPServerRequestsTotal, commitments.SimpleCommitmentMode)
}
20 changes: 12 additions & 8 deletions e2e/setup.go
@@ -8,12 +8,13 @@ import (
"strings"
"time"

"github.com/Layr-Labs/eigenda-proxy/common"
"github.com/Layr-Labs/eigenda-proxy/metrics"
"github.com/Layr-Labs/eigenda-proxy/server"
"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore"
"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis"
"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3"
"github.com/Layr-Labs/eigenda-proxy/utils"
"github.com/Layr-Labs/eigenda-proxy/verify"
"github.com/Layr-Labs/eigenda/api/clients"
"github.com/Layr-Labs/eigenda/encoding/kzg"
@@ -127,7 +128,7 @@ func TestConfig(useMemory bool) *Cfg {
}

func createRedisConfig(eigendaCfg server.Config) server.CLIConfig {
eigendaCfg.RedisConfig = redis.Config{
eigendaCfg.StorageConfig.RedisConfig = redis.Config{
Endpoint: redisEndpoint,
Password: "",
DB: 0,
@@ -144,7 +145,7 @@ func createS3Config(eigendaCfg server.Config) server.CLIConfig {
bucketName := "eigenda-proxy-test-" + RandStr(10)
createS3Bucket(bucketName)

eigendaCfg.S3Config = s3.Config{
eigendaCfg.StorageConfig.S3Config = s3.Config{
Bucket: bucketName,
Path: "",
Endpoint: minioEndpoint,
@@ -178,7 +179,7 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
pollInterval = time.Minute * 1
}

maxBlobLengthBytes, err := utils.ParseBytesAmount("16mib")
maxBlobLengthBytes, err := common.ParseBytesAmount("16mib")
if err != nil {
panic(err)
}
@@ -213,7 +214,10 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
BlobExpiration: testCfg.Expiration,
MaxBlobSizeBytes: maxBlobLengthBytes,
},
AsyncPutWorkers: testCfg.WriteThreadCount,

StorageConfig: store.Config{
AsyncPutWorkers: testCfg.WriteThreadCount,
},
}

if testCfg.UseMemory {
@@ -226,15 +230,15 @@ func TestSuiteConfig(testCfg *Cfg) server.CLIConfig {
cfg = createS3Config(eigendaCfg)

case testCfg.UseS3Caching:
eigendaCfg.CacheTargets = []string{"S3"}
eigendaCfg.StorageConfig.CacheTargets = []string{"S3"}
cfg = createS3Config(eigendaCfg)

case testCfg.UseS3Fallback:
eigendaCfg.FallbackTargets = []string{"S3"}
eigendaCfg.StorageConfig.FallbackTargets = []string{"S3"}
cfg = createS3Config(eigendaCfg)

case testCfg.UseRedisCaching:
eigendaCfg.CacheTargets = []string{"redis"}
eigendaCfg.StorageConfig.CacheTargets = []string{"redis"}
cfg = createRedisConfig(eigendaCfg)

default:
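
The setup changes above only touch fields through `eigendaCfg.StorageConfig`. For reviewers, a hedged sketch of the `store.Config` aggregation those assignments imply — only the fields referenced in this diff are listed, and the `int` type for `AsyncPutWorkers` is an assumption:

```go
// Sketch only: reconstructs the store.Config fields referenced in e2e/setup.go
// above. The real definition in the store package may contain more fields.
package store

import (
	"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis"
	"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3"
)

type Config struct {
	RedisConfig     redis.Config // set by createRedisConfig
	S3Config        s3.Config    // set by createS3Config
	FallbackTargets []string     // e.g. []string{"S3"}
	CacheTargets    []string     // e.g. []string{"S3"} or []string{"redis"}
	AsyncPutWorkers int          // assumed type; wired from testCfg.WriteThreadCount
}
```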
29 changes: 5 additions & 24 deletions flags/flags.go
@@ -2,6 +2,7 @@ package flags

import (
"github.com/Layr-Labs/eigenda-proxy/flags/eigendaflags"
"github.com/Layr-Labs/eigenda-proxy/store"
"github.com/Layr-Labs/eigenda-proxy/store/generated_key/memstore"
"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/redis"
"github.com/Layr-Labs/eigenda-proxy/store/precomputed_key/s3"
@@ -17,6 +18,8 @@ const (
EigenDAClientCategory = "EigenDA Client"
EigenDADeprecatedCategory = "DEPRECATED EIGENDA CLIENT FLAGS -- THESE WILL BE REMOVED IN V2.0.0"
MemstoreFlagsCategory = "Memstore (for testing purposes - replaces EigenDA backend)"
StorageFlagsCategory = "Storage"
StorageDeprecatedCategory = "DEPRECATED STORAGE FLAGS -- THESE WILL BE REMOVED IN V2.0.0"
RedisCategory = "Redis Cache/Fallback"
S3Category = "S3 Cache/Fallback"
VerifierCategory = "KZG and Cert Verifier"
@@ -26,12 +29,6 @@ const (
const (
ListenAddrFlagName = "addr"
PortFlagName = "port"

// routing flags
// TODO: change "routing" --> "secondary"
FallbackTargetsFlagName = "routing.fallback-targets"
CacheTargetsFlagName = "routing.cache-targets"
ConcurrentWriteThreads = "routing.concurrent-write-routines"
)

const EnvVarPrefix = "EIGENDA_PROXY"
@@ -55,24 +52,6 @@ func CLIFlags() []cli.Flag {
Value: 3100,
EnvVars: prefixEnvVars("PORT"),
},
&cli.StringSliceFlag{
Name: FallbackTargetsFlagName,
Usage: "List of read fallback targets to rollover to if cert can't be read from EigenDA.",
Value: cli.NewStringSlice(),
EnvVars: prefixEnvVars("FALLBACK_TARGETS"),
},
&cli.StringSliceFlag{
Name: CacheTargetsFlagName,
Usage: "List of caching targets to use fast reads from EigenDA.",
Value: cli.NewStringSlice(),
EnvVars: prefixEnvVars("CACHE_TARGETS"),
},
&cli.IntFlag{
Name: ConcurrentWriteThreads,
Usage: "Number of threads spun-up for async secondary storage insertions. (<=0) denotes single threaded insertions where (>0) indicates decoupled writes.",
Value: 0,
EnvVars: prefixEnvVars("CONCURRENT_WRITE_THREADS"),
},
}

return flags
Expand All @@ -87,6 +66,8 @@ func init() {
Flags = append(Flags, opmetrics.CLIFlags(EnvVarPrefix)...)
Flags = append(Flags, eigendaflags.CLIFlags(EnvVarPrefix, EigenDAClientCategory)...)
Flags = append(Flags, eigendaflags.DeprecatedCLIFlags(EnvVarPrefix, EigenDADeprecatedCategory)...)
Flags = append(Flags, store.CLIFlags(EnvVarPrefix, StorageFlagsCategory)...)
Flags = append(Flags, store.DeprecatedCLIFlags(EnvVarPrefix, StorageDeprecatedCategory)...)
Flags = append(Flags, redis.CLIFlags(EnvVarPrefix, RedisCategory)...)
Flags = append(Flags, s3.CLIFlags(EnvVarPrefix, S3Category)...)
Flags = append(Flags, memstore.CLIFlags(EnvVarPrefix, MemstoreFlagsCategory)...)
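
The relocated flag definitions themselves live in the `store` package and are not shown in this diff. Below is a hedged sketch of what `store.CLIFlags` could look like, reassembled from the flag definitions removed above and the renamed `--storage.*` names and `EIGENDA_PROXY_STORAGE_*` env vars documented in the README table; the `withEnvPrefix` helper and the exact constant names are assumptions, not the actual implementation:

```go
package store

import "github.com/urfave/cli/v2"

const (
	FallbackTargetsFlagName = "storage.fallback-targets"
	CacheTargetsFlagName    = "storage.cache-targets"
	ConcurrentWriteThreads  = "storage.concurrent-write-threads"
)

// withEnvPrefix is a hypothetical helper mirroring prefixEnvVars in flags/flags.go.
func withEnvPrefix(envPrefix, name string) []string {
	return []string{envPrefix + "_STORAGE_" + name}
}

// CLIFlags ... sketch of the storage flags appended in flags/flags.go above.
func CLIFlags(envPrefix, category string) []cli.Flag {
	return []cli.Flag{
		&cli.StringSliceFlag{
			Name:     FallbackTargetsFlagName,
			Usage:    "List of read fallback targets to rollover to if cert can't be read from EigenDA.",
			Value:    cli.NewStringSlice(),
			EnvVars:  withEnvPrefix(envPrefix, "FALLBACK_TARGETS"),
			Category: category,
		},
		&cli.StringSliceFlag{
			Name:     CacheTargetsFlagName,
			Usage:    "List of caching targets to use fast reads from EigenDA.",
			Value:    cli.NewStringSlice(),
			EnvVars:  withEnvPrefix(envPrefix, "CACHE_TARGETS"),
			Category: category,
		},
		&cli.IntFlag{
			Name:     ConcurrentWriteThreads,
			Usage:    "Number of threads spun-up for async secondary storage insertions.",
			Value:    0,
			EnvVars:  withEnvPrefix(envPrefix, "CONCURRENT_WRITE_THREADS"),
			Category: category,
		},
	}
}
```

`store.DeprecatedCLIFlags` would presumably keep the old `routing.*` names registered under `StorageDeprecatedCategory` so existing deployments keep working until v2.0.0, matching how `eigendaflags.DeprecatedCLIFlags` is wired in the same `init()`.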