Skip to content

Commit

Permalink
fix bug of using cloudsim.File
Browse files Browse the repository at this point in the history
  • Loading branch information
Weiwei Chen committed Mar 24, 2015
1 parent efaf781 commit 33e6ab0
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 66 deletions.
18 changes: 8 additions & 10 deletions sources/org/workflowsim/ClusteringEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.workflowsim;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.cloudbus.cloudsim.Log;
import org.cloudbus.cloudsim.core.CloudSimTags;
Expand Down Expand Up @@ -191,12 +190,12 @@ else if (params.getClustersSize() != 0) {
* @param file, the file to be checked
* @return
*/
private boolean isRealInputFile(List<org.cloudbus.cloudsim.File> list, org.cloudbus.cloudsim.File file) {
private boolean isRealInputFile(List<FileItem> list, FileItem file) {
/**
* if the type is input file)
*/
if (file.getType() == FileType.INPUT.value) {
for (org.cloudbus.cloudsim.File another : list) {
if (file.getType() == FileType.INPUT) {
for (FileItem another : list) {
/**
* if there is another file that has the same name and it is
* output file
Expand All @@ -205,7 +204,7 @@ private boolean isRealInputFile(List<org.cloudbus.cloudsim.File> list, org.cloud
/**
* It is output file
*/
&& another.getType() == FileType.OUTPUT.value) {
&& another.getType() == FileType.OUTPUT) {
return false;
}
}
Expand All @@ -222,7 +221,7 @@ protected void processDatastaging() {
/**
* All the files of this workflow, it is saved in the workflow engine
*/
List list = this.engine.getTaskFiles();
List<FileItem> list = this.engine.getTaskFiles();
/**
* A bug of cloudsim, you cannot set the length of a cloudlet to be
* smaller than 110 otherwise it will fail The reason why we set the id
Expand All @@ -236,14 +235,13 @@ protected void processDatastaging() {
* all the files to be the input of this stage-in job so that
* WorkflowSim will transfers them when this job is executed
*/
List fileList = new ArrayList<>();
for (Iterator it = list.iterator(); it.hasNext();) {
org.cloudbus.cloudsim.File file = (org.cloudbus.cloudsim.File) it.next();
List<FileItem> fileList = new ArrayList<>();
for (FileItem file : list) {
/**
* To avoid duplicate files
*/
if (isRealInputFile(list, file)) {
ReplicaCatalog.addStorageList(file.getName(), Parameters.SOURCE);
ReplicaCatalog.addFileToStorage(file.getName(), Parameters.SOURCE);
fileList.add(file);
}
}
Expand Down
12 changes: 5 additions & 7 deletions sources/org/workflowsim/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.workflowsim;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.cloudbus.cloudsim.Cloudlet;
import org.cloudbus.cloudsim.Consts;
Expand Down Expand Up @@ -173,7 +172,7 @@ public List<Task> getChildList() {
*
* @param list, child list of the task
*/
public void setChildList(List list) {
public void setChildList(List<Task> list) {
this.childList = list;
}

Expand All @@ -182,7 +181,7 @@ public void setChildList(List list) {
*
* @param list, parent list of the task
*/
public void setParentList(List list) {
public void setParentList(List<Task> list) {
this.parentList = list;
}

Expand All @@ -191,7 +190,7 @@ public void setParentList(List list) {
*
* @param list, the child list to be added
*/
public void addChildList(List list) {
public void addChildList(List<Task> list) {
this.childList.addAll(list);
}

Expand All @@ -200,7 +199,7 @@ public void addChildList(List list) {
*
* @param list, the parent list to be added
*/
public void addParentList(List list) {
public void addParentList(List<Task> list) {
this.parentList.addAll(list);
}

Expand Down Expand Up @@ -316,8 +315,7 @@ public double getProcessingCost() {

// ...plus input data transfer cost...
long fileSize = 0;
for (Iterator it = getFileList().iterator(); it.hasNext();) {
org.cloudbus.cloudsim.File file = (org.cloudbus.cloudsim.File) it.next();
for (FileItem file : getFileList()) {
fileSize += file.getSize() / Consts.MILLION;
}
cost += costPerBw * fileSize;
Expand Down
74 changes: 34 additions & 40 deletions sources/org/workflowsim/WorkflowDatacenter.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ protected void processCloudletSubmit(SimEvent ev, boolean ack) {
/**
* cl is actually a job but it is not necessary to cast it to a job
*/
Cloudlet cl = (Cloudlet) ev.getData();
Job job = (Job) ev.getData();

if (cl.isFinished()) {
String name = CloudSim.getEntityName(cl.getUserId());
Log.printLine(getName() + ": Warning - Cloudlet #" + cl.getCloudletId() + " owned by " + name
if (job.isFinished()) {
String name = CloudSim.getEntityName(job.getUserId());
Log.printLine(getName() + ": Warning - Cloudlet #" + job.getCloudletId() + " owned by " + name
+ " is already completed/finished.");
Log.printLine("Therefore, it is not being executed again");
Log.printLine();
Expand All @@ -87,32 +87,32 @@ protected void processCloudletSubmit(SimEvent ev, boolean ack) {
if (ack) {
int[] data = new int[3];
data[0] = getId();
data[1] = cl.getCloudletId();
data[1] = job.getCloudletId();
data[2] = CloudSimTags.FALSE;

// unique tag = operation tag
int tag = CloudSimTags.CLOUDLET_SUBMIT_ACK;
sendNow(cl.getUserId(), tag, data);
sendNow(job.getUserId(), tag, data);
}

sendNow(cl.getUserId(), CloudSimTags.CLOUDLET_RETURN, cl);
sendNow(job.getUserId(), CloudSimTags.CLOUDLET_RETURN, job);

return;
}

int userId = cl.getUserId();
int vmId = cl.getVmId();
int userId = job.getUserId();
int vmId = job.getVmId();
Host host = getVmAllocationPolicy().getHost(vmId, userId);
CondorVM vm = (CondorVM) host.getVm(vmId, userId);

switch (Parameters.getCostModel()) {
case DATACENTER:
// process this Cloudlet to this CloudResource
cl.setResourceParameter(getId(), getCharacteristics().getCostPerSecond(),
job.setResourceParameter(getId(), getCharacteristics().getCostPerSecond(),
getCharacteristics().getCostPerBw());
break;
case VM:
cl.setResourceParameter(getId(), vm.getCost(), vm.getCostPerBW());
job.setResourceParameter(getId(), vm.getCost(), vm.getCostPerBW());
break;
default:
break;
Expand All @@ -121,25 +121,21 @@ protected void processCloudletSubmit(SimEvent ev, boolean ack) {
/**
* Stage-in file && Shared based on the file.system
*/
if (cl.getClassType() == ClassType.STAGE_IN.value) {
stageInFile2FileSystem(cl);
if (job.getClassType() == ClassType.STAGE_IN.value) {
stageInFile2FileSystem(job);
}

/**
* Add data transfer time (communication cost
*/
/**
* Cast cl to job so as we can use its functions
*/
Job job = (Job) cl;

double fileTransferTime = 0.0;
if (cl.getClassType() == ClassType.STAGE_IN.value) {
fileTransferTime = processDataStageIn(job.getFileList(), cl);
if (job.getClassType() == ClassType.STAGE_IN.value) {
fileTransferTime = processDataStageIn(job.getFileList(), job);
}

CloudletScheduler scheduler = vm.getCloudletScheduler();
double estimatedFinishTime = scheduler.cloudletSubmit(cl, fileTransferTime);
double estimatedFinishTime = scheduler.cloudletSubmit(job, fileTransferTime);
updateTaskExecTime(job, vm);

// if this cloudlet is in the exec queue
Expand All @@ -152,11 +148,11 @@ protected void processCloudletSubmit(SimEvent ev, boolean ack) {
if (ack) {
int[] data = new int[3];
data[0] = getId();
data[1] = cl.getCloudletId();
data[1] = job.getCloudletId();
data[2] = CloudSimTags.TRUE;

int tag = CloudSimTags.CLOUDLET_SUBMIT_ACK;
sendNow(cl.getUserId(), tag, data);
sendNow(job.getUserId(), tag, data);
}
} catch (ClassCastException c) {
Log.printLine(getName() + ".processCloudletSubmit(): " + "ClassCastException error.");
Expand All @@ -168,7 +164,7 @@ protected void processCloudletSubmit(SimEvent ev, boolean ack) {
}

/**
* Update the submission time/exec time of a task
* Update the submission time/exec time of a job
*
* @param job
* @param vm
Expand All @@ -193,9 +189,8 @@ private void updateTaskExecTime(Job job, Vm vm) {
* @pre $none
* @post $none
*/
private void stageInFile2FileSystem(Cloudlet cl) {
Task t1 = (Task) cl;
List<FileItem> fList = t1.getFileList();
private void stageInFile2FileSystem(Job job) {
List<FileItem> fList = job.getFileList();

for (FileItem file : fList) {
switch (ReplicaCatalog.getFileSystem()) {
Expand All @@ -204,7 +199,7 @@ private void stageInFile2FileSystem(Cloudlet cl) {
* name)
*/
case LOCAL:
ReplicaCatalog.addStorageList(file.getName(), this.getName());
ReplicaCatalog.addFileToStorage(file.getName(), this.getName());
/**
* Is it not really needed currently but it is left for
* future usage
Expand All @@ -216,7 +211,7 @@ private void stageInFile2FileSystem(Cloudlet cl) {
* For shared file system, add it to the shared storage
*/
case SHARED:
ReplicaCatalog.addStorageList(file.getName(), this.getName());
ReplicaCatalog.addFileToStorage(file.getName(), this.getName());
break;
default:
break;
Expand Down Expand Up @@ -255,12 +250,12 @@ private boolean isRealInputFile(List<FileItem> list, FileItem file) {
/*
* Stage in for a single job (both stage-in job and compute job)
* @param requiredFiles, all files to be stage-in
* @param cl, the job to be processed
* @param job, the job to be processed
* @pre $none
* @post $none
*/

protected double processDataStageIn(List<FileItem> requiredFiles, Cloudlet cl) throws Exception {
protected double processDataStageIn(List<FileItem> requiredFiles, Job job) throws Exception {
double time = 0.0;
for (FileItem file : requiredFiles) {
//The input file is not an output File
Expand All @@ -287,8 +282,8 @@ protected double processDataStageIn(List<FileItem> requiredFiles, Cloudlet cl) t
time += file.getSize() / (double) Consts.MILLION / maxRate;
break;
case LOCAL:
int vmId = cl.getVmId();
int userId = cl.getUserId();
int vmId = job.getVmId();
int userId = job.getUserId();
Host host = getVmAllocationPolicy().getHost(vmId, userId);
Vm vm = host.getVm(vmId, userId);

Expand Down Expand Up @@ -332,7 +327,7 @@ protected double processDataStageIn(List<FileItem> requiredFiles, Cloudlet cl) t
//We should add but since CondorVm has a small capability it often fails
//We currently don't use this storage to do anything meaningful. It is left for future.
//condorVm.addLocalFile(file);
ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
ReplicaCatalog.addFileToStorage(file.getName(), Integer.toString(vmId));
break;
}
}
Expand Down Expand Up @@ -394,21 +389,20 @@ protected void checkCloudletCompletion() {
/*
* Register a file to the storage if it is an output file
* @param requiredFiles, all files to be stage-in
* @param cl, the job to be processed
* @param job, the job to be processed
* @pre $none
* @post $none
*/

private void register(Cloudlet cl) {
Task tl = (Task) cl;
List fList = tl.getFileList();
for (Iterator it = fList.iterator(); it.hasNext();) {
org.cloudbus.cloudsim.File file = (org.cloudbus.cloudsim.File) it.next();
if (file.getType() == FileType.OUTPUT.value)//output file
List<FileItem> fList = tl.getFileList();
for (FileItem file : fList) {
if (file.getType() == FileType.OUTPUT)//output file
{
switch (ReplicaCatalog.getFileSystem()) {
case SHARED:
ReplicaCatalog.addStorageList(file.getName(), this.getName());
ReplicaCatalog.addFileToStorage(file.getName(), this.getName());
break;
case LOCAL:
int vmId = cl.getVmId();
Expand All @@ -418,7 +412,7 @@ private void register(Cloudlet cl) {
* Left here for future work
*/
CondorVM vm = (CondorVM) host.getVm(vmId, userId);
ReplicaCatalog.addStorageList(file.getName(), Integer.toString(vmId));
ReplicaCatalog.addFileToStorage(file.getName(), Integer.toString(vmId));
break;
}
}
Expand Down
2 changes: 0 additions & 2 deletions sources/org/workflowsim/WorkflowScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.workflowsim;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.cloudbus.cloudsim.Cloudlet;
import org.cloudbus.cloudsim.DatacenterBroker;
Expand Down Expand Up @@ -350,7 +349,6 @@ protected void processCloudletSubmit(SimEvent ev) {

sendNow(this.getId(), WorkflowSimTags.CLOUDLET_UPDATE);
if (!processCloudletSubmitHasShown) {
//Log.printLine("Pay Attention that the actual vm size is " + getVmsCreatedList().size());
processCloudletSubmitHasShown = true;
}
}
Expand Down
4 changes: 2 additions & 2 deletions sources/org/workflowsim/reclustering/ReclusteringEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,13 @@ private static List DRReclustering(List<Job> jobList, Job job, int id, List<Task
private static void updateDependencies (Job job, List<Job> jobList) {
// Key idea: avoid setChildList(job.getChildList) within the for loop since
// it will override the list
List<Job> parents = job.getParentList();
List<Task> parents = job.getParentList();
List<Task> children = job.getChildList();
for (Job rawJob : jobList) {
rawJob.setChildList(children);
rawJob.setParentList(parents);
}
for (Job parent : parents) {
for (Task parent : parents) {
parent.getChildList().addAll(jobList);
}
for (Task childTask : children) {
Expand Down
10 changes: 5 additions & 5 deletions sources/org/workflowsim/utils/ReplicaCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public enum FileSystem {
/**
* Map from file to a list of data storage
*/
private static Map<String, List> dataReplicaCatalog;
private static Map<String, List<String>> dataReplicaCatalog;

/**
* Initialize a ReplicaCatalog
Expand Down Expand Up @@ -107,7 +107,7 @@ public static boolean containsFile(String fileName) {
* @param file the file object
* @return list of storages
*/
public static List getStorageList(String file) {
public static List<String> getStorageList(String file) {
return dataReplicaCatalog.get(file);
}

Expand All @@ -117,11 +117,11 @@ public static List getStorageList(String file) {
* @param file, a file object
* @param storage , the storage associated with this file
*/
public static void addStorageList(String file, String storage) {
public static void addFileToStorage(String file, String storage) {
if (!dataReplicaCatalog.containsKey(file)) {
dataReplicaCatalog.put(file, new ArrayList());
dataReplicaCatalog.put(file, new ArrayList<>());
}
List list = getStorageList(file);
List<String> list = getStorageList(file);
if (!list.contains(storage)) {
list.add(storage);
}
Expand Down

0 comments on commit 33e6ab0

Please sign in to comment.