diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/NoopSyncService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/NoopSyncService.java index 184dae2d09c..72a2916c332 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/NoopSyncService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/NoopSyncService.java @@ -113,7 +113,7 @@ public void requestRecentBlobSidecars( } @Override - public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) { + public void cancelRecentBlobSidecarsRequests(final Bytes32 blockRoot) { // No-op } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/BlockRootAndBlobIdentifiers.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/BlockRootAndBlobIdentifiers.java new file mode 100644 index 00000000000..a6b7bef80e4 --- /dev/null +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/BlockRootAndBlobIdentifiers.java @@ -0,0 +1,21 @@ +/* + * Copyright Consensys Software Inc., 2025 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.beacon.sync.fetch; + +import java.util.List; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; + +public record BlockRootAndBlobIdentifiers( + Bytes32 blockRoot, List blobIdentifiers) {} diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactory.java index 5cf5921b257..36acb0a02eb 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactory.java @@ -13,12 +13,10 @@ package tech.pegasys.teku.beacon.sync.fetch; -import java.util.List; import java.util.Optional; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; import tech.pegasys.teku.networking.p2p.network.P2PNetwork; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; public class DefaultFetchTaskFactory implements FetchTaskFactory { @@ -36,9 +34,8 @@ public FetchBlockTask createFetchBlockTask( @Override public FetchBlobSidecarsTask createFetchBlobSidecarsTask( - final Bytes32 blockRoot, - final List blobIdentifiers, + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers, final Optional preferredPeer) { - return new FetchBlobSidecarsTask(eth2Network, preferredPeer, blockRoot, blobIdentifiers); + return new FetchBlobSidecarsTask(eth2Network, preferredPeer, blockRootAndBlobIdentifiers); } } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTask.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTask.java index ff535b84889..a38ebe3c377 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTask.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTask.java @@ -18,7 +18,6 @@ import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.beacon.sync.fetch.FetchResult.Status; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; @@ -27,39 +26,36 @@ import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; -public class FetchBlobSidecarsTask extends AbstractFetchTask> { +public class FetchBlobSidecarsTask + extends AbstractFetchTask> { private static final Logger LOG = LogManager.getLogger(); - private final Bytes32 blockRoot; - private final List blobIdentifiers; + private final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers; FetchBlobSidecarsTask( final P2PNetwork eth2Network, - final Bytes32 blockRoot, - final List blobIdentifiers) { + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers) { super(eth2Network, Optional.empty()); - this.blockRoot = blockRoot; - this.blobIdentifiers = blobIdentifiers; + this.blockRootAndBlobIdentifiers = blockRootAndBlobIdentifiers; } public FetchBlobSidecarsTask( final P2PNetwork eth2Network, final Optional preferredPeer, - final Bytes32 blockRoot, - final List blobIdentifiers) { + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers) { super(eth2Network, preferredPeer); - this.blockRoot = blockRoot; - this.blobIdentifiers = blobIdentifiers; + this.blockRootAndBlobIdentifiers = blockRootAndBlobIdentifiers; } @Override - public Bytes32 getKey() { - return blockRoot; + public BlockRootAndBlobIdentifiers getKey() { + return blockRootAndBlobIdentifiers; } @Override SafeFuture>> fetch(final Eth2Peer peer) { + final List blobIdentifiers = blockRootAndBlobIdentifiers.blobIdentifiers(); final List blobSidecars = new ArrayList<>(blobIdentifiers.size()); return peer.requestBlobSidecarsByRoot( blobIdentifiers, RpcResponseListener.from(blobSidecars::add)) @@ -69,7 +65,9 @@ SafeFuture>> fetch(final Eth2Peer peer) { LOG.debug( String.format( "Failed to fetch %d blob sidecars for block root %s from peer %s", - blobIdentifiers.size(), blockRoot, peer.getId()), + blobIdentifiers.size(), + blockRootAndBlobIdentifiers.blockRoot(), + peer.getId()), err); return FetchResult.createFailed(peer, Status.FETCH_FAILED); }); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchTaskFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchTaskFactory.java index bf1f462850d..8af37d11891 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchTaskFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/fetch/FetchTaskFactory.java @@ -13,11 +13,9 @@ package tech.pegasys.teku.beacon.sync.fetch; -import java.util.List; import java.util.Optional; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; -import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier; public interface FetchTaskFactory { @@ -28,10 +26,10 @@ default FetchBlockTask createFetchBlockTask(final Bytes32 blockRoot) { FetchBlockTask createFetchBlockTask(Bytes32 blockRoot, Optional preferredPeer); default FetchBlobSidecarsTask createFetchBlobSidecarsTask( - final Bytes32 blockRoot, final List blobIdentifiers) { - return createFetchBlobSidecarsTask(blockRoot, blobIdentifiers, Optional.empty()); + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers) { + return createFetchBlobSidecarsTask(blockRootAndBlobIdentifiers, Optional.empty()); } FetchBlobSidecarsTask createFetchBlobSidecarsTask( - Bytes32 blockRoot, List blobIdentifiers, Optional preferredPeer); + BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers, Optional preferredPeer); } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/AbstractFetchService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/AbstractFetchService.java index 3272337ca39..fa407ea6e34 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/AbstractFetchService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/AbstractFetchService.java @@ -161,5 +161,7 @@ private String getTaskName(final T task) { return task.getClass().getSimpleName(); } + public abstract T createTask(K key); + public abstract void processFetchedResult(T task, R result); } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchService.java index 2a792603a98..dcb84da22c8 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchService.java @@ -17,6 +17,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.beacon.sync.fetch.BlockRootAndBlobIdentifiers; import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarsTask; import tech.pegasys.teku.beacon.sync.fetch.FetchTaskFactory; import tech.pegasys.teku.beacon.sync.forward.ForwardSync; @@ -30,7 +31,8 @@ import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool; public class RecentBlobSidecarsFetchService - extends AbstractFetchService> + extends AbstractFetchService< + BlockRootAndBlobIdentifiers, FetchBlobSidecarsTask, List> implements RecentBlobSidecarsFetcher { private static final Logger LOG = LogManager.getLogger(); @@ -102,9 +104,10 @@ public void requestRecentBlobSidecars( // We already have all required blob sidecars return; } - final FetchBlobSidecarsTask task = - fetchTaskFactory.createFetchBlobSidecarsTask(blockRoot, requiredBlobIdentifiers); - if (allTasks.putIfAbsent(blockRoot, task) != null) { + final BlockRootAndBlobIdentifiers key = + new BlockRootAndBlobIdentifiers(blockRoot, requiredBlobIdentifiers); + final FetchBlobSidecarsTask task = createTask(key); + if (allTasks.putIfAbsent(key, task) != null) { // We're already tracking this task task.cancel(); return; @@ -114,8 +117,18 @@ public void requestRecentBlobSidecars( } @Override - public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) { - cancelRequest(blockRoot); + public void cancelRecentBlobSidecarsRequests(final Bytes32 blockRoot) { + allTasks.forEach( + (key, __) -> { + if (key.blockRoot().equals(blockRoot)) { + cancelRequest(key); + } + }); + } + + @Override + public FetchBlobSidecarsTask createTask(final BlockRootAndBlobIdentifiers key) { + return fetchTaskFactory.createFetchBlobSidecarsTask(key); } @Override @@ -135,13 +148,13 @@ public void onBlockValidated(final SignedBeaconBlock block) {} @Override public void onBlockImported(final SignedBeaconBlock block, final boolean executionOptimistic) { - cancelRecentBlobSidecarsRequest(block.getRoot()); + cancelRecentBlobSidecarsRequests(block.getRoot()); } private void setupSubscribers() { blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecars(this::requestRecentBlobSidecars); blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarsDropped( - this::cancelRecentBlobSidecarsRequest); + this::cancelRecentBlobSidecarsRequests); forwardSync.subscribeToSyncChanges(this::onSyncStatusChanged); } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetcher.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetcher.java index 549c8e30ed8..537ced075f3 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetcher.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetcher.java @@ -39,7 +39,7 @@ public void requestRecentBlobSidecars( final Bytes32 blockRoot, final List blobIdentifiers) {} @Override - public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) {} + public void cancelRecentBlobSidecarsRequests(final Bytes32 blockRoot) {} @Override public SafeFuture start() { @@ -86,5 +86,5 @@ static RecentBlobSidecarsFetcher create( void requestRecentBlobSidecars(Bytes32 blockRoot, List blobIdentifiers); - void cancelRecentBlobSidecarsRequest(Bytes32 blockRoot); + void cancelRecentBlobSidecarsRequests(Bytes32 blockRoot); } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java index 42920f23053..e6d4b47195f 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/gossip/blocks/RecentBlocksFetchService.java @@ -106,7 +106,7 @@ public void requestRecentBlock(final Bytes32 blockRoot) { // We already have this block, waiting for blobs return; } - final FetchBlockTask task = fetchTaskFactory.createFetchBlockTask(blockRoot); + final FetchBlockTask task = createTask(blockRoot); if (allTasks.putIfAbsent(blockRoot, task) != null) { // We're already tracking this task task.cancel(); @@ -121,6 +121,11 @@ public void cancelRecentBlockRequest(final Bytes32 blockRoot) { cancelRequest(blockRoot); } + @Override + public FetchBlockTask createTask(final Bytes32 key) { + return fetchTaskFactory.createFetchBlockTask(key); + } + @Override public void processFetchedResult(final FetchBlockTask task, final SignedBeaconBlock block) { LOG.trace("Successfully fetched block: {}", block); diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactoryTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactoryTest.java index 3299db4e0a6..0633ded0027 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactoryTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/DefaultFetchTaskFactoryTest.java @@ -41,7 +41,8 @@ public void createsFetchBlockTask() { public void createsFetchBlobSidecarsTask() { final FetchBlobSidecarsTask task = fetchTaskFactory.createFetchBlobSidecarsTask( - Bytes32.ZERO, List.of(new BlobIdentifier(Bytes32.ZERO, UInt64.ZERO))); + new BlockRootAndBlobIdentifiers( + Bytes32.ZERO, List.of(new BlobIdentifier(Bytes32.ZERO, UInt64.ZERO)))); assertThat(task).isNotNull(); } } diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTaskTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTaskTest.java index 39bcf2338ce..9f080a0d987 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTaskTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/fetch/FetchBlobSidecarsTaskTest.java @@ -37,9 +37,11 @@ public void run_successful() { final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers = + new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers); final FetchBlobSidecarsTask task = - new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); - assertThat(task.getKey()).isEqualTo(block.getRoot()); + new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers); + assertThat(task.getKey()).isEqualTo(blockRootAndBlobIdentifiers); final Eth2Peer peer = registerNewPeer(1); mockRpcResponse(peer, blobIdentifiers, blobSidecars); @@ -58,8 +60,10 @@ public void run_noPeers() { final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers = + new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers); final FetchBlobSidecarsTask task = - new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); + new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers); final SafeFuture>> result = task.run(); assertThat(result).isDone(); @@ -75,8 +79,10 @@ public void run_failAndRetryWithNoNewPeers() { final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers = + new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers); final FetchBlobSidecarsTask task = - new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); + new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers); final Eth2Peer peer = registerNewPeer(1); when(peer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any())) @@ -106,8 +112,10 @@ public void run_failAndRetryWithNewPeer() { final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers = + new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers); final FetchBlobSidecarsTask task = - new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); + new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers); final Eth2Peer peer = registerNewPeer(1); when(peer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any())) @@ -141,8 +149,10 @@ public void run_withMultiplesPeersAvailable() { final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers = + new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers); final FetchBlobSidecarsTask task = - new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); + new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers); final Eth2Peer peer = registerNewPeer(1); when(peer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any())) @@ -170,9 +180,11 @@ public void run_withPreferredPeer() { final List blobIdentifiers = getBlobIdentifiers(blobSidecars); mockRpcResponse(preferredPeer, blobIdentifiers, blobSidecars); + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers = + new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers); final FetchBlobSidecarsTask task = new FetchBlobSidecarsTask( - eth2P2PNetwork, Optional.of(preferredPeer), block.getRoot(), blobIdentifiers); + eth2P2PNetwork, Optional.of(preferredPeer), blockRootAndBlobIdentifiers); // Add a peer registerNewPeer(2); @@ -195,9 +207,11 @@ public void run_withRandomPeerWhenFetchingWithPreferredPeerFails() { when(preferredPeer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any())) .thenReturn(SafeFuture.failedFuture(new RuntimeException("whoops"))); + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers = + new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers); final FetchBlobSidecarsTask task = new FetchBlobSidecarsTask( - eth2P2PNetwork, Optional.of(preferredPeer), block.getRoot(), blobIdentifiers); + eth2P2PNetwork, Optional.of(preferredPeer), blockRootAndBlobIdentifiers); final SafeFuture>> result = task.run(); assertThat(result).isDone(); @@ -227,9 +241,11 @@ public void cancel() { final List blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block); final List blobIdentifiers = getBlobIdentifiers(blobSidecars); + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers = + new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers); final FetchBlobSidecarsTask task = - new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers); - assertThat(task.getKey()).isEqualTo(block.getRoot()); + new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers); + assertThat(task.getKey()).isEqualTo(blockRootAndBlobIdentifiers); final Eth2Peer peer = registerNewPeer(1); mockRpcResponse(peer, blobIdentifiers, blobSidecars); diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchServiceTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchServiceTest.java index 559e105ac64..5496d8c3b89 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchServiceTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/gossip/blobs/RecentBlobSidecarsFetchServiceTest.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; +import tech.pegasys.teku.beacon.sync.fetch.BlockRootAndBlobIdentifiers; import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarsTask; import tech.pegasys.teku.beacon.sync.fetch.FetchResult; import tech.pegasys.teku.beacon.sync.fetch.FetchResult.Status; @@ -79,7 +80,7 @@ public void setup() { maxConcurrentRequests); lenient() - .when(fetchTaskFactory.createFetchBlobSidecarsTask(any(), any())) + .when(fetchTaskFactory.createFetchBlobSidecarsTask(any())) .thenAnswer(this::createMockTask); recentBlobSidecarsFetcher.subscribeBlobSidecarFetched(importedBlobSidecars::add); } @@ -124,16 +125,15 @@ public void fetchMultipleBlobSidecarsSuccessfully() { } @Test - public void handleRequiredBlobSidecarsWithSameBlockRoot() { + public void handleRequiredBlobSidecarsWithSameBlockAndBlobIdentifier() { final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar(); final Bytes32 blockRoot = blobSidecar.getBlockRoot(); final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(blockRoot); - final BlobIdentifier anotherBlobIdentifier = dataStructureUtil.randomBlobIdentifier(blockRoot); recentBlobSidecarsFetcher.requestRecentBlobSidecars(blockRoot, List.of(blobIdentifier)); - recentBlobSidecarsFetcher.requestRecentBlobSidecars(blockRoot, List.of(anotherBlobIdentifier)); + recentBlobSidecarsFetcher.requestRecentBlobSidecars(blockRoot, List.of(blobIdentifier)); - // only one task allowed per block root + // only one task allowed per block and blob identifiers assertTaskCounts(1, 1, 0); assertThat(importedBlobSidecars).isEmpty(); @@ -158,17 +158,24 @@ public void ignoreIfNoBlobSidecarsAreRequired() { } @Test - public void cancelBlobSidecarsRequest() { + public void cancelBlobSidecarsRequests() { final Bytes32 blockRoot = dataStructureUtil.randomBytes32(); final BlobIdentifier blobIdentifier = dataStructureUtil.randomBlobIdentifier(blockRoot); + final BlobIdentifier anotherBlobIdentifier = dataStructureUtil.randomBlobIdentifier(blockRoot); + recentBlobSidecarsFetcher.requestRecentBlobSidecars(blockRoot, List.of(blobIdentifier)); - recentBlobSidecarsFetcher.cancelRecentBlobSidecarsRequest(blockRoot); + recentBlobSidecarsFetcher.requestRecentBlobSidecars(blockRoot, List.of(anotherBlobIdentifier)); - verify(tasks.getFirst()).cancel(); - // Manually cancel future - taskFutures.getFirst().complete(FetchResult.createFailed(Status.CANCELLED)); + assertTaskCounts(2, 2, 0); + + recentBlobSidecarsFetcher.cancelRecentBlobSidecarsRequests(blockRoot); + + tasks.forEach(task -> verify(task).cancel()); + // manually cancel futures + taskFutures.forEach( + taskFuture -> taskFuture.complete(FetchResult.createFailed(Status.CANCELLED))); - // Task should be removed + // Tasks should be removed assertTaskCounts(0, 0, 0); assertThat(importedBlobSidecars).isEmpty(); } @@ -215,7 +222,7 @@ public void cancelTaskWhileWaitingToRetry() { assertTaskCounts(1, 0, 0); // Cancel task - recentBlobSidecarsFetcher.cancelRecentBlobSidecarsRequest(blobSidecar.getBlockRoot()); + recentBlobSidecarsFetcher.cancelRecentBlobSidecarsRequests(blobSidecar.getBlockRoot()); verify(tasks.getFirst()).cancel(); when(tasks.getFirst().run()) .thenReturn(SafeFuture.completedFuture(FetchResult.createFailed(Status.CANCELLED))); @@ -297,15 +304,15 @@ void shouldRequestRemainingRequiredBlobSidecarsWhenForwardSyncCompletes() { syncSubscriber.onSyncingChange(false); assertTaskCounts(2, 2, 0); final List requestingBlockRoots = - tasks.stream().map(FetchBlobSidecarsTask::getKey).toList(); + tasks.stream().map(task -> task.getKey().blockRoot()).toList(); assertThat(requestingBlockRoots).containsExactlyInAnyOrder(blockRoot, blockRoot1); } private FetchBlobSidecarsTask createMockTask(final InvocationOnMock invocationOnMock) { - final Bytes32 blockRoot = invocationOnMock.getArgument(0); + final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers = invocationOnMock.getArgument(0); final FetchBlobSidecarsTask task = mock(FetchBlobSidecarsTask.class); - lenient().when(task.getKey()).thenReturn(blockRoot); + lenient().when(task.getKey()).thenReturn(blockRootAndBlobIdentifiers); lenient().when(task.getNumberOfRetries()).thenReturn(0); final SafeFuture>> future = new SafeFuture<>(); lenient().when(task.run()).thenReturn(future);