Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better spliterator #69

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -357,13 +357,13 @@ public void streamAll() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> db.tx(() -> db.projects().streamAll(5001)));
}

private static <ID extends Entity.Id<?>> ReadTableParams<ID> defaultReadTableParamsNonLegacy() {
return RepositoryTest.<ID>buildReadTableParamsNonLegacy().build();
}

private static <ID extends Entity.Id<?>> ReadTableParams.ReadTableParamsBuilder<ID> buildReadTableParamsNonLegacy() {
return ReadTableParams.<ID>builder().useNewSpliterator(true);
return ReadTableParams.<ID>builder().useNewSpliterator2(true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.proto.ValueProtos;
import tech.ydb.table.Session;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.Params;
import tech.ydb.table.query.ReadTablePart;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.settings.BulkUpsertSettings;
import tech.ydb.table.settings.CommitTxSettings;
Expand Down Expand Up @@ -54,14 +56,23 @@
import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException;
import tech.ydb.yoj.repository.ydb.merge.QueriesMerger;
import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper;
import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.ResultSetIterator;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueue;
import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueueGrpcStreamAdapter;
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbLegacySpliterator;
import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbNewLegacySpliterator;
import tech.ydb.yoj.repository.ydb.statement.Statement;
import tech.ydb.yoj.repository.ydb.table.YdbTable;
import tech.ydb.yoj.util.lang.Interrupts;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -78,7 +89,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class);

private final List<YdbRepository.Query<?>> pendingWrites = new ArrayList<>();
private final List<YdbSpliterator<?>> spliterators = new ArrayList<>();
private final List<ClosableSpliterator<?>> spliterators = new ArrayList<>();

@Getter
private final TxOptions options;
Expand All @@ -102,8 +113,8 @@ public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
this.cache = options.isFirstLevelCache() ? new RepositoryCacheImpl() : RepositoryCache.empty();
}

private <V> YdbSpliterator<V> createSpliterator(String request, boolean isOrdered) {
YdbSpliterator<V> spliterator = new YdbSpliterator<>(request, isOrdered);
private <V> YdbNewLegacySpliterator<V> createSpliterator(String request, boolean isOrdered) {
YdbNewLegacySpliterator<V> spliterator = new YdbNewLegacySpliterator<>(request, isOrdered);
spliterators.add(spliterator);
return spliterator;
}
Expand Down Expand Up @@ -153,7 +164,7 @@ private void doCommit() {

private void closeStreams() {
Exception summaryException = null;
for (YdbSpliterator<?> spliterator : spliterators) {
for (ClosableSpliterator<?> spliterator : spliterators) {
try {
spliterator.close();
} catch (Exception e) {
Expand Down Expand Up @@ -387,7 +398,7 @@ public <PARAMS, RESULT> Stream<RESULT> executeScanQuery(Statement<PARAMS, RESULT
String yql = getYql(statement);
Params sdkParams = getSdkParams(statement, params);

YdbSpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("scanQuery: " + yql, false);

initSession();
session.executeScanQuery(
Expand Down Expand Up @@ -489,7 +500,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
}

if (params.isUseNewSpliterator()) {
YdbSpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());
YdbNewLegacySpliterator<RESULT> spliterator = createSpliterator("readTable: " + tableName, params.isOrdered());

initSession();
session.readTable(
Expand All @@ -500,6 +511,30 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
return spliterator.createStream();
}

if (params.isUseNewSpliterator2()) {
initSession();

// TODO: configure stream timeout
YdbSpliteratorQueue<Iterator<RESULT>> queue = new YdbSpliteratorQueue<>(1, Duration.ofMinutes(5));

var adapter = new YdbSpliteratorQueueGrpcStreamAdapter<>("readTable: " + tableName, queue);
GrpcReadStream<ReadTablePart> grpcStream = session.executeReadTable(tableName, settings.build());
CompletableFuture<Status> future = grpcStream.start(readTablePart -> {
ResultSetIterator<RESULT> iterator = new ResultSetIterator<>(
readTablePart.getResultSetReader(),
mapper::mapResult
);
adapter.onNext(iterator);
});
future.whenComplete(adapter::onSupplierThreadComplete);

YdbSpliterator<RESULT> spliterator = new YdbSpliterator<>(queue, params.isOrdered());

spliterators.add(spliterator);

return spliterator.createStream();
}

try {
YdbLegacySpliterator<RESULT> spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action ->
doCall("read table " + mapper.getTableName(""), () -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import java.util.Spliterator;

public interface ClosableSpliterator<V> extends Spliterator<V> {
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import tech.ydb.proto.ValueProtos;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.yoj.repository.ydb.client.YdbConverter;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public final class ResultSetIterator<V> implements Iterator<V> {
private final ResultSetReader resultSet;
private final ResultConverter<V> converter;
private final List<ValueProtos.Column> columns;

private int position = 0;

public ResultSetIterator(ResultSetReader resultSet, ResultConverter<V> converter) {
List<ValueProtos.Column> columns;
if (resultSet.getRowCount() > 0) {
resultSet.setRowIndex(0);
columns = getColumns(resultSet);
} else {
columns = new ArrayList<>();
}

this.resultSet = resultSet;
this.converter = converter;
this.columns = columns;
}

@Override
public boolean hasNext() {
return position < resultSet.getRowCount();
}

@Override
public V next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

ValueProtos.Value value = buildValue(position++);

return converter.convert(columns, value);
}

private ValueProtos.Value buildValue(int rowIndex) {
resultSet.setRowIndex(rowIndex);
ValueProtos.Value.Builder value = ValueProtos.Value.newBuilder();
for (int i = 0; i < columns.size(); i++) {
value.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(i)));
}
return value.build();
}

private static List<ValueProtos.Column> getColumns(ResultSetReader resultSet) {
List<ValueProtos.Column> columns = new ArrayList<>();
for (int i = 0; i < resultSet.getColumnCount(); i++) {
columns.add(ValueProtos.Column.newBuilder()
.setName(resultSet.getColumnName(i))
.build()
);
}
return columns;
}

@FunctionalInterface
public interface ResultConverter<V> {
V convert(List<ValueProtos.Column> columns, ValueProtos.Value value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package tech.ydb.yoj.repository.ydb.spliterator;

import tech.ydb.yoj.ExperimentalApi;

import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42")
public final class YdbSpliterator<V> implements ClosableSpliterator<V> {
private final YdbSpliteratorQueue<Iterator<V>> queue;
private final int flags;

private Iterator<V> valueIterator;

private boolean closed = false;

public YdbSpliterator(YdbSpliteratorQueue<Iterator<V>> queue, boolean isOrdered) {
this.queue = queue;
this.flags = (isOrdered ? ORDERED : 0) | NONNULL;
}

// Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak.
public Stream<V> createStream() {
return StreamSupport.stream(this, false).onClose(this::close);
}

@Override
public void close() {
if (closed) {
return;
}
closed = true;
queue.close();
}

@Override
public boolean tryAdvance(Consumer<? super V> action) {
if (closed) {
return false;
}

// WARNING: At one point in time, this spliterator will store up to queue.size() + 2 blocks from YDB in memory.
// One block right here, one in the queue, one in the grpc thread, waiting for free space in the queue.
// Maximum response size in YDB - 50mb. It means that it could be up to 150mb for spliterator.
valueIterator = getValueIterator(valueIterator, queue);
if (valueIterator == null) {
close();
return false;
}

V value = valueIterator.next();

action.accept(value);

return true;
}

/*
* Returns not empty valueIterator, null in case of end of stream
*/
@Nullable
private static <V> Iterator<V> getValueIterator(
@Nullable Iterator<V> valueIterator, YdbSpliteratorQueue<Iterator<V>> queue
) {
// valueIterator could be null only on first call of tryAdvance
if (valueIterator == null) {
valueIterator = queue.poll();
if (valueIterator == null) {
return null;
}
}

// queue could return empty iterator, we have to select one with elements
while (!valueIterator.hasNext()) {
valueIterator = queue.poll();
if (valueIterator == null) {
return null;
}
}

return valueIterator;
}

@Override
public Spliterator<V> trySplit() {
return null;
}

@Override
public long estimateSize() {
return Long.MAX_VALUE;
}

@Override
public long getExactSizeIfKnown() {
return -1;
}

@Override
public int characteristics() {
return flags;
}
}
Loading