diff --git a/test/src/test/java/io/seata/common/LockAndCallback.java b/test/src/test/java/io/seata/common/LockAndCallback.java index 9aedc77cb45..e01f30c979d 100644 --- a/test/src/test/java/io/seata/common/LockAndCallback.java +++ b/test/src/test/java/io/seata/common/LockAndCallback.java @@ -20,53 +20,75 @@ import io.seata.saga.statelang.domain.ExecutionStatus; import io.seata.saga.statelang.domain.StateMachineInstance; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + /** * @author wang.liang */ public class LockAndCallback { - private final Object lock; + private final Lock lock; + private final Condition notFinished; private final AsyncCallback callback; - + private final static long DEFAULT_TIMEOUT = 60000; private String result; public LockAndCallback() { - lock = new Object(); + lock = new ReentrantLock(); + notFinished = lock.newCondition(); callback = new AsyncCallback() { @Override public void onFinished(ProcessContext context, StateMachineInstance stateMachineInstance) { result = "onFinished"; - synchronized (lock) { - lock.notifyAll(); + try { + lock.lock(); + notFinished.signal(); + } finally { + lock.unlock(); } } @Override public void onError(ProcessContext context, StateMachineInstance stateMachineInstance, Exception exp) { result = "onError"; - synchronized (lock) { - lock.notifyAll(); + try { + lock.lock(); + notFinished.signal(); + } finally { + lock.unlock(); } } }; } + public void waitingForFinish(StateMachineInstance inst) { + waitingForFinish(inst, DEFAULT_TIMEOUT); + } - public void waittingForFinish(StateMachineInstance inst) { - synchronized (lock) { - if (ExecutionStatus.RU.equals(inst.getStatus())) { - long start = System.nanoTime(); - try { - lock.wait(30000); + public void waitingForFinish(StateMachineInstance inst, long timeout) { + if (ExecutionStatus.RU.equals(inst.getStatus())) { + long start = System.nanoTime(); + try { + lock.lock(); + boolean finished = notFinished.await(timeout, TimeUnit.MILLISECONDS); + if (finished) { System.out.printf("finish wait ====== XID: %s, status: %s, compensationStatus: %s, cost: %d ms, result: %s\r\n", inst.getId(), inst.getStatus(), inst.getCompensationStatus(), (System.nanoTime() - start) / 1000_000, result); - } catch (Exception e) { - System.out.printf("error wait ====== XID: %s, status: %s, compensationStatus: %s, cost: %d ms, result: %s, error: %s\r\n", - inst.getId(), inst.getStatus(), inst.getCompensationStatus(), (System.nanoTime() - start) / 1000_000, result, e.getMessage()); - throw new RuntimeException("waittingForFinish failed", e); + } else { + System.out.printf("timeout wait ====== XID: %s, status: %s, compensationStatus: %s, cost: %d ms, result: %s\r\n", + inst.getId(), inst.getStatus(), inst.getCompensationStatus(), (System.nanoTime() - start) / 1000_000, result); } - } else { - System.out.printf("do not wait ====== XID: %s, status: %s, compensationStatus: %s, result: %s\r\n", - inst.getId(), inst.getStatus(), inst.getCompensationStatus(), result); + } catch (Exception e) { + System.out.printf("error wait ====== XID: %s, status: %s, compensationStatus: %s, cost: %d ms, result: %s, error: %s\r\n", + inst.getId(), inst.getStatus(), inst.getCompensationStatus(), (System.nanoTime() - start) / 1000_000, result, e.getMessage()); + throw new RuntimeException("waitingForFinish failed", e); + } finally { + lock.unlock(); } + } else { + System.out.printf("do not wait ====== XID: %s, status: %s, compensationStatus: %s, result: %s\r\n", + inst.getId(), inst.getStatus(), inst.getCompensationStatus(), result); } } diff --git a/test/src/test/java/io/seata/saga/engine/StateMachineAsyncTests.java b/test/src/test/java/io/seata/saga/engine/StateMachineAsyncTests.java index 2fb6e49f659..ab18db41480 100644 --- a/test/src/test/java/io/seata/saga/engine/StateMachineAsyncTests.java +++ b/test/src/test/java/io/seata/saga/engine/StateMachineAsyncTests.java @@ -56,7 +56,7 @@ public void testSimpleCatchesStateMachine() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertNotNull(inst.getException()); Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus()); @@ -107,7 +107,7 @@ public void testSimpleRetryStateMachine() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertNotNull(inst.getException()); Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus()); @@ -125,7 +125,7 @@ public void testStatusMatchingStateMachine() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertNotNull(inst.getException()); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); @@ -144,7 +144,7 @@ public void testCompensationStateMachine() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); // FIXME: some times, the compensationStatus is RU @@ -163,7 +163,7 @@ public void testCompensationAndSubStateMachine() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); }); @@ -180,7 +180,7 @@ public void testCompensationAndSubStateMachineWithLayout() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); }); @@ -200,7 +200,7 @@ public void testStateMachineWithComplexParams() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); People peopleResult = (People)inst.getEndParams().get("complexParameterMethodResult"); Assertions.assertNotNull(peopleResult); @@ -219,7 +219,7 @@ public void testSimpleStateMachineWithAsyncState() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.SU, inst.getStatus()); }); diff --git a/test/src/test/java/io/seata/saga/engine/db/StateMachineDBTests.java b/test/src/test/java/io/seata/saga/engine/db/StateMachineDBTests.java index 9cdc0c96521..06df1da454e 100644 --- a/test/src/test/java/io/seata/saga/engine/db/StateMachineDBTests.java +++ b/test/src/test/java/io/seata/saga/engine/db/StateMachineDBTests.java @@ -383,7 +383,7 @@ public void testSimpleCatchesStateMachineAsync() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertNotNull(inst.getException()); Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus()); @@ -401,7 +401,7 @@ public void testSimpleRetryStateMachineAsync() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertNotNull(inst.getException()); Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus()); @@ -419,7 +419,7 @@ public void testStatusMatchingStateMachineAsync() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertNotNull(inst.getException()); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); @@ -441,7 +441,7 @@ public void testCompensationStateMachineAsync() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); Assertions.assertEquals(ExecutionStatus.SU, inst.getCompensationStatus()); @@ -522,7 +522,7 @@ public void testCompensationAndSubStateMachineAsync() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); @@ -544,7 +544,7 @@ public void testCompensationAndSubStateMachineAsyncWithLayout() throws Exception LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); @@ -564,7 +564,7 @@ public void testAsyncStartSimpleStateMachineWithAsyncState() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.SU, inst.getStatus()); }); @@ -957,7 +957,7 @@ private void doTestStateMachineTransTimeoutAsync(Map paramMap, i SagaCostPrint.executeAndPrint("3-37-" + i, () -> { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); GlobalTransaction globalTransaction = getGlobalTransaction(inst); Assertions.assertNotNull(globalTransaction); @@ -1119,7 +1119,7 @@ private void doTestStateMachineCustomRecoverStrategyOnTimeoutAsync(Map { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); GlobalTransaction globalTransaction = getGlobalTransaction(inst); Assertions.assertNotNull(globalTransaction); diff --git a/test/src/test/java/io/seata/saga/engine/db/mockserver/StateMachineAsyncDBMockServerTests.java b/test/src/test/java/io/seata/saga/engine/db/mockserver/StateMachineAsyncDBMockServerTests.java index 4c0683b8cec..b9f1ee094e1 100644 --- a/test/src/test/java/io/seata/saga/engine/db/mockserver/StateMachineAsyncDBMockServerTests.java +++ b/test/src/test/java/io/seata/saga/engine/db/mockserver/StateMachineAsyncDBMockServerTests.java @@ -57,7 +57,7 @@ public void testSimpleCatchesStateMachine() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertNotNull(inst.getException()); Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus()); @@ -75,7 +75,7 @@ public void testSimpleRetryStateMachine() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertNotNull(inst.getException()); Assertions.assertEquals(ExecutionStatus.FA, inst.getStatus()); @@ -93,7 +93,7 @@ public void testStatusMatchingStateMachine() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertNotNull(inst.getException()); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); @@ -111,7 +111,7 @@ public void testCompensationStateMachine() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); Assertions.assertEquals(ExecutionStatus.SU, inst.getCompensationStatus()); @@ -129,7 +129,7 @@ public void testCompensationAndSubStateMachine() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); }); @@ -146,7 +146,7 @@ public void testCompensationAndSubStateMachineWithLayout() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.UN, inst.getStatus()); }); @@ -166,7 +166,7 @@ public void testStateMachineWithComplexParams() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); People peopleResult = (People)inst.getEndParams().get("complexParameterMethodResult"); Assertions.assertNotNull(peopleResult); @@ -186,7 +186,7 @@ public void testSimpleStateMachineWithAsyncState() throws Exception { LockAndCallback lockAndCallback = new LockAndCallback(); StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, lockAndCallback.getCallback()); - lockAndCallback.waittingForFinish(inst); + lockAndCallback.waitingForFinish(inst); Assertions.assertEquals(ExecutionStatus.SU, inst.getStatus()); });