Skip to content

Commit

Permalink
Added metrics for the encoding manager. (#967)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Dec 9, 2024
1 parent dd42f16 commit 9170987
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 12 deletions.
3 changes: 3 additions & 0 deletions disperser/cmd/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Config struct {

BLSOperatorStateRetrieverAddr string
EigenDAServiceManagerAddr string

MetricsPort int
}

func NewConfig(ctx *cli.Context) (Config, error) {
Expand Down Expand Up @@ -89,6 +91,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {

BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
MetricsPort: ctx.GlobalInt(flags.MetricsPortFlag.Name),
}
return config, nil
}
8 changes: 8 additions & 0 deletions disperser/cmd/controller/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BATCH_SIZE"),
Value: 128,
}
MetricsPortFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "metrics-port"),
Usage: "Port to expose metrics",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "METRICS_PORT"),
Value: 9101,
}
)

var requiredFlags = []cli.Flag{
Expand Down Expand Up @@ -202,6 +209,7 @@ var optionalFlags = []cli.Flag{
NumConcurrentDispersalRequestsFlag,
NodeClientCacheNumEntriesFlag,
MaxBatchSizeFlag,
MetricsPortFlag,
}

var Flags []cli.Flag
Expand Down
29 changes: 29 additions & 0 deletions disperser/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@ package main
import (
"context"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"log"
"net/http"
"os"
"strings"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/aws/dynamodb"
Expand Down Expand Up @@ -76,6 +81,22 @@ func RunController(ctx *cli.Context) error {
config.DynamoDBTableName,
)

metricsRegistry := prometheus.NewRegistry()
metricsRegistry.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
metricsRegistry.MustRegister(collectors.NewGoCollector())

logger.Infof("Starting metrics server at port %d", config.MetricsPort)
addr := fmt.Sprintf(":%d", config.MetricsPort)
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(
metricsRegistry,
promhttp.HandlerOpts{},
))
metricsServer := &http.Server{
Addr: addr,
Handler: mux,
}

encoderClient, err := encoder.NewEncoderClientV2(config.EncodingManagerConfig.EncoderAddress)
if err != nil {
return fmt.Errorf("failed to create encoder client: %v", err)
Expand All @@ -88,6 +109,7 @@ func RunController(ctx *cli.Context) error {
encoderClient,
chainReader,
logger,
metricsRegistry,
)
if err != nil {
return fmt.Errorf("failed to create encoding manager: %v", err)
Expand Down Expand Up @@ -154,5 +176,12 @@ func RunController(ctx *cli.Context) error {
return fmt.Errorf("failed to start dispatcher: %v", err)
}

go func() {
err := metricsServer.ListenAndServe()
if err != nil && !strings.Contains(err.Error(), "http: Server closed") {
logger.Errorf("metrics metricsServer error: %v", err)
}
}()

return nil
}
64 changes: 53 additions & 11 deletions disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"math"
"math/rand"
"sync/atomic"
Expand Down Expand Up @@ -57,6 +58,8 @@ type EncodingManager struct {
// state
cursor *blobstore.StatusIndexCursor
blobVersionParameters atomic.Pointer[corev2.BlobVersionParameterMap]

metrics *encodingManagerMetrics
}

func NewEncodingManager(
Expand All @@ -66,6 +69,7 @@ func NewEncodingManager(
encodingClient disperser.EncoderClientV2,
chainReader core.Reader,
logger logging.Logger,
registry *prometheus.Registry,
) (*EncodingManager, error) {
if config.NumRelayAssignment < 1 ||
len(config.AvailableRelays) == 0 ||
Expand All @@ -82,8 +86,8 @@ func NewEncodingManager(
encodingClient: encodingClient,
chainReader: chainReader,
logger: logger.With("component", "EncodingManager"),

cursor: nil,
cursor: nil,
metrics: newEncodingManagerMetrics(registry),
}, nil
}

Expand Down Expand Up @@ -151,6 +155,15 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
return fmt.Errorf("blob version parameters is nil")
}

e.metrics.reportBatchSize(len(blobMetadatas))
batchSizeBytes := uint64(0)
for _, blob := range blobMetadatas {
batchSizeBytes += blob.BlobSize
}
e.metrics.reportBatchDataSize(batchSizeBytes)

submissionStart := time.Now()

for _, blob := range blobMetadatas {
blob := blob
blobKey, err := blob.BlobHeader.BlobKey()
Expand All @@ -167,14 +180,25 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {

// Encode the blobs
e.pool.Submit(func() {
for i := 0; i < e.NumEncodingRetries+1; i++ {
start := time.Now()

var i int
var finishedEncodingTime time.Time
var finishedPutBlobCertificateTime time.Time
var finishedUpdateBlobStatusTime time.Time
var success bool

for i = 0; i < e.NumEncodingRetries+1; i++ {
encodingCtx, cancel := context.WithTimeout(ctx, e.EncodingRequestTimeout)
fragmentInfo, err := e.encodeBlob(encodingCtx, blobKey, blob, blobParams)
cancel()
if err != nil {
e.logger.Error("failed to encode blob", "blobKey", blobKey.Hex(), "err", err)
continue
}

finishedEncodingTime = time.Now()

relayKeys, err := GetRelayKeys(e.NumRelayAssignment, e.AvailableRelays)
if err != nil {
e.logger.Error("failed to get relay keys", "err", err)
Expand All @@ -194,28 +218,46 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
continue
}

finishedPutBlobCertificateTime = time.Now()

storeCtx, cancel = context.WithTimeout(ctx, e.StoreTimeout)
err = e.blobMetadataStore.UpdateBlobStatus(storeCtx, blobKey, v2.Encoded)
finishedUpdateBlobStatusTime = time.Now()
cancel()
if err == nil || errors.Is(err, dispcommon.ErrAlreadyExists) {
// Successfully updated the status to Encoded
return
success = true
break
}

e.logger.Error("failed to update blob status to Encoded", "blobKey", blobKey.Hex(), "err", err)
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying
sleepTime := time.Duration(math.Pow(2, float64(i))) * time.Second
time.Sleep(sleepTime) // Wait before retrying
}

storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout)
err = e.blobMetadataStore.UpdateBlobStatus(storeCtx, blobKey, v2.Failed)
cancel()
if err != nil {
e.logger.Error("failed to update blob status to Failed", "blobKey", blobKey.Hex(), "err", err)
return
e.metrics.reportBatchRetryCount(i)

if success {
e.metrics.reportEncodingLatency(finishedEncodingTime.Sub(start))
e.metrics.reportPutBlobCertLatency(finishedPutBlobCertificateTime.Sub(finishedEncodingTime))
e.metrics.reportUpdateBlobStatusLatency(
finishedUpdateBlobStatusTime.Sub(finishedPutBlobCertificateTime))
e.metrics.reportBlobHandleLatency(time.Since(start))
} else {
e.metrics.reportFailedSubmission()
storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout)
err = e.blobMetadataStore.UpdateBlobStatus(storeCtx, blobKey, v2.Failed)
cancel()
if err != nil {
e.logger.Error("failed to update blob status to Failed", "blobKey", blobKey.Hex(), "err", err)
return
}
}
})
}

e.metrics.reportBatchSubmissionLatency(time.Since(submissionStart))

if cursor != nil {
e.cursor = cursor
}
Expand Down
159 changes: 159 additions & 0 deletions disperser/controller/encoding_manager_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package controller

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"time"
)

const encodingManagerNamespace = "eigenda_encoding_manager"

// encodingManagerMetrics is a struct that holds the metrics for the encoding manager.
type encodingManagerMetrics struct {
batchSubmissionLatency *prometheus.SummaryVec
blobHandleLatency *prometheus.SummaryVec
encodingLatency *prometheus.SummaryVec
putBlobCertLatency *prometheus.SummaryVec
updateBlobStatusLatency *prometheus.SummaryVec
batchSize *prometheus.GaugeVec
batchDataSize *prometheus.GaugeVec
batchRetryCount *prometheus.GaugeVec
failedSubmissionCount *prometheus.CounterVec
}

// NewEncodingManagerMetrics sets up metrics for the encoding manager.
func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMetrics {
batchSubmissionLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: encodingManagerNamespace,
Name: "batch_submission_latency_ms",
Help: "The time required to submit a blob to the work pool for encoding.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

blobHandleLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: encodingManagerNamespace,
Name: "blob_handle_latency_ms",
Help: "The total time required to handle a blob.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

encodingLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: encodingManagerNamespace,
Name: "encoding_latency_ms",
Help: "The time required to encode a blob.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

putBlobCertLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: encodingManagerNamespace,
Name: "put_blob_cert_latency_ms",
Help: "The time required to put a blob certificate.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

updateBlobStatusLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: encodingManagerNamespace,
Name: "update_blob_status_latency_ms",
Help: "The time required to update a blob status.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

batchSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: encodingManagerNamespace,
Name: "batch_size",
Help: "The number of blobs in a batch.",
},
[]string{},
)

batchDataSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: encodingManagerNamespace,
Name: "batch_data_size_bytes",
Help: "The size of the data in a batch.",
},
[]string{},
)

batchRetryCount := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: encodingManagerNamespace,
Name: "batch_retry_count",
Help: "The number of retries required to encode a blob.",
},
[]string{},
)

failSubmissionCount := promauto.With(registry).NewCounterVec(
prometheus.CounterOpts{
Namespace: encodingManagerNamespace,
Name: "failed_submission_count",
Help: "The number of failed blob submissions (even after retries).",
},
[]string{},
)

return &encodingManagerMetrics{
batchSubmissionLatency: batchSubmissionLatency,
blobHandleLatency: blobHandleLatency,
encodingLatency: encodingLatency,
putBlobCertLatency: putBlobCertLatency,
updateBlobStatusLatency: updateBlobStatusLatency,
batchSize: batchSize,
batchDataSize: batchDataSize,
batchRetryCount: batchRetryCount,
failedSubmissionCount: failSubmissionCount,
}
}

func (m *encodingManagerMetrics) reportBatchSubmissionLatency(duration time.Duration) {
m.batchSubmissionLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

func (m *encodingManagerMetrics) reportBlobHandleLatency(duration time.Duration) {
m.blobHandleLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

func (m *encodingManagerMetrics) reportEncodingLatency(duration time.Duration) {
m.encodingLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

func (m *encodingManagerMetrics) reportPutBlobCertLatency(duration time.Duration) {
m.putBlobCertLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

func (m *encodingManagerMetrics) reportUpdateBlobStatusLatency(duration time.Duration) {
m.updateBlobStatusLatency.WithLabelValues().Observe(float64(duration.Nanoseconds()) / float64(time.Millisecond))
}

func (m *encodingManagerMetrics) reportBatchSize(size int) {
m.batchSize.WithLabelValues().Set(float64(size))
}

func (m *encodingManagerMetrics) reportBatchDataSize(size uint64) {
m.batchDataSize.WithLabelValues().Set(float64(size))
}

func (m *encodingManagerMetrics) reportBatchRetryCount(count int) {
m.batchRetryCount.WithLabelValues().Set(float64(count))
}

func (m *encodingManagerMetrics) reportFailedSubmission() {
m.failedSubmissionCount.WithLabelValues().Inc()
}
Loading

0 comments on commit 9170987

Please sign in to comment.