Skip to content

Commit

Permalink
WIP reproduce notifications job load
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Apr 26, 2024
1 parent 51fc6f6 commit 9ed8781
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 84 deletions.
56 changes: 25 additions & 31 deletions bats/core/notifications/notifications.bats
Original file line number Diff line number Diff line change
Expand Up @@ -89,41 +89,35 @@ NOTIFICATIONS_PROTO_FILE="${REPO_ROOT}/core/notifications/proto/notifications.pr

declare -a user_ids

for i in $(seq 1 500); do

create_user "user_$i"
exec_graphql "user_$i" 'identity'
user_id="$(graphql_output '.data.me.id')"
user_ids+=("$user_id")

request_data=$(jq -n --arg userId "$user_id" --arg locale "es" '{
for i in $(seq 1 10000); do
request_data=$(jq -n --arg userId "$i" --arg locale "es" '{
"userId": $userId,
"locale": $locale
}')
grpcurl_request $IMPORT_PATH $NOTIFICATIONS_PROTO_FILE $NOTIFICATIONS_GRPC_ENDPOINT "$update_user_locale_method" "$request_data"

done

localized_content_en='{"title": "Hello", "body": "World"}'
localized_content_es='{"title": "Hola", "body": "World"}'
user_ids=$(printf '%s\n' "${user_ids[@]}" | jq -R . | jq -s .)
request_data=$(jq -n \
--argjson user_ids "$user_ids" \
--argjson localized_content_en "$localized_content_en" \
--argjson localized_content_es "$localized_content_es" \
'{
"event": {
"marketingNotificationTriggered": {
"user_ids": $user_ids,
"localized_push_content": {
"en": $localized_content_en,
"es": $localized_content_es
}
}
}
}')

handle_notification_event_method="services.notifications.v1.NotificationsService/HandleNotificationEvent"
grpcurl_request $IMPORT_PATH $NOTIFICATIONS_PROTO_FILE $NOTIFICATIONS_GRPC_ENDPOINT "$handle_notification_event_method" "$request_data"
done

# localized_content_en='{"title": "Hello", "body": "World"}'
# localized_content_es='{"title": "Hola", "body": "World"}'
# user_ids=$(printf '%s\n' "${user_ids[@]}" | jq -R . | jq -s .)
# request_data=$(jq -n \
# --argjson user_ids "$user_ids" \
# --argjson localized_content_en "$localized_content_en" \
# --argjson localized_content_es "$localized_content_es" \
# '{
# "event": {
# "marketingNotificationTriggered": {
# "user_ids": $user_ids,
# "localized_push_content": {
# "en": $localized_content_en,
# "es": $localized_content_es
# }
# }
# }
# }')

# handle_notification_event_method="services.notifications.v1.NotificationsService/HandleNotificationEvent"
# grpcurl_request $IMPORT_PATH $NOTIFICATIONS_PROTO_FILE $NOTIFICATIONS_GRPC_ENDPOINT "$handle_notification_event_method" "$request_data"

}
4 changes: 3 additions & 1 deletion core/notifications/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,12 @@ impl NotificationsApp {
marketing_notification: MarketingNotificationTriggered,
) -> Result<(), ApplicationError> {
let mut tx = self.pool.begin().await?;
let mut user_ids: Vec<GaloyUserId> = user_ids.into_iter().collect();
user_ids.sort();
job::spawn_multi_user_event_dispatch(
&mut tx,
(
user_ids.into_iter().collect(),
user_ids,
NotificationEventPayload::from(marketing_notification),
),
)
Expand Down
56 changes: 8 additions & 48 deletions core/notifications/src/job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod config;
mod multi_user_event_dispatch;
mod send_email_notification;
mod send_push_notification;

Expand All @@ -20,6 +21,7 @@ use crate::{

pub use config::*;
use error::JobError;
use multi_user_event_dispatch::MultiUserEventDispatchData;
use send_email_notification::SendEmailNotificationData;
use send_push_notification::SendPushNotificationData;

Expand Down Expand Up @@ -187,41 +189,13 @@ async fn multi_user_event_dispatch(
) -> Result<(), JobError> {
let pool = current_job.pool().clone();
JobExecutor::builder(&mut current_job)
.initial_retry_delay(std::time::Duration::from_secs(20))
.build()
.expect("couldn't build JobExecutor")
.execute(|data| async move {
let data: MultiUserEventDispatchData =
data.expect("no MultiUserEventDispatchData available");
let batch_limit = 1000;
let (ids, next_user_ids) = data
.user_ids
.split_at(std::cmp::min(data.user_ids.len(), batch_limit));

let mut tx = pool.begin().await?;
if !next_user_ids.is_empty() {
let data = MultiUserEventDispatchData {
user_ids: next_user_ids.to_vec(),
payload: data.payload.clone(),
tracing_data: HashMap::default(),
};
spawn_multi_user_event_dispatch(&mut tx, data).await?;
}

let payload = data.payload.clone();

history.add_events(&mut tx, ids, payload.clone()).await?;

for user_id in ids {
if payload.should_send_email() {
spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone()))
.await?;
}
if payload.should_send_push() {
spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone()))
.await?;
}
}
Ok::<_, JobError>(JobResult::CompleteWithTx(tx))
multi_user_event_dispatch::execute(data, history, pool).await
})
.await?;
Ok(())
Expand Down Expand Up @@ -252,9 +226,12 @@ pub async fn spawn_send_push_notification(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
data: impl Into<SendPushNotificationData>,
) -> Result<(), JobError> {
println!("spawn_send_push_notification");
let data = data.into();
if let Err(e) = send_push_notification
.builder()
.set_retry_backoff(std::time::Duration::from_secs(20))
.set_ordered(true)
.set_json(&data)
.expect("Couldn't set json")
.spawn(&mut **tx)
Expand Down Expand Up @@ -290,6 +267,7 @@ pub async fn spawn_multi_user_event_dispatch(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
data: impl Into<MultiUserEventDispatchData>,
) -> Result<(), JobError> {
println!("spawn_multi_user_event_dispatch");
let data = data.into();
if let Err(e) = multi_user_event_dispatch
.builder()
Expand Down Expand Up @@ -418,21 +396,3 @@ impl From<()> for LinkEmailReminderData {
}
}
}

#[derive(Debug, Serialize, Deserialize)]
pub(super) struct MultiUserEventDispatchData {
user_ids: Vec<GaloyUserId>,
payload: NotificationEventPayload,
#[serde(flatten)]
pub(super) tracing_data: HashMap<String, serde_json::Value>,
}

impl From<(Vec<GaloyUserId>, NotificationEventPayload)> for MultiUserEventDispatchData {
fn from((user_ids, payload): (Vec<GaloyUserId>, NotificationEventPayload)) -> Self {
Self {
user_ids,
payload,
tracing_data: HashMap::default(),
}
}
}
87 changes: 87 additions & 0 deletions core/notifications/src/job/multi_user_event_dispatch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use serde::{Deserialize, Serialize};
use tracing::instrument;

use std::collections::HashMap;

use crate::{
history::NotificationHistory, notification_event::NotificationEventPayload,
primitives::GaloyUserId,
};
use job_executor::JobResult;

use super::error::JobError;

#[derive(Debug, Serialize, Deserialize)]
pub(super) struct MultiUserEventDispatchData {
user_ids: Vec<GaloyUserId>,
payload: NotificationEventPayload,
#[serde(flatten)]
pub(super) tracing_data: HashMap<String, serde_json::Value>,
}

impl From<(Vec<GaloyUserId>, NotificationEventPayload)> for MultiUserEventDispatchData {
fn from((user_ids, payload): (Vec<GaloyUserId>, NotificationEventPayload)) -> Self {
Self {
user_ids,
payload,
tracing_data: HashMap::default(),
}
}
}

#[instrument(
name = "job.multi_user_event_dispatch",
skip(history, pool),
fields(first_id, last_id, ids_len, next_ids_len),
err
)]
pub async fn execute(
data: MultiUserEventDispatchData,
history: NotificationHistory,
pool: sqlx::PgPool,
) -> Result<JobResult, JobError> {
let batch_limit = 10;
let (ids, next_user_ids) = data
.user_ids
.split_at(std::cmp::min(data.user_ids.len(), batch_limit));
let span = tracing::Span::current();
if ids.len() > 0 {
span.record("first_id", &tracing::field::display(&ids[0]));
span.record("last_id", &tracing::field::display(&ids[ids.len() - 1]));
span.record("ids_len", &tracing::field::display(ids.len()));
span.record(
"next_ids_len",
&tracing::field::display(next_user_ids.len()),
);
}
println!(
"multi_user_event_dispatch: {}, {}",
ids[0],
ids[ids.len() - 1]
);
let mut tx = pool.begin().await?;
if !next_user_ids.is_empty() {
let data = MultiUserEventDispatchData {
user_ids: next_user_ids.to_vec(),
payload: data.payload.clone(),
tracing_data: HashMap::default(),
};
super::spawn_multi_user_event_dispatch(&mut tx, data).await?;
}

let payload = data.payload.clone();

history.add_events(&mut tx, ids, payload.clone()).await?;

for user_id in ids {
if payload.should_send_email() {
super::spawn_send_email_notification(&mut tx, (user_id.clone(), payload.clone()))
.await?;
}
if payload.should_send_push() {
super::spawn_send_push_notification(&mut tx, (user_id.clone(), payload.clone()))
.await?;
}
}
Ok::<_, JobError>(JobResult::CompleteWithTx(tx))
}
2 changes: 1 addition & 1 deletion core/notifications/src/primitives.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};

#[derive(Hash, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
#[derive(Hash, Ord, PartialOrd, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct GaloyUserId(String);
impl GaloyUserId {
pub fn search_begin() -> Self {
Expand Down
8 changes: 8 additions & 0 deletions dev/config/notifications/notifications-alt.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
app:
push_executor:
fcm:
google_application_credentials_path: "dev/config/notifications/fake_service_account.json"
subgraph_server:
port: 1234
grpc_server:
port: 2345
2 changes: 1 addition & 1 deletion dev/config/notifications/notifications.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
app:
push_executor:
fcm:
google_application_credentials_path: "./config/notifications/fake_service_account.json"
google_application_credentials_path: "dev/config/notifications/fake_service_account.json"
4 changes: 2 additions & 2 deletions lib/job-executor-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ impl<'a> JobExecutor<'a> {
let max_interval = self.max_retry_delay;
let handle = tokio::spawn(async move {
loop {
tokio::time::sleep(interval / 2).await;
interval = max_interval.min(interval * 2);
if let Err(e) = sqlx::query("SELECT mq_keep_alive(ARRAY[$1], $2)")
.bind(id)
.bind(interval)
Expand All @@ -110,6 +108,8 @@ impl<'a> JobExecutor<'a> {
tracing::error!("Failed to keep job {id} alive: {e}");
break;
}
tokio::time::sleep(interval / 2).await;
interval = max_interval.min(interval * 2);
}
});
KeepAliveHandle::new(handle)
Expand Down
54 changes: 54 additions & 0 deletions load.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#!/bin/bash

set -xe

source "bats/helpers/_common.bash"
source "bats/helpers/user.bash"
source "bats/helpers/onchain.bash"

NOTIFICATIONS_GRPC_ENDPOINT="localhost:6685"
IMPORT_PATH="${REPO_ROOT}/core/notifications/proto"
NOTIFICATIONS_PROTO_FILE="${REPO_ROOT}/core/notifications/proto/notifications.proto"

# update_user_locale_method="services.notifications.v1.NotificationsService/UpdateUserLocale"

# for i in $(seq 1 10000); do
# request_data=$(jq -n --arg userId "$i" --arg locale "es" '{
# "userId": $userId,
# "locale": $locale
# }')
# grpcurl_request $IMPORT_PATH $NOTIFICATIONS_PROTO_FILE $NOTIFICATIONS_GRPC_ENDPOINT "$update_user_locale_method" "$request_data"

# done
localized_content_en='{"title": "Hello", "body": "World"}'
localized_content_es='{"title": "Hola", "body": "World"}'
# Generate user_ids array from "1" to "10000"
user_ids=($(seq -f "%04g" 1 8000))

# Convert user_ids array to a JSON array of strings
user_ids=$(printf '%s\n' "${user_ids[@]}" | jq -R . | jq -s .)

# Create the JSON request payload using jq
request_data=$(jq -n \
--argjson user_ids "$user_ids" \
--argjson localized_content_en "$localized_content_en" \
--argjson localized_content_es "$localized_content_es" \
'{
"event": {
"marketingNotificationTriggered": {
"user_ids": $user_ids,
"localized_push_content": {
"en": $localized_content_en,
"es": $localized_content_es
}
}
}
}')

echo $request_data | jq
# Specify the GRPC method to call
handle_notification_event_method="services.notifications.v1.NotificationsService/HandleNotificationEvent"

# Make the GRPC request using grpcurl
grpcurl_request $IMPORT_PATH $NOTIFICATIONS_PROTO_FILE $NOTIFICATIONS_GRPC_ENDPOINT "$handle_notification_event_method" "$request_data"

0 comments on commit 9ed8781

Please sign in to comment.