Skip to content

Commit

Permalink
feat(db): keep leveldb and rocksdb same behaviors for db operations
Browse files Browse the repository at this point in the history
  • Loading branch information
halibobo1205 committed Jan 3, 2025
1 parent f16c3f8 commit 6f3cbca
Show file tree
Hide file tree
Showing 11 changed files with 456 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.fusesource.leveldbjni.JniDBFactory.factory;

import com.google.common.collect.Sets;
import com.google.common.primitives.Bytes;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -30,18 +31,14 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import com.google.common.primitives.Bytes;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.iq80.leveldb.CompressionType;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Logger;
Expand All @@ -54,6 +51,7 @@
import org.tron.common.storage.WriteOptionsWrapper;
import org.tron.common.storage.metric.DbStat;
import org.tron.common.utils.FileUtil;
import org.tron.common.utils.PropUtil;
import org.tron.common.utils.StorageUtils;
import org.tron.core.db.common.DbSourceInter;
import org.tron.core.db.common.iterator.StoreIterator;
Expand All @@ -74,6 +72,7 @@ public class LevelDbDataSourceImpl extends DbStat implements DbSourceInter<byte[
private ReadWriteLock resetDbLock = new ReentrantReadWriteLock();
private static final String LEVELDB = "LEVELDB";
private static final org.slf4j.Logger innerLogger = LoggerFactory.getLogger(LEVELDB);
private static final String KEY_ENGINE = "ENGINE";
private Logger leveldbLogger = new Logger() {
@Override
public void log(String message) {
Expand Down Expand Up @@ -109,6 +108,10 @@ public LevelDbDataSourceImpl(String parentPath, String dataBaseName) {

@Override
public void initDB() {
if (!checkOrInitEngine()) {
throw new RuntimeException(
String.format("failed to check database: %s, engine do not match", dataBaseName));
}
resetDbLock.writeLock().lock();
try {
logger.debug("Init DB: {}.", dataBaseName);
Expand All @@ -134,6 +137,28 @@ public void initDB() {
}
}

private boolean checkOrInitEngine() {
String dir = getDbPath().toString();
String enginePath = dir + File.separator + "engine.properties";

if (FileUtil.createDirIfNotExists(dir)) {
if (!FileUtil.createFileIfNotExists(enginePath)) {
return false;
}
} else {
return false;
}

// for the first init engine
String engine = PropUtil.readProperty(enginePath, KEY_ENGINE);
if (engine.isEmpty() && !PropUtil.writeProperty(enginePath, KEY_ENGINE, LEVELDB)) {
return false;
}
engine = PropUtil.readProperty(enginePath, KEY_ENGINE);

return LEVELDB.equals(engine);
}

private void openDatabase(Options dbOptions) throws IOException {
final Path dbPath = getDbPath();
if (dbPath == null || dbPath.getParent() == null) {
Expand Down Expand Up @@ -361,6 +386,7 @@ public Map<WrappedByteArray, byte[]> prefixQuery(byte[] key) {
}
}

@Deprecated
@Override
public long getTotal() throws RuntimeException {
resetDbLock.readLock().lock();
Expand All @@ -377,13 +403,6 @@ public long getTotal() throws RuntimeException {
}
}

private void updateByBatchInner(Map<byte[], byte[]> rows) throws Exception {
try (WriteBatch batch = database.createWriteBatch()) {
innerBatchUpdate(rows,batch);
database.write(batch, writeOptions);
}
}

private void updateByBatchInner(Map<byte[], byte[]> rows, WriteOptions options) throws Exception {
try (WriteBatch batch = database.createWriteBatch()) {
innerBatchUpdate(rows,batch);
Expand All @@ -403,30 +422,23 @@ private void innerBatchUpdate(Map<byte[], byte[]> rows, WriteBatch batch) {

@Override
public void updateByBatch(Map<byte[], byte[]> rows, WriteOptionsWrapper options) {
resetDbLock.readLock().lock();
try {
updateByBatchInner(rows, options.level);
} catch (Exception e) {
try {
updateByBatchInner(rows, options.level);
} catch (Exception e1) {
throw new RuntimeException(e);
}
} finally {
resetDbLock.readLock().unlock();
}
this.updateByBatch(rows, options.level);
}

@Override
public void updateByBatch(Map<byte[], byte[]> rows) {
this.updateByBatch(rows, writeOptions);
}

private void updateByBatch(Map<byte[], byte[]> rows, WriteOptions options) {
resetDbLock.readLock().lock();
try {
updateByBatchInner(rows);
updateByBatchInner(rows, options);
} catch (Exception e) {
try {
updateByBatchInner(rows);
updateByBatchInner(rows, options);
} catch (Exception e1) {
throw new RuntimeException(e);
throw new RuntimeException(e1);
}
} finally {
resetDbLock.readLock().unlock();
Expand Down
Loading

0 comments on commit 6f3cbca

Please sign in to comment.