Skip to content

Commit

Permalink
Merge pull request #456 from tulumvinh/3.x
Browse files Browse the repository at this point in the history
1. Allow visibility on the state of a current restore  2.   Upgrade tests to use JMockit 1.19  3.  Properly log a client asks for a token value
  • Loading branch information
tulumvinh committed Mar 29, 2016
2 parents 564d1ae + e74c114 commit 8ec7847
Show file tree
Hide file tree
Showing 13 changed files with 480 additions and 182 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ subprojects {
compile 'com.google.apis:google-api-services-storage:v1-rev9-1.19.0'
compile 'com.google.http-client:google-http-client-jackson2:1.19.0'
provided 'javax.servlet:servlet-api:2.5'
testCompile 'com.googlecode.jmockit:jmockit:0.999.17'
testCompile 'org.jmockit:jmockit:1.19'
testCompile 'junit:junit:4.8'
}
}
}
92 changes: 90 additions & 2 deletions priam/src/main/java/com/netflix/priam/backup/Restore.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,7 +38,9 @@
import com.netflix.priam.backup.AbstractBackupPath.BackupFileType;
import com.netflix.priam.identity.InstanceIdentity;
import com.netflix.priam.scheduler.SimpleTimer;
import com.netflix.priam.scheduler.Task;
import com.netflix.priam.scheduler.TaskTimer;
import com.netflix.priam.scheduler.Task.STATE;
import com.netflix.priam.utils.RetryableCallable;
import com.netflix.priam.utils.Sleeper;
import com.netflix.priam.utils.SystemUtils;
Expand All @@ -60,13 +63,25 @@ public class Restore extends AbstractRestore
private MetaData metaData;
@Inject
private InstanceIdentity id;

private DateTime startDateRange = null, endDateRange = null; //Date range to restore from
private DateTime execStartTime = null, execEndTime = null; //Start-end time of the actual restore execution
private Task.STATE state = Task.STATE.NOT_APPLICABLE; //the state of a restore. Note: this is different than the "status" of a Task.

@Inject
public Restore(IConfiguration config, @Named("backup")IBackupFileSystem fs,Sleeper sleeper, ICassandraProcess cassProcess)
{
super(config, fs, JOBNAME, sleeper);
this.cassProcess = cassProcess;
}

private void initRestoreState() {
this.endDateRange = null;
this.startDateRange = null;
this.execEndTime = null;
this.execStartTime = null;
this.state = Task.STATE.NOT_APPLICABLE;
}

@Override
public void execute() throws Exception
Expand All @@ -79,6 +94,7 @@ public void execute() throws Exception
final Date startTime = path.parseDate(restore[0]);
final Date endTime = path.parseDate(restore[1]);
String origToken = id.getInstance().getToken();

try
{
if (config.isRestoreClosestToken())
Expand All @@ -90,15 +106,20 @@ public void execute() throws Exception
{
public Void retriableCall() throws Exception
{
logger.info("Attempting restore");
logger.info("Attempting restore");
restore(startTime, endTime);
logger.info("Restore completed");

// Wait for other server init to complete
sleeper.sleep(30000);
return null;
}
}.call();
}
catch (Exception e) {
this.state = STATE.ERROR;
this.execEndTime = new DateTime(new Date());
}
finally
{
id.getInstance().setToken(origToken);
Expand All @@ -112,6 +133,13 @@ public Void retriableCall() throws Exception
*/
public void restore(Date startTime, Date endTime) throws Exception
{

initRestoreState();
this.state = STATE.RUNNING;
this.startDateRange = new DateTime(startTime);
this.endDateRange = new DateTime(endTime);
this.execStartTime = new DateTime(new Date());

// Stop cassandra if its running and restoring all keyspaces
if (config.getRestoreKeySpaces().size() == 0)
cassProcess.stop();
Expand Down Expand Up @@ -144,6 +172,9 @@ public void restore(Date startTime, Date endTime) throws Exception
if (metas.size() == 0)
{
logger.info("[cass_backup] No snapshot meta file found, Restore Failed.");
execEndTime = new DateTime(new Date());
state = STATE.DONE;

assert false : "[cass_backup] No snapshots found, Restore Failed.";
return;
}
Expand Down Expand Up @@ -174,6 +205,9 @@ public void restore(Date startTime, Date endTime) throws Exception
Iterator<AbstractBackupPath> commitLogPathIterator = fs.list(prefix, meta.time, endTime);
download(commitLogPathIterator, BackupFileType.CL, config.maxCommitLogsRestore());
}

execEndTime = new DateTime(new Date());
state = STATE.DONE;
}

public static TaskTimer getTimer()
Expand All @@ -198,5 +232,59 @@ public static boolean isRestoreEnabled(IConfiguration conf)
boolean isBackedupRac = (CollectionUtils.isEmpty(conf.getBackupRacs()) || conf.getBackupRacs().contains(conf.getRac()));
return (isRestoreMode && isBackedupRac);
}

/*
* @returns the state of a restore, can be null if the restore never happened or if Priam was restarted (as
* restore state is not durable).
*/
public Task.STATE getRestoreState() {
return this.state;
}

/*
* @return the start date range used for the restore, null if there is no state information for the restore.
*/
public String getStartDateRange() {
if (this.startDateRange != null ) {
return this.startDateRange.toString("yyyyMMddHHmm");
} else {
return null;
}

}
/*
* @return the end date range used for the restore, null if there is no state information for the restore.
*/
public String getEndDateRange() {
if (this.startDateRange != null ) {
return this.endDateRange.toString("yyyyMMddHHmm");
} else {
return null;
}

}

/*
* @return the start time of actual restore, null if there is no state information for the restore.
*/
public String getExecStartTime() {
if (this.execStartTime != null ) {
return this.execStartTime.toString("yyyyMMddHHmm");
} else {
return null;
}

}
}
/*
* @return the end time of actual restore, null if there is no state information for the restore.
*/
public String getExecEndTime() {
if (this.execEndTime != null ) {
return this.execEndTime.toString("yyyyMMddHHmm");
} else {
return null;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public void setHostIP(String publicip)

public String getToken()
{
logger.info("Returning token value \"" + token + "\" for this instance to caller.");
return token;
}

Expand Down Expand Up @@ -164,4 +163,4 @@ public void setOutOfService(boolean outOfService)
}


}
}
60 changes: 10 additions & 50 deletions priam/src/main/java/com/netflix/priam/resources/BackupServlet.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,48 +139,7 @@ public Response backupIncrementals() throws Exception
return Response.ok(REST_SUCCESS, MediaType.APPLICATION_JSON).build();
}

@GET
@Path("/restore")
public Response restore(@QueryParam(REST_HEADER_RANGE) String daterange, @QueryParam(REST_HEADER_REGION) String region, @QueryParam(REST_HEADER_TOKEN) String token,
@QueryParam(REST_KEYSPACES) String keyspaces, @QueryParam(REST_RESTORE_PREFIX) String restorePrefix) throws Exception
{
Date startTime;
Date endTime;

if (StringUtils.isBlank(daterange) || daterange.equalsIgnoreCase("default"))
{
startTime = new DateTime().minusDays(1).toDate();
endTime = new DateTime().toDate();
}
else
{
String[] restore = daterange.split(",");
AbstractBackupPath path = pathProvider.get();
startTime = path.parseDate(restore[0]);
endTime = path.parseDate(restore[1]);
}

String origRestorePrefix = config.getRestorePrefix();
if (StringUtils.isNotBlank(restorePrefix))
{
config.setRestorePrefix(restorePrefix);
}

logger.info("Parameters: { token: [" + token + "], region: [" + region + "], startTime: [" + startTime + "], endTime: [" + endTime +
"], keyspaces: [" + keyspaces + "], restorePrefix: [" + restorePrefix + "]}");

restore(token, region, startTime, endTime, keyspaces);

//Since this call is probably never called in parallel, config is multi-thread safe to be edited
if (origRestorePrefix != null)
config.setRestorePrefix(origRestorePrefix);
else config.setRestorePrefix("");

return Response.ok(REST_SUCCESS, MediaType.APPLICATION_JSON).build();
}




@GET
@Path("/list")
public Response list(@QueryParam(REST_HEADER_RANGE) String daterange, @QueryParam(REST_HEADER_FILTER) @DefaultValue("") String filter) throws Exception
Expand Down Expand Up @@ -209,19 +168,20 @@ public Response list(@QueryParam(REST_HEADER_RANGE) String daterange, @QueryPara
return Response.ok(object.toString(2), MediaType.APPLICATION_JSON).build();
}


@GET
@Path("/status")
public Response status() throws Exception
{
int restoreTCount = restoreObj.getActiveCount();
logger.debug("Thread counts for backup is: %d", restoreTCount);
int restoreTCount = restoreObj.getActiveCount(); //Active threads performing the restore
logger.debug("Thread counts for restore is: %d", restoreTCount);
int backupTCount = backupFs.getActivecount();
logger.debug("Thread counts for restore is: %d", backupTCount);
logger.debug("Thread counts for snapshot backup is: %d", backupTCount);
JSONObject object = new JSONObject();
object.put("Restore", new Integer(restoreTCount));
object.put("Status", restoreObj.state().toString());
object.put("Backup", new Integer(backupTCount));
object.put("Status", snapshotBackup.state().toString());
object.put("Restore", new Integer(restoreTCount)); //Number of active threads performing the restore
object.put("status", restoreObj.state().toString()); //state of the restore [ERROR|RUNNING|DONE]
object.put("Backup", new Integer(backupTCount)); //Number of active threads performing the snapshot backups
object.put("Snapshotstatus", snapshotBackup.state().toString());
return Response.ok(object.toString(), MediaType.APPLICATION_JSON).build();
}

Expand Down Expand Up @@ -508,4 +468,4 @@ public void removeAllDataFiles(String ks) throws Exception
logger.info("*** Done cleaning all the files inside <"+cleanupDirPath+">");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,12 @@ public Response getToken()
{
try
{
if (StringUtils.isNotBlank(priamServer.getId().getInstance().getToken()))
return Response.ok(priamServer.getId().getInstance().getToken()).build();
String token = priamServer.getId().getInstance().getToken();
if (StringUtils.isNotBlank(token)) {
logger.info("Returning token value \"" + token + "\" for this instance to caller.");
return Response.ok(priamServer.getId().getInstance().getToken()).build();
}

logger.error("Cannot find token for this instance.");
}
catch (Exception e)
Expand Down Expand Up @@ -167,4 +171,4 @@ public Response doubleRing() throws IOException, ClassNotFoundException
}
return Response.status(200).build();
}
}
}
Loading

0 comments on commit 8ec7847

Please sign in to comment.