Skip to content

Commit

Permalink
better-spliterator: Better spliterator
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Jul 2, 2024
1 parent 58cba6e commit 64ac2fb
Show file tree
Hide file tree
Showing 11 changed files with 430 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,13 @@ public void streamAll() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> db.tx(() -> db.projects().streamAll(5001)));
}

// Convenience shortcut: takes the builder returned by buildReadTableParamsNonLegacy()
// and builds it as-is, without any further customization.
private static <ID extends Entity.Id<?>> ReadTableParams<ID> defaultReadTableParamsNonLegacy() {
return RepositoryTest.<ID>buildReadTableParamsNonLegacy().build();
}

private static <ID extends Entity.Id<?>> ReadTableParams.ReadTableParamsBuilder<ID> buildReadTableParamsNonLegacy() {
return ReadTableParams.<ID>builder().useNewSpliterator(true);
return ReadTableParams.<ID>builder().useNewSpliterator2(true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.proto.ValueProtos;
import tech.ydb.table.Session;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.Params;
import tech.ydb.table.query.ReadTablePart;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.settings.BulkUpsertSettings;
import tech.ydb.table.settings.CommitTxSettings;
Expand Down Expand Up @@ -54,14 +56,23 @@
import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
import tech.ydb.yoj.repository.ydb.merge.QueriesMerger;
import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper;
import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.ResultSetIterator;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueue;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueueGrpcStreamAdapter;
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbLegacySpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbNewLegacySpliterator;
import tech.ydb.yoj.repository.ydb.statement.Statement;
import tech.ydb.yoj.repository.ydb.table.YdbTable;
import tech.ydb.yoj.util.lang.Interrupts;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -78,7 +89,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);

private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();
private final List<ClosableSpliterator<?>> spliterators = new ArrayList<>();

@Getter
private final TxOptions options;
Expand All @@ -102,8 +113,8 @@ public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
this.cache = options.isFirstLevelCache() ? new RepositoryCacheImpl() : RepositoryCache.empty();
}

private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
YdbSpliterator<V> spliterator = new YdbSpliterator<>(request, isOrdered);
private <V> YdbNewLegacySpliterator<V> createSpliterator(String request, boolean isOrdered) {
YdbNewLegacySpliterator<V> spliterator = new YdbNewLegacySpliterator<>(request, isOrdered);
spliterators.add(spliterator);
return spliterator;
}
Expand Down Expand Up @@ -153,7 +164,7 @@ private void doCommit() {

private void closeStreams() {
Exception summaryException = null;
for (YdbSpliterator<?> spliterator : spliterators) {
for (ClosableSpliterator<?> spliterator : spliterators) {
try {
spliterator.close();
} catch (Exception e) {
Expand Down Expand Up @@ -387,7 +398,7 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
String yql = getYql(statement);
Params sdkParams = getSdkParams(statement, params);

YdbSpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);

initSession();
session.executeScanQuery(
Expand Down Expand Up @@ -489,7 +500,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
}

if (params.isUseNewSpliterator()) {
YdbSpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());

initSession();
session.readTable(
Expand All @@ -500,6 +511,30 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
return spliterator.createStream();
}

if (params.isUseNewSpliterator2()) {
initSession();

// TODO: configure stream timeout
YdbSpliteratorQueue<Iterator<RESULT>> queue = new YdbSpliteratorQueue<>(1, Duration.ofMinutes(5));

var adapter = new YdbSpliteratorQueueGrpcStreamAdapter<>("readTable: " + tableName, queue);
GrpcReadStream<ReadTablePart> grpcStream = session.executeReadTable(tableName, settings.build());
CompletableFuture<Status> future = grpcStream.start(readTablePart -> {
ResultSetIterator<RESULT> iterator = new ResultSetIterator<>(
readTablePart.getResultSetReader(),
mapper::mapResult
);
adapter.onNext(iterator);
});
future.whenComplete(adapter::onSupplierThreadComplete);

YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>(queue, params.isOrdered());

spliterators.add(spliterator);

return spliterator.createStream();
}

try {
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
doCall("read table " + mapper.getTableName(""), () -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import java.util.Spliterator;

/**
 * A {@link Spliterator} that owns resources which must be released explicitly.
 *
 * <p>The interface also extends {@link AutoCloseable}, so implementations can be managed by
 * try-with-resources. {@link #close()} is redeclared without a {@code throws} clause, which means
 * callers never have to handle a checked exception when closing.
 *
 * @param <V> type of the elements produced by the spliterator
 */
public interface ClosableSpliterator<V> extends Spliterator<V>, AutoCloseable {
    /**
     * Releases the resources held by this spliterator.
     */
    @Override
    void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import tech.ydb.proto.ValueProtos;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.yoj.repository.ydb.client.YdbConverter;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/**
 * Iterator over the rows of a single {@link ResultSetReader}.
 *
 * <p>Every row is repacked into a {@link ValueProtos.Value} (one item per column) and handed to
 * the supplied {@link ResultConverter} together with the column descriptors. The descriptors are
 * captured once, in the constructor, and carry only the column name.
 *
 * <p>The iterator moves the reader's row index as it advances and keeps its own cursor in a plain
 * {@code int} field.
 *
 * @param <V> type of the converted elements
 */
public final class ResultSetIterator<V> implements Iterator<V> {
    private final ResultSetReader resultSet;
    private final ResultConverter<V> converter;
    private final List<ValueProtos.Column> columns;

    // Index of the row that the next call to next() will return.
    private int position = 0;

    /**
     * @param resultSet reader whose rows are iterated
     * @param converter turns the column descriptors plus a row value into an element
     */
    public ResultSetIterator(ResultSetReader resultSet, ResultConverter<V> converter) {
        this.resultSet = resultSet;
        this.converter = converter;
        this.columns = describeColumns(resultSet);
    }

    @Override
    public boolean hasNext() {
        return position < resultSet.getRowCount();
    }

    /**
     * Converts and returns the row under the cursor, then moves the cursor forward.
     *
     * @throws NoSuchElementException if all rows have already been returned
     */
    @Override
    public V next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        // The cursor is advanced before the row is read, so a failure while reading
        // or converting does not make the iterator return the same row again.
        int rowIndex = position++;
        ValueProtos.Value row = readRow(rowIndex);

        return converter.convert(columns, row);
    }

    /**
     * Positions the reader on {@code rowIndex} and copies one proto value per known column.
     */
    private ValueProtos.Value readRow(int rowIndex) {
        resultSet.setRowIndex(rowIndex);

        ValueProtos.Value.Builder row = ValueProtos.Value.newBuilder();
        int columnIndex = 0;
        while (columnIndex < columns.size()) {
            row.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(columnIndex)));
            columnIndex++;
        }
        return row.build();
    }

    /**
     * Builds name-only column descriptors for a non-empty reader (after positioning it on the
     * first row); returns an empty list when the reader has no rows.
     */
    private static List<ValueProtos.Column> describeColumns(ResultSetReader reader) {
        List<ValueProtos.Column> described = new ArrayList<>();
        if (reader.getRowCount() <= 0) {
            return described;
        }

        reader.setRowIndex(0);
        for (int columnIndex = 0; columnIndex < reader.getColumnCount(); columnIndex++) {
            ValueProtos.Column column = ValueProtos.Column.newBuilder()
                    .setName(reader.getColumnName(columnIndex))
                    .build();
            described.add(column);
        }
        return described;
    }

    /**
     * Converts one row, given as a proto value plus its column descriptors, into an element.
     *
     * @param <V> type of the converted element
     */
    @FunctionalInterface
    public interface ResultConverter<V> {
        V convert(List<ValueProtos.Column> columns, ValueProtos.Value value);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import tech.ydb.yoj.ExperimentalApi;

import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
/**
 * Non-splittable spliterator that drains a {@link YdbSpliteratorQueue} of iterators ("blocks")
 * and emits the elements of each block in turn.
 *
 * <p>Reports {@code NONNULL}, plus {@code ORDERED} when requested, and never reports a known size.
 *
 * @param <V> type of the emitted elements
 */
public final class YdbSpliterator<V> implements ClosableSpliterator<V> {
    private final YdbSpliteratorQueue<Iterator<V>> queue;
    private final int flags;

    // Block currently being consumed; null before the first poll and after the end of the stream.
    private Iterator<V> valueIterator;

    private boolean closed = false;

    /**
     * @param queue     source of element blocks
     * @param isOrdered whether to report the {@code ORDERED} characteristic
     */
    public YdbSpliterator(YdbSpliteratorQueue<Iterator<V>> queue, boolean isOrdered) {
        int orderedFlag = isOrdered ? ORDERED : 0;
        this.queue = queue;
        this.flags = orderedFlag | NONNULL;
    }

    /**
     * The correct way to create a stream from this spliterator: the returned sequential stream
     * closes the spliterator in its {@code onClose} handler, which is important to avoid leaking
     * the supplier thread.
     */
    public Stream<V> createStream() {
        Stream<V> stream = StreamSupport.stream(this, false);
        return stream.onClose(this::close);
    }

    /**
     * Closes the underlying queue; only the first call has an effect.
     */
    @Override
    public void close() {
        if (!closed) {
            closed = true;
            queue.close();
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super V> action) {
        if (closed) {
            return false;
        }

        // WARNING: at any point in time this spliterator may keep up to queue.size() + 2 blocks
        // from YDB in memory: one block right here, one in the queue, and one in the gRPC thread
        // waiting for free space in the queue. The maximum response size in YDB is 50mb, so a
        // single spliterator could hold up to 150mb.
        Iterator<V> block = pollNonEmptyIterator();
        valueIterator = block;
        if (block == null) {
            // End of stream: nothing more will arrive, so release the queue right away.
            close();
            return false;
        }

        action.accept(block.next());
        return true;
    }

    /**
     * Returns an iterator that has at least one element: the current block if it is not exhausted,
     * otherwise the next non-empty block polled from the queue. Returns {@code null} once the
     * queue reports the end of the stream.
     */
    @Nullable
    private Iterator<V> pollNonEmptyIterator() {
        Iterator<V> candidate = valueIterator;
        // The candidate is null only before the very first poll; the queue may also hand out
        // empty blocks, so keep polling until a block with elements shows up.
        while (candidate == null || !candidate.hasNext()) {
            candidate = queue.poll();
            if (candidate == null) {
                return null;
            }
        }
        return candidate;
    }

    /**
     * Always returns {@code null}: this spliterator cannot be split.
     */
    @Override
    public Spliterator<V> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public long getExactSizeIfKnown() {
        return -1;
    }

    @Override
    public int characteristics() {
        return flags;
    }
}
Loading

0 comments on commit 64ac2fb

Please sign in to comment.