Skip to content

Commit

Permalink
Fix ThrottlingSyncSource
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Jan 11, 2025
1 parent 89caf92 commit 6670900
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,20 @@ public SyncSourceFactory(
public SyncSource getOrCreateSyncSource(final Eth2Peer peer, final Spec spec) {
// Limit request rate to just a little under what we'd accept
final int maxBlocksPerMinute = this.maxBlocksPerMinute - batchSize - 1;
final Optional<Integer> maybeMaxBlobsPerBlock = spec.getMaxBlobsPerBlockForHighestMilestone();
final Optional<Integer> maxBlobSidecarsPerMinute =
spec.getMaxBlobsPerBlockForHighestMilestone()
.map(maxBlobsPerBlock -> maxBlocksPerMinute * maxBlobsPerBlock);
maybeMaxBlobsPerBlock.map(maxBlobsPerBlock -> maxBlocksPerMinute * maxBlobsPerBlock);

return syncSourcesByPeer.computeIfAbsent(
peer,
source ->
new ThrottlingSyncSource(
asyncRunner, timeProvider, source, maxBlocksPerMinute, maxBlobSidecarsPerMinute));
asyncRunner,
timeProvider,
source,
maxBlocksPerMinute,
maybeMaxBlobsPerBlock,
maxBlobSidecarsPerMinute));
}

public void onPeerDisconnected(final Eth2Peer peer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.beacon.sync.forward.multipeer.chains;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
Expand All @@ -31,23 +32,29 @@

public class ThrottlingSyncSource implements SyncSource {
private static final Logger LOG = LogManager.getLogger();

// 1 minute
private static final long TIME_OUT = 60;
public static final Duration PEER_REQUEST_DELAY = Duration.ofSeconds(3);

private final AsyncRunner asyncRunner;
private final SyncSource delegate;

private final RateTracker blocksRateTracker;
private final Optional<Integer> maybeMaxBlobsPerBlock;
private final RateTracker blobSidecarsRateTracker;

public ThrottlingSyncSource(
final AsyncRunner asyncRunner,
final TimeProvider timeProvider,
final SyncSource delegate,
final int maxBlocksPerMinute,
final Optional<Integer> maybeMaxBlobsPerBlock,
final Optional<Integer> maybeMaxBlobSidecarsPerMinute) {
this.asyncRunner = asyncRunner;
this.delegate = delegate;
this.blocksRateTracker = RateTracker.create(maxBlocksPerMinute, TIME_OUT, timeProvider);
this.maybeMaxBlobsPerBlock = maybeMaxBlobsPerBlock;
this.blobSidecarsRateTracker =
maybeMaxBlobSidecarsPerMinute
.map(
Expand All @@ -61,33 +68,64 @@ public SafeFuture<Void> requestBlocksByRange(
final UInt64 startSlot,
final UInt64 count,
final RpcResponseListener<SignedBeaconBlock> listener) {
if (blocksRateTracker.approveObjectsRequest(count.longValue()).isPresent()) {
LOG.debug("Sending request for {} blocks", count);
return delegate.requestBlocksByRange(startSlot, count, listener);
} else {
LOG.debug(
"Rate limiting request for {} blocks. Retry in {} seconds",
count,
PEER_REQUEST_DELAY.toSeconds());
return asyncRunner.runAfterDelay(
() -> requestBlocksByRange(startSlot, count, listener), PEER_REQUEST_DELAY);
}
final RpcResponseListenerWithCount<SignedBeaconBlock> listenerWithCount =
new RpcResponseListenerWithCount<>(listener);
return blocksRateTracker
.approveObjectsRequest(count.longValue())
.map(
requestApproval -> {
LOG.debug("Sending request for {} blocks", count);
return delegate
.requestBlocksByRange(startSlot, count, listenerWithCount)
.alwaysRun(
() ->
// adjust for slots with empty blocks
blocksRateTracker.adjustObjectsRequest(
requestApproval, listenerWithCount.count));
})
.orElseGet(
() -> {
LOG.debug(
"Rate limiting request for {} blocks. Retry in {} seconds",
count,
PEER_REQUEST_DELAY.toSeconds());
return asyncRunner.runAfterDelay(
() -> requestBlocksByRange(startSlot, count, listener), PEER_REQUEST_DELAY);
});
}

@Override
public SafeFuture<Void> requestBlobSidecarsByRange(
final UInt64 startSlot, final UInt64 count, final RpcResponseListener<BlobSidecar> listener) {
if (blobSidecarsRateTracker.approveObjectsRequest(count.longValue()).isPresent()) {
LOG.debug("Sending request for {} blob sidecars", count);
return delegate.requestBlobSidecarsByRange(startSlot, count, listener);
} else {
LOG.debug(
"Rate limiting request for {} blob sidecars. Retry in {} seconds",
count,
PEER_REQUEST_DELAY.toSeconds());
return asyncRunner.runAfterDelay(
() -> requestBlobSidecarsByRange(startSlot, count, listener), PEER_REQUEST_DELAY);
}
final RpcResponseListenerWithCount<BlobSidecar> listenerWithCount =
new RpcResponseListenerWithCount<>(listener);
long blobSidecarsCount =
maybeMaxBlobsPerBlock
.map(maxBlobsPerBlock -> maxBlobsPerBlock * count.longValue())
.orElse(0L);
return blobSidecarsRateTracker
.approveObjectsRequest(blobSidecarsCount)
.map(
requestApproval -> {
LOG.debug("Sending request for ~ {} blob sidecars", blobSidecarsCount);
return delegate
.requestBlobSidecarsByRange(startSlot, count, listenerWithCount)
.alwaysRun(
() ->
// adjust for slots with empty blocks and slots with blobs <
// maxBlobsPerBlock
blobSidecarsRateTracker.adjustObjectsRequest(
requestApproval, listenerWithCount.count));
})
.orElseGet(
() -> {
LOG.debug(
"Rate limiting request for ~ {} blob sidecars. Retry in {} seconds",
blobSidecarsCount,
PEER_REQUEST_DELAY.toSeconds());
return asyncRunner.runAfterDelay(
() -> requestBlobSidecarsByRange(startSlot, count, listener), PEER_REQUEST_DELAY);
});
}

@Override
Expand All @@ -104,4 +142,22 @@ public void adjustReputation(final ReputationAdjustment adjustment) {
public String toString() {
return delegate.toString();
}

@VisibleForTesting
static class RpcResponseListenerWithCount<T> implements RpcResponseListener<T> {

private int count = 0;

private final RpcResponseListener<T> delegate;

private RpcResponseListenerWithCount(final RpcResponseListener<T> delegate) {
this.delegate = delegate;
}

@Override
public SafeFuture<?> onResponse(final T response) {
count++;
return delegate.onResponse(response);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@
package tech.pegasys.teku.beacon.sync.forward.multipeer.chains;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static tech.pegasys.teku.infrastructure.async.FutureUtil.ignoreFuture;

import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.ThrottlingSyncSource.RpcResponseListenerWithCount;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.async.StubAsyncRunner;
import tech.pegasys.teku.infrastructure.time.StubTimeProvider;
Expand All @@ -32,19 +36,20 @@
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

@SuppressWarnings("unchecked")
class ThrottlingSyncSourceTest {

private static final int MAX_BLOCKS_PER_MINUTE = 100;
private static final int MAX_BLOBS_PER_BLOCK = 6;
private static final int MAX_BLOB_SIDECARS_PER_MINUTE = 100;

private final StubAsyncRunner asyncRunner = new StubAsyncRunner();
private final StubTimeProvider timeProvider = StubTimeProvider.withTimeInSeconds(0);
private final SyncSource delegate = mock(SyncSource.class);

@SuppressWarnings("unchecked")
private final RpcResponseListener<SignedBeaconBlock> blocksListener =
mock(RpcResponseListener.class);

@SuppressWarnings("unchecked")
private final RpcResponseListener<BlobSidecar> blobSidecarsListener =
mock(RpcResponseListener.class);

Expand All @@ -54,8 +59,34 @@ class ThrottlingSyncSourceTest {
timeProvider,
delegate,
MAX_BLOCKS_PER_MINUTE,
Optional.of(MAX_BLOBS_PER_BLOCK),
Optional.of(MAX_BLOB_SIDECARS_PER_MINUTE));

@BeforeEach
void setup() {
// simulate complete RPC responses
when(delegate.requestBlocksByRange(any(), any(), any()))
.thenAnswer(
invocationOnMock -> {
final UInt64 count = invocationOnMock.getArgument(1);
final RpcResponseListenerWithCount<SignedBeaconBlock> listener =
invocationOnMock.getArgument(2);
UInt64.range(UInt64.ZERO, count)
.forEach(__ -> listener.onResponse(mock(SignedBeaconBlock.class)));
return SafeFuture.COMPLETE;
});
when(delegate.requestBlobSidecarsByRange(any(), any(), any()))
.thenAnswer(
invocationOnMock -> {
final UInt64 count = invocationOnMock.getArgument(1);
final RpcResponseListenerWithCount<BlobSidecar> listener =
invocationOnMock.getArgument(2);
UInt64.range(UInt64.ZERO, count.times(MAX_BLOBS_PER_BLOCK))
.forEach(__ -> listener.onResponse(mock(BlobSidecar.class)));
return SafeFuture.COMPLETE;
});
}

@Test
void shouldDelegateDisconnectImmediately() {
final SafeFuture<Void> result = new SafeFuture<>();
Expand All @@ -75,23 +106,33 @@ void shouldRequestBlocksImmediatelyIfRateLimitNotExceeded() {
ignoreFuture(source.requestBlocksByRange(UInt64.valueOf(100), count, blocksListener));

// Both requests happen immediately
ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.ZERO, count, blocksListener));
ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.valueOf(100), count, blocksListener));
ignoreFuture(
verify(delegate)
.requestBlocksByRange(
eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class)));
ignoreFuture(
verify(delegate)
.requestBlocksByRange(
eq(UInt64.valueOf(100)), eq(count), any(RpcResponseListenerWithCount.class)));
}

@Test
void shouldRequestBlobSidecarsImmediatelyIfRateLimitNotExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE - 1);
// 100 / 6 = 16, 16 * 6 = 92 < 100
final UInt64 count = UInt64.valueOf(16);
ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
source.requestBlobSidecarsByRange(UInt64.valueOf(16), count, blobSidecarsListener));

// Both requests happen immediately
ignoreFuture(
verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
verify(delegate)
.requestBlobSidecarsByRange(
eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class)));
ignoreFuture(
verify(delegate)
.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
.requestBlobSidecarsByRange(
eq(UInt64.valueOf(16)), eq(count), any(RpcResponseListenerWithCount.class)));
}

@Test
Expand All @@ -101,32 +142,42 @@ void shouldDelayRequestIfBlockLimitAlreadyExceeded() {
ignoreFuture(source.requestBlocksByRange(UInt64.valueOf(100), count, blocksListener));

// Both requests happen immediately
ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.ZERO, count, blocksListener));
ignoreFuture(
verify(delegate)
.requestBlocksByRange(
eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class)));
verifyNoMoreInteractions(delegate);

timeProvider.advanceTimeBySeconds(61);
asyncRunner.executeQueuedActions();

ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.valueOf(100), count, blocksListener));
ignoreFuture(
verify(delegate)
.requestBlocksByRange(
eq(UInt64.valueOf(100)), eq(count), any(RpcResponseListenerWithCount.class)));
}

@Test
void shouldDelayRequestIfBlobSidecarsLimitAlreadyExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE);
// 17 * 6 = 102 > 100
final UInt64 count = UInt64.valueOf(17);
ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
source.requestBlobSidecarsByRange(UInt64.valueOf(17), count, blobSidecarsListener));

ignoreFuture(
verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
verify(delegate)
.requestBlobSidecarsByRange(
eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class)));
verifyNoMoreInteractions(delegate);

timeProvider.advanceTimeBySeconds(61);
asyncRunner.executeQueuedActions();

ignoreFuture(
verify(delegate)
.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
.requestBlobSidecarsByRange(
eq(UInt64.valueOf(17)), eq(count), any(RpcResponseListenerWithCount.class)));
}

@Test
Expand All @@ -136,7 +187,10 @@ void shouldContinueDelayingBlocksRequestIfRequestStillExceeded() {
ignoreFuture(source.requestBlocksByRange(UInt64.valueOf(100), count, blocksListener));

// Both requests happen immediately
ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.ZERO, count, blocksListener));
ignoreFuture(
verify(delegate)
.requestBlocksByRange(
eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class)));
verifyNoMoreInteractions(delegate);

timeProvider.advanceTimeBySeconds(30);
Expand All @@ -145,19 +199,25 @@ void shouldContinueDelayingBlocksRequestIfRequestStillExceeded() {

timeProvider.advanceTimeBySeconds(31);
asyncRunner.executeQueuedActions();
ignoreFuture(verify(delegate).requestBlocksByRange(UInt64.valueOf(100), count, blocksListener));
ignoreFuture(
verify(delegate)
.requestBlocksByRange(
eq(UInt64.valueOf(100)), eq(count), any(RpcResponseListenerWithCount.class)));
}

@Test
void shouldContinueDelayingBlobSidecarsRequestIfRequestStillExceeded() {
final UInt64 count = UInt64.valueOf(MAX_BLOB_SIDECARS_PER_MINUTE);
// 17 * 6 = 102 > 100
final UInt64 count = UInt64.valueOf(17);
ignoreFuture(source.requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
ignoreFuture(
source.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
source.requestBlobSidecarsByRange(UInt64.valueOf(17), count, blobSidecarsListener));

// Both requests happen immediately
ignoreFuture(
verify(delegate).requestBlobSidecarsByRange(UInt64.ZERO, count, blobSidecarsListener));
verify(delegate)
.requestBlobSidecarsByRange(
eq(UInt64.ZERO), eq(count), any(RpcResponseListenerWithCount.class)));
verifyNoMoreInteractions(delegate);

timeProvider.advanceTimeBySeconds(30);
Expand All @@ -168,6 +228,7 @@ void shouldContinueDelayingBlobSidecarsRequestIfRequestStillExceeded() {
asyncRunner.executeQueuedActions();
ignoreFuture(
verify(delegate)
.requestBlobSidecarsByRange(UInt64.valueOf(100), count, blobSidecarsListener));
.requestBlobSidecarsByRange(
eq(UInt64.valueOf(17)), eq(count), any(RpcResponseListenerWithCount.class)));
}
}
Loading

0 comments on commit 6670900

Please sign in to comment.