-
Notifications
You must be signed in to change notification settings - Fork 16
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(iota-json-rpc-api): Add metric processors back
- Loading branch information
1 parent
15f157c
commit 903b920
Showing
18 changed files
with
1,671 additions
and
1 deletion.
There are no files selected for viewing
124 changes: 124 additions & 0 deletions
124
crates/iota-indexer/src/handlers/address_metrics_processor.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
// Copyright (c) Mysten Labs, Inc. | ||
// Modifications Copyright (c) 2024 IOTA Stiftung | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use tap::tap::TapFallible; | ||
use tracing::{error, info}; | ||
|
||
use crate::metrics::IndexerMetrics; | ||
use crate::store::IndexerAnalyticalStore; | ||
use crate::types::IndexerResult; | ||
|
||
const ADDRESS_PROCESSOR_BATCH_SIZE: usize = 80000; | ||
const PARALLELISM: usize = 10; | ||
|
||
pub struct AddressMetricsProcessor<S> { | ||
pub store: S, | ||
metrics: IndexerMetrics, | ||
pub address_processor_batch_size: usize, | ||
pub address_processor_parallelism: usize, | ||
} | ||
|
||
impl<S> AddressMetricsProcessor<S> | ||
where | ||
S: IndexerAnalyticalStore + Clone + Sync + Send + 'static, | ||
{ | ||
pub fn new(store: S, metrics: IndexerMetrics) -> AddressMetricsProcessor<S> { | ||
let address_processor_batch_size = std::env::var("ADDRESS_PROCESSOR_BATCH_SIZE") | ||
.map(|s| s.parse::<usize>().unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE)) | ||
.unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE); | ||
let address_processor_parallelism = std::env::var("ADDRESS_PROCESSOR_PARALLELISM") | ||
.map(|s| s.parse::<usize>().unwrap_or(PARALLELISM)) | ||
.unwrap_or(PARALLELISM); | ||
Self { | ||
store, | ||
metrics, | ||
address_processor_batch_size, | ||
address_processor_parallelism, | ||
} | ||
} | ||
|
||
pub async fn start(&self) -> IndexerResult<()> { | ||
info!("Indexer address metrics async processor started..."); | ||
let latest_tx_seq = self | ||
.store | ||
.get_address_metrics_last_processed_tx_seq() | ||
.await?; | ||
let mut last_processed_tx_seq = latest_tx_seq.unwrap_or_default().seq; | ||
loop { | ||
let mut latest_tx = self.store.get_latest_stored_transaction().await?; | ||
while if let Some(tx) = latest_tx { | ||
tx.tx_sequence_number | ||
< last_processed_tx_seq + self.address_processor_batch_size as i64 | ||
} else { | ||
true | ||
} { | ||
tokio::time::sleep(std::time::Duration::from_secs(1)).await; | ||
latest_tx = self.store.get_latest_stored_transaction().await?; | ||
} | ||
|
||
let mut persist_tasks = vec![]; | ||
let batch_size = self.address_processor_batch_size; | ||
let step_size = batch_size / self.address_processor_parallelism; | ||
for chunk_start_tx_seq in (last_processed_tx_seq + 1 | ||
..last_processed_tx_seq + batch_size as i64 + 1) | ||
.step_by(step_size) | ||
{ | ||
let active_address_store = self.store.clone(); | ||
persist_tasks.push(tokio::task::spawn_blocking(move || { | ||
active_address_store.persist_active_addresses_in_tx_range( | ||
chunk_start_tx_seq, | ||
chunk_start_tx_seq + step_size as i64, | ||
) | ||
})); | ||
} | ||
for chunk_start_tx_seq in (last_processed_tx_seq + 1 | ||
..last_processed_tx_seq + batch_size as i64 + 1) | ||
.step_by(step_size) | ||
{ | ||
let address_store = self.store.clone(); | ||
persist_tasks.push(tokio::task::spawn_blocking(move || { | ||
address_store.persist_addresses_in_tx_range( | ||
chunk_start_tx_seq, | ||
chunk_start_tx_seq + step_size as i64, | ||
) | ||
})); | ||
} | ||
futures::future::join_all(persist_tasks) | ||
.await | ||
.into_iter() | ||
.collect::<Result<Vec<_>, _>>() | ||
.tap_err(|e| { | ||
error!("Error joining address persist tasks: {:?}", e); | ||
})? | ||
.into_iter() | ||
.collect::<Result<Vec<_>, _>>() | ||
.tap_err(|e| { | ||
error!("Error persisting addresses or active addresses: {:?}", e); | ||
})?; | ||
last_processed_tx_seq += self.address_processor_batch_size as i64; | ||
info!( | ||
"Persisted addresses and active addresses for tx seq: {}", | ||
last_processed_tx_seq, | ||
); | ||
self.metrics | ||
.latest_address_metrics_tx_seq | ||
.set(last_processed_tx_seq); | ||
|
||
let mut last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?; | ||
while last_processed_tx.is_none() { | ||
tokio::time::sleep(std::time::Duration::from_secs(1)).await; | ||
last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?; | ||
} | ||
// unwrap is safe here b/c we just checked that it's not None | ||
let last_processed_cp = last_processed_tx.unwrap().checkpoint_sequence_number; | ||
self.store | ||
.calculate_and_persist_address_metrics(last_processed_cp) | ||
.await?; | ||
info!( | ||
"Persisted address metrics for checkpoint: {}", | ||
last_processed_cp | ||
); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
113 changes: 113 additions & 0 deletions
113
crates/iota-indexer/src/handlers/move_call_metrics_processor.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
// Copyright (c) Mysten Labs, Inc. | ||
// Modifications Copyright (c) 2024 IOTA Stiftung | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use tap::tap::TapFallible; | ||
use tracing::{error, info}; | ||
|
||
use crate::metrics::IndexerMetrics; | ||
use crate::store::IndexerAnalyticalStore; | ||
use crate::types::IndexerResult; | ||
|
||
const MOVE_CALL_PROCESSOR_BATCH_SIZE: usize = 80000; | ||
const PARALLELISM: usize = 10; | ||
|
||
pub struct MoveCallMetricsProcessor<S> { | ||
pub store: S, | ||
metrics: IndexerMetrics, | ||
pub move_call_processor_batch_size: usize, | ||
pub move_call_processor_parallelism: usize, | ||
} | ||
|
||
impl<S> MoveCallMetricsProcessor<S> | ||
where | ||
S: IndexerAnalyticalStore + Clone + Sync + Send + 'static, | ||
{ | ||
pub fn new(store: S, metrics: IndexerMetrics) -> MoveCallMetricsProcessor<S> { | ||
let move_call_processor_batch_size = std::env::var("MOVE_CALL_PROCESSOR_BATCH_SIZE") | ||
.map(|s| s.parse::<usize>().unwrap_or(MOVE_CALL_PROCESSOR_BATCH_SIZE)) | ||
.unwrap_or(MOVE_CALL_PROCESSOR_BATCH_SIZE); | ||
let move_call_processor_parallelism = std::env::var("MOVE_CALL_PROCESSOR_PARALLELISM") | ||
.map(|s| s.parse::<usize>().unwrap_or(PARALLELISM)) | ||
.unwrap_or(PARALLELISM); | ||
Self { | ||
store, | ||
metrics, | ||
move_call_processor_batch_size, | ||
move_call_processor_parallelism, | ||
} | ||
} | ||
|
||
pub async fn start(&self) -> IndexerResult<()> { | ||
info!("Indexer move call metrics async processor started..."); | ||
let latest_move_call_tx_seq = self.store.get_latest_move_call_tx_seq().await?; | ||
let mut last_processed_tx_seq = latest_move_call_tx_seq.unwrap_or_default().seq; | ||
let latest_move_call_epoch = self.store.get_latest_move_call_metrics().await?; | ||
let mut last_processed_epoch = latest_move_call_epoch.unwrap_or_default().epoch; | ||
loop { | ||
let mut latest_tx = self.store.get_latest_stored_transaction().await?; | ||
while if let Some(tx) = latest_tx { | ||
tx.tx_sequence_number | ||
< last_processed_tx_seq + self.move_call_processor_batch_size as i64 | ||
} else { | ||
true | ||
} { | ||
tokio::time::sleep(std::time::Duration::from_secs(1)).await; | ||
latest_tx = self.store.get_latest_stored_transaction().await?; | ||
} | ||
|
||
let batch_size = self.move_call_processor_batch_size; | ||
let step_size = batch_size / self.move_call_processor_parallelism; | ||
let mut persist_tasks = vec![]; | ||
for chunk_start_tx_seq in (last_processed_tx_seq + 1 | ||
..last_processed_tx_seq + batch_size as i64 + 1) | ||
.step_by(step_size) | ||
{ | ||
let move_call_store = self.store.clone(); | ||
persist_tasks.push(tokio::task::spawn_blocking(move || { | ||
move_call_store.persist_move_calls_in_tx_range( | ||
chunk_start_tx_seq, | ||
chunk_start_tx_seq + step_size as i64, | ||
) | ||
})); | ||
} | ||
futures::future::join_all(persist_tasks) | ||
.await | ||
.into_iter() | ||
.collect::<Result<Vec<_>, _>>() | ||
.tap_err(|e| { | ||
error!("Error joining move call persist tasks: {:?}", e); | ||
})? | ||
.into_iter() | ||
.collect::<Result<Vec<_>, _>>() | ||
.tap_err(|e| { | ||
error!("Error persisting move calls: {:?}", e); | ||
})?; | ||
last_processed_tx_seq += batch_size as i64; | ||
info!("Persisted move_calls at tx seq: {}", last_processed_tx_seq); | ||
self.metrics | ||
.latest_move_call_metrics_tx_seq | ||
.set(last_processed_tx_seq); | ||
|
||
let mut tx = self.store.get_tx(last_processed_tx_seq).await?; | ||
while tx.is_none() { | ||
tokio::time::sleep(std::time::Duration::from_secs(1)).await; | ||
tx = self.store.get_tx(last_processed_tx_seq).await?; | ||
} | ||
let cp_seq = tx.unwrap().checkpoint_sequence_number; | ||
let mut cp = self.store.get_cp(cp_seq).await?; | ||
while cp.is_none() { | ||
tokio::time::sleep(std::time::Duration::from_secs(1)).await; | ||
cp = self.store.get_cp(cp_seq).await?; | ||
} | ||
let end_epoch = cp.unwrap().epoch; | ||
for epoch in last_processed_epoch + 1..end_epoch { | ||
self.store | ||
.calculate_and_persist_move_call_metrics(epoch) | ||
.await?; | ||
info!("Persisted move_call_metrics for epoch: {}", epoch); | ||
} | ||
last_processed_epoch = end_epoch - 1; | ||
} | ||
} | ||
} |
129 changes: 129 additions & 0 deletions
129
crates/iota-indexer/src/handlers/network_metrics_processor.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
// Copyright (c) Mysten Labs, Inc. | ||
// Modifications Copyright (c) 2024 IOTA Stiftung | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use tap::tap::TapFallible; | ||
use tracing::{error, info}; | ||
|
||
use crate::errors::IndexerError; | ||
use crate::metrics::IndexerMetrics; | ||
use crate::store::IndexerAnalyticalStore; | ||
use crate::types::IndexerResult; | ||
|
||
const NETWORK_METRICS_PROCESSOR_BATCH_SIZE: usize = 10; | ||
const PARALLELISM: usize = 1; | ||
|
||
pub struct NetworkMetricsProcessor<S> { | ||
pub store: S, | ||
metrics: IndexerMetrics, | ||
pub network_processor_metrics_batch_size: usize, | ||
pub network_processor_metrics_parallelism: usize, | ||
} | ||
|
||
impl<S> NetworkMetricsProcessor<S> | ||
where | ||
S: IndexerAnalyticalStore + Clone + Sync + Send + 'static, | ||
{ | ||
pub fn new(store: S, metrics: IndexerMetrics) -> NetworkMetricsProcessor<S> { | ||
let network_processor_metrics_batch_size = | ||
std::env::var("NETWORK_PROCESSOR_METRICS_BATCH_SIZE") | ||
.map(|s| { | ||
s.parse::<usize>() | ||
.unwrap_or(NETWORK_METRICS_PROCESSOR_BATCH_SIZE) | ||
}) | ||
.unwrap_or(NETWORK_METRICS_PROCESSOR_BATCH_SIZE); | ||
let network_processor_metrics_parallelism = | ||
std::env::var("NETWORK_PROCESSOR_METRICS_PARALLELISM") | ||
.map(|s| s.parse::<usize>().unwrap_or(PARALLELISM)) | ||
.unwrap_or(PARALLELISM); | ||
Self { | ||
store, | ||
metrics, | ||
network_processor_metrics_batch_size, | ||
network_processor_metrics_parallelism, | ||
} | ||
} | ||
|
||
pub async fn start(&self) -> IndexerResult<()> { | ||
info!("Indexer network metrics async processor started..."); | ||
let latest_tx_count_metrics = self | ||
.store | ||
.get_latest_tx_count_metrics() | ||
.await | ||
.unwrap_or_default(); | ||
let latest_epoch_peak_tps = self | ||
.store | ||
.get_latest_epoch_peak_tps() | ||
.await | ||
.unwrap_or_default(); | ||
let mut last_processed_cp_seq = latest_tx_count_metrics | ||
.unwrap_or_default() | ||
.checkpoint_sequence_number; | ||
let mut last_processed_peak_tps_epoch = latest_epoch_peak_tps.unwrap_or_default().epoch; | ||
loop { | ||
let mut latest_stored_checkpoint = self.store.get_latest_stored_checkpoint().await?; | ||
while if let Some(cp) = latest_stored_checkpoint { | ||
cp.sequence_number | ||
< last_processed_cp_seq + self.network_processor_metrics_batch_size as i64 | ||
} else { | ||
true | ||
} { | ||
tokio::time::sleep(std::time::Duration::from_secs(1)).await; | ||
latest_stored_checkpoint = self.store.get_latest_stored_checkpoint().await?; | ||
} | ||
|
||
info!( | ||
"Persisting tx count metrics for checkpoint sequence number {}", | ||
last_processed_cp_seq | ||
); | ||
let batch_size = self.network_processor_metrics_batch_size; | ||
let step_size = batch_size / self.network_processor_metrics_parallelism; | ||
let mut persist_tasks = vec![]; | ||
for chunk_start_cp in (last_processed_cp_seq + 1 | ||
..last_processed_cp_seq + batch_size as i64 + 1) | ||
.step_by(step_size) | ||
{ | ||
let store = self.store.clone(); | ||
persist_tasks.push(tokio::task::spawn_blocking(move || { | ||
store | ||
.persist_tx_count_metrics(chunk_start_cp, chunk_start_cp + step_size as i64) | ||
})); | ||
} | ||
futures::future::join_all(persist_tasks) | ||
.await | ||
.into_iter() | ||
.collect::<Result<Vec<_>, _>>() | ||
.tap_err(|e| { | ||
error!("Error joining network persist tasks: {:?}", e); | ||
})? | ||
.into_iter() | ||
.collect::<Result<Vec<_>, _>>() | ||
.tap_err(|e| { | ||
error!("Error persisting tx count metrics: {:?}", e); | ||
})?; | ||
last_processed_cp_seq += batch_size as i64; | ||
info!( | ||
"Persisted tx count metrics for checkpoint sequence number {}", | ||
last_processed_cp_seq | ||
); | ||
self.metrics | ||
.latest_network_metrics_cp_seq | ||
.set(last_processed_cp_seq); | ||
|
||
let end_cp = self | ||
.store | ||
.get_checkpoints_in_range(last_processed_cp_seq, last_processed_cp_seq + 1) | ||
.await? | ||
.first() | ||
.ok_or(IndexerError::PostgresReadError( | ||
"Cannot read checkpoint from PG for epoch peak TPS".to_string(), | ||
))? | ||
.clone(); | ||
for epoch in last_processed_peak_tps_epoch + 1..end_cp.epoch { | ||
self.store.persist_epoch_peak_tps(epoch).await?; | ||
last_processed_peak_tps_epoch = epoch; | ||
info!("Persisted epoch peak TPS for epoch {}", epoch); | ||
} | ||
} | ||
} | ||
} |
Oops, something went wrong.