Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to encoder v2 #1080

Merged
merged 9 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 45 additions & 35 deletions disperser/api/grpc/encoder/v2/encoder.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions disperser/api/proto/encoder/v2/encoder.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ service Encoder {
message EncodeBlobRequest {
bytes blob_key = 1;
EncodingParams encoding_params = 2;
uint64 blob_size = 3;
}

// EncodingParams specifies how the blob should be encoded into chunks
Expand Down
1 change: 1 addition & 0 deletions disperser/cmd/encoder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
GrpcPort: ctx.GlobalString(flags.GrpcPortFlag.Name),
MaxConcurrentRequests: ctx.GlobalInt(flags.MaxConcurrentRequestsFlag.Name),
RequestPoolSize: ctx.GlobalInt(flags.RequestPoolSizeFlag.Name),
RequestQueueSize: ctx.GlobalInt(flags.RequestQueueSizeFlag.Name),
EnableGnarkChunkEncoding: ctx.Bool(flags.EnableGnarkChunkEncodingFlag.Name),
PreventReencoding: ctx.Bool(flags.PreventReencodingFlag.Name),
Backend: ctx.String(flags.BackendFlag.Name),
Expand Down
8 changes: 8 additions & 0 deletions disperser/cmd/encoder/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ var (
Value: 32,
EnvVar: common.PrefixEnvVar(envVarPrefix, "REQUEST_POOL_SIZE"),
}
RequestQueueSizeFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "request-queue-size"),
Usage: "maximum number of requests in the request queue",
Required: false,
Value: 32,
EnvVar: common.PrefixEnvVar(envVarPrefix, "REQUEST_QUEUE_SIZE"),
}
EnableGnarkChunkEncodingFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "enable-gnark-chunk-encoding"),
Usage: "if true, will produce chunks in Gnark, instead of Gob",
Expand Down Expand Up @@ -111,6 +118,7 @@ var optionalFlags = []cli.Flag{
EnableMetrics,
MaxConcurrentRequestsFlag,
RequestPoolSizeFlag,
RequestQueueSizeFlag,
EnableGnarkChunkEncodingFlag,
EncoderVersionFlag,
S3BucketNameFlag,
Expand Down
1 change: 1 addition & 0 deletions disperser/cmd/encoder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func RunEncoderServer(ctx *cli.Context) error {
logger,
prover,
metrics,
grpcMetrics,
)

return server.Start()
Expand Down
10 changes: 9 additions & 1 deletion disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigensdk-go/logging"
"google.golang.org/grpc/metadata"
)

var errNoBlobsToEncode = errors.New("no blobs to encode")
Expand Down Expand Up @@ -269,11 +270,18 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
}

func (e *EncodingManager) encodeBlob(ctx context.Context, blobKey corev2.BlobKey, blob *v2.BlobMetadata, blobParams *core.BlobVersionParameters) (*encoding.FragmentInfo, error) {
// Add headers for routing
md := metadata.New(map[string]string{
"content-type": "application/grpc",
"x-blob-size": fmt.Sprintf("%d", blob.BlobSize),
})
ctx = metadata.NewOutgoingContext(ctx, md)

encodingParams, err := blob.BlobHeader.GetEncodingParams(blobParams)
if err != nil {
return nil, fmt.Errorf("failed to get encoding params: %w", err)
}
return e.encodingClient.EncodeBlob(ctx, blobKey, encodingParams)
return e.encodingClient.EncodeBlob(ctx, blobKey, encodingParams, blob.BlobSize)
}

func (e *EncodingManager) refreshBlobVersionParams(ctx context.Context) error {
Expand Down
3 changes: 2 additions & 1 deletion disperser/encoder/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewEncoderClientV2(addr string) (disperser.EncoderClientV2, error) {
}, nil
}

func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams) (*encoding.FragmentInfo, error) {
func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encodingParams encoding.EncodingParams, blobSize uint64) (*encoding.FragmentInfo, error) {
// Establish connection
conn, err := grpc.NewClient(
c.addr,
Expand All @@ -43,6 +43,7 @@ func (c *clientV2) EncodeBlob(ctx context.Context, blobKey corev2.BlobKey, encod
ChunkLength: encodingParams.ChunkLength,
NumChunks: encodingParams.NumChunks,
},
BlobSize: blobSize,
}

// Make the RPC call
Expand Down
1 change: 1 addition & 0 deletions disperser/encoder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type ServerConfig struct {
GrpcPort string
MaxConcurrentRequests int
RequestPoolSize int
RequestQueueSize int
EnableGnarkChunkEncoding bool
PreventReencoding bool
Backend string
Expand Down
2 changes: 1 addition & 1 deletion disperser/encoder/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *EncoderServer) EncodeBlob(ctx context.Context, req *pb.EncodeBlobReques
s.metrics.ObserveQueue(s.queueStats)
s.queueLock.Unlock()
default:
s.metrics.IncrementRateLimitedBlobRequestNum(len(req.GetData()))
s.metrics.IncrementRateLimitedBlobRequestNum(blobSize)
s.logger.Warn("rate limiting as request pool is full", "requestPoolSize", s.config.RequestPoolSize, "maxConcurrentRequests", s.config.MaxConcurrentRequests)
return nil, errors.New("too many requests")
}
Expand Down
Loading
Loading