From b09cdc35860a0388b9d0e04262ece8c3a89a2dbe Mon Sep 17 00:00:00 2001 From: Ian Shim <100327837+ian-shim@users.noreply.github.com> Date: Fri, 10 Jan 2025 09:17:24 +0900 Subject: [PATCH] Add e2e latency metrics in controller (#1095) --- disperser/controller/dispatcher.go | 8 ++++++++ disperser/controller/dispatcher_metrics.go | 17 +++++++++++++++++ disperser/controller/encoding_manager.go | 3 +++ .../controller/encoding_manager_metrics.go | 19 ++++++++++++++++++- 4 files changed, 46 insertions(+), 1 deletion(-) diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index a6fe5e2ac..947ce3c22 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -50,6 +50,7 @@ type batchData struct { Batch *corev2.Batch BatchHeaderHash [32]byte BlobKeys []corev2.BlobKey + Metadata map[corev2.BlobKey]*v2.BlobMetadata OperatorState *core.IndexedOperatorState } @@ -351,6 +352,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) } keys := make([]corev2.BlobKey, len(blobMetadatas)) + metadataMap := make(map[corev2.BlobKey]*v2.BlobMetadata, len(blobMetadatas)) for i, metadata := range blobMetadatas { if metadata == nil || metadata.BlobHeader == nil { return nil, fmt.Errorf("invalid blob metadata") @@ -360,6 +362,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) return nil, fmt.Errorf("failed to get blob key: %w", err) } keys[i] = blobKey + metadataMap[blobKey] = metadata } certs, _, err := d.blobMetadataStore.GetBlobCertificates(ctx, keys) @@ -471,6 +474,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) }, BatchHeaderHash: batchHeaderHash, BlobKeys: keys, + Metadata: metadataMap, OperatorState: state, }, nil } @@ -541,6 +545,10 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, qu if err != nil { multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to certified: %w", blobKey.Hex(), err)) } + if metadata, ok := batch.Metadata[blobKey]; ok { + requestedAt := time.Unix(0, int64(metadata.RequestedAt)) + d.metrics.reportE2EDispersalLatency(time.Since(requestedAt)) + } } return multierr diff --git a/disperser/controller/dispatcher_metrics.go b/disperser/controller/dispatcher_metrics.go index 00dc79e5a..7e88971c3 100644 --- a/disperser/controller/dispatcher_metrics.go +++ b/disperser/controller/dispatcher_metrics.go @@ -32,6 +32,8 @@ type dispatcherMetrics struct { aggregateSignaturesLatency *prometheus.SummaryVec putAttestationLatency *prometheus.SummaryVec updateBatchStatusLatency *prometheus.SummaryVec + + blobE2EDispersalLatency *prometheus.SummaryVec } // NewDispatcherMetrics sets up metrics for the dispatcher. @@ -228,6 +230,16 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics { []string{}, ) + blobE2EDispersalLatency := promauto.With(registry).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: encodingManagerNamespace, + Name: "e2e_dispersal_latency_ms", + Help: "The time required to disperse a blob end-to-end.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{}, + ) + return &dispatcherMetrics{ handleBatchLatency: handleBatchLatency, newBatchLatency: newBatchLatency, @@ -248,6 +260,7 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics { aggregateSignaturesLatency: aggregateSignaturesLatency, putAttestationLatency: putAttestationLatency, updateBatchStatusLatency: updateBatchStatusLatency, + blobE2EDispersalLatency: blobE2EDispersalLatency, } } @@ -326,3 +339,7 @@ func (m *dispatcherMetrics) reportPutAttestationLatency(duration time.Duration) func (m *dispatcherMetrics) reportUpdateBatchStatusLatency(duration time.Duration) { m.updateBatchStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } + +func (m *dispatcherMetrics) reportE2EDispersalLatency(duration time.Duration) { + m.blobE2EDispersalLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) +} diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index cbd9f444d..994b44bd3 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -246,6 +246,9 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error { e.metrics.reportUpdateBlobStatusLatency( finishedUpdateBlobStatusTime.Sub(finishedPutBlobCertificateTime)) e.metrics.reportBlobHandleLatency(time.Since(start)) + + requestedAt := time.Unix(0, int64(blob.RequestedAt)) + e.metrics.reportE2EEncodingLatency(time.Since(requestedAt)) } else { e.metrics.reportFailedSubmission() storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout) diff --git a/disperser/controller/encoding_manager_metrics.go b/disperser/controller/encoding_manager_metrics.go index 791193203..3ba35bdc2 100644 --- a/disperser/controller/encoding_manager_metrics.go +++ b/disperser/controller/encoding_manager_metrics.go @@ -1,10 +1,11 @@ package controller import ( + "time" + common "github.com/Layr-Labs/eigenda/common" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "time" ) const encodingManagerNamespace = "eigenda_encoding_manager" @@ -16,6 +17,7 @@ type encodingManagerMetrics struct { encodingLatency *prometheus.SummaryVec putBlobCertLatency *prometheus.SummaryVec updateBlobStatusLatency *prometheus.SummaryVec + blobE2EEncodingLatency *prometheus.SummaryVec batchSize *prometheus.GaugeVec batchDataSize *prometheus.GaugeVec batchRetryCount *prometheus.GaugeVec @@ -74,6 +76,16 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe []string{}, ) + blobE2EEncodingLatency := promauto.With(registry).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: encodingManagerNamespace, + Name: "e2e_encoding_latency_ms", + Help: "The time required to encode a blob end-to-end.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, + []string{}, + ) + batchSize := promauto.With(registry).NewGaugeVec( prometheus.GaugeOpts{ Namespace: encodingManagerNamespace, @@ -116,6 +128,7 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe encodingLatency: encodingLatency, putBlobCertLatency: putBlobCertLatency, updateBlobStatusLatency: updateBlobStatusLatency, + blobE2EEncodingLatency: blobE2EEncodingLatency, batchSize: batchSize, batchDataSize: batchDataSize, batchRetryCount: batchRetryCount, @@ -143,6 +156,10 @@ func (m *encodingManagerMetrics) reportUpdateBlobStatusLatency(duration time.Dur m.updateBlobStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) } +func (m *encodingManagerMetrics) reportE2EEncodingLatency(duration time.Duration) { + m.blobE2EEncodingLatency.WithLabelValues().Observe(common.ToMilliseconds(duration)) +} + func (m *encodingManagerMetrics) reportBatchSize(size int) { m.batchSize.WithLabelValues().Set(float64(size)) }