Skip to content

Commit

Permalink
Make thread id a tag instead of field in BankingStage metrics (#644)
Browse files Browse the repository at this point in the history
Making the id a tag instead of a field will allow group-by operations on
id in Chronograf.
  • Loading branch information
steviez authored Apr 8, 2024
1 parent 0af9aaa commit 191a997
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 49 deletions.
6 changes: 3 additions & 3 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10);
#[derive(Debug, Default)]
pub struct BankingStageStats {
last_report: AtomicInterval,
id: u32,
id: String,
receive_and_buffer_packets_count: AtomicUsize,
dropped_packets_count: AtomicUsize,
pub(crate) dropped_duplicated_packets_count: AtomicUsize,
Expand All @@ -113,7 +113,7 @@ pub struct BankingStageStats {
impl BankingStageStats {
pub fn new(id: u32) -> Self {
BankingStageStats {
id,
id: id.to_string(),
batch_packet_indexes_len: Histogram::configure()
.max_value(PACKETS_PER_BATCH as u64)
.build()
Expand Down Expand Up @@ -157,7 +157,7 @@ impl BankingStageStats {
if self.last_report.should_update(report_interval_ms) {
datapoint_info!(
"banking_stage-loop-stats",
("id", self.id, i64),
"id" => self.id,
(
"receive_and_buffer_packets_count",
self.receive_and_buffer_packets_count
Expand Down
22 changes: 11 additions & 11 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ fn try_drain_iter<T>(work: T, receiver: &Receiver<T>) -> impl Iterator<Item = T>
/// since the consume worker thread is sleeping unless there is work to be
/// done.
pub(crate) struct ConsumeWorkerMetrics {
id: u32,
id: String,
interval: AtomicInterval,
has_data: AtomicBool,

Expand All @@ -185,15 +185,15 @@ impl ConsumeWorkerMetrics {
if self.interval.should_update(REPORT_INTERVAL_MS)
&& self.has_data.swap(false, Ordering::Relaxed)
{
self.count_metrics.report_and_reset(self.id);
self.timing_metrics.report_and_reset(self.id);
self.error_metrics.report_and_reset(self.id);
self.count_metrics.report_and_reset(&self.id);
self.timing_metrics.report_and_reset(&self.id);
self.error_metrics.report_and_reset(&self.id);
}
}

fn new(id: u32) -> Self {
Self {
id,
id: id.to_string(),
interval: AtomicInterval::default(),
has_data: AtomicBool::new(false),
count_metrics: ConsumeWorkerCountMetrics::default(),
Expand Down Expand Up @@ -428,10 +428,10 @@ impl Default for ConsumeWorkerCountMetrics {
}

impl ConsumeWorkerCountMetrics {
fn report_and_reset(&self, id: u32) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_counts",
("id", id, i64),
"id" => id,
(
"transactions_attempted_execution_count",
self.transactions_attempted_execution_count
Expand Down Expand Up @@ -495,10 +495,10 @@ struct ConsumeWorkerTimingMetrics {
}

impl ConsumeWorkerTimingMetrics {
fn report_and_reset(&self, id: u32) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_timing",
("id", id, i64),
"id" => id,
(
"cost_model_us",
self.cost_model_us.swap(0, Ordering::Relaxed),
Expand Down Expand Up @@ -573,10 +573,10 @@ struct ConsumeWorkerTransactionErrorMetrics {
}

impl ConsumeWorkerTransactionErrorMetrics {
fn report_and_reset(&self, id: u32) {
fn report_and_reset(&self, id: &str) {
datapoint_info!(
"banking_stage_worker_error_metrics",
("id", id, i64),
"id" => id,
("total", self.total.swap(0, Ordering::Relaxed), i64),
(
"account_in_use",
Expand Down
26 changes: 13 additions & 13 deletions core/src/banking_stage/leader_slot_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ impl LeaderPrioritizationFeesMetrics {
}
}

fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_prioritization_fees_info",
("id", id, i64),
"id" => id,
("slot", slot, i64),
(
"min_prioritization_fees_per_cu",
Expand Down Expand Up @@ -199,10 +199,10 @@ impl LeaderSlotPacketCountMetrics {
Self::default()
}

fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_packet_counts",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
(
"total_new_valid_packets",
Expand Down Expand Up @@ -328,7 +328,7 @@ pub(crate) struct LeaderSlotMetrics {
// banking_stage creates one QosService instance per worker thread, each uniquely
// identified by id. This field allows metrics to be categorized for gossip votes, TPU votes
// and other transactions.
id: u32,
id: String,

// aggregate metrics per slot
slot: Slot,
Expand All @@ -355,7 +355,7 @@ impl LeaderSlotMetrics {
unprocessed_transaction_storage: Option<&UnprocessedTransactionStorage>,
) -> Self {
Self {
id,
id: id.to_string(),
slot,
packet_count_metrics: LeaderSlotPacketCountMetrics::new(),
transaction_error_metrics: TransactionErrorMetrics::new(),
Expand All @@ -371,11 +371,11 @@ impl LeaderSlotMetrics {
pub(crate) fn report(&mut self) {
self.is_reported = true;

self.timing_metrics.report(self.id, self.slot);
self.transaction_error_metrics.report(self.id, self.slot);
self.packet_count_metrics.report(self.id, self.slot);
self.vote_packet_count_metrics.report(self.id, self.slot);
self.prioritization_fees_metric.report(self.id, self.slot);
self.timing_metrics.report(&self.id, self.slot);
self.transaction_error_metrics.report(&self.id, self.slot);
self.packet_count_metrics.report(&self.id, self.slot);
self.vote_packet_count_metrics.report(&self.id, self.slot);
self.prioritization_fees_metric.report(&self.id, self.slot);
}

/// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None
Expand Down Expand Up @@ -408,10 +408,10 @@ impl VotePacketCountMetrics {
Self::default()
}

fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-vote_packet_counts",
("id", id, i64),
"id" => id,
("slot", slot, i64),
("dropped_gossip_votes", self.dropped_gossip_votes, i64),
("dropped_tpu_votes", self.dropped_tpu_votes, i64)
Expand Down
24 changes: 12 additions & 12 deletions core/src/banking_stage/leader_slot_timing_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ impl LeaderExecuteAndCommitTimings {
self.execute_timings.accumulate(&other.execute_timings);
}

pub fn report(&self, id: u32, slot: Slot) {
pub fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_execute_and_commit_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
("collect_balances_us", self.collect_balances_us as i64, i64),
("load_execute_us", self.load_execute_us as i64, i64),
Expand All @@ -52,7 +52,7 @@ impl LeaderExecuteAndCommitTimings {

datapoint_info!(
"banking_stage-leader_slot_record_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
(
"execution_results_to_transactions_us",
Expand Down Expand Up @@ -96,7 +96,7 @@ impl LeaderSlotTimingMetrics {
}
}

pub(crate) fn report(&self, id: u32, slot: Slot) {
pub(crate) fn report(&self, id: &str, slot: Slot) {
self.outer_loop_timings.report(id, slot);
self.process_buffered_packets_timings.report(id, slot);
self.consume_buffered_packets_timings.report(id, slot);
Expand Down Expand Up @@ -148,10 +148,10 @@ impl OuterLoopTimings {
self.bank_detected_time.elapsed().as_micros() as u64;
}

fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_loop_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
(
"bank_detected_to_slot_end_detected_us",
Expand Down Expand Up @@ -191,10 +191,10 @@ pub(crate) struct ProcessBufferedPacketsTimings {
pub forward_and_hold_us: u64,
}
impl ProcessBufferedPacketsTimings {
fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_process_buffered_packets_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
("make_decision_us", self.make_decision_us as i64, i64),
(
Expand All @@ -215,10 +215,10 @@ pub(crate) struct ConsumeBufferedPacketsTimings {
}

impl ConsumeBufferedPacketsTimings {
fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_consume_buffered_packets_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
(
"process_packets_transactions_us",
Expand Down Expand Up @@ -247,10 +247,10 @@ pub(crate) struct ProcessPacketsTimings {
}

impl ProcessPacketsTimings {
fn report(&self, id: u32, slot: Slot) {
fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_process_packets_timings",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
(
"transactions_from_packets_us",
Expand Down
8 changes: 4 additions & 4 deletions core/src/banking_stage/qos_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ struct QosServiceMetrics {
/// banking_stage creates one QosService instance per worker thread, each uniquely
/// identified by id. This field allows metrics to be categorized for gossip votes, TPU votes
/// and other transactions.
id: u32,
id: String,

/// aggregate metrics per slot
slot: AtomicU64,
Expand Down Expand Up @@ -460,7 +460,7 @@ struct QosServiceMetricsErrors {
impl QosServiceMetrics {
pub fn new(id: u32) -> Self {
QosServiceMetrics {
id,
id: id.to_string(),
..QosServiceMetrics::default()
}
}
Expand All @@ -469,7 +469,7 @@ impl QosServiceMetrics {
if bank_slot != self.slot.load(Ordering::Relaxed) {
datapoint_info!(
"qos-service-stats",
("id", self.id, i64),
"id" => self.id,
("bank_slot", bank_slot, i64),
(
"compute_cost_time",
Expand Down Expand Up @@ -532,7 +532,7 @@ impl QosServiceMetrics {
);
datapoint_info!(
"qos-service-errors",
("id", self.id, i64),
"id" => self.id,
("bank_slot", bank_slot, i64),
(
"retried_txs_per_block_limit_count",
Expand Down
14 changes: 10 additions & 4 deletions core/src/tracer_packet_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,20 @@ pub struct ModifiableTracerPacketStats {

#[derive(Debug, Default)]
pub struct TracerPacketStats {
id: u32,
id: String,
last_report: u64,
modifiable_tracer_packet_stats: Option<ModifiableTracerPacketStats>,
}

impl TracerPacketStats {
pub fn new(id: u32) -> Self {
Self {
id: id.to_string(),
..Self::default()
}
}

fn reset(id: String) -> Self {
Self {
id,
..Self::default()
Expand Down Expand Up @@ -116,7 +123,7 @@ impl TracerPacketStats {
{
datapoint_info!(
"tracer-packet-stats",
("id", self.id, i64),
"id" => &self.id,
(
"total_removed_before_sigverify",
modifiable_tracer_packet_stats
Expand Down Expand Up @@ -199,8 +206,7 @@ impl TracerPacketStats {
)
);

let id = self.id;
*self = Self::new(id);
*self = Self::reset(self.id.clone());
self.last_report = timestamp();
}
}
Expand Down
4 changes: 2 additions & 2 deletions svm/src/transaction_error_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ impl TransactionErrorMetrics {
);
}

pub fn report(&self, id: u32, slot: Slot) {
pub fn report(&self, id: &str, slot: Slot) {
datapoint_info!(
"banking_stage-leader_slot_transaction_errors",
("id", id as i64, i64),
"id" => id,
("slot", slot as i64, i64),
("total", self.total as i64, i64),
("account_in_use", self.account_in_use as i64, i64),
Expand Down

0 comments on commit 191a997

Please sign in to comment.