Skip to content

Commit

Permalink
Added more utilities for late block reorg (#7741)
Browse files Browse the repository at this point in the history
* Added more utilities for late block reorg

 - Added validator_is_connected
 - added is_timely

 Currently Timeliness is going to be enabled, adding to a small cache. This is minimal overhead.

 partially addresses #6595

Signed-off-by: Paul Harris <[email protected]>
  • Loading branch information
rolfyone authored Nov 21, 2023
1 parent b345751 commit 47a457a
Show file tree
Hide file tree
Showing 16 changed files with 333 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ public SafeFuture<InternalValidationResult> validateAndImportBlock(

final Optional<BlockImportPerformance> blockImportPerformance;

arrivalTimestamp.ifPresent(
arrivalTime -> recentChainData.setBlockTimelinessFromArrivalTime(block, arrivalTime));

if (blockImportMetrics.isPresent()) {
final BlockImportPerformance performance =
new BlockImportPerformance(timeProvider, blockImportMetrics.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ SafeFuture<Optional<ExecutionPayloadContext>> getPayloadId(

void onTerminalBlockReached(Bytes32 executionBlockHash);

boolean validatorIsConnected(UInt64 validatorIndex, UInt64 currentSlot);

long subscribeToForkChoiceUpdatedResult(ForkChoiceUpdatedResultSubscriber subscriber);

boolean unsubscribeFromForkChoiceUpdatedResult(long subscriberId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public void onTerminalBlockReached(Bytes32 executionBlockHash) {
eventThread.execute(() -> internalTerminalBlockReached(executionBlockHash));
}

@Override
public boolean validatorIsConnected(UInt64 validatorIndex, UInt64 currentSlot) {
return proposersDataManager.validatorIsConnected(validatorIndex, currentSlot);
}

@Override
public void onPreparedProposersUpdated() {
eventThread.execute(this::internalUpdatePreparableProposers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ public SafeFuture<Void> updateValidatorRegistrations(
headState, signedValidatorRegistrations, currentSlot));
}

// used in ForkChoice validator_is_connected
public boolean validatorIsConnected(final UInt64 validatorIndex, final UInt64 currentSlot) {
final PreparedProposerInfo info = preparedProposerInfoByValidatorIndex.get(validatorIndex);
return info != null && !info.hasExpired(currentSlot);
}

private void updatePreparedProposerCache(
final Collection<BeaconPreparableProposer> preparedProposers, final UInt64 currentSlot) {
final UInt64 expirySlot =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright Consensys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.statetransition.forkchoice;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import java.util.List;
import java.util.Optional;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.ethereum.execution.types.Eth1Address;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.operations.versions.bellatrix.BeaconPreparableProposer;
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.storage.client.RecentChainData;

class ProposersDataManagerTest {

private final Spec spec = TestSpecFactory.createMinimalCapella();

private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec);

private final RecentChainData recentChainData = mock(RecentChainData.class);

private final ExecutionLayerChannel channel = ExecutionLayerChannel.NOOP;
private final MetricsSystem metricsSystem = new NoOpMetricsSystem();

private final Eth1Address defaultAddress = dataStructureUtil.randomEth1Address();
private final ProposersDataManager manager =
new ProposersDataManager(
mock(EventThread.class),
spec,
metricsSystem,
channel,
recentChainData,
Optional.of(defaultAddress));

final List<BeaconPreparableProposer> proposers =
List.of(
new BeaconPreparableProposer(UInt64.ONE, dataStructureUtil.randomEth1Address()),
new BeaconPreparableProposer(UInt64.ZERO, defaultAddress));

@Test
void validatorIsConnected_notFound_withEmptyPreparedList() {
assertThat(manager.validatorIsConnected(UInt64.ZERO, UInt64.ZERO)).isFalse();
}

@Test
void validatorIsConnected_found_withPreparedProposer() {
manager.updatePreparedProposers(proposers, UInt64.ONE);
assertThat(manager.validatorIsConnected(UInt64.ONE, UInt64.valueOf(1))).isTrue();
}

@Test
void validatorIsConnected_notFound_withDifferentPreparedProposer() {
manager.updatePreparedProposers(proposers, UInt64.ONE);
assertThat(manager.validatorIsConnected(UInt64.valueOf(2), UInt64.valueOf(2))).isFalse();
}

@Test
void validatorIsConnected_notFound_withExpiredPreparedProposer() {
manager.updatePreparedProposers(proposers, UInt64.ONE);
assertThat(manager.validatorIsConnected(UInt64.ONE, UInt64.valueOf(26))).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,9 @@ public SafeFuture<Optional<ExecutionPayloadContext>> getPayloadId(

@Override
public void onTerminalBlockReached(final Bytes32 executionBlockHash) {}

@Override
public boolean validatorIsConnected(UInt64 validatorIndex, UInt64 currentSlot) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ protected SafeFuture<?> initialize() {
metricsSystem,
storeConfig,
beaconAsyncRunner,
timeProvider,
storageQueryChannel,
storageUpdateChannel,
voteUpdateChannel,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Consensys Software Inc., 2023
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.storage.client;

import static tech.pegasys.teku.spec.constants.NetworkConstants.INTERVALS_PER_SLOT;

import java.util.Map;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.infrastructure.collections.LimitedMap;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

public class BlockTimelinessTracker {
private static final Logger LOG = LogManager.getLogger();
private final Map<Bytes32, Boolean> blockTimeliness;
private final TimeProvider timeProvider;
private final Spec spec;
private final RecentChainData recentChainData;

// implements is_timely from Consensus Spec
public BlockTimelinessTracker(
final Spec spec, final RecentChainData recentChainData, final TimeProvider timeProvider) {
this.spec = spec;
final int epochsForTimeliness =
Math.max(spec.getGenesisSpecConfig().getReorgMaxEpochsSinceFinalization(), 3);
this.blockTimeliness =
LimitedMap.createSynchronizedNatural(
spec.getGenesisSpec().getSlotsPerEpoch() * epochsForTimeliness);
this.timeProvider = timeProvider;
this.recentChainData = recentChainData;
}

public void setBlockTimelinessFromArrivalTime(
final SignedBeaconBlock block, final UInt64 arrivalTimeMillis) {
final UInt64 genesisTime = recentChainData.getGenesisTime();
final UInt64 computedSlot = spec.getCurrentSlot(timeProvider.getTimeInSeconds(), genesisTime);
final Bytes32 root = block.getRoot();
if (computedSlot.isGreaterThan(block.getMessage().getSlot())) {
LOG.debug(
"Block {}:{} is before computed slot {}, timeliness set to false.",
root,
block.getSlot(),
computedSlot);
blockTimeliness.put(root, false);
return;
}
recentChainData
.getCurrentSlot()
.ifPresent(
slot -> {
final UInt64 slotStartTimeMillis =
spec.getSlotStartTimeMillis(slot, genesisTime.times(1000));
final int millisIntoSlot =
arrivalTimeMillis.minusMinZero(slotStartTimeMillis).intValue();

final UInt64 timelinessLimit =
spec.getMillisPerSlot(slot).dividedBy(INTERVALS_PER_SLOT);

final boolean isTimely =
block.getMessage().getSlot().equals(slot)
&& timelinessLimit.isGreaterThan(millisIntoSlot);
LOG.debug(
"Block {}:{} arrived at {} ms into slot {}, timeliness limit is {} ms. result: {}",
root,
block.getSlot(),
millisIntoSlot,
computedSlot,
timelinessLimit,
isTimely);
blockTimeliness.put(root, isTimely);
});
}

public Optional<Boolean> isBlockTimely(final Bytes32 root) {
return Optional.ofNullable(blockTimeliness.get(root));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.bytes.Bytes4;
import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
Expand Down Expand Up @@ -98,10 +99,13 @@ public abstract class RecentChainData implements StoreUpdateHandler {
private volatile Optional<ChainHead> chainHead = Optional.empty();
private volatile UInt64 genesisTime;

private final BlockTimelinessTracker blockTimelinessTracker;

RecentChainData(
final AsyncRunner asyncRunner,
final MetricsSystem metricsSystem,
final StoreConfig storeConfig,
final TimeProvider timeProvider,
final BlockProvider blockProvider,
final StateAndBlockSummaryProvider stateProvider,
final EarliestBlobSidecarSlotProvider earliestBlobSidecarSlotProvider,
Expand All @@ -120,6 +124,7 @@ public abstract class RecentChainData implements StoreUpdateHandler {
this.chainHeadChannel = chainHeadChannel;
this.storageUpdateChannel = storageUpdateChannel;
this.finalizedCheckpointChannel = finalizedCheckpointChannel;
this.blockTimelinessTracker = new BlockTimelinessTracker(spec, this, timeProvider);
reorgCounter =
metricsSystem.createCounter(
TekuMetricCategory.BEACON,
Expand Down Expand Up @@ -613,4 +618,13 @@ public List<Bytes32> getAllBlockRootsAtSlot(final UInt64 slot) {
.map(forkChoiceStrategy -> forkChoiceStrategy.getBlockRootsAtSlot(slot))
.orElse(Collections.emptyList());
}

public void setBlockTimelinessFromArrivalTime(
final SignedBeaconBlock block, final UInt64 arrivalTime) {
blockTimelinessTracker.setBlockTimelinessFromArrivalTime(block, arrivalTime);
}

public Optional<Boolean> getBlockTimeliness(final Bytes32 blockRoot) {
return blockTimelinessTracker.isBlockTimely(blockRoot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.teku.dataproviders.lookup.StateAndBlockSummaryProvider;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.Constants;
import tech.pegasys.teku.storage.api.ChainHeadChannel;
Expand All @@ -49,6 +50,7 @@ public StorageBackedRecentChainData(
final AsyncRunner asyncRunner,
final MetricsSystem metricsSystem,
final StoreConfig storeConfig,
final TimeProvider timeProvider,
final StorageQueryChannel storageQueryChannel,
final StorageUpdateChannel storageUpdateChannel,
final VoteUpdateChannel voteUpdateChannel,
Expand All @@ -59,6 +61,7 @@ public StorageBackedRecentChainData(
asyncRunner,
metricsSystem,
storeConfig,
timeProvider,
storageQueryChannel::getHotBlocksByRoot,
storageQueryChannel::getHotStateAndBlockSummaryByBlockRoot,
storageQueryChannel::getEarliestAvailableBlobSidecarSlot,
Expand All @@ -77,6 +80,7 @@ public static SafeFuture<RecentChainData> create(
final MetricsSystem metricsSystem,
final StoreConfig storeConfig,
final AsyncRunner asyncRunner,
final TimeProvider timeProvider,
final StorageQueryChannel storageQueryChannel,
final StorageUpdateChannel storageUpdateChannel,
final VoteUpdateChannel voteUpdateChannel,
Expand All @@ -88,6 +92,7 @@ public static SafeFuture<RecentChainData> create(
asyncRunner,
metricsSystem,
storeConfig,
timeProvider,
storageQueryChannel,
storageUpdateChannel,
voteUpdateChannel,
Expand All @@ -103,6 +108,7 @@ public static RecentChainData createImmediately(
final AsyncRunner asyncRunner,
final MetricsSystem metricsSystem,
final StoreConfig storeConfig,
final TimeProvider timeProvider,
final StorageQueryChannel storageQueryChannel,
final StorageUpdateChannel storageUpdateChannel,
final VoteUpdateChannel voteUpdateChannel,
Expand All @@ -114,6 +120,7 @@ public static RecentChainData createImmediately(
asyncRunner,
metricsSystem,
storeConfig,
timeProvider,
storageQueryChannel,
storageUpdateChannel,
voteUpdateChannel,
Expand Down
Loading

0 comments on commit 47a457a

Please sign in to comment.