Skip to content

Commit

Permalink
L1-191: Slo metrics refactor (#1749)
Browse files Browse the repository at this point in the history
# Description
Refactor, which:
1. Unifies handling of metrics like `aleph_best_block[_1s]`,
`aleph_reorgs`, `aleph_transaction_to_block_time` with the good old
timing block metrics based on checkpoints.
2. Changes how `Checkpoint::Imported` and `Checkpoint::Finalized` are
reported - instead of reporting these in wrappers, like
`TracingBlockImport`, these will now be reported using notification
streams. This may cause these metrics to show slightly longer times, but
experiments show that in practice this has little impact (`Importing ->
Imported` is ~4ms instead of ~3.7ms under normal conditions with almost
empty blocks on featurenet).
3. As a consequence, we can remove `TracingBlockImport` and move metrics
initialization from `bin/node/service.rs` to `finality-aleph`.
4. Small consistency improvements in what was previously
`AllBlockMetrics` - imho now it's slightly better, but this is
subjective.

Tests in `metrics/transaction-pool` are unchanged; only their setup
was slightly adjusted, so feel free to just skim through them.

## Type of change

(refactor)

# Checklist:

(none)
  • Loading branch information
ggawryal authored Jun 11, 2024
1 parent bbf0c2b commit 73468f1
Show file tree
Hide file tree
Showing 21 changed files with 742 additions and 976 deletions.
13 changes: 4 additions & 9 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use std::{

use fake_runtime_api::fake_runtime::RuntimeApi;
use finality_aleph::{
build_network, get_aleph_block_import, run_validator_node, AlephConfig, AllBlockMetrics,
BlockImporter, BuildNetworkOutput, ChannelProvider, FavouriteSelectChainProvider,
Justification, JustificationTranslator, MillisecsPerBlock, RateLimiterConfig,
RedirectingBlockImport, SessionPeriod, SubstrateChainStatus, SyncOracle, ValidatorAddressCache,
build_network, get_aleph_block_import, run_validator_node, AlephConfig, BlockImporter,
BuildNetworkOutput, ChannelProvider, FavouriteSelectChainProvider, Justification,
JustificationTranslator, MillisecsPerBlock, RateLimiterConfig, RedirectingBlockImport,
SessionPeriod, SubstrateChainStatus, SyncOracle, ValidatorAddressCache,
};
use log::warn;
use pallet_aleph_runtime_api::AlephSessionApi;
Expand Down Expand Up @@ -51,7 +51,6 @@ pub struct ServiceComponents {
pub keystore_container: KeystoreContainer,
pub justification_channel_provider: ChannelProvider<Justification>,
pub telemetry: Option<Telemetry>,
pub metrics: AllBlockMetrics,
}
struct LimitNonfinalized(u32);

Expand Down Expand Up @@ -133,14 +132,12 @@ pub fn new_partial(config: &Configuration) -> Result<ServiceComponents, ServiceE
SubstrateChainStatus::new(backend.clone())
.map_err(|e| ServiceError::Other(format!("failed to set up chain status: {e}")))?,
);
let metrics = AllBlockMetrics::new(config.prometheus_registry());
let justification_channel_provider = ChannelProvider::new();
let aleph_block_import = get_aleph_block_import(
client.clone(),
justification_channel_provider.get_sender(),
justification_translator,
select_chain_provider.select_chain(),
metrics.clone(),
);

let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
Expand Down Expand Up @@ -182,7 +179,6 @@ pub fn new_partial(config: &Configuration) -> Result<ServiceComponents, ServiceE
transaction_pool,
justification_channel_provider,
telemetry,
metrics,
})
}

Expand Down Expand Up @@ -394,7 +390,6 @@ pub fn new_authority(
keystore: service_components.keystore_container.local_keystore(),
justification_channel_provider: service_components.justification_channel_provider,
block_rx,
metrics: service_components.metrics,
registry: prometheus_registry,
unit_creation_delay: aleph_config.unit_creation_delay(),
backup_saving_path: backup_path,
Expand Down
13 changes: 13 additions & 0 deletions finality-aleph/src/block/substrate/chain_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ impl Display for Error {
}
}

impl std::error::Error for Error {}

impl From<BackendError> for Error {
fn from(value: BackendError) -> Self {
Error::Backend(value)
Expand Down Expand Up @@ -159,6 +161,17 @@ impl SubstrateChainStatus {
fn finalized_hash(&self) -> AlephHash {
self.info().finalized_hash
}

/// Computes the lowest common ancestor of the blocks `from` and `to`.
///
/// Delegates to `sp_blockchain::lowest_common_ancestor` on the backend's blockchain,
/// passing only the hashes of the two ids, and builds the returned `BlockId` from the
/// hash and number of the result.
///
/// # Errors
///
/// Any error returned by the underlying lookup is propagated with `?`, converted into
/// `Error`.
///
/// Warning: complexity O(distance between blocks).
pub fn lowest_common_ancestor(&self, from: &BlockId, to: &BlockId) -> Result<BlockId, Error> {
let result = sp_blockchain::lowest_common_ancestor(
self.backend.blockchain(),
from.hash(),
to.hash(),
)?;
Ok((result.hash, result.number).into())
}
}

impl ChainStatus<Block, Justification> for SubstrateChainStatus {
Expand Down
19 changes: 10 additions & 9 deletions finality-aleph/src/block/substrate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use sp_runtime::traits::{CheckedSub, Header as _, One};
use crate::{
aleph_primitives::{Block, Header},
block::{Block as BlockT, BlockId, BlockImport, Header as HeaderT, UnverifiedHeader},
metrics::{AllBlockMetrics, Checkpoint},
metrics::TimingBlockMetrics,
};

mod chain_status;
Expand All @@ -19,9 +19,12 @@ pub use justification::{
InnerJustification, Justification, JustificationTranslator, TranslateError,
};
pub use status_notifier::SubstrateChainStatusNotifier;
pub use verification::{SessionVerifier, SubstrateFinalizationInfo, VerifierCache};
pub use verification::{SubstrateFinalizationInfo, VerifierCache};

use crate::block::{BestBlockSelector, BlockchainEvents};
use crate::{
block::{BestBlockSelector, BlockchainEvents},
metrics::Checkpoint,
};

const LOG_TARGET: &str = "aleph-substrate";

Expand Down Expand Up @@ -60,18 +63,18 @@ impl HeaderT for Header {
/// Wrapper around the trait object that we get from Substrate.
pub struct BlockImporter {
importer: Box<dyn ImportQueueService<Block>>,
metrics: AllBlockMetrics,
metrics: TimingBlockMetrics,
}

impl BlockImporter {
pub fn new(importer: Box<dyn ImportQueueService<Block>>) -> Self {
Self {
importer,
metrics: AllBlockMetrics::new(None),
metrics: TimingBlockMetrics::noop(),
}
}

pub fn attach_metrics(&mut self, metrics: AllBlockMetrics) {
pub fn attach_metrics(&mut self, metrics: TimingBlockMetrics) {
self.metrics = metrics;
}
}
Expand All @@ -85,7 +88,6 @@ impl BlockImport<Block> for BlockImporter {
false => BlockOrigin::NetworkBroadcast,
};
let hash = block.header.hash();
let number = *block.header.number();
let incoming_block = IncomingBlock::<Block> {
hash,
header: Some(block.header),
Expand All @@ -98,8 +100,7 @@ impl BlockImport<Block> for BlockImporter {
import_existing: false,
state: None,
};
self.metrics
.report_block(BlockId::new(hash, number), Checkpoint::Importing, Some(own));
self.metrics.report_block(hash, Checkpoint::Importing);
self.importer.import_blocks(origin, vec![incoming_block]);
}
}
Expand Down
12 changes: 6 additions & 6 deletions finality-aleph/src/data_io/data_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
aleph_primitives::BlockNumber,
block::{BestBlockSelector, Header, HeaderBackend, UnverifiedHeader},
data_io::{proposal::UnvalidatedAlephProposal, AlephData, MAX_DATA_BRANCH_LEN},
metrics::{AllBlockMetrics, Checkpoint},
metrics::{Checkpoint, TimingBlockMetrics},
party::manager::Runnable,
BlockId, SessionBoundaries,
};
Expand Down Expand Up @@ -161,7 +161,7 @@ where
client: C,
session_boundaries: SessionBoundaries,
config: ChainTrackerConfig,
metrics: AllBlockMetrics,
metrics: TimingBlockMetrics,
) -> (Self, DataProvider<H::Unverified>) {
let data_to_propose = Arc::new(Mutex::new(None));
(
Expand Down Expand Up @@ -317,7 +317,7 @@ where
#[derive(Clone)]
pub struct DataProvider<UH: UnverifiedHeader> {
data_to_propose: Arc<Mutex<Option<AlephData<UH>>>>,
metrics: AllBlockMetrics,
metrics: TimingBlockMetrics,
}

// Honest nodes propose data in session `k` as follows:
Expand All @@ -335,7 +335,7 @@ impl<UH: UnverifiedHeader> DataProvider<UH> {
if let Some(data) = &data_to_propose {
let top_block = data.head_proposal.top_block();
self.metrics
.report_block(top_block, Checkpoint::Proposed, None);
.report_block(top_block.hash(), Checkpoint::Proposed);
debug!(target: LOG_TARGET, "Outputting {:?} in get_data", data);
};

Expand All @@ -355,7 +355,7 @@ mod tests {
data_provider::{ChainTracker, ChainTrackerConfig},
AlephData, DataProvider, MAX_DATA_BRANCH_LEN,
},
metrics::AllBlockMetrics,
metrics::TimingBlockMetrics,
party::manager::Runnable,
testing::{
client_chain_builder::ClientChainBuilder,
Expand Down Expand Up @@ -394,7 +394,7 @@ mod tests {
client,
session_boundaries,
config,
AllBlockMetrics::new(None),
TimingBlockMetrics::noop(),
);

let (exit_chain_tracker_tx, exit_chain_tracker_rx) = oneshot::channel();
Expand Down
7 changes: 1 addition & 6 deletions finality-aleph/src/finalization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use sp_runtime::{

use crate::{
aleph_primitives::{BlockHash, BlockNumber},
metrics::{AllBlockMetrics, Checkpoint},
BlockId,
};

Expand All @@ -26,7 +25,6 @@ where
C: HeaderBackend<B> + LockImportRun<B, BE> + Finalizer<B, BE>,
{
client: Arc<C>,
metrics: AllBlockMetrics,
phantom: PhantomData<(B, BE)>,
}

Expand All @@ -36,10 +34,9 @@ where
BE: Backend<B>,
C: HeaderBackend<B> + LockImportRun<B, BE> + Finalizer<B, BE>,
{
pub(crate) fn new(client: Arc<C>, metrics: AllBlockMetrics) -> Self {
pub(crate) fn new(client: Arc<C>) -> Self {
AlephFinalizer {
client,
metrics,
phantom: PhantomData,
}
}
Expand Down Expand Up @@ -74,8 +71,6 @@ where
match &update_res {
Ok(_) => {
debug!(target: "aleph-finality", "Successfully finalized block with hash {:?} and number {:?}. Current best: #{:?}.", hash, number, status.best_number);
self.metrics
.report_block(block, Checkpoint::Finalized, None);
}
Err(_) => {
debug!(target: "aleph-finality", "Failed to finalize block with hash {:?} and number {:?}. Current best: #{:?}.", hash, number, status.best_number)
Expand Down
69 changes: 2 additions & 67 deletions finality-aleph/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ use sc_consensus::{
BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
JustificationImport,
};
use sp_consensus::{BlockOrigin, Error as ConsensusError, SelectChain};
use sp_consensus::{Error as ConsensusError, SelectChain};
use sp_runtime::{traits::Header as HeaderT, Justification as SubstrateJustification};

use crate::{
aleph_primitives::{Block, BlockHash, BlockNumber, ALEPH_ENGINE_ID},
block::substrate::{Justification, JustificationTranslator, TranslateError},
justification::{backwards_compatible_decode, DecodeError},
metrics::{AllBlockMetrics, Checkpoint},
BlockId,
};

Expand All @@ -26,14 +25,12 @@ pub fn get_aleph_block_import<I, SC>(
justification_tx: UnboundedSender<Justification>,
translator: JustificationTranslator,
select_chain: SC,
metrics: AllBlockMetrics,
) -> impl BlockImport<Block, Error = I::Error> + JustificationImport<Block, Error = ConsensusError> + Clone
where
I: BlockImport<Block> + Send + Sync + Clone,
SC: SelectChain<Block> + Send + Sync,
{
let tracing_import = TracingBlockImport::new(inner, metrics);
let favourite_marker_import = FavouriteMarkerBlockImport::new(tracing_import, select_chain);
let favourite_marker_import = FavouriteMarkerBlockImport::new(inner, select_chain);

AlephBlockImport::new(favourite_marker_import, justification_tx, translator)
}
Expand Down Expand Up @@ -92,68 +89,6 @@ where
}
}

/// A wrapper around a block import that also marks the start and end of the import of every block
/// in the metrics, if provided.
#[derive(Clone)]
struct TracingBlockImport<I>
where
I: BlockImport<Block> + Send + Sync,
{
/// The wrapped block import that `check_block` and `import_block` delegate to.
inner: I,
/// Metrics handle used to report the `Importing` and `Imported` checkpoints.
metrics: AllBlockMetrics,
}

impl<I> TracingBlockImport<I>
where
I: BlockImport<Block> + Send + Sync,
{
/// Wraps `inner`, storing `metrics` so that block imports going through the wrapper can be
/// reported to it.
pub fn new(inner: I, metrics: AllBlockMetrics) -> Self {
TracingBlockImport { inner, metrics }
}
}

#[async_trait::async_trait]
impl<I> BlockImport<Block> for TracingBlockImport<I>
where
I: BlockImport<Block> + Send + Sync,
{
type Error = I::Error;

/// Forwards the check to the wrapped import unchanged; nothing is reported to the metrics.
async fn check_block(
&mut self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
self.inner.check_block(block).await
}

/// Imports `block` through the wrapped import, reporting `Checkpoint::Importing` before the
/// call and `Checkpoint::Imported` after it, the latter only when the wrapped import returns
/// `Ok(ImportResult::Imported(_))`. The wrapped import's result is returned unchanged.
async fn import_block(
&mut self,
block: BlockImportParams<Block>,
) -> Result<ImportResult, Self::Error> {
// Read everything needed for reporting up front: `block` is moved into the inner
// import below and is no longer available afterwards.
let post_hash = block.post_hash();
let number = *block.post_header().number();
let is_own = block.origin == BlockOrigin::Own;
// Self-created blocks are imported without using the import queue,
// so we need to report them here.
self.metrics.report_block(
BlockId::new(post_hash, number),
Checkpoint::Importing,
Some(is_own),
);

let result = self.inner.import_block(block).await;

// Errors and every other `ImportResult` variant are passed through without an
// `Imported` report.
if let Ok(ImportResult::Imported(_)) = &result {
self.metrics.report_block(
BlockId::new(post_hash, number),
Checkpoint::Imported,
Some(is_own),
);
}
result
}
}

/// A wrapper around a block import that also extracts any present justifications and sends them to
/// our components which will process them further and possibly finalize the block.
#[derive(Clone)]
Expand Down
2 changes: 0 additions & 2 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ pub use crate::{
},
import::{get_aleph_block_import, AlephBlockImport, RedirectingBlockImport},
justification::AlephJustification,
metrics::{AllBlockMetrics, DefaultClock, FinalityRateMetrics, TimingBlockMetrics},
network::{
address_cache::{ValidatorAddressCache, ValidatorAddressingInfo},
build_network, BuildNetworkOutput, ProtocolNetwork, SubstratePeerId,
Expand Down Expand Up @@ -271,7 +270,6 @@ pub struct AlephConfig<C, T> {
pub keystore: Arc<LocalKeystore>,
pub justification_channel_provider: ChannelProvider<Justification>,
pub block_rx: mpsc::UnboundedReceiver<AlephBlock>,
pub metrics: AllBlockMetrics,
pub registry: Option<Registry>,
pub session_period: SessionPeriod,
pub millisecs_per_block: MillisecsPerBlock,
Expand Down
Loading

0 comments on commit 73468f1

Please sign in to comment.