diff --git a/bats/core/notifications/notifications.bats b/bats/core/notifications/notifications.bats index 81cf6378c6..8866e95cb0 100644 --- a/bats/core/notifications/notifications.bats +++ b/bats/core/notifications/notifications.bats @@ -10,6 +10,10 @@ setup_file() { create_user 'alice' } +NOTIFICATIONS_GRPC_ENDPOINT="localhost:6685" +IMPORT_PATH="${REPO_ROOT}/core/notifications/proto" +NOTIFICATIONS_PROTO_FILE="${REPO_ROOT}/core/notifications/proto/notifications.proto" + @test "notifications: list stateful transactions" { btc_wallet_name="alice.btc_wallet_id" amount="0.01" @@ -79,3 +83,41 @@ setup_file() { acknowledged_at=$(graphql_output '.data.statefulNotificationAcknowledge.notification.acknowledgedAt') [[ "$acknowledged_at" != "null" ]] || exit 1 } + +@test "notifications: load test" { + update_user_locale_method="services.notifications.v1.NotificationsService/UpdateUserLocale" + + declare -a user_ids + + for i in $(seq 1 10000); do + request_data=$(jq -n --arg userId "$i" --arg locale "es" '{ + "userId": $userId, + "locale": $locale + }') + grpcurl_request $IMPORT_PATH $NOTIFICATIONS_PROTO_FILE $NOTIFICATIONS_GRPC_ENDPOINT "$update_user_locale_method" "$request_data" + + done + + # localized_content_en='{"title": "Hello", "body": "World"}' + # localized_content_es='{"title": "Hola", "body": "World"}' + # user_ids=$(printf '%s\n' "${user_ids[@]}" | jq -R . | jq -s .) + # request_data=$(jq -n \ + # --argjson user_ids "$user_ids" \ + # --argjson localized_content_en "$localized_content_en" \ + # --argjson localized_content_es "$localized_content_es" \ + # '{ + # "event": { + # "marketingNotificationTriggered": { + # "user_ids": $user_ids, + # "localized_push_content": { + # "en": $localized_content_en, + # "es": $localized_content_es + # } + # } + # } + # }') + + # handle_notification_event_method="services.notifications.v1.NotificationsService/HandleNotificationEvent" + # grpcurl_request $IMPORT_PATH $NOTIFICATIONS_PROTO_FILE $NOTIFICATIONS_GRPC_ENDPOINT "$handle_notification_event_method" "$request_data" + +} diff --git a/core/notifications/src/app/mod.rs b/core/notifications/src/app/mod.rs index a8cd365c2b..51f5e53a3d 100644 --- a/core/notifications/src/app/mod.rs +++ b/core/notifications/src/app/mod.rs @@ -268,10 +268,12 @@ impl NotificationsApp { marketing_notification: MarketingNotificationTriggered, ) -> Result<(), ApplicationError> { let mut tx = self.pool.begin().await?; + let mut user_ids: Vec = user_ids.into_iter().collect(); + user_ids.sort(); job::spawn_multi_user_event_dispatch( &mut tx, ( - user_ids.into_iter().collect(), + user_ids, NotificationEventPayload::from(marketing_notification), ), ) diff --git a/core/notifications/src/job/mod.rs b/core/notifications/src/job/mod.rs index fe8c0a0ee4..13ab387bf1 100644 --- a/core/notifications/src/job/mod.rs +++ b/core/notifications/src/job/mod.rs @@ -1,4 +1,5 @@ mod config; +mod multi_user_event_dispatch; mod send_email_notification; mod send_push_notification; @@ -20,6 +21,7 @@ use crate::{ pub use config::*; use error::JobError; +use multi_user_event_dispatch::MultiUserEventDispatchData; use send_email_notification::SendEmailNotificationData; use send_push_notification::SendPushNotificationData; @@ -187,41 +189,13 @@ async fn multi_user_event_dispatch( ) -> Result<(), JobError> { let pool = current_job.pool().clone(); JobExecutor::builder(&mut current_job) + .initial_retry_delay(std::time::Duration::from_secs(20)) .build() .expect("couldn't build JobExecutor") .execute(|data| async move { let data: MultiUserEventDispatchData = data.expect("no MultiUserEventDispatchData available"); - let batch_limit = 1000; - let (ids, next_user_ids) = data - .user_ids - .split_at(std::cmp::min(data.user_ids.len(), batch_limit)); - - let mut tx = pool.begin().await?; - if !next_user_ids.is_empty() { - let data = MultiUserEventDispatchData { - user_ids: next_user_ids.to_vec(), - payload: data.payload.clone(), - tracing_data: HashMap::default(), - }; - spawn_multi_user_event_dispatch(&mut tx, data).await?; - } - - let payload = data.payload.clone(); - - history.add_events(&mut tx, ids, payload.clone()).await?; - - for user_id in ids { - if payload.should_send_email() { - spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone())) - .await?; - } - if payload.should_send_push() { - spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone())) - .await?; - } - } - Ok::<_, JobError>(JobResult::CompleteWithTx(tx)) + multi_user_event_dispatch::execute(data, history, pool).await }) .await?; Ok(()) @@ -252,9 +226,12 @@ pub async fn spawn_send_push_notification( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, data: impl Into, ) -> Result<(), JobError> { + println!("spawn_send_push_notification"); let data = data.into(); if let Err(e) = send_push_notification .builder() + .set_retry_backoff(std::time::Duration::from_secs(20)) + .set_ordered(true) .set_json(&data) .expect("Couldn't set json") .spawn(&mut **tx) @@ -290,6 +267,7 @@ pub async fn spawn_multi_user_event_dispatch( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, data: impl Into, ) -> Result<(), JobError> { + println!("spawn_multi_user_event_dispatch"); let data = data.into(); if let Err(e) = multi_user_event_dispatch .builder() @@ -418,21 +396,3 @@ impl From<()> for LinkEmailReminderData { } } } - -#[derive(Debug, Serialize, Deserialize)] -pub(super) struct MultiUserEventDispatchData { - user_ids: Vec, - payload: NotificationEventPayload, - #[serde(flatten)] - pub(super) tracing_data: HashMap, -} - -impl From<(Vec, NotificationEventPayload)> for MultiUserEventDispatchData { - fn from((user_ids, payload): (Vec, NotificationEventPayload)) -> Self { - Self { - user_ids, - payload, - tracing_data: HashMap::default(), - } - } -} diff --git a/core/notifications/src/job/multi_user_event_dispatch.rs b/core/notifications/src/job/multi_user_event_dispatch.rs new file mode 100644 index 0000000000..934e7b82af --- /dev/null +++ b/core/notifications/src/job/multi_user_event_dispatch.rs @@ -0,0 +1,87 @@ +use serde::{Deserialize, Serialize}; +use tracing::instrument; + +use std::collections::HashMap; + +use crate::{ + history::NotificationHistory, notification_event::NotificationEventPayload, + primitives::GaloyUserId, +}; +use job_executor::JobResult; + +use super::error::JobError; + +#[derive(Debug, Serialize, Deserialize)] +pub(super) struct MultiUserEventDispatchData { + user_ids: Vec, + payload: NotificationEventPayload, + #[serde(flatten)] + pub(super) tracing_data: HashMap, +} + +impl From<(Vec, NotificationEventPayload)> for MultiUserEventDispatchData { + fn from((user_ids, payload): (Vec, NotificationEventPayload)) -> Self { + Self { + user_ids, + payload, + tracing_data: HashMap::default(), + } + } +} + +#[instrument( + name = "job.multi_user_event_dispatch", + skip(history, pool), + fields(first_id, last_id, ids_len, next_ids_len), + err +)] +pub async fn execute( + data: MultiUserEventDispatchData, + history: NotificationHistory, + pool: sqlx::PgPool, +) -> Result { + let batch_limit = 10; + let (ids, next_user_ids) = data + .user_ids + .split_at(std::cmp::min(data.user_ids.len(), batch_limit)); + let span = tracing::Span::current(); + if ids.len() > 0 { + span.record("first_id", &tracing::field::display(&ids[0])); + span.record("last_id", &tracing::field::display(&ids[ids.len() - 1])); + span.record("ids_len", &tracing::field::display(ids.len())); + span.record( + "next_ids_len", + &tracing::field::display(next_user_ids.len()), + ); + } + println!( + "multi_user_event_dispatch: {}, {}", + ids[0], + ids[ids.len() - 1] + ); + let mut tx = pool.begin().await?; + if !next_user_ids.is_empty() { + let data = MultiUserEventDispatchData { + user_ids: next_user_ids.to_vec(), + payload: data.payload.clone(), + tracing_data: HashMap::default(), + }; + super::spawn_multi_user_event_dispatch(&mut tx, data).await?; + } + + let payload = data.payload.clone(); + + history.add_events(&mut tx, ids, payload.clone()).await?; + + for user_id in ids { + if payload.should_send_email() { + super::spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone())) + .await?; + } + if payload.should_send_push() { + super::spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone())) + .await?; + } + } + Ok::<_, JobError>(JobResult::CompleteWithTx(tx)) +} diff --git a/core/notifications/src/primitives.rs b/core/notifications/src/primitives.rs index 23adc12f4f..92367e8caf 100644 --- a/core/notifications/src/primitives.rs +++ b/core/notifications/src/primitives.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Hash, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +#[derive(Hash, Ord, PartialOrd, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct GaloyUserId(String); impl GaloyUserId { pub fn search_begin() -> Self { diff --git a/dev/config/notifications/notifications-alt.yml b/dev/config/notifications/notifications-alt.yml new file mode 100644 index 0000000000..fc925a5c6b --- /dev/null +++ b/dev/config/notifications/notifications-alt.yml @@ -0,0 +1,8 @@ +app: + push_executor: + fcm: + google_application_credentials_path: "dev/config/notifications/fake_service_account.json" +subgraph_server: + port: 1234 +grpc_server: + port: 2345 diff --git a/dev/config/notifications/notifications.yml b/dev/config/notifications/notifications.yml index 1f1702c732..8d484fa4b3 100644 --- a/dev/config/notifications/notifications.yml +++ b/dev/config/notifications/notifications.yml @@ -1,4 +1,4 @@ app: push_executor: fcm: - google_application_credentials_path: "./config/notifications/fake_service_account.json" + google_application_credentials_path: "dev/config/notifications/fake_service_account.json" diff --git a/lib/job-executor-rs/src/lib.rs b/lib/job-executor-rs/src/lib.rs index 2b693fe616..24198d7944 100644 --- a/lib/job-executor-rs/src/lib.rs +++ b/lib/job-executor-rs/src/lib.rs @@ -99,8 +99,6 @@ impl<'a> JobExecutor<'a> { let max_interval = self.max_retry_delay; let handle = tokio::spawn(async move { loop { - tokio::time::sleep(interval / 2).await; - interval = max_interval.min(interval * 2); if let Err(e) = sqlx::query("SELECT mq_keep_alive(ARRAY[$1], $2)") .bind(id) .bind(interval) @@ -110,6 +108,8 @@ impl<'a> JobExecutor<'a> { tracing::error!("Failed to keep job {id} alive: {e}"); break; } + tokio::time::sleep(interval / 2).await; + interval = max_interval.min(interval * 2); } }); KeepAliveHandle::new(handle) diff --git a/load.sh b/load.sh new file mode 100755 index 0000000000..1ef2c6ecb6 --- /dev/null +++ b/load.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +set -xe + +source "bats/helpers/_common.bash" +source "bats/helpers/user.bash" +source "bats/helpers/onchain.bash" + +NOTIFICATIONS_GRPC_ENDPOINT="localhost:6685" +IMPORT_PATH="${REPO_ROOT}/core/notifications/proto" +NOTIFICATIONS_PROTO_FILE="${REPO_ROOT}/core/notifications/proto/notifications.proto" + + # update_user_locale_method="services.notifications.v1.NotificationsService/UpdateUserLocale" + + # for i in $(seq 1 10000); do + # request_data=$(jq -n --arg userId "$i" --arg locale "es" '{ + # "userId": $userId, + # "locale": $locale + # }') + # grpcurl_request $IMPORT_PATH $NOTIFICATIONS_PROTO_FILE $NOTIFICATIONS_GRPC_ENDPOINT "$update_user_locale_method" "$request_data" + + # done + localized_content_en='{"title": "Hello", "body": "World"}' + localized_content_es='{"title": "Hola", "body": "World"}' + # Generate user_ids array from "1" to "10000" +user_ids=($(seq -f "%04g" 1 8000)) + +# Convert user_ids array to a JSON array of strings +user_ids=$(printf '%s\n' "${user_ids[@]}" | jq -R . | jq -s .) + +# Create the JSON request payload using jq +request_data=$(jq -n \ + --argjson user_ids "$user_ids" \ + --argjson localized_content_en "$localized_content_en" \ + --argjson localized_content_es "$localized_content_es" \ + '{ + "event": { + "marketingNotificationTriggered": { + "user_ids": $user_ids, + "localized_push_content": { + "en": $localized_content_en, + "es": $localized_content_es + } + } + } +}') + +echo $request_data | jq +# Specify the GRPC method to call +handle_notification_event_method="services.notifications.v1.NotificationsService/HandleNotificationEvent" + +# Make the GRPC request using grpcurl +grpcurl_request $IMPORT_PATH $NOTIFICATIONS_PROTO_FILE $NOTIFICATIONS_GRPC_ENDPOINT "$handle_notification_event_method" "$request_data" +