Skip to content

Commit

Permalink
change StateSelector logic
Browse files Browse the repository at this point in the history
fixes Consensys#7955

needs more testing, need to see how AT goes initially.

Signed-off-by: Paul Harris <[email protected]>
  • Loading branch information
rolfyone committed Jan 13, 2025
1 parent 500e413 commit a7ce4de
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,13 @@ public StateSelector genesisSelector() {
@Override
public StateSelector finalizedSelector() {
return () ->
SafeFuture.completedFuture(
client
.getLatestFinalized()
.map(
finalized ->
addMetaData(
finalized.getState(),
// The finalized checkpoint may change because of optimistically
// imported blocks at the head and if the head isn't optimistic, the
// finalized block can't be optimistic.
client.isChainHeadOptimistic(),
true,
true)));
client
.getBestFinalizedState()
.thenApply(
maybeFinalized ->
maybeFinalized.map(
finalized ->
addMetaData(finalized, client.isChainHeadOptimistic(), true, true)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlockHeader;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.metadata.StateAndMetaData;
import tech.pegasys.teku.spec.datastructures.state.AnchorPoint;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.storage.api.StorageQueryChannel;
Expand Down Expand Up @@ -66,8 +65,7 @@ public void headSelector_shouldGetBestState() {

@Test
public void finalizedSelector_shouldGetFinalizedState() {
when(client.getLatestFinalized())
.thenReturn(Optional.of(AnchorPoint.fromInitialState(spec, state)));
when(client.getBestFinalizedState()).thenReturn(SafeFuture.completedFuture(Optional.of(state)));
Optional<StateAndMetaData> result = safeJoin(factory.finalizedSelector().getState());
assertThat(result).contains(withMetaData(state, true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ public int hashCode() {

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("blockSummary", blockSummary).toString();
return MoreObjects.toStringHelper(this)
.add("blockSummary", blockSummary)
.add("state", state)
.toString();
}

private Optional<ExecutionPayloadHeader> getLatestExecutionPayloadHeader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlockSummary;
import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
Expand Down Expand Up @@ -197,34 +196,6 @@ public SafeFuture<List<BlobSidecar>> getAllBlobSidecars(
.thenCompose(this::getAllBlobSidecars);
}

private List<BlobSidecar> filterBlobSidecars(
final List<BlobSidecar> blobSidecars, final List<UInt64> indices) {
if (indices.isEmpty()) {
return blobSidecars;
}
return blobSidecars.stream().filter(key -> indices.contains(key.getIndex())).toList();
}

private Stream<SlotAndBlockRootAndBlobIndex> filterBlobSidecarKeys(
final List<SlotAndBlockRootAndBlobIndex> keys, final List<UInt64> indices) {
if (indices.isEmpty()) {
return keys.stream();
}
return keys.stream().filter(key -> indices.contains(key.getBlobIndex()));
}

private SafeFuture<List<BlobSidecar>> getBlobSidecars(
final Stream<SlotAndBlockRootAndBlobIndex> keys) {
return SafeFuture.collectAll(keys.map(this::getAllBlobSidecarByKey))
.thenApply(blobSidecars -> blobSidecars.stream().flatMap(Optional::stream).toList());
}

private SafeFuture<List<BlobSidecar>> getAllBlobSidecars(
final Stream<SlotAndBlockRootAndBlobIndex> keys) {
return SafeFuture.collectAll(keys.map(this::getAllBlobSidecarByKey))
.thenApply(blobSidecars -> blobSidecars.stream().flatMap(Optional::stream).toList());
}

public SafeFuture<Optional<BeaconState>> getStateAtSlotExact(final UInt64 slot) {
final Optional<Bytes32> recentBlockRoot = recentChainData.getBlockRootInEffectBySlot(slot);
if (recentBlockRoot.isPresent()) {
Expand Down Expand Up @@ -285,14 +256,6 @@ public SafeFuture<Optional<BeaconState>> getStateAtSlotExact(
}
}

private SafeFuture<Optional<BeaconState>> regenerateStateAndSlotExact(final UInt64 slot) {
return getBlockAndStateInEffectAtSlot(slot)
.thenApplyChecked(
maybeBlockAndState ->
maybeBlockAndState.flatMap(
blockAndState -> regenerateBeaconState(blockAndState.getState(), slot)));
}

public SafeFuture<Optional<CheckpointState>> getCheckpointStateAtEpoch(final UInt64 epoch) {
final UInt64 epochSlot = spec.computeStartSlotAtEpoch(epoch);
return getSignedBlockAndStateInEffectAtSlot(epochSlot)
Expand All @@ -318,12 +281,6 @@ public SafeFuture<CheckpointState> getCheckpointState(
});
}

private SafeFuture<Optional<SignedBlockAndState>> getStateForBlock(
final SignedBeaconBlock block) {
return getStateByBlockRoot(block.getRoot())
.thenApply(maybeState -> maybeState.map(state -> new SignedBlockAndState(block, state)));
}

public boolean isFinalized(final UInt64 slot) {
final UInt64 finalizedEpoch = recentChainData.getFinalizedEpoch();
final UInt64 finalizedSlot = spec.computeStartSlotAtEpoch(finalizedEpoch);
Expand Down Expand Up @@ -413,71 +370,10 @@ public SafeFuture<Optional<BeaconState>> getStateByStateRoot(final Bytes32 state
.orElseGet(() -> getFinalizedStateFromStateRoot(stateRoot)));
}

private SafeFuture<Optional<BeaconState>> getFinalizedStateFromStateRoot(
final Bytes32 stateRoot) {
return historicalChainData
.getFinalizedSlotByStateRoot(stateRoot)
.thenCompose(
maybeSlot -> maybeSlot.map(this::getStateAtSlotExact).orElse(STATE_NOT_AVAILABLE));
}

public SafeFuture<Optional<UInt64>> getFinalizedSlotByBlockRoot(final Bytes32 blockRoot) {
return historicalChainData.getFinalizedSlotByBlockRoot(blockRoot);
}

private SafeFuture<Optional<BeaconState>> getStateFromSlotAndBlock(
final SlotAndBlockRoot slotAndBlockRoot) {
final UpdatableStore store = getStore();
if (store == null) {
LOG.trace(
"No state at slot and block root {} because the store is not set", slotAndBlockRoot);
return STATE_NOT_AVAILABLE;
}
return store
.retrieveStateAtSlot(slotAndBlockRoot)
.thenCompose(
maybeState -> {
if (maybeState.isPresent()) {
return SafeFuture.completedFuture(maybeState);
}
LOG.debug(
"State not found in store, querying historicalData for {}:{}",
slotAndBlockRoot::getSlot,
slotAndBlockRoot::getBlockRoot);
return getFinalizedStateFromSlotAndBlock(slotAndBlockRoot);
});
}

private SafeFuture<Optional<BeaconState>> getFinalizedStateFromSlotAndBlock(
final SlotAndBlockRoot slotAndBlockRoot) {
return historicalChainData
.getFinalizedStateByBlockRoot(slotAndBlockRoot.getBlockRoot())
.thenApply(
maybeState ->
maybeState.flatMap(
preState -> regenerateBeaconState(preState, slotAndBlockRoot.getSlot())));
}

private Optional<BeaconState> regenerateBeaconState(
final BeaconState preState, final UInt64 slot) {
if (preState.getSlot().equals(slot)) {
return Optional.of(preState);
} else if (slot.compareTo(getCurrentSlot()) > 0) {
LOG.debug("Attempted to wind forward to a future state: {}", slot.toString());
return Optional.empty();
}
try {
LOG.debug(
"Processing slots to regenerate state from slot {}, target slot {}",
preState::getSlot,
() -> slot);
return Optional.of(spec.processSlots(preState, slot));
} catch (SlotProcessingException | EpochProcessingException | IllegalArgumentException e) {
LOG.debug("State Transition error", e);
return Optional.empty();
}
}

public Optional<SafeFuture<BeaconState>> getBestState() {
return recentChainData.getBestState();
}
Expand All @@ -486,10 +382,6 @@ public Optional<Bytes32> getBestBlockRoot() {
return recentChainData.getBestBlockRoot();
}

public Optional<MinimalBeaconBlockSummary> getBestBlock() {
return recentChainData.getHeadBlock();
}

public Optional<ChainHead> getChainHead() {
return recentChainData.getChainHead();
}
Expand All @@ -502,7 +394,7 @@ public boolean isStoreAvailable() {
return recentChainData != null && recentChainData.getStore() != null;
}

public boolean isChainDataFullyAvailable() {
private boolean isChainDataFullyAvailable() {
return !recentChainData.isPreGenesis() && !recentChainData.isPreForkChoice();
}

Expand All @@ -523,10 +415,6 @@ public List<CommitteeAssignment> getCommitteesFromState(
return result;
}

public Optional<UInt64> getGenesisTime() {
return Optional.ofNullable(recentChainData.getGenesisTime());
}

/**
* @return The slot at which the chain head block was proposed
*/
Expand Down Expand Up @@ -662,17 +550,25 @@ public Optional<AnchorPoint> getLatestFinalized() {
return Optional.ofNullable(getStore()).map(ReadOnlyStore::getLatestFinalized);
}

// in line with spec on_block, computing finalized slot as the
// first slot of the epoch stored in store.finalized_checkpoint
public SafeFuture<Optional<BeaconState>> getBestFinalizedState() {
final Optional<Checkpoint> maybeFinalizedCheckpoint = getFinalizedCheckpoint();
if (maybeFinalizedCheckpoint.isEmpty()) {
return SafeFuture.completedFuture(Optional.empty());
}
final UInt64 finalizedSlot =
spec.computeStartSlotAtEpoch(maybeFinalizedCheckpoint.get().getEpoch());
return getStateAtSlotExact(finalizedSlot);
}

public SafeFuture<Optional<BeaconState>> getJustifiedState() {
if (recentChainData.isPreGenesis()) {
return SafeFuture.completedFuture(Optional.empty());
}
return getStore().retrieveCheckpointState(getStore().getJustifiedCheckpoint());
}

public Optional<Checkpoint> getJustifiedCheckpoint() {
return Optional.ofNullable(getStore()).map(ReadOnlyStore::getJustifiedCheckpoint);
}

public Optional<GenesisData> getGenesisData() {
return recentChainData.getGenesisData();
}
Expand Down Expand Up @@ -823,4 +719,111 @@ private boolean isOptimistic(
public SafeFuture<Optional<Checkpoint>> getInitialAnchor() {
return historicalChainData.getAnchor();
}

private SafeFuture<Optional<BeaconState>> getStateFromSlotAndBlock(
final SlotAndBlockRoot slotAndBlockRoot) {
final UpdatableStore store = getStore();
if (store == null) {
LOG.trace(
"No state at slot and block root {} because the store is not set", slotAndBlockRoot);
return STATE_NOT_AVAILABLE;
}
return store
.retrieveStateAtSlot(slotAndBlockRoot)
.thenCompose(
maybeState -> {
if (maybeState.isPresent()) {
return SafeFuture.completedFuture(maybeState);
}
LOG.debug(
"State not found in store, querying historicalData for {}:{}",
slotAndBlockRoot::getSlot,
slotAndBlockRoot::getBlockRoot);
return getFinalizedStateFromSlotAndBlock(slotAndBlockRoot);
});
}

private SafeFuture<Optional<BeaconState>> getFinalizedStateFromSlotAndBlock(
final SlotAndBlockRoot slotAndBlockRoot) {
return historicalChainData
.getFinalizedStateByBlockRoot(slotAndBlockRoot.getBlockRoot())
.thenApply(
maybeState ->
maybeState.flatMap(
preState -> regenerateBeaconState(preState, slotAndBlockRoot.getSlot())));
}

private Optional<BeaconState> regenerateBeaconState(
final BeaconState preState, final UInt64 slot) {
if (preState.getSlot().equals(slot)) {
return Optional.of(preState);
} else if (slot.compareTo(getCurrentSlot()) > 0) {
LOG.debug("Attempted to wind forward to a future state: {}", slot.toString());
return Optional.empty();
}
try {
LOG.debug(
"Processing slots to regenerate state from slot {}, target slot {}",
preState::getSlot,
() -> slot);
return Optional.of(spec.processSlots(preState, slot));
} catch (SlotProcessingException | EpochProcessingException | IllegalArgumentException e) {
LOG.debug("State Transition error", e);
return Optional.empty();
}
}

private SafeFuture<Optional<SignedBlockAndState>> getStateForBlock(
final SignedBeaconBlock block) {
return getStateByBlockRoot(block.getRoot())
.thenApply(maybeState -> maybeState.map(state -> new SignedBlockAndState(block, state)));
}

private SafeFuture<Optional<BeaconState>> getFinalizedStateFromStateRoot(
final Bytes32 stateRoot) {
return historicalChainData
.getFinalizedSlotByStateRoot(stateRoot)
.thenCompose(
maybeSlot -> maybeSlot.map(this::getStateAtSlotExact).orElse(STATE_NOT_AVAILABLE));
}

private SafeFuture<Optional<BeaconState>> regenerateStateAndSlotExact(final UInt64 slot) {
return getBlockAndStateInEffectAtSlot(slot)
.thenApplyChecked(
maybeBlockAndState ->
maybeBlockAndState.flatMap(
blockAndState -> regenerateBeaconState(blockAndState.getState(), slot)));
}

private List<BlobSidecar> filterBlobSidecars(
final List<BlobSidecar> blobSidecars, final List<UInt64> indices) {
if (indices.isEmpty()) {
return blobSidecars;
}
return blobSidecars.stream().filter(key -> indices.contains(key.getIndex())).toList();
}

private Stream<SlotAndBlockRootAndBlobIndex> filterBlobSidecarKeys(
final List<SlotAndBlockRootAndBlobIndex> keys, final List<UInt64> indices) {
if (indices.isEmpty()) {
return keys.stream();
}
return keys.stream().filter(key -> indices.contains(key.getBlobIndex()));
}

private SafeFuture<List<BlobSidecar>> getBlobSidecars(
final Stream<SlotAndBlockRootAndBlobIndex> keys) {
return SafeFuture.collectAll(keys.map(this::getAllBlobSidecarByKey))
.thenApply(blobSidecars -> blobSidecars.stream().flatMap(Optional::stream).toList());
}

private SafeFuture<List<BlobSidecar>> getAllBlobSidecars(
final Stream<SlotAndBlockRootAndBlobIndex> keys) {
return SafeFuture.collectAll(keys.map(this::getAllBlobSidecarByKey))
.thenApply(blobSidecars -> blobSidecars.stream().flatMap(Optional::stream).toList());
}

private Optional<Checkpoint> getFinalizedCheckpoint() {
return Optional.ofNullable(getStore()).map(ReadOnlyStore::getFinalizedCheckpoint);
}
}
Loading

0 comments on commit a7ce4de

Please sign in to comment.