From 42b46dd79bf8a2bdc0b81ed8e6033d60ea9c8d45 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Thu, 24 Aug 2023 09:53:59 -0700 Subject: [PATCH] Improve actor and http api server metrics (#539) * actor and http api metrics * cleanup --- .../metrics/LoggingMetricsPublisher.java | 65 +++++++++++++++++++ .../master/api/akka/MasterApiAkkaService.java | 33 +++++----- .../api/akka/route/MasterApiMetrics.java | 7 ++ .../master/api/akka/route/v1/BaseRoute.java | 7 ++ .../resourcecluster/ResourceClusterActor.java | 3 +- .../io/mantisrx/server/master/MasterMain.java | 3 + .../main/resources/master-docker.properties | 3 + .../test/java/TestContainerHelloWorld.java | 28 +++++--- 8 files changed, 124 insertions(+), 25 deletions(-) create mode 100644 mantis-common/src/main/java/io/mantisrx/common/metrics/LoggingMetricsPublisher.java diff --git a/mantis-common/src/main/java/io/mantisrx/common/metrics/LoggingMetricsPublisher.java b/mantis-common/src/main/java/io/mantisrx/common/metrics/LoggingMetricsPublisher.java new file mode 100644 index 000000000..eb952129a --- /dev/null +++ b/mantis-common/src/main/java/io/mantisrx/common/metrics/LoggingMetricsPublisher.java @@ -0,0 +1,65 @@ +/* + * Copyright 2023 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package io.mantisrx.common.metrics;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Metrics publisher that writes the counters of the selected metric groups to the log.
+ * To use this publisher override config "mantis.metricsPublisher.class" in the master-*.properties file.
+ * Groups are selected with a ";"-separated, case-insensitive id list stored under
+ * {@link #LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY}.
+ */
+@Slf4j
+public class LoggingMetricsPublisher extends MetricsPublisher {
+    public static final String LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY =
+        "MANTIS_LOGGING_ENABLED_METRICS_GROUP_ID_LIST";
+    private Set<String> loggingEnabledMetricsGroupId = new HashSet<>();
+
+    /** Reads the enabled group ids from {@code properties}, falling back to the same-named environment variable. */
+    public LoggingMetricsPublisher(Properties properties) {
+        super(properties);
+        String key = properties.getProperty(LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY,
+            System.getenv(LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY));
+        log.info("LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY: {}", key);
+
+        if (key != null) {
+            // Trim entries and drop blanks so a list such as "A; B;" still enables groups "a" and "b".
+            this.loggingEnabledMetricsGroupId = Arrays.stream(key.toLowerCase(Locale.ROOT).split(";"))
+                .map(String::trim).filter(id -> !id.isEmpty()).collect(Collectors.toSet());
+            log.info("[Metrics Publisher] enable logging for: {}", String.join(",", loggingEnabledMetricsGroupId));
+        }
+    }
+
+    /** Logs every counter of each registered metric group whose id is enabled; all other groups are skipped. */
+    @Override
+    public void publishMetrics(long timestamp, Collection<Metrics> currentMetricsRegistered) {
+        log.info("Printing metrics from: {}", Instant.ofEpochMilli(timestamp));
+        currentMetricsRegistered.stream()
+            .filter(ms -> loggingEnabledMetricsGroupId.contains(ms.getMetricGroupId().id().toLowerCase(Locale.ROOT)))
+            .flatMap(ms -> ms.counters().entrySet().stream())
+            .forEach(m -> log.info("[METRICS] {} : {}", m.getKey(), m.getValue().value()));
+    }
+}
diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java
b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java index 385648107..fe0a71cc9 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/MasterApiAkkaService.java @@ -19,7 +19,6 @@ import akka.NotUsed; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.http.javadsl.ConnectHttp; import akka.http.javadsl.Http; import akka.http.javadsl.ServerBinding; import akka.http.javadsl.model.HttpRequest; @@ -28,8 +27,10 @@ import akka.http.javadsl.settings.WebSocketSettings; import akka.stream.Materializer; import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Sink; import com.netflix.spectator.impl.Preconditions; import io.mantisrx.master.api.akka.route.MantisMasterRoute; +import io.mantisrx.master.api.akka.route.MasterApiMetrics; import io.mantisrx.master.api.akka.route.handlers.JobArtifactRouteHandler; import io.mantisrx.master.api.akka.route.handlers.JobArtifactRouteHandlerImpl; import io.mantisrx.master.api.akka.route.handlers.JobClusterRouteHandler; @@ -198,8 +199,6 @@ private void startAPIServer() { final Flow routeFlow = this.mantisMasterRoute.createRoute().flow(system, materializer); - final Http http = Http.get(system); - ServerSettings defaultSettings = ServerSettings.create(system); java.time.Duration idleTimeout = system.settings().config().getDuration("akka.http.server.idle-timeout"); logger.info("idle timeout {} sec ", idleTimeout.getSeconds()); @@ -209,18 +208,22 @@ private void startAPIServer() { ServerSettings customServerSettings = defaultSettings.withWebsocketSettings(customWebsocketSettings); - final CompletionStage binding = http.bindAndHandle(routeFlow, - ConnectHttp.toHost("0.0.0.0", port), - customServerSettings, - system.log(), - materializer); - binding.exceptionally(failure -> { - 
System.err.println("API service exited, committing suicide !" + failure.getMessage()); - logger.info("Master API service exited in error, committing suicide !"); - system.terminate(); - System.exit(2); - return null; - }); + final CompletionStage binding = Http.get(system) + .newServerAt("0.0.0.0", port) + .withSettings(customServerSettings) + .connectionSource() + .to(Sink.foreach(connection -> { + MasterApiMetrics.getInstance().incrementIncomingRequestCount(); + connection.handleWith(routeFlow, materializer); + })) + .run(materializer) + .exceptionally(failure -> { + System.err.println("API service exited, committing suicide !" + failure.getMessage()); + logger.info("Master API service exited in error, committing suicide !"); + system.terminate(); + System.exit(2); + return null; + }); logger.info("Starting Mantis Master API on port {}", port); try { serviceLatch.await(); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/MasterApiMetrics.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/MasterApiMetrics.java index bac2b37c8..a51f0c38b 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/MasterApiMetrics.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/MasterApiMetrics.java @@ -24,6 +24,7 @@ public class MasterApiMetrics { private final Counter resp2xx; private final Counter resp4xx; private final Counter resp5xx; + private final Counter incomingRequestCount; private final Counter askTimeOutCount; private static final MasterApiMetrics INSTANCE = new MasterApiMetrics(); @@ -31,6 +32,7 @@ public class MasterApiMetrics { private MasterApiMetrics() { Metrics m = new Metrics.Builder() .id("MasterApiMetrics") + .addCounter("incomingRequestCount") .addCounter("resp2xx") .addCounter("resp4xx") .addCounter("resp5xx") @@ -41,6 +43,7 @@ private 
MasterApiMetrics() { this.resp2xx = metrics.getCounter("resp2xx"); this.resp4xx = metrics.getCounter("resp4xx"); this.resp5xx = metrics.getCounter("resp5xx"); + this.incomingRequestCount = metrics.getCounter("incomingRequestCount"); } public static final MasterApiMetrics getInstance() { @@ -62,4 +65,8 @@ public void incrementResp5xx() { public void incrementAskTimeOutCount() { askTimeOutCount.increment(); } + + public void incrementIncomingRequestCount() { + incomingRequestCount.increment(); + } } diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/BaseRoute.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/BaseRoute.java index e7cad7aab..e590f0545 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/BaseRoute.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/api/akka/route/v1/BaseRoute.java @@ -326,13 +326,20 @@ protected Route withFuture(CompletableFuture tFuture) { t -> t.fold( throwable -> { if (throwable instanceof TaskExecutorNotFoundException) { + MasterApiMetrics.getInstance().incrementResp4xx(); return complete(StatusCodes.NOT_FOUND); } if (throwable instanceof RequestThrottledException) { + MasterApiMetrics.getInstance().incrementResp4xx(); return complete(StatusCodes.TOO_MANY_REQUESTS); } + if (throwable instanceof AskTimeoutException) { + MasterApiMetrics.getInstance().incrementAskTimeOutCount(); + } + + MasterApiMetrics.getInstance().incrementResp5xx(); return complete(StatusCodes.INTERNAL_SERVER_ERROR, throwable, Jackson.marshaller()); }, r -> complete(StatusCodes.OK, r, Jackson.marshaller()))); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java 
b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java index 4eaefe6ad..e2e719391 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ResourceClusterActor.java @@ -117,7 +117,8 @@ class ResourceClusterActor extends AbstractActorWithTimers { private final String jobClustersWithArtifactCachingEnabled; static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, Duration assignmentTimeout, Duration disabledTaskExecutorsCheckInterval, Clock clock, RpcService rpcService, MantisJobStore mantisJobStore, JobMessageRouter jobMessageRouter, int maxJobArtifactsToCache, String jobClustersWithArtifactCachingEnabled) { - return Props.create(ResourceClusterActor.class, clusterID, heartbeatTimeout, assignmentTimeout, disabledTaskExecutorsCheckInterval, clock, rpcService, mantisJobStore, jobMessageRouter, maxJobArtifactsToCache, jobClustersWithArtifactCachingEnabled); + return Props.create(ResourceClusterActor.class, clusterID, heartbeatTimeout, assignmentTimeout, disabledTaskExecutorsCheckInterval, clock, rpcService, mantisJobStore, jobMessageRouter, maxJobArtifactsToCache, jobClustersWithArtifactCachingEnabled) + .withMailbox("akka.actor.metered-mailbox"); } ResourceClusterActor( diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java index 2e377e398..7f8f6c16b 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java @@ -25,10 +25,12 @@ import 
com.netflix.fenzo.AutoScaleAction; import com.netflix.fenzo.AutoScaleRule; import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.spectator.api.DefaultRegistry; import com.sampullara.cli.Args; import com.sampullara.cli.Argument; import io.mantisrx.common.metrics.Metrics; import io.mantisrx.common.metrics.MetricsRegistry; +import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory; import io.mantisrx.master.DeadLetterActor; import io.mantisrx.master.JobClustersManagerActor; import io.mantisrx.master.JobClustersManagerService; @@ -365,6 +367,7 @@ public static void main(String[] args) { } try { + SpectatorRegistryFactory.setRegistry(new DefaultRegistry()); Properties props = new Properties(); props.putAll(System.getenv()); props.putAll(System.getProperties()); diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/resources/master-docker.properties b/mantis-control-plane/mantis-control-plane-server/src/main/resources/master-docker.properties index 1924d3e8e..41750ab17 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/resources/master-docker.properties +++ b/mantis-control-plane/mantis-control-plane-server/src/main/resources/master-docker.properties @@ -50,3 +50,6 @@ mesos.worker.executorscript=startup_docker.sh mesos.worker.installDir=/mnt/local/mantisWorkerInstall mantis.master.framework.name=MantisFramework mesos.worker.timeoutSecondsToReportStart=10 + +# test container override +mantis.metricsPublisher.class=io.mantisrx.common.metrics.LoggingMetricsPublisher diff --git a/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java b/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java index 6f34751de..35aac98bb 100644 --- a/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java +++ b/mantis-testcontainers/src/test/java/TestContainerHelloWorld.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import 
io.mantisrx.common.metrics.LoggingMetricsPublisher; import io.mantisrx.server.master.resourcecluster.TaskExecutorID; import io.mantisrx.shaded.com.fasterxml.jackson.core.type.TypeReference; import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper; @@ -44,6 +45,7 @@ import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.images.builder.ImageFromDockerfile; +import org.testcontainers.utility.Base58; import org.testcontainers.utility.MountableFile; @Slf4j @@ -64,6 +66,8 @@ public class TestContainerHelloWorld { private static final String CONTAINER_ARTIFACT_PATH = "/apps/mantis/mantis-server-agent/mantis-artifacts/storage/"; + private static final String LOGGING_ENABLED_METRICS_GROUP = + "MasterApiMetrics;DeadLetterActor;JobDiscoveryRoute"; private static final String JOB_CLUSTER_CREATE = "{\"jobDefinition\":{\"name\":\"hello-sine-testcontainers\"," + "\"user\":\"mantisoss\",\"jobJarFileLocation\":\"file:///mantis-examples-sine-function-2.1.0-SNAPSHOT" + ".zip\"," + @@ -94,8 +98,9 @@ public void helloWorld() throws Exception { Path path = Paths.get("../mantis-control-plane/Dockerfile"); log.info("Building control plane image from: {}", path); - ImageFromDockerfile controlPlaneDockerFile = new ImageFromDockerfile() - .withDockerfile(path); + ImageFromDockerfile controlPlaneDockerFile = + new ImageFromDockerfile("localhost/testcontainers/mantis_control_plane_server_" + Base58.randomString(4).toLowerCase()) + .withDockerfile(path); try ( Network network = Network.newNetwork(); GenericContainer zookeeper = @@ -106,11 +111,15 @@ public void helloWorld() throws Exception { GenericContainer master = USE_LOCAL_BUILT_IMAGE ? 
new GenericContainer<>(controlPlaneDockerFile) + .withEnv(LoggingMetricsPublisher.LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY, + LOGGING_ENABLED_METRICS_GROUP) .withNetwork(network) .withNetworkAliases(CONTROL_PLANE_ALIAS) .withExposedPorts(CONTROL_PLANE_API_PORT) : new GenericContainer<>("netflixoss/mantiscontrolplaneserver:latest") + .withEnv(LoggingMetricsPublisher.LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY, + LOGGING_ENABLED_METRICS_GROUP) .withNetwork(network) .withNetworkAliases(CONTROL_PLANE_ALIAS) .withExposedPorts(CONTROL_PLANE_API_PORT); @@ -135,9 +144,15 @@ public void helloWorld() throws Exception { log.info(response.body().string()); // Create agent(s) + Path agentDockerFilePath = Paths.get("../mantis-server/mantis-server-agent/Dockerfile"); + log.info("Building agent image from: {}", agentDockerFilePath); + ImageFromDockerfile agentDockerFile = + new ImageFromDockerfile("localhost/testcontainers/mantis_agent_" + Base58.randomString(4).toLowerCase()) + .withDockerfile(agentDockerFilePath); + final String agentId0 = "agent0"; final String agent0Hostname = String.format("%s%shostname", agentId0, CLUSTER_ID); - GenericContainer agent0 = createAgent(agentId0, CLUSTER_ID, agent0Hostname, network); + GenericContainer agent0 = createAgent(agentId0, CLUSTER_ID, agent0Hostname, agentDockerFile, network); String controlPlaneHost = master.getHost(); int controlPlanePort = master.getMappedPort(CONTROL_PLANE_API_PORT); @@ -207,12 +222,7 @@ private void zkCheck(GenericContainer zookeeper) throws Exception { } private GenericContainer createAgent(String agentId, String resourceClusterId, String hostname, - Network network) { - Path path = Paths.get("../mantis-server/mantis-server-agent/Dockerfile"); - log.info("Building agent image from: {}", path); - ImageFromDockerfile dockerFile = new ImageFromDockerfile() - .withDockerfile(path); - + ImageFromDockerfile dockerFile, Network network) { // setup sample job artifact MountableFile sampleArtifact = 
MountableFile.forHostPath( Paths.get("../mantis-examples/mantis-examples-sine-function/build/distributions/mantis-examples-sine-function-2.1.0-SNAPSHOT.zip"));