diff --git a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt index b3e56f38c..5eccc6ec2 100644 --- a/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt +++ b/opendc-compute/opendc-compute-simulator/src/main/kotlin/org/opendc/compute/simulator/SimHost.kt @@ -280,7 +280,7 @@ public class SimHost( val bootWorkload = bootModel.get() val hypervisor = hypervisor - val hypervisorWorkload = object : SimWorkload { + val hypervisorWorkload = object : SimWorkload by hypervisor { override fun onStart(ctx: SimMachineContext) { try { _bootTime = clock.instant() @@ -296,10 +296,6 @@ public class SimHost( throw cause } } - - override fun onStop(ctx: SimMachineContext) { - hypervisor.onStop(ctx) - } } val workload = if (bootWorkload != null) SimWorkloads.chain(bootWorkload, hypervisorWorkload) else hypervisorWorkload diff --git a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt index a7fc102a7..eb3089703 100644 --- a/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt +++ b/opendc-experiments/opendc-experiments-tf20/src/main/kotlin/org/opendc/experiments/tf20/core/SimTFDevice.kt @@ -127,6 +127,8 @@ public class SimTFDevice( output = null } + override fun snapshot(): SimWorkload = throw UnsupportedOperationException() + override fun onUpdate(ctx: FlowStage, now: Long): Long { val output = output ?: return Long.MAX_VALUE val lastPull = lastPull diff --git a/opendc-simulator/opendc-simulator-compute/build.gradle.kts b/opendc-simulator/opendc-simulator-compute/build.gradle.kts index f3f90bb63..0ea0c2528 100644 --- a/opendc-simulator/opendc-simulator-compute/build.gradle.kts +++ b/opendc-simulator/opendc-simulator-compute/build.gradle.kts @@ -32,7 +32,6 @@ dependencies { api(projects.opendcSimulator.opendcSimulatorPower) api(projects.opendcSimulator.opendcSimulatorNetwork) implementation(projects.opendcSimulator.opendcSimulatorCore) - implementation(libs.kotlin.logging) testImplementation(libs.slf4j.simple) } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java index d968d8844..f684c54dd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimAbstractMachine.java @@ -36,14 +36,11 @@ import org.opendc.simulator.flow2.sink.SimpleFlowSink; import org.opendc.simulator.flow2.util.FlowTransformer; import org.opendc.simulator.flow2.util.FlowTransforms; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Abstract implementation of the {@link SimMachine} interface. */ public abstract class SimAbstractMachine implements SimMachine { - private static final Logger LOGGER = LoggerFactory.getLogger(SimAbstractMachine.class); private final MachineModel model; private Context activeContext; @@ -108,7 +105,6 @@ public abstract static class Context implements SimMachineContext { private final Map meta; private final Consumer completion; private boolean isClosed; - private Exception cause; /** * Construct a new {@link Context} instance. @@ -134,6 +130,11 @@ public final Map getMeta() { return meta; } + @Override + public SimWorkload snapshot() { + return workload.snapshot(); + } + @Override public void reset() { final FlowGraph graph = getMemory().getInput().getGraph(); @@ -158,6 +159,11 @@ public void reset() { @Override public final void shutdown() { + shutdown(null); + } + + @Override + public final void shutdown(Exception cause) { if (isClosed) { return; } @@ -170,19 +176,17 @@ public final void shutdown() { // Cancel all the resources associated with the machine doCancel(); - Exception e = this.cause; - try { workload.onStop(this); - } catch (Exception cause) { - if (e != null) { - e.addSuppressed(cause); + } catch (Exception e) { + if (cause == null) { + cause = e; } else { - e = cause; + cause.addSuppressed(e); } } - completion.accept(e); + completion.accept(cause); } /** @@ -193,8 +197,7 @@ final void start() { machine.activeContext = this; workload.onStart(this); } catch (Exception cause) { - this.cause = cause; - shutdown(); + shutdown(cause); } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java index 5d08e2b79..bce5c0a8b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/SimMachineContext.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.opendc.simulator.compute.workload.SimWorkload; import org.opendc.simulator.flow2.FlowGraph; @@ -43,7 +44,7 @@ public interface SimMachineContext { /** * Return the metadata associated with the context. *

- * Users can pass this metadata to the workload via {@link SimMachine#startWorkload(SimWorkload, Map)}. + * Users can pass this metadata to the workload via {@link SimMachine#startWorkload(SimWorkload, Map, Consumer)}. */ Map getMeta(); @@ -67,6 +68,13 @@ public interface SimMachineContext { */ List getStorageInterfaces(); + /** + * Create a snapshot of the {@link SimWorkload} running on this machine. + * + * @throws UnsupportedOperationException if the workload does not support snapshotting. + */ + SimWorkload snapshot(); + /** * Reset all resources of the machine. */ @@ -76,4 +84,11 @@ public interface SimMachineContext { * Shutdown the workload. */ void shutdown(); + + /** + * Shutdown the workload due to failure. + * + * @param cause The cause for shutting down the workload. + */ + void shutdown(Exception cause); } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java index f03a0c201..4ebcba714 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/kernel/SimHypervisor.java @@ -240,6 +240,11 @@ public void onStop(SimMachineContext ctx) { } } + @Override + public SimWorkload snapshot() { + throw new UnsupportedOperationException("Unable to snapshot hypervisor"); + } + /** * The context which carries the state when the hypervisor is running on a machine. */ diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java index 9304122ac..7480b3c0e 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimChainWorkload.java @@ -30,15 +30,11 @@ import org.opendc.simulator.compute.SimProcessingUnit; import org.opendc.simulator.compute.SimStorageInterface; import org.opendc.simulator.flow2.FlowGraph; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * A {@link SimWorkload} that composes two {@link SimWorkload}s. */ final class SimChainWorkload implements SimWorkload { - private static final Logger LOGGER = LoggerFactory.getLogger(SimChainWorkload.class); - private final SimWorkload[] workloads; private int activeWorkloadIndex; @@ -48,9 +44,20 @@ final class SimChainWorkload implements SimWorkload { * Construct a {@link SimChainWorkload} instance. * * @param workloads The workloads to chain. + * @param activeWorkloadIndex The index of the active workload. */ - SimChainWorkload(SimWorkload... workloads) { + SimChainWorkload(SimWorkload[] workloads, int activeWorkloadIndex) { this.workloads = workloads; + this.activeWorkloadIndex = activeWorkloadIndex; + } + + /** + * Construct a {@link SimChainWorkload} instance. + * + * @param workloads The workloads to chain. + */ + SimChainWorkload(SimWorkload... workloads) { + this(workloads, 0); } @Override @@ -65,9 +72,7 @@ public void onStart(SimMachineContext ctx) { final Context context = new Context(ctx); activeContext = context; - if (!context.doStart(workloads[activeWorkloadIndex])) { - ctx.shutdown(); - } + tryThrow(context.doStart(workloads[activeWorkloadIndex])); } @Override @@ -82,7 +87,20 @@ public void onStop(SimMachineContext ctx) { final Context context = activeContext; activeContext = null; - context.doStop(workloads[activeWorkloadIndex]); + tryThrow(context.doStop(workloads[activeWorkloadIndex])); + } + + @Override + public SimChainWorkload snapshot() { + final int activeWorkloadIndex = this.activeWorkloadIndex; + final SimWorkload[] workloads = this.workloads; + final SimWorkload[] newWorkloads = new SimWorkload[workloads.length - activeWorkloadIndex]; + + for (int i = 0; i < newWorkloads.length; i++) { + newWorkloads[i] = workloads[activeWorkloadIndex + i].snapshot(); + } + + return new SimChainWorkload(newWorkloads, 0); } /** @@ -125,6 +143,12 @@ public List getStorageInterfaces() { return ctx.getStorageInterfaces(); } + @Override + public SimWorkload snapshot() { + final SimWorkload workload = workloads[activeWorkloadIndex]; + return workload.snapshot(); + } + @Override public void reset() { ctx.reset(); @@ -132,51 +156,80 @@ public void reset() { @Override public void shutdown() { + shutdown(null); + } + + @Override + public void shutdown(Exception cause) { final SimWorkload[] workloads = SimChainWorkload.this.workloads; final int activeWorkloadIndex = ++SimChainWorkload.this.activeWorkloadIndex; - if (doStop(workloads[activeWorkloadIndex - 1]) && activeWorkloadIndex < workloads.length) { + final Exception stopException = doStop(workloads[activeWorkloadIndex - 1]); + if (cause == null) { + cause = stopException; + } else if (stopException != null) { + cause.addSuppressed(stopException); + } + + if (stopException == null && activeWorkloadIndex < workloads.length) { ctx.reset(); - if (doStart(workloads[activeWorkloadIndex])) { + final Exception startException = doStart(workloads[activeWorkloadIndex]); + + if (startException == null) { return; + } else if (cause == null) { + cause = startException; + } else { + cause.addSuppressed(startException); } } - ctx.shutdown(); + ctx.shutdown(cause); } /** * Start the specified workload. * - * @return true if the workload started successfully, false otherwise. + * @return The {@link Exception} that occurred while starting the workload or null if the workload + * started successfully. */ - private boolean doStart(SimWorkload workload) { + private Exception doStart(SimWorkload workload) { try { workload.onStart(this); } catch (Exception cause) { - LOGGER.warn("Workload failed during onStart callback", cause); - doStop(workload); - return false; + final Exception stopException = doStop(workload); + if (stopException != null) { + cause.addSuppressed(stopException); + } + return cause; } - return true; + return null; } /** * Stop the specified workload. * - * @return true if the workload stopped successfully, false otherwise. + * @return The {@link Exception} that occurred while stopping the workload or null if the workload + * stopped successfully. */ - private boolean doStop(SimWorkload workload) { + private Exception doStop(SimWorkload workload) { try { workload.onStop(this); } catch (Exception cause) { - LOGGER.warn("Workload failed during onStop callback", cause); - return false; + return cause; } - return true; + return null; + } + } + + @SuppressWarnings("unchecked") + private static void tryThrow(Throwable e) throws T { + if (e == null) { + return; } + throw (T) e; } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java index f3efbebba..839856bbe 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimFlopsWorkload.java @@ -60,6 +60,7 @@ public class SimFlopsWorkload implements SimWorkload, FlowStageLogic { this.flops = flops; this.utilization = utilization; + this.remainingAmount = flops; } @Override @@ -98,8 +99,13 @@ public void onStop(SimMachineContext ctx) { } @Override - public String toString() { - return "SimFlopsWorkload[FLOPs=" + flops + ",utilization=" + utilization + "]"; + public SimFlopsWorkload snapshot() { + final FlowStage stage = this.stage; + if (stage != null) { + stage.sync(); + } + + return new SimFlopsWorkload((long) remainingAmount, utilization); } @Override @@ -138,4 +144,9 @@ public long onUpdate(FlowStage ctx, long now) { return now + duration; } + + @Override + public String toString() { + return "SimFlopsWorkload[FLOPs=" + flops + ",utilization=" + utilization + "]"; + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java index 194efafdc..9c9f47881 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimRuntimeWorkload.java @@ -59,6 +59,7 @@ public class SimRuntimeWorkload implements SimWorkload, FlowStageLogic { this.duration = duration; this.utilization = utilization; + this.remainingDuration = duration; } @Override @@ -97,6 +98,16 @@ public void onStop(SimMachineContext ctx) { } } + @Override + public SimRuntimeWorkload snapshot() { + final FlowStage stage = this.stage; + if (stage != null) { + stage.sync(); + } + + return new SimRuntimeWorkload(remainingDuration, utilization); + } + @Override public long onUpdate(FlowStage ctx, long now) { long lastUpdate = this.lastUpdate; diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java index 12a567ff1..1d8667d5b 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimTrace.java @@ -71,7 +71,7 @@ private SimTrace(double[] usageCol, long[] deadlineCol, int[] coresCol, int size * @param offset The offset for the timestamps. */ public SimWorkload createWorkload(long offset) { - return new Workload(offset, usageCol, deadlineCol, coresCol, size); + return new Workload(offset, usageCol, deadlineCol, coresCol, size, 0); } /** @@ -211,22 +211,24 @@ private static class Workload implements SimWorkload { private final long[] deadlineCol; private final int[] coresCol; private final int size; + private final int index; - private Workload(long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size) { + private Workload(long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size, int index) { this.offset = offset; this.usageCol = usageCol; this.deadlineCol = deadlineCol; this.coresCol = coresCol; this.size = size; + this.index = index; } @Override public void onStart(SimMachineContext ctx) { final WorkloadStageLogic logic; if (ctx.getCpus().size() == 1) { - logic = new SingleWorkloadLogic(ctx, offset, usageCol, deadlineCol, size); + logic = new SingleWorkloadLogic(ctx, offset, usageCol, deadlineCol, size, index); } else { - logic = new MultiWorkloadLogic(ctx, offset, usageCol, deadlineCol, coresCol, size); + logic = new MultiWorkloadLogic(ctx, offset, usageCol, deadlineCol, coresCol, size, index); } this.logic = logic; } @@ -240,6 +242,18 @@ public void onStop(SimMachineContext ctx) { logic.getStage().close(); } } + + @Override + public SimWorkload snapshot() { + final WorkloadStageLogic logic = this.logic; + int index = this.index; + + if (logic != null) { + index = logic.getIndex(); + } + + return new Workload(offset, usageCol, deadlineCol, coresCol, size, index); + } } /** @@ -250,6 +264,11 @@ private interface WorkloadStageLogic extends FlowStageLogic { * Return the {@link FlowStage} belonging to this instance. */ FlowStage getStage(); + + /** + * Return the current index of the workload. + */ + int getIndex(); } /** @@ -268,12 +287,13 @@ private static class SingleWorkloadLogic implements WorkloadStageLogic { private final SimMachineContext ctx; private SingleWorkloadLogic( - SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int size) { + SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int size, int index) { this.ctx = ctx; this.offset = offset; this.usageCol = usageCol; this.deadlineCol = deadlineCol; this.size = size; + this.index = index; final FlowGraph graph = ctx.getGraph(); final List cpus = ctx.getCpus(); @@ -315,6 +335,11 @@ public FlowStage getStage() { return stage; } + @Override + public int getIndex() { + return index; + } + /** * Helper method to stop the execution of the workload. */ @@ -346,13 +371,20 @@ private static class MultiWorkloadLogic implements WorkloadStageLogic { private final SimMachineContext ctx; private MultiWorkloadLogic( - SimMachineContext ctx, long offset, double[] usageCol, long[] deadlineCol, int[] coresCol, int size) { + SimMachineContext ctx, + long offset, + double[] usageCol, + long[] deadlineCol, + int[] coresCol, + int size, + int index) { this.ctx = ctx; this.offset = offset; this.usageCol = usageCol; this.deadlineCol = deadlineCol; this.coresCol = coresCol; this.size = size; + this.index = index; final FlowGraph graph = ctx.getGraph(); final List cpus = ctx.getCpus(); @@ -418,5 +450,10 @@ public long onUpdate(FlowStage ctx, long now) { public FlowStage getStage() { return stage; } + + @Override + public int getIndex() { + return index; + } } } diff --git a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java index 7be51265d..cad324fbd 100644 --- a/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java +++ b/opendc-simulator/opendc-simulator-compute/src/main/java/org/opendc/simulator/compute/workload/SimWorkload.java @@ -45,4 +45,9 @@ public interface SimWorkload { * @param ctx The execution context in which the machine runs. */ void onStop(SimMachineContext ctx); + + /** + * Create a snapshot of this workload. + */ + SimWorkload snapshot(); } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt index 266839bd4..2acf6ec7f 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/SimMachineTest.kt @@ -22,6 +22,8 @@ package org.opendc.simulator.compute +import io.mockk.every +import io.mockk.mockk import kotlinx.coroutines.CancellationException import kotlinx.coroutines.cancel import kotlinx.coroutines.coroutineScope @@ -175,6 +177,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) } @@ -195,6 +199,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) } @@ -215,6 +221,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) assertEquals(1000, clock.millis()) @@ -241,6 +249,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) assertEquals(40, clock.millis()) @@ -264,6 +274,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) assertEquals(4000, clock.millis()) @@ -287,6 +299,8 @@ class SimMachineTest { } override fun onStop(ctx: SimMachineContext) {} + + override fun snapshot(): SimWorkload = TODO() }) assertEquals(4000, clock.millis()) @@ -334,4 +348,71 @@ class SimMachineTest { } } } + + @Test + fun testCatchStartFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = mockk() + every { workload.onStart(any()) } throws IllegalStateException() + + assertThrows { machine.runWorkload(workload) } + } + + @Test + fun testCatchStopFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = mockk() + every { workload.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown() } + every { workload.onStop(any()) } throws IllegalStateException() + + assertThrows { machine.runWorkload(workload) } + } + + @Test + fun testCatchShutdownFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = mockk() + every { workload.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) } + + assertThrows { machine.runWorkload(workload) } + } + + @Test + fun testCatchNestedFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workload = mockk() + every { workload.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) } + every { workload.onStop(any()) } throws IllegalStateException() + + val exc = assertThrows { machine.runWorkload(workload) } + assertEquals(1, exc.cause!!.suppressedExceptions.size) + } } diff --git a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt index 6bf05f651..d0b0efaa1 100644 --- a/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt +++ b/opendc-simulator/opendc-simulator-compute/src/test/kotlin/org/opendc/simulator/compute/workload/SimChainWorkloadTest.kt @@ -25,10 +25,14 @@ package org.opendc.simulator.compute.workload import io.mockk.every import io.mockk.mockk import io.mockk.spyk +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import org.opendc.simulator.compute.SimBareMetalMachine +import org.opendc.simulator.compute.SimMachineContext import org.opendc.simulator.compute.model.MachineModel import org.opendc.simulator.compute.model.MemoryUnit import org.opendc.simulator.compute.model.ProcessingNode @@ -65,8 +69,8 @@ class SimChainWorkloadTest { val workload = SimWorkloads.chain( - SimRuntimeWorkload(1000, 1.0), - SimRuntimeWorkload(1000, 1.0) + SimWorkloads.runtime(1000, 1.0), + SimWorkloads.runtime(1000, 1.0) ) machine.runWorkload(workload) @@ -91,10 +95,10 @@ class SimChainWorkloadTest { val workload = SimWorkloads.chain( workloadA, - SimRuntimeWorkload(1000, 1.0) + SimWorkloads.runtime(1000, 1.0) ) - machine.runWorkload(workload) + assertThrows { machine.runWorkload(workload) } assertEquals(0, clock.millis()) } @@ -115,12 +119,12 @@ class SimChainWorkloadTest { val workload = SimWorkloads.chain( - SimRuntimeWorkload(1000, 1.0), + SimWorkloads.runtime(1000, 1.0), workloadA, - SimRuntimeWorkload(1000, 1.0) + SimWorkloads.runtime(1000, 1.0) ) - machine.runWorkload(workload) + assertThrows { machine.runWorkload(workload) } assertEquals(1000, clock.millis()) } @@ -141,10 +145,10 @@ class SimChainWorkloadTest { val workload = SimWorkloads.chain( workloadA, - SimRuntimeWorkload(1000, 1.0) + SimWorkloads.runtime(1000, 1.0) ) - machine.runWorkload(workload) + assertThrows { machine.runWorkload(workload) } assertEquals(1000, clock.millis()) } @@ -162,15 +166,120 @@ class SimChainWorkloadTest { val workloadA = spyk(SimRuntimeWorkload(1000, 1.0)) every { workloadA.onStop(any()) } throws IllegalStateException("Staged") + val workload = + SimWorkloads.chain( + SimWorkloads.runtime(1000, 1.0), + workloadA, + SimWorkloads.runtime(1000, 1.0) + ) + + assertThrows { machine.runWorkload(workload) } + + assertEquals(2000, clock.millis()) + } + + @Test + fun testStartAndStopFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk() + every { workloadA.onStart(any()) } throws IllegalStateException() + every { workloadA.onStop(any()) } throws IllegalStateException() + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA + ) + + val exc = assertThrows { machine.runWorkload(workload) } + + assertEquals(2, exc.cause!!.suppressedExceptions.size) + assertEquals(1000, clock.millis()) + } + + @Test + fun testShutdownAndStopFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk() + every { workloadA.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) } + every { workloadA.onStop(any()) } throws IllegalStateException() + + val workload = + SimWorkloads.chain( + SimRuntimeWorkload(1000, 1.0), + workloadA + ) + + val exc = assertThrows { machine.runWorkload(workload) } + + assertEquals(1, exc.cause!!.suppressedExceptions.size) + assertEquals(1000, clock.millis()) + } + + @Test + fun testShutdownAndStartFailure() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create( + graph, + machineModel + ) + + val workloadA = mockk(relaxUnitFun = true) + every { workloadA.onStart(any()) } answers { (it.invocation.args[0] as SimMachineContext).shutdown(IllegalStateException()) } + + val workloadB = mockk(relaxUnitFun = true) + every { workloadB.onStart(any()) } throws IllegalStateException() + val workload = SimWorkloads.chain( SimRuntimeWorkload(1000, 1.0), workloadA, - SimRuntimeWorkload(1000, 1.0) + workloadB ) - machine.runWorkload(workload) + val exc = assertThrows { machine.runWorkload(workload) } + assertEquals(1, exc.cause!!.suppressedExceptions.size) + assertEquals(1000, clock.millis()) + } + + @Test + fun testSnapshot() = runSimulation { + val engine = FlowEngine.create(coroutineContext, clock) + val graph = engine.newGraph() + + val machine = SimBareMetalMachine.create(graph, machineModel) + val workload = + SimWorkloads.chain( + SimWorkloads.runtime(1000, 1.0), + SimWorkloads.runtime(1000, 1.0) + ) + + val job = launch { machine.runWorkload(workload) } + delay(500L) + val snapshot = workload.snapshot() + + job.join() assertEquals(2000, clock.millis()) + + machine.runWorkload(snapshot) + + assertEquals(3500, clock.millis()) } } diff --git a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java index 4d0980431..ed5579eaf 100644 --- a/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java +++ b/opendc-simulator/opendc-simulator-flow/src/main/java/org/opendc/simulator/flow2/FlowStage.java @@ -151,6 +151,15 @@ public void invalidate() { } } + /** + * Synchronously update the {@link FlowStage} at the current timestamp. + */ + public void sync() { + this.flags |= STAGE_INVALIDATE; + onUpdate(clock.millis()); + engine.scheduleDelayed(this); + } + /** * Close the {@link FlowStage} and disconnect all inlets and outlets. */