Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes bug with FateInterleavingIT.testInterleaving #5214

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@

import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
Expand Down Expand Up @@ -54,7 +57,6 @@
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FateTestRunner.TestEnv;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -63,13 +65,13 @@

import com.google.common.collect.Iterators;

public abstract class FateInterleavingIT extends SharedMiniClusterBase
implements FateTestRunner<FateInterleavingIT.FilTestEnv> {
public abstract class FateExecutionOrderIT extends SharedMiniClusterBase
implements FateTestRunner<FateExecutionOrderIT.FeoTestEnv> {

public static class FilTestEnv extends TestEnv {
public static class FeoTestEnv extends TestEnv {
private final AccumuloClient client;

public FilTestEnv(AccumuloClient client) {
public FeoTestEnv(AccumuloClient client) {
this.client = client;
}

Expand All @@ -78,19 +80,19 @@ AccumuloClient getClient() {
}
}

public static class FirstOp implements Repo<FateInterleavingIT.FilTestEnv> {
public static class FirstOp implements Repo<FateExecutionOrderIT.FeoTestEnv> {

private static final long serialVersionUID = 1L;

protected boolean isTrackingDataSet(FateId tid, FilTestEnv env, String step) throws Exception {
protected boolean isTrackingDataSet(FateId tid, FeoTestEnv env, String step) throws Exception {
try (Scanner scanner = env.getClient().createScanner(FATE_TRACKING_TABLE)) {
return scanner.stream()
.anyMatch(e -> e.getKey().getColumnFamily().toString().equals(tid.canonical())
&& e.getValue().toString().equals(step));
}
}

protected static void insertTrackingData(FateId tid, FilTestEnv env, String step)
protected static void insertTrackingData(FateId tid, FeoTestEnv env, String step)
throws TableNotFoundException, MutationsRejectedException {
try (BatchWriter bw = env.getClient().createBatchWriter(FATE_TRACKING_TABLE)) {
Mutation mut = new Mutation(Long.toString(System.currentTimeMillis()));
Expand All @@ -100,13 +102,18 @@ protected static void insertTrackingData(FateId tid, FilTestEnv env, String step
}

@Override
public long isReady(FateId tid, FilTestEnv env) throws Exception {
public long isReady(FateId tid, FeoTestEnv env) throws Exception {
// First call to isReady will return that it's not ready (defer time of 100ms), inserting
// the data 'isReady1' so we know isReady was called once. The second attempt (after the
// deferral time) will pass as ready (return 0) and insert the data 'isReady2' so we know
// the second call to isReady was made
Thread.sleep(50);
var step = this.getName() + "::isReady";
if (isTrackingDataSet(tid, env, step)) {
if (isTrackingDataSet(tid, env, step + "1")) {
insertTrackingData(tid, env, step + "2");
return 0;
} else {
insertTrackingData(tid, env, step);
insertTrackingData(tid, env, step + "1");
return 100;
}
}
Expand All @@ -117,14 +124,14 @@ public String getName() {
}

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv env) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv env) throws Exception {
Thread.sleep(50);
insertTrackingData(tid, env, this.getName() + "::call");
return new SecondOp();
}

@Override
public void undo(FateId fateId, FilTestEnv environment) throws Exception {
public void undo(FateId fateId, FeoTestEnv environment) throws Exception {
throw new UnsupportedOperationException();
}

Expand All @@ -138,7 +145,7 @@ public static class SecondOp extends FirstOp {
private static final long serialVersionUID = 1L;

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
super.call(tid, environment);
return new LastOp();
}
Expand All @@ -149,7 +156,7 @@ public static class LastOp extends FirstOp {
private static final long serialVersionUID = 1L;

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
super.call(tid, environment);
return null;
}
Expand Down Expand Up @@ -179,38 +186,42 @@ public void before() throws Exception {
}
}

private void waitFor(FateStore<FilTestEnv> store, FateId txid) throws Exception {
private void waitFor(FateStore<FeoTestEnv> store, FateId txid) throws Exception {
while (store.read(txid).getStatus() != SUCCESSFUL) {
Thread.sleep(50);
}
}

protected Fate<FilTestEnv> initializeFate(AccumuloClient client, FateStore<FilTestEnv> store) {
protected Fate<FeoTestEnv> initializeFate(AccumuloClient client, FateStore<FeoTestEnv> store) {
ConfigurationCopy config = new ConfigurationCopy();
config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
return new Fate<>(new FilTestEnv(client), store, false, r -> r + "", config);
return new Fate<>(new FeoTestEnv(client), store, false, r -> r + "", config);
}

private static Entry<String,String> toIdStep(Entry<Key,Value> e) {
return new AbstractMap.SimpleImmutableEntry<>(e.getKey().getColumnFamily().toString(),
e.getValue().toString());
private static Entry<FateId,String> toIdStep(Entry<Key,Value> e) {
return new AbstractMap.SimpleImmutableEntry<>(
FateId.from(e.getKey().getColumnFamily().toString()), e.getValue().toString());
}

@Test
public void testInterleaving() throws Exception {
executeTest(this::testInterleaving);
}

protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
protected void testInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx)
throws Exception {

// This test verifies that fates will interleave in time when their isReady() returns >0 and
// then 0.
// This test verifies that FATE will interleave at least once between fate operations when
// their isReady() returns > 0. Interleaving is not guaranteed, so we just check for one
// occurrence which is highly unlikely to fail unless something is broken with FATE.
// This test also ensures that the expected order of operations occurs per fate op.
// Interleaving should have no effect on this.

FateId[] fateIds = new FateId[3];
final int numFateIds = 3;
FateId[] fateIds = new FateId[numFateIds];

for (int i = 0; i < 3; i++) {
for (int i = 0; i < numFateIds; i++) {
fateIds[i] = store.create();
var txStore = store.reserve(fateIds[i]);
try {
Expand All @@ -222,7 +233,7 @@ protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
}
}

Fate<FilTestEnv> fate = null;
Fate<FeoTestEnv> fate = null;

// The execution order of the transactions is not according to their insertion
// order. However, we do know that the first step of each transaction will be
Expand All @@ -235,38 +246,48 @@ protected void testInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
waitFor(store, fateId);
}

var expectedIds =
Set.of(fateIds[0].canonical(), fateIds[1].canonical(), fateIds[2].canonical());

Scanner scanner = client.createScanner(FATE_TRACKING_TABLE);
Iterator<Entry<String,String>> iter = scanner.stream().map(FateInterleavingIT::toIdStep)
.filter(e -> e.getValue().contains("::call")).iterator();

SortedMap<String,String> subset = new TreeMap<>();

Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), e.getValue()));

// Should see the call() for the first steps of all three fates come first in time
assertTrue(subset.values().stream().allMatch(v -> v.startsWith("FirstOp")));
assertEquals(expectedIds, subset.keySet());

subset.clear();

Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), e.getValue()));

// Should see the call() for the second steps of all three fates come second in time
assertTrue(subset.values().stream().allMatch(v -> v.startsWith("SecondOp")));
assertEquals(expectedIds, subset.keySet());

subset.clear();

Iterators.limit(iter, 3).forEachRemaining(e -> subset.put(e.getKey(), e.getValue()));

// Should see the call() for the last steps of all three fates come last in time
assertTrue(subset.values().stream().allMatch(v -> v.startsWith("LastOp")));
assertEquals(expectedIds, subset.keySet());
var iter = scanner.stream().map(FateExecutionOrderIT::toIdStep).iterator();

// we should see the following execution order for all fate ids:
// FirstOp::isReady1, FirstOp::isReady2, FirstOp::call,
// SecondOp::isReady1, SecondOp::isReady2, SecondOp::call,
// LastOp::isReady1, LastOp::isReady2, LastOp::call
// the first isReady of each op will defer the op to be executed later, allowing for the FATE
// thread to interleave and work on another fate id, but may not always interleave.
// It is unlikely that the FATE will not interleave at least once in a run, so we will check
// for at least one occurrence.
int interleaves = 0;
int i = 0;
Map.Entry<FateId,String> prevOp = null;
var expRunOrder = List.of("FirstOp::isReady1", "FirstOp::isReady2", "FirstOp::call",
"SecondOp::isReady1", "SecondOp::isReady2", "SecondOp::call", "LastOp::isReady1",
"LastOp::isReady2", "LastOp::call");
var fateIdsToExpRunOrder = Map.of(fateIds[0], new ArrayList<>(expRunOrder), fateIds[1],
new ArrayList<>(expRunOrder), fateIds[2], new ArrayList<>(expRunOrder));

while (iter.hasNext()) {
var currOp = iter.next();
FateId fateId = currOp.getKey();
String currStep = currOp.getValue();
var expRunOrderFateId = fateIdsToExpRunOrder.get(fateId);

boolean passedFirstStep = !currStep.equals(expRunOrder.get(0));
boolean prevFateIdDiffered = prevOp != null && !prevOp.getKey().equals(fateId);
if (passedFirstStep && prevFateIdDiffered) {
interleaves++;
}
assertEquals(currStep, expRunOrderFateId.remove(0));
prevOp = currOp;
i++;
}

assertFalse(iter.hasNext());
assertTrue(interleaves > 0);
assertEquals(i, expRunOrder.size() * numFateIds);
assertEquals(numFateIds, fateIdsToExpRunOrder.size());
for (var expRunOrderFateId : fateIdsToExpRunOrder.values()) {
assertTrue(expRunOrderFateId.isEmpty());
}

} finally {
if (fate != null) {
Expand All @@ -280,14 +301,14 @@ public static class FirstNonInterleavingOp extends FirstOp {
private static final long serialVersionUID = 1L;

@Override
public long isReady(FateId tid, FilTestEnv env) throws Exception {
public long isReady(FateId tid, FeoTestEnv env) throws Exception {
Thread.sleep(50);
insertTrackingData(tid, env, this.getName() + "::isReady");
return 0;
}

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv manager) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv manager) throws Exception {
Thread.sleep(50);
insertTrackingData(tid, manager, this.getName() + "::call");
return new SecondNonInterleavingOp();
Expand All @@ -299,7 +320,7 @@ public static class SecondNonInterleavingOp extends FirstNonInterleavingOp {
private static final long serialVersionUID = 1L;

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
super.call(tid, environment);
return new LastNonInterleavingOp();
}
Expand All @@ -311,7 +332,7 @@ public static class LastNonInterleavingOp extends FirstNonInterleavingOp {
private static final long serialVersionUID = 1L;

@Override
public Repo<FilTestEnv> call(FateId tid, FilTestEnv environment) throws Exception {
public Repo<FeoTestEnv> call(FateId tid, FeoTestEnv environment) throws Exception {
super.call(tid, environment);
return null;
}
Expand All @@ -323,15 +344,16 @@ public void testNonInterleaving() throws Exception {
executeTest(this::testNonInterleaving);
}

protected void testNonInterleaving(FateStore<FilTestEnv> store, ServerContext sctx)
protected void testNonInterleaving(FateStore<FeoTestEnv> store, ServerContext sctx)
throws Exception {

// This test ensures that when isReady() always returns zero that all the fate steps will
// execute immediately

FateId[] fateIds = new FateId[3];
final int numFateIds = 3;
FateId[] fateIds = new FateId[numFateIds];

for (int i = 0; i < 3; i++) {
for (int i = 0; i < numFateIds; i++) {
fateIds[i] = store.create();
var txStore = store.reserve(fateIds[i]);
try {
Expand All @@ -343,7 +365,7 @@ protected void testNonInterleaving(FateStore<FilTestEnv> store, ServerContext sc
}
}

Fate<FilTestEnv> fate = null;
Fate<FeoTestEnv> fate = null;

// The execution order of the transactions is not according to their insertion
// order. In this case, without interleaving, a transaction will run start to finish
Expand Down Expand Up @@ -386,10 +408,11 @@ private FateId verifySameIds(Iterator<Entry<Key,Value>> iter, SortedMap<Key,Valu
Text fateId = subset.keySet().iterator().next().getColumnFamily();
assertTrue(subset.keySet().stream().allMatch(k -> k.getColumnFamily().equals(fateId)));

var expectedVals = Set.of("FirstNonInterleavingOp::isReady", "FirstNonInterleavingOp::call",
// list is used to ensure correct operations and correct order of operations
var expectedVals = List.of("FirstNonInterleavingOp::isReady", "FirstNonInterleavingOp::call",
"SecondNonInterleavingOp::isReady", "SecondNonInterleavingOp::call",
"LastNonInterleavingOp::isReady", "LastNonInterleavingOp::call");
var actualVals = subset.values().stream().map(Value::toString).collect(Collectors.toSet());
var actualVals = subset.values().stream().map(Value::toString).collect(Collectors.toList());
assertEquals(expectedVals, actualVals);

return FateId.from(fateId.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FateInterleavingIT;
import org.apache.accumulo.test.fate.FateExecutionOrderIT;

public class MetaFateInterleavingIT extends FateInterleavingIT {
public class MetaFateExecutionOrderIT extends FateExecutionOrderIT {

// put the fate data for the test in a different location than what accumulo is using
private static final InstanceId IID = InstanceId.of(UUID.randomUUID());
private static final String ZK_ROOT = ZooUtil.getRoot(IID);

@Override
public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int maxDeferred,
public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int maxDeferred,
AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
ServerContext sctx = getCluster().getServerContext();
String path = ZK_ROOT + Constants.ZFATE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.fate.AbstractFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.test.fate.FateInterleavingIT;
import org.apache.accumulo.test.fate.FateExecutionOrderIT;

public class UserFateInterleavingIT extends FateInterleavingIT {
public class UserFateExecutionOrderIT extends FateExecutionOrderIT {
@Override
public void executeTest(FateTestExecutor<FilTestEnv> testMethod, int maxDeferred,
public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int maxDeferred,
AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
var table = getUniqueNames(1)[0];
try (ClientContext client =
Expand Down
Loading