Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolves #280 Add WriteAck as a result of SyncWriter operations #281

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions topic/src/main/java/tech/ydb/topic/write/SyncWriter.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.topic.write;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -27,8 +28,9 @@ public interface SyncWriter {
* Send message. Blocks infinitely until the message is put into sending buffer.
* @param message message data to write
* @param settings send settings
* @return {@link CompletableFuture} with {@link WriteAck} for write acknowledgement
*/
void send(Message message, SendSettings settings);
CompletableFuture<WriteAck> send(Message message, SendSettings settings);

/**
* Send message. Blocks until the message is put into sending buffer.
Expand All @@ -38,16 +40,18 @@ public interface SyncWriter {
* @param settings send settings
* @param timeout timeout to wait until message is punt into sending buffer
* @param unit {@link TimeUnit} for timeout
* @return {@link CompletableFuture} with {@link WriteAck} for write acknowledgement
*/
void send(Message message, SendSettings settings, long timeout, TimeUnit unit)
CompletableFuture<WriteAck> send(Message message, SendSettings settings, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

/**
* Send message. Blocks infinitely until the message is put into sending buffer.
* @param message message data to write
* @return {@link CompletableFuture} with {@link WriteAck} for write acknowledgement
*/
default void send(Message message) {
send(message, SendSettings.newBuilder().build());
default CompletableFuture<WriteAck> send(Message message) {
return send(message, SendSettings.newBuilder().build());
}

/**
Expand All @@ -57,10 +61,11 @@ default void send(Message message) {
* @param message message data to write
* @param timeout timeout to wait until message is punt into sending buffer
* @param unit {@link TimeUnit} for timeout
* @return {@link CompletableFuture} with {@link WriteAck} for write acknowledgement
*/
default void send(Message message, long timeout, TimeUnit unit)
default CompletableFuture<WriteAck> send(Message message, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
send(message, SendSettings.newBuilder().build(), timeout, unit);
return send(message, SendSettings.newBuilder().build(), timeout, unit);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.topic.write.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand All @@ -11,6 +12,7 @@
import tech.ydb.topic.write.InitResult;
import tech.ydb.topic.write.Message;
import tech.ydb.topic.write.SyncWriter;
import tech.ydb.topic.write.WriteAck;

/**
* @author Nikolay Perfilov
Expand All @@ -33,14 +35,14 @@ public InitResult initAndWait() {
}

@Override
public void send(Message message, SendSettings sendSettings) {
sendImpl(message, sendSettings, false).join();
public CompletableFuture<WriteAck> send(Message message, SendSettings sendSettings) {
return sendImpl(message, sendSettings, false).join();
}

@Override
public void send(Message message, SendSettings sendSettings, long timeout, TimeUnit unit)
public CompletableFuture<WriteAck> send(Message message, SendSettings sendSettings, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
sendImpl(message, sendSettings, false).get(timeout, unit);
return sendImpl(message, sendSettings, false).get(timeout, unit);
}

@Override
Expand Down
Loading