Skip to content

Commit

Permalink
merge: Support snapshotting simulated workloads (#113)
Browse files Browse the repository at this point in the history
This pull request adds support for snapshotting simulated workloads in OpenDC, which
serves as the basis for virtual machine migration/suspension support.

Part of #32 

## Implementation Notes ⚒️

* Support synchronous update of FlowStage
* Report exceptions in onStop as suppressed 
* Add support for snapshotting workloads
  • Loading branch information
fabianishere authored Oct 31, 2022
2 parents b96acc6 + c9750e5 commit 9d06fb0
Show file tree
Hide file tree
Showing 14 changed files with 398 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public class SimHost(

val bootWorkload = bootModel.get()
val hypervisor = hypervisor
val hypervisorWorkload = object : SimWorkload {
val hypervisorWorkload = object : SimWorkload by hypervisor {
override fun onStart(ctx: SimMachineContext) {
try {
_bootTime = clock.instant()
Expand All @@ -296,10 +296,6 @@ public class SimHost(
throw cause
}
}

override fun onStop(ctx: SimMachineContext) {
hypervisor.onStop(ctx)
}
}

val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ public class SimTFDevice(
output = null
}

/**
 * Snapshotting is not implemented here; the call never returns a workload.
 *
 * @throws UnsupportedOperationException always.
 */
override fun snapshot(): SimWorkload = throw UnsupportedOperationException()

override fun onUpdate(ctx: FlowStage, now: Long): Long {
val output = output ?: return Long.MAX_VALUE
val lastPull = lastPull
Expand Down
1 change: 0 additions & 1 deletion opendc-simulator/opendc-simulator-compute/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ dependencies {
api(projects.opendcSimulator.opendcSimulatorPower)
api(projects.opendcSimulator.opendcSimulatorNetwork)
implementation(projects.opendcSimulator.opendcSimulatorCore)
implementation(libs.kotlin.logging)

testImplementation(libs.slf4j.simple)
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,11 @@
import org.opendc.simulator.flow2.sink.SimpleFlowSink;
import org.opendc.simulator.flow2.util.FlowTransformer;
import org.opendc.simulator.flow2.util.FlowTransforms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Abstract implementation of the {@link SimMachine} interface.
*/
public abstract class SimAbstractMachine implements SimMachine {
private static final Logger LOGGER = LoggerFactory.getLogger(SimAbstractMachine.class);
private final MachineModel model;

private Context activeContext;
Expand Down Expand Up @@ -108,7 +105,6 @@ public abstract static class Context implements SimMachineContext {
private final Map<String, Object> meta;
private final Consumer<Exception> completion;
private boolean isClosed;
private Exception cause;

/**
* Construct a new {@link Context} instance.
Expand All @@ -134,6 +130,11 @@ public final Map<String, Object> getMeta() {
return meta;
}

/**
 * Forward the snapshot request to the workload held by this context.
 *
 * @return Whatever {@code workload.snapshot()} returns; any exception it throws propagates unchanged.
 */
@Override
public SimWorkload snapshot() {
return workload.snapshot();
}

@Override
public void reset() {
final FlowGraph graph = getMemory().getInput().getGraph();
Expand All @@ -158,6 +159,11 @@ public void reset() {

@Override
public final void shutdown() {
shutdown(null);
}

@Override
public final void shutdown(Exception cause) {
if (isClosed) {
return;
}
Expand All @@ -170,19 +176,17 @@ public final void shutdown() {
// Cancel all the resources associated with the machine
doCancel();

Exception e = this.cause;

try {
workload.onStop(this);
} catch (Exception cause) {
if (e != null) {
e.addSuppressed(cause);
} catch (Exception e) {
if (cause == null) {
cause = e;
} else {
e = cause;
cause.addSuppressed(e);
}
}

completion.accept(e);
completion.accept(cause);
}

/**
Expand All @@ -193,8 +197,7 @@ final void start() {
machine.activeContext = this;
workload.onStart(this);
} catch (Exception cause) {
this.cause = cause;
shutdown();
shutdown(cause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.opendc.simulator.compute.workload.SimWorkload;
import org.opendc.simulator.flow2.FlowGraph;

Expand All @@ -43,7 +44,7 @@ public interface SimMachineContext {
/**
* Return the metadata associated with the context.
* <p>
* Users can pass this metadata to the workload via {@link SimMachine#startWorkload(SimWorkload, Map)}.
* Users can pass this metadata to the workload via {@link SimMachine#startWorkload(SimWorkload, Map, Consumer)}.
*/
Map<String, Object> getMeta();

Expand All @@ -67,6 +68,13 @@ public interface SimMachineContext {
*/
List<? extends SimStorageInterface> getStorageInterfaces();

/**
* Create a snapshot of the {@link SimWorkload} running on this machine.
*
* @throws UnsupportedOperationException if the workload does not support snapshotting.
*/
SimWorkload snapshot();

/**
* Reset all resources of the machine.
*/
Expand All @@ -76,4 +84,11 @@ public interface SimMachineContext {
* Shutdown the workload.
*/
void shutdown();

/**
* Shutdown the workload due to failure.
*
* @param cause The cause for shutting down the workload.
*/
void shutdown(Exception cause);
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ public void onStop(SimMachineContext ctx) {
}
}

/**
 * Snapshotting is rejected here; this method never returns normally.
 *
 * @throws UnsupportedOperationException always, with the message "Unable to snapshot hypervisor".
 */
@Override
public SimWorkload snapshot() {
throw new UnsupportedOperationException("Unable to snapshot hypervisor");
}

/**
* The context which carries the state when the hypervisor is running on a machine.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,11 @@
import org.opendc.simulator.compute.SimProcessingUnit;
import org.opendc.simulator.compute.SimStorageInterface;
import org.opendc.simulator.flow2.FlowGraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link SimWorkload} that composes two {@link SimWorkload}s.
*/
final class SimChainWorkload implements SimWorkload {
private static final Logger LOGGER = LoggerFactory.getLogger(SimChainWorkload.class);

private final SimWorkload[] workloads;
private int activeWorkloadIndex;

Expand All @@ -48,9 +44,20 @@ final class SimChainWorkload implements SimWorkload {
* Construct a {@link SimChainWorkload} instance.
*
* @param workloads The workloads to chain.
* @param activeWorkloadIndex The index of the active workload.
*/
SimChainWorkload(SimWorkload... workloads) {
SimChainWorkload(SimWorkload[] workloads, int activeWorkloadIndex) {
this.workloads = workloads;
this.activeWorkloadIndex = activeWorkloadIndex;
}

/**
 * Construct a {@link SimChainWorkload} instance whose active workload index is 0.
 * <p>
 * Delegates to the two-argument constructor, passing the given array as-is.
 *
 * @param workloads The workloads to chain.
 */
SimChainWorkload(SimWorkload... workloads) {
this(workloads, 0);
}

@Override
Expand All @@ -65,9 +72,7 @@ public void onStart(SimMachineContext ctx) {
final Context context = new Context(ctx);
activeContext = context;

if (!context.doStart(workloads[activeWorkloadIndex])) {
ctx.shutdown();
}
tryThrow(context.doStart(workloads[activeWorkloadIndex]));
}

@Override
Expand All @@ -82,7 +87,20 @@ public void onStop(SimMachineContext ctx) {
final Context context = activeContext;
activeContext = null;

context.doStop(workloads[activeWorkloadIndex]);
tryThrow(context.doStop(workloads[activeWorkloadIndex]));
}

/**
 * Build a new chain from snapshots of the workloads at and after the active index.
 * <p>
 * Workloads before the active index are left out, and the returned chain starts at index 0.
 *
 * @return A new {@link SimChainWorkload} holding one snapshot per remaining workload, in order.
 */
@Override
public SimChainWorkload snapshot() {
    // Read both fields once so the loop works on a stable view.
    final int offset = this.activeWorkloadIndex;
    final SimWorkload[] source = this.workloads;
    final SimWorkload[] snapshots = new SimWorkload[source.length - offset];

    for (int index = offset; index < source.length; index++) {
        snapshots[index - offset] = source[index].snapshot();
    }

    return new SimChainWorkload(snapshots, 0);
}

/**
Expand Down Expand Up @@ -125,58 +143,93 @@ public List<? extends SimStorageInterface> getStorageInterfaces() {
return ctx.getStorageInterfaces();
}

/**
 * Snapshot only the workload at the chain's current active index.
 *
 * @return The result of calling {@code snapshot()} on that workload.
 */
@Override
public SimWorkload snapshot() {
    return workloads[activeWorkloadIndex].snapshot();
}

@Override
public void reset() {
ctx.reset();
}

@Override
public void shutdown() {
shutdown(null);
}

@Override
public void shutdown(Exception cause) {
final SimWorkload[] workloads = SimChainWorkload.this.workloads;
final int activeWorkloadIndex = ++SimChainWorkload.this.activeWorkloadIndex;

if (doStop(workloads[activeWorkloadIndex - 1]) && activeWorkloadIndex < workloads.length) {
final Exception stopException = doStop(workloads[activeWorkloadIndex - 1]);
if (cause == null) {
cause = stopException;
} else if (stopException != null) {
cause.addSuppressed(stopException);
}

if (stopException == null && activeWorkloadIndex < workloads.length) {
ctx.reset();

if (doStart(workloads[activeWorkloadIndex])) {
final Exception startException = doStart(workloads[activeWorkloadIndex]);

if (startException == null) {
return;
} else if (cause == null) {
cause = startException;
} else {
cause.addSuppressed(startException);
}
}

ctx.shutdown();
ctx.shutdown(cause);
}

/**
* Start the specified workload.
*
* @return <code>true</code> if the workload started successfully, <code>false</code> otherwise.
* @return The {@link Exception} that occurred while starting the workload or <code>null</code> if the workload
* started successfully.
*/
private boolean doStart(SimWorkload workload) {
private Exception doStart(SimWorkload workload) {
try {
workload.onStart(this);
} catch (Exception cause) {
LOGGER.warn("Workload failed during onStart callback", cause);
doStop(workload);
return false;
final Exception stopException = doStop(workload);
if (stopException != null) {
cause.addSuppressed(stopException);
}
return cause;
}

return true;
return null;
}

/**
* Stop the specified workload.
*
* @return <code>true</code> if the workload stopped successfully, <code>false</code> otherwise.
* @return The {@link Exception} that occurred while stopping the workload or <code>null</code> if the workload
* stopped successfully.
*/
private boolean doStop(SimWorkload workload) {
private Exception doStop(SimWorkload workload) {
try {
workload.onStop(this);
} catch (Exception cause) {
LOGGER.warn("Workload failed during onStop callback", cause);
return false;
return cause;
}

return true;
return null;
}
}

/**
 * Rethrow the given throwable as-is, or do nothing when it is <code>null</code>.
 * <p>
 * The throwable is cast (unchecked) to the generic type {@code T} before being thrown, so it is not
 * wrapped in another exception.
 *
 * @param e The throwable to rethrow, or <code>null</code> for a no-op.
 */
@SuppressWarnings("unchecked")
private static <T extends Throwable> void tryThrow(Throwable e) throws T {
    if (e != null) {
        throw (T) e;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic {

this.flops = flops;
this.utilization = utilization;
this.remainingAmount = flops;
}

@Override
Expand Down Expand Up @@ -98,8 +99,13 @@ public void onStop(SimMachineContext ctx) {
}

@Override
public String toString() {
return "SimFlopsWorkload[FLOPs=" + flops + ",utilization=" + utilization + "]";
public SimFlopsWorkload snapshot() {
    // Sync the stage (when one is attached) before reading remainingAmount.
    // NOTE(review): presumably sync() refreshes remainingAmount via onUpdate — confirm.
    final FlowStage attachedStage = this.stage;
    if (attachedStage != null) {
        attachedStage.sync();
    }

    // The new workload is given the remaining amount, truncated to a long, as its FLOPs.
    final long remainingFlops = (long) remainingAmount;
    return new SimFlopsWorkload(remainingFlops, utilization);
}

@Override
Expand Down Expand Up @@ -138,4 +144,9 @@ public long onUpdate(FlowStage ctx, long now) {

return now + duration;
}

/**
 * Render this workload as {@code SimFlopsWorkload[FLOPs=<flops>,utilization=<utilization>]}.
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("SimFlopsWorkload[FLOPs=");
    text.append(flops);
    text.append(",utilization=").append(utilization);
    text.append("]");
    return text.toString();
}
}
Loading

0 comments on commit 9d06fb0

Please sign in to comment.