Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the # of messages prioritized in the queue to the http resp… #5043

Merged
merged 13 commits into from
Jan 6, 2025
Merged
31 changes: 24 additions & 7 deletions rust/main/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ typetag = "0.2"
uint = "0.9.5"
ureq = { version = "2.4", default-features = false }
url = "2.3"
uuid = { version = "1.11.0", features = ["v4"] }
walkdir = "2"
warp = "0.3"
which = "4.3"
Expand Down
3 changes: 3 additions & 0 deletions rust/main/agents/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tokio-metrics.workspace = true
tracing-futures.workspace = true
tracing.workspace = true
typetag.workspace = true
uuid.workspace = true

hyperlane-core = { path = "../../hyperlane-core", features = [
"agent",
Expand All @@ -53,12 +54,14 @@ hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] }
hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" }

[dev-dependencies]
axum = { workspace = true, features = ["macros"] }
once_cell.workspace = true
mockall.workspace = true
tokio-test.workspace = true
hyperlane-test = { path = "../../hyperlane-test" }
hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] }
hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "async", "test-utils"] }
tracing-test.workspace = true

[features]
default = ["color-eyre", "oneline-errors"]
Expand Down
87 changes: 75 additions & 12 deletions rust/main/agents/relayer/src/msg/op_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use prometheus::{IntGauge, IntGaugeVec};
use tokio::sync::{broadcast::Receiver, Mutex};
use tracing::{debug, info, instrument};

use crate::settings::matching_list::MatchingList;
use crate::server::{MessageRetryRequest, MessageRetryResponse};

pub type OperationPriorityQueue = Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>>;

Expand All @@ -16,7 +16,7 @@ pub type OperationPriorityQueue = Arc<Mutex<BinaryHeap<Reverse<QueueOperation>>>
pub struct OpQueue {
metrics: IntGaugeVec,
queue_metrics_label: String,
retry_rx: Arc<Mutex<Receiver<MatchingList>>>,
retry_receiver: Arc<Mutex<Receiver<MessageRetryRequest>>>,
#[new(default)]
pub queue: OperationPriorityQueue,
}
Expand Down Expand Up @@ -74,27 +74,71 @@ impl OpQueue {
// The other consideration is whether to put the channel receiver in the OpQueue or in a dedicated task
// that also holds an Arc to the Mutex. For simplicity, we'll put it in the OpQueue for now.
let mut message_retry_requests = vec![];
while let Ok(message_id) = self.retry_rx.lock().await.try_recv() {
message_retry_requests.push(message_id);

while let Ok(retry_request) = self.retry_receiver.lock().await.try_recv() {
let uuid = retry_request.uuid.clone();
message_retry_requests.push((
retry_request,
MessageRetryResponse {
uuid,
evaluated: 0,
matched: 0,
},
));
}

if message_retry_requests.is_empty() {
return;
}

let mut queue = self.queue.lock().await;
let queue_length = queue.len();

let mut reprioritized_queue: BinaryHeap<_> = queue
.drain()
.map(|Reverse(mut op)| {
if message_retry_requests.iter().any(|r| r.op_matches(&op)) {
let matched_requests: Vec<_> = message_retry_requests
.iter_mut()
.filter_map(|(retry_req, retry_response)| {
let match_res = retry_req.pattern.op_matches(&op);
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
// update retry metrics
if match_res {
retry_response.matched += 1;
Some(retry_req.uuid.clone())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like this isn't used anymore? think we can go back to using .any instead of filter_map and avoid this allocation

} else {
None
}
})
.collect();

let matches = !matched_requests.is_empty();
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
if matches {
info!(
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
operation = %op,
queue_label = %self.queue_metrics_label,
"Retrying OpQueue operation"
);
op.reset_attempts()
op.reset_attempts();
for matched_req in matched_requests {
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
info!(uuid = matched_req, "Matched request");
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
}
}
Reverse(op)
})
.collect();

for (retry_req, mut retry_response) in message_retry_requests {
retry_response.evaluated = queue_length;
tracing::debug!(
uuid = retry_response.uuid,
evaluated = retry_response.evaluated,
matched = retry_response.matched,
"Sending relayer retry response back"
);
if let Err(err) = retry_req.transmitter.send(retry_response).await {
tracing::error!(err = err.to_string(), "Failed to send retry response");
kamiyaa marked this conversation as resolved.
Show resolved Hide resolved
}
}
queue.append(&mut reprioritized_queue);
}

Expand All @@ -112,7 +156,10 @@ impl OpQueue {

#[cfg(test)]
pub mod test {
use crate::{server::ENDPOINT_MESSAGES_QUEUE_SIZE, settings::matching_list::MatchingList};

use super::*;

use hyperlane_core::{
HyperlaneDomain, HyperlaneDomainProtocol, HyperlaneDomainTechnicalStack,
HyperlaneDomainType, HyperlaneMessage, KnownHyperlaneDomain, PendingOperationResult,
Expand All @@ -124,7 +171,7 @@ pub mod test {
str::FromStr,
time::{Duration, Instant},
};
use tokio::sync;
use tokio::sync::{self, mpsc};

#[derive(Debug, Clone, Serialize)]
pub struct MockPendingOperation {
Expand Down Expand Up @@ -317,6 +364,7 @@ pub mod test {
async fn test_multiple_op_queues_message_id() {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let broadcaster = sync::broadcast::Sender::new(100);

let mut op_queue_1 = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
Expand Down Expand Up @@ -361,12 +409,22 @@ pub mod test {
.await;
}

let (transmitter, _receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE);

// Retry by message ids
broadcaster
.send(MatchingList::with_message_id(op_ids[1]))
.send(MessageRetryRequest {
uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(),
pattern: MatchingList::with_message_id(op_ids[1]),
transmitter: transmitter.clone(),
})
.unwrap();
broadcaster
.send(MatchingList::with_message_id(op_ids[2]))
.send(MessageRetryRequest {
uuid: "59400966-e7fa-4fb9-9372-9a671d4392c3".to_string(),
pattern: MatchingList::with_message_id(op_ids[2]),
transmitter,
})
.unwrap();

// Pop elements from queue 1
Expand Down Expand Up @@ -396,6 +454,7 @@ pub mod test {
async fn test_destination_domain() {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let broadcaster = sync::broadcast::Sender::new(100);

let mut op_queue = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
Expand All @@ -422,11 +481,15 @@ pub mod test {
.await;
}

let (transmitter, _receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE);

// Retry by domain
broadcaster
.send(MatchingList::with_destination_domain(
destination_domain_2.id(),
))
.send(MessageRetryRequest {
uuid: "a5b39473-7cc5-48a1-8bed-565454ba1037".to_string(),
pattern: MatchingList::with_destination_domain(destination_domain_2.id()),
transmitter,
})
.unwrap();

// Pop elements from queue
Expand Down
4 changes: 2 additions & 2 deletions rust/main/agents/relayer/src/msg/op_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use hyperlane_core::{
};

use crate::msg::pending_message::CONFIRM_DELAY;
use crate::settings::matching_list::MatchingList;
use crate::server::MessageRetryRequest;

use super::op_queue::OpQueue;
use super::op_queue::OperationPriorityQueue;
Expand Down Expand Up @@ -105,7 +105,7 @@ impl SerialSubmitter {
pub fn new(
domain: HyperlaneDomain,
rx: mpsc::UnboundedReceiver<QueueOperation>,
retry_op_transmitter: Sender<MatchingList>,
retry_op_transmitter: Sender<MessageRetryRequest>,
metrics: SerialSubmitterMetrics,
max_batch_size: u32,
task_monitor: TaskMonitor,
Expand Down
3 changes: 2 additions & 1 deletion rust/main/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,11 @@ impl BaseAgent for Relayer {
}));
tasks.push(console_server.instrument(info_span!("Tokio console server")));
}
let sender = BroadcastSender::<MatchingList>::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
let sender = BroadcastSender::new(ENDPOINT_MESSAGES_QUEUE_SIZE);
// send channels by destination chain
let mut send_channels = HashMap::with_capacity(self.destination_chains.len());
let mut prep_queues = HashMap::with_capacity(self.destination_chains.len());

for (dest_domain, dest_conf) in &self.destination_chains {
let (send_channel, receive_channel) = mpsc::unbounded_channel::<QueueOperation>();
send_channels.insert(dest_domain.id(), send_channel);
Expand Down
1 change: 1 addition & 0 deletions rust/main/agents/relayer/src/server/list_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ mod tests {
fn setup_test_server() -> (SocketAddr, OperationPriorityQueue) {
let (metrics, queue_metrics_label) = dummy_metrics_and_label();
let broadcaster = sync::broadcast::Sender::new(100);

let op_queue = OpQueue::new(
metrics.clone(),
queue_metrics_label.clone(),
Expand Down
Loading
Loading