From 33e63fae5c71bbb9c0d2268d0b5e437901bb56b5 Mon Sep 17 00:00:00 2001
From: "Nathan.fooo" <86001920+appflowy@users.noreply.github.com>
Date: Mon, 7 Oct 2024 16:49:50 +0800
Subject: [PATCH] chore: try to fix pg lock timeout (#864)

* chore: try to fix pg lock timeout

* chore: add logs for insert collab

* chore: add timeout for duplicate

* chore: timeout for pending write
---
 libs/database-entity/src/dto.rs               | 12 +++
 libs/database/src/collab/cache.rs             | 33 ++++----
 libs/database/src/collab/collab_storage.rs    | 10 ++-
 .../appflowy-collaborate/src/collab/queue.rs  | 84 +++++++++++--------
 .../src/collab/storage.rs                     | 44 +++++++---
 src/api/workspace.rs                          |  4 +-
 src/biz/user/user_init.rs                     |  4 +
 src/biz/workspace/publish_dup.rs              | 48 ++++++---
 8 files changed, 165 insertions(+), 74 deletions(-)

diff --git a/libs/database-entity/src/dto.rs b/libs/database-entity/src/dto.rs
index 978db060c..9d7fd4ab3 100644
--- a/libs/database-entity/src/dto.rs
+++ b/libs/database-entity/src/dto.rs
@@ -77,6 +77,18 @@ pub struct CollabParams {
   pub embeddings: Option<AFCollabEmbeddings>,
 }
 
+impl Display for CollabParams {
+  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+    write!(
+      f,
+      "object_id: {}, collab_type: {:?}, size:{}",
+      self.object_id,
+      self.collab_type,
+      self.encoded_collab_v1.len()
+    )
+  }
+}
+
 impl CollabParams {
   pub fn new<T: ToString>(
     object_id: T,
diff --git a/libs/database/src/collab/cache.rs b/libs/database/src/collab/cache.rs
index a07b92308..c69d623f0 100644
--- a/libs/database/src/collab/cache.rs
+++ b/libs/database/src/collab/cache.rs
@@ -143,6 +143,7 @@ impl CollabCache {
     params: &CollabParams,
     transaction: &mut Transaction<'_, sqlx::Postgres>,
   ) -> Result<(), AppError> {
+    let collab_type = params.collab_type.clone();
     let object_id = params.object_id.clone();
     let encode_collab_data = params.encoded_collab_v1.clone();
     self
@@ -152,21 +153,23 @@ impl CollabCache {
 
     // when the data is written to the disk cache but fails to be written to the memory cache
     // we log the error and continue.
-    if let Err(err) = self
-      .mem_cache
-      .insert_encode_collab_data(
-        &object_id,
-        &encode_collab_data,
-        chrono::Utc::now().timestamp(),
-        Some(cache_exp_secs_from_collab_type(&params.collab_type)),
-      )
-      .await
-    {
-      error!(
-        "Failed to insert encode collab into memory cache: {:?}",
-        err
-      );
-    }
+    let mem_cache = self.mem_cache.clone();
+    tokio::spawn(async move {
+      if let Err(err) = mem_cache
+        .insert_encode_collab_data(
+          &object_id,
+          &encode_collab_data,
+          chrono::Utc::now().timestamp(),
+          Some(cache_exp_secs_from_collab_type(&collab_type)),
+        )
+        .await
+      {
+        error!(
+          "Failed to insert encode collab into memory cache: {:?}",
+          err
+        );
+      }
+    });
     Ok(())
   }
diff --git a/libs/database/src/collab/collab_storage.rs b/libs/database/src/collab/collab_storage.rs
index 240e31c08..8ca3277b9 100644
--- a/libs/database/src/collab/collab_storage.rs
+++ b/libs/database/src/collab/collab_storage.rs
@@ -106,6 +106,7 @@ pub trait CollabStorage: Send + Sync + 'static {
     uid: &i64,
     params: CollabParams,
     transaction: &mut Transaction<'_, sqlx::Postgres>,
+    action_description: &str,
   ) -> AppResult<()>;
 
   /// Retrieves a collaboration from the storage.
@@ -216,10 +217,17 @@ where
     uid: &i64,
     params: CollabParams,
     transaction: &mut Transaction<'_, sqlx::Postgres>,
+    action_description: &str,
   ) -> AppResult<()> {
     self
       .as_ref()
-      .insert_new_collab_with_transaction(workspace_id, uid, params, transaction)
+      .insert_new_collab_with_transaction(
+        workspace_id,
+        uid,
+        params,
+        transaction,
+        action_description,
+      )
       .await
   }
 
diff --git a/services/appflowy-collaborate/src/collab/queue.rs b/services/appflowy-collaborate/src/collab/queue.rs
index 501414d98..914b42f2b 100644
--- a/services/appflowy-collaborate/src/collab/queue.rs
+++ b/services/appflowy-collaborate/src/collab/queue.rs
@@ -10,6 +10,7 @@ use collab::lock::Mutex;
 use collab_entity::CollabType;
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
+
 use tokio::time::{interval, sleep, sleep_until, Instant};
 use tracing::{error, instrument, trace, warn};
 
@@ -237,7 +238,7 @@ impl StorageQueue {
 /// Spawn a task that periodically checks the number of active connections in the PostgreSQL pool
 /// It aims to adjust the write interval based on the number of active connections.
 fn spawn_period_check_pg_conn_count(pg_pool: PgPool, next_duration: Arc<Mutex<Duration>>) {
-  let mut interval = interval(tokio::time::Duration::from_secs(5));
+  let mut interval = interval(tokio::time::Duration::from_secs(10));
   tokio::spawn(async move {
     loop {
       interval.tick().await;
@@ -247,7 +248,7 @@ fn spawn_period_check_pg_conn_count(pg_pool: PgPool, next_duration: Arc<Mutex<Duration>>)
         _ => {
-          *next_duration.lock().await = Duration::from_secs(2);
+          *next_duration.lock().await = Duration::from_secs(5);
         },
       }
     }
   });
@@ -284,7 +285,7 @@ fn spawn_period_write(
       );
     }
 
-    let chunk_keys = consume_pending_write(&pending_write_set, 30, 10).await;
+    let chunk_keys = consume_pending_write(&pending_write_set, 20, 5).await;
     if chunk_keys.is_empty() {
       continue;
     }
@@ -299,35 +300,33 @@ fn spawn_period_write(
       let cloned_total_write_count = total_write_count.clone();
       let cloned_total_success_write_count = success_write_count.clone();
 
-      tokio::spawn(async move {
-        if let Ok(metas) = get_pending_meta(&keys, &mut cloned_connection_manager).await {
-          if metas.is_empty() {
-            error!("the pending write keys is not empty, but metas is empty");
-            return;
-          }
-
-          match retry_write_pending_to_disk(&cloned_collab_cache, metas).await {
-            Ok(success_result) => {
-              #[cfg(debug_assertions)]
-              tracing::info!("success write pending: {:?}", keys,);
-
-              trace!("{:?}", success_result);
-              cloned_total_write_count.fetch_add(
-                success_result.expected as i64,
-                std::sync::atomic::Ordering::Relaxed,
-              );
-              cloned_total_success_write_count.fetch_add(
-                success_result.success as i64,
-                std::sync::atomic::Ordering::Relaxed,
-              );
-            },
-            Err(err) => error!("{:?}", err),
-          }
-          // Remove pending metadata from Redis even if some records fail to write to disk after retries.
-          // Records that fail repeatedly are considered potentially corrupt or invalid.
-          let _ = remove_pending_meta(&keys, &mut cloned_connection_manager).await;
-        }
-      });
+      if let Ok(metas) = get_pending_meta(&keys, &mut cloned_connection_manager).await {
+        if metas.is_empty() {
+          error!("the pending write keys is not empty, but metas is empty");
+          return;
+        }
+
+        match retry_write_pending_to_disk(&cloned_collab_cache, metas).await {
+          Ok(success_result) => {
+            #[cfg(debug_assertions)]
+            tracing::info!("success write pending: {:?}", keys,);
+
+            trace!("{:?}", success_result);
+            cloned_total_write_count.fetch_add(
+              success_result.expected as i64,
+              std::sync::atomic::Ordering::Relaxed,
+            );
+            cloned_total_success_write_count.fetch_add(
+              success_result.success as i64,
+              std::sync::atomic::Ordering::Relaxed,
+            );
+          },
+          Err(err) => error!("{:?}", err),
+        }
+        // Remove pending metadata from Redis even if some records fail to write to disk after retries.
+        // Records that fail repeatedly are considered potentially corrupt or invalid.
+        let _ = remove_pending_meta(&keys, &mut cloned_connection_manager).await;
+      }
     }
   }
 });
@@ -442,6 +441,7 @@ async fn write_pending_to_disk(
     .map_err(AppError::from)?;
 
   // Insert each record into the database within the transaction context
+  let mut action_description = String::new();
   for (index, record) in records.into_iter().enumerate() {
     let params = CollabParams {
       object_id: record.object_id.clone(),
@@ -449,6 +449,7 @@ async fn write_pending_to_disk(
       encoded_collab_v1: record.encode_collab_v1,
       embeddings: record.embeddings,
     };
+    action_description = format!("{}", params);
     let savepoint_name = format!("sp_{}", index);
 
     // using savepoint to rollback the transaction if the insert fails
@@ -468,12 +469,21 @@ async fn write_pending_to_disk(
   }
 
   // Commit the transaction to finalize all writes
-  transaction
-    .commit()
-    .await
-    .context("Failed to commit the transaction for pending collaboration data")
-    .map_err(AppError::from)?;
-  Ok(success_write_objects)
+  match tokio::time::timeout(Duration::from_secs(10), transaction.commit()).await {
+    Ok(result) => {
+      result.map_err(AppError::from)?;
+      Ok(success_write_objects)
+    },
+    Err(_) => {
+      error!(
+        "Timeout while committing the transaction for pending write: {}",
+        action_description
+      );
+      Err(AppError::Internal(anyhow!(
+        "Timeout when committing the transaction for pending collaboration data"
+      )))
+    },
+  }
 }
 
 const MAXIMUM_CHUNK_SIZE: usize = 5 * 1024 * 1024;
diff --git a/services/appflowy-collaborate/src/collab/storage.rs b/services/appflowy-collaborate/src/collab/storage.rs
index 969de897a..58416e5c8 100644
--- a/services/appflowy-collaborate/src/collab/storage.rs
+++ b/services/appflowy-collaborate/src/collab/storage.rs
@@ -9,6 +9,7 @@ use collab_rt_entity::ClientCollabMessage;
 use itertools::{Either, Itertools};
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use sqlx::Transaction;
+
 use tokio::time::timeout;
 use tracing::warn;
 use tracing::{error, instrument, trace};
 
@@ -188,7 +189,7 @@ where
         HashMap::new()
       },
       Err(_) => {
-        error!("Timeout waiting for encode collab from realtime server");
+        error!("Timeout waiting for batch encode collab from realtime server");
         HashMap::new()
       },
     }
@@ -308,10 +309,19 @@ where
         .update_policy(uid, &params.object_id, AFAccessLevel::FullAccess)
         .await?;
     }
-    self
-      .batch_insert_collabs(workspace_id, uid, params_list)
-      .await?;
-    Ok(())
+
+    match tokio::time::timeout(
+      Duration::from_secs(60),
+      self.batch_insert_collabs(workspace_id, uid, params_list),
+    )
+    .await
+    {
+      Ok(result) => result,
+      Err(_) => {
+        error!("Timeout waiting for batch insert collabs to complete");
completed",); + Err(AppError::RequestTimeout("".to_string())) + }, + } } #[instrument(level = "trace", skip(self, params), oid = %params.oid, ty = %params.collab_type, err)] @@ -322,6 +332,7 @@ where uid: &i64, params: CollabParams, transaction: &mut Transaction<'_, sqlx::Postgres>, + action_description: &str, ) -> AppResult<()> { params.validate()?; self @@ -331,11 +342,24 @@ where .access_control .update_policy(uid, ¶ms.object_id, AFAccessLevel::FullAccess) .await?; - self - .cache - .insert_encode_collab_data(workspace_id, uid, ¶ms, transaction) - .await?; - Ok(()) + + match tokio::time::timeout( + Duration::from_secs(120), + self + .cache + .insert_encode_collab_data(workspace_id, uid, ¶ms, transaction), + ) + .await + { + Ok(result) => result, + Err(_) => { + error!( + "Timeout waiting for action completed: {}", + action_description + ); + Err(AppError::RequestTimeout(action_description.to_string())) + }, + } } #[instrument(level = "trace", skip_all, fields(oid = %params.object_id, from_editing_collab = %from_editing_collab))] diff --git a/src/api/workspace.rs b/src/api/workspace.rs index 874e0e5e2..01f98409c 100644 --- a/src/api/workspace.rs +++ b/src/api/workspace.rs @@ -589,9 +589,11 @@ async fn create_collab_handler( .await .context("acquire transaction to upsert collab") .map_err(AppError::from)?; + + let action = format!("Create new collab: {}", params); state .collab_access_control_storage - .insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction) + .insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction, &action) .await?; transaction diff --git a/src/biz/user/user_init.rs b/src/biz/user/user_init.rs index 0d07068c1..5c46b8fbb 100644 --- a/src/biz/user/user_init.rs +++ b/src/biz/user/user_init.rs @@ -65,6 +65,7 @@ where embeddings: None, }, txn, + "initialize workspace for user", ) .await?; @@ -143,6 +144,7 @@ pub(crate) async fn create_user_awareness( embeddings: None, }, txn, + "create user awareness", ) .await?; Ok(object_id) @@ -179,6 +181,7 @@ pub(crate) async fn create_workspace_collab( embeddings: None, }, txn, + "create workspace collab", ) .await?; Ok(()) @@ -217,6 +220,7 @@ pub(crate) async fn create_workspace_database_collab( embeddings: None, }, txn, + "create database collab", ) .await?; diff --git a/src/biz/workspace/publish_dup.rs b/src/biz/workspace/publish_dup.rs index a07e6c12f..d9daafe96 100644 --- a/src/biz/workspace/publish_dup.rs +++ b/src/biz/workspace/publish_dup.rs @@ -34,7 +34,10 @@ use shared_entity::dto::workspace_dto; use shared_entity::dto::workspace_dto::ViewLayout; use sqlx::PgPool; use std::collections::HashSet; +use std::time::Duration; use std::{collections::HashMap, sync::Arc}; + +use tracing::error; use workspace_template::gen_view_id; use yrs::updates::encoder::Encode; use yrs::Any; @@ -172,17 +175,20 @@ impl PublishCollabDuplicator { // for self.collabs_to_insert let mut txn = pg_pool.begin().await?; for (oid, (collab_type, encoded_collab)) in collabs_to_insert.into_iter() { + let params = CollabParams { + object_id: oid.clone(), + encoded_collab_v1: encoded_collab.into(), + collab_type, + embeddings: None, + }; + let action = format!("duplicate collab: {}", params); collab_storage .insert_new_collab_with_transaction( &dest_workspace_id, &duplicator_uid, - CollabParams { - object_id: oid.clone(), - encoded_collab_v1: encoded_collab.into(), - collab_type, - embeddings: None, - }, + params, &mut txn, + &action, ) .await?; } @@ -243,6 +249,7 @@ impl PublishCollabDuplicator { embeddings: 
         },
         &mut txn,
+        "duplicate workspace database collab",
       )
       .await?;
     broadcast_update(&collab_storage, &ws_db_oid, ws_db_updates).await?;
@@ -334,14 +341,35 @@ impl PublishCollabDuplicator {
           embeddings: None,
         },
         &mut txn,
+        "duplicate folder collab",
       )
       .await?;
 
-    // broadcast folder changes
-    broadcast_update(&collab_storage, &dest_workspace_id, encoded_update).await?;
+    match tokio::time::timeout(Duration::from_secs(60), txn.commit()).await {
+      Ok(result) => result.map_err(AppError::from),
+      Err(_) => {
+        error!("Timeout while duplicating collabs");
+        Err(AppError::RequestTimeout(
+          "timeout while duplicating".to_string(),
+        ))
+      },
+    }?;
 
-    txn.commit().await?;
-    Ok(())
+    // broadcast folder changes
+    match tokio::time::timeout(
+      Duration::from_secs(30),
+      broadcast_update(&collab_storage, &dest_workspace_id, encoded_update),
+    )
+    .await
+    {
+      Ok(result) => result.map_err(AppError::from),
+      Err(_) => {
+        error!("Timeout while broadcasting the updates");
+        Err(AppError::RequestTimeout(
+          "timeout while duplicating".to_string(),
+        ))
+      },
+    }
   }
 
   /// Deep copy a published collab to the destination workspace.
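
Every hunk above applies one pattern: a Postgres future that can stall on a lock (the pending-write commit at 10 s, batch_insert_collabs at 60 s, insert_encode_collab_data at 120 s, and the duplicate commit / broadcast pair in publish_dup.rs at 60 s and 30 s) is wrapped in tokio::time::timeout, and a human-readable action_description, built from the new Display impl on CollabParams, rides along so the timeout log names the exact collab write that stalled. A minimal, self-contained sketch of that shape, assuming a stand-in AppError; commit_with_timeout is illustrative and not a function in this patch, while the tokio and sqlx calls are real:

    use std::time::Duration;

    use anyhow::anyhow;
    use sqlx::{Postgres, Transaction};
    use tokio::time::timeout;

    // Stand-in for the crate's AppError; only the two variants used by the
    // patch's timeout arms are modeled here.
    #[derive(Debug)]
    pub enum AppError {
      RequestTimeout(String),
      Internal(anyhow::Error),
    }

    // Commit with an upper bound instead of blocking indefinitely on a pg lock.
    // `action_description` carries "object_id: .., collab_type: .., size:..",
    // built once per record from the new `Display` impl on `CollabParams`.
    pub async fn commit_with_timeout(
      txn: Transaction<'_, Postgres>,
      action_description: &str,
    ) -> Result<(), AppError> {
      match timeout(Duration::from_secs(10), txn.commit()).await {
        // The commit finished in time; surface any sqlx error as-is.
        Ok(result) => result.map_err(|err| AppError::Internal(anyhow!(err))),
        // The deadline elapsed: the commit future (and the transaction it
        // owns) is dropped here.
        Err(_) => {
          tracing::error!("timeout while committing: {}", action_description);
          Err(AppError::RequestTimeout(action_description.to_string()))
        },
      }
    }

Dropping the timed-out future also drops the sqlx Transaction it owns, and sqlx rolls an uncommitted transaction back on drop, so the connection and any locks it held return to the pool instead of being pinned for the life of the request.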
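The cache.rs hunk attacks the same latency from the other side: the memory-cache write is taken off the transaction's critical path entirely. A sketch of that fire-and-forget shape, where MemCache is a hypothetical stand-in for the crate's mem_cache handle; only the clone-then-tokio::spawn structure and the log-and-continue error policy mirror the patch:

    // Hypothetical stand-in for the crate's mem cache handle in cache.rs.
    #[derive(Clone)]
    struct MemCache;

    impl MemCache {
      async fn insert_encode_collab_data(
        &self,
        _object_id: &str,
        _data: &[u8],
      ) -> Result<(), String> {
        Ok(())
      }
    }

    // The caller returns as soon as the Postgres write is done; the
    // memory-cache insert runs on its own task and is only logged on
    // failure, so a slow or unreachable cache can no longer hold the pg
    // transaction open.
    fn write_mem_cache_in_background(mem_cache: &MemCache, object_id: String, data: Vec<u8>) {
      let mem_cache = mem_cache.clone();
      tokio::spawn(async move {
        if let Err(err) = mem_cache.insert_encode_collab_data(&object_id, &data).await {
          tracing::error!("Failed to insert encode collab into memory cache: {:?}", err);
        }
      });
    }

The trade-off is a brief window where the memory and disk caches disagree; the patch accepts it because a read that misses the memory cache can still be served from Postgres.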