Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use TableDescriptor in InMemoryTable #111

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.Range;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TableDescriptor;
import tech.ydb.yoj.repository.db.ViewSchema;
import tech.ydb.yoj.repository.db.exception.EntityAlreadyExistsException;
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
Expand All @@ -20,33 +21,39 @@
import java.util.TreeMap;

final class InMemoryDataShard<T extends Entity<T>> {
private final Class<T> type;
private final TableDescriptor<T> tableDescriptor;
private final EntitySchema<T> schema;
private final TreeMap<Entity.Id<T>, InMemoryEntityLine> entityLines;
private final Map<Long, Set<Entity.Id<T>>> uncommited = new HashMap<>();

private InMemoryDataShard(
Class<T> type, EntitySchema<T> schema, TreeMap<Entity.Id<T>, InMemoryEntityLine> entityLines
TableDescriptor<T> tableDescriptor,
EntitySchema<T> schema,
TreeMap<Entity.Id<T>, InMemoryEntityLine> entityLines
) {
this.type = type;
this.tableDescriptor = tableDescriptor;
this.schema = schema;
this.entityLines = entityLines;
}

public InMemoryDataShard(Class<T> type) {
this(type, EntitySchema.of(type), createEmptyLines(type));
public InMemoryDataShard(TableDescriptor<T> tableDescriptor) {
this(
tableDescriptor,
EntitySchema.of(tableDescriptor.entityType()),
createEmptyLines(tableDescriptor.entityType())
);
}

private static <T extends Entity<T>> TreeMap<Entity.Id<T>, InMemoryEntityLine> createEmptyLines(Class<T> type) {
return new TreeMap<>(EntityIdSchema.getIdComparator(type));
}

public synchronized InMemoryDataShard<T> createSnapshot() {
TreeMap<Entity.Id<T>, InMemoryEntityLine> snapshotLines = createEmptyLines(type);
TreeMap<Entity.Id<T>, InMemoryEntityLine> snapshotLines = createEmptyLines(tableDescriptor.entityType());
for (Map.Entry<Entity.Id<T>, InMemoryEntityLine> entry : entityLines.entrySet()) {
snapshotLines.put(entry.getKey(), entry.getValue().createSnapshot());
}
return new InMemoryDataShard<>(type, schema, snapshotLines);
return new InMemoryDataShard<>(tableDescriptor, schema, snapshotLines);
}

public synchronized void commit(long txId, long version) {
Expand All @@ -60,14 +67,14 @@ public synchronized void commit(long txId, long version) {
}

public synchronized void checkLocks(long version, InMemoryTxLockWatcher watcher) {
for (Entity.Id<T> lockedId : watcher.getReadRows(type)) {
for (Entity.Id<T> lockedId : watcher.getReadRows(tableDescriptor)) {
InMemoryEntityLine entityLine = entityLines.get(lockedId);
if (entityLine != null && entityLine.hasYounger(version)) {
throw new OptimisticLockException("Row lock failed " + lockedId);
}
}

List<Range<Entity.Id<T>>> lockedRanges = watcher.getReadRanges(type);
List<Range<Entity.Id<T>>> lockedRanges = watcher.getReadRanges(tableDescriptor);
if (lockedRanges.isEmpty()) {
return;
}
Expand All @@ -79,7 +86,7 @@ public synchronized void checkLocks(long version, InMemoryTxLockWatcher watcher)

for (Range<Entity.Id<T>> lockedRange: lockedRanges) {
if (lockedRange.contains(entry.getKey())) {
throw new OptimisticLockException("Table lock failed " + type.getSimpleName());
throw new OptimisticLockException("Table lock failed " + tableDescriptor.toDebugString());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import tech.ydb.yoj.repository.db.Repository;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.SchemaOperations;
import tech.ydb.yoj.repository.db.TableDescriptor;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.exception.DropTableException;

Expand Down Expand Up @@ -38,7 +39,7 @@ public void loadSnapshot(String id) {
}

@Override
public Set<Class<? extends Entity<?>>> tables() {
public Set<TableDescriptor<?>> tables() {
return Set.copyOf(storage.tables());
}

Expand All @@ -47,26 +48,25 @@ public RepositoryTransaction startTransaction(TxOptions options) {
return new InMemoryRepositoryTransaction(options, this);
}

@Override
public <T extends Entity<T>> SchemaOperations<T> schema(Class<T> c) {
public <T extends Entity<T>> SchemaOperations<T> schema(TableDescriptor<T> tableDescriptor) {
return new SchemaOperations<T>() {
@Override
public void create() {
storage.createTable(c);
storage.createTable(tableDescriptor);
}

@Override
public void drop() {
if (!storage.dropTable(c)) {
throw new DropTableException(
String.format("Can't drop table %s: table doesn't exist", c.getSimpleName())
if (!storage.dropTable(tableDescriptor)) {
throw new DropTableException(String.format("Can't drop table %s: table doesn't exist",
tableDescriptor.toDebugString())
);
}
}

@Override
public boolean exists() {
return storage.containsTable(c);
return storage.containsTable(tableDescriptor);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.TableDescriptor;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException;
Expand Down Expand Up @@ -59,6 +60,12 @@ public <T extends Entity<T>> Table<T> table(Class<T> c) {
return new InMemoryTable<>(getMemory(c));
}

@Override
public <T extends Entity<T>> Table<T> table(TableDescriptor<T> tableDescriptor) {
return new InMemoryTable<>(this, tableDescriptor);
}

@Deprecated // use other constructor of InMemoryTable
public final <T extends Entity<T>> InMemoryTable.DbMemory<T> getMemory(Class<T> c) {
return new InMemoryTable.DbMemory<>(c, this);
}
Expand Down Expand Up @@ -119,7 +126,7 @@ private boolean isFinalActionNeeded(String action) {
}

final <T extends Entity<T>> void doInWriteTransaction(
String log, Class<T> type, Consumer<WriteTxDataShard<T>> consumer
String log, TableDescriptor<T> tableDescriptor, Consumer<WriteTxDataShard<T>> consumer
) {
if (options.isScan()) {
throw new IllegalTransactionScanException("Mutable operations");
Expand All @@ -129,7 +136,7 @@ final <T extends Entity<T>> void doInWriteTransaction(
}

Runnable query = () -> logTransaction(log, () -> {
WriteTxDataShard<T> shard = storage.getWriteTxDataShard(type, txId, getVersion());
WriteTxDataShard<T> shard = storage.getWriteTxDataShard(tableDescriptor, txId, getVersion());
consumer.accept(shard);

hasWrites = true;
Expand All @@ -143,11 +150,13 @@ final <T extends Entity<T>> void doInWriteTransaction(
}

final <T extends Entity<T>, R> R doInTransaction(
String action, Class<T> type, Function<ReadOnlyTxDataShard<T>, R> func
String action, TableDescriptor<T> tableDescriptor, Function<ReadOnlyTxDataShard<T>, R> func
) {
return logTransaction(action, () -> {
InMemoryTxLockWatcher findWatcher = hasWrites ? watcher : InMemoryTxLockWatcher.NO_LOCKS;
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(type, txId, getVersion(), findWatcher);
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(
tableDescriptor, txId, getVersion(), findWatcher
);
try {
return func.apply(shard);
} catch (OptimisticLockException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
package tech.ydb.yoj.repository.test.inmemory;

import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.TableDescriptor;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class InMemoryStorage {
private final Map<Class<?>, InMemoryDataShard<?>> shards;
private final Map<Long, Set<Class<?>>> uncommited = new HashMap<>();
private final Map<TableDescriptor<?>, InMemoryDataShard<?>> shards;
private final Map<Long, Set<TableDescriptor<?>>> uncommited = new HashMap<>();

private long currentVersion;

public InMemoryStorage() {
this(0, new HashMap<>());
}

private InMemoryStorage(long version, Map<Class<?>, InMemoryDataShard<?>> shards) {
private InMemoryStorage(long version, Map<TableDescriptor<?>, InMemoryDataShard<?>> shards) {
this.shards = shards;
this.currentVersion = version;
}
Expand All @@ -27,8 +28,8 @@ public synchronized long getCurrentVersion() {
}

public synchronized InMemoryStorage createSnapshot() {
Map<Class<?>, InMemoryDataShard<?>> snapshotDb = new HashMap<>();
for (Map.Entry<Class<?>, InMemoryDataShard<?>> entry : shards.entrySet()) {
Map<TableDescriptor<?>, InMemoryDataShard<?>> snapshotDb = new HashMap<>();
for (Map.Entry<TableDescriptor<?>, InMemoryDataShard<?>> entry : shards.entrySet()) {
snapshotDb.put(entry.getKey(), entry.getValue().createSnapshot());
}
return new InMemoryStorage(currentVersion, snapshotDb);
Expand All @@ -45,42 +46,42 @@ public synchronized void commit(long txId, long version, InMemoryTxLockWatcher w

currentVersion++;

Set<Class<?>> uncommitedTables = uncommited.remove(txId);
for (Class<?> type : uncommitedTables) {
shards.get(type).commit(txId, currentVersion);
Set<TableDescriptor<?>> uncommitedTables = uncommited.remove(txId);
for (TableDescriptor<?> tableDescriptor : uncommitedTables) {
shards.get(tableDescriptor).commit(txId, currentVersion);
}
}

public synchronized void rollback(long txId) {
Set<Class<?>> uncommitedTables = uncommited.remove(txId);
Set<TableDescriptor<?>> uncommitedTables = uncommited.remove(txId);
if (uncommitedTables == null) {
return;
}
for (Class<?> type : uncommitedTables) {
shards.get(type).rollback(txId);
for (TableDescriptor<?> tableDescriptor : uncommitedTables) {
shards.get(tableDescriptor).rollback(txId);
}
}

public synchronized <T extends Entity<T>> WriteTxDataShard<T> getWriteTxDataShard(
Class<T> type, long txId, long version
TableDescriptor<T> tableDescriptor, long txId, long version
) {
uncommited.computeIfAbsent(txId, __ -> new HashSet<>()).add(type);
return getTxDataShard(type, txId, version, InMemoryTxLockWatcher.NO_LOCKS);
uncommited.computeIfAbsent(txId, __ -> new HashSet<>()).add(tableDescriptor);
return getTxDataShard(tableDescriptor, txId, version, InMemoryTxLockWatcher.NO_LOCKS);
}

public synchronized <T extends Entity<T>> ReadOnlyTxDataShard<T> getReadOnlyTxDataShard(
Class<T> type, long txId, long version, InMemoryTxLockWatcher watcher
TableDescriptor<T> tableDescriptor, long txId, long version, InMemoryTxLockWatcher watcher
) {
return getTxDataShard(type, txId, version, watcher);
return getTxDataShard(tableDescriptor, txId, version, watcher);
}

private <T extends Entity<T>> TxDataShardImpl<T> getTxDataShard(
Class<T> type, long txId, long version, InMemoryTxLockWatcher watcher
TableDescriptor<T> tableDescriptor, long txId, long version, InMemoryTxLockWatcher watcher
) {
@SuppressWarnings("unchecked")
InMemoryDataShard<T> shard = (InMemoryDataShard<T>) shards.get(type);
InMemoryDataShard<T> shard = (InMemoryDataShard<T>) shards.get(tableDescriptor);
if (shard == null) {
throw new InMemoryRepositoryException("Table is not created: " + type.getSimpleName());
throw new InMemoryRepositoryException("Table is not created: " + tableDescriptor.toDebugString());
}
return new TxDataShardImpl<>(shard, txId, version, watcher);
}
Expand All @@ -89,27 +90,26 @@ public synchronized void dropDb() {
shards.clear();
}

@SuppressWarnings({"unchecked", "rawtypes"})
public synchronized Set<Class<? extends Entity<?>>> tables() {
return (Set) shards.keySet();
public synchronized Set<TableDescriptor<?>> tables() {
return shards.keySet();
}

public synchronized boolean containsTable(Class<?> type) {
return shards.containsKey(type);
public synchronized boolean containsTable(TableDescriptor<?> tableDescriptor) {
return shards.containsKey(tableDescriptor);
}

public synchronized <T extends Entity<T>> void createTable(Class<T> type) {
if (containsTable(type)) {
public synchronized <T extends Entity<T>> void createTable(TableDescriptor<T> tableDescriptor) {
if (containsTable(tableDescriptor)) {
return;
}
shards.put(type, new InMemoryDataShard<>(type));
shards.put(tableDescriptor, new InMemoryDataShard<>(tableDescriptor));
}

public synchronized boolean dropTable(Class<?> type) {
if (!containsTable(type)) {
public synchronized boolean dropTable(TableDescriptor<?> tableDescriptor) {
if (!containsTable(tableDescriptor)) {
return false;
}
shards.remove(type);
shards.remove(tableDescriptor);
return true;
}
}
Loading
Loading