Skip to content

Commit

Permalink
Re-Implemented ERA's R-heuristic
Browse files Browse the repository at this point in the history
  • Loading branch information
marcbux committed Oct 25, 2017
1 parent 47cc1ee commit 2be75f1
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 53 deletions.
11 changes: 9 additions & 2 deletions src/de/huberlin/wbi/dcs/examples/Parameters.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class Parameters {

public static long seed = 42;
public static boolean outputDatacenterEvents = false;
public static int numberOfRuns = 1;
public static int numberOfRuns = 100;

public enum Scheduler {
STATIC_ROUND_ROBIN, HEFT, JOB_QUEUE, LATE, C3, ERA
Expand Down Expand Up @@ -252,7 +252,7 @@ public static void parseParameters(String[] args) {
ERA.alpha = Double.valueOf(args[++i]);
}
if (args[i].compareTo("-" + "rho") == 0) {
ERA.rho = Boolean.valueOf(args[++i]);
ERA.rho = Integer.valueOf(args[++i]);
}
if (args[i].compareTo("-" + "numberOfRuns") == 0) {
numberOfRuns = Integer.valueOf(args[++i]);
Expand Down Expand Up @@ -330,6 +330,13 @@ public static void parseParameters(String[] args) {
if (args[i].compareTo("-" + "ramGb") == 0) {
ram = (int) (Double.valueOf(args[++i]) * 1024);
}

if (args[i].compareTo("-" + "noHet") == 0) {
int mips = (mipsPerCoreOpteron270 + mipsPerCoreOpteron2218 + mipsPerCoreXeonE5430) / 3;
mipsPerCoreOpteron270 = mips;
mipsPerCoreOpteron2218 = mips;
mipsPerCoreXeonE5430 = mips;
}
}
numGen = new Random(seed);
}
Expand Down
1 change: 1 addition & 0 deletions src/de/huberlin/wbi/dcs/examples/WorkflowExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public static void main(String[] args) {
Log.printLine("minimum minutes (quotient): " + HeterogeneousCloudlet.getTotalMi() / DynamicHost.getTotalMi() / 60 + " "
+ HeterogeneousCloudlet.getTotalIo() / DynamicHost.getTotalIo() / 60 + " " + HeterogeneousCloudlet.getTotalBw() / DynamicHost.getTotalBw() / 60);
} catch (Exception e) {
// Log.printLine(e.getStackTrace().toString());
e.printStackTrace();
Log.printLine("The simulation has been terminated due to an unexpected error");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ protected void processCloudletReturn(SimEvent ev) {

if (tasksRemaining()) {
submitTasks();
} else if (idleTaskSlots.size() == getVmsCreatedList().size() * getTaskSlotsPerVm()) {
} else if (signalFinished() || (idleTaskSlots.size() == getVmsCreatedList().size() * getTaskSlotsPerVm())) {
Log.printLine(CloudSim.clock() + ": " + getName() + ": All Tasks executed. Finishing...");
terminate();
clearDatacenters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,21 +176,28 @@ protected void processCloudletReturn(SimEvent ev) {
+ task.getName() + " " + task.getParams() + " \"");
runningTasks.remove(task.getCloudletId());

resetTask(task);
taskReady(task);
if (!task.isSpeculativeCopy()) {
resetTask(task);
taskReady(task);
}

idleTaskSlots.add(vm);
taskFailed(task, vm);
}

if (tasksRemaining()) {
submitTasks();
} else if (idleTaskSlots.size() == getVmsCreatedList().size() * getTaskSlotsPerVm()) {
} else if (signalFinished() || (idleTaskSlots.size() == getVmsCreatedList().size() * getTaskSlotsPerVm())) {
Log.printLine(CloudSim.clock() + ": " + getName() + ": All Tasks executed. Finishing...");
terminate();
clearDatacenters();
finishExecution();
}
}

@Override
public boolean signalFinished() {
return false;
}

}
89 changes: 42 additions & 47 deletions src/de/huberlin/wbi/dcs/workflow/scheduler/ERA.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -22,10 +23,10 @@

import de.huberlin.wbi.dcs.workflow.Task;

public class ERA extends AbstractWorkflowScheduler {
public class ERA extends AbstractReplicationScheduler {

public static double alpha = 0.2;
public static boolean rho = false;
public static int rho = 1;

public static boolean printEstimatesVsRuntimes = false;
public static boolean logarithmize = false;
Expand All @@ -35,9 +36,9 @@ public class ERA extends AbstractWorkflowScheduler {

protected Map<String, Queue<Task>> readyTasksPerBot;
protected Map<String, Set<Task>> runningTasksPerBot;
protected Map<Vm, Map<String, Set<Task>>> runningTasksPerBotPerVm;
protected Map<Vm, Map<String, WienerProcessModel>> runtimePerBotPerVm;

protected List<Task> replicas;
protected Set<Task> todo;

public class Runtime {
Expand Down Expand Up @@ -98,7 +99,11 @@ public void addRuntime(double timestamp, double runtime) {
measurements.add(measurement);
}

public double getEstimate(double timestamp) {
public double getEstimate(double timestamp, double quantile) {
if (quantile == 0.5 && measurements.size() > 0) {
return logarithmize ? Math.pow(Math.E, measurements.getLast().runtime) : Math.max(measurements.getLast().runtime, Double.MIN_NORMAL);
}

if (differences.size() < 2) {
return 0d;
}
Expand All @@ -117,10 +122,10 @@ public double getEstimate(double timestamp) {
double estimate = lastMeasurement.runtime;
if (variance > 0d) {
NormalDistribution nd = new NormalDistribution(lastMeasurement.runtime, Math.sqrt(variance));
estimate = nd.inverseCumulativeProbability(alpha);
estimate = nd.inverseCumulativeProbability(quantile);
}

estimate = logarithmize ? Math.pow(Math.E, estimate) : Math.max(estimate, 0d);
estimate = logarithmize ? Math.pow(Math.E, estimate) : Math.max(estimate, Double.MIN_NORMAL);

if (printEstimatesVsRuntimes) {
Runtime runtime = new Runtime(timestamp, estimate);
Expand All @@ -134,9 +139,8 @@ public ERA(String name, int taskSlotsPerVm, int runId) throws Exception {
super(name, taskSlotsPerVm);
readyTasksPerBot = new HashMap<>();
runningTasksPerBot = new HashMap<>();
runningTasksPerBotPerVm = new HashMap<>();
runtimePerBotPerVm = new HashMap<>();
// stageintimePerMBPerVm = new HashMap<>();
replicas = new LinkedList<>();
this.runId = runId;
Locale loc = new Locale("en");
df = (DecimalFormat) NumberFormat.getNumberInstance(loc);
Expand All @@ -151,9 +155,6 @@ public void reschedule(Collection<Task> tasks, Collection<Vm> vms) {
if (!runtimePerBotPerVm.containsKey(vm)) {
Map<String, WienerProcessModel> runtimePerBot = new HashMap<>();
runtimePerBotPerVm.put(vm, runtimePerBot);

Map<String, Set<Task>> runningTasksPerBot_ = new HashMap<>();
runningTasksPerBotPerVm.put(vm, runningTasksPerBot_);
}
}

Expand All @@ -176,40 +177,35 @@ public Task getNextTask(Vm vm) {

Map<String, Queue<Task>> b_ready = readyTasksPerBot;
Map<String, Set<Task>> b_run = runningTasksPerBot;
Map<String, Set<Task>> b_j = runningTasksPerBotPerVm.get(vm);

Map<String, Queue<Task>> b_select = b_ready;
if (b_ready.isEmpty()) {
if (b_run.equals(b_j) || !rho) {
return null;
}

b_select = new HashMap<>();
for (Entry<String, Set<Task>> e : b_run.entrySet()) {
Queue<Task> tasks = new LinkedList<>(e.getValue());
if (b_j.containsKey(e.getKey()))
tasks.removeAll(b_j.get(e.getKey()));
if (!tasks.isEmpty())
b_select.put(e.getKey(), tasks);
b_select.put(e.getKey(), tasks);
}
replicate = true;
}

Set<Vm> m = runningTasksPerBotPerVm.keySet();
Set<Vm> m = runtimePerBotPerVm.keySet();
Queue<Task> b_min = null;
double s_min = Double.MAX_VALUE;
for (Entry<String, Queue<Task>> b_i : b_select.entrySet()) {
double e_j = runtimePerBotPerVm.get(vm).get(b_i.getKey()).getEstimate(CloudSim.clock());
double e_j = runtimePerBotPerVm.get(vm).get(b_i.getKey()).getEstimate(CloudSim.clock(), replicate ? 0.5 : alpha);
if (e_j == 0) {
b_min = b_i.getValue();
break;
if (!replicate) {
b_min = b_i.getValue();
break;
}
e_j = Double.MAX_VALUE;
}

double e_min = Double.MAX_VALUE;
double e_sum = e_j;
int e_num = 1;
for (Vm k : m) {
double e_k = runtimePerBotPerVm.get(k).get(b_i.getKey()).getEstimate(CloudSim.clock());
double e_k = runtimePerBotPerVm.get(k).get(b_i.getKey()).getEstimate(CloudSim.clock(), replicate ? 0.5 : alpha);
if (k.equals(vm) || e_k == 0)
continue;
if (e_k < e_min) {
Expand All @@ -232,21 +228,19 @@ public Task getNextTask(Vm vm) {
if (replicate) {
task = new Task(task);
task.setSpeculativeCopy(true);
} else if (readyTasksPerBot.get(task.getName()).isEmpty()) {
readyTasksPerBot.remove(task.getName());
}
replicas.add(task);
} else {
if (!runningTasksPerBot.containsKey(task.getName())) {
Set<Task> s = new HashSet<>();
runningTasksPerBot.put(task.getName(), s);
}
runningTasksPerBot.get(task.getName()).add(task);

if (!runningTasksPerBot.containsKey(task.getName())) {
Set<Task> s = new HashSet<>();
runningTasksPerBot.put(task.getName(), s);
}
runningTasksPerBot.get(task.getName()).add(task);
if (readyTasksPerBot.get(task.getName()).isEmpty()) {
readyTasksPerBot.remove(task.getName());
}

if (!runningTasksPerBotPerVm.get(vm).containsKey(task.getName())) {
Set<Task> s = new HashSet<>();
runningTasksPerBotPerVm.get(vm).put(task.getName(), s);
}
runningTasksPerBotPerVm.get(vm).get(task.getName()).add(task);

return task;
}
Expand All @@ -265,7 +259,12 @@ public void taskReady(Task task) {

@Override
public boolean tasksRemaining() {
return !todo.isEmpty();
return (!signalFinished() && (!readyTasksPerBot.isEmpty() || replicas.size() < rho));
}

@Override
public boolean signalFinished() {
return todo.isEmpty();
}

@Override
Expand All @@ -283,19 +282,15 @@ public void taskFailed(Task task, Vm vm) {
}

private void taskFinished(Task task) {
if (runningTasksPerBot.containsKey(task.getName()) && runningTasksPerBot.get(task.getName()).contains(task)) {
// if (task.isSpeculativeCopy()) {
while (replicas.contains(task))
replicas.remove(task);
// } else {
if (runningTasksPerBot.containsKey(task.getName())) {
runningTasksPerBot.get(task.getName()).remove(task);
if (runningTasksPerBot.get(task.getName()).isEmpty())
runningTasksPerBot.remove(task.getName());
}

for (Map<String, Set<Task>> bot : runningTasksPerBotPerVm.values()) {
if (bot.containsKey(task.getName()) && bot.get(task.getName()).contains(task)) {
bot.get(task.getName()).remove(task);
if (bot.get(task.getName()).isEmpty())
bot.remove(task.getName());
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public interface WorkflowScheduler {
public void taskFailed(Task task, Vm vm);

public boolean tasksRemaining();

public boolean signalFinished();

public void terminate();

Expand Down

0 comments on commit 2be75f1

Please sign in to comment.