From 030388c529ec2201901a31e21a85f299435db13a Mon Sep 17 00:00:00 2001 From: Alexander Lavrukov Date: Wed, 15 May 2024 21:47:01 +0300 Subject: [PATCH] better-spliterator: Better spliterator --- .../yoj/repository/test/RepositoryTest.java | 4 +- .../ydb/YdbRepositoryTransaction.java | 53 ++++- .../ydb/spliterator/ClosableSpliterator.java | 7 + .../ydb/spliterator/ResultConverter.java | 10 + .../ydb/spliterator/ResultSetIterator.java | 68 ++++++ .../ydb/spliterator/YdbSpliterator.java | 77 +++++++ .../ydb/spliterator/YdbSpliteratorQueue.java | 194 ++++++++++++++++++ .../YdbSpliteratorQueueWrapper.java | 68 ++++++ .../legacy}/YdbLegacySpliterator.java | 2 +- .../legacy/YdbNewLegacySpliterator.java} | 24 +-- .../legacy/YdbNewLegacySpliteratorTest.java} | 14 +- .../db/readtable/ReadTableParams.java | 9 + 12 files changed, 499 insertions(+), 31 deletions(-) create mode 100644 repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ClosableSpliterator.java create mode 100644 repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultConverter.java create mode 100644 repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultSetIterator.java create mode 100644 repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliterator.java create mode 100644 repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueue.java create mode 100644 repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueueWrapper.java rename repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/{ => spliterator/legacy}/YdbLegacySpliterator.java (96%) rename repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/{YdbSpliterator.java => spliterator/legacy/YdbNewLegacySpliterator.java} (87%) rename repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/{YdbSpliteratorTest.java => spliterator/legacy/YdbNewLegacySpliteratorTest.java} (92%) diff --git a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java index 76924a3e..d94d9770 100644 --- a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java +++ b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java @@ -357,13 +357,13 @@ public void streamAll() { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> db.tx(() -> db.projects().streamAll(5001))); } - + private static > ReadTableParams defaultReadTableParamsNonLegacy() { return RepositoryTest.buildReadTableParamsNonLegacy().build(); } private static > ReadTableParams.ReadTableParamsBuilder buildReadTableParamsNonLegacy() { - return ReadTableParams.builder().useNewSpliterator(true); + return ReadTableParams.builder().useNewSpliterator2(true); } @Test diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java index 95e4d535..f9876b4c 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java @@ -11,10 +11,12 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; +import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.proto.ValueProtos; import tech.ydb.table.Session; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.query.Params; +import tech.ydb.table.query.ReadTablePart; import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.settings.BulkUpsertSettings; import tech.ydb.table.settings.CommitTxSettings; @@ -54,14 +56,23 @@ import tech.ydb.yoj.repository.ydb.exception.YdbRepositoryException; import tech.ydb.yoj.repository.ydb.merge.QueriesMerger; import tech.ydb.yoj.repository.ydb.readtable.ReadTableMapper; +import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator; +import tech.ydb.yoj.repository.ydb.spliterator.ResultSetIterator; +import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliterator; +import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueue; +import tech.ydb.yoj.repository.ydb.spliterator.YdbSpliteratorQueueWrapper; +import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbLegacySpliterator; +import tech.ydb.yoj.repository.ydb.spliterator.legacy.YdbNewLegacySpliterator; import tech.ydb.yoj.repository.ydb.statement.Statement; import tech.ydb.yoj.repository.ydb.table.YdbTable; import tech.ydb.yoj.util.lang.Interrupts; import java.time.Duration; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -78,7 +89,7 @@ public class YdbRepositoryTransaction private static final Logger log = LoggerFactory.getLogger(YdbRepositoryTransaction.class); private final List> pendingWrites = new ArrayList<>(); - private final List> spliterators = new ArrayList<>(); + private final List> spliterators = new ArrayList<>(); @Getter private final TxOptions options; @@ -102,8 +113,8 @@ public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) { this.cache = options.isFirstLevelCache() ? new RepositoryCacheImpl() : RepositoryCache.empty(); } - private YdbSpliterator createSpliterator(String request, boolean isOrdered) { - YdbSpliterator spliterator = new YdbSpliterator<>(request, isOrdered); + private YdbNewLegacySpliterator createSpliterator(String request, boolean isOrdered) { + YdbNewLegacySpliterator spliterator = new YdbNewLegacySpliterator<>(request, isOrdered); spliterators.add(spliterator); return spliterator; } @@ -153,7 +164,7 @@ private void doCommit() { private void closeStreams() { Exception summaryException = null; - for (YdbSpliterator spliterator : spliterators) { + for (ClosableSpliterator spliterator : spliterators) { try { spliterator.close(); } catch (Exception e) { @@ -387,7 +398,7 @@ public Stream executeScanQuery(Statement spliterator = createSpliterator("scanQuery: " + yql, false); + YdbNewLegacySpliterator spliterator = createSpliterator("scanQuery: " + yql, false); initSession(); session.executeScanQuery( @@ -489,7 +500,7 @@ public Stream readTable(ReadTableMapper } if (params.isUseNewSpliterator()) { - YdbSpliterator spliterator = createSpliterator("readTable: " + tableName, params.isOrdered()); + YdbNewLegacySpliterator spliterator = createSpliterator("readTable: " + tableName, params.isOrdered()); initSession(); session.readTable( @@ -500,6 +511,36 @@ public Stream readTable(ReadTableMapper return spliterator.createStream(); } + if (params.isUseNewSpliterator2()) { + initSession(); + + // TODO: configure stream timeout + // TODO: rename wrapper to something other + YdbSpliteratorQueue> queue = new YdbSpliteratorQueue<>(1, Duration.ofMinutes(5)); + YdbSpliteratorQueueWrapper> wrapper = new YdbSpliteratorQueueWrapper<>( + "readTable: " + tableName, queue + ); + + GrpcReadStream grpcStream = session.executeReadTable(tableName, settings.build()); + + CompletableFuture future = grpcStream.start(readTablePart -> { + ResultSetIterator iterator = new ResultSetIterator<>( + readTablePart.getResultSetReader(), + mapper::mapResult + ); + wrapper.onNext(iterator); + }); + future.whenComplete(wrapper::onSupplierThreadComplete); + + // TODO: do we have to close grpcStream?? + + YdbSpliterator spliterator = new YdbSpliterator<>(queue, params.isOrdered()); + + spliterators.add(spliterator); + + return spliterator.createStream(); + } + try { YdbLegacySpliterator spliterator = new YdbLegacySpliterator<>(params.isOrdered(), action -> doCall("read table " + mapper.getTableName(""), () -> { diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ClosableSpliterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ClosableSpliterator.java new file mode 100644 index 00000000..8a6ddec9 --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ClosableSpliterator.java @@ -0,0 +1,7 @@ +package tech.ydb.yoj.repository.ydb.spliterator; + +import java.util.Spliterator; + +public interface ClosableSpliterator extends Spliterator { + void close(); +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultConverter.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultConverter.java new file mode 100644 index 00000000..da988aed --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultConverter.java @@ -0,0 +1,10 @@ +package tech.ydb.yoj.repository.ydb.spliterator; + +import tech.ydb.proto.ValueProtos; + +import java.util.List; + +@FunctionalInterface +public interface ResultConverter { + V convert(List columns, ValueProtos.Value value); +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultSetIterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultSetIterator.java new file mode 100644 index 00000000..60a0f0c5 --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/ResultSetIterator.java @@ -0,0 +1,68 @@ +package tech.ydb.yoj.repository.ydb.spliterator; + +import tech.ydb.proto.ValueProtos; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.yoj.repository.ydb.client.YdbConverter; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +public final class ResultSetIterator implements Iterator { + private final ResultSetReader resultSet; + private final ResultConverter converter; + private final List columns; + + private int position = 0; + + public ResultSetIterator(ResultSetReader resultSet, ResultConverter converter) { + List columns; + if (resultSet.getRowCount() > 0) { + resultSet.setRowIndex(0); + columns = getColumns(resultSet); + } else { + columns = new ArrayList<>(); + } + + this.resultSet = resultSet; + this.converter = converter; + this.columns = columns; + } + + @Override + public boolean hasNext() { + return position < resultSet.getRowCount(); + } + + @Override + public V next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + ValueProtos.Value value = buildValue(position++); + + return converter.convert(columns, value); + } + + private ValueProtos.Value buildValue(int rowIndex) { + resultSet.setRowIndex(rowIndex); + ValueProtos.Value.Builder value = ValueProtos.Value.newBuilder(); + for (int i = 0; i < columns.size(); i++) { + value.addItems(YdbConverter.convertValueToProto(resultSet.getColumn(i))); + } + return value.build(); + } + + private static List getColumns(ResultSetReader resultSet) { + List columns = new ArrayList<>(); + for (int i = 0; i < resultSet.getColumnCount(); i++) { + columns.add(ValueProtos.Column.newBuilder() + .setName(resultSet.getColumnName(i)) + .build() + ); + } + return columns; + } +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliterator.java new file mode 100644 index 00000000..5cefe51c --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliterator.java @@ -0,0 +1,77 @@ +package tech.ydb.yoj.repository.ydb.spliterator; + +import tech.ydb.yoj.ExperimentalApi; + +import java.util.Iterator; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42") +public final class YdbSpliterator implements ClosableSpliterator { + private final YdbSpliteratorQueue> queue; + private final int flags; + + private Iterator valueIterator; + + private boolean closed = false; + + public YdbSpliterator(YdbSpliteratorQueue> queue, boolean isOrdered) { + this.queue = queue; + this.flags = (isOrdered ? ORDERED : 0) | NONNULL; + } + + // Correct way to create stream with YdbSpliterator. onClose call is important for avoid supplier thread leak. + public Stream createStream() { + return StreamSupport.stream(this, false).onClose(this::close); + } + + @Override + public boolean tryAdvance(Consumer action) { + if (closed) { + return false; + } + + if (valueIterator == null || !valueIterator.hasNext()) { + valueIterator = queue.poll(); + if (valueIterator == null || !valueIterator.hasNext()) { + closed = true; + return false; + } + } + + V value = valueIterator.next(); + + action.accept(value); + + return true; + } + + @Override + public void close() { + closed = true; + queue.close(); + } + + @Override + public Spliterator trySplit() { + return null; + } + + @Override + public long estimateSize() { + return Long.MAX_VALUE; + } + + @Override + public long getExactSizeIfKnown() { + return -1; + } + + @Override + public int characteristics() { + return flags; + } + +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueue.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueue.java new file mode 100644 index 00000000..83772c96 --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueue.java @@ -0,0 +1,194 @@ +package tech.ydb.yoj.repository.ydb.spliterator; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.repository.db.exception.DeadlineExceededException; +import tech.ydb.yoj.repository.db.exception.QueryInterruptedException; + +import java.time.Duration; +import java.util.ArrayDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42") +public final class YdbSpliteratorQueue { + private static final Logger log = LoggerFactory.getLogger(YdbSpliteratorQueue.class); + + private static final SupplierStatus UNDONE_SUPPLIER_STATUS = () -> false; + + private final int maxQueueSize; + private final ArrayDeque queue; + private final long streamWorkDeadlineNanos; + + private final Lock lock = new ReentrantLock(); + private final Condition newElement = lock.newCondition(); + private final Condition queueIsNotFull = lock.newCondition(); + + private SupplierStatus supplierStatus = UNDONE_SUPPLIER_STATUS; + private boolean closed = false; + + public YdbSpliteratorQueue(int maxQueueSize, Duration streamWorkTimeout) { + Preconditions.checkArgument(maxQueueSize > 0, "maxQueueSize must be greater than 0"); + this.maxQueueSize = maxQueueSize; + this.queue = new ArrayDeque<>(maxQueueSize); + this.streamWorkDeadlineNanos = System.nanoTime() + TimeUnit.NANOSECONDS.toNanos(saturatedToNanos(streamWorkTimeout)); + } + + public boolean onNext(V value) { + Preconditions.checkState(supplierStatus.equals(UNDONE_SUPPLIER_STATUS), + "can't call onNext after supplierDone" + ); + + lock.lock(); + try { + if (!awaitFreeSpaceLocked()) { + return false; + } + + queue.add(value); + + newElement.signal(); + } finally { + lock.unlock(); + } + + return true; + } + + public boolean awaitFreeSpace() { + Preconditions.checkState(supplierStatus.equals(UNDONE_SUPPLIER_STATUS), + "can't call onNext after supplierDone" + ); + + lock.lock(); + try { + return awaitFreeSpaceLocked(); + } finally { + lock.unlock(); + } + } + + private boolean awaitFreeSpaceLocked() { + if (closed) { + return false; + } + + if (queue.size() != maxQueueSize) { + return true; + } + + try { + if (!queueIsNotFull.await(calculateTimeout(), TimeUnit.NANOSECONDS)) { + throw new OfferDeadlineExceededException(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new QueryInterruptedException("Supplier thread interrupted", e); + } + + return !closed; + } + + // (supplier thread) Send knowledge to stream when data is over. + public void supplierDone(SupplierStatus status) { + lock.lock(); + try { + if (closed) { + return; + } + + supplierStatus = status; + + newElement.signal(); + } finally { + lock.unlock(); + } + } + + public boolean isClosed() { + lock.lock(); + try { + return closed; + } finally { + lock.unlock(); + } + } + + public V poll() { + lock.lock(); + try { + if (closed) { + return null; + } + + if (queue.isEmpty()) { + if (supplierStatus.isDone()) { + return null; + } + + try { + if (!newElement.await(calculateTimeout(), TimeUnit.NANOSECONDS)) { + log.warn("Supplier thread was closed because consumer didn't poll an element of stream on timeout"); + throw new DeadlineExceededException("Stream deadline exceeded on poll"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new QueryInterruptedException("Consumer thread interrupted", e); + } + + if (closed || supplierStatus.isDone()) { + return null; + } + } + + V value = queue.pop(); + + queueIsNotFull.signal(); + + return value; + } finally { + lock.unlock(); + } + } + + public void close() { + lock.lock(); + try { + if (closed) { + return; + } + + closed = true; + + queueIsNotFull.signal(); + newElement.signalAll(); + } finally { + lock.unlock(); + } + } + + private long calculateTimeout() { + return TimeUnit.NANOSECONDS.toNanos(streamWorkDeadlineNanos - System.nanoTime()); + } + + public static final class OfferDeadlineExceededException extends RuntimeException { + } + + // copy-paste from com.google.common.util.concurrent.Uninterruptibles + private static long saturatedToNanos(Duration duration) { + try { + return duration.toNanos(); + } catch (ArithmeticException ignore) { + return duration.isNegative() ? -9223372036854775808L : 9223372036854775807L; + } + } + + @FunctionalInterface + public interface SupplierStatus { + boolean isDone(); + } +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueueWrapper.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueueWrapper.java new file mode 100644 index 00000000..bbf80359 --- /dev/null +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/YdbSpliteratorQueueWrapper.java @@ -0,0 +1,68 @@ +package tech.ydb.yoj.repository.ydb.spliterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.core.Status; +import tech.ydb.core.grpc.GrpcReadStream; +import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.repository.ydb.YdbOperations; + +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; + +import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validate; +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42") +public final class YdbSpliteratorQueueWrapper implements GrpcReadStream.Observer { + private static final Logger log = LoggerFactory.getLogger(YdbSpliteratorQueueWrapper.class); + + private final String request; + private final YdbSpliteratorQueue queue; + + public YdbSpliteratorQueueWrapper(String request, YdbSpliteratorQueue queue) { + this.request = request; + this.queue = queue; + } + + public void onNext(V values) { + if (!queue.onNext(values)) { + // Need to abort supplier thread if stream is closed. onSupplierThreadComplete will exit immediately. + // ConsumerDoneException isn't handled because onSupplierThreadComplete will exit by this.closed. + throw ConsumerDoneException.INSTANCE; + } + + if (!queue.awaitFreeSpace()) { + throw ConsumerDoneException.INSTANCE; + } + } + + // (supplier thread) Send knowledge to stream when data is over. + public void onSupplierThreadComplete(Status status, Throwable ex) { + var error = unwrapException(ex); + if (queue.isClosed() || error instanceof YdbSpliteratorQueue.OfferDeadlineExceededException) { + log.error("Supplier thread was closed because consumer didn't poll an element of stream on timeout"); + // If deadline exceeded happen, need to do nothing. Stream thread will exit at deadline by themself. + return; + } + + queue.supplierDone(() -> { + if (error != null) { + throw YdbOperations.convertToRepositoryException(error); + } + + validate(request, status.getCode(), status.toString()); + + return true; + }); + } + + private static Throwable unwrapException(Throwable ex) { + if (ex instanceof CompletionException || ex instanceof ExecutionException) { + return ex.getCause(); + } + return ex; + } + + private static class ConsumerDoneException extends RuntimeException { + public final static ConsumerDoneException INSTANCE = new ConsumerDoneException(); + } +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbLegacySpliterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbLegacySpliterator.java similarity index 96% rename from repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbLegacySpliterator.java rename to repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbLegacySpliterator.java index 1f5f1686..d1b898dd 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbLegacySpliterator.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbLegacySpliterator.java @@ -1,4 +1,4 @@ -package tech.ydb.yoj.repository.ydb; +package tech.ydb.yoj.repository.ydb.spliterator.legacy; import java.util.Spliterator; import java.util.function.Consumer; diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbSpliterator.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliterator.java similarity index 87% rename from repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbSpliterator.java rename to repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliterator.java index 82e35f9a..57c856a7 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbSpliterator.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliterator.java @@ -1,4 +1,4 @@ -package tech.ydb.yoj.repository.ydb; +package tech.ydb.yoj.repository.ydb.spliterator.legacy; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -7,6 +7,8 @@ import tech.ydb.yoj.ExperimentalApi; import tech.ydb.yoj.repository.db.exception.DeadlineExceededException; import tech.ydb.yoj.repository.db.exception.QueryInterruptedException; +import tech.ydb.yoj.repository.ydb.YdbOperations; +import tech.ydb.yoj.repository.ydb.spliterator.ClosableSpliterator; import javax.annotation.Nullable; import java.time.Duration; @@ -23,19 +25,10 @@ import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validate; -/** - * {@code YdbSpliterator} used to read data from YDB streams. - * It's possible to supply values from different threads, but supplier threads must not call {@code onNext()} concurrently. - * This Spliterator should be explicitly closed by the {@code close()} method for finish work in YDB session; when the stream returned by - * {@code readTable()} is used inside a YOJ transaction, {@code close()} will be called automatically at transaction end (both commit and rollback). - *

To use the new implementation, set {@link tech.ydb.yoj.repository.db.readtable.ReadTableParams.ReadTableParamsBuilder#useNewSpliterator(boolean) - * ReadTableParams<...>.builder().<...>.useNewSpliterator(true)}. - *

Note that using the new implementation currently has a negative performance impact, for more information refer to - * GitHub Issue #42. - */ +@Deprecated // too slow @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42") -public class YdbSpliterator implements Spliterator { - private static final Logger log = LoggerFactory.getLogger(YdbSpliterator.class); +public class YdbNewLegacySpliterator implements ClosableSpliterator { + private static final Logger log = LoggerFactory.getLogger(YdbNewLegacySpliterator.class); private static final Duration DEFAULT_STREAM_WORK_TIMEOUT = Duration.ofMinutes(5); @@ -54,12 +47,12 @@ public class YdbSpliterator implements Spliterator { private boolean endData = false; - public YdbSpliterator(String request, boolean isOrdered) { + public YdbNewLegacySpliterator(String request, boolean isOrdered) { this(request, isOrdered, DEFAULT_STREAM_WORK_TIMEOUT); } @VisibleForTesting - protected YdbSpliterator(String request, boolean isOrdered, Duration streamWorkTimeout) { + protected YdbNewLegacySpliterator(String request, boolean isOrdered, Duration streamWorkTimeout) { this.flags = (isOrdered ? ORDERED : 0) | NONNULL; this.streamWorkDeadlineNanos = System.nanoTime() + TimeUnit.NANOSECONDS.toNanos(saturatedToNanos(streamWorkTimeout)); this.validateResponse = (status, error) -> { @@ -155,6 +148,7 @@ public boolean tryAdvance(Consumer action) { } // (stream thread) close spliterator and abort supplier thread + @Override public void close() { // close() can be called twice by stream.close() and in the end of transaction if (closed) { diff --git a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbSpliteratorTest.java b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliteratorTest.java similarity index 92% rename from repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbSpliteratorTest.java rename to repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliteratorTest.java index 97f1b3fd..ae9a1bd5 100644 --- a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbSpliteratorTest.java +++ b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/spliterator/legacy/YdbNewLegacySpliteratorTest.java @@ -1,4 +1,4 @@ -package tech.ydb.yoj.repository.ydb; +package tech.ydb.yoj.repository.ydb.spliterator.legacy; import com.google.common.util.concurrent.Runnables; import com.google.common.util.concurrent.Uninterruptibles; @@ -19,7 +19,7 @@ import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -public class YdbSpliteratorTest { +public class YdbNewLegacySpliteratorTest { @SneakyThrows public static void doAfter(int millis, Runnable runnable) { Thread.sleep(millis); @@ -88,7 +88,7 @@ private static class ReadTableMock { private final AtomicInteger selectedValuesCount = new AtomicInteger(); - private final YdbSpliterator spliterator; + private final YdbNewLegacySpliterator spliterator; private final List bucketSizes = new ArrayList<>(); @@ -96,7 +96,7 @@ private static class ReadTableMock { private Status status = Status.SUCCESS; private Throwable exception = null; - private ReadTableMock(YdbSpliterator spliterator) { + private ReadTableMock(YdbNewLegacySpliterator spliterator) { this.spliterator = spliterator; } @@ -105,7 +105,7 @@ public static ReadTableMock start() { } public static ReadTableMock start(Duration timeout) { - YdbSpliterator spliterator = new YdbSpliterator<>("stream", false, timeout); + YdbNewLegacySpliterator spliterator = new YdbNewLegacySpliterator<>("stream", false, timeout); ReadTableMock mock = new ReadTableMock(spliterator); mock.run(); @@ -200,7 +200,7 @@ public void closeSupplierThreadWhenCloseOfLimitedStreamWasForgotten() { @Test @SneakyThrows public void endStreamWhenSupplerOfferValue() { - YdbSpliterator spliterator = new YdbSpliterator<>("stream", false, Duration.ofMillis(500)); + YdbNewLegacySpliterator spliterator = new YdbNewLegacySpliterator<>("stream", false, Duration.ofMillis(500)); spliterator.onNext(1); @@ -209,7 +209,7 @@ public void endStreamWhenSupplerOfferValue() { thread.start(); spliterator.onNext(2); - assertThatExceptionOfType(YdbSpliterator.ConsumerDoneException.class).isThrownBy(() -> + assertThatExceptionOfType(YdbNewLegacySpliterator.ConsumerDoneException.class).isThrownBy(() -> spliterator.onNext(3) ); diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/readtable/ReadTableParams.java b/repository/src/main/java/tech/ydb/yoj/repository/db/readtable/ReadTableParams.java index f1cb203d..c7871b6a 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/readtable/ReadTableParams.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/readtable/ReadTableParams.java @@ -27,6 +27,15 @@ public class ReadTableParams { @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42") boolean useNewSpliterator; + /** + * Set this to {@code true} to use a {@code Spliterator} contract-conformant and less memory consuming implementation for the {@code Stream} + * returned by {@code readTable()}. + *

Note that using the new implementation currently has a negative performance impact, for more information refer to + * GitHub Issue #42. + */ + @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/42") + boolean useNewSpliterator2; + int batchLimitBytes; int batchLimitRows;