diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java index b6d98b0b341..a04d7e8c983 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/SyncSourceFactory.java @@ -44,15 +44,20 @@ public SyncSourceFactory( public SyncSource getOrCreateSyncSource(final Eth2Peer peer, final Spec spec) { // Limit request rate to just a little under what we'd accept final int maxBlocksPerMinute = this.maxBlocksPerMinute - batchSize - 1; + final Optional maybeMaxBlobsPerBlock = spec.getMaxBlobsPerBlockForHighestMilestone(); final Optional maxBlobSidecarsPerMinute = - spec.getMaxBlobsPerBlockForHighestMilestone() - .map(maxBlobsPerBlock -> maxBlocksPerMinute * maxBlobsPerBlock); + maybeMaxBlobsPerBlock.map(maxBlobsPerBlock -> maxBlocksPerMinute * maxBlobsPerBlock); return syncSourcesByPeer.computeIfAbsent( peer, source -> new ThrottlingSyncSource( - asyncRunner, timeProvider, source, maxBlocksPerMinute, maxBlobSidecarsPerMinute)); + asyncRunner, + timeProvider, + source, + maxBlocksPerMinute, + maybeMaxBlobsPerBlock, + maxBlobSidecarsPerMinute)); } public void onPeerDisconnected(final Eth2Peer peer) { diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java index b8be60d4fb8..54036c6d097 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.beacon.sync.forward.multipeer.chains; +import com.google.common.annotations.VisibleForTesting; import java.time.Duration; import java.util.Optional; import org.apache.logging.log4j.LogManager; @@ -31,12 +32,16 @@ public class ThrottlingSyncSource implements SyncSource { private static final Logger LOG = LogManager.getLogger(); + + // 1 minute private static final long TIME_OUT = 60; public static final Duration PEER_REQUEST_DELAY = Duration.ofSeconds(3); + private final AsyncRunner asyncRunner; private final SyncSource delegate; private final RateTracker blocksRateTracker; + private final Optional maybeMaxBlobsPerBlock; private final RateTracker blobSidecarsRateTracker; public ThrottlingSyncSource( @@ -44,10 +49,12 @@ public ThrottlingSyncSource( final TimeProvider timeProvider, final SyncSource delegate, final int maxBlocksPerMinute, + final Optional maybeMaxBlobsPerBlock, final Optional maybeMaxBlobSidecarsPerMinute) { this.asyncRunner = asyncRunner; this.delegate = delegate; this.blocksRateTracker = RateTracker.create(maxBlocksPerMinute, TIME_OUT, timeProvider); + this.maybeMaxBlobsPerBlock = maybeMaxBlobsPerBlock; this.blobSidecarsRateTracker = maybeMaxBlobSidecarsPerMinute .map( @@ -61,33 +68,64 @@ public SafeFuture requestBlocksByRange( final UInt64 startSlot, final UInt64 count, final RpcResponseListener listener) { - if (blocksRateTracker.approveObjectsRequest(count.longValue()).isPresent()) { - LOG.debug("Sending request for {} blocks", count); - return delegate.requestBlocksByRange(startSlot, count, listener); - } else { - LOG.debug( - "Rate limiting request for {} blocks. Retry in {} seconds", - count, - PEER_REQUEST_DELAY.toSeconds()); - return asyncRunner.runAfterDelay( - () -> requestBlocksByRange(startSlot, count, listener), PEER_REQUEST_DELAY); - } + final RpcResponseListenerWithCount listenerWithCount = + new RpcResponseListenerWithCount<>(listener); + return blocksRateTracker + .approveObjectsRequest(count.longValue()) + .map( + requestApproval -> { + LOG.debug("Sending request for {} blocks", count); + return delegate + .requestBlocksByRange(startSlot, count, listenerWithCount) + .alwaysRun( + () -> + // adjust for slots with empty blocks + blocksRateTracker.adjustObjectsRequest( + requestApproval, listenerWithCount.count)); + }) + .orElseGet( + () -> { + LOG.debug( + "Rate limiting request for {} blocks. Retry in {} seconds", + count, + PEER_REQUEST_DELAY.toSeconds()); + return asyncRunner.runAfterDelay( + () -> requestBlocksByRange(startSlot, count, listener), PEER_REQUEST_DELAY); + }); } @Override public SafeFuture requestBlobSidecarsByRange( final UInt64 startSlot, final UInt64 count, final RpcResponseListener listener) { - if (blobSidecarsRateTracker.approveObjectsRequest(count.longValue()).isPresent()) { - LOG.debug("Sending request for {} blob sidecars", count); - return delegate.requestBlobSidecarsByRange(startSlot, count, listener); - } else { - LOG.debug( - "Rate limiting request for {} blob sidecars. Retry in {} seconds", - count, - PEER_REQUEST_DELAY.toSeconds()); - return asyncRunner.runAfterDelay( - () -> requestBlobSidecarsByRange(startSlot, count, listener), PEER_REQUEST_DELAY); - } + final RpcResponseListenerWithCount listenerWithCount = + new RpcResponseListenerWithCount<>(listener); + long blobSidecarsCount = + maybeMaxBlobsPerBlock + .map(maxBlobsPerBlock -> maxBlobsPerBlock * count.longValue()) + .orElse(0L); + return blobSidecarsRateTracker + .approveObjectsRequest(blobSidecarsCount) + .map( + requestApproval -> { + LOG.debug("Sending request for ~ {} blob sidecars", blobSidecarsCount); + return delegate + .requestBlobSidecarsByRange(startSlot, count, listenerWithCount) + .alwaysRun( + () -> + // adjust for slots with empty blocks and slots with blobs < + // maxBlobsPerBlock + blobSidecarsRateTracker.adjustObjectsRequest( + requestApproval, listenerWithCount.count)); + }) + .orElseGet( + () -> { + LOG.debug( + "Rate limiting request for ~ {} blob sidecars. Retry in {} seconds", + blobSidecarsCount, + PEER_REQUEST_DELAY.toSeconds()); + return asyncRunner.runAfterDelay( + () -> requestBlobSidecarsByRange(startSlot, count, listener), PEER_REQUEST_DELAY); + }); } @Override @@ -104,4 +142,22 @@ public void adjustReputation(final ReputationAdjustment adjustment) { public String toString() { return delegate.toString(); } + + @VisibleForTesting + static class RpcResponseListenerWithCount implements RpcResponseListener { + + private int count = 0; + + private final RpcResponseListener delegate; + + private RpcResponseListenerWithCount(final RpcResponseListener delegate) { + this.delegate = delegate; + } + + @Override + public SafeFuture onResponse(final T response) { + count++; + return delegate.onResponse(response); + } + } } diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSourceTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSourceTest.java index 942c96a66c6..7456e181be8 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSourceTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSourceTest.java @@ -14,6 +14,8 @@ package tech.pegasys.teku.beacon.sync.forward.multipeer.chains; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -21,7 +23,9 @@ import static tech.pegasys.teku.infrastructure.async.FutureUtil.ignoreFuture; import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.ThrottlingSyncSource.RpcResponseListenerWithCount; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; import tech.pegasys.teku.infrastructure.time.StubTimeProvider; @@ -32,19 +36,20 @@ import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +@SuppressWarnings("unchecked") class ThrottlingSyncSourceTest { private static final int MAX_BLOCKS_PER_MINUTE = 100; + private static final int MAX_BLOBS_PER_BLOCK = 6; private static final int MAX_BLOB_SIDECARS_PER_MINUTE = 100; + private final StubAsyncRunner asyncRunner = new StubAsyncRunner(); private final StubTimeProvider timeProvider = StubTimeProvider.withTimeInSeconds(0); private final SyncSource delegate = mock(SyncSource.class); - @SuppressWarnings("unchecked") private final RpcResponseListener blocksListener = mock(RpcResponseListener.class); - @SuppressWarnings("unchecked") private final RpcResponseListener blobSidecarsListener = mock(RpcResponseListener.class); @@ -54,8 +59,34 @@ class ThrottlingSyncSourceTest { timeProvider, delegate, MAX_BLOCKS_PER_MINUTE, + Optional.of(MAX_BLOBS_PER_BLOCK), Optional.of(MAX_BLOB_SIDECARS_PER_MINUTE)); + @BeforeEach + void setup() { + // simulate complete RPC responses + when(delegate.requestBlocksByRange(any(), any(), any())) + .thenAnswer( + invocationOnMock -> { + final UInt64 count = invocationOnMock.getArgument(1); + final RpcResponseListenerWithCount listener = + invocationOnMock.getArgument(2); + UInt64.range(UInt64.ZERO, count) + .forEach(__ -> listener.onResponse(mock(SignedBeaconBlock.class))); + return SafeFuture.COMPLETE; + }); + when(delegate.requestBlobSidecarsByRange(any(), any(), any())) + .thenAnswer( + invocationOnMock -> { + final UInt64 count = invocationOnMock.getArgument(1); + final RpcResponseListenerWithCount listener = + invocationOnMock.getArgument(2); + UInt64.range(UInt64.ZERO, count.times(MAX_BLOBS_PER_BLOCK)) + .forEach(__ -> listener.onResponse(mock(BlobSidecar.class))); + return SafeFuture.COMPLETE; + }); + } + @Test void shouldDelegateDisconnectImmediately() { final SafeFuture result = new SafeFuture<>(); @@ -75,23 +106,33 @@ void shouldRequestBlocksImmediatelyIfRateLimitNotExceeded() { ignoreFuture(source.requestBlocksByRange(UInt64.valueOf(100), count, blocksListener)); // Both requests happen immediately - ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.ZERO, count, blocksListener)); - ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.valueOf(100), count, blocksListener)); + ignoreFuture( + verify(delegate) + .requestBlocksByRange( + eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class))); + ignoreFuture( + verify(delegate) + .requestBlocksByRange( + eq(UInt64.valueOf(100)), eq(count), any(RpcResponseListenerWithCount.class))); } @Test void shouldRequestBlobSidecarsImmediatelyIfRateLimitNotExceeded() { - final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE - 1); + // 100 / 6 = 16, 16 * 6 = 92 < 100 + final UInt64 count = UInt64.valueOf(16); ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); ignoreFuture( - source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + source.requestBlobSidecarsByRange(UInt64.valueOf(16), count, blobSidecarsListener)); // Both requests happen immediately ignoreFuture( - verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); + verify(delegate) + .requestBlobSidecarsByRange( + eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class))); ignoreFuture( verify(delegate) - .requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + .requestBlobSidecarsByRange( + eq(UInt64.valueOf(16)), eq(count), any(RpcResponseListenerWithCount.class))); } @Test @@ -101,24 +142,33 @@ void shouldDelayRequestIfBlockLimitAlreadyExceeded() { ignoreFuture(source.requestBlocksByRange(UInt64.valueOf(100), count, blocksListener)); // Both requests happen immediately - ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.ZERO, count, blocksListener)); + ignoreFuture( + verify(delegate) + .requestBlocksByRange( + eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class))); verifyNoMoreInteractions(delegate); timeProvider.advanceTimeBySeconds(61); asyncRunner.executeQueuedActions(); - ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.valueOf(100), count, blocksListener)); + ignoreFuture( + verify(delegate) + .requestBlocksByRange( + eq(UInt64.valueOf(100)), eq(count), any(RpcResponseListenerWithCount.class))); } @Test void shouldDelayRequestIfBlobSidecarsLimitAlreadyExceeded() { - final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE); + // 17 * 6 = 102 > 100 + final UInt64 count = UInt64.valueOf(17); ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); ignoreFuture( - source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + source.requestBlobSidecarsByRange(UInt64.valueOf(17), count, blobSidecarsListener)); ignoreFuture( - verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); + verify(delegate) + .requestBlobSidecarsByRange( + eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class))); verifyNoMoreInteractions(delegate); timeProvider.advanceTimeBySeconds(61); @@ -126,7 +176,8 @@ void shouldDelayRequestIfBlobSidecarsLimitAlreadyExceeded() { ignoreFuture( verify(delegate) - .requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + .requestBlobSidecarsByRange( + eq(UInt64.valueOf(17)), eq(count), any(RpcResponseListenerWithCount.class))); } @Test @@ -136,7 +187,10 @@ void shouldContinueDelayingBlocksRequestIfRequestStillExceeded() { ignoreFuture(source.requestBlocksByRange(UInt64.valueOf(100), count, blocksListener)); // Both requests happen immediately - ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.ZERO, count, blocksListener)); + ignoreFuture( + verify(delegate) + .requestBlocksByRange( + eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class))); verifyNoMoreInteractions(delegate); timeProvider.advanceTimeBySeconds(30); @@ -145,19 +199,25 @@ void shouldContinueDelayingBlocksRequestIfRequestStillExceeded() { timeProvider.advanceTimeBySeconds(31); asyncRunner.executeQueuedActions(); - ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.valueOf(100), count, blocksListener)); + ignoreFuture( + verify(delegate) + .requestBlocksByRange( + eq(UInt64.valueOf(100)), eq(count), any(RpcResponseListenerWithCount.class))); } @Test void shouldContinueDelayingBlobSidecarsRequestIfRequestStillExceeded() { - final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE); + // 17 * 6 = 102 > 100 + final UInt64 count = UInt64.valueOf(17); ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); ignoreFuture( - source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + source.requestBlobSidecarsByRange(UInt64.valueOf(17), count, blobSidecarsListener)); // Both requests happen immediately ignoreFuture( - verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener)); + verify(delegate) + .requestBlobSidecarsByRange( + eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class))); verifyNoMoreInteractions(delegate); timeProvider.advanceTimeBySeconds(30); @@ -168,6 +228,7 @@ void shouldContinueDelayingBlobSidecarsRequestIfRequestStillExceeded() { asyncRunner.executeQueuedActions(); ignoreFuture( verify(delegate) - .requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener)); + .requestBlobSidecarsByRange( + eq(UInt64.valueOf(17)), eq(count), any(RpcResponseListenerWithCount.class))); } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/AsyncResponseProcessor.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/AsyncResponseProcessor.java index 7edd7229472..b732f82308c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/AsyncResponseProcessor.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/AsyncResponseProcessor.java @@ -22,6 +22,7 @@ import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.networking.eth2.rpc.core.RpcException.AdditionalDataReceivedException; +import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler; class AsyncResponseProcessor { private static final Logger LOG = LogManager.getLogger(); @@ -35,15 +36,15 @@ class AsyncResponseProcessor { private final SafeFuture finishedProcessing = new SafeFuture<>(); private final AsyncRunner asyncRunner; - private final ResponseStream responseStream; + private final RpcResponseHandler responseHandler; private final AsyncProcessingErrorHandler onError; public AsyncResponseProcessor( final AsyncRunner asyncRunner, - final ResponseStream responseStream, + final RpcResponseHandler responseHandler, final AsyncProcessingErrorHandler onError) { this.asyncRunner = asyncRunner; - this.responseStream = responseStream; + this.responseHandler = responseHandler; this.onError = onError; } @@ -101,8 +102,8 @@ private void processNextResponse() { asyncRunner .runAsync( () -> { - LOG.trace("Send response to response stream: {}", response); - return responseStream.respond(response); + LOG.trace("Send response to response handler: {}", response); + return responseHandler.onResponse(response); }) .exceptionally( (err) -> { diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java index be3e39cd95b..2740675a856 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java @@ -60,7 +60,6 @@ enum State { private final AsyncRunner asyncRunner; private final int maximumResponseChunks; private final Eth2RpcResponseHandler responseHandler; - private final ResponseStream responseStream; private final AsyncRunner timeoutRunner; private final AtomicInteger currentChunkCount = new AtomicInteger(0); @@ -84,7 +83,6 @@ public Eth2OutgoingRequestHandler( this.timeoutRunner = timeoutRunner; this.maximumResponseChunks = request.getMaximumResponseChunks(); this.responseHandler = responseHandler; - responseStream = new ResponseStream<>(responseHandler); this.responseDecoder = responseDecoder; this.shouldReceiveResponse = shouldReceiveResponse; this.protocolId = protocolId; @@ -153,7 +151,7 @@ private AsyncResponseProcessor getResponseProcessor(final RpcStream r () -> new AsyncResponseProcessor<>( asyncRunner, - responseStream, + responseHandler, throwable -> abortRequest(rpcStream, throwable)))); } @@ -213,11 +211,11 @@ private void completeRequest(final RpcStream rpcStream) { .thenAccept( (__) -> { try { - responseStream.completeSuccessfully(); + responseHandler.onCompleted(); LOG.trace("Complete request"); } catch (final Throwable t) { LOG.error("Encountered error while completing outgoing request", t); - responseStream.completeWithError(t); + responseHandler.onCompleted(t); } }) .exceptionally( @@ -246,7 +244,7 @@ private void abortRequest(final RpcStream rpcStream, final Throwable error, fina } finally { getResponseProcessor(rpcStream) .finishProcessing() - .always(() -> responseStream.completeWithError(error)); + .always(() -> responseHandler.onCompleted(error)); } } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/ResponseStream.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/ResponseStream.java deleted file mode 100644 index 76124f62454..00000000000 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/ResponseStream.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright Consensys Software Inc., 2022 - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.teku.networking.eth2.rpc.core; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.concurrent.atomic.AtomicInteger; -import tech.pegasys.teku.infrastructure.async.SafeFuture; -import tech.pegasys.teku.networking.p2p.rpc.RpcResponseHandler; - -class ResponseStream { - - private final RpcResponseHandler responseHandler; - private final AtomicInteger receivedResponseCount = new AtomicInteger(0); - - public ResponseStream(final RpcResponseHandler responseHandler) { - checkNotNull(responseHandler); - this.responseHandler = responseHandler; - } - - public SafeFuture respond(final O data) { - receivedResponseCount.incrementAndGet(); - return responseHandler.onResponse(data); - } - - public int getResponseChunkCount() { - return receivedResponseCount.get(); - } - - public void completeSuccessfully() { - responseHandler.onCompleted(); - } - - public void completeWithError(final Throwable error) { - responseHandler.onCompleted(error); - } -} diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/AsyncResponseProcessorTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/AsyncResponseProcessorTest.java index ad7679d531a..2b953b0b397 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/AsyncResponseProcessorTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/rpc/core/AsyncResponseProcessorTest.java @@ -39,11 +39,10 @@ public class AsyncResponseProcessorTest { final Eth2RpcResponseHandler responseHandler = Eth2RpcResponseHandler.expectMultipleResponses( RpcResponseListener.from(res -> requestProcessor.get().accept(res))); - private final ResponseStream responseStream = new ResponseStream<>(responseHandler); private final StubAsyncRunner asyncRunner = new StubAsyncRunner(); private final AsyncResponseProcessor asyncResponseProcessor = - new AsyncResponseProcessor<>(asyncRunner, responseStream, errorConsumer); + new AsyncResponseProcessor<>(asyncRunner, responseHandler, errorConsumer); @Test public void processMultipleResponsesSuccessfully() throws Exception {