Skip to content

Commit

Permalink
change key for the task
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Jan 8, 2025
1 parent 1e83524 commit cc702fb
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void requestRecentBlobSidecars(
}

@Override
public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) {
public void cancelRecentBlobSidecarsRequests(final Bytes32 blockRoot) {
// No-op
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.List;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;

public record BlockRootAndBlobIdentifiers(
Bytes32 blockRoot, List<BlobIdentifier> blobIdentifiers) {}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;

public class DefaultFetchTaskFactory implements FetchTaskFactory {

Expand All @@ -36,9 +34,8 @@ public FetchBlockTask createFetchBlockTask(

@Override
public FetchBlobSidecarsTask createFetchBlobSidecarsTask(
final Bytes32 blockRoot,
final List<BlobIdentifier> blobIdentifiers,
final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers,
final Optional<Eth2Peer> preferredPeer) {
return new FetchBlobSidecarsTask(eth2Network, preferredPeer, blockRoot, blobIdentifiers);
return new FetchBlobSidecarsTask(eth2Network, preferredPeer, blockRootAndBlobIdentifiers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.beacon.sync.fetch.FetchResult.Status;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
Expand All @@ -27,39 +26,36 @@
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;

public class FetchBlobSidecarsTask extends AbstractFetchTask<Bytes32, List<BlobSidecar>> {
public class FetchBlobSidecarsTask
extends AbstractFetchTask<BlockRootAndBlobIdentifiers, List<BlobSidecar>> {

private static final Logger LOG = LogManager.getLogger();

private final Bytes32 blockRoot;
private final List<BlobIdentifier> blobIdentifiers;
private final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers;

FetchBlobSidecarsTask(
final P2PNetwork<Eth2Peer> eth2Network,
final Bytes32 blockRoot,
final List<BlobIdentifier> blobIdentifiers) {
final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers) {
super(eth2Network, Optional.empty());
this.blockRoot = blockRoot;
this.blobIdentifiers = blobIdentifiers;
this.blockRootAndBlobIdentifiers = blockRootAndBlobIdentifiers;
}

public FetchBlobSidecarsTask(
final P2PNetwork<Eth2Peer> eth2Network,
final Optional<Eth2Peer> preferredPeer,
final Bytes32 blockRoot,
final List<BlobIdentifier> blobIdentifiers) {
final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers) {
super(eth2Network, preferredPeer);
this.blockRoot = blockRoot;
this.blobIdentifiers = blobIdentifiers;
this.blockRootAndBlobIdentifiers = blockRootAndBlobIdentifiers;
}

@Override
public Bytes32 getKey() {
return blockRoot;
public BlockRootAndBlobIdentifiers getKey() {
return blockRootAndBlobIdentifiers;
}

@Override
SafeFuture<FetchResult<List<BlobSidecar>>> fetch(final Eth2Peer peer) {
final List<BlobIdentifier> blobIdentifiers = blockRootAndBlobIdentifiers.blobIdentifiers();
final List<BlobSidecar> blobSidecars = new ArrayList<>(blobIdentifiers.size());
return peer.requestBlobSidecarsByRoot(
blobIdentifiers, RpcResponseListener.from(blobSidecars::add))
Expand All @@ -69,7 +65,9 @@ SafeFuture<FetchResult<List<BlobSidecar>>> fetch(final Eth2Peer peer) {
LOG.debug(
String.format(
"Failed to fetch %d blob sidecars for block root %s from peer %s",
blobIdentifiers.size(), blockRoot, peer.getId()),
blobIdentifiers.size(),
blockRootAndBlobIdentifiers.blockRoot(),
peer.getId()),
err);
return FetchResult.createFailed(peer, Status.FETCH_FAILED);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@

package tech.pegasys.teku.beacon.sync.fetch;

import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;

public interface FetchTaskFactory {

Expand All @@ -28,10 +26,10 @@ default FetchBlockTask createFetchBlockTask(final Bytes32 blockRoot) {
FetchBlockTask createFetchBlockTask(Bytes32 blockRoot, Optional<Eth2Peer> preferredPeer);

default FetchBlobSidecarsTask createFetchBlobSidecarsTask(
final Bytes32 blockRoot, final List<BlobIdentifier> blobIdentifiers) {
return createFetchBlobSidecarsTask(blockRoot, blobIdentifiers, Optional.empty());
final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers) {
return createFetchBlobSidecarsTask(blockRootAndBlobIdentifiers, Optional.empty());
}

FetchBlobSidecarsTask createFetchBlobSidecarsTask(
Bytes32 blockRoot, List<BlobIdentifier> blobIdentifiers, Optional<Eth2Peer> preferredPeer);
BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers, Optional<Eth2Peer> preferredPeer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,7 @@ private String getTaskName(final T task) {
return task.getClass().getSimpleName();
}

public abstract T createTask(K key);

public abstract void processFetchedResult(T task, R result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.beacon.sync.fetch.BlockRootAndBlobIdentifiers;
import tech.pegasys.teku.beacon.sync.fetch.FetchBlobSidecarsTask;
import tech.pegasys.teku.beacon.sync.fetch.FetchTaskFactory;
import tech.pegasys.teku.beacon.sync.forward.ForwardSync;
Expand All @@ -30,7 +31,8 @@
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;

public class RecentBlobSidecarsFetchService
extends AbstractFetchService<Bytes32, FetchBlobSidecarsTask, List<BlobSidecar>>
extends AbstractFetchService<
BlockRootAndBlobIdentifiers, FetchBlobSidecarsTask, List<BlobSidecar>>
implements RecentBlobSidecarsFetcher {

private static final Logger LOG = LogManager.getLogger();
Expand Down Expand Up @@ -102,9 +104,10 @@ public void requestRecentBlobSidecars(
// We already have all required blob sidecars
return;
}
final FetchBlobSidecarsTask task =
fetchTaskFactory.createFetchBlobSidecarsTask(blockRoot, requiredBlobIdentifiers);
if (allTasks.putIfAbsent(blockRoot, task) != null) {
final BlockRootAndBlobIdentifiers key =
new BlockRootAndBlobIdentifiers(blockRoot, requiredBlobIdentifiers);
final FetchBlobSidecarsTask task = createTask(key);
if (allTasks.putIfAbsent(key, task) != null) {
// We're already tracking this task
task.cancel();
return;
Expand All @@ -114,8 +117,18 @@ public void requestRecentBlobSidecars(
}

@Override
public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) {
cancelRequest(blockRoot);
public void cancelRecentBlobSidecarsRequests(final Bytes32 blockRoot) {
allTasks.forEach(
(key, __) -> {
if (key.blockRoot().equals(blockRoot)) {
cancelRequest(key);
}
});
}

@Override
public FetchBlobSidecarsTask createTask(final BlockRootAndBlobIdentifiers key) {
return fetchTaskFactory.createFetchBlobSidecarsTask(key);
}

@Override
Expand All @@ -135,13 +148,13 @@ public void onBlockValidated(final SignedBeaconBlock block) {}

@Override
public void onBlockImported(final SignedBeaconBlock block, final boolean executionOptimistic) {
cancelRecentBlobSidecarsRequest(block.getRoot());
cancelRecentBlobSidecarsRequests(block.getRoot());
}

private void setupSubscribers() {
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecars(this::requestRecentBlobSidecars);
blockBlobSidecarsTrackersPool.subscribeRequiredBlobSidecarsDropped(
this::cancelRecentBlobSidecarsRequest);
this::cancelRecentBlobSidecarsRequests);
forwardSync.subscribeToSyncChanges(this::onSyncStatusChanged);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void requestRecentBlobSidecars(
final Bytes32 blockRoot, final List<BlobIdentifier> blobIdentifiers) {}

@Override
public void cancelRecentBlobSidecarsRequest(final Bytes32 blockRoot) {}
public void cancelRecentBlobSidecarsRequests(final Bytes32 blockRoot) {}

@Override
public SafeFuture<?> start() {
Expand Down Expand Up @@ -86,5 +86,5 @@ static RecentBlobSidecarsFetcher create(

void requestRecentBlobSidecars(Bytes32 blockRoot, List<BlobIdentifier> blobIdentifiers);

void cancelRecentBlobSidecarsRequest(Bytes32 blockRoot);
void cancelRecentBlobSidecarsRequests(Bytes32 blockRoot);
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void requestRecentBlock(final Bytes32 blockRoot) {
// We already have this block, waiting for blobs
return;
}
final FetchBlockTask task = fetchTaskFactory.createFetchBlockTask(blockRoot);
final FetchBlockTask task = createTask(blockRoot);
if (allTasks.putIfAbsent(blockRoot, task) != null) {
// We're already tracking this task
task.cancel();
Expand All @@ -121,6 +121,11 @@ public void cancelRecentBlockRequest(final Bytes32 blockRoot) {
cancelRequest(blockRoot);
}

@Override
public FetchBlockTask createTask(final Bytes32 key) {
return fetchTaskFactory.createFetchBlockTask(key);
}

@Override
public void processFetchedResult(final FetchBlockTask task, final SignedBeaconBlock block) {
LOG.trace("Successfully fetched block: {}", block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public void createsFetchBlockTask() {
public void createsFetchBlobSidecarsTask() {
final FetchBlobSidecarsTask task =
fetchTaskFactory.createFetchBlobSidecarsTask(
Bytes32.ZERO, List.of(new BlobIdentifier(Bytes32.ZERO, UInt64.ZERO)));
new BlockRootAndBlobIdentifiers(
Bytes32.ZERO, List.of(new BlobIdentifier(Bytes32.ZERO, UInt64.ZERO))));
assertThat(task).isNotNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ public void run_successful() {
final List<BlobSidecar> blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block);
final List<BlobIdentifier> blobIdentifiers = getBlobIdentifiers(blobSidecars);

final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers =
new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers);
final FetchBlobSidecarsTask task =
new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers);
assertThat(task.getKey()).isEqualTo(block.getRoot());
new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers);
assertThat(task.getKey()).isEqualTo(blockRootAndBlobIdentifiers);

final Eth2Peer peer = registerNewPeer(1);
mockRpcResponse(peer, blobIdentifiers, blobSidecars);
Expand All @@ -58,8 +60,10 @@ public void run_noPeers() {
final List<BlobSidecar> blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block);
final List<BlobIdentifier> blobIdentifiers = getBlobIdentifiers(blobSidecars);

final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers =
new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers);
final FetchBlobSidecarsTask task =
new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers);
new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers);

final SafeFuture<FetchResult<List<BlobSidecar>>> result = task.run();
assertThat(result).isDone();
Expand All @@ -75,8 +79,10 @@ public void run_failAndRetryWithNoNewPeers() {
final List<BlobSidecar> blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block);
final List<BlobIdentifier> blobIdentifiers = getBlobIdentifiers(blobSidecars);

final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers =
new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers);
final FetchBlobSidecarsTask task =
new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers);
new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers);

final Eth2Peer peer = registerNewPeer(1);
when(peer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any()))
Expand Down Expand Up @@ -106,8 +112,10 @@ public void run_failAndRetryWithNewPeer() {
final List<BlobSidecar> blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block);
final List<BlobIdentifier> blobIdentifiers = getBlobIdentifiers(blobSidecars);

final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers =
new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers);
final FetchBlobSidecarsTask task =
new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers);
new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers);

final Eth2Peer peer = registerNewPeer(1);
when(peer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any()))
Expand Down Expand Up @@ -141,8 +149,10 @@ public void run_withMultiplesPeersAvailable() {
final List<BlobSidecar> blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block);
final List<BlobIdentifier> blobIdentifiers = getBlobIdentifiers(blobSidecars);

final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers =
new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers);
final FetchBlobSidecarsTask task =
new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers);
new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers);

final Eth2Peer peer = registerNewPeer(1);
when(peer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any()))
Expand Down Expand Up @@ -170,9 +180,11 @@ public void run_withPreferredPeer() {
final List<BlobIdentifier> blobIdentifiers = getBlobIdentifiers(blobSidecars);
mockRpcResponse(preferredPeer, blobIdentifiers, blobSidecars);

final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers =
new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers);
final FetchBlobSidecarsTask task =
new FetchBlobSidecarsTask(
eth2P2PNetwork, Optional.of(preferredPeer), block.getRoot(), blobIdentifiers);
eth2P2PNetwork, Optional.of(preferredPeer), blockRootAndBlobIdentifiers);

// Add a peer
registerNewPeer(2);
Expand All @@ -195,9 +207,11 @@ public void run_withRandomPeerWhenFetchingWithPreferredPeerFails() {
when(preferredPeer.requestBlobSidecarsByRoot(eq(blobIdentifiers), any()))
.thenReturn(SafeFuture.failedFuture(new RuntimeException("whoops")));

final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers =
new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers);
final FetchBlobSidecarsTask task =
new FetchBlobSidecarsTask(
eth2P2PNetwork, Optional.of(preferredPeer), block.getRoot(), blobIdentifiers);
eth2P2PNetwork, Optional.of(preferredPeer), blockRootAndBlobIdentifiers);

final SafeFuture<FetchResult<List<BlobSidecar>>> result = task.run();
assertThat(result).isDone();
Expand Down Expand Up @@ -227,9 +241,11 @@ public void cancel() {
final List<BlobSidecar> blobSidecars = dataStructureUtil.randomBlobSidecarsForBlock(block);
final List<BlobIdentifier> blobIdentifiers = getBlobIdentifiers(blobSidecars);

final BlockRootAndBlobIdentifiers blockRootAndBlobIdentifiers =
new BlockRootAndBlobIdentifiers(block.getRoot(), blobIdentifiers);
final FetchBlobSidecarsTask task =
new FetchBlobSidecarsTask(eth2P2PNetwork, block.getRoot(), blobIdentifiers);
assertThat(task.getKey()).isEqualTo(block.getRoot());
new FetchBlobSidecarsTask(eth2P2PNetwork, blockRootAndBlobIdentifiers);
assertThat(task.getKey()).isEqualTo(blockRootAndBlobIdentifiers);

final Eth2Peer peer = registerNewPeer(1);
mockRpcResponse(peer, blobIdentifiers, blobSidecars);
Expand Down
Loading

0 comments on commit cc702fb

Please sign in to comment.