diff --git a/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java b/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java index d63e1a397..a466d50d1 100644 --- a/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java +++ b/topic/src/main/java/tech/ydb/topic/write/SyncWriter.java @@ -1,5 +1,6 @@ package tech.ydb.topic.write; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -27,8 +28,9 @@ public interface SyncWriter { * Send message. Blocks infinitely until the message is put into sending buffer. * @param message message data to write * @param settings send settings + * @return {@link CompletableFuture} with {@link WriteAck} for write acknowledgement */ - void send(Message message, SendSettings settings); + CompletableFuture send(Message message, SendSettings settings); /** * Send message. Blocks until the message is put into sending buffer. @@ -38,16 +40,18 @@ public interface SyncWriter { * @param settings send settings * @param timeout timeout to wait until message is punt into sending buffer * @param unit {@link TimeUnit} for timeout + * @return {@link CompletableFuture} with {@link WriteAck} for write acknowledgement */ - void send(Message message, SendSettings settings, long timeout, TimeUnit unit) + CompletableFuture send(Message message, SendSettings settings, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; /** * Send message. Blocks infinitely until the message is put into sending buffer. * @param message message data to write + * @return {@link CompletableFuture} with {@link WriteAck} for write acknowledgement */ - default void send(Message message) { - send(message, SendSettings.newBuilder().build()); + default CompletableFuture send(Message message) { + return send(message, SendSettings.newBuilder().build()); } /** @@ -57,10 +61,11 @@ default void send(Message message) { * @param message message data to write * @param timeout timeout to wait until message is punt into sending buffer * @param unit {@link TimeUnit} for timeout + * @return {@link CompletableFuture} with {@link WriteAck} for write acknowledgement */ - default void send(Message message, long timeout, TimeUnit unit) + default CompletableFuture send(Message message, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - send(message, SendSettings.newBuilder().build(), timeout, unit); + return send(message, SendSettings.newBuilder().build(), timeout, unit); } /** diff --git a/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java b/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java index 31de1ca2b..077ac186d 100644 --- a/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java +++ b/topic/src/main/java/tech/ydb/topic/write/impl/SyncWriterImpl.java @@ -1,5 +1,6 @@ package tech.ydb.topic.write.impl; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -11,6 +12,7 @@ import tech.ydb.topic.write.InitResult; import tech.ydb.topic.write.Message; import tech.ydb.topic.write.SyncWriter; +import tech.ydb.topic.write.WriteAck; /** * @author Nikolay Perfilov @@ -33,14 +35,14 @@ public InitResult initAndWait() { } @Override - public void send(Message message, SendSettings sendSettings) { - sendImpl(message, sendSettings, false).join(); + public CompletableFuture send(Message message, SendSettings sendSettings) { + return sendImpl(message, sendSettings, false).join(); } @Override - public void send(Message message, SendSettings sendSettings, long timeout, TimeUnit unit) + public CompletableFuture send(Message message, SendSettings sendSettings, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - sendImpl(message, sendSettings, false).get(timeout, unit); + return sendImpl(message, sendSettings, false).get(timeout, unit); } @Override