Skip to content

Commit

Permalink
separate-projections-poc: MigrationProjectionCache
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Jun 2, 2024
1 parent 443869c commit 17e56a5
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -405,25 +405,25 @@ public T insert(T tt) {
T t = tt.preSave();
transaction.getWatcher().markRowRead(type, t.getId());
transaction.doInWriteTransaction("insert(" + t + ")", type, shard -> shard.insert(t));
transaction.getTransactionLocal().projectionCache().save(transaction, t);
transaction.getTransactionLocal().firstLevelCache().put(t);
transaction.getTransactionLocal().projectionCache().save(t);
return t;
}

@Override
public T save(T tt) {
T t = tt.preSave();
transaction.doInWriteTransaction("save(" + t + ")", type, shard -> shard.save(t));
transaction.getTransactionLocal().projectionCache().save(transaction, t);
transaction.getTransactionLocal().firstLevelCache().put(t);
transaction.getTransactionLocal().projectionCache().save(t);
return t;
}

@Override
public void delete(Entity.Id<T> id) {
transaction.doInWriteTransaction("delete(" + id + ")", type, shard -> shard.delete(id));
transaction.getTransactionLocal().projectionCache().delete(transaction, id);
transaction.getTransactionLocal().firstLevelCache().putEmpty(id);
transaction.getTransactionLocal().projectionCache().delete(id);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.Range;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.ViewSchema;
import tech.ydb.yoj.repository.db.bulk.BulkParams;
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.db.statement.Changeset;
import tech.ydb.yoj.repository.ydb.YdbRepositoryTransaction;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapperImpl;
import tech.ydb.yoj.repository.ydb.readtable.EntityIdKeyMapper;
Expand Down Expand Up @@ -54,17 +54,17 @@
import static tech.ydb.yoj.repository.db.EntityExpressions.defaultOrder;

public class YdbTable<T extends Entity<T>> implements Table<T> {
private final QueryExecutor executor;
private final YdbRepositoryTransaction<?> executor;
@Getter
private final Class<T> type;

public YdbTable(Class<T> type, QueryExecutor executor) {
public YdbTable(Class<T> type, YdbRepositoryTransaction<?> executor) {
this.type = type;
this.executor = new CheckingQueryExecutor(executor);
this.executor = executor;
}

protected YdbTable(QueryExecutor executor) {
this.executor = new CheckingQueryExecutor(executor);
this.executor = (YdbRepositoryTransaction<?>) executor;
this.type = resolveEntityType();
}

Expand Down Expand Up @@ -420,25 +420,25 @@ public void update(Entity.Id<T> id, Changeset changeset) {
public T insert(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.insert(type), entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
return t;
}

@Override
public T save(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
return t;
}

@Override
public void delete(Entity.Id<T> id) {
executor.pendingExecute(YqlStatement.delete(type), id);
executor.getTransactionLocal().projectionCache().delete(executor, id);
executor.getTransactionLocal().firstLevelCache().putEmpty(id);
executor.getTransactionLocal().projectionCache().delete(id);
}

/**
Expand All @@ -458,7 +458,7 @@ public <ID extends Id<T>> void migrate(ID id) {
T rawEntity = foundRaw.get(0);
T entityToSave = rawEntity.postLoad().preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
}

@Override
Expand Down Expand Up @@ -494,55 +494,6 @@ default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams p
TransactionLocal getTransactionLocal();
}

public static class CheckingQueryExecutor implements QueryExecutor {
private final QueryExecutor delegate;
private final Tx originTx;

public CheckingQueryExecutor(QueryExecutor delegate) {
this.delegate = delegate;
this.originTx = Tx.Current.exists() ? Tx.Current.get() : null;
}

private void check() {
Tx.checkSameTx(originTx);
}

@Override
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
check();
return delegate.execute(statement, params);
}

@Override
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
return delegate.executeScanQuery(statement, params);
}

@Override
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
check();
delegate.pendingExecute(statement, value);
}

@Override
public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
check();
delegate.bulkUpsert(mapper, input, params);
}

@Override
public <IN, OUT> Stream<OUT> readTable(ReadTableMapper<IN, OUT> mapper, ReadTableParams<IN> params) {
check();
return delegate.readTable(mapper, params);
}

@Override
public TransactionLocal getTransactionLocal() {
check();
return delegate.getTransactionLocal();
}
}

public <ID extends Id<T>> void updateIn(Collection<ID> ids, Changeset changeset) {
var params = new UpdateInStatement.UpdateInStatementInput<>(ids, changeset.toMap());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.Range;
import tech.ydb.yoj.repository.db.Table;
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.ViewSchema;
import tech.ydb.yoj.repository.db.bulk.BulkParams;
import tech.ydb.yoj.repository.db.cache.FirstLevelCache;
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.db.statement.Changeset;
import tech.ydb.yoj.repository.ydb.YdbRepositoryTransaction;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapperImpl;
import tech.ydb.yoj.repository.ydb.readtable.EntityIdKeyMapper;
Expand Down Expand Up @@ -54,17 +54,17 @@
import static tech.ydb.yoj.repository.db.EntityExpressions.defaultOrder;

public class YdbTable<T extends Entity<T>> implements Table<T> {
private final QueryExecutor executor;
private final YdbRepositoryTransaction<?> executor;
@Getter
private final Class<T> type;

public YdbTable(Class<T> type, QueryExecutor executor) {
public YdbTable(Class<T> type, YdbRepositoryTransaction<?> executor) {
this.type = type;
this.executor = new CheckingQueryExecutor(executor);
this.executor = executor;
}

protected YdbTable(QueryExecutor executor) {
this.executor = new CheckingQueryExecutor(executor);
this.executor = (YdbRepositoryTransaction<?>) executor;
this.type = resolveEntityType();
}

Expand Down Expand Up @@ -421,7 +421,7 @@ public T insert(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.insert(type), entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
return t;
}

Expand All @@ -430,15 +430,15 @@ public T save(T t) {
T entityToSave = t.preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().firstLevelCache().put(entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
return t;
}

@Override
public void delete(Entity.Id<T> id) {
executor.pendingExecute(YqlStatement.delete(type), id);
executor.getTransactionLocal().firstLevelCache().putEmpty(id);
executor.getTransactionLocal().projectionCache().delete(id);
executor.getTransactionLocal().projectionCache().delete(executor, id);
}

/**
Expand All @@ -458,7 +458,7 @@ public <ID extends Id<T>> void migrate(ID id) {
T rawEntity = foundRaw.get(0);
T entityToSave = rawEntity.postLoad().preSave();
executor.pendingExecute(YqlStatement.save(type), entityToSave);
executor.getTransactionLocal().projectionCache().save(entityToSave);
executor.getTransactionLocal().projectionCache().save(executor, entityToSave);
}

@Override
Expand Down Expand Up @@ -494,55 +494,6 @@ default <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams p
TransactionLocal getTransactionLocal();
}

public static class CheckingQueryExecutor implements QueryExecutor {
private final QueryExecutor delegate;
private final Tx originTx;

public CheckingQueryExecutor(QueryExecutor delegate) {
this.delegate = delegate;
this.originTx = Tx.Current.exists() ? Tx.Current.get() : null;
}

private void check() {
Tx.checkSameTx(originTx);
}

@Override
public <PARAMS, RESULT> List<RESULT> execute(Statement<PARAMS, RESULT> statement, PARAMS params) {
check();
return delegate.execute(statement, params);
}

@Override
public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT> statement, PARAMS params) {
return delegate.executeScanQuery(statement, params);
}

@Override
public <PARAMS> void pendingExecute(Statement<PARAMS, ?> statement, PARAMS value) {
check();
delegate.pendingExecute(statement, value);
}

@Override
public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams params) {
check();
delegate.bulkUpsert(mapper, input, params);
}

@Override
public <IN, OUT> Stream<OUT> readTable(ReadTableMapper<IN, OUT> mapper, ReadTableParams<IN> params) {
check();
return delegate.readTable(mapper, params);
}

@Override
public TransactionLocal getTransactionLocal() {
check();
return delegate.getTransactionLocal();
}
}

public <ID extends Id<T>> void updateIn(Collection<ID> ids, Changeset changeset) {
var params = new UpdateInStatement.UpdateInStatementInput<>(ids, changeset.toMap());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@ public TxManager immediateWrites() {
return withOptions(this.options.withImmediateWrites(true));
}

@Override
public TxManager separateProjections() {
return withOptions(this.options.withSeparateProjections(true));
}

@Override
public TxManager noFirstLevelCache() {
return withOptions(this.options.withFirstLevelCache(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public interface TxManager {
*/
TxManager immediateWrites();

TxManager separateProjections();

/**
* Turn off first level cache
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class TxOptions {

boolean immediateWrites;

boolean separateProjections;

public static TxOptions create(@NonNull IsolationLevel isolationLevel) {
return builder()
.isolationLevel(isolationLevel)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,46 +3,48 @@
import lombok.NonNull;
import tech.ydb.yoj.repository.BaseDb;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.projection.MigrationProjectionCache;
import tech.ydb.yoj.repository.db.projection.ProjectionCache;
import tech.ydb.yoj.repository.db.projection.RoProjectionCache;
import tech.ydb.yoj.repository.db.projection.RwProjectionCache;

import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class TransactionLocal {
private final Map<Supplier<?>, Object> singletons = new IdentityHashMap<>();

private final Supplier<FirstLevelCache> firstLevelCacheSupplier;
private final Supplier<ProjectionCache> projectionCacheSupplier;
private final Supplier<TransactionLog> logSupplier;
private final FirstLevelCache firstLevelCache;
private final ProjectionCache projectionCache;
private final TransactionLog log;

public TransactionLocal(@NonNull TxOptions options) {
this.firstLevelCacheSupplier = options.isFirstLevelCache() ? FirstLevelCache::create : FirstLevelCache::empty;
this.projectionCacheSupplier = options.isMutable() ? RwProjectionCache::new : RoProjectionCache::new;
this.logSupplier = () -> new TransactionLog(options.getLogLevel());
this.firstLevelCache = options.isFirstLevelCache() ? FirstLevelCache.create() : FirstLevelCache.empty();
this.projectionCache = createProjectionCache(firstLevelCache, options);
this.log = new TransactionLog(options.getLogLevel());
}

public static TransactionLocal get() {
return BaseDb.current(Holder.class).getTransactionLocal();
private static ProjectionCache createProjectionCache(FirstLevelCache firstLevelCache, TxOptions options) {
if (options.isMutable()) {
if (options.isSeparateProjections()) {
return new MigrationProjectionCache(firstLevelCache);
}

return new RwProjectionCache();
}

return new RoProjectionCache();
}

@SuppressWarnings("unchecked")
public <X> X instance(@NonNull Supplier<X> supplier) {
return (X) singletons.computeIfAbsent(supplier, Supplier::get);
public static TransactionLocal get() {
return BaseDb.current(Holder.class).getTransactionLocal();
}

public ProjectionCache projectionCache() {
return instance(projectionCacheSupplier);
return projectionCache;
}

public FirstLevelCache firstLevelCache() {
return instance(firstLevelCacheSupplier);
return firstLevelCache;
}

public TransactionLog log() {
return instance(logSupplier);
return log;
}

public interface Holder {
Expand Down
Loading

0 comments on commit 17e56a5

Please sign in to comment.