Skip to content

Commit

Permalink
[v2] Remove rate limiter from disperser (#909)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Nov 20, 2024
1 parent eef48b6 commit f7ab152
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 48 deletions.
8 changes: 1 addition & 7 deletions disperser/apiserver/disperse_blob_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser/v2"
"github.com/Layr-Labs/eigenda/common"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
"github.com/Layr-Labs/eigenda/encoding"
Expand All @@ -19,17 +18,12 @@ func (s *DispersalServerV2) DisperseBlob(ctx context.Context, req *pb.DisperseBl
return nil, err
}

origin, err := common.GetClientAddress(ctx, s.rateConfig.ClientIPHeader, 2, true)
if err != nil {
return nil, api.NewErrorInvalidArg(err.Error())
}

data := req.GetData()
blobHeader, err := corev2.BlobHeaderFromProtobuf(req.GetBlobHeader())
if err != nil {
return nil, api.NewErrorInternal(err.Error())
}
s.logger.Debug("received a new blob dispersal request", "origin", origin, "blobSizeBytes", len(data), "quorums", req.GetBlobHeader().GetQuorumNumbers())
s.logger.Debug("received a new blob dispersal request", "blobSizeBytes", len(data), "quorums", req.GetBlobHeader().GetQuorumNumbers())

// TODO(ian-shim): handle payments and check rate limits

Expand Down
39 changes: 0 additions & 39 deletions disperser/apiserver/server_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ type DispersalServerV2 struct {
pb.UnimplementedDisperserServer

serverConfig disperser.ServerConfig
rateConfig RateConfig
blobStore *blobstore.BlobStore
blobMetadataStore *blobstore.BlobMetadataStore

chainReader core.Reader
ratelimiter common.RateLimiter
authenticator corev2.BlobRequestAuthenticator
prover encoding.Prover
logger logging.Logger
Expand Down Expand Up @@ -67,12 +65,10 @@ func NewDispersalServerV2(

return &DispersalServerV2{
serverConfig: serverConfig,
rateConfig: rateConfig,
blobStore: blobStore,
blobMetadataStore: blobMetadataStore,

chainReader: chainReader,
ratelimiter: ratelimiter,
authenticator: authenticator,
prover: prover,
logger: logger,
Expand Down Expand Up @@ -105,10 +101,6 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {
return fmt.Errorf("failed to refresh onchain quorum state: %w", err)
}

if err := s.RefreshAllowlist(); err != nil {
return fmt.Errorf("failed to refresh allowlist: %w", err)
}

go func() {
ticker := time.NewTicker(s.onchainStateRefreshInterval)
defer ticker.Stop()
Expand All @@ -125,21 +117,6 @@ func (s *DispersalServerV2) Start(ctx context.Context) error {
}
}()

go func() {
t := time.NewTicker(s.rateConfig.AllowlistRefreshInterval)
defer t.Stop()
for {
select {
case <-t.C:
if err := s.RefreshAllowlist(); err != nil {
s.logger.Error("failed to refresh allowlist", "err", err)
}
case <-ctx.Done():
return
}
}
}()

s.logger.Info("GRPC Listening", "port", s.serverConfig.GrpcPort, "address", listener.Addr().String())

if err := gs.Serve(listener); err != nil {
Expand Down Expand Up @@ -186,22 +163,6 @@ func (s *DispersalServerV2) GetBlobCommitment(ctx context.Context, req *pb.BlobC
}}, nil
}

func (s *DispersalServerV2) RefreshAllowlist() error {
s.logger.Debug("Refreshing onchain quorum state")
al, err := ReadAllowlistFromFile(s.rateConfig.AllowlistFile)
if err != nil {
return fmt.Errorf("failed to load allowlist: %w", err)
}
s.rateConfig.Allowlist = al
for account, rateInfoByQuorum := range al {
for quorumID, rateInfo := range rateInfoByQuorum {
s.logger.Info("[Allowlist]", "account", account, "name", rateInfo.Name, "quorumID", quorumID, "throughput", rateInfo.Throughput, "blobRate", rateInfo.BlobRate)
}
}

return nil
}

// refreshOnchainState refreshes the onchain quorum state.
// It should be called periodically to keep the state up to date.
// **Note** that there is no lock. If the state is being updated concurrently, it may lead to inconsistent state.
Expand Down
2 changes: 0 additions & 2 deletions disperser/cmd/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type Config struct {
IndexerConfig indexer.Config
ChainStateConfig thegraph.Config
UseGraph bool
IndexerDataDir string

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string
Expand Down Expand Up @@ -84,7 +83,6 @@ func NewConfig(ctx *cli.Context) (Config, error) {
IndexerConfig: indexer.ReadIndexerConfig(ctx),
ChainStateConfig: thegraph.ReadCLIConfig(ctx),
UseGraph: ctx.GlobalBool(flags.UseGraphFlag.Name),
IndexerDataDir: ctx.GlobalString(flags.IndexerDataDirFlag.Name),

BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
Expand Down

0 comments on commit f7ab152

Please sign in to comment.