Skip to content

Commit

Permalink
Fix protocol handler retrieval thread CPU usage when no blocks are pr… (
Browse files Browse the repository at this point in the history
#4617)

* Fix protocol handler retrieval thread CPU usage when no blocks are processed

* Fix thread name max length

Signed-off-by: Jean-François <[email protected]>

* Shorten more thread name + static assert

* Update note message

* Clippy fixes

---------

Signed-off-by: Jean-François <[email protected]>
Co-authored-by: sydhds <[email protected]>
Co-authored-by: Jean-François <[email protected]>
  • Loading branch information
3 people authored Jan 9, 2024
1 parent 4563909 commit 5697687
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 10 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ serial_test = "2.0"
sha2 = "=0.10"
sha3 = "=0.10"
socket2 = "0.5"
static_assertions = "1.1"
stream_limiter = "3.2"
structopt = "0.3"
strum = "0.25"
Expand Down
5 changes: 4 additions & 1 deletion massa-node/src/survey.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,16 @@ impl MassaSurvey {
if massa_metrics.is_enabled() {
#[cfg(all(not(feature = "sandbox"), not(test)))]
{
// massa-survey
const THREAD_NAME: &str = "massa-survey";

let mut data_sent = 0;
let mut data_received = 0;
let (tx_stop, rx_stop) =
MassaChannel::new("massa_survey_stop".to_string(), Some(1));
let update_tick = tick(tick_delay);
match std::thread::Builder::new()
.name("massa-survey".to_string())
.name(THREAD_NAME.to_string())
.spawn(move || loop {
select! {
recv(rx_stop) -> _ => {
Expand Down
1 change: 1 addition & 0 deletions massa-protocol-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ massa_serialization = {workspace = true}
massa_signature = {workspace = true}
massa_time = {workspace = true}
massa_versioning = {workspace = true}
static_assertions = {workspace = true}

[dev-dependencies]
tempfile = {workspace = true} # BOM UPGRADE Revert to "3.3" if problem
Expand Down
6 changes: 5 additions & 1 deletion massa-protocol-worker/src/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ use crate::{
wrap_network::NetworkController,
};

// protocol-connectivity
const THREAD_NAME: &str = "p-connectivity";
static_assertions::const_assert!(THREAD_NAME.len() < 16);

#[derive(Clone)]
pub enum ConnectivityCommand {
Stop,
Expand Down Expand Up @@ -82,7 +86,7 @@ pub(crate) fn start_connectivity_thread(
massa_metrics: MassaMetrics,
) -> Result<(MassaSender<ConnectivityCommand>, JoinHandle<()>), ProtocolError> {
let handle = std::thread::Builder::new()
.name("protocol-connectivity".to_string())
.name(THREAD_NAME.to_string())
.spawn({
let sender_endorsements_propagation_ext = protocol_channels.endorsement_handler_propagation.0.clone();
let sender_blocks_retrieval_ext = protocol_channels.block_handler_retrieval.0.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ use std::thread::JoinHandle;
use std::time::Instant;
use tracing::{debug, info, warn};

// protocol-block-handler-propagation
const THREAD_NAME: &str = "pbh-propagation";
static_assertions::const_assert!(THREAD_NAME.len() < 16);

#[derive(Debug)]
struct BlockPropagationData {
/// Time when propagation was initiated
Expand Down Expand Up @@ -222,7 +226,7 @@ pub fn start_propagation_thread(
cache: SharedBlockCache,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("protocol-block-handler-propagation".to_string())
.name(THREAD_NAME.to_string())
.spawn(move || {
let block_serializer = MessagesSerializer::new()
.with_block_message_serializer(BlockMessageSerializer::new());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ use super::{
BlockMessageSerializer,
};

// protocol-block-handler-retrieval
const THREAD_NAME: &str = "pbh-retrieval";
static_assertions::const_assert!(THREAD_NAME.len() < 16);

/// Info about a block we've seen
#[derive(Debug, Clone)]
pub(crate) struct BlockInfo {
Expand Down Expand Up @@ -933,6 +937,9 @@ impl RetrievalThread {
.expect("could not compute next block retrieval timer tick");

if self.asked_blocks.is_empty() && self.block_wishlist.is_empty() {
// Note: in mainnet and before genesis, no blocks are processed but the timer needs to be updated
// or the thread will use the CPU at 100%
self.next_timer_ask_block = next_tick;
return;
}

Expand Down Expand Up @@ -1275,7 +1282,7 @@ pub fn start_retrieval_thread(
let block_message_serializer =
MessagesSerializer::new().with_block_message_serializer(BlockMessageSerializer::new());
std::thread::Builder::new()
.name("protocol-block-handler-retrieval".to_string())
.name(THREAD_NAME.to_string())
.spawn(move || {
let mut retrieval_thread = RetrievalThread {
active_connections,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use massa_storage::Storage;
use std::thread::JoinHandle;
use tracing::{info, log::warn};

// protocol-endorsement-handler-propagation
const THREAD_NAME: &str = "peh-propagation";
static_assertions::const_assert!(THREAD_NAME.len() < 16);

/// Endorsements need to propagate fast, so no buffering
struct PropagationThread {
receiver: MassaReceiver<EndorsementHandlerPropagationCommand>,
Expand Down Expand Up @@ -142,7 +146,7 @@ pub fn start_propagation_thread(
active_connections: Box<dyn ActiveConnectionsTrait>,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("protocol-endorsement-handler-propagation".to_string())
.name(THREAD_NAME.to_string())
.spawn(move || {
let endorsement_serializer = MessagesSerializer::new()
.with_endorsement_message_serializer(EndorsementMessageSerializer::new());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ use super::{
messages::{EndorsementMessageDeserializer, EndorsementMessageDeserializerArgs},
};

// protocol-endorsement-handler-retrieval
const THREAD_NAME: &str = "peh-retrieval";
static_assertions::const_assert!(THREAD_NAME.len() < 16);

pub struct RetrievalThread {
receiver: MassaReceiver<PeerMessageTuple>,
receiver_ext: MassaReceiver<EndorsementHandlerRetrievalCommand>,
Expand Down Expand Up @@ -302,7 +306,7 @@ pub fn start_retrieval_thread(
chain_id: config.chain_id,
});
std::thread::Builder::new()
.name("protocol-endorsement-handler-retrieval".to_string())
.name(THREAD_NAME.to_string())
.spawn(move || {
let mut retrieval_thread = RetrievalThread {
receiver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ use super::{
OperationMessageSerializer,
};

// protocol-operation-handler-propagation
const THREAD_NAME: &str = "poh-tester";
static_assertions::const_assert!(THREAD_NAME.len() < 16);

struct PropagationThread {
internal_receiver: MassaReceiver<OperationHandlerPropagationCommand>,
active_connections: Box<dyn ActiveConnectionsTrait>,
Expand Down Expand Up @@ -206,7 +210,7 @@ pub fn start_propagation_thread(
massa_metrics: MassaMetrics,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("protocol-operation-handler-propagation".to_string())
.name(THREAD_NAME.to_string())
.spawn(move || {
let mut propagation_thread = PropagationThread {
internal_receiver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ use super::{
OperationMessageSerializer,
};

// protocol-operation-handler-retrieval
const THREAD_NAME: &str = "poh-retrieval";
static_assertions::const_assert!(THREAD_NAME.len() < 16);

/// Structure containing a Batch of `operation_ids` we would like to ask
/// to a `peer_id` now or later. Mainly used in protocol and translated into
/// simple combination of a `peer_id` and `operations_prefix_ids`
Expand Down Expand Up @@ -478,7 +482,7 @@ pub fn start_retrieval_thread(
massa_metrics: MassaMetrics,
) -> JoinHandle<()> {
std::thread::Builder::new()
.name("protocol-operation-handler-retrieval".to_string())
.name(THREAD_NAME.to_string())
.spawn(move || {
let mut retrieval_thread = RetrievalThread {
receiver,
Expand Down
6 changes: 5 additions & 1 deletion massa-protocol-worker/src/handlers/peer_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ pub struct PeerManagementHandler {
testers: Vec<Tester>,
}

// protocol-peer-handler
const THREAD_NAME: &str = "pph";
static_assertions::const_assert!(THREAD_NAME.len() < 16);

impl PeerManagementHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
Expand Down Expand Up @@ -97,7 +101,7 @@ impl PeerManagementHandler {
);

let thread_join = std::thread::Builder::new()
.name("protocol-peer-handler".to_string())
.name(THREAD_NAME.to_string())
.spawn({
let peer_db = peer_db.clone();
let ticker = tick(Duration::from_secs(10));
Expand Down
6 changes: 5 additions & 1 deletion massa-protocol-worker/src/handlers/peer_handler/tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ use super::{
SharedPeerDB,
};
use crate::wrap_network::ActiveConnectionsTrait;

const THREAD_NAME: &str = "pph-tester";
static_assertions::const_assert!(THREAD_NAME.len() < 16);

pub struct Tester {
pub handler: Option<JoinHandle<()>>,
}
Expand Down Expand Up @@ -275,7 +279,7 @@ impl Tester {
massa_metrics: MassaMetrics,
) -> Self {
let handle = std::thread::Builder::new()
.name("protocol-peer-handler-tester".to_string())
.name(THREAD_NAME.to_string())
.spawn(move || {
let db = peer_db;
let active_connections = active_connections.clone();
Expand Down

0 comments on commit 5697687

Please sign in to comment.