Skip to content

Commit

Permalink
Add throughput metrics to controller (#1094)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc authored Jan 10, 2025
1 parent 2699b91 commit 0db7695
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 11 deletions.
10 changes: 10 additions & 0 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,9 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, qu
if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
}
if metadata, ok := batch.Metadata[blobKey]; ok {
d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.Failed)
}
continue
}

Expand All @@ -538,6 +541,9 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, qu
if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
}
if metadata, ok := batch.Metadata[blobKey]; ok {
d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.InsufficientSignatures)
}
continue
}

Expand All @@ -548,6 +554,7 @@ func (d *Dispatcher) updateBatchStatus(ctx context.Context, batch *batchData, qu
if metadata, ok := batch.Metadata[blobKey]; ok {
requestedAt := time.Unix(0, int64(metadata.RequestedAt))
d.metrics.reportE2EDispersalLatency(time.Since(requestedAt))
d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.Certified)
}
}

Expand All @@ -561,6 +568,9 @@ func (d *Dispatcher) failBatch(ctx context.Context, batch *batchData) error {
if err != nil {
multierr = multierror.Append(multierr, fmt.Errorf("failed to update blob status for blob %s to failed: %w", blobKey.Hex(), err))
}
if metadata, ok := batch.Metadata[blobKey]; ok {
d.metrics.reportCompletedBlob(int(metadata.BlobSize), v2.Failed)
}
}

return multierr
Expand Down
50 changes: 39 additions & 11 deletions disperser/controller/dispatcher_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package controller
import (
"time"

"github.com/Layr-Labs/eigenda/common"
common "github.com/Layr-Labs/eigenda/common"
dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand All @@ -26,19 +27,17 @@ type dispatcherMetrics struct {
sendChunksLatency *prometheus.SummaryVec
sendChunksRetryCount *prometheus.GaugeVec
putDispersalResponseLatency *prometheus.SummaryVec

handleSignaturesLatency *prometheus.SummaryVec
receiveSignaturesLatency *prometheus.SummaryVec
aggregateSignaturesLatency *prometheus.SummaryVec
putAttestationLatency *prometheus.SummaryVec
updateBatchStatusLatency *prometheus.SummaryVec

blobE2EDispersalLatency *prometheus.SummaryVec
handleSignaturesLatency *prometheus.SummaryVec
receiveSignaturesLatency *prometheus.SummaryVec
aggregateSignaturesLatency *prometheus.SummaryVec
putAttestationLatency *prometheus.SummaryVec
updateBatchStatusLatency *prometheus.SummaryVec
blobE2EDispersalLatency *prometheus.SummaryVec
completedBlobs *prometheus.CounterVec
}

// NewDispatcherMetrics sets up metrics for the dispatcher.
func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {

objectives := map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}

handleBatchLatency := promauto.With(registry).NewSummaryVec(
Expand Down Expand Up @@ -232,14 +231,23 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {

blobE2EDispersalLatency := promauto.With(registry).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: encodingManagerNamespace,
Namespace: dispatcherNamespace,
Name: "e2e_dispersal_latency_ms",
Help: "The time required to disperse a blob end-to-end.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
[]string{},
)

completedBlobs := promauto.With(registry).NewCounterVec(
prometheus.CounterOpts{
Namespace: dispatcherNamespace,
Name: "completed_blobs_total",
Help: "The number and size of completed blobs by status.",
},
[]string{"state", "data"},
)

return &dispatcherMetrics{
handleBatchLatency: handleBatchLatency,
newBatchLatency: newBatchLatency,
Expand All @@ -261,6 +269,7 @@ func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {
putAttestationLatency: putAttestationLatency,
updateBatchStatusLatency: updateBatchStatusLatency,
blobE2EDispersalLatency: blobE2EDispersalLatency,
completedBlobs: completedBlobs,
}
}

Expand Down Expand Up @@ -343,3 +352,22 @@ func (m *dispatcherMetrics) reportUpdateBatchStatusLatency(duration time.Duratio
func (m *dispatcherMetrics) reportE2EDispersalLatency(duration time.Duration) {
m.blobE2EDispersalLatency.WithLabelValues().Observe(common.ToMilliseconds(duration))
}

func (m *dispatcherMetrics) reportCompletedBlob(size int, status dispv2.BlobStatus) {
switch status {
case dispv2.Certified:
m.completedBlobs.WithLabelValues("certified", "number").Inc()
m.completedBlobs.WithLabelValues("certified", "size").Add(float64(size))
case dispv2.Failed:
m.completedBlobs.WithLabelValues("failed", "number").Inc()
m.completedBlobs.WithLabelValues("failed", "size").Add(float64(size))
case dispv2.InsufficientSignatures:
m.completedBlobs.WithLabelValues("insufficient_signature", "number").Inc()
m.completedBlobs.WithLabelValues("insufficient_signature", "size").Add(float64(size))
default:
return
}

m.completedBlobs.WithLabelValues("total", "number").Inc()
m.completedBlobs.WithLabelValues("total", "size").Add(float64(size))
}
2 changes: 2 additions & 0 deletions disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {

requestedAt := time.Unix(0, int64(blob.RequestedAt))
e.metrics.reportE2EEncodingLatency(time.Since(requestedAt))
e.metrics.reportCompletedBlob(int(blob.BlobSize), v2.Encoded)
} else {
e.metrics.reportFailedSubmission()
storeCtx, cancel := context.WithTimeout(ctx, e.StoreTimeout)
Expand All @@ -258,6 +259,7 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
e.logger.Error("failed to update blob status to Failed", "blobKey", blobKey.Hex(), "err", err)
return
}
e.metrics.reportCompletedBlob(int(blob.BlobSize), v2.Failed)
}
})
}
Expand Down
28 changes: 28 additions & 0 deletions disperser/controller/encoding_manager_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

common "github.com/Layr-Labs/eigenda/common"
dispv2 "github.com/Layr-Labs/eigenda/disperser/common/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand All @@ -22,6 +23,7 @@ type encodingManagerMetrics struct {
batchDataSize *prometheus.GaugeVec
batchRetryCount *prometheus.GaugeVec
failedSubmissionCount *prometheus.CounterVec
completedBlobs *prometheus.CounterVec
}

// NewEncodingManagerMetrics sets up metrics for the encoding manager.
Expand Down Expand Up @@ -122,6 +124,15 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe
[]string{},
)

completedBlobs := promauto.With(registry).NewCounterVec(
prometheus.CounterOpts{
Namespace: encodingManagerNamespace,
Name: "completed_blobs_total",
Help: "The number and size of completed blobs by status.",
},
[]string{"state", "data"},
)

return &encodingManagerMetrics{
batchSubmissionLatency: batchSubmissionLatency,
blobHandleLatency: blobHandleLatency,
Expand All @@ -133,6 +144,7 @@ func newEncodingManagerMetrics(registry *prometheus.Registry) *encodingManagerMe
batchDataSize: batchDataSize,
batchRetryCount: batchRetryCount,
failedSubmissionCount: failSubmissionCount,
completedBlobs: completedBlobs,
}
}

Expand Down Expand Up @@ -175,3 +187,19 @@ func (m *encodingManagerMetrics) reportBatchRetryCount(count int) {
func (m *encodingManagerMetrics) reportFailedSubmission() {
m.failedSubmissionCount.WithLabelValues().Inc()
}

func (m *encodingManagerMetrics) reportCompletedBlob(size int, status dispv2.BlobStatus) {
switch status {
case dispv2.Encoded:
m.completedBlobs.WithLabelValues("encoded", "number").Inc()
m.completedBlobs.WithLabelValues("encoded", "size").Add(float64(size))
case dispv2.Failed:
m.completedBlobs.WithLabelValues("failed", "number").Inc()
m.completedBlobs.WithLabelValues("failed", "size").Add(float64(size))
default:
return
}

m.completedBlobs.WithLabelValues("total", "number").Inc()
m.completedBlobs.WithLabelValues("total", "size").Add(float64(size))
}

0 comments on commit 0db7695

Please sign in to comment.