diff --git a/metrics.md b/metrics.md deleted file mode 100644 index 7138d98ca..000000000 --- a/metrics.md +++ /dev/null @@ -1,5 +0,0 @@ -# EigenDA Metrics Documentation - -- [churner](operators/churner/mdoc/churner-metrics.md) -- [relay](relay/mdoc/relay-metrics.md) - diff --git a/operators/churner/churner_test.go b/operators/churner/churner_test.go index 35c026977..0854de749 100644 --- a/operators/churner/churner_test.go +++ b/operators/churner/churner_test.go @@ -28,8 +28,7 @@ func TestProcessChurnRequest(t *testing.T) { NumRetries: numRetries, }, } - metrics, err := churner.NewMetrics(9001, logger) - assert.NoError(t, err) + metrics := churner.NewMetrics("9001", logger) cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics) assert.NoError(t, err) assert.NotNil(t, cn) diff --git a/operators/churner/cmd/main.go b/operators/churner/cmd/main.go index 33a8c422d..a9ecd76e8 100644 --- a/operators/churner/cmd/main.go +++ b/operators/churner/cmd/main.go @@ -86,10 +86,7 @@ func run(ctx *cli.Context) error { logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint) indexer := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger) - metrics, err := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger) - if err != nil { - log.Fatalf("failed to create metrics: %v", err) - } + metrics := churner.NewMetrics(config.MetricsConfig.HTTPPort, logger) cn, err := churner.NewChurner(config, indexer, tx, logger, metrics) if err != nil { diff --git a/operators/churner/config.go b/operators/churner/config.go index fe2b1735a..d35a72121 100644 --- a/operators/churner/config.go +++ b/operators/churner/config.go @@ -37,7 +37,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { PerPublicKeyRateLimit: ctx.GlobalDuration(flags.PerPublicKeyRateLimit.Name), ChurnApprovalInterval: ctx.GlobalDuration(flags.ChurnApprovalInterval.Name), MetricsConfig: MetricsConfig{ - HTTPPort: ctx.GlobalInt(flags.MetricsHTTPPort.Name), + HTTPPort: 
ctx.GlobalString(flags.MetricsHTTPPort.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name), }, }, nil diff --git a/operators/churner/flags/flags.go b/operators/churner/flags/flags.go index 507a35276..906096c49 100644 --- a/operators/churner/flags/flags.go +++ b/operators/churner/flags/flags.go @@ -58,11 +58,11 @@ var ( EnvVar: common.PrefixEnvVar(envPrefix, "ENABLE_METRICS"), } /* Optional Flags*/ - MetricsHTTPPort = cli.IntFlag{ + MetricsHTTPPort = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "metrics-http-port"), Usage: "the http port which the metrics prometheus server is listening", Required: false, - Value: 9100, + Value: "9100", EnvVar: common.PrefixEnvVar(envPrefix, "METRICS_HTTP_PORT"), } ChurnApprovalInterval = cli.DurationFlag{ diff --git a/operators/churner/mdoc/churner-metrics.md b/operators/churner/mdoc/churner-metrics.md deleted file mode 100644 index 7a6b04180..000000000 --- a/operators/churner/mdoc/churner-metrics.md +++ /dev/null @@ -1,33 +0,0 @@ -# Metrics Documentation for namespace 'eigenda_churner' - -This documentation was automatically generated at time `2024-11-26T14:29:13-06:00` - -There are a total of `2` registered metrics. 
- ---- - -## latency_ms - -latency summary in milliseconds - -| | | -|---|---| -| **Name** | `latency` | -| **Unit** | `ms` | -| **Labels** | `method` | -| **Type** | `latency` | -| **Quantiles** | `0.500`, `0.900`, `0.950`, `0.990` | -| **Fully Qualified Name** | `eigenda_churner_latency_ms` | ---- - -## request_count - -the number of requests - -| | | -|---|---| -| **Name** | `request` | -| **Unit** | `count` | -| **Labels** | `status`, `method`, `reason` | -| **Type** | `counter` | -| **Fully Qualified Name** | `eigenda_churner_request_count` | diff --git a/operators/churner/mdoc/main.go b/operators/churner/mdoc/main.go deleted file mode 100644 index db022d61f..000000000 --- a/operators/churner/mdoc/main.go +++ /dev/null @@ -1,24 +0,0 @@ -package main - -import ( - "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/operators/churner" -) - -// main generates documentation for churner metrics. -func main() { - logger, err := common.NewLogger(common.DefaultLoggerConfig()) - if err != nil { - panic(err) - } - - metrics, err := churner.NewMetrics(0, logger) - if err != nil { - panic(err) - } - - err = metrics.WriteMetricsDocumentation() - if err != nil { - panic(err) - } -} diff --git a/operators/churner/metrics.go b/operators/churner/metrics.go index 6ca30a0ed..2cece57ad 100644 --- a/operators/churner/metrics.go +++ b/operators/churner/metrics.go @@ -1,12 +1,15 @@ package churner import ( - "github.com/Layr-Labs/eigenda/common/metrics" - "time" + "context" + "fmt" + "net/http" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc/codes" ) @@ -25,7 +28,7 @@ const ( ) // Note: statusCodeMap must be maintained in sync with failure reason constants. 
-var statusCodeMap = map[FailReason]string{ +var statusCodeMap map[FailReason]string = map[FailReason]string{ FailReasonRateLimitExceeded: codes.ResourceExhausted.String(), FailReasonInsufficientStakeToRegister: codes.InvalidArgument.String(), FailReasonInsufficientStakeToChurn: codes.InvalidArgument.String(), @@ -37,77 +40,63 @@ var statusCodeMap = map[FailReason]string{ } type MetricsConfig struct { - HTTPPort int + HTTPPort string EnableMetrics bool } type Metrics struct { - metricsServer metrics.Metrics + registry *prometheus.Registry - numRequests metrics.CountMetric - latency metrics.LatencyMetric + NumRequests *prometheus.CounterVec + Latency *prometheus.SummaryVec - logger logging.Logger + httpPort string + logger logging.Logger } -type latencyLabel struct { - method string -} - -type numRequestsLabel struct { - status string - method string - reason string -} - -func NewMetrics(httpPort int, logger logging.Logger) (*Metrics, error) { +func NewMetrics(httpPort string, logger logging.Logger) *Metrics { + namespace := "eigenda_churner" reg := prometheus.NewRegistry() reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) - metricsServer := metrics.NewMetrics(logger, "eigenda_churner", httpPort) - - numRequests, err := metricsServer.NewCountMetric( - "request", - "the number of requests", - numRequestsLabel{}) - if err != nil { - return nil, err + metrics := &Metrics{ + NumRequests: promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "requests", + Help: "the number of requests", + }, + []string{"status", "reason", "method"}, + ), + Latency: promauto.With(reg).NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "latency_ms", + Help: "latency summary in milliseconds", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + }, + []string{"method"}, + ), + registry: reg, + httpPort: httpPort, + logger: 
logger.With("component", "ChurnerMetrics"), } - - latency, err := metricsServer.NewLatencyMetric( - "latency", - "latency summary in milliseconds", - latencyLabel{}, - &metrics.Quantile{Quantile: 0.5, Error: 0.05}, - &metrics.Quantile{Quantile: 0.9, Error: 0.01}, - &metrics.Quantile{Quantile: 0.95, Error: 0.01}, - &metrics.Quantile{Quantile: 0.99, Error: 0.001}) - if err != nil { - return nil, err - } - - return &Metrics{ - metricsServer: metricsServer, - numRequests: numRequests, - latency: latency, - logger: logger.With("component", "ChurnerMetrics"), - }, nil + return metrics } -// WriteMetricsDocumentation writes the metrics for the churner to a markdown file. -func (g *Metrics) WriteMetricsDocumentation() error { - return g.metricsServer.WriteMetricsDocumentation("operators/churner/mdoc/churner-metrics.md") -} - -// ObserveLatency observes the latency of a stage -func (g *Metrics) ObserveLatency(method string, latency time.Duration) { - g.latency.ReportLatency(latency, latencyLabel{method: method}) +// ObserveLatency observes the latency of the given method, in milliseconds +func (g *Metrics) ObserveLatency(method string, latencyMs float64) { + g.Latency.WithLabelValues(method).Observe(latencyMs) } // IncrementSuccessfulRequestNum increments the number of successful requests func (g *Metrics) IncrementSuccessfulRequestNum(method string) { - g.numRequests.Increment(numRequestsLabel{status: "success", method: method}) + g.NumRequests.With(prometheus.Labels{ + "status": "success", + "method": method, + "reason": "", + }).Inc() } // IncrementFailedRequestNum increments the number of failed requests @@ -119,11 +108,25 @@ func (g *Metrics) IncrementFailedRequestNum(method string, reason FailReason) { // handle a negligence of mapping from failure reason to status code. 
code = codes.Internal.String() } - - g.numRequests.Increment(numRequestsLabel{status: code, reason: string(reason), method: method}) + g.NumRequests.With(prometheus.Labels{ + "status": code, + "reason": string(reason), + "method": method, + }).Inc() } // Start starts the metrics server -func (g *Metrics) Start() error { - return g.metricsServer.Start() +func (g *Metrics) Start(ctx context.Context) { + g.logger.Info("Starting metrics server at ", "port", g.httpPort) + addr := fmt.Sprintf(":%s", g.httpPort) + go func() { + log := g.logger + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.HandlerFor( + g.registry, + promhttp.HandlerOpts{}, + )) + err := http.ListenAndServe(addr, mux) + log.Error("Prometheus server failed", "err", err) + }() } diff --git a/operators/churner/server.go b/operators/churner/server.go index 83f62bf7f..fc8ba9310 100644 --- a/operators/churner/server.go +++ b/operators/churner/server.go @@ -47,12 +47,8 @@ func NewServer( func (s *Server) Start(metricsConfig MetricsConfig) error { // Enable Metrics Block if metricsConfig.EnableMetrics { - httpSocket := fmt.Sprintf(":%d", metricsConfig.HTTPPort) - err := s.metrics.Start() - if err != nil { - return fmt.Errorf("failed to start metrics server: %w", err) - } - + httpSocket := fmt.Sprintf(":%s", metricsConfig.HTTPPort) + s.metrics.Start(context.Background()) s.logger.Info("Enabled metrics for Churner", "socket", httpSocket) } return nil @@ -66,7 +62,7 @@ func (s *Server) Churn(ctx context.Context, req *pb.ChurnRequest) (*pb.ChurnRepl } timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { - s.metrics.ObserveLatency("Churn", time.Duration(f*float64(time.Second))) + s.metrics.ObserveLatency("Churn", f*1000) // make milliseconds })) defer timer.ObserveDuration() s.logger.Info("Received request: ", "QuorumIds", req.GetQuorumIds()) diff --git a/operators/churner/server_test.go b/operators/churner/server_test.go index e2b0fb6a2..5c7c471b7 100644 --- 
a/operators/churner/server_test.go +++ b/operators/churner/server_test.go @@ -181,8 +181,7 @@ func newTestServer(t *testing.T) *churner.Server { setupMockWriter() - metrics, err := churner.NewMetrics(9001, logger) - assert.NoError(t, err) + metrics := churner.NewMetrics("9001", logger) cn, err := churner.NewChurner(config, mockIndexer, transactorMock, logger, metrics) if err != nil { log.Fatalln("cannot create churner", err) diff --git a/operators/churner/tests/churner_test.go b/operators/churner/tests/churner_test.go index 5625c537c..ba9f11c52 100644 --- a/operators/churner/tests/churner_test.go +++ b/operators/churner/tests/churner_test.go @@ -225,8 +225,7 @@ func newTestServer(t *testing.T) *churner.Server { ) assert.NoError(t, err) - metrics, err := churner.NewMetrics(9001, logger) - assert.NoError(t, err) + metrics := churner.NewMetrics("9001", logger) cn, err := churner.NewChurner(config, mockIndexer, operatorTransactorChurner, logger, metrics) assert.NoError(t, err)