Skip to content

Commit

Permalink
Improve actor and http api server metrics (#539)
Browse files Browse the repository at this point in the history
* actor and http api metrics

* cleanup
  • Loading branch information
Andyz26 committed Aug 24, 2023
1 parent 0fc88ec commit 42b46dd
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.common.metrics;

import io.mantisrx.shaded.com.google.common.base.Joiner;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

/**
 * A {@link MetricsPublisher} that prints the counters of selected metric groups to the log.
 *
 * <p>To use this publisher override config "mantis.metricsPublisher.class" in the
 * master-*.properties file. The metric groups to print are configured as a semicolon-separated
 * list of metric group ids under {@link #LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY}. The value is
 * read from the {@link Properties} passed to the constructor, falling back to the environment
 * variable of the same name. Group ids are matched case-insensitively; whitespace around an id
 * and empty entries in the list are ignored.
 */
@Slf4j
public class LoggingMetricsPublisher extends MetricsPublisher {
    /** Property / environment variable name holding the semicolon-separated group id list. */
    public static final String LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY =
        "MANTIS_LOGGING_ENABLED_METRICS_GROUP_ID_LIST";

    /** Lower-cased (locale-independent) ids of the metric groups whose counters are logged. */
    private final Set<String> loggingEnabledMetricsGroupId;

    /**
     * Creates the publisher and resolves the set of metric groups to log.
     *
     * @param properties publisher configuration; consulted first for
     *                   {@link #LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY}, then the environment
     */
    public LoggingMetricsPublisher(Properties properties) {
        super(properties);

        String key = properties.getProperty(LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY,
            System.getenv(LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY));
        log.info("LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY: {}", key);

        this.loggingEnabledMetricsGroupId = parseGroupIds(key);
        if (key != null) {
            log.info("[Metrics Publisher] enable logging for: {}",
                Joiner.on(',').join(this.loggingEnabledMetricsGroupId));
        }
    }

    /**
     * Logs every counter of each registered metric group whose id was enabled at construction.
     *
     * @param timestamp                epoch milliseconds of this publish cycle
     * @param currentMetricsRegistered the metric groups currently registered
     */
    @Override
    public void publishMetrics(long timestamp,
                               Collection<Metrics> currentMetricsRegistered) {
        log.info("Printing metrics from: {}", Instant.ofEpochMilli(timestamp));
        currentMetricsRegistered.stream()
            .filter(ms -> this.loggingEnabledMetricsGroupId.contains(
                normalize(ms.getMetricGroupId().id())))
            .map(ms -> ms.counters().entrySet())
            .flatMap(Collection::stream)
            .forEach(m -> log.info("[METRICS] {} : {}", m.getKey(), m.getValue().value()));
    }

    /**
     * Splits a semicolon-separated list into a set of normalized group ids.
     * Returns an empty set when {@code rawList} is {@code null}.
     */
    private static Set<String> parseGroupIds(String rawList) {
        if (rawList == null) {
            return new HashSet<>();
        }
        return Arrays.stream(rawList.split(";"))
            // "A; B" must enable group "B", not the never-matching " b".
            .map(String::trim)
            // String.split keeps leading/inner empty tokens (";a", "a;;b"); drop them.
            .filter(id -> !id.isEmpty())
            .map(LoggingMetricsPublisher::normalize)
            .collect(Collectors.toCollection(HashSet::new));
    }

    /** Lower-cases an id with {@link Locale#ROOT} so matching does not depend on the JVM locale. */
    private static String normalize(String groupId) {
        return groupId.toLowerCase(Locale.ROOT);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
Expand All @@ -28,8 +27,10 @@
import akka.http.javadsl.settings.WebSocketSettings;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import com.netflix.spectator.impl.Preconditions;
import io.mantisrx.master.api.akka.route.MantisMasterRoute;
import io.mantisrx.master.api.akka.route.MasterApiMetrics;
import io.mantisrx.master.api.akka.route.handlers.JobArtifactRouteHandler;
import io.mantisrx.master.api.akka.route.handlers.JobArtifactRouteHandlerImpl;
import io.mantisrx.master.api.akka.route.handlers.JobClusterRouteHandler;
Expand Down Expand Up @@ -198,8 +199,6 @@ private void startAPIServer() {
final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow =
this.mantisMasterRoute.createRoute().flow(system, materializer);

final Http http = Http.get(system);

ServerSettings defaultSettings = ServerSettings.create(system);
java.time.Duration idleTimeout = system.settings().config().getDuration("akka.http.server.idle-timeout");
logger.info("idle timeout {} sec ", idleTimeout.getSeconds());
Expand All @@ -209,18 +208,22 @@ private void startAPIServer() {

ServerSettings customServerSettings = defaultSettings.withWebsocketSettings(customWebsocketSettings);

final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
ConnectHttp.toHost("0.0.0.0", port),
customServerSettings,
system.log(),
materializer);
binding.exceptionally(failure -> {
System.err.println("API service exited, committing suicide !" + failure.getMessage());
logger.info("Master API service exited in error, committing suicide !");
system.terminate();
System.exit(2);
return null;
});
final CompletionStage<ServerBinding> binding = Http.get(system)
.newServerAt("0.0.0.0", port)
.withSettings(customServerSettings)
.connectionSource()
.to(Sink.foreach(connection -> {
MasterApiMetrics.getInstance().incrementIncomingRequestCount();
connection.handleWith(routeFlow, materializer);
}))
.run(materializer)
.exceptionally(failure -> {
System.err.println("API service exited, committing suicide !" + failure.getMessage());
logger.info("Master API service exited in error, committing suicide !");
system.terminate();
System.exit(2);
return null;
});
logger.info("Starting Mantis Master API on port {}", port);
try {
serviceLatch.await();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ public class MasterApiMetrics {
private final Counter resp2xx;
private final Counter resp4xx;
private final Counter resp5xx;
private final Counter incomingRequestCount;
private final Counter askTimeOutCount;

private static final MasterApiMetrics INSTANCE = new MasterApiMetrics();

private MasterApiMetrics() {
Metrics m = new Metrics.Builder()
.id("MasterApiMetrics")
.addCounter("incomingRequestCount")
.addCounter("resp2xx")
.addCounter("resp4xx")
.addCounter("resp5xx")
Expand All @@ -41,6 +43,7 @@ private MasterApiMetrics() {
this.resp2xx = metrics.getCounter("resp2xx");
this.resp4xx = metrics.getCounter("resp4xx");
this.resp5xx = metrics.getCounter("resp5xx");
this.incomingRequestCount = metrics.getCounter("incomingRequestCount");
}

public static final MasterApiMetrics getInstance() {
Expand All @@ -62,4 +65,8 @@ public void incrementResp5xx() {
/**
 * Increments the {@code askTimeOutCount} counter by one.
 */
public void incrementAskTimeOutCount() {
askTimeOutCount.increment();
}

/**
 * Increments the {@code incomingRequestCount} counter by one.
 *
 * <p>NOTE(review): the API server start-up code appears to call this once per accepted
 * incoming connection rather than once per HTTP request - confirm the intended meaning.
 */
public void incrementIncomingRequestCount() {
incomingRequestCount.increment();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,20 @@ protected <T> Route withFuture(CompletableFuture<T> tFuture) {
t -> t.fold(
throwable -> {
if (throwable instanceof TaskExecutorNotFoundException) {
MasterApiMetrics.getInstance().incrementResp4xx();
return complete(StatusCodes.NOT_FOUND);
}

if (throwable instanceof RequestThrottledException) {
MasterApiMetrics.getInstance().incrementResp4xx();
return complete(StatusCodes.TOO_MANY_REQUESTS);
}

if (throwable instanceof AskTimeoutException) {
MasterApiMetrics.getInstance().incrementAskTimeOutCount();
}

MasterApiMetrics.getInstance().incrementResp5xx();
return complete(StatusCodes.INTERNAL_SERVER_ERROR, throwable, Jackson.marshaller());
},
r -> complete(StatusCodes.OK, r, Jackson.marshaller())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class ResourceClusterActor extends AbstractActorWithTimers {
private final String jobClustersWithArtifactCachingEnabled;

static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, Duration assignmentTimeout, Duration disabledTaskExecutorsCheckInterval, Clock clock, RpcService rpcService, MantisJobStore mantisJobStore, JobMessageRouter jobMessageRouter, int maxJobArtifactsToCache, String jobClustersWithArtifactCachingEnabled) {
return Props.create(ResourceClusterActor.class, clusterID, heartbeatTimeout, assignmentTimeout, disabledTaskExecutorsCheckInterval, clock, rpcService, mantisJobStore, jobMessageRouter, maxJobArtifactsToCache, jobClustersWithArtifactCachingEnabled);
return Props.create(ResourceClusterActor.class, clusterID, heartbeatTimeout, assignmentTimeout, disabledTaskExecutorsCheckInterval, clock, rpcService, mantisJobStore, jobMessageRouter, maxJobArtifactsToCache, jobClustersWithArtifactCachingEnabled)
.withMailbox("akka.actor.metered-mailbox");
}

ResourceClusterActor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import com.netflix.fenzo.AutoScaleAction;
import com.netflix.fenzo.AutoScaleRule;
import com.netflix.fenzo.VirtualMachineLease;
import com.netflix.spectator.api.DefaultRegistry;
import com.sampullara.cli.Args;
import com.sampullara.cli.Argument;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory;
import io.mantisrx.master.DeadLetterActor;
import io.mantisrx.master.JobClustersManagerActor;
import io.mantisrx.master.JobClustersManagerService;
Expand Down Expand Up @@ -365,6 +367,7 @@ public static void main(String[] args) {
}

try {
SpectatorRegistryFactory.setRegistry(new DefaultRegistry());
Properties props = new Properties();
props.putAll(System.getenv());
props.putAll(System.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,6 @@ mesos.worker.executorscript=startup_docker.sh
mesos.worker.installDir=/mnt/local/mantisWorkerInstall
mantis.master.framework.name=MantisFramework
mesos.worker.timeoutSecondsToReportStart=10

# test container override
mantis.metricsPublisher.class=io.mantisrx.common.metrics.LoggingMetricsPublisher
28 changes: 19 additions & 9 deletions mantis-testcontainers/src/test/java/TestContainerHelloWorld.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.mantisrx.common.metrics.LoggingMetricsPublisher;
import io.mantisrx.server.master.resourcecluster.TaskExecutorID;
import io.mantisrx.shaded.com.fasterxml.jackson.core.type.TypeReference;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -44,6 +45,7 @@
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.utility.Base58;
import org.testcontainers.utility.MountableFile;

@Slf4j
Expand All @@ -64,6 +66,8 @@ public class TestContainerHelloWorld {

private static final String CONTAINER_ARTIFACT_PATH = "/apps/mantis/mantis-server-agent/mantis-artifacts/storage/";

private static final String LOGGING_ENABLED_METRICS_GROUP =
"MasterApiMetrics;DeadLetterActor;JobDiscoveryRoute";
private static final String JOB_CLUSTER_CREATE = "{\"jobDefinition\":{\"name\":\"hello-sine-testcontainers\","
+ "\"user\":\"mantisoss\",\"jobJarFileLocation\":\"file:///mantis-examples-sine-function-2.1.0-SNAPSHOT"
+ ".zip\"," +
Expand Down Expand Up @@ -94,8 +98,9 @@ public void helloWorld() throws Exception {

Path path = Paths.get("../mantis-control-plane/Dockerfile");
log.info("Building control plane image from: {}", path);
ImageFromDockerfile controlPlaneDockerFile = new ImageFromDockerfile()
.withDockerfile(path);
ImageFromDockerfile controlPlaneDockerFile =
new ImageFromDockerfile("localhost/testcontainers/mantis_control_plane_server_" + Base58.randomString(4).toLowerCase())
.withDockerfile(path);
try (
Network network = Network.newNetwork();
GenericContainer<?> zookeeper =
Expand All @@ -106,11 +111,15 @@ public void helloWorld() throws Exception {

GenericContainer<?> master = USE_LOCAL_BUILT_IMAGE ?
new GenericContainer<>(controlPlaneDockerFile)
.withEnv(LoggingMetricsPublisher.LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY,
LOGGING_ENABLED_METRICS_GROUP)
.withNetwork(network)
.withNetworkAliases(CONTROL_PLANE_ALIAS)
.withExposedPorts(CONTROL_PLANE_API_PORT)
:
new GenericContainer<>("netflixoss/mantiscontrolplaneserver:latest")
.withEnv(LoggingMetricsPublisher.LOGGING_ENABLED_METRICS_GROUP_ID_LIST_KEY,
LOGGING_ENABLED_METRICS_GROUP)
.withNetwork(network)
.withNetworkAliases(CONTROL_PLANE_ALIAS)
.withExposedPorts(CONTROL_PLANE_API_PORT);
Expand All @@ -135,9 +144,15 @@ public void helloWorld() throws Exception {
log.info(response.body().string());

// Create agent(s)
Path agentDockerFilePath = Paths.get("../mantis-server/mantis-server-agent/Dockerfile");
log.info("Building agent image from: {}", agentDockerFilePath);
ImageFromDockerfile agentDockerFile =
new ImageFromDockerfile("localhost/testcontainers/mantis_agent_" + Base58.randomString(4).toLowerCase())
.withDockerfile(agentDockerFilePath);

final String agentId0 = "agent0";
final String agent0Hostname = String.format("%s%shostname", agentId0, CLUSTER_ID);
GenericContainer<?> agent0 = createAgent(agentId0, CLUSTER_ID, agent0Hostname, network);
GenericContainer<?> agent0 = createAgent(agentId0, CLUSTER_ID, agent0Hostname, agentDockerFile, network);

String controlPlaneHost = master.getHost();
int controlPlanePort = master.getMappedPort(CONTROL_PLANE_API_PORT);
Expand Down Expand Up @@ -207,12 +222,7 @@ private void zkCheck(GenericContainer<?> zookeeper) throws Exception {
}

private GenericContainer<?> createAgent(String agentId, String resourceClusterId, String hostname,
Network network) {
Path path = Paths.get("../mantis-server/mantis-server-agent/Dockerfile");
log.info("Building agent image from: {}", path);
ImageFromDockerfile dockerFile = new ImageFromDockerfile()
.withDockerfile(path);

ImageFromDockerfile dockerFile, Network network) {
// setup sample job artifact
MountableFile sampleArtifact = MountableFile.forHostPath(
Paths.get("../mantis-examples/mantis-examples-sine-function/build/distributions/mantis-examples-sine-function-2.1.0-SNAPSHOT.zip"));
Expand Down

0 comments on commit 42b46dd

Please sign in to comment.