From 73468f126a113fde454de58a249e103a6da89a00 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Grzegorz=20Gawrya=C5=82?= <31205678+ggawryal@users.noreply.github.com>
Date: Tue, 11 Jun 2024 17:22:06 +0200
Subject: [PATCH] L1-191: Slo metrics refactor (#1749)

# Description

Refactor, which:

1. Unifies handling of metrics like `aleph_best_block[_1s]`, `aleph_reorgs`, and `aleph_transaction_to_block_time` with the good old timing block metrics based on checkpoints.
2. Changes how `Checkpoint::Imported` and `Checkpoint::Finalized` are reported: instead of being reported in wrappers like `TracingBlockImport`, they are now reported via notification streams. This may cause these metrics to show slightly larger times, but experiments show the impact is negligible in practice (`Importing -> Imported` is ~4ms instead of ~3.7ms under normal conditions and almost-empty blocks on featurenet).
3. As a consequence, we can remove `TracingBlockImport` and move metrics initialization from `bin/node/service.rs` to `finality-aleph`.
4. Small consistency improvements in what was previously `AllBlockMetrics` - imho the result is slightly better, but this is subjective.

Tests in `metrics/transaction_pool` were left unchanged; only their setup was slightly adjusted, so feel free to just skim through them.

## Type of change

(refactor)

# Checklist:

(none)
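To make the new flow easier to review, here is a rough sketch of the checkpoint-reporting path after this change. This is an illustration, not code from this diff: the `report_imported_and_finalized` helper is made up for the example, and the real wiring in `sync/service.rs` obtains `own` and `is_new_best` through its own plumbing; `SloMetrics::report_block_imported` and `SloMetrics::report_block_finalized` are the actual methods added in `metrics/slo.rs`.

```rust
use futures::StreamExt;
use sc_client_api::BlockchainEvents;
use sp_consensus::BlockOrigin;
use sp_runtime::traits::Header as _;

use crate::{aleph_primitives::Block, metrics::SloMetrics, BlockId};

// Hypothetical driver loop (not part of this diff): subscribe to the client's
// notification streams and feed them into SloMetrics, instead of intercepting
// the import pipeline the way TracingBlockImport did.
async fn report_imported_and_finalized<C: BlockchainEvents<Block>>(
    client: &C,
    mut metrics: SloMetrics,
) {
    let mut imports = client.every_import_notification_stream();
    let mut finality = client.finality_notification_stream();
    loop {
        tokio::select! {
            Some(note) = imports.next() => {
                // Checkpoint::Imported is recorded when the notification arrives,
                // which is why the measured times grow slightly (~4ms vs ~3.7ms).
                let own = matches!(note.origin, BlockOrigin::Own);
                let id = BlockId::new(note.hash, *note.header.number());
                metrics.report_block_imported(id, note.is_new_best, own);
            }
            Some(note) = finality.next() => {
                let id = BlockId::new(note.hash, *note.header.number());
                metrics.report_block_finalized(id);
            }
            else => break,
        }
    }
}
```

Transaction timings flow the same way: `run_metrics_service` drains the pool's import notification stream into `SloMetrics::report_transaction_in_pool`, and `report_block_imported` later matches the block's extrinsics against that cache to produce `aleph_transaction_to_block_time`.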
--- bin/node/src/service.rs | 13 +- .../src/block/substrate/chain_status.rs | 13 + finality-aleph/src/block/substrate/mod.rs | 19 +- finality-aleph/src/data_io/data_provider.rs | 12 +- finality-aleph/src/finalization.rs | 7 +- finality-aleph/src/import.rs | 69 +- finality-aleph/src/lib.rs | 2 - finality-aleph/src/metrics/all_block.rs | 53 -- finality-aleph/src/metrics/best_block.rs | 99 +++ finality-aleph/src/metrics/chain_state.rs | 682 ------------------ finality-aleph/src/metrics/finality_rate.rs | 53 +- finality-aleph/src/metrics/mod.rs | 12 +- finality-aleph/src/metrics/slo.rs | 122 ++++ finality-aleph/src/metrics/timing.rs | 40 +- .../src/metrics/transaction_pool.rs | 436 ++++++++++- finality-aleph/src/network/session/mod.rs | 2 +- finality-aleph/src/nodes.rs | 40 +- .../src/party/manager/aggregator.rs | 11 +- finality-aleph/src/party/manager/mod.rs | 6 +- finality-aleph/src/sync/service.rs | 24 +- primitives/src/lib.rs | 3 + 21 files changed, 742 insertions(+), 976 deletions(-) delete mode 100644 finality-aleph/src/metrics/all_block.rs create mode 100644 finality-aleph/src/metrics/best_block.rs delete mode 100644 finality-aleph/src/metrics/chain_state.rs create mode 100644 finality-aleph/src/metrics/slo.rs diff --git a/bin/node/src/service.rs b/bin/node/src/service.rs index b18f76ab3d..e78ae0e114 100644 --- a/bin/node/src/service.rs +++ b/bin/node/src/service.rs @@ -7,10 +7,10 @@ use std::{ use fake_runtime_api::fake_runtime::RuntimeApi; use finality_aleph::{ - build_network, get_aleph_block_import, run_validator_node, AlephConfig, AllBlockMetrics, - BlockImporter, BuildNetworkOutput, ChannelProvider, FavouriteSelectChainProvider, - Justification, JustificationTranslator, MillisecsPerBlock, RateLimiterConfig, - RedirectingBlockImport, SessionPeriod, SubstrateChainStatus, SyncOracle, ValidatorAddressCache, + build_network, get_aleph_block_import, run_validator_node, AlephConfig, BlockImporter, + BuildNetworkOutput, ChannelProvider, FavouriteSelectChainProvider, Justification, + JustificationTranslator, MillisecsPerBlock, RateLimiterConfig, RedirectingBlockImport, + SessionPeriod, SubstrateChainStatus, SyncOracle, ValidatorAddressCache, }; use log::warn; use pallet_aleph_runtime_api::AlephSessionApi; @@ -51,7 +51,6 @@ pub struct ServiceComponents { pub keystore_container: KeystoreContainer, pub justification_channel_provider: ChannelProvider, pub telemetry: Option, - pub metrics: AllBlockMetrics, } struct LimitNonfinalized(u32); @@ -133,14 +132,12 @@ pub fn new_partial(config: &Configuration) -> Result Result for Error { fn from(value: BackendError) -> Self { Error::Backend(value) @@ -159,6 +161,17 @@ impl SubstrateChainStatus { fn finalized_hash(&self) -> AlephHash { self.info().finalized_hash } + + /// Computes the lowest common ancestor of two blocks. Warning: complexity is + /// O(distance between blocks). + pub fn lowest_common_ancestor(&self, from: &BlockId, to: &BlockId) -> Result { + let result = sp_blockchain::lowest_common_ancestor( + self.backend.blockchain(), + from.hash(), + to.hash(), + )?; + Ok((result.hash, result.number).into()) + } } impl ChainStatus for SubstrateChainStatus { diff --git a/finality-aleph/src/block/substrate/mod.rs b/finality-aleph/src/block/substrate/mod.rs index d69651d937..aab2729a81 100644 --- a/finality-aleph/src/block/substrate/mod.rs +++ b/finality-aleph/src/block/substrate/mod.rs @@ -5,7 +5,7 @@ use sp_runtime::traits::{CheckedSub, Header as _, One}; use crate::{ aleph_primitives::{Block, Header}, block::{Block as BlockT, BlockId, BlockImport, Header as HeaderT, UnverifiedHeader}, - metrics::{AllBlockMetrics, Checkpoint}, + metrics::TimingBlockMetrics, }; mod chain_status; @@ -19,9 +19,12 @@ pub use justification::{ InnerJustification, Justification, JustificationTranslator, TranslateError, }; pub use status_notifier::SubstrateChainStatusNotifier; -pub use verification::{SessionVerifier, SubstrateFinalizationInfo, VerifierCache}; +pub use verification::{SubstrateFinalizationInfo, VerifierCache}; -use crate::block::{BestBlockSelector, BlockchainEvents}; +use crate::{ + block::{BestBlockSelector, BlockchainEvents}, + metrics::Checkpoint, +}; const LOG_TARGET: &str = "aleph-substrate"; @@ -60,18 +63,18 @@ impl HeaderT for Header { /// Wrapper around the trait object that we get from Substrate. 
pub struct BlockImporter { importer: Box>, - metrics: AllBlockMetrics, + metrics: TimingBlockMetrics, } impl BlockImporter { pub fn new(importer: Box>) -> Self { Self { importer, - metrics: AllBlockMetrics::new(None), + metrics: TimingBlockMetrics::noop(), } } - pub fn attach_metrics(&mut self, metrics: AllBlockMetrics) { + pub fn attach_metrics(&mut self, metrics: TimingBlockMetrics) { self.metrics = metrics; } } @@ -85,7 +88,6 @@ impl BlockImport for BlockImporter { false => BlockOrigin::NetworkBroadcast, }; let hash = block.header.hash(); - let number = *block.header.number(); let incoming_block = IncomingBlock:: { hash, header: Some(block.header), @@ -98,8 +100,7 @@ impl BlockImport for BlockImporter { import_existing: false, state: None, }; - self.metrics - .report_block(BlockId::new(hash, number), Checkpoint::Importing, Some(own)); + self.metrics.report_block(hash, Checkpoint::Importing); self.importer.import_blocks(origin, vec![incoming_block]); } } diff --git a/finality-aleph/src/data_io/data_provider.rs b/finality-aleph/src/data_io/data_provider.rs index ddcf086b57..7a8a0be3e7 100644 --- a/finality-aleph/src/data_io/data_provider.rs +++ b/finality-aleph/src/data_io/data_provider.rs @@ -9,7 +9,7 @@ use crate::{ aleph_primitives::BlockNumber, block::{BestBlockSelector, Header, HeaderBackend, UnverifiedHeader}, data_io::{proposal::UnvalidatedAlephProposal, AlephData, MAX_DATA_BRANCH_LEN}, - metrics::{AllBlockMetrics, Checkpoint}, + metrics::{Checkpoint, TimingBlockMetrics}, party::manager::Runnable, BlockId, SessionBoundaries, }; @@ -161,7 +161,7 @@ where client: C, session_boundaries: SessionBoundaries, config: ChainTrackerConfig, - metrics: AllBlockMetrics, + metrics: TimingBlockMetrics, ) -> (Self, DataProvider) { let data_to_propose = Arc::new(Mutex::new(None)); ( @@ -317,7 +317,7 @@ where #[derive(Clone)] pub struct DataProvider { data_to_propose: Arc>>>, - metrics: AllBlockMetrics, + metrics: TimingBlockMetrics, } // Honest nodes propose data in session `k` as follows: @@ -335,7 +335,7 @@ impl DataProvider { if let Some(data) = &data_to_propose { let top_block = data.head_proposal.top_block(); self.metrics - .report_block(top_block, Checkpoint::Proposed, None); + .report_block(top_block.hash(), Checkpoint::Proposed); debug!(target: LOG_TARGET, "Outputting {:?} in get_data", data); }; @@ -355,7 +355,7 @@ mod tests { data_provider::{ChainTracker, ChainTrackerConfig}, AlephData, DataProvider, MAX_DATA_BRANCH_LEN, }, - metrics::AllBlockMetrics, + metrics::TimingBlockMetrics, party::manager::Runnable, testing::{ client_chain_builder::ClientChainBuilder, @@ -394,7 +394,7 @@ mod tests { client, session_boundaries, config, - AllBlockMetrics::new(None), + TimingBlockMetrics::noop(), ); let (exit_chain_tracker_tx, exit_chain_tracker_rx) = oneshot::channel(); diff --git a/finality-aleph/src/finalization.rs b/finality-aleph/src/finalization.rs index 5244a1ae6b..8f9d287753 100644 --- a/finality-aleph/src/finalization.rs +++ b/finality-aleph/src/finalization.rs @@ -11,7 +11,6 @@ use sp_runtime::{ use crate::{ aleph_primitives::{BlockHash, BlockNumber}, - metrics::{AllBlockMetrics, Checkpoint}, BlockId, }; @@ -26,7 +25,6 @@ where C: HeaderBackend + LockImportRun + Finalizer, { client: Arc, - metrics: AllBlockMetrics, phantom: PhantomData<(B, BE)>, } @@ -36,10 +34,9 @@ where BE: Backend, C: HeaderBackend + LockImportRun + Finalizer, { - pub(crate) fn new(client: Arc, metrics: AllBlockMetrics) -> Self { + pub(crate) fn new(client: Arc) -> Self { AlephFinalizer { client, - metrics, phantom: 
PhantomData, } } @@ -74,8 +71,6 @@ where match &update_res { Ok(_) => { debug!(target: "aleph-finality", "Successfully finalized block with hash {:?} and number {:?}. Current best: #{:?}.", hash, number, status.best_number); - self.metrics - .report_block(block, Checkpoint::Finalized, None); } Err(_) => { debug!(target: "aleph-finality", "Failed to finalize block with hash {:?} and number {:?}. Current best: #{:?}.", hash, number, status.best_number) diff --git a/finality-aleph/src/import.rs b/finality-aleph/src/import.rs index 0d24ed43e5..e9c6394ceb 100644 --- a/finality-aleph/src/import.rs +++ b/finality-aleph/src/import.rs @@ -9,14 +9,13 @@ use sc_consensus::{ BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult, JustificationImport, }; -use sp_consensus::{BlockOrigin, Error as ConsensusError, SelectChain}; +use sp_consensus::{Error as ConsensusError, SelectChain}; use sp_runtime::{traits::Header as HeaderT, Justification as SubstrateJustification}; use crate::{ aleph_primitives::{Block, BlockHash, BlockNumber, ALEPH_ENGINE_ID}, block::substrate::{Justification, JustificationTranslator, TranslateError}, justification::{backwards_compatible_decode, DecodeError}, - metrics::{AllBlockMetrics, Checkpoint}, BlockId, }; @@ -26,14 +25,12 @@ pub fn get_aleph_block_import( justification_tx: UnboundedSender, translator: JustificationTranslator, select_chain: SC, - metrics: AllBlockMetrics, ) -> impl BlockImport + JustificationImport + Clone where I: BlockImport + Send + Sync + Clone, SC: SelectChain + Send + Sync, { - let tracing_import = TracingBlockImport::new(inner, metrics); - let favourite_marker_import = FavouriteMarkerBlockImport::new(tracing_import, select_chain); + let favourite_marker_import = FavouriteMarkerBlockImport::new(inner, select_chain); AlephBlockImport::new(favourite_marker_import, justification_tx, translator) } @@ -92,68 +89,6 @@ where } } -/// A wrapper around a block import that also marks the start and end of the import of every block -/// in the metrics, if provided. -#[derive(Clone)] -struct TracingBlockImport -where - I: BlockImport + Send + Sync, -{ - inner: I, - metrics: AllBlockMetrics, -} - -impl TracingBlockImport -where - I: BlockImport + Send + Sync, -{ - pub fn new(inner: I, metrics: AllBlockMetrics) -> Self { - TracingBlockImport { inner, metrics } - } -} - -#[async_trait::async_trait] -impl BlockImport for TracingBlockImport -where - I: BlockImport + Send + Sync, -{ - type Error = I::Error; - - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { - self.inner.check_block(block).await - } - - async fn import_block( - &mut self, - block: BlockImportParams, - ) -> Result { - let post_hash = block.post_hash(); - let number = *block.post_header().number(); - let is_own = block.origin == BlockOrigin::Own; - // Self-created blocks are imported without using the import queue, - // so we need to report them here. - self.metrics.report_block( - BlockId::new(post_hash, number), - Checkpoint::Importing, - Some(is_own), - ); - - let result = self.inner.import_block(block).await; - - if let Ok(ImportResult::Imported(_)) = &result { - self.metrics.report_block( - BlockId::new(post_hash, number), - Checkpoint::Imported, - Some(is_own), - ); - } - result - } -} - /// A wrapper around a block import that also extracts any present justifications and sends them to /// our components which will process them further and possibly finalize the block. 
#[derive(Clone)] diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index 8a6ce39add..73d0fdd7b6 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -70,7 +70,6 @@ pub use crate::{ }, import::{get_aleph_block_import, AlephBlockImport, RedirectingBlockImport}, justification::AlephJustification, - metrics::{AllBlockMetrics, DefaultClock, FinalityRateMetrics, TimingBlockMetrics}, network::{ address_cache::{ValidatorAddressCache, ValidatorAddressingInfo}, build_network, BuildNetworkOutput, ProtocolNetwork, SubstratePeerId, @@ -271,7 +270,6 @@ pub struct AlephConfig { pub keystore: Arc, pub justification_channel_provider: ChannelProvider, pub block_rx: mpsc::UnboundedReceiver, - pub metrics: AllBlockMetrics, pub registry: Option, pub session_period: SessionPeriod, pub millisecs_per_block: MillisecsPerBlock, diff --git a/finality-aleph/src/metrics/all_block.rs b/finality-aleph/src/metrics/all_block.rs deleted file mode 100644 index ac66384027..0000000000 --- a/finality-aleph/src/metrics/all_block.rs +++ /dev/null @@ -1,53 +0,0 @@ -use log::warn; -use substrate_prometheus_endpoint::Registry; - -use super::{finality_rate::FinalityRateMetrics, timing::DefaultClock, Checkpoint}; -use crate::{metrics::LOG_TARGET, BlockId, TimingBlockMetrics}; - -/// Wrapper around various block-related metrics. -#[derive(Clone)] -pub struct AllBlockMetrics { - timing_metrics: TimingBlockMetrics, - finality_rate_metrics: FinalityRateMetrics, -} - -impl AllBlockMetrics { - pub fn new(registry: Option<&Registry>) -> Self { - let timing_metrics = match TimingBlockMetrics::new(registry, DefaultClock) { - Ok(timing_metrics) => timing_metrics, - Err(e) => { - warn!( - target: LOG_TARGET, - "Failed to register Prometheus block timing metrics: {:?}.", e - ); - TimingBlockMetrics::Noop - } - }; - let finality_rate_metrics = match FinalityRateMetrics::new(registry) { - Ok(finality_rate_metrics) => finality_rate_metrics, - Err(e) => { - warn!( - target: LOG_TARGET, - "Failed to register Prometheus finality rate metrics: {:?}.", e - ); - FinalityRateMetrics::Noop - } - }; - AllBlockMetrics { - timing_metrics, - finality_rate_metrics, - } - } - - /// Triggers all contained block metrics. 
- pub fn report_block(&self, block_id: BlockId, checkpoint: Checkpoint, own: Option) { - self.timing_metrics - .report_block(block_id.hash(), checkpoint); - self.finality_rate_metrics.report_block( - block_id.hash(), - block_id.number(), - checkpoint, - own, - ); - } -} diff --git a/finality-aleph/src/metrics/best_block.rs b/finality-aleph/src/metrics/best_block.rs new file mode 100644 index 0000000000..a05a69ccca --- /dev/null +++ b/finality-aleph/src/metrics/best_block.rs @@ -0,0 +1,99 @@ +use std::error::Error; + +use substrate_prometheus_endpoint::{ + register, Gauge, Histogram, HistogramOpts, PrometheusError, Registry, U64, +}; + +use crate::{BlockId, BlockNumber, SubstrateChainStatus}; + +#[derive(Clone)] +pub enum BestBlockMetrics { + Prometheus { + top_finalized_block: Gauge, + best_block: Gauge, + reorgs: Histogram, + best_block_id: BlockId, + chain_status: SubstrateChainStatus, + }, + Noop, +} + +impl BestBlockMetrics { + pub fn new( + registry: Option, + chain_status: SubstrateChainStatus, + ) -> Result { + let registry = match registry { + Some(registry) => registry, + None => return Ok(Self::Noop), + }; + + Ok(Self::Prometheus { + top_finalized_block: register( + Gauge::new("aleph_top_finalized_block", "Top finalized block number")?, + &registry, + )?, + best_block: register( + Gauge::new( + "aleph_best_block", + "Best (or more precisely, favourite) block number", + )?, + &registry, + )?, + reorgs: register( + Histogram::with_opts( + HistogramOpts::new("aleph_reorgs", "Number of reorgs by length") + .buckets(vec![1., 2., 4., 9.]), + )?, + &registry, + )?, + best_block_id: (Default::default(), 0u32).into(), + chain_status, + }) + } + + pub fn report_best_block_imported(&mut self, block_id: BlockId) { + if let Self::Prometheus { + best_block, + ref mut best_block_id, + reorgs, + chain_status, + .. + } = self + { + let reorg_len = retracted_path_length(chain_status, best_block_id, &block_id); + best_block.set(block_id.number() as u64); + *best_block_id = block_id; + match reorg_len { + Ok(0) => {} + Ok(reorg_len) => { + reorgs.observe(reorg_len as f64); + } + Err(e) => { + log::warn!("Failed to calculate reorg length: {:?}", e); + } + } + } + } + + pub fn report_block_finalized(&self, block_id: BlockId) { + if let Self::Prometheus { + top_finalized_block, .. 
+ } = self + { + top_finalized_block.set(block_id.number() as u64); + } + } +} + +fn retracted_path_length( + chain_status: &SubstrateChainStatus, + from: &BlockId, + to: &BlockId, +) -> Result> { + let lca = chain_status + .lowest_common_ancestor(from, to) + .map_err(Box::new)?; + Ok(from.number().saturating_sub(lca.number())) +} diff --git a/finality-aleph/src/metrics/chain_state.rs b/finality-aleph/src/metrics/chain_state.rs deleted file mode 100644 index e8b69b9287..0000000000 --- a/finality-aleph/src/metrics/chain_state.rs +++ /dev/null @@ -1,682 +0,0 @@ -use std::{ - fmt::Display, - num::NonZeroUsize, - time::{Duration, Instant}, -}; - -use futures::{stream::FusedStream, StreamExt}; -use lru::LruCache; -use sc_client_api::{ - BlockBackend, BlockImportNotification, FinalityNotification, FinalityNotifications, - ImportNotifications, -}; -use sp_blockchain::{lowest_common_ancestor, HeaderMetadata}; -use sp_runtime::{ - traits::{Block as BlockT, Extrinsic, Header as HeaderT, Zero}, - Saturating, -}; -use substrate_prometheus_endpoint::{ - register, Counter, Gauge, Histogram, HistogramOpts, PrometheusError, Registry, U64, -}; - -use crate::{ - metrics::{exponential_buckets_two_sided, TransactionPoolInfoProvider}, - BlockNumber, -}; - -// Size of transaction cache: 32B (Hash) + 16B (Instant) * `100_000` is approximately 4.8MB -const TRANSACTION_CACHE_SIZE: usize = 100_000; - -const BUCKETS_FACTOR: f64 = 1.4; - -#[derive(Debug)] -pub enum Error { - NoRegistry, - UnableToCreateMetrics(PrometheusError), - BlockImportStreamClosed, - FinalizedBlocksStreamClosed, - TransactionStreamClosed, -} - -impl Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Error::NoRegistry => write!(f, "Registry can not be empty."), - Error::UnableToCreateMetrics(e) => { - write!(f, "Failed to create metrics: {e}.") - } - Error::BlockImportStreamClosed => { - write!(f, "Block import notification stream ended unexpectedly.") - } - Error::FinalizedBlocksStreamClosed => { - write!(f, "Finality notification stream ended unexpectedly.") - } - Error::TransactionStreamClosed => { - write!(f, "Transaction stream ended unexpectedly.") - } - } - } -} - -enum ChainStateMetrics { - Prometheus { - top_finalized_block: Gauge, - best_block: Gauge, - reorgs: Histogram, - time_till_block_inclusion: Histogram, - transactions_not_seen_in_the_pool: Counter, - }, - Noop, -} - -impl ChainStateMetrics { - fn new(registry: Option) -> Result { - let registry = match registry { - Some(registry) => registry, - None => return Ok(ChainStateMetrics::Noop), - }; - - Ok(ChainStateMetrics::Prometheus { - top_finalized_block: register( - Gauge::new("aleph_top_finalized_block", "no help")?, - &registry, - )?, - best_block: register(Gauge::new("aleph_best_block", "no help")?, &registry)?, - reorgs: register( - Histogram::with_opts( - HistogramOpts::new("aleph_reorgs", "Number of reorgs by length") - .buckets(vec![1., 2., 4., 9.]), - )?, - &registry, - )?, - time_till_block_inclusion: register( - Histogram::with_opts( - HistogramOpts::new( - "aleph_transaction_to_block_time", - "Time from becoming ready in the pool to inclusion in some valid block.", - ) - .buckets(exponential_buckets_two_sided( - 2000.0, - BUCKETS_FACTOR, - 2, - 8, - )?), - )?, - &registry, - )?, - transactions_not_seen_in_the_pool: register( - Counter::new("aleph_transactions_not_seen_in_the_pool", "no help")?, - &registry, - )?, - }) - } - - fn update_best_block(&self, number: BlockNumber) { - if let ChainStateMetrics::Prometheus { 
best_block, .. } = self { - best_block.set(number as u64) - } - } - - fn update_top_finalized_block(&self, number: BlockNumber) { - if let ChainStateMetrics::Prometheus { - top_finalized_block, - .. - } = self - { - top_finalized_block.set(number as u64); - } - } - - fn report_reorg(&self, length: BlockNumber) { - if let ChainStateMetrics::Prometheus { reorgs, .. } = self { - reorgs.observe(length as f64); - } - } - - fn report_transaction_in_block(&self, elapsed: Duration) { - if let ChainStateMetrics::Prometheus { - time_till_block_inclusion, - .. - } = self - { - time_till_block_inclusion.observe(elapsed.as_secs_f64() * 1000.); - } - } - - fn report_transaction_not_seen_in_the_pool(&self) { - if let ChainStateMetrics::Prometheus { - transactions_not_seen_in_the_pool, - .. - } = self - { - transactions_not_seen_in_the_pool.inc(); - } - } -} - -pub async fn run_chain_state_metrics< - X, - HE: HeaderT, - B: BlockT
, - BE: HeaderMetadata + BlockBackend, - TP: TransactionPoolInfoProvider, ->( - backend: &BE, - mut import_notifications: ImportNotifications, - mut finality_notifications: FinalityNotifications, - registry: Option, - mut transaction_pool_info_provider: TP, -) -> Result<(), Error> { - if registry.is_none() { - return Err(Error::NoRegistry); - } - if import_notifications.is_terminated() { - return Err(Error::BlockImportStreamClosed); - } - if finality_notifications.is_terminated() { - return Err(Error::FinalizedBlocksStreamClosed); - } - - let metrics = ChainStateMetrics::new(registry).map_err(Error::UnableToCreateMetrics)?; - let mut cache: LruCache = LruCache::new( - NonZeroUsize::new(TRANSACTION_CACHE_SIZE).expect("the cache size is a non-zero constant"), - ); - - let mut previous_best: Option = None; - - loop { - tokio::select! { - maybe_block = import_notifications.next() => { - let block = maybe_block.ok_or(Error::BlockImportStreamClosed)?; - handle_block_imported( - block, - backend, - &metrics, - &mut transaction_pool_info_provider, - &mut cache, - &mut previous_best, - ); - }, - maybe_block = finality_notifications.next() => { - let block = maybe_block.ok_or(Error::FinalizedBlocksStreamClosed)?; - handle_block_finalized(block, &metrics); - }, - maybe_transaction = transaction_pool_info_provider.next_transaction() => { - let hash = maybe_transaction.ok_or(Error::TransactionStreamClosed)?; - handle_transaction_in_pool(hash, &mut cache); - } - } - } -} - -fn handle_block_imported< - X, - HE: HeaderT, - B: BlockT
, - BE: HeaderMetadata + BlockBackend, - TP: TransactionPoolInfoProvider, ->( - block: BlockImportNotification, - backend: &BE, - metrics: &ChainStateMetrics, - transaction_pool_info_provider: &mut TP, - cache: &mut LruCache, - previous_best: &mut Option, -) { - if block.is_new_best { - metrics.update_best_block(*block.header.number()); - if let Some(reorg_len) = detect_reorgs(backend, previous_best.clone(), block.header.clone()) - { - metrics.report_reorg(reorg_len); - } - *previous_best = Some(block.header); - } - if let Ok(Some(body)) = backend.block_body(block.hash) { - report_transactions_included_in_block( - transaction_pool_info_provider, - &body, - metrics, - cache, - ); - } -} - -fn handle_block_finalized, B: BlockT
>( - block: FinalityNotification, - metrics: &ChainStateMetrics, -) { - // Sometimes finalization can also cause best block update. However, - // RPC best block subscription won't notify about that immediately, so - // we also don't update there. Also in that case, substrate sets best_block to - // the newly finalized block (see test), so the best block will be updated - // after importing anything on the newly finalized branch. - metrics.update_top_finalized_block(*block.header.number()); -} - -fn handle_transaction_in_pool( - hash: TxHash, - cache: &mut LruCache, -) { - // Putting new transaction can evict the oldest one. However, even if the - // removed transaction was actually still in the pool, we don't have - // any guarantees that it would be eventually included in the block. - // Therefore, we ignore such transaction. - cache.put(hash, Instant::now()); -} - -fn detect_reorgs, B: BlockT
, BE: HeaderMetadata>( - backend: &BE, - prev_best: Option, - best: HE, -) -> Option { - let prev_best = prev_best?; - if best.hash() == prev_best.hash() || *best.parent_hash() == prev_best.hash() { - // Quit early when no change or the best is a child of the previous best. - return None; - } - let lca = lowest_common_ancestor(backend, best.hash(), prev_best.hash()).ok()?; - let len = prev_best.number().saturating_sub(lca.number); - if len == HE::Number::zero() { - return None; - } - Some(len) -} - -fn report_transactions_included_in_block< - 'a, - TP: TransactionPoolInfoProvider, - I: IntoIterator, ->( - pool: &'a TP, - body: I, - metrics: &ChainStateMetrics, - cache: &mut LruCache, -) where - ::TxHash: std::hash::Hash + PartialEq + Eq, -{ - for xt in body { - let hash = pool.hash_of(xt); - if let Some(insert_time) = cache.pop(&hash) { - let elapsed = insert_time.elapsed(); - metrics.report_transaction_in_block(elapsed); - } else if let Some(true) = xt.is_signed() { - // Either it was never in the pool (eg. submitted locally), or we've got BlockImport - // notification faster than transaction in pool one. The latter is more likely, - // so we report it as zero. - metrics.report_transaction_in_block(Duration::ZERO); - metrics.report_transaction_not_seen_in_the_pool(); - } - } -} - -#[cfg(test)] -mod test { - use std::{collections::HashMap, sync::Arc}; - - use futures::{FutureExt, Stream}; - use parity_scale_codec::Encode; - use sc_block_builder::BlockBuilderBuilder; - use sc_client_api::{BlockchainEvents, HeaderBackend}; - use substrate_test_runtime_client::AccountKeyring; - - use super::*; - use crate::{ - metrics::transaction_pool::test::TestTransactionPoolSetup, - testing::{ - client_chain_builder::ClientChainBuilder, - mocks::{TBlock, THash, TestClientBuilder, TestClientBuilderExt}, - }, - }; - - #[tokio::test] - async fn when_finalizing_with_reorg_best_block_is_set_to_that_finalized_block() { - let client = Arc::new(TestClientBuilder::new().build()); - let client_builder = Arc::new(TestClientBuilder::new().build()); - let mut chain_builder = ClientChainBuilder::new(client.clone(), client_builder); - - let chain_a = chain_builder - .build_and_import_branch_above(&chain_builder.genesis_hash(), 5) - .await; - - // (G) - A1 - A2 - A3 - A4 - A5; - - assert_eq!( - client.chain_info().finalized_hash, - chain_builder.genesis_hash() - ); - assert_eq!(client.chain_info().best_number, 5); - - let chain_b = chain_builder - .build_and_import_branch_above(&chain_a[0].header.hash(), 3) - .await; - chain_builder.finalize_block(&chain_b[0].header.hash()); - - // (G) - (A1) - A2 - A3 - A4 - A5 - // \ - // (B2) - B3 - B4 - - assert_eq!(client.chain_info().best_number, 2); - assert_eq!(client.chain_info().finalized_hash, chain_b[0].header.hash()); - } - - #[tokio::test] - async fn test_reorg_detection() { - let client = Arc::new(TestClientBuilder::new().build()); - let client_builder = Arc::new(TestClientBuilder::new().build()); - let mut chain_builder = ClientChainBuilder::new(client.clone(), client_builder); - - let a = chain_builder - .build_and_import_branch_above(&chain_builder.genesis_hash(), 5) - .await; - let b = chain_builder - .build_and_import_branch_above(&a[0].header.hash(), 3) - .await; - let c = chain_builder - .build_and_import_branch_above(&a[2].header.hash(), 2) - .await; - - // - C0 - C1 - // / - // G - A0 - A1 - A2 - A3 - A4 - // \ - // - B0 - B1 - B2 - - for (prev, new_best, expected) in [ - (&a[1], &a[2], None), - (&a[1], &a[4], None), - (&a[1], &a[1], None), - (&a[2], &b[0], 
Some(2)), - (&b[0], &a[2], Some(1)), - (&c[1], &b[2], Some(4)), - ] { - assert_eq!( - detect_reorgs( - client.as_ref(), - Some(prev.header().clone()), - new_best.header().clone() - ), - expected, - ); - } - } - - // Transaction pool metrics tests - struct TestSetup { - pub pool: TestTransactionPoolSetup, - pub metrics: ChainStateMetrics, - pub cache: LruCache, - pub block_import_notifications: - Box> + Unpin>, - pub finality_notifications: Box> + Unpin>, - } - - #[derive(PartialEq, Eq, Hash, Debug)] - enum NotificationType { - BlockImport, - Finality, - Transaction, - } - - impl TestSetup { - fn new() -> Self { - let client = Arc::new(TestClientBuilder::new().build()); - - let block_import_notifications = - Box::new(client.every_import_notification_stream().fuse()); - let finality_notifications = Box::new(client.finality_notification_stream().fuse()); - - let pool = TestTransactionPoolSetup::new(client); - - let registry = Registry::new(); - let metrics = ChainStateMetrics::new(Some(registry)).expect("metrics"); - let cache = LruCache::new(NonZeroUsize::new(10).expect("cache")); - - TestSetup { - pool, - metrics, - cache, - block_import_notifications, - finality_notifications, - } - } - - fn genesis(&self) -> THash { - self.pool.client.info().genesis_hash - } - - fn transactions_histogram(&self) -> &Histogram { - match &self.metrics { - ChainStateMetrics::Prometheus { - time_till_block_inclusion, - .. - } => time_till_block_inclusion, - _ => panic!("metrics"), - } - } - - fn process_notifications(&mut self) -> HashMap { - let mut block_imported_notifications = 0; - let mut finality_notifications = 0; - let mut transaction_notifications = 0; - - while let Some(block) = self.block_import_notifications.next().now_or_never() { - handle_block_imported( - block.expect("stream should not end"), - self.pool.client.as_ref(), - &self.metrics, - &mut self.pool.transaction_pool_info_provider, - &mut self.cache, - &mut None, - ); - block_imported_notifications += 1; - } - while let Some(finality) = self.finality_notifications.next().now_or_never() { - handle_block_finalized(finality.expect("stream should not end"), &self.metrics); - finality_notifications += 1; - } - while let Some(transaction) = self - .pool - .transaction_pool_info_provider - .next_transaction() - .now_or_never() - { - handle_transaction_in_pool( - transaction.expect("stream should not end"), - &mut self.cache, - ); - transaction_notifications += 1; - } - HashMap::from_iter(vec![ - (NotificationType::BlockImport, block_imported_notifications), - (NotificationType::Finality, finality_notifications), - (NotificationType::Transaction, transaction_notifications), - ]) - } - } - - fn blocks_imported(n: usize) -> HashMap { - HashMap::from_iter(vec![ - (NotificationType::BlockImport, n), - (NotificationType::Finality, 0), - (NotificationType::Transaction, 0), - ]) - } - fn transactions(n: usize) -> HashMap { - HashMap::from_iter(vec![ - (NotificationType::BlockImport, 0), - (NotificationType::Finality, 0), - (NotificationType::Transaction, n), - ]) - } - - const EPS: Duration = Duration::from_nanos(1); - - #[tokio::test] - async fn transactions_are_reported() { - let mut setup = TestSetup::new(); - let genesis = setup.genesis(); - let xt = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); - - let time_before_submit = Instant::now(); - setup.pool.submit(&genesis, xt).await; - - assert_eq!( - setup.process_notifications(), - transactions(1), - "'In pool' notification wasn't sent" - ); - let time_after_submit = 
Instant::now(); - - tokio::time::sleep(Duration::from_millis(20)).await; - - let time_before_import = Instant::now(); - let _block_1 = setup.pool.propose_block(genesis, None).await; - let pre_count = setup.transactions_histogram().get_sample_count(); - - assert_eq!( - setup.process_notifications(), - blocks_imported(1), - "Block import notification wasn't sent" - ); - - let time_after_import = Instant::now(); - - let duration = - Duration::from_secs_f64(setup.transactions_histogram().get_sample_sum() / 1000.); - - assert_eq!(pre_count, 0); - assert_eq!(setup.transactions_histogram().get_sample_count(), 1); - assert!(duration >= time_before_import - time_after_submit - EPS); - assert!(duration <= time_after_import - time_before_submit + EPS); - } - - #[tokio::test] - async fn transactions_are_reported_only_if_ready_when_added_to_the_pool() { - let mut setup = TestSetup::new(); - let genesis = setup.genesis(); - - let xt1 = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); - let xt2 = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 1); - let xt3 = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 2); - - setup.pool.submit(&genesis, xt2.clone()).await; - - // No notification for xt2 as it is not ready - assert_eq!( - setup.process_notifications(), - transactions(0), - "Future transactions should not be reported" - ); - - setup.pool.submit(&genesis, xt1.clone()).await; - setup.pool.submit(&genesis, xt3.clone()).await; - - // Notifications for xt1 and xt3 - assert_eq!(setup.process_notifications(), transactions(2)); - - let block_1 = setup.pool.propose_block(genesis, None).await; - // Block import notification. xt1 notification never appears - assert_eq!(setup.process_notifications(), blocks_imported(1)); - // All 3 extrinsics are included in the block - assert_eq!(block_1.extrinsics.len(), 3); - } - - #[tokio::test] - async fn retracted_transactions_are_reported_only_once() { - let mut setup = TestSetup::new(); - let genesis = setup.genesis(); - - let xt1 = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); - let xt2 = setup - .pool - .extrinsic(AccountKeyring::Charlie, AccountKeyring::Dave, 0); - - setup.pool.submit(&genesis, xt1.clone()).await; - setup.pool.submit(&genesis, xt2.clone()).await; - - // make sure import notifications are received before block import - assert_eq!(setup.process_notifications(), transactions(2)); - - let block_1a = setup.pool.propose_block(genesis, None).await; - assert_eq!(block_1a.extrinsics.len(), 2); - assert_eq!(setup.process_notifications(), blocks_imported(1)); - assert_eq!(setup.transactions_histogram().get_sample_count(), 2); - - let sum_before = setup.transactions_histogram().get_sample_sum(); - - // external fork block with xt1 - let mut block_1b_builder = BlockBuilderBuilder::new(&*setup.pool.client) - .on_parent_block(genesis) - .with_parent_block_number(0) - .build() - .unwrap(); - - block_1b_builder.push(xt1.into()).unwrap(); - let block_1b = block_1b_builder.build().unwrap().block; - setup.pool.import_block(block_1b.clone()).await; - setup.pool.finalize(block_1b.hash()).await; - - let block_2b = setup.pool.propose_block(block_1b.hash(), None).await; - - assert_eq!(block_2b.extrinsics.len(), 1); - assert_eq!(setup.transactions_histogram().get_sample_count(), 2); - assert_eq!(setup.transactions_histogram().get_sample_sum(), sum_before); - } - - #[tokio::test] - async fn transactions_skipped_in_block_authorship_are_not_reported_at_that_time() { - let mut setup 
= TestSetup::new(); - let genesis = setup.genesis(); - - let xt1 = setup - .pool - .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); - let xt2 = setup - .pool - .extrinsic(AccountKeyring::Charlie, AccountKeyring::Eve, 0); - - setup.pool.submit(&genesis, xt1.clone()).await; - setup.pool.submit(&genesis, xt2.clone()).await; - assert_eq!(setup.process_notifications(), transactions(2)); - - let time_after_submit = Instant::now(); - - let block_1 = setup - .pool - .propose_block(genesis, Some(2 * xt1.encoded_size() - 1)) - .await; - - assert_eq!(setup.process_notifications(), blocks_imported(1)); - assert_eq!(block_1.extrinsics.len(), 1); - assert_eq!(setup.transactions_histogram().get_sample_count(), 1); - let sample_1 = setup.transactions_histogram().get_sample_sum(); - - tokio::time::sleep(Duration::from_millis(10)).await; - - let time_before_block_2 = Instant::now(); - let block_2 = setup - .pool - .propose_block(block_1.hash(), Some(2 * xt1.encoded_size() - 1)) - .await; - - assert_eq!(setup.process_notifications(), blocks_imported(1)); - assert_eq!(block_2.extrinsics.len(), 1); - assert_eq!(setup.transactions_histogram().get_sample_count(), 2); - - let sample_2 = setup.transactions_histogram().get_sample_sum() - sample_1; - - let duration = Duration::from_secs_f64(sample_2 / 1000.0); - - assert!(duration >= time_before_block_2 - time_after_submit - EPS); - } -} diff --git a/finality-aleph/src/metrics/finality_rate.rs b/finality-aleph/src/metrics/finality_rate.rs index 09d084b5e9..cd2022b669 100644 --- a/finality-aleph/src/metrics/finality_rate.rs +++ b/finality-aleph/src/metrics/finality_rate.rs @@ -8,8 +8,7 @@ use sc_service::Arc; use sp_core::{bounded_vec::BoundedVec, ConstU32}; use substrate_prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; -use super::Checkpoint; -use crate::metrics::LOG_TARGET; +use crate::{metrics::LOG_TARGET, BlockId}; const MAX_CACHE_SIZE: usize = 1800; const MAX_INNER_SIZE: u32 = 64; @@ -36,11 +35,11 @@ impl FinalityRateMetrics { Ok(FinalityRateMetrics::Prometheus { own_finalized: register( - Counter::new("aleph_own_finalized_blocks", "no help")?, + Counter::new("aleph_own_finalized_blocks", "Number of self-produced blocks that became finalized")?, registry, )?, own_hopeless: register( - Counter::new("aleph_own_hopeless_blocks", "no help")?, + Counter::new("aleph_own_hopeless_blocks", "Number of self-produced blocks, such that some alternative block with the same block number was finalized")?, registry, )?, imported_cache: Arc::new(Mutex::new(LruCache::new( @@ -49,39 +48,21 @@ impl FinalityRateMetrics { }) } - pub fn report_block( - &self, - block_hash: BlockHash, - block_number: BlockNumber, - checkpoint: Checkpoint, - own: Option, - ) { - match checkpoint { - Checkpoint::Imported => { - if let Some(true) = own { - self.report_own_imported(block_hash, block_number); - } - } - Checkpoint::Finalized => self.report_finalized(block_hash, block_number), - _ => {} - } - } - /// Stores the imported block's hash. Assumes that the imported block is own. - fn report_own_imported(&self, hash: BlockHash, number: BlockNumber) { + pub fn report_own_imported(&self, id: BlockId) { let mut imported_cache = match self { FinalityRateMetrics::Prometheus { imported_cache, .. 
} => imported_cache.lock(), FinalityRateMetrics::Noop => return, }; let entry = imported_cache - .get_or_insert_mut(number, BoundedVec::<_, ConstU32>::new); + .get_or_insert_mut(id.number(), BoundedVec::<_, ConstU32>::new); - if entry.try_push(hash).is_err() { + if entry.try_push(id.hash()).is_err() { warn!( target: LOG_TARGET, "Finality Rate Metrics encountered too many own imported blocks at level {}", - number + id.number() ); } } @@ -89,7 +70,7 @@ impl FinalityRateMetrics { /// Counts the blocks at the level of `number` different than the passed block /// and reports them as hopeless. If `hash` is a hash of own block it will be found /// in `imported_cache` and reported as finalized. - fn report_finalized(&self, hash: BlockHash, number: BlockNumber) { + pub fn report_finalized(&self, id: BlockId) { let (own_finalized, own_hopeless, imported_cache) = match self { FinalityRateMetrics::Prometheus { own_finalized, @@ -100,12 +81,12 @@ impl FinalityRateMetrics { }; let mut imported_cache = imported_cache.lock(); - if let Some(hashes) = imported_cache.get_mut(&number) { - let new_hopeless_count = hashes.iter().filter(|h| **h != hash).count(); + if let Some(hashes) = imported_cache.get_mut(&id.number()) { + let new_hopeless_count = hashes.iter().filter(|h| **h != id.hash()).count(); own_hopeless.inc_by(new_hopeless_count as u64); own_finalized.inc_by((hashes.len() - new_hopeless_count) as u64); } - imported_cache.pop(&number); + imported_cache.pop(&id.number()); } } @@ -116,7 +97,7 @@ mod tests { use primitives::{BlockHash, BlockNumber}; use substrate_prometheus_endpoint::{Counter, Registry, U64}; - use crate::{metrics::finality_rate::ImportedHashesCache, FinalityRateMetrics}; + use super::{FinalityRateMetrics, ImportedHashesCache}; type FinalityRateMetricsInternals = (Counter, Counter, ImportedHashesCache); @@ -158,12 +139,12 @@ mod tests { verify_state(&metrics, 0, 0, HashMap::new()); let hash0 = BlockHash::random(); - metrics.report_own_imported(hash0, 0); + metrics.report_own_imported((hash0, 0).into()); verify_state(&metrics, 0, 0, HashMap::from([(0, vec![hash0])])); let hash1 = BlockHash::random(); - metrics.report_own_imported(hash1, 1); + metrics.report_own_imported((hash1, 1).into()); verify_state( &metrics, @@ -173,7 +154,7 @@ mod tests { ); let hash2 = BlockHash::random(); - metrics.report_own_imported(hash2, 1); + metrics.report_own_imported((hash2, 1).into()); verify_state( &metrics, @@ -182,11 +163,11 @@ mod tests { HashMap::from([(0, vec![hash0]), (1, vec![hash1, hash2])]), ); - metrics.report_finalized(hash0, 0); + metrics.report_finalized((hash0, 0).into()); verify_state(&metrics, 1, 0, HashMap::from([(1, vec![hash1, hash2])])); - metrics.report_finalized(BlockHash::random(), 1); + metrics.report_finalized((BlockHash::random(), 1).into()); verify_state(&metrics, 1, 2, HashMap::new()); } diff --git a/finality-aleph/src/metrics/mod.rs b/finality-aleph/src/metrics/mod.rs index 6e322fcdab..4ea3ce6ce0 100644 --- a/finality-aleph/src/metrics/mod.rs +++ b/finality-aleph/src/metrics/mod.rs @@ -1,15 +1,13 @@ -mod all_block; -mod chain_state; +mod best_block; mod finality_rate; +mod slo; mod timing; pub mod transaction_pool; -pub use all_block::AllBlockMetrics; -pub use chain_state::run_chain_state_metrics; -pub use finality_rate::FinalityRateMetrics; +pub use slo::{run_metrics_service, SloMetrics}; +pub use timing::{Checkpoint, DefaultClock}; +pub type TimingBlockMetrics = timing::TimingBlockMetrics; use substrate_prometheus_endpoint::{exponential_buckets, prometheus}; -pub use 
timing::{Checkpoint, DefaultClock, TimingBlockMetrics}; -pub use transaction_pool::TransactionPoolInfoProvider; const LOG_TARGET: &str = "aleph-metrics"; diff --git a/finality-aleph/src/metrics/slo.rs b/finality-aleph/src/metrics/slo.rs new file mode 100644 index 0000000000..b84ff4f418 --- /dev/null +++ b/finality-aleph/src/metrics/slo.rs @@ -0,0 +1,122 @@ +use futures::{Stream, StreamExt}; +use log::warn; +use parity_scale_codec::Encode; +use primitives::Block; +use sp_runtime::traits::Block as _; +use substrate_prometheus_endpoint::Registry; + +use super::{finality_rate::FinalityRateMetrics, timing::DefaultClock}; +use crate::{ + block::ChainStatus, + metrics::{ + best_block::BestBlockMetrics, timing::Checkpoint, transaction_pool::TransactionPoolMetrics, + TimingBlockMetrics, LOG_TARGET, + }, + BlockId, SubstrateChainStatus, +}; + +pub async fn run_metrics_service + Unpin>( + metrics: &SloMetrics, + transaction_pool_stream: &mut TS, +) { + if !metrics.is_noop() { + while let Some(tx) = transaction_pool_stream.next().await { + metrics.report_transaction_in_pool(tx); + } + warn!(target: LOG_TARGET, "SLO Metrics service terminated, because the transaction pool stream ended."); + } +} + +pub type Hashing = sp_runtime::traits::HashingFor; +pub type TxHash = ::Output; + +#[derive(Clone)] +pub struct SloMetrics { + timing_metrics: TimingBlockMetrics, + finality_rate_metrics: FinalityRateMetrics, + best_block_metrics: BestBlockMetrics, + transaction_metrics: TransactionPoolMetrics, + chain_status: SubstrateChainStatus, +} + +impl SloMetrics { + pub fn new(registry: Option<&Registry>, chain_status: SubstrateChainStatus) -> Self { + let warn_creation_failed = |name, e| warn!(target: LOG_TARGET, "Failed to register Prometheus {name} metrics: {e:?}."); + let timing_metrics = TimingBlockMetrics::new(registry, DefaultClock).unwrap_or_else(|e| { + warn!( + target: LOG_TARGET, + "Failed to register Prometheus block timing metrics: {:?}.", e + ); + TimingBlockMetrics::Noop + }); + let finality_rate_metrics = FinalityRateMetrics::new(registry).unwrap_or_else(|e| { + warn!( + target: LOG_TARGET, + "Failed to register Prometheus finality rate metrics: {:?}.", e + ); + FinalityRateMetrics::Noop + }); + let best_block_metrics = BestBlockMetrics::new(registry.cloned(), chain_status.clone()) + .unwrap_or_else(|e| { + warn_creation_failed("best block related", e); + BestBlockMetrics::Noop + }); + let transaction_metrics = TransactionPoolMetrics::new(registry, DefaultClock) + .unwrap_or_else(|e| { + warn_creation_failed("transaction pool", e); + TransactionPoolMetrics::Noop + }); + + SloMetrics { + timing_metrics, + finality_rate_metrics, + best_block_metrics, + transaction_metrics, + chain_status, + } + } + + pub fn is_noop(&self) -> bool { + matches!(self.timing_metrics, TimingBlockMetrics::Noop) + && matches!(self.finality_rate_metrics, FinalityRateMetrics::Noop) + && matches!(self.best_block_metrics, BestBlockMetrics::Noop) + && matches!(self.transaction_metrics, TransactionPoolMetrics::Noop) + } + + pub fn timing_metrics(&self) -> &TimingBlockMetrics { + &self.timing_metrics + } + + pub fn report_transaction_in_pool(&self, hash: TxHash) { + self.transaction_metrics.report_in_pool(hash); + } + + pub fn report_block_imported(&mut self, block_id: BlockId, is_new_best: bool, own: bool) { + self.timing_metrics + .report_block(block_id.hash(), Checkpoint::Imported); + if own { + self.finality_rate_metrics + .report_own_imported(block_id.clone()); + } + if is_new_best { + self.best_block_metrics + 
.report_best_block_imported(block_id.clone()); + } + if let Ok(Some(block)) = self.chain_status.block(block_id.clone()) { + // Skip inherents - there is always exactly one, namely the timestamp inherent. + for xt in block.extrinsics().iter().skip(1) { + self.transaction_metrics + .report_in_block(xt.using_encoded(::hash)); + } + } + } + + pub fn report_block_finalized(&self, block_id: BlockId) { + self.timing_metrics + .report_block(block_id.hash(), Checkpoint::Finalized); + self.finality_rate_metrics + .report_finalized(block_id.clone()); + self.best_block_metrics + .report_block_finalized(block_id.clone()); + } +} diff --git a/finality-aleph/src/metrics/timing.rs b/finality-aleph/src/metrics/timing.rs index 53b338cd16..a71fae5221 100644 --- a/finality-aleph/src/metrics/timing.rs +++ b/finality-aleph/src/metrics/timing.rs @@ -24,7 +24,7 @@ pub trait Clock { fn now(&self) -> Instant; } -#[derive(Clone)] +#[derive(Clone, Default)] pub struct DefaultClock; impl Clock for DefaultClock { fn now(&self) -> Instant { @@ -166,7 +166,7 @@ impl TimingBlockMetrics { let duration = checkpoint_time .checked_duration_since(*start) .unwrap_or_else(|| { - Self::warn_about_monotonicity_violation( + warn_about_monotonicity_violation( *start, checkpoint_time, checkpoint_type, @@ -190,7 +190,7 @@ impl TimingBlockMetrics { let duration = checkpoint_time .checked_duration_since(*start) .unwrap_or_else(|| { - Self::warn_about_monotonicity_violation( + warn_about_monotonicity_violation( *start, checkpoint_time, checkpoint_type, @@ -202,23 +202,6 @@ impl TimingBlockMetrics { } } } - - fn warn_about_monotonicity_violation( - start: Instant, - checkpoint_time: Instant, - checkpoint_type: Checkpoint, - hash: BlockHash, - ) { - warn!( - target: LOG_TARGET, - "Earlier metrics time {:?} is later that current one \ - {:?}. Checkpoint type {:?}, block: {:?}", - start, - checkpoint_time, - checkpoint_type, - hash - ); - } } #[derive(Clone, Copy, Debug, Display, Hash, PartialEq, Eq)] @@ -243,6 +226,23 @@ impl Checkpoint { } } +fn warn_about_monotonicity_violation( + start: Instant, + checkpoint_time: Instant, + checkpoint_type: Checkpoint, + hash: BlockHash, +) { + warn!( + target: LOG_TARGET, + "Earlier metrics time {:?} is later than current one \ + {:?}. 
Checkpoint type {:?}, block: {:?}", + start, + checkpoint_time, + checkpoint_type, + hash + ); +} + #[cfg(test)] mod tests { use std::{cell::RefCell, cmp::min}; diff --git a/finality-aleph/src/metrics/transaction_pool.rs b/finality-aleph/src/metrics/transaction_pool.rs index aa954051dc..7ed61a299a 100644 --- a/finality-aleph/src/metrics/transaction_pool.rs +++ b/finality-aleph/src/metrics/transaction_pool.rs @@ -1,63 +1,145 @@ -use std::sync::Arc; +use std::{ + num::NonZeroUsize, + sync::Arc, + time::{Duration, Instant}, +}; -use futures::StreamExt; -use sc_transaction_pool_api::{ImportNotificationStream, TransactionFor, TransactionPool}; -use sp_runtime::traits::Member; +use lru::LruCache; +use parking_lot::Mutex; +use substrate_prometheus_endpoint::{ + register, Counter, Histogram, HistogramOpts, PrometheusError, Registry, U64, +}; -#[async_trait::async_trait] -pub trait TransactionPoolInfoProvider { - type TxHash: Member + std::hash::Hash; - type Extrinsic: sp_runtime::traits::Extrinsic; - async fn next_transaction(&mut self) -> Option; +use crate::metrics::{exponential_buckets_two_sided, timing::Clock}; - fn hash_of(&self, extrinsic: &Self::Extrinsic) -> Self::TxHash; -} +// Size of transaction cache: 32B (Hash) + 16B (Instant) * `100_000` is approximately 4.8MB +const TRANSACTION_CACHE_SIZE: usize = 100_000; +const BUCKETS_FACTOR: f64 = 1.4; -pub struct TransactionPoolWrapper { - pool: Arc, - import_notification_stream: ImportNotificationStream, +#[derive(Clone)] +pub enum TransactionPoolMetrics { + Prometheus { + time_till_block_inclusion: Histogram, + transactions_not_seen_in_the_pool: Counter, + cache: Arc>>, + clock: C, + }, + Noop, } -impl TransactionPoolWrapper { - pub fn new(pool: Arc) -> Self { - Self { - pool: pool.clone(), - import_notification_stream: pool.import_notification_stream(), - } - } -} +impl TransactionPoolMetrics { + pub fn new(registry: Option<&Registry>, clock: C) -> Result { + let registry = match registry { + None => return Ok(Self::Noop), + Some(registry) => registry, + }; -#[async_trait::async_trait] -impl TransactionPoolInfoProvider for TransactionPoolWrapper { - type TxHash = T::Hash; - type Extrinsic = TransactionFor; + Ok(Self::Prometheus { + time_till_block_inclusion: register( + Histogram::with_opts( + HistogramOpts::new( + "aleph_transaction_to_block_time", + "Time from becoming ready in the pool to inclusion in some valid block.", + ) + .buckets(exponential_buckets_two_sided( + 2000.0, + BUCKETS_FACTOR, + 2, + 8, + )?), + )?, + registry, + )?, + transactions_not_seen_in_the_pool: register( + Counter::new( + "aleph_transactions_not_seen_in_the_pool", + "\ + Number of transactions that were reported to be in a block before reporting of \ + being in the ready queue in the transaction pool. This could happen \ + for many reasons, e.g. when a transaction has been added to the future pool, \ + has been submitted locally, or because of a race condition \ + (especially probable when there is an increased transaction load)", + )?, + registry, + )?, + cache: Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::new(TRANSACTION_CACHE_SIZE) + .expect("the cache size is a non-zero constant"), + ))), + clock, + }) + } - async fn next_transaction(&mut self) -> Option { - self.import_notification_stream.next().await + pub fn report_in_pool(&self, hash: TxHash) { + if let Self::Prometheus { cache, clock, .. } = self { + // Putting a new transaction can evict the oldest one. 
However, even if the + // removed transaction was actually still in the pool, we don't have + // any guarantees that it would be eventually included in the block. + // Therefore, we ignore such transactions. + let cache = &mut *cache.lock(); + cache.put(hash, clock.now()); + } } - fn hash_of(&self, extrinsic: &Self::Extrinsic) -> Self::TxHash { - self.pool.hash_of(extrinsic) + pub fn report_in_block(&self, hash: TxHash) { + if let Self::Prometheus { + time_till_block_inclusion, + transactions_not_seen_in_the_pool, + cache, + .. + } = self + { + let cache = &mut *cache.lock(); + let elapsed = match cache.pop(&hash) { + Some(insert_time) => insert_time.elapsed(), + None => { + // Either it was never in the pool (e.g. submitted locally), or we got the BlockImport + // notification faster than the in-pool one. The latter is much more likely, + // so we report it as zero. + transactions_not_seen_in_the_pool.inc(); + Duration::ZERO + } + }; + time_till_block_inclusion.observe(elapsed.as_secs_f64() * 1000.); + } } } #[cfg(test)] pub mod test { - use std::{sync::Arc, time::Duration}; + use std::{ + collections::HashMap, + hash::Hash, + sync::Arc, + time::{Duration, Instant}, + }; - use futures::{future, StreamExt}; + use futures::{future, FutureExt, Stream, StreamExt}; + use parity_scale_codec::Encode; use sc_basic_authorship::ProposerFactory; - use sc_client_api::{BlockchainEvents, HeaderBackend}; + use sc_block_builder::BlockBuilderBuilder; + use sc_client_api::{ + BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification, + HeaderBackend, + }; use sc_transaction_pool::{BasicPool, FullChainApi}; - use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool}; + use sc_transaction_pool_api::{ + ImportNotificationStream, MaintainedTransactionPool, TransactionPool, + }; use sp_consensus::{BlockOrigin, DisableProofRecording, Environment, Proposer as _}; use sp_runtime::{traits::Block as BlockT, transaction_validity::TransactionSource}; + use substrate_prometheus_endpoint::{Histogram, Registry}; + use substrate_test_client::TestClientBuilder; use substrate_test_runtime::{Extrinsic, ExtrinsicBuilder, Transfer}; use substrate_test_runtime_client::{AccountKeyring, ClientBlockImportExt, ClientExt}; use crate::{ - metrics::transaction_pool::TransactionPoolWrapper, - testing::mocks::{TBlock, THash, TestClient}, + metrics::{ + slo::{Hashing, TxHash}, + timing::DefaultClock, + transaction_pool::TransactionPoolMetrics, + }, + testing::mocks::{TBlock, THash, TestClient, TestClientBuilderExt}, }; type TChainApi = FullChainApi; @@ -68,7 +150,6 @@ pub mod test { pub client: Arc, pub pool: Arc, pub proposer_factory: TProposerFactory, - pub transaction_pool_info_provider: TransactionPoolWrapper>, } impl TestTransactionPoolSetup { @@ -81,7 +162,6 @@ pub mod test { spawner.clone(), client.clone(), ); - let transaction_pool_info_provider = TransactionPoolWrapper::new(pool.clone()); let proposer_factory = ProposerFactory::new(spawner, client.clone(), pool.clone(), None, None); @@ -90,10 +170,13 @@ pub mod test { client, pool, proposer_factory, - transaction_pool_info_provider, } } + pub fn import_notification_stream(&self) -> ImportNotificationStream { + self.pool.import_notification_stream() + } + pub async fn propose_block(&mut self, at: THash, weight_limit: Option) -> TBlock { let proposer = self .proposer_factory @@ -169,4 +252,279 @@ pub mod test { .unwrap(); } } + + // Transaction pool metrics tests + struct TestSetup { + pub pool: TestTransactionPoolSetup, + pub metrics: 
TransactionPoolMetrics, + pub block_import_notifications: + Box> + Unpin>, + pub finality_notifications: Box> + Unpin>, + pub pool_import_notifications: ImportNotificationStream, + } + + #[derive(PartialEq, Eq, Hash, Debug)] + enum NotificationType { + BlockImport, + Finality, + Transaction, + } + + impl TestSetup { + fn new() -> Self { + let client = Arc::new(TestClientBuilder::new().build()); + + let block_import_notifications = + Box::new(client.every_import_notification_stream().fuse()); + let finality_notifications = Box::new(client.finality_notification_stream().fuse()); + + let pool = TestTransactionPoolSetup::new(client); + let pool_import_notifications = pool.import_notification_stream(); + + let registry = Registry::new(); + let metrics = + TransactionPoolMetrics::new(Some(&registry), DefaultClock).expect("metrics"); + + TestSetup { + pool, + metrics, + block_import_notifications, + finality_notifications, + pool_import_notifications, + } + } + + fn genesis(&self) -> THash { + self.pool.client.info().genesis_hash + } + + fn transactions_histogram(&self) -> &Histogram { + match &self.metrics { + TransactionPoolMetrics::Prometheus { + time_till_block_inclusion, + .. + } => time_till_block_inclusion, + _ => panic!("metrics"), + } + } + + fn process_notifications(&mut self) -> HashMap { + let mut block_imported_notifications = 0; + let mut finality_notifications = 0; + let mut transaction_notifications = 0; + + while let Some(block) = self.block_import_notifications.next().now_or_never() { + let body = self + .pool + .client + .block_body(block.expect("stream should not end").hash) + .expect("block should exist") + .expect("block should have body"); + for xt in body { + let hash = xt.using_encoded(::hash); + self.metrics.report_in_block(hash); + } + block_imported_notifications += 1; + } + while self.finality_notifications.next().now_or_never().is_some() { + finality_notifications += 1; + } + while let Some(transaction) = self.pool_import_notifications.next().now_or_never() { + self.metrics + .report_in_pool(transaction.expect("stream should not end")); + transaction_notifications += 1; + } + HashMap::from_iter(vec![ + (NotificationType::BlockImport, block_imported_notifications), + (NotificationType::Finality, finality_notifications), + (NotificationType::Transaction, transaction_notifications), + ]) + } + } + + fn blocks_imported(n: usize) -> HashMap { + HashMap::from_iter(vec![ + (NotificationType::BlockImport, n), + (NotificationType::Finality, 0), + (NotificationType::Transaction, 0), + ]) + } + fn transactions(n: usize) -> HashMap { + HashMap::from_iter(vec![ + (NotificationType::BlockImport, 0), + (NotificationType::Finality, 0), + (NotificationType::Transaction, n), + ]) + } + + const EPS: Duration = Duration::from_nanos(1); + + #[tokio::test] + async fn transactions_are_reported() { + let mut setup = TestSetup::new(); + let genesis = setup.genesis(); + let xt = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); + + let time_before_submit = Instant::now(); + setup.pool.submit(&genesis, xt).await; + + assert_eq!( + setup.process_notifications(), + transactions(1), + "'In pool' notification wasn't sent" + ); + let time_after_submit = Instant::now(); + + tokio::time::sleep(Duration::from_millis(20)).await; + + let time_before_import = Instant::now(); + let _block_1 = setup.pool.propose_block(genesis, None).await; + let pre_count = setup.transactions_histogram().get_sample_count(); + + assert_eq!( + setup.process_notifications(), + blocks_imported(1), 
"Block import notification wasn't sent" + ); + + let time_after_import = Instant::now(); + + let duration = + Duration::from_secs_f64(setup.transactions_histogram().get_sample_sum() / 1000.); + + assert_eq!(pre_count, 0); + assert_eq!(setup.transactions_histogram().get_sample_count(), 1); + assert!(duration >= time_before_import - time_after_submit - EPS); + assert!(duration <= time_after_import - time_before_submit + EPS); + } + + #[tokio::test] + async fn transactions_are_reported_only_if_ready_when_added_to_the_pool() { + let mut setup = TestSetup::new(); + let genesis = setup.genesis(); + + let xt1 = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); + let xt2 = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 1); + let xt3 = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 2); + + setup.pool.submit(&genesis, xt2.clone()).await; + + // No notification for xt2 as it is not ready + assert_eq!( + setup.process_notifications(), + transactions(0), + "Future transactions should not be reported" + ); + + setup.pool.submit(&genesis, xt1.clone()).await; + setup.pool.submit(&genesis, xt3.clone()).await; + + // Notifications for xt1 and xt3 + assert_eq!(setup.process_notifications(), transactions(2)); + + let block_1 = setup.pool.propose_block(genesis, None).await; + // Block import notification. xt1 notification never appears + assert_eq!(setup.process_notifications(), blocks_imported(1)); + // All 3 extrinsics are included in the block + assert_eq!(block_1.extrinsics.len(), 3); + } + + #[tokio::test] + async fn retracted_transactions_are_reported_only_once() { + let mut setup = TestSetup::new(); + let genesis = setup.genesis(); + + let xt1 = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); + let xt2 = setup + .pool + .extrinsic(AccountKeyring::Charlie, AccountKeyring::Dave, 0); + + setup.pool.submit(&genesis, xt1.clone()).await; + setup.pool.submit(&genesis, xt2.clone()).await; + + // make sure import notifications are received before block import + assert_eq!(setup.process_notifications(), transactions(2)); + + let block_1a = setup.pool.propose_block(genesis, None).await; + assert_eq!(block_1a.extrinsics.len(), 2); + assert_eq!(setup.process_notifications(), blocks_imported(1)); + assert_eq!(setup.transactions_histogram().get_sample_count(), 2); + + let sum_before = setup.transactions_histogram().get_sample_sum(); + + // external fork block with xt1 + let mut block_1b_builder = BlockBuilderBuilder::new(&*setup.pool.client) + .on_parent_block(genesis) + .with_parent_block_number(0) + .build() + .unwrap(); + + block_1b_builder.push(xt1.into()).unwrap(); + let block_1b = block_1b_builder.build().unwrap().block; + setup.pool.import_block(block_1b.clone()).await; + setup.pool.finalize(block_1b.hash()).await; + + let block_2b = setup.pool.propose_block(block_1b.hash(), None).await; + + assert_eq!(block_2b.extrinsics.len(), 1); + assert_eq!(setup.transactions_histogram().get_sample_count(), 2); + assert_eq!(setup.transactions_histogram().get_sample_sum(), sum_before); + } + + #[tokio::test] + async fn transactions_skipped_in_block_authorship_are_not_reported_at_that_time() { + let mut setup = TestSetup::new(); + let genesis = setup.genesis(); + + let xt1 = setup + .pool + .extrinsic(AccountKeyring::Alice, AccountKeyring::Bob, 0); + let xt2 = setup + .pool + .extrinsic(AccountKeyring::Charlie, AccountKeyring::Eve, 0); + + setup.pool.submit(&genesis, xt1.clone()).await; + setup.pool.submit(&genesis, 
+        assert_eq!(setup.process_notifications(), transactions(2));
+
+        let time_after_submit = Instant::now();
+
+        let block_1 = setup
+            .pool
+            .propose_block(genesis, Some(2 * xt1.encoded_size() - 1))
+            .await;
+
+        assert_eq!(setup.process_notifications(), blocks_imported(1));
+        assert_eq!(block_1.extrinsics.len(), 1);
+        assert_eq!(setup.transactions_histogram().get_sample_count(), 1);
+        let sample_1 = setup.transactions_histogram().get_sample_sum();
+
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        let time_before_block_2 = Instant::now();
+        let block_2 = setup
+            .pool
+            .propose_block(block_1.hash(), Some(2 * xt1.encoded_size() - 1))
+            .await;
+
+        assert_eq!(setup.process_notifications(), blocks_imported(1));
+        assert_eq!(block_2.extrinsics.len(), 1);
+        assert_eq!(setup.transactions_histogram().get_sample_count(), 2);
+
+        let sample_2 = setup.transactions_histogram().get_sample_sum() - sample_1;
+
+        let duration = Duration::from_secs_f64(sample_2 / 1000.0);
+
+        assert!(duration >= time_before_block_2 - time_after_submit - EPS);
+    }
 }
diff --git a/finality-aleph/src/network/session/mod.rs b/finality-aleph/src/network/session/mod.rs
index 81298e775b..bae67600da 100644
--- a/finality-aleph/src/network/session/mod.rs
+++ b/finality-aleph/src/network/session/mod.rs
@@ -32,7 +32,7 @@ pub use discovery::Discovery;
 #[cfg(test)]
 pub use handler::tests::authentication;
 pub use handler::{Handler as SessionHandler, HandlerError as SessionHandlerError};
-pub use service::{Config as ConnectionManagerConfig, ManagerError, Service as ConnectionManager};
+pub use service::{Config as ConnectionManagerConfig, Service as ConnectionManager};
 
 /// The maximum size an authentication can have and be accepted.
 /// This leaves a generous margin of error, as the signature is 64 bytes,
diff --git a/finality-aleph/src/nodes.rs b/finality-aleph/src/nodes.rs
index e035c069db..e1dd0afb86 100644
--- a/finality-aleph/src/nodes.rs
+++ b/finality-aleph/src/nodes.rs
@@ -5,6 +5,7 @@ use futures::channel::oneshot;
 use log::{debug, error};
 use network_clique::{RateLimitingDialer, RateLimitingListener, Service, SpawnHandleT};
 use pallet_aleph_runtime_api::AlephSessionApi;
+use primitives::TransactionHash;
 use rate_limiter::SleepingRateLimiter;
 use sc_client_api::Backend;
 use sc_keystore::{Keystore, LocalKeystore};
@@ -20,7 +21,7 @@ use crate::{
     crypto::AuthorityPen,
     finalization::AlephFinalizer,
     idx_to_account::ValidatorIndexToAccountIdConverterImpl,
-    metrics::{run_chain_state_metrics, transaction_pool::TransactionPoolWrapper},
+    metrics::{run_metrics_service, SloMetrics},
     network::{
         address_cache::validator_address_cache_updater,
         session::{ConnectionManager, ConnectionManagerConfig},
@@ -57,7 +58,7 @@ where
     C: crate::ClientForAleph + Send + Sync + 'static,
     C::Api: AlephSessionApi + AuraApi,
     BE: Backend + 'static,
-    TP: TransactionPool + 'static,
+    TP: TransactionPool<Hash = TransactionHash> + 'static,
 {
     let AlephConfig {
         authentication_network,
@@ -68,7 +69,6 @@ where
         select_chain_provider,
         spawn_handle,
         keystore,
-        metrics,
         registry,
         unit_creation_delay,
         session_period,
@@ -144,22 +144,17 @@ where
 
     let chain_events = client.chain_status_notifier();
 
-    let client_for_slo_metrics = client.clone();
-    let registry_for_slo_metrics = registry.clone();
-    spawn_handle.spawn("aleph/slo-metrics", async move {
-        if let Err(err) = run_chain_state_metrics(
-            client_for_slo_metrics.as_ref(),
-            client_for_slo_metrics.every_import_notification_stream(),
-            client_for_slo_metrics.finality_notification_stream(),
-            registry_for_slo_metrics,
-            TransactionPoolWrapper::new(transaction_pool),
-        )
-        .await
-        {
-            error!(
-                target: LOG_TARGET,
-                "ChainStateMetrics service finished with err: {err}."
-            );
-        }
+    let slo_metrics = SloMetrics::new(registry.as_ref(), chain_status.clone());
+    let timing_metrics = slo_metrics.timing_metrics().clone();
+
+    spawn_handle.spawn("aleph/slo-metrics", {
+        let slo_metrics = slo_metrics.clone();
+        async move {
+            run_metrics_service(
+                &slo_metrics,
+                &mut transaction_pool.import_notification_stream(),
+            )
+            .await;
+        }
     });
@@ -177,8 +172,8 @@ where
         VERIFIER_CACHE_SIZE,
         genesis_header,
     );
-    let finalizer = AlephFinalizer::new(client.clone(), metrics.clone());
-    import_queue_handle.attach_metrics(metrics.clone());
+    let finalizer = AlephFinalizer::new(client.clone());
+    import_queue_handle.attach_metrics(timing_metrics.clone());
     let justifications_for_sync = justification_channel_provider.get_sender();
     let sync_io = SyncIO::new(
         SyncDatabaseIO::new(chain_status.clone(), finalizer, import_queue_handle),
@@ -195,6 +190,7 @@ where
         session_info.clone(),
         sync_io,
         registry.clone(),
+        slo_metrics,
         favourite_block_user_requests,
     ) {
         Ok(x) => x,
@@ -256,7 +252,7 @@ where
         justifications_for_sync,
         JustificationTranslator::new(chain_status.clone()),
         request_block,
-        metrics,
+        timing_metrics,
         spawn_handle,
         connection_manager,
         keystore,
diff --git a/finality-aleph/src/party/manager/aggregator.rs b/finality-aleph/src/party/manager/aggregator.rs
index a9eb0ac375..a28a58f771 100644
--- a/finality-aleph/src/party/manager/aggregator.rs
+++ b/finality-aleph/src/party/manager/aggregator.rs
@@ -19,7 +19,7 @@ use crate::{
     },
     crypto::Signature,
     justification::AlephJustification,
-    metrics::{AllBlockMetrics, Checkpoint},
+    metrics::{Checkpoint, TimingBlockMetrics},
     network::data::Network,
     party::{
         manager::aggregator::AggregatorVersion::{Current, Legacy},
@@ -60,15 +60,14 @@ where
 async fn process_new_block_data<CN, LN>(
     aggregator: &mut Aggregator<CN, LN>,
     block: BlockId,
-    metrics: &mut AllBlockMetrics,
+    metrics: &mut TimingBlockMetrics,
 ) where
     CN: Network,
     LN: Network,
 {
     trace!(target: "aleph-party", "Received unit {:?} in aggregator.", block);
     let hash = block.hash();
-    metrics.report_block(block, Checkpoint::Ordered, None);
-
+    metrics.report_block(hash, Checkpoint::Ordered);
     aggregator.start_aggregation(hash).await;
 }
 
@@ -108,7 +107,7 @@ async fn run_aggregator(
     io: IO,
     client: C,
     session_boundaries: &SessionBoundaries,
-    mut metrics: AllBlockMetrics,
+    mut metrics: TimingBlockMetrics,
     mut exit_rx: oneshot::Receiver<()>,
 ) -> Result<(), Error>
 where
@@ -191,7 +190,7 @@ pub fn task(
     client: C,
     io: IO,
     session_boundaries: SessionBoundaries,
-    metrics: AllBlockMetrics,
+    metrics: TimingBlockMetrics,
     multikeychain: Keychain,
     version: AggregatorVersion,
 ) -> Task
diff --git a/finality-aleph/src/party/manager/mod.rs b/finality-aleph/src/party/manager/mod.rs
index 4d600f8f91..6b1931b470 100644
--- a/finality-aleph/src/party/manager/mod.rs
+++ b/finality-aleph/src/party/manager/mod.rs
@@ -21,7 +21,7 @@ use crate::{
     },
     crypto::{AuthorityPen, AuthorityVerifier},
     data_io::{ChainTracker, DataStore, OrderedDataInterpreter, SubstrateChainInfoProvider},
-    metrics::AllBlockMetrics,
+    metrics::TimingBlockMetrics,
     mpsc,
     network::{
         data::{
@@ -110,7 +110,7 @@ where
         justifications_for_sync: JS,
         justification_translator: JustificationTranslator,
         block_requester: RB,
-        metrics: AllBlockMetrics,
+        metrics: TimingBlockMetrics,
         spawn_handle: SpawnHandle,
         session_manager: SM,
         keystore: Arc,
@@ -142,7 +142,7 @@ where
         justifications_for_sync: JS,
         justification_translator: JustificationTranslator,
         block_requester: RB,
-        metrics: AllBlockMetrics,
+        metrics: TimingBlockMetrics,
         spawn_handle: SpawnHandle,
         session_manager: SM,
         keystore: Arc,
diff --git a/finality-aleph/src/sync/service.rs b/finality-aleph/src/sync/service.rs
index 0f36ca0213..ecfc24a618 100644
--- a/finality-aleph/src/sync/service.rs
+++ b/finality-aleph/src/sync/service.rs
@@ -15,6 +15,7 @@ use crate::{
         EquivocationProof, Finalizer, Header, HeaderVerifier, Justification,
         JustificationVerifier, UnverifiedHeader, UnverifiedHeaderFor,
     },
+    metrics::SloMetrics,
     network::GossipNetwork,
     session::SessionBoundaryInfo,
     sync::{
@@ -143,6 +144,7 @@ where
     blocks_from_creator: mpsc::UnboundedReceiver,
     major_sync_last_status: bool,
     metrics: Metrics,
+    slo_metrics: SloMetrics,
     favourite_block_request: mpsc::UnboundedReceiver>,
 }
 
@@ -180,6 +182,7 @@ where
         session_info: SessionBoundaryInfo,
         io: IO,
         metrics_registry: Option<Registry>,
+        slo_metrics: SloMetrics,
         favourite_block_request: mpsc::UnboundedReceiver>,
     ) -> Result<(Self, impl RequestBlocks), HandlerError> {
         let IO {
@@ -196,13 +199,10 @@ where
         let broadcast_ticker = Ticker::new(TICK_PERIOD, BROADCAST_COOLDOWN);
         let chain_extension_ticker = Ticker::new(TICK_PERIOD, CHAIN_EXTENSION_COOLDOWN);
         let (block_requests_for_sync, block_requests_from_user) = mpsc::unbounded();
-        let metrics = match Metrics::new(metrics_registry) {
-            Ok(metrics) => metrics,
-            Err(e) => {
-                warn!(target: LOG_TARGET, "Failed to create metrics: {}.", e);
-                Metrics::noop()
-            }
-        };
+        let metrics = Metrics::new(metrics_registry).unwrap_or_else(|e| {
+            warn!(target: LOG_TARGET, "Failed to create metrics: {}.", e);
+            Metrics::noop()
+        });
 
         Ok((
             Service {
@@ -217,6 +217,7 @@ where
                 block_requests_from_user,
                 major_sync_last_status: false,
                 metrics,
+                slo_metrics,
                 favourite_block_request,
             },
             block_requests_for_sync,
@@ -585,9 +586,12 @@ where
             match event {
                 BlockImported(header) => {
                     trace!(target: LOG_TARGET, "Handling a new imported block.");
+                    let mut own_block = false;
+                    let id = header.id();
                     self.metrics.report_event(Event::HandleBlockImported);
                     match self.handler.block_imported(header) {
                         Ok(Some(broadcast)) => {
+                            own_block = true;
                             if let Err(e) = self
                                 .network
                                 .broadcast(NetworkData::RequestResponse(broadcast))
@@ -607,10 +611,14 @@ where
                             );
                         }
                     }
+                    let is_new_best = id == self.handler.favourite_block().id();
+                    self.slo_metrics
+                        .report_block_imported(id, is_new_best, own_block);
                 }
-                BlockFinalized(_) => {
+                BlockFinalized(header) => {
                     trace!(target: LOG_TARGET, "Handling a new finalized block.");
                     self.metrics.report_event(Event::HandleBlockFinalized);
+                    self.slo_metrics.report_block_finalized(header.id())
                 }
             }
             // We either learned about a new finalized or best block, so we
diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs
index 0650268ea7..41ef259fe7 100644
--- a/primitives/src/lib.rs
+++ b/primitives/src/lib.rs
@@ -86,6 +86,9 @@ pub type BlockId = generic::BlockId<Block>;
 /// Block Hash type
 pub type BlockHash = <Header as HeaderT>::Hash;
+/// A hash of an extrinsic.
+pub type TransactionHash = Hash;
+
 /// The address format for describing accounts.
 pub type Address = sp_runtime::MultiAddress;
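
# Appendix: sketches of the patterns above

The `report_in_pool`/`report_in_block` pair implements a pool-to-block inclusion latency measurement: stamp a transaction when it becomes ready in the pool, pop the stamp when the transaction first appears in an imported block, and count unknown hashes separately while still recording them as zero. A minimal self-contained sketch of the same pattern, assuming the `prometheus` and `parking_lot` crates and using a plain `HashMap` where the patch uses a bounded LRU cache (metric names here are illustrative, not the patch's):

```rust
use std::{
    collections::HashMap,
    hash::Hash,
    time::{Duration, Instant},
};

use parking_lot::Mutex;
use prometheus::{Counter, Histogram, HistogramOpts, Registry};

/// Pool-to-block inclusion latency, keyed by transaction hash.
struct InclusionMetrics<H> {
    in_pool_since: Mutex<HashMap<H, Instant>>,
    time_till_block_inclusion: Histogram,
    transactions_not_seen_in_the_pool: Counter,
}

impl<H: Hash + Eq> InclusionMetrics<H> {
    fn new(registry: &Registry) -> prometheus::Result<Self> {
        let time_till_block_inclusion = Histogram::with_opts(HistogramOpts::new(
            "transaction_to_block_time",
            "Time from appearing as ready in the pool to first block inclusion (ms)",
        ))?;
        let transactions_not_seen_in_the_pool = Counter::new(
            "transactions_not_seen_in_the_pool",
            "Transactions included in a block with no earlier in-pool report",
        )?;
        registry.register(Box::new(time_till_block_inclusion.clone()))?;
        registry.register(Box::new(transactions_not_seen_in_the_pool.clone()))?;
        Ok(Self {
            in_pool_since: Mutex::new(HashMap::new()),
            time_till_block_inclusion,
            transactions_not_seen_in_the_pool,
        })
    }

    /// Stamp a transaction when it becomes ready in the pool.
    fn report_in_pool(&self, hash: H) {
        self.in_pool_since.lock().insert(hash, Instant::now());
    }

    /// Observe the latency when the transaction first shows up in a block.
    fn report_in_block(&self, hash: &H) {
        let elapsed = match self.in_pool_since.lock().remove(hash) {
            Some(since) => since.elapsed(),
            // Never seen in the pool: submitted locally, or the block import
            // notification won the race; report zero, as the patch does.
            None => {
                self.transactions_not_seen_in_the_pool.inc();
                Duration::ZERO
            }
        };
        self.time_till_block_inclusion
            .observe(elapsed.as_secs_f64() * 1000.);
    }
}
```

Bounding the map (as the patch's LRU cache does) matters in production: a transaction that never makes it into a block would otherwise leak its entry forever.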
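In the tests, `process_notifications` drains whatever the notification streams can yield at this instant: `now_or_never()` polls the `next()` future exactly once, so the loop never blocks on a quiet stream. The same idiom in isolation, assuming only the `futures` crate:

```rust
use futures::{FutureExt, Stream, StreamExt};

/// Collect everything a stream can yield *right now*, without blocking.
/// `now_or_never()` returns `None` when the future is pending, while
/// `Some(None)` means the stream itself has terminated.
fn drain_ready<S: Stream + Unpin>(stream: &mut S) -> Vec<S::Item> {
    let mut items = Vec::new();
    while let Some(next) = stream.next().now_or_never() {
        match next {
            Some(item) => items.push(item),
            None => break, // stream ended; stop instead of spinning
        }
    }
    items
}
```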
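The new `TP: TransactionPool<Hash = TransactionHash> + 'static` bound in `nodes.rs` pins the pool's associated hash type, so hashes coming out of the pool's notification stream can be handed to the metrics without conversion. The shape of such an associated-type bound, with a stand-in trait and hash type (neither is the real API):

```rust
/// Stand-in for the transaction pool trait: the associated `Hash` is what
/// import notification streams yield.
trait PoolLike {
    type Hash;
    fn ready_hash(&self) -> Self::Hash;
}

/// Hypothetical fixed hash type, standing in for `primitives::TransactionHash`.
type TransactionHash = [u8; 32];

/// Requiring `Hash = TransactionHash` lets callers pass pool hashes straight
/// to hash-keyed metrics.
fn run_with_pool<TP>(pool: TP) -> TransactionHash
where
    TP: PoolLike<Hash = TransactionHash> + 'static,
{
    pool.ready_hash()
}
```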
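`run_metrics_service` is visible above only through its call site in `nodes.rs`; judging from that call, it reduces to a loop over the pool's import notifications. A sketch of such a loop, with `SloMetricsLike` as a hypothetical stand-in for the real `SloMetrics` interface (block import and finality are reported separately, by the sync service in this patch):

```rust
use futures::{Stream, StreamExt};

/// Hypothetical stand-in for the part of `SloMetrics` the service loop needs.
trait SloMetricsLike<H> {
    fn report_transaction_in_pool(&self, hash: H);
}

/// Skeleton of a `run_metrics_service`-style loop: stamp every transaction
/// that becomes ready in the pool, until the stream ends.
async fn run_metrics_loop<H, M, S>(metrics: &M, tx_notifications: &mut S)
where
    M: SloMetricsLike<H>,
    S: Stream<Item = H> + Unpin,
{
    while let Some(hash) = tx_notifications.next().await {
        metrics.report_transaction_in_pool(hash);
    }
}
```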
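Finally, the sync-service hunk collapses metric construction into `unwrap_or_else`, preserving the rule that a metrics registration failure degrades to a no-op instead of failing node startup. A minimal sketch with a stand-in `Metrics` type (the real one registers its collectors in `new`):

```rust
use log::warn;
use prometheus::Registry;

/// Stand-in metrics type: `Prometheus` would hold registered collectors.
enum Metrics {
    Prometheus,
    Noop,
}

impl Metrics {
    fn new(registry: Option<&Registry>) -> Result<Self, prometheus::Error> {
        match registry {
            // Collector registration (which can fail) would happen here.
            Some(_registry) => Ok(Metrics::Prometheus),
            None => Ok(Metrics::Noop),
        }
    }

    fn noop() -> Self {
        Metrics::Noop
    }
}

/// Metric creation must never take the node down: log and fall back.
fn create_metrics(registry: Option<&Registry>) -> Metrics {
    Metrics::new(registry).unwrap_or_else(|e| {
        warn!("Failed to create metrics: {}.", e);
        Metrics::noop()
    })
}
```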