Skip to content

Commit

Permalink
Add metrics to encoder v2 (#1080)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc authored Jan 8, 2025
1 parent c33ad29 commit 9267dc2
Show file tree
Hide file tree
Showing 16 changed files with 153 additions and 77 deletions.
80 changes: 45 additions & 35 deletions disperser/api/grpc/encoder/v2/encoder.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions disperser/api/proto/encoder/v2/encoder.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ service Encoder {
message EncodeBlobRequest {
bytes blob_key = 1;
EncodingParams encoding_params = 2;
uint64 blob_size = 3;
}

// EncodingParams specifies how the blob should be encoded into chunks
Expand Down
1 change: 1 addition & 0 deletions disperser/cmd/encoder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
GrpcPort: ctx.GlobalString(flags.GrpcPortFlag.Name),
MaxConcurrentRequests: ctx.GlobalInt(flags.MaxConcurrentRequestsFlag.Name),
RequestPoolSize: ctx.GlobalInt(flags.RequestPoolSizeFlag.Name),
RequestQueueSize: ctx.GlobalInt(flags.RequestQueueSizeFlag.Name),
EnableGnarkChunkEncoding: ctx.Bool(flags.EnableGnarkChunkEncodingFlag.Name),
PreventReencoding: ctx.Bool(flags.PreventReencodingFlag.Name),
Backend: ctx.String(flags.BackendFlag.Name),
Expand Down
8 changes: 8 additions & 0 deletions disperser/cmd/encoder/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ var (
Value: 32,
EnvVar: common.PrefixEnvVar(envVarPrefix, "REQUEST_POOL_SIZE"),
}
RequestQueueSizeFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "request-queue-size"),
Usage: "maximum number of requests in the request queue",
Required: false,
Value: 32,
EnvVar: common.PrefixEnvVar(envVarPrefix, "REQUEST_QUEUE_SIZE"),
}
EnableGnarkChunkEncodingFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-gnark-chunk-encoding"),
Usage: "if true, will produce chunks in Gnark, instead of Gob",
Expand Down Expand Up @@ -111,6 +118,7 @@ var optionalFlags = []cli.Flag{
EnableMetrics,
MaxConcurrentRequestsFlag,
RequestPoolSizeFlag,
RequestQueueSizeFlag,
EnableGnarkChunkEncodingFlag,
EncoderVersionFlag,
S3BucketNameFlag,
Expand Down
1 change: 1 addition & 0 deletions disperser/cmd/encoder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func RunEncoderServer(ctx *cli.Context) error {
logger,
prover,
metrics,
grpcMetrics,
)

return server.Start()
Expand Down
10 changes: 9 additions & 1 deletion disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc/metadata"
)

var errNoBlobsToEncode = errors.New("no blobs to encode")
Expand Down Expand Up @@ -269,11 +270,18 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
}

func (e *EncodingManager) encodeBlob(ctx context.Context, blobKey corev2.BlobKey, blob *v2.BlobMetadata, blobParams *core.BlobVersionParameters) (*encoding.FragmentInfo, error) {
// Add headers for routing
md := metadata.New(map[string]string{
"content-type": "application/grpc",
"x-blob-size": fmt.Sprintf("%d", blob.BlobSize),
})
ctx = metadata.NewOutgoingContext(ctx, md)

encodingParams, err := blob.BlobHeader.GetEncodingParams(blobParams)
if err != nil {
return nil, fmt.Errorf("failed to get encoding params: %w", err)
}
return e.encodingClient.EncodeBlob(ctx, blobKey, encodingParams)
return e.encodingClient.EncodeBlob(ctx, blobKey, encodingParams, blob.BlobSize)
}

func (e *EncodingManager) refreshBlobVersionParams(ctx context.Context) error {
Expand Down
3 changes: 2 additions & 1 deletion disperser/encoder/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewEncoderClientV2(addr string) (disperser.EncoderClientV2, error) {
}, nil
}

func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams) (*encoding.FragmentInfo, error) {
func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams, blobSize uint64) (*encoding.FragmentInfo, error) {
// Establish connection
conn, err := grpc.NewClient(
c.addr,
Expand All @@ -43,6 +43,7 @@ func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encod
ChunkLength: encodingParams.ChunkLength,
NumChunks: encodingParams.NumChunks,
},
BlobSize: blobSize,
}

// Make the RPC call
Expand Down
1 change: 1 addition & 0 deletions disperser/encoder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type ServerConfig struct {
GrpcPort string
MaxConcurrentRequests int
RequestPoolSize int
RequestQueueSize int
EnableGnarkChunkEncoding bool
PreventReencoding bool
Backend string
Expand Down
2 changes: 1 addition & 1 deletion disperser/encoder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques
s.metrics.ObserveQueue(s.queueStats)
s.queueLock.Unlock()
default:
s.metrics.IncrementRateLimitedBlobRequestNum(len(req.GetData()))
s.metrics.IncrementRateLimitedBlobRequestNum(blobSize)
s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests)
return nil, errors.New("too many requests")
}
Expand Down
Loading

0 comments on commit 9267dc2

Please sign in to comment.