Skip to content

Commit

Permalink
Use TableDescriptor in InMemoryTable
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Dec 23, 2024
1 parent 8088322 commit 7ccfc12
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.Range;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TableDescriptor;
import tech.ydb.yoj.repository.db.ViewSchema;
import tech.ydb.yoj.repository.db.exception.EntityAlreadyExistsException;
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
Expand All @@ -20,33 +21,39 @@
import java.util.TreeMap;

final class InMemoryDataShard<T extends Entity<T>> {
private final Class<T> type;
private final TableDescriptor<T> tableDescriptor;
private final EntitySchema<T> schema;
private final TreeMap<Entity.Id<T>, InMemoryEntityLine> entityLines;
private final Map<Long, Set<Entity.Id<T>>> uncommited = new HashMap<>();

private InMemoryDataShard(
Class<T> type, EntitySchema<T> schema, TreeMap<Entity.Id<T>, InMemoryEntityLine> entityLines
TableDescriptor<T> tableDescriptor,
EntitySchema<T> schema,
TreeMap<Entity.Id<T>, InMemoryEntityLine> entityLines
) {
this.type = type;
this.tableDescriptor = tableDescriptor;
this.schema = schema;
this.entityLines = entityLines;
}

public InMemoryDataShard(Class<T> type) {
this(type, EntitySchema.of(type), createEmptyLines(type));
public InMemoryDataShard(TableDescriptor<T> tableDescriptor) {
this(
tableDescriptor,
EntitySchema.of(tableDescriptor.entityType()),
createEmptyLines(tableDescriptor.entityType())
);
}

private static <T extends Entity<T>> TreeMap<Entity.Id<T>, InMemoryEntityLine> createEmptyLines(Class<T> type) {
return new TreeMap<>(EntityIdSchema.getIdComparator(type));
}

public synchronized InMemoryDataShard<T> createSnapshot() {
TreeMap<Entity.Id<T>, InMemoryEntityLine> snapshotLines = createEmptyLines(type);
TreeMap<Entity.Id<T>, InMemoryEntityLine> snapshotLines = createEmptyLines(tableDescriptor.entityType());
for (Map.Entry<Entity.Id<T>, InMemoryEntityLine> entry : entityLines.entrySet()) {
snapshotLines.put(entry.getKey(), entry.getValue().createSnapshot());
}
return new InMemoryDataShard<>(type, schema, snapshotLines);
return new InMemoryDataShard<>(tableDescriptor, schema, snapshotLines);
}

public synchronized void commit(long txId, long version) {
Expand All @@ -60,14 +67,14 @@ public synchronized void commit(long txId, long version) {
}

public synchronized void checkLocks(long version, InMemoryTxLockWatcher watcher) {
for (Entity.Id<T> lockedId : watcher.getReadRows(type)) {
for (Entity.Id<T> lockedId : watcher.getReadRows(tableDescriptor)) {
InMemoryEntityLine entityLine = entityLines.get(lockedId);
if (entityLine != null && entityLine.hasYounger(version)) {
throw new OptimisticLockException("Row lock failed " + lockedId);
}
}

List<Range<Entity.Id<T>>> lockedRanges = watcher.getReadRanges(type);
List<Range<Entity.Id<T>>> lockedRanges = watcher.getReadRanges(tableDescriptor);
if (lockedRanges.isEmpty()) {
return;
}
Expand All @@ -79,7 +86,7 @@ public synchronized void checkLocks(long version, InMemoryTxLockWatcher watcher)

for (Range<Entity.Id<T>> lockedRange: lockedRanges) {
if (lockedRange.contains(entry.getKey())) {
throw new OptimisticLockException("Table lock failed " + type.getSimpleName());
throw new OptimisticLockException("Table lock failed " + tableDescriptor.toDebugString());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
import tech.ydb.yoj.repository.db.Repository;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.SchemaOperations;
import tech.ydb.yoj.repository.db.TableDescriptor;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.exception.DropTableException;

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -37,36 +37,30 @@ public void loadSnapshot(String id) {
storage = snapshots.get(id).createSnapshot();
}

@Override
public Set<Class<? extends Entity<?>>> tables() {
return Set.copyOf(storage.tables());
}

@Override
public RepositoryTransaction startTransaction(TxOptions options) {
return new InMemoryRepositoryTransaction(options, this);
}

@Override
public <T extends Entity<T>> SchemaOperations<T> schema(Class<T> c) {
public <T extends Entity<T>> SchemaOperations<T> schema(TableDescriptor<T> tableDescriptor) {
return new SchemaOperations<T>() {
@Override
public void create() {
storage.createTable(c);
storage.createTable(tableDescriptor);
}

@Override
public void drop() {
if (!storage.dropTable(c)) {
throw new DropTableException(
String.format("Can't drop table %s: table doesn't exist", c.getSimpleName())
if (!storage.dropTable(tableDescriptor)) {
throw new DropTableException(String.format("Can't drop table %s: table doesn't exist",
tableDescriptor.toDebugString())
);
}
}

@Override
public boolean exists() {
return storage.containsTable(c);
return storage.containsTable(tableDescriptor);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TableDescriptor;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException;
Expand Down Expand Up @@ -59,6 +60,12 @@ public <T extends Entity<T>> Table<T> table(Class<T> c) {
return new InMemoryTable<>(getMemory(c));
}

@Override
public <T extends Entity<T>> Table<T> table(TableDescriptor<T> tableDescriptor) {
return new InMemoryTable<>(this, tableDescriptor);
}

@Deprecated // use other constructor of InMemoryTable
public final <T extends Entity<T>> InMemoryTable.DbMemory<T> getMemory(Class<T> c) {
return new InMemoryTable.DbMemory<>(c, this);
}
Expand Down Expand Up @@ -119,7 +126,7 @@ private boolean isFinalActionNeeded(String action) {
}

final <T extends Entity<T>> void doInWriteTransaction(
String log, Class<T> type, Consumer<WriteTxDataShard<T>> consumer
String log, TableDescriptor<T> tableDescriptor, Consumer<WriteTxDataShard<T>> consumer
) {
if (options.isScan()) {
throw new IllegalTransactionScanException("Mutable operations");
Expand All @@ -129,7 +136,7 @@ final <T extends Entity<T>> void doInWriteTransaction(
}

Runnable query = () -> logTransaction(log, () -> {
WriteTxDataShard<T> shard = storage.getWriteTxDataShard(type, txId, getVersion());
WriteTxDataShard<T> shard = storage.getWriteTxDataShard(tableDescriptor, txId, getVersion());
consumer.accept(shard);

hasWrites = true;
Expand All @@ -143,11 +150,13 @@ final <T extends Entity<T>> void doInWriteTransaction(
}

final <T extends Entity<T>, R> R doInTransaction(
String action, Class<T> type, Function<ReadOnlyTxDataShard<T>, R> func
String action, TableDescriptor<T> tableDescriptor, Function<ReadOnlyTxDataShard<T>, R> func
) {
return logTransaction(action, () -> {
InMemoryTxLockWatcher findWatcher = hasWrites ? watcher : InMemoryTxLockWatcher.NO_LOCKS;
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(type, txId, getVersion(), findWatcher);
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(
tableDescriptor, txId, getVersion(), findWatcher
);
try {
return func.apply(shard);
} catch (OptimisticLockException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
package tech.ydb.yoj.repository.test.inmemory;

import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.TableDescriptor;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class InMemoryStorage {
private final Map<Class<?>, InMemoryDataShard<?>> shards;
private final Map<Long, Set<Class<?>>> uncommited = new HashMap<>();
private final Map<TableDescriptor<?>, InMemoryDataShard<?>> shards;
private final Map<Long, Set<TableDescriptor<?>>> uncommited = new HashMap<>();

private long currentVersion;

public InMemoryStorage() {
this(0, new HashMap<>());
}

private InMemoryStorage(long version, Map<Class<?>, InMemoryDataShard<?>> shards) {
private InMemoryStorage(long version, Map<TableDescriptor<?>, InMemoryDataShard<?>> shards) {
this.shards = shards;
this.currentVersion = version;
}
Expand All @@ -27,8 +28,8 @@ public synchronized long getCurrentVersion() {
}

public synchronized InMemoryStorage createSnapshot() {
Map<Class<?>, InMemoryDataShard<?>> snapshotDb = new HashMap<>();
for (Map.Entry<Class<?>, InMemoryDataShard<?>> entry : shards.entrySet()) {
Map<TableDescriptor<?>, InMemoryDataShard<?>> snapshotDb = new HashMap<>();
for (Map.Entry<TableDescriptor<?>, InMemoryDataShard<?>> entry : shards.entrySet()) {
snapshotDb.put(entry.getKey(), entry.getValue().createSnapshot());
}
return new InMemoryStorage(currentVersion, snapshotDb);
Expand All @@ -45,42 +46,42 @@ public synchronized void commit(long txId, long version, InMemoryTxLockWatcher w

currentVersion++;

Set<Class<?>> uncommitedTables = uncommited.remove(txId);
for (Class<?> type : uncommitedTables) {
shards.get(type).commit(txId, currentVersion);
Set<TableDescriptor<?>> uncommitedTables = uncommited.remove(txId);
for (TableDescriptor<?> tableDescriptor : uncommitedTables) {
shards.get(tableDescriptor).commit(txId, currentVersion);
}
}

public synchronized void rollback(long txId) {
Set<Class<?>> uncommitedTables = uncommited.remove(txId);
Set<TableDescriptor<?>> uncommitedTables = uncommited.remove(txId);
if (uncommitedTables == null) {
return;
}
for (Class<?> type : uncommitedTables) {
shards.get(type).rollback(txId);
for (TableDescriptor<?> tableDescriptor : uncommitedTables) {
shards.get(tableDescriptor).rollback(txId);
}
}

public synchronized <T extends Entity<T>> WriteTxDataShard<T> getWriteTxDataShard(
Class<T> type, long txId, long version
TableDescriptor<T> tableDescriptor, long txId, long version
) {
uncommited.computeIfAbsent(txId, __ -> new HashSet<>()).add(type);
return getTxDataShard(type, txId, version, InMemoryTxLockWatcher.NO_LOCKS);
uncommited.computeIfAbsent(txId, __ -> new HashSet<>()).add(tableDescriptor);
return getTxDataShard(tableDescriptor, txId, version, InMemoryTxLockWatcher.NO_LOCKS);
}

public synchronized <T extends Entity<T>> ReadOnlyTxDataShard<T> getReadOnlyTxDataShard(
Class<T> type, long txId, long version, InMemoryTxLockWatcher watcher
TableDescriptor<T> tableDescriptor, long txId, long version, InMemoryTxLockWatcher watcher
) {
return getTxDataShard(type, txId, version, watcher);
return getTxDataShard(tableDescriptor, txId, version, watcher);
}

private <T extends Entity<T>> TxDataShardImpl<T> getTxDataShard(
Class<T> type, long txId, long version, InMemoryTxLockWatcher watcher
TableDescriptor<T> tableDescriptor, long txId, long version, InMemoryTxLockWatcher watcher
) {
@SuppressWarnings("unchecked")
InMemoryDataShard<T> shard = (InMemoryDataShard<T>) shards.get(type);
InMemoryDataShard<T> shard = (InMemoryDataShard<T>) shards.get(tableDescriptor);
if (shard == null) {
throw new InMemoryRepositoryException("Table is not created: " + type.getSimpleName());
throw new InMemoryRepositoryException("Table is not created: " + tableDescriptor.toDebugString());
}
return new TxDataShardImpl<>(shard, txId, version, watcher);
}
Expand All @@ -89,27 +90,26 @@ public synchronized void dropDb() {
shards.clear();
}

@SuppressWarnings({"unchecked", "rawtypes"})
public synchronized Set<Class<? extends Entity<?>>> tables() {
return (Set) shards.keySet();
public synchronized Set<TableDescriptor<?>> tables() {
return shards.keySet();
}

public synchronized boolean containsTable(Class<?> type) {
return shards.containsKey(type);
public synchronized boolean containsTable(TableDescriptor<?> tableDescriptor) {
return shards.containsKey(tableDescriptor);
}

public synchronized <T extends Entity<T>> void createTable(Class<T> type) {
if (containsTable(type)) {
public synchronized <T extends Entity<T>> void createTable(TableDescriptor<T> tableDescriptor) {
if (containsTable(tableDescriptor)) {
return;
}
shards.put(type, new InMemoryDataShard<>(type));
shards.put(tableDescriptor, new InMemoryDataShard<>(tableDescriptor));
}

public synchronized boolean dropTable(Class<?> type) {
if (!containsTable(type)) {
public synchronized boolean dropTable(TableDescriptor<?> tableDescriptor) {
if (!containsTable(tableDescriptor)) {
return false;
}
shards.remove(type);
shards.remove(tableDescriptor);
return true;
}
}
Loading

0 comments on commit 7ccfc12

Please sign in to comment.