Skip to content

Commit

Permalink
Add e2e latency metrics in controller (#1095)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Jan 10, 2025
1 parent 5492fe3 commit b09cdc3
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 1 deletion.
8 changes: 8 additions & 0 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type batchData struct {
Batch *corev2.Batch
BatchHeaderHash [32]byte
BlobKeys []corev2.BlobKey
Metadata map[corev2.BlobKey]*v2.BlobMetadata
OperatorState *core.IndexedOperatorState
}

Expand Down Expand Up @@ -351,6 +352,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
}

keys := make([]corev2.BlobKey, len(blobMetadatas))
metadataMap := make(map[corev2.BlobKey]*v2.BlobMetadata, len(blobMetadatas))
for i, metadata := range blobMetadatas {
if metadata == nil || metadata.BlobHeader == nil {
return nil, fmt.Errorf("invalid blob metadata")
Expand All @@ -360,6 +362,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to get blob key: %w", err)
}
keys[i] = blobKey
metadataMap[blobKey] = metadata
}

certs, _, err := d.blobMetadataStore.GetBlobCertificates(ctx, keys)
Expand Down Expand Up @@ -471,6 +474,7 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
},
BatchHeaderHash: batchHeaderHash,
BlobKeys: keys,
Metadata: metadataMap,
OperatorState: state,
}, nil
}
Expand Down Expand Up @@ -541,6 +545,10 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, qu
if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to certified: %w", blobKey.Hex(), err))
}
if metadata, ok := batch.Metadata[blobKey]; ok {
requestedAt := time.Unix(0, int64(metadata.RequestedAt))
d.metrics.reportE2EDispersalLatency(time.Since(requestedAt))
}
}

return multierr
Expand Down
17 changes: 17 additions & 0 deletions disperser/controller/dispatcher_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type dispatcherMetrics struct {
aggregateSignaturesLatency *prometheus.SummaryVec
putAttestationLatency *prometheus.SummaryVec
updateBatchStatusLatency *prometheus.SummaryVec

blobE2EDispersalLatency *prometheus.SummaryVec
}

// NewDispatcherMetrics sets up metrics for the dispatcher.
Expand Down Expand Up @@ -228,6 +230,16 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {
[]string{},
)

blobE2EDispersalLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: encodingManagerNamespace,
Name: "e2e_dispersal_latency_ms",
Help: "The time required to disperse a blob end-to-end.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

return &dispatcherMetrics{
handleBatchLatency: handleBatchLatency,
newBatchLatency: newBatchLatency,
Expand All @@ -248,6 +260,7 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {
aggregateSignaturesLatency: aggregateSignaturesLatency,
putAttestationLatency: putAttestationLatency,
updateBatchStatusLatency: updateBatchStatusLatency,
blobE2EDispersalLatency: blobE2EDispersalLatency,
}
}

Expand Down Expand Up @@ -326,3 +339,7 @@ func (m *dispatcherMetrics) reportPutAttestationLatency(duration time.Duration)
func (m *dispatcherMetrics) reportUpdateBatchStatusLatency(duration time.Duration) {
m.updateBatchStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportE2EDispersalLatency(duration time.Duration) {
m.blobE2EDispersalLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}
3 changes: 3 additions & 0 deletions disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
e.metrics.reportUpdateBlobStatusLatency(
finishedUpdateBlobStatusTime.Sub(finishedPutBlobCertificateTime))
e.metrics.reportBlobHandleLatency(time.Since(start))

requestedAt := time.Unix(0, int64(blob.RequestedAt))
e.metrics.reportE2EEncodingLatency(time.Since(requestedAt))
} else {
e.metrics.reportFailedSubmission()
storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout)
Expand Down
19 changes: 18 additions & 1 deletion disperser/controller/encoding_manager_metrics.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package controller

import (
"time"

common "github.com/Layr-Labs/eigenda/common"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"time"
)

const encodingManagerNamespace = "eigenda_encoding_manager"
Expand All @@ -16,6 +17,7 @@ type encodingManagerMetrics struct {
encodingLatency *prometheus.SummaryVec
putBlobCertLatency *prometheus.SummaryVec
updateBlobStatusLatency *prometheus.SummaryVec
blobE2EEncodingLatency *prometheus.SummaryVec
batchSize *prometheus.GaugeVec
batchDataSize *prometheus.GaugeVec
batchRetryCount *prometheus.GaugeVec
Expand Down Expand Up @@ -74,6 +76,16 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe
[]string{},
)

blobE2EEncodingLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: encodingManagerNamespace,
Name: "e2e_encoding_latency_ms",
Help: "The time required to encode a blob end-to-end.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

batchSize := promauto.With(registry).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: encodingManagerNamespace,
Expand Down Expand Up @@ -116,6 +128,7 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe
encodingLatency: encodingLatency,
putBlobCertLatency: putBlobCertLatency,
updateBlobStatusLatency: updateBlobStatusLatency,
blobE2EEncodingLatency: blobE2EEncodingLatency,
batchSize: batchSize,
batchDataSize: batchDataSize,
batchRetryCount: batchRetryCount,
Expand Down Expand Up @@ -143,6 +156,10 @@ func (m *encodingManagerMetrics) reportUpdateBlobStatusLatency(duration time.Dur
m.updateBlobStatusLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *encodingManagerMetrics) reportE2EEncodingLatency(duration time.Duration) {
m.blobE2EEncodingLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *encodingManagerMetrics) reportBatchSize(size int) {
m.batchSize.WithLabelValues().Set(float64(size))
}
Expand Down

0 comments on commit b09cdc3

Please sign in to comment.