Skip to content

Commit

Permalink
Enable cron expression on flush (#572)
Browse files Browse the repository at this point in the history
Default behavior to look for HOUR i.e. the flush interval. If the flush interval is not set, it is considered OFF (default backward compatible behavior). Similar behavior for CRON.
  • Loading branch information
arunagrawal-84 authored Jul 16, 2017
1 parent 06d6c7c commit 485299b
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 43 deletions.
31 changes: 25 additions & 6 deletions priam/src/main/java/com/netflix/priam/IConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,17 @@ public interface IConfiguration {
public int getBackupHour();

/**
* Cron expression to be used for snapshot backups.
* @return Backup cron expression for snapshots
* @see <a href="http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html">quartz-scheduler</a>}
* @see <a href="http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html">quartz-scheduler</a>
* @see <a href="http://www.cronmaker.com">http://www.cronmaker.com</a> To build new cron timer
*/
public String getBackupCronExpression();

/**
* @return Type of scheduler to use for backup. Note the default is TIMER based i.e. to use @see #getBackupHour().
* If value of "CRON" is provided it starts using @see #getBackupCronExpression().
* Backup scheduler type to use for backup.
* @return Type of scheduler to use for backup. Note the default is TIMER based i.e. to use {@link #getBackupHour()}.
* If value of "CRON" is provided it starts using {@link #getBackupCronExpression()}.
*/
public SchedulerType getBackupSchedulerType() throws UnsupportedTypeException;

Expand Down Expand Up @@ -615,18 +617,35 @@ public interface IConfiguration {
*/
int getStreamingSocketTimeoutInMS();

/*
/**
* List of keyspaces to flush. Default: all keyspaces.
* @return a comma delimited list of keyspaces to flush
*/
public String getFlushKeyspaces();

/*
/**
* Interval to be used for flush.
* @return the interval to run the flush task. Format is name=value where
* “name” is an enum of hour, daily, value is ...
*/
public String getFlushInterval();

/*
/**
* Scheduler type to use for flush.
* @return Type of scheduler to use for flush. Note the default is TIMER based i.e. to use {@link #getFlushInterval()}.
* If value of "CRON" is provided it starts using {@link #getFlushCronExpression()}.
*/
public SchedulerType getFlushSchedulerType() throws UnsupportedTypeException;

/**
* Cron expression to be used for flush.
* @return Cron expression for flush
* @see <a href="http://www.quartz-scheduler.org/documentation/quartz-2.x/tutorials/crontrigger.html">quartz-scheduler</a>
* @see <a href="http://www.cronmaker.com">http://www.cronmaker.com</a> To build new cron timer
*/
public String getFlushCronExpression();

/**
@return the absolute path to store the backup status on disk
*/
public String getBackupStatusFileLoc();
Expand Down
7 changes: 4 additions & 3 deletions priam/src/main/java/com/netflix/priam/PriamServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.netflix.priam.restore.GoogleCryptographyRestoreStrategy;
import com.netflix.priam.restore.RestoreContext;
import com.netflix.priam.scheduler.PriamScheduler;
import com.netflix.priam.scheduler.TaskTimer;
import com.netflix.priam.utils.CassandraMonitor;
import com.netflix.priam.utils.Sleeper;
import com.netflix.priam.utils.TuneCassandra;
Expand Down Expand Up @@ -173,9 +174,9 @@ else if (UpdateSecuritySettings.firstTimeUpdated)
scheduler.addTask(UpdateCleanupPolicy.JOBNAME, UpdateCleanupPolicy.class, UpdateCleanupPolicy.getTimer());

//Set up nodetool flush task
String timerVal = config.getFlushInterval(); //e.g. hour=0 or daily=10)
if (timerVal != null && !timerVal.isEmpty()) {
scheduler.addTask(FlushTask.JOBNAME, FlushTask.class, FlushTask.getTimer(config));
TaskTimer flushTaskTimer = FlushTask.getTimer(config);
if (flushTaskTimer != null) {
scheduler.addTask(FlushTask.JOBNAME, FlushTask.class, flushTaskTimer);
logger.info("Added nodetool flush task.");
}
}
Expand Down
18 changes: 0 additions & 18 deletions priam/src/main/java/com/netflix/priam/aws/S3FileSystemBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,6 @@ public S3FileSystemBase (IMetricPublisher metricPublisher) {
this.metricPublisher = metricPublisher;
awsSlowDownMeasurement = new AWSSlowDownExceptionMeasurement(); //a counter of AWS warning for all uploads
}

// /*
// * S3 End point information
// * http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
// */
// protected String getS3Endpoint(IConfiguration config)
// {
// final String curRegion = config.getDC();
// if("us-east-1".equalsIgnoreCase(curRegion) ||
// "us-west-1".equalsIgnoreCase(curRegion) ||
// "us-west-2".equalsIgnoreCase(curRegion) ||
// "eu-west-1".equalsIgnoreCase(curRegion) ||
// "sa-east-1".equalsIgnoreCase(curRegion) ||
// "eu-central-1".equalsIgnoreCase(curRegion))
// return config.getS3EndPoint();
//
// throw new IllegalStateException("Unsupported region for this application: " + curRegion);
// }

public AmazonS3 getS3Client()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import com.netflix.priam.scheduler.CronTimer;
import com.netflix.priam.scheduler.Task;
import com.netflix.priam.scheduler.TaskTimer;
import com.netflix.priam.scheduler.UnsupportedTypeException;
import com.netflix.priam.utils.JMXConnectorMgr;
import org.apache.commons.lang3.StringUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,24 +89,59 @@ public String getName() {
return JOBNAME;
}

/*
@return the hourly or daily time to execute the flush
/**
* Timer to be used for flush interval.
* @param config {@link IConfiguration} to get configuration details from priam.
* @return the timer to be used for flush interval.
* <p>
* If {@link IConfiguration#getFlushSchedulerType()} is {@link com.netflix.priam.scheduler.SchedulerType#HOUR} then it expects {@link IConfiguration#getFlushInterval()} in the format of hour=x or daily=x
* <p>
* If {@link IConfiguration#getFlushSchedulerType()} is {@link com.netflix.priam.scheduler.SchedulerType#CRON} then it expects a valid CRON expression from {@link IConfiguration#getFlushCronExpression()}
*/
public static TaskTimer getTimer(IConfiguration config) {
String timerVal = config.getFlushInterval(); //e.g. hour=0 or daily=10
String s[] = timerVal.split("=");
if (s.length != 2 ){
throw new IllegalArgumentException("Flush interval format is invalid. Expecting name=value, received: " + timerVal);
}
String name = s[0];
Integer time = new Integer(s[1]);
public static TaskTimer getTimer(IConfiguration config) throws Exception{

if (name.equalsIgnoreCase("hour")) {
return new CronTimer(JOBNAME, time, 0); //minute, sec after each hour
} if (name.equalsIgnoreCase("daily")) {
return new CronTimer(JOBNAME, time, 0 , 0); //hour, minute, sec to run on a daily basis
} else {
throw new IllegalArgumentException("Flush interval type is invalid. Expecting \"hour, daily\", received: " + name);
CronTimer cronTimer = null;
switch (config.getFlushSchedulerType())
{
case HOUR:
String timerVal = config.getFlushInterval(); //e.g. hour=0 or daily=10
if (timerVal == null)
return null;
String s[] = timerVal.split("=");
if (s.length != 2 ){
throw new IllegalArgumentException("Flush interval format is invalid. Expecting name=value, received: " + timerVal);
}
String name = s[0].toUpperCase();
Integer time = new Integer(s[1]);
switch(name)
{
case "HOUR":
cronTimer = new CronTimer(JOBNAME, time, 0); //minute, sec after each hour
break;
case "DAILY":
cronTimer = new CronTimer(JOBNAME, time, 0 , 0); //hour, minute, sec to run on a daily basis
break;
default:
throw new UnsupportedTypeException("Flush interval type is invalid. Expecting \"hour, daily\", received: " + name);
}

break;
case CRON:
String cronExpression = config.getFlushCronExpression();

if(StringUtils.isEmpty(cronExpression)){
logger.info("Skipping flush as flush cron is not set.");
}else
{
if(!CronExpression.isValidExpression(cronExpression))
throw new Exception("Invalid CRON expression: " + cronExpression +
". Please remove cron expression if you wish to disable flush else fix the CRON expression and try again!");

cronTimer = new CronTimer(JOBNAME, cronExpression);
logger.info(String.format("Starting flush with CRON expression %s", cronTimer.getCronExpression()));
}
break;
}
return cronTimer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,17 @@ public SchedulerType getBackupSchedulerType() throws UnsupportedTypeException{
return SchedulerType.lookup(schedulerType);
}

@Override
public SchedulerType getFlushSchedulerType() throws UnsupportedTypeException{
String schedulerType = config.get(PRIAM_PRE + ".flush.schedule.type", SchedulerType.HOUR.getSchedulerType());
return SchedulerType.lookup(schedulerType);
}

@Override
public String getFlushCronExpression() {
return config.get(PRIAM_PRE + ".flush.cron");
}

@Override
public String getSnapshotKeyspaceFilters() {
return config.get(CONFIG_SNAPSHOT_KEYSPACE_FILTER);
Expand Down
11 changes: 11 additions & 0 deletions priam/src/test/java/com/netflix/priam/FakeConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.inject.Singleton;
import com.netflix.priam.defaultimpl.PriamConfiguration;
import com.netflix.priam.scheduler.SchedulerType;
import com.netflix.priam.scheduler.UnsupportedTypeException;

@Singleton
public class FakeConfiguration implements IConfiguration
Expand Down Expand Up @@ -786,4 +787,14 @@ public String getFlushInterval() {
public String getBackupStatusFileLoc() {
return "backupstatus.ser";
}

@Override
public SchedulerType getFlushSchedulerType() throws UnsupportedTypeException {
return SchedulerType.HOUR;
}

@Override
public String getFlushCronExpression() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.inject.Singleton;
import com.netflix.priam.defaultimpl.PriamConfiguration;
import com.netflix.priam.scheduler.SchedulerType;
import com.netflix.priam.scheduler.UnsupportedTypeException;

@Singleton
public class FakeConfigurationMurmur3 implements IConfiguration
Expand Down Expand Up @@ -786,4 +787,14 @@ public String getFlushInterval() {
public String getBackupStatusFileLoc() {
return "backupstatus.ser";
}

@Override
public SchedulerType getFlushSchedulerType() throws UnsupportedTypeException {
return SchedulerType.HOUR;
}

@Override
public String getFlushCronExpression() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.netflix.priam.scheduler

import com.netflix.priam.FakeConfiguration
import com.netflix.priam.cluster.management.FlushTask
import spock.lang.Specification
import spock.lang.Unroll

/**
Created by aagrawal on 7/15/17.
*/
@Unroll
class TestFlushTask extends Specification {

def "Exception for value #flushSchedulerType, #flushCronExpression, #flushInterval"() {
when:
FlushTask.getTimer(new FlushConfiguration(flushSchedulerType, flushCronExpression, flushInterval))

then:
def error = thrown(expectedException)

where:
flushSchedulerType | flushCronExpression | flushInterval || expectedException
"sdf" | null | null || UnsupportedTypeException
"hour" | null | "2" || IllegalArgumentException
"hour" | "0 0 2 * * ?" | "2" || IllegalArgumentException
"cron" | "abc" | null || Exception
"cron" | "abc" | "daily=2" || Exception
"hour" | null | "hour=2,daily=2" || IllegalArgumentException
}

def "SchedulerType for value #flushSchedulerType, #flushCronExpression, #flushInterval is null"() {
expect:
FlushTask.getTimer(new FlushConfiguration(flushSchedulerType, flushCronExpression, flushInterval)) == result

where:
flushSchedulerType | flushCronExpression | flushInterval || result
"hour" | null | null || null
"cron" | null | null || null
"hour" | "abc" | null || null
"cron" | null | "abc" || null
}

def "SchedulerType for value #flushSchedulerType, #flushCronExpression, #flushInterval is #result"() {
expect:
FlushTask.getTimer(new FlushConfiguration(flushSchedulerType, flushCronExpression, flushInterval)).getCronExpression() == result

where:
flushSchedulerType | flushCronExpression | flushInterval || result
"hour" | null | "daily=2" || "0 0 2 * * ?"
"hour" | null | "hour=2" || "0 2 0/1 * * ?"
"cron" | "0 0 0/1 1/1 * ? *" | null || "0 0 0/1 1/1 * ? *"
"cron" | "0 0 0/1 1/1 * ? *" | "daily=2" || "0 0 0/1 1/1 * ? *"
}


private class FlushConfiguration extends FakeConfiguration {
private String flushSchedulerType, flushCronExpression, flushInterval;

FlushConfiguration(String flushSchedulerType, String flushCronExpression, String flushInterval) {
this.flushCronExpression = flushCronExpression;
this.flushSchedulerType = flushSchedulerType;
this.flushInterval = flushInterval;
}

@Override
public SchedulerType getFlushSchedulerType() throws UnsupportedTypeException {
return SchedulerType.lookup(flushSchedulerType);
}

@Override
public String getFlushCronExpression() {
return flushCronExpression;
}

@Override
public String getFlushInterval() {
return flushInterval;
}
}

}

0 comments on commit 485299b

Please sign in to comment.