Skip to content

Commit

Permalink
Bug fix: resource leak (Too many open files) while executing Cassandr…
Browse files Browse the repository at this point in the history
…a monitor (#573)

* Bug fix: resource leak (Too many open files) while executing Cassandra monitor.
  • Loading branch information
arunagrawal-84 authored Jul 19, 2017
1 parent 5d56edf commit ea3a557
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 67 deletions.
99 changes: 50 additions & 49 deletions priam/src/main/java/com/netflix/priam/utils/CassandraMonitor.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,84 +15,85 @@
*/
package com.netflix.priam.utils;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.priam.IConfiguration;
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.Task;
import com.netflix.priam.scheduler.TaskTimer;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.atomic.AtomicBoolean;

/*
* This task checks if the Cassandra process is running.
*/
@Singleton
public class CassandraMonitor extends Task{
public class CassandraMonitor extends Task {

public static final String JOBNAME = "CASS_MONITOR_THREAD";
public static final String JOBNAME = "CASS_MONITOR_THREAD";
private static final Logger logger = LoggerFactory.getLogger(CassandraMonitor.class);
private static final AtomicBoolean isCassandraStarted = new AtomicBoolean(false);

@Inject
protected CassandraMonitor(IConfiguration config) {
super(config);
}
super(config);
}

@Override
public void execute() throws Exception {
@Override
public void execute() throws Exception {

try
{
//This returns pid for the Cassandra process
Process p = Runtime.getRuntime().exec("pgrep -f " + config.getCassProcessName());
BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
Process process = null;
BufferedReader input = null;
try {
//This returns pid for the Cassandra process
process = Runtime.getRuntime().exec("pgrep -f " + config.getCassProcessName());
input = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line = input.readLine();
if (line != null&& !isCassadraStarted())
{
//Setting cassandra flag to true
isCassandraStarted.set(true);
}
else if(line == null&& isCassadraStarted())
{
//Setting cassandra flag to false
isCassandraStarted.set(false);
}
}
catch(Exception e)
{
logger.warn("Exception thrown while checking if Cassandra is running or not ", e);
if (line != null && !isCassadraStarted()) {
//Setting cassandra flag to true
isCassandraStarted.set(true);
} else if (line == null && isCassadraStarted()) {
//Setting cassandra flag to false
isCassandraStarted.set(false);
}
} catch (Exception e) {
logger.warn("Exception thrown while checking if Cassandra is running or not ", e);
//Setting Cassandra flag to false
isCassandraStarted.set(false);
} finally {
if (process != null) {
IOUtils.closeQuietly(process.getInputStream());
IOUtils.closeQuietly(process.getOutputStream());
IOUtils.closeQuietly(process.getErrorStream());
}

if (input != null)
IOUtils.closeQuietly(input);
}

}

public static TaskTimer getTimer()
{
}

public static TaskTimer getTimer() {
return new SimpleTimer(JOBNAME, 10L * 1000);
}

@Override
public String getName()
{
public String getName() {
return JOBNAME;
}

public static Boolean isCassadraStarted()
{
public static Boolean isCassadraStarted() {
return isCassandraStarted.get();
}

//Added for testing only
public static void setIsCassadraStarted()
{
//Setting cassandra flag to true
isCassandraStarted.set(true);
}
public static void setIsCassadraStarted() {
//Setting cassandra flag to true
isCassandraStarted.set(true);
}
}
35 changes: 17 additions & 18 deletions priam/src/test/java/com/netflix/priam/backup/TestBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,23 @@ public void testSnapshotBackup() throws Exception
{
filesystem.setupTest();
SnapshotBackup backup = injector.getInstance(SnapshotBackup.class);
CassandraMonitor cassMon = injector.getInstance(CassandraMonitor.class);
cassMon.setIsCassadraStarted();
/* *
backup.execute();
Assert.assertEquals(3, filesystem.uploadedFiles.size());
System.out.println("***** "+filesystem.uploadedFiles.size());
boolean metafile = false;
for (String filePath : expectedFiles)
Assert.assertTrue(filesystem.uploadedFiles.contains(filePath));
for(String filepath : filesystem.uploadedFiles){
if( filepath.endsWith("meta.json")){
metafile = true;
break;
}
}
Assert.assertTrue(metafile);
* */

//
// backup.execute();
// Assert.assertEquals(3, filesystem.uploadedFiles.size());
// System.out.println("***** "+filesystem.uploadedFiles.size());
// boolean metafile = false;
// for (String filePath : expectedFiles)
// Assert.assertTrue(filesystem.uploadedFiles.contains(filePath));
//
// for(String filepath : filesystem.uploadedFiles){
// if( filepath.endsWith("meta.json")){
// metafile = true;
// break;
// }
// }
// Assert.assertTrue(metafile);

}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.netflix.priam.utils;

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.netflix.priam.backup.BRTestModule;
import junit.framework.Assert;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;

/**
* Created by aagrawal on 7/18/17.
*/
public class TestCassandraMonitor {

private Injector injector;
private CassandraMonitor monitor;

@Before
public void setUp() {
if (injector == null)
injector = Guice.createInjector(new BRTestModule());

if (monitor == null)
monitor = injector.getInstance(CassandraMonitor.class);
}

@Test
public void testCassandraMonitor() throws Exception {
monitor.execute();
Assert.assertFalse(monitor.isCassadraStarted());

monitor.setIsCassadraStarted();
Assert.assertTrue(monitor.isCassadraStarted());

monitor.execute();
Assert.assertFalse(monitor.isCassadraStarted());
}

}

0 comments on commit ea3a557

Please sign in to comment.