Skip to content

Commit

Permalink
Reduce response chunk arrival timeout (Consensys#8962)
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored Jan 7, 2025
1 parent 6e8bc88 commit b0b414d
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
public class Eth2IncomingRequestHandler<
TRequest extends RpcRequest & SszData, TResponse extends SszData>
implements RpcRequestHandler {

private static final Logger LOG = LogManager.getLogger();

private static final Duration RECEIVE_INCOMING_REQUEST_TIMEOUT = Duration.ofSeconds(10);

private final PeerLookup peerLookup;
Expand Down Expand Up @@ -119,17 +121,17 @@ private void handleRequest(

private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) {
asyncRunner
.getDelayedFuture(RECEIVE_INCOMING_REQUEST_TIMEOUT)
.thenAccept(
(__) -> {
.runAfterDelay(
() -> {
if (!requestHandled.get()) {
LOG.debug(
"Failed to receive incoming request data within {} sec for protocol {}. Close stream.",
RECEIVE_INCOMING_REQUEST_TIMEOUT.toSeconds(),
protocolId);
stream.closeAbruptly().ifExceptionGetsHereRaiseABug();
}
})
},
RECEIVE_INCOMING_REQUEST_TIMEOUT)
.ifExceptionGetsHereRaiseABug();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ enum State {
private static final Logger LOG = LogManager.getLogger();

@VisibleForTesting static final Duration READ_COMPLETE_TIMEOUT = Duration.ofSeconds(10);
@VisibleForTesting static final Duration RESPONSE_CHUNK_ARRIVAL_TIMEOUT = Duration.ofSeconds(30);
@VisibleForTesting static final Duration RESPONSE_CHUNK_ARRIVAL_TIMEOUT = Duration.ofSeconds(10);

private final AsyncRunner asyncRunner;
private final int maximumResponseChunks;
Expand Down Expand Up @@ -116,7 +116,7 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By
throw new RpcException.ExtraDataAppendedException(" extra data: " + bufToString(data));
}

List<TResponse> maybeResponses = responseDecoder.decodeNextResponses(data);
final List<TResponse> maybeResponses = responseDecoder.decodeNextResponses(data);
final int chunksReceived = currentChunkCount.addAndGet(maybeResponses.size());

if (chunksReceived > maximumResponseChunks) {
Expand Down Expand Up @@ -161,8 +161,8 @@ private String bufToString(final ByteBuf buf) {
final int contentSize = Integer.min(buf.readableBytes(), 1024);
String bufContent = "";
if (contentSize > 0) {
ByteBuf bufSlice = buf.slice(0, contentSize);
byte[] bytes = new byte[bufSlice.readableBytes()];
final ByteBuf bufSlice = buf.slice(0, contentSize);
final byte[] bytes = new byte[bufSlice.readableBytes()];
bufSlice.getBytes(0, bytes);
bufContent += Bytes.wrap(bytes);
if (contentSize < buf.readableBytes()) {
Expand Down Expand Up @@ -255,32 +255,32 @@ private void ensureNextResponseChunkArrivesInTime(
final int previousResponseCount,
final AtomicInteger currentResponseCount) {
timeoutRunner
.getDelayedFuture(RESPONSE_CHUNK_ARRIVAL_TIMEOUT)
.thenAccept(
(__) -> {
.runAfterDelay(
() -> {
if (previousResponseCount == currentResponseCount.get()) {
abortRequest(
stream,
new RpcTimeoutException(
"Timed out waiting for response chunk " + previousResponseCount,
RESPONSE_CHUNK_ARRIVAL_TIMEOUT));
}
})
},
RESPONSE_CHUNK_ARRIVAL_TIMEOUT)
.ifExceptionGetsHereRaiseABug();
}

private void ensureReadCompleteArrivesInTime(final RpcStream stream) {
timeoutRunner
.getDelayedFuture(READ_COMPLETE_TIMEOUT)
.thenAccept(
(__) -> {
.runAfterDelay(
() -> {
if (!(state.get() == READ_COMPLETE || state.get() == CLOSED)) {
abortRequest(
stream,
new RpcTimeoutException(
"Timed out waiting for read channel close", READ_COMPLETE_TIMEOUT));
}
})
},
READ_COMPLETE_TIMEOUT)
.ifExceptionGetsHereRaiseABug();
}

Expand Down

0 comments on commit b0b414d

Please sign in to comment.