Skip to content

Commit

Permalink
Metrics for Cassandra Process Manager and Monitor. (#656)
Browse files Browse the repository at this point in the history
  • Loading branch information
vinaykumarchella authored Jan 11, 2018
1 parent 3990f8e commit 8a60bb3
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.priam.ICassandraProcess;
import com.netflix.priam.IConfiguration;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.merics.ICassMonitorMetrics;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,11 +39,13 @@ public class CassandraProcessManager implements ICassandraProcess {
private static final int SCRIPT_EXECUTE_WAIT_TIME_MS = 5000;
protected final IConfiguration config;
private InstanceState instanceState;
private ICassMonitorMetrics iCassMonitorMetrics;

@Inject
public CassandraProcessManager(IConfiguration config, InstanceState instanceState) {
public CassandraProcessManager(IConfiguration config, InstanceState instanceState, ICassMonitorMetrics cassMonitorMetrics) {
this.config = config;
this.instanceState = instanceState;
this.iCassMonitorMetrics = cassMonitorMetrics;
}

protected void setEnv(Map<String, String> env) {
Expand Down Expand Up @@ -94,6 +97,7 @@ public void start(boolean join_ring) throws IOException
if (code == 0) {
logger.info("Cassandra server has been started");
instanceState.setCassandraProcessAlive(true);
this.iCassMonitorMetrics.incCassStart();
}
else
logger.error("Unable to start cassandra server. Error code: {}", code);
Expand Down Expand Up @@ -160,6 +164,7 @@ public void stop() throws IOException {
int code = stopper.waitFor();
if (code == 0) {
logger.info("Cassandra server has been stopped");
this.iCassMonitorMetrics.incCassStop();
instanceState.setCassandraProcessAlive(false);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.netflix.priam.identity.InstanceEnvIdentity;
import com.netflix.priam.identity.token.*;
import com.netflix.priam.merics.BackupMetricsMgr;
import com.netflix.priam.merics.CassMonitorMetrics;
import com.netflix.priam.merics.ICassMonitorMetrics;
import org.quartz.SchedulerFactory;
import org.quartz.impl.StdSchedulerFactory;

Expand Down Expand Up @@ -71,5 +73,6 @@ protected void configure() {
bind(ITaskQueueMgr.class).annotatedWith(Names.named("backup")).to(CassandraBackupQueueMgr.class);
bind(InstanceEnvIdentity.class).to(AwsInstanceEnvIdentity.class);
bind(IBackupMetrics.class).to(BackupMetricsMgr.class);
bind(ICassMonitorMetrics.class).to(CassMonitorMetrics.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Copyright 2018 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.priam.merics;

import com.google.inject.Singleton;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author vchella
*/
@Singleton
public class CassMonitorMetrics implements ICassMonitorMetrics {

AtomicInteger cassStop = new AtomicInteger(),
cassAutoStart = new AtomicInteger(),
cassStart = new AtomicInteger();


@Override
public void incCassStop() {
cassStop.getAndIncrement();
}

@Override
public void incCassAutoStart() {
cassAutoStart.getAndIncrement();
}

@Override
public void incCassStart() {
cassStart.getAndIncrement();
}

@Override
public int getCassStop() {
return cassStop.get();
}

@Override
public int getCassAutoStart() {
return cassAutoStart.get();
}

@Override
public int getCassStart() {
return cassStart.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/**
* Copyright 2018 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.priam.merics;

import com.google.inject.ImplementedBy;

/**
* A means to keep track of events with Cassandra Monitor (Start, restart, stop etc.,)
* Created by vchella on 01/10/18.
*/
@ImplementedBy(CassMonitorMetrics.class)
public interface ICassMonitorMetrics {
void incCassStop();
void incCassAutoStart();
void incCassStart();

int getCassStop();
int getCassAutoStart();
int getCassStart();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.inject.Inject;
import com.netflix.priam.IConfiguration;
import com.netflix.priam.defaultimpl.CassandraProcessManager;
import com.netflix.priam.merics.ICassMonitorMetrics;
import com.netflix.priam.tuner.dse.IDseConfiguration.NodeType;
import com.netflix.priam.health.InstanceState;

Expand All @@ -28,8 +29,8 @@ public class DseProcessManager extends CassandraProcessManager {
private InstanceState instanceState;

@Inject
public DseProcessManager(IConfiguration config, IDseConfiguration dseConfig, InstanceState instanceState) {
super(config, instanceState);
public DseProcessManager(IConfiguration config, IDseConfiguration dseConfig, InstanceState instanceState, ICassMonitorMetrics cassMonitorMetrics) {
super(config, instanceState, cassMonitorMetrics);
this.dseConfig = dseConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.priam.ICassandraProcess;
import com.netflix.priam.IConfiguration;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.merics.ICassMonitorMetrics;
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.Task;
import com.netflix.priam.scheduler.TaskTimer;
Expand All @@ -48,13 +49,15 @@ public class CassandraMonitor extends Task {
private InstanceState instanceState;
private ICassandraProcess cassProcess;
private RateLimiter startRateLimiter;
private ICassMonitorMetrics cassMonitorMetrics;

@Inject
protected CassandraMonitor(IConfiguration config, InstanceState instanceState, ICassandraProcess cassProcess) {
protected CassandraMonitor(IConfiguration config, InstanceState instanceState, ICassandraProcess cassProcess, ICassMonitorMetrics cassMonitorMetrics) {
super(config);
this.instanceState = instanceState;
this.cassProcess = cassProcess;
startRateLimiter = RateLimiter.create(1.0);
this.cassMonitorMetrics = cassMonitorMetrics;
}

@Override
Expand Down Expand Up @@ -110,6 +113,7 @@ public void execute() throws Exception {
if (rate >= 0 && !config.doesCassandraStartManually()) {
if (instanceState.shouldCassandraBeAlive() && !instanceState.isCassandraProcessAlive()) {
if (rate == 0 || startRateLimiter.tryAcquire(rate)) {
cassMonitorMetrics.incCassAutoStart();
cassProcess.start(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.netflix.priam.IConfiguration;
import com.netflix.priam.backup.BRTestModule;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.merics.ICassMonitorMetrics;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -38,7 +39,9 @@ public class CassandraProcessManagerTest {
public void setup() {
IConfiguration config = new FakeConfiguration("us-east-1", "test_cluster", "us-east-1a", "i-2378afd3");
InstanceState instanceState = Guice.createInjector(new BRTestModule()).getInstance(InstanceState.class);
cpm = new CassandraProcessManager(config, instanceState);
ICassMonitorMetrics cassMonitorMetrics = Guice.createInjector(new BRTestModule()).getInstance(ICassMonitorMetrics.class);

cpm = new CassandraProcessManager(config, instanceState, cassMonitorMetrics);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.priam.IConfiguration;
import com.netflix.priam.backup.BRTestModule;
import com.netflix.priam.health.InstanceState;
import com.netflix.priam.merics.ICassMonitorMetrics;
import org.junit.Assert;
import mockit.*;
import org.apache.cassandra.tools.NodeProbe;
Expand All @@ -38,6 +39,8 @@
public class TestCassandraMonitor {
private static CassandraMonitor monitor;
private static InstanceState instanceState;
private static ICassMonitorMetrics cassMonitorMetrics;

private IConfiguration config;

@Mocked
Expand All @@ -53,8 +56,10 @@ public void setUp() {
config = injector.getInstance(IConfiguration.class);
if (instanceState == null)
instanceState = injector.getInstance(InstanceState.class);
if (cassMonitorMetrics == null)
cassMonitorMetrics = injector.getInstance(ICassMonitorMetrics.class);
if (monitor == null)
monitor = new CassandraMonitor(config, instanceState, cassProcess);
monitor = new CassandraMonitor(config, instanceState, cassProcess, cassMonitorMetrics);
}

@Test
Expand Down

0 comments on commit 8a60bb3

Please sign in to comment.