Skip to content

Commit

Permalink
better-spliterator: Better spliterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Jul 2, 2024
1 parent 58cba6e commit 030388c
Show file tree
Hide file tree
Showing 12 changed files with 499 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,13 @@ public void streamAll() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> db.tx(() -> db.projects().streamAll(5001)));
}

/**
 * Returns the read-table parameters produced by building the unmodified result of
 * {@code buildReadTableParamsNonLegacy()}.
 *
 * @param <ID> entity id type the parameters are bound to
 * @return the built parameters
 */
private static <ID extends Entity.Id<?>> ReadTableParams<ID> defaultReadTableParamsNonLegacy() {
    ReadTableParams.ReadTableParamsBuilder<ID> nonLegacyBuilder =
            RepositoryTest.<ID>buildReadTableParamsNonLegacy();
    return nonLegacyBuilder.build();
}

/**
 * Starts a {@code ReadTableParams} builder with a new-spliterator flag switched on, so that
 * callers can add further settings before calling {@code build()}.
 *
 * NOTE(review): this excerpt is a diff view and therefore shows both the pre-change
 * ({@code useNewSpliterator(true)}) and the post-change ({@code useNewSpliterator2(true)})
 * return line; confirm against the actual file which one is present.
 *
 * @param <ID> entity id type the parameters are bound to
 */
private static <ID extends Entity.Id<?>> ReadTableParams.ReadTableParamsBuilder<ID> buildReadTableParamsNonLegacy() {
return ReadTableParams.<ID>builder().useNewSpliterator(true);
return ReadTableParams.<ID>builder().useNewSpliterator2(true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.proto.ValueProtos;
import tech.ydb.table.Session;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.Params;
import tech.ydb.table.query.ReadTablePart;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.settings.BulkUpsertSettings;
import tech.ydb.table.settings.CommitTxSettings;
Expand Down Expand Up @@ -54,14 +56,23 @@
import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
import tech.ydb.yoj.repository.ydb.merge.QueriesMerger;
import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper;
import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.ResultSetIterator;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueue;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueueWrapper;
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbLegacySpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbNewLegacySpliterator;
import tech.ydb.yoj.repository.ydb.statement.Statement;
import tech.ydb.yoj.repository.ydb.table.YdbTable;
import tech.ydb.yoj.util.lang.Interrupts;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -78,7 +89,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);

private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();
private final List<ClosableSpliterator<?>> spliterators = new ArrayList<>();

@Getter
private final TxOptions options;
Expand All @@ -102,8 +113,8 @@ public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
this.cache = options.isFirstLevelCache() ? new RepositoryCacheImpl() : RepositoryCache.empty();
}

/**
 * Creates a spliterator for {@code request} and registers it in {@code spliterators},
 * the list that {@code closeStreams()} walks in order to close every spliterator.
 *
 * NOTE(review): this excerpt is a diff view and therefore shows both the pre-change
 * ({@code YdbSpliterator}) and the post-change ({@code YdbNewLegacySpliterator}) versions of
 * the signature and of the constructor call; confirm against the actual file.
 *
 * @param request   description of the request, passed to the spliterator constructor
 * @param isOrdered passed to the spliterator constructor unchanged
 * @return the newly created, already registered spliterator
 */
private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
YdbSpliterator<V> spliterator = new YdbSpliterator<>(request, isOrdered);
private <V> YdbNewLegacySpliterator<V> createSpliterator(String request, boolean isOrdered) {
YdbNewLegacySpliterator<V> spliterator = new YdbNewLegacySpliterator<>(request, isOrdered);
spliterators.add(spliterator);
return spliterator;
}
Expand Down Expand Up @@ -153,7 +164,7 @@ private void doCommit() {

private void closeStreams() {
Exception summaryException = null;
for (YdbSpliterator<?> spliterator : spliterators) {
for (ClosableSpliterator<?> spliterator : spliterators) {
try {
spliterator.close();
} catch (Exception e) {
Expand Down Expand Up @@ -387,7 +398,7 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
String yql = getYql(statement);
Params sdkParams = getSdkParams(statement, params);

YdbSpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);

initSession();
session.executeScanQuery(
Expand Down Expand Up @@ -489,7 +500,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
}

if (params.isUseNewSpliterator()) {
YdbSpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());

initSession();
session.readTable(
Expand All @@ -500,6 +511,36 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
return spliterator.createStream();
}

if (params.isUseNewSpliterator2()) {
initSession();

// TODO: configure stream timeout
// TODO: rename wrapper to something other
YdbSpliteratorQueue<Iterator<RESULT>> queue = new YdbSpliteratorQueue<>(1, Duration.ofMinutes(5));
YdbSpliteratorQueueWrapper<Iterator<RESULT>> wrapper = new YdbSpliteratorQueueWrapper<>(
"readTable: " + tableName, queue
);

GrpcReadStream<ReadTablePart> grpcStream = session.executeReadTable(tableName, settings.build());

CompletableFuture<Status> future = grpcStream.start(readTablePart -> {
ResultSetIterator<RESULT> iterator = new ResultSetIterator<>(
readTablePart.getResultSetReader(),
mapper::mapResult
);
wrapper.onNext(iterator);
});
future.whenComplete(wrapper::onSupplierThreadComplete);

// TODO: do we have to close grpcStream??

YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>(queue, params.isOrdered());

spliterators.add(spliterator);

return spliterator.createStream();
}

try {
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
doCall("read table " + mapper.getTableName(""), () -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import java.util.Spliterator;

/**
 * A {@link Spliterator} that must be closed explicitly once it is no longer needed.
 *
 * <p>The interface also extends {@link AutoCloseable}, so an instance can be managed by a
 * try-with-resources statement. What exactly {@link #close()} releases is up to the
 * implementation.
 *
 * @param <V> type of the elements returned by this spliterator
 */
public interface ClosableSpliterator<V> extends Spliterator<V>, AutoCloseable {
    /**
     * Closes this spliterator.
     *
     * <p>Redeclared without a {@code throws} clause: unlike {@link AutoCloseable#close()},
     * this method does not throw checked exceptions, so existing callers and implementations
     * keep compiling unchanged.
     */
    @Override
    void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import tech.ydb.proto.ValueProtos;

import java.util.List;

/**
 * Converts a protobuf value, together with the descriptors of its columns, into a result
 * object of type {@code V}.
 *
 * @param <V> type of the conversion result
 */
@FunctionalInterface
public interface ResultConverter<V> {
/**
 * Performs the conversion.
 *
 * @param columns column descriptors that accompany {@code value}
 * @param value   protobuf value to convert; when called from {@code ResultSetIterator} it
 *                holds one item per entry of {@code columns}, in the same order
 * @return the converted result
 */
V convert(List<ValueProtos.Column> columns, ValueProtos.Value value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import tech.ydb.proto.ValueProtos;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.yoj.repository.ydb.client.YdbConverter;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Iterator over the rows of a single {@link ResultSetReader}.
 *
 * <p>Every row is repacked into a {@link ValueProtos.Value} holding one item per column and is
 * then handed, together with the column descriptors, to a {@link ResultConverter}.
 *
 * @param <V> type of the converted rows
 */
public final class ResultSetIterator<V> implements Iterator<V> {
    private final ResultSetReader resultSet;
    private final ResultConverter<V> converter;
    private final List<ValueProtos.Column> columns;

    // Index of the row that the next call to next() converts.
    private int position = 0;

    /**
     * @param resultSet result set whose rows are iterated
     * @param converter converter applied to every row
     */
    public ResultSetIterator(ResultSetReader resultSet, ResultConverter<V> converter) {
        this.resultSet = resultSet;
        this.converter = converter;
        // Column descriptors are collected only when there is at least one row;
        // a result set without rows gets an empty column list.
        this.columns = resultSet.getRowCount() > 0
                ? describeColumns(resultSet)
                : new ArrayList<>();
    }

    @Override
    public boolean hasNext() {
        return resultSet.getRowCount() > position;
    }

    @Override
    public V next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        // The position moves forward before the row is read and converted.
        int rowIndex = position;
        position++;

        return converter.convert(columns, rowToProto(rowIndex));
    }

    /** Positions the result set on {@code rowIndex} and packs that row's columns into one value. */
    private ValueProtos.Value rowToProto(int rowIndex) {
        resultSet.setRowIndex(rowIndex);
        ValueProtos.Value.Builder row = ValueProtos.Value.newBuilder();
        int columnIndex = 0;
        while (columnIndex < columns.size()) {
            row.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(columnIndex)));
            columnIndex++;
        }
        return row.build();
    }

    /**
     * Positions the result set on its first row and builds one descriptor per column.
     * Only the column name is filled in.
     */
    private static List<ValueProtos.Column> describeColumns(ResultSetReader resultSet) {
        resultSet.setRowIndex(0);
        List<ValueProtos.Column> described = new ArrayList<>();
        for (int index = 0; index < resultSet.getColumnCount(); index++) {
            ValueProtos.Column column = ValueProtos.Column.newBuilder()
                    .setName(resultSet.getColumnName(index))
                    .build();
            described.add(column);
        }
        return described;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import tech.ydb.yoj.ExperimentalApi;

import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
 * Non-splittable spliterator that drains a {@link YdbSpliteratorQueue} of iterators and
 * exposes their elements one by one.
 *
 * <p>Use {@link #createStream()} to obtain a stream: it registers {@link #close()} as the
 * stream's close handler.
 *
 * @param <V> type of the elements
 */
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
public final class YdbSpliterator<V> implements ClosableSpliterator<V> {
    private final YdbSpliteratorQueue<Iterator<V>> queue;
    private final int flags;

    // Iterator currently being drained; null until the first poll of the queue.
    private Iterator<V> valueIterator;

    private boolean closed = false;

    /**
     * @param queue     queue the iterators are polled from
     * @param isOrdered whether {@code ORDERED} is reported in addition to {@code NONNULL}
     */
    public YdbSpliterator(YdbSpliteratorQueue<Iterator<V>> queue, boolean isOrdered) {
        int characteristics = NONNULL;
        if (isOrdered) {
            characteristics |= ORDERED;
        }
        this.queue = queue;
        this.flags = characteristics;
    }

    /**
     * The correct way to create a stream backed by this spliterator: the returned sequential
     * stream calls {@link #close()} when it is closed, which is important to avoid leaking
     * the supplier thread.
     */
    public Stream<V> createStream() {
        Stream<V> stream = StreamSupport.stream(this, false);
        return stream.onClose(this::close);
    }

    @Override
    public boolean tryAdvance(Consumer<? super V> action) {
        if (closed) {
            return false;
        }

        if (!hasBufferedValue()) {
            valueIterator = queue.poll();
            // Either no iterator was polled or the polled one has no elements:
            // the spliterator is marked closed and yields nothing further.
            if (!hasBufferedValue()) {
                closed = true;
                return false;
            }
        }

        action.accept(valueIterator.next());
        return true;
    }

    /** Tells whether the current iterator exists and still has an element to hand out. */
    private boolean hasBufferedValue() {
        return valueIterator != null && valueIterator.hasNext();
    }

    @Override
    public void close() {
        closed = true;
        queue.close();
    }

    @Override
    public Spliterator<V> trySplit() {
        // Splitting is not supported.
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public long getExactSizeIfKnown() {
        return -1L;
    }

    @Override
    public int characteristics() {
        return flags;
    }
}
Loading

0 comments on commit 030388c

Please sign in to comment.