From 0db7695351531f05e6d21d5ba831176df3701b71 Mon Sep 17 00:00:00 2001 From: Daniel Mancia <21249320+dmanc@users.noreply.github.com> Date: Thu, 9 Jan 2025 21:13:43 -0800 Subject: [PATCH] Add throughput metrics to controller (#1094) --- disperser/controller/dispatcher.go | 10 ++++ disperser/controller/dispatcher_metrics.go | 50 +++++++++++++++---- disperser/controller/encoding_manager.go | 2 + .../controller/encoding_manager_metrics.go | 28 +++++++++++ 4 files changed, 79 insertions(+), 11 deletions(-) diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index 947ce3c22..31cdffe08 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -521,6 +521,9 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, qu if err != nil { multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err)) } + if metadata, ok := batch.Metadata[blobKey]; ok { + d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.Failed) + } continue } @@ -538,6 +541,9 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, qu if err != nil { multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err)) } + if metadata, ok := batch.Metadata[blobKey]; ok { + d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.InsufficientSignatures) + } continue } @@ -548,6 +554,7 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, qu if metadata, ok := batch.Metadata[blobKey]; ok { requestedAt := time.Unix(0, int64(metadata.RequestedAt)) d.metrics.reportE2EDispersalLatency(time.Since(requestedAt)) + d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.Certified) } } @@ -561,6 +568,9 @@ func (d *Dispatcher) failBatch(ctx context.Context, batch *batchData) error { if err != nil { multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err)) } + if metadata, ok := batch.Metadata[blobKey]; ok { + d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.Failed) + } } return multierr diff --git a/disperser/controller/dispatcher_metrics.go b/disperser/controller/dispatcher_metrics.go index 7e88971c3..8b94b8878 100644 --- a/disperser/controller/dispatcher_metrics.go +++ b/disperser/controller/dispatcher_metrics.go @@ -3,7 +3,8 @@ package controller import ( "time" - "github.com/Layr-Labs/eigenda/common" + common "github.com/Layr-Labs/eigenda/common" + dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -26,19 +27,17 @@ type dispatcherMetrics struct { sendChunksLatency *prometheus.SummaryVec sendChunksRetryCount *prometheus.GaugeVec putDispersalResponseLatency *prometheus.SummaryVec - - handleSignaturesLatency *prometheus.SummaryVec - receiveSignaturesLatency *prometheus.SummaryVec - aggregateSignaturesLatency *prometheus.SummaryVec - putAttestationLatency *prometheus.SummaryVec - updateBatchStatusLatency *prometheus.SummaryVec - - blobE2EDispersalLatency *prometheus.SummaryVec + handleSignaturesLatency *prometheus.SummaryVec + receiveSignaturesLatency *prometheus.SummaryVec + aggregateSignaturesLatency *prometheus.SummaryVec + putAttestationLatency *prometheus.SummaryVec + updateBatchStatusLatency *prometheus.SummaryVec + blobE2EDispersalLatency *prometheus.SummaryVec + completedBlobs *prometheus.CounterVec } // NewDispatcherMetrics sets up metrics for the dispatcher. func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics { - objectives := map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001} handleBatchLatency := promauto.With(registry).NewSummaryVec( @@ -232,7 +231,7 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics { blobE2EDispersalLatency := promauto.With(registry).NewSummaryVec( prometheus.SummaryOpts{ - Namespace: encodingManagerNamespace, + Namespace: dispatcherNamespace, Name: "e2e_dispersal_latency_ms", Help: "The time required to disperse a blob end-to-end.", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, @@ -240,6 +239,15 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics { []string{}, ) + completedBlobs := promauto.With(registry).NewCounterVec( + prometheus.CounterOpts{ + Namespace: dispatcherNamespace, + Name: "completed_blobs_total", + Help: "The number and size of completed blobs by status.", + }, + []string{"state", "data"}, + ) + return &dispatcherMetrics{ handleBatchLatency: handleBatchLatency, newBatchLatency: newBatchLatency, @@ -261,6 +269,7 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics { putAttestationLatency: putAttestationLatency, updateBatchStatusLatency: updateBatchStatusLatency, blobE2EDispersalLatency: blobE2EDispersalLatency, + completedBlobs: completedBlobs, } } @@ -343,3 +352,22 @@ func (m *dispatcherMetrics) reportUpdateBatchStatusLatency(duration time.Duratio func (m *dispatcherMetrics) reportE2EDispersalLatency(duration time.Duration) { m.blobE2EDispersalLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } + +func (m *dispatcherMetrics) reportCompletedBlob(size int, status dispv2.BlobStatus) { + switch status { + case dispv2.Certified: + m.completedBlobs.WithLabelValues("certified", "number").Inc() + m.completedBlobs.WithLabelValues("certified", "size").Add(float64(size)) + case dispv2.Failed: + m.completedBlobs.WithLabelValues("failed", "number").Inc() + m.completedBlobs.WithLabelValues("failed", "size").Add(float64(size)) + case dispv2.InsufficientSignatures: + m.completedBlobs.WithLabelValues("insufficient_signature", "number").Inc() + m.completedBlobs.WithLabelValues("insufficient_signature", "size").Add(float64(size)) + default: + return + } + + m.completedBlobs.WithLabelValues("total", "number").Inc() + m.completedBlobs.WithLabelValues("total", "size").Add(float64(size)) +} diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index 994b44bd3..090ecf58d 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -249,6 +249,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { requestedAt := time.Unix(0, int64(blob.RequestedAt)) e.metrics.reportE2EEncodingLatency(time.Since(requestedAt)) + e.metrics.reportCompletedBlob(int(blob.BlobSize), v2.Encoded) } else { e.metrics.reportFailedSubmission() storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout) @@ -258,6 +259,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { e.logger.Error("failed to update blob status to Failed", "blobKey", blobKey.Hex(), "err", err) return } + e.metrics.reportCompletedBlob(int(blob.BlobSize), v2.Failed) } }) } diff --git a/disperser/controller/encoding_manager_metrics.go b/disperser/controller/encoding_manager_metrics.go index 3ba35bdc2..ad1372457 100644 --- a/disperser/controller/encoding_manager_metrics.go +++ b/disperser/controller/encoding_manager_metrics.go @@ -4,6 +4,7 @@ import ( "time" common "github.com/Layr-Labs/eigenda/common" + dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -22,6 +23,7 @@ type encodingManagerMetrics struct { batchDataSize *prometheus.GaugeVec batchRetryCount *prometheus.GaugeVec failedSubmissionCount *prometheus.CounterVec + completedBlobs *prometheus.CounterVec } // NewEncodingManagerMetrics sets up metrics for the encoding manager. @@ -122,6 +124,15 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe []string{}, ) + completedBlobs := promauto.With(registry).NewCounterVec( + prometheus.CounterOpts{ + Namespace: encodingManagerNamespace, + Name: "completed_blobs_total", + Help: "The number and size of completed blobs by status.", + }, + []string{"state", "data"}, + ) + return &encodingManagerMetrics{ batchSubmissionLatency: batchSubmissionLatency, blobHandleLatency: blobHandleLatency, @@ -133,6 +144,7 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe batchDataSize: batchDataSize, batchRetryCount: batchRetryCount, failedSubmissionCount: failSubmissionCount, + completedBlobs: completedBlobs, } } @@ -175,3 +187,19 @@ func (m *encodingManagerMetrics) reportBatchRetryCount(count int) { func (m *encodingManagerMetrics) reportFailedSubmission() { m.failedSubmissionCount.WithLabelValues().Inc() } + +func (m *encodingManagerMetrics) reportCompletedBlob(size int, status dispv2.BlobStatus) { + switch status { + case dispv2.Encoded: + m.completedBlobs.WithLabelValues("encoded", "number").Inc() + m.completedBlobs.WithLabelValues("encoded", "size").Add(float64(size)) + case dispv2.Failed: + m.completedBlobs.WithLabelValues("failed", "number").Inc() + m.completedBlobs.WithLabelValues("failed", "size").Add(float64(size)) + default: + return + } + + m.completedBlobs.WithLabelValues("total", "number").Inc() + m.completedBlobs.WithLabelValues("total", "size").Add(float64(size)) +}