Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

speeds up fate lock acquisition #5262

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
import java.util.Formatter;
Expand Down Expand Up @@ -285,19 +286,19 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath,
List<String> lockedIds = zr.getChildren(lockPath.toString());

for (String id : lockedIds) {

try {

FateLockPath fLockPath = FateLock.path(lockPath + "/" + id);
List<String> lockNodes =
FateLock.validateAndSort(fLockPath, zr.getChildren(fLockPath.toString()));
List<FateLock.FateLockNode> lockNodes =
FateLock.validateAndWarn(fLockPath, zr.getChildren(fLockPath.toString()));

lockNodes.sort(Comparator.comparingLong(ln -> ln.sequence));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the return type be a sorted set instead of a list that you sort later?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to a sorted set in 3f25ce0


int pos = 0;
boolean sawWriteLock = false;

for (String node : lockNodes) {
for (FateLock.FateLockNode node : lockNodes) {
try {
byte[] data = zr.getData(lockPath + "/" + id + "/" + node);
byte[] data = node.lockData.getBytes(UTF_8);
// Example data: "READ:<FateId>". FateId contains ':' hence the limit of 2
String[] lda = new String(data, UTF_8).split(":", 2);
FateId fateId = FateId.from(lda[1]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.function.BiPredicate;

import org.apache.accumulo.core.util.UtilWaitThread;
import org.slf4j.Logger;
Expand Down Expand Up @@ -98,9 +99,9 @@ public byte[] getLockData() {
// them,
// a writer only runs when they are at the top of the queue.
public interface QueueLock {
SortedMap<Long,byte[]> getEarlierEntries(long entry);
SortedMap<Long,byte[]> getEntries(BiPredicate<Long,byte[]> predicate);

void removeEntry(long entry);
void removeEntry(byte[] data, long entry);

long addEntry(byte[] data);
}
Expand Down Expand Up @@ -164,7 +165,8 @@ public boolean tryLock() {
log.info("Added lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);

SortedMap<Long,byte[]> entries = qlock.getEntries((seq, lockData) -> seq <= entry);
for (Entry<Long,byte[]> entry : entries.entrySet()) {
ParsedLock parsed = new ParsedLock(entry.getValue());
if (entry.getKey().equals(this.entry)) {
Expand Down Expand Up @@ -200,7 +202,7 @@ public void unlock() {
}
log.debug("Removing lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
qlock.removeEntry(entry);
qlock.removeEntry(new ParsedLock(this.getType(), this.userData).getLockData(), entry);
entry = -1;
}

Expand Down Expand Up @@ -232,7 +234,7 @@ public boolean tryLock() {
log.info("Added lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), getType());
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
SortedMap<Long,byte[]> entries = qlock.getEntries((seq, locData) -> seq <= entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
Expand All @@ -251,19 +253,28 @@ public DistributedReadWriteLock(QueueLock qlock, byte[] data) {
}

public static DistributedLock recoverLock(QueueLock qlock, byte[] data) {
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(Long.MAX_VALUE);
for (Entry<Long,byte[]> entry : entries.entrySet()) {
ParsedLock parsed = new ParsedLock(entry.getValue());
if (Arrays.equals(data, parsed.getUserData())) {
SortedMap<Long,byte[]> entries = qlock.getEntries((seq, lockData) -> {
ParsedLock parsed = new ParsedLock(lockData);
return Arrays.equals(data, parsed.getUserData());
});

switch (entries.size()) {
case 0:
return null;
case 1:
var entry = entries.entrySet().iterator().next();
ParsedLock parsed = new ParsedLock(entry.getValue());
switch (parsed.getType()) {
case READ:
return new ReadLock(qlock, parsed.getUserData(), entry.getKey());
case WRITE:
return new WriteLock(qlock, parsed.getUserData(), entry.getKey());
default:
throw new IllegalStateException("Uknown lock type " + parsed.getType());
keith-turner marked this conversation as resolved.
Show resolved Hide resolved
}
}
default:
throw new IllegalStateException("Found more than one lock node " + entries);
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
*/
package org.apache.accumulo.core.fate.zookeeper;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BiPredicate;

import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
Expand All @@ -35,6 +37,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
* A persistent lock mechanism in ZooKeeper used for locking tables during FaTE operations.
*/
Expand Down Expand Up @@ -68,16 +72,34 @@ public FateLock(ZooReaderWriter zrw, FateLockPath path) {
this.path = requireNonNull(path);
}

public static class FateLockNode {
public final long sequence;
public final String lockData;

private FateLockNode(String nodeName) {
int len = nodeName.length();
Preconditions.checkArgument(nodeName.startsWith(PREFIX) && nodeName.charAt(len - 11) == '#',
"Illegal node name %s", nodeName);
sequence = Long.parseLong(nodeName.substring(len - 10));
lockData = nodeName.substring(PREFIX.length(), len - 11);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, while this is probably unlikely to happen, the sequential numbering can overflow causing a negative to occur, which I think would cause a problem with this parsing instead of splitting on #.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added handing and test for negative seq numbers in 2309b9b

}
}

// TODO change data arg from byte[] to String.. in the rest of the code its always a String.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will open a follow on issue for this and remove the TODO before merging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the TODO and #5264. In the TODO I was thinking of using String, but realized using more concrete types would be better.

@Override
public long addEntry(byte[] data) {

String dataString = new String(data, UTF_8);
Preconditions.checkState(!dataString.contains("#"));

String newPath;
try {
while (true) {
try {
newPath = zoo.putPersistentSequential(path + "/" + PREFIX, data);
newPath = zoo.putPersistentSequential(path + "/" + PREFIX + dataString + "#", data);
keith-turner marked this conversation as resolved.
Show resolved Hide resolved
String[] parts = newPath.split("/");
String last = parts[parts.length - 1];
return Long.parseLong(last.substring(PREFIX.length()));
return new FateLockNode(last).sequence;
} catch (NoNodeException nne) {
// the parent does not exist so try to create it
zoo.putPersistentData(path.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
Expand All @@ -89,7 +111,7 @@ public long addEntry(byte[] data) {
}

@Override
public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
public SortedMap<Long,byte[]> getEntries(BiPredicate<Long,byte[]> predicate) {
SortedMap<Long,byte[]> result = new TreeMap<>();
try {
List<String> children = Collections.emptyList();
Expand All @@ -101,15 +123,10 @@ public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
}

for (String name : children) {
// this try catch must be done inside the loop because some subset of the children may exist
try {
long order = Long.parseLong(name.substring(PREFIX.length()));
if (order <= entry) {
byte[] data = zoo.getData(path + "/" + name);
result.put(order, data);
}
} catch (KeeperException.NoNodeException ex) {
// ignored
var parsed = new FateLockNode(name);
byte[] data = parsed.lockData.getBytes(UTF_8);
if (predicate.test(parsed.sequence, data)) {
result.put(parsed.sequence, data);
}
}
} catch (KeeperException | InterruptedException ex) {
Expand All @@ -119,9 +136,12 @@ public SortedMap<Long,byte[]> getEarlierEntries(long entry) {
}

@Override
public void removeEntry(long entry) {
public void removeEntry(byte[] data, long entry) {
String dataString = new String(data, UTF_8);
Preconditions.checkState(!dataString.contains("#"));
try {
zoo.recursiveDelete(path + String.format("/%s%010d", PREFIX, entry), NodeMissingPolicy.SKIP);
zoo.recursiveDelete(path + String.format("/%s%s#%010d", PREFIX, dataString, entry),
NodeMissingPolicy.SKIP);
try {
// try to delete the parent if it has no children
zoo.delete(path.toString());
Expand All @@ -136,50 +156,25 @@ public void removeEntry(long entry) {
/**
* Validate and sort child nodes at this lock path by the lock prefix
*/
public static List<String> validateAndSort(FateLockPath path, List<String> children) {
public static List<FateLockNode> validateAndWarn(FateLockPath path, List<String> children) {
log.trace("validating and sorting children at path {}", path);
List<String> validChildren = new ArrayList<>();

List<FateLockNode> validChildren = new ArrayList<>();

if (children == null || children.isEmpty()) {
return validChildren;
}

children.forEach(c -> {
log.trace("Validating {}", c);
if (c.startsWith(PREFIX)) {
int idx = c.indexOf('#');
String sequenceNum = c.substring(idx + 1);
if (sequenceNum.length() == 10) {
try {
log.trace("Testing number format of {}", sequenceNum);
Integer.parseInt(sequenceNum);
validChildren.add(c);
} catch (NumberFormatException e) {
log.warn("Fate lock found with invalid sequence number format: {} (not a number)", c);
}
} else {
log.warn("Fate lock found with invalid sequence number format: {} (not 10 characters)",
c);
}
} else {
log.warn("Fate lock found with invalid lock format: {} (does not start with {})", c,
PREFIX);
try {
var fateLockNode = new FateLockNode(c);
validChildren.add(fateLockNode);
} catch (RuntimeException e) {
log.warn("Illegal fate lock node {}", c, e);
}
});

if (validChildren.size() > 1) {
validChildren.sort((o1, o2) -> {
// Lock should be of the form:
// lock-sequenceNumber
// Example:
// flock#0000000000

// Lock length - sequenceNumber length
// 16 - 10
int secondHashIdx = 6;
return Integer.valueOf(o1.substring(secondHashIdx))
.compareTo(Integer.valueOf(o2.substring(secondHashIdx)));
});
}
log.trace("Children nodes (size: {}): {}", validChildren.size(), validChildren);
return validChildren;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.function.BiPredicate;

import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.QueueLock;
import org.junit.jupiter.api.Test;
Expand All @@ -40,14 +41,18 @@ public static class MockQueueLock implements QueueLock {
final SortedMap<Long,byte[]> locks = new TreeMap<>();

@Override
public synchronized SortedMap<Long,byte[]> getEarlierEntries(long entry) {
public synchronized SortedMap<Long,byte[]> getEntries(BiPredicate<Long,byte[]> predicate) {
SortedMap<Long,byte[]> result = new TreeMap<>();
result.putAll(locks.headMap(entry + 1));
locks.forEach((seq, lockData) -> {
if (predicate.test(seq, lockData)) {
result.put(seq, lockData);
}
});
return result;
}

@Override
public synchronized void removeEntry(long entry) {
public synchronized void removeEntry(byte[] data, long entry) {
synchronized (locks) {
locks.remove(entry);
locks.notifyAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,20 @@ public static String row(int r) {
return String.format("r:%04d", r);
}

public static void compact(final AccumuloClient client, String table1, int modulus,
String expectedQueue, boolean wait)
throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
public static void addCompactionIterators(CompactionConfig config, int modulus,
String expectedQueue) {
IteratorSetting iterSetting = new IteratorSetting(100, TestFilter.class);
// make sure iterator options make it to compactor process
iterSetting.addOption("expectedQ", expectedQueue);
iterSetting.addOption("modulus", modulus + "");
CompactionConfig config =
new CompactionConfig().setIterators(List.of(iterSetting)).setWait(wait);
config.setIterators(List.of(iterSetting));
}

public static void compact(final AccumuloClient client, String table1, int modulus,
String expectedQueue, boolean wait)
throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
CompactionConfig config = new CompactionConfig().setWait(wait);
addCompactionIterators(config, modulus, expectedQueue);
client.tableOperations().compact(table1, config);
}

Expand Down
Loading
Loading