Skip to content

Commit

Permalink
chore: try to fix pg lock timeout (#864)
Browse files Browse the repository at this point in the history
* chore: try to fix pg lock timeout

* chore: add logs for insert collab

* chore: add timeout for duplicate

* chore: timeout for pending write
  • Loading branch information
appflowy authored Oct 7, 2024
1 parent 820db65 commit 33e63fa
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 74 deletions.
12 changes: 12 additions & 0 deletions libs/database-entity/src/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,18 @@ pub struct CollabParams {
pub embeddings: Option<AFCollabEmbeddings>,
}

/// Human-readable summary of a [`CollabParams`], used in log lines and
/// timeout/error descriptions (e.g. the `action_description` strings built
/// from these params elsewhere in this change set).
///
/// Renders the object id, the collab type (via `Debug`), and the byte length
/// of the encoded collab payload.
impl Display for CollabParams {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    // Pre-render into a String, then emit in one call. The format string is
    // kept byte-identical to the established log format so downstream log
    // consumers see no change.
    let summary = format!(
      "object_id: {}, collab_type: {:?}, size:{}",
      self.object_id,
      self.collab_type,
      self.encoded_collab_v1.len()
    );
    f.write_str(&summary)
  }
}

impl CollabParams {
pub fn new<T: ToString>(
object_id: T,
Expand Down
33 changes: 18 additions & 15 deletions libs/database/src/collab/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ impl CollabCache {
params: &CollabParams,
transaction: &mut Transaction<'_, sqlx::Postgres>,
) -> Result<(), AppError> {
let collab_type = params.collab_type.clone();
let object_id = params.object_id.clone();
let encode_collab_data = params.encoded_collab_v1.clone();
self
Expand All @@ -152,21 +153,23 @@ impl CollabCache {

// when the data is written to the disk cache but fails to be written to the memory cache
// we log the error and continue.
if let Err(err) = self
.mem_cache
.insert_encode_collab_data(
&object_id,
&encode_collab_data,
chrono::Utc::now().timestamp(),
Some(cache_exp_secs_from_collab_type(&params.collab_type)),
)
.await
{
error!(
"Failed to insert encode collab into memory cache: {:?}",
err
);
}
let mem_cache = self.mem_cache.clone();
tokio::spawn(async move {
if let Err(err) = mem_cache
.insert_encode_collab_data(
&object_id,
&encode_collab_data,
chrono::Utc::now().timestamp(),
Some(cache_exp_secs_from_collab_type(&collab_type)),
)
.await
{
error!(
"Failed to insert encode collab into memory cache: {:?}",
err
);
}
});

Ok(())
}
Expand Down
10 changes: 9 additions & 1 deletion libs/database/src/collab/collab_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ pub trait CollabStorage: Send + Sync + 'static {
uid: &i64,
params: CollabParams,
transaction: &mut Transaction<'_, sqlx::Postgres>,
action_description: &str,
) -> AppResult<()>;

/// Retrieves a collaboration from the storage.
Expand Down Expand Up @@ -216,10 +217,17 @@ where
uid: &i64,
params: CollabParams,
transaction: &mut Transaction<'_, sqlx::Postgres>,
action_description: &str,
) -> AppResult<()> {
self
.as_ref()
.insert_new_collab_with_transaction(workspace_id, uid, params, transaction)
.insert_new_collab_with_transaction(
workspace_id,
uid,
params,
transaction,
action_description,
)
.await
}

Expand Down
84 changes: 47 additions & 37 deletions services/appflowy-collaborate/src/collab/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use collab::lock::Mutex;
use collab_entity::CollabType;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;

use tokio::time::{interval, sleep, sleep_until, Instant};
use tracing::{error, instrument, trace, warn};

Expand Down Expand Up @@ -237,7 +238,7 @@ impl StorageQueue {
/// Spawn a task that periodically checks the number of active connections in the PostgreSQL pool
/// It aims to adjust the write interval based on the number of active connections.
fn spawn_period_check_pg_conn_count(pg_pool: PgPool, next_duration: Arc<Mutex<Duration>>) {
let mut interval = interval(tokio::time::Duration::from_secs(5));
let mut interval = interval(tokio::time::Duration::from_secs(10));
tokio::spawn(async move {
loop {
interval.tick().await;
Expand All @@ -247,7 +248,7 @@ fn spawn_period_check_pg_conn_count(pg_pool: PgPool, next_duration: Arc<Mutex<Du
*next_duration.lock().await = Duration::from_secs(1);
},
_ => {
*next_duration.lock().await = Duration::from_secs(2);
*next_duration.lock().await = Duration::from_secs(5);
},
}
}
Expand Down Expand Up @@ -284,7 +285,7 @@ fn spawn_period_write(
);
}

let chunk_keys = consume_pending_write(&pending_write_set, 30, 10).await;
let chunk_keys = consume_pending_write(&pending_write_set, 20, 5).await;
if chunk_keys.is_empty() {
continue;
}
Expand All @@ -299,35 +300,33 @@ fn spawn_period_write(
let cloned_total_write_count = total_write_count.clone();
let cloned_total_success_write_count = success_write_count.clone();

tokio::spawn(async move {
if let Ok(metas) = get_pending_meta(&keys, &mut cloned_connection_manager).await {
if metas.is_empty() {
error!("the pending write keys is not empty, but metas is empty");
return;
}

match retry_write_pending_to_disk(&cloned_collab_cache, metas).await {
Ok(success_result) => {
#[cfg(debug_assertions)]
tracing::info!("success write pending: {:?}", keys,);

trace!("{:?}", success_result);
cloned_total_write_count.fetch_add(
success_result.expected as i64,
std::sync::atomic::Ordering::Relaxed,
);
cloned_total_success_write_count.fetch_add(
success_result.success as i64,
std::sync::atomic::Ordering::Relaxed,
);
},
Err(err) => error!("{:?}", err),
}
// Remove pending metadata from Redis even if some records fail to write to disk after retries.
// Records that fail repeatedly are considered potentially corrupt or invalid.
let _ = remove_pending_meta(&keys, &mut cloned_connection_manager).await;
if let Ok(metas) = get_pending_meta(&keys, &mut cloned_connection_manager).await {
if metas.is_empty() {
error!("the pending write keys is not empty, but metas is empty");
return;
}

match retry_write_pending_to_disk(&cloned_collab_cache, metas).await {
Ok(success_result) => {
#[cfg(debug_assertions)]
tracing::info!("success write pending: {:?}", keys,);

trace!("{:?}", success_result);
cloned_total_write_count.fetch_add(
success_result.expected as i64,
std::sync::atomic::Ordering::Relaxed,
);
cloned_total_success_write_count.fetch_add(
success_result.success as i64,
std::sync::atomic::Ordering::Relaxed,
);
},
Err(err) => error!("{:?}", err),
}
});
// Remove pending metadata from Redis even if some records fail to write to disk after retries.
// Records that fail repeatedly are considered potentially corrupt or invalid.
let _ = remove_pending_meta(&keys, &mut cloned_connection_manager).await;
}
}
}
});
Expand Down Expand Up @@ -442,13 +441,15 @@ async fn write_pending_to_disk(
.map_err(AppError::from)?;

// Insert each record into the database within the transaction context
let mut action_description = String::new();
for (index, record) in records.into_iter().enumerate() {
let params = CollabParams {
object_id: record.object_id.clone(),
collab_type: record.collab_type,
encoded_collab_v1: record.encode_collab_v1,
embeddings: record.embeddings,
};
action_description = format!("{}", params);
let savepoint_name = format!("sp_{}", index);

// using savepoint to rollback the transaction if the insert fails
Expand All @@ -468,12 +469,21 @@ async fn write_pending_to_disk(
}

// Commit the transaction to finalize all writes
transaction
.commit()
.await
.context("Failed to commit the transaction for pending collaboration data")
.map_err(AppError::from)?;
Ok(success_write_objects)
match tokio::time::timeout(Duration::from_secs(10), transaction.commit()).await {
Ok(result) => {
result.map_err(AppError::from)?;
Ok(success_write_objects)
},
Err(_) => {
error!(
"Timeout waiting for committing the transaction for pending write:{}",
action_description
);
Err(AppError::Internal(anyhow!(
"Timeout when committing the transaction for pending collaboration data"
)))
},
}
}

const MAXIMUM_CHUNK_SIZE: usize = 5 * 1024 * 1024;
Expand Down
44 changes: 34 additions & 10 deletions services/appflowy-collaborate/src/collab/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use collab_rt_entity::ClientCollabMessage;
use itertools::{Either, Itertools};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use sqlx::Transaction;

use tokio::time::timeout;
use tracing::warn;
use tracing::{error, instrument, trace};
Expand Down Expand Up @@ -188,7 +189,7 @@ where
HashMap::new()
},
Err(_) => {
error!("Timeout waiting for encode collab from realtime server");
error!("Timeout waiting for batch encode collab from realtime server");
HashMap::new()
},
}
Expand Down Expand Up @@ -308,10 +309,19 @@ where
.update_policy(uid, &params.object_id, AFAccessLevel::FullAccess)
.await?;
}
self
.batch_insert_collabs(workspace_id, uid, params_list)
.await?;
Ok(())

match tokio::time::timeout(
Duration::from_secs(60),
self.batch_insert_collabs(workspace_id, uid, params_list),
)
.await
{
Ok(result) => result,
Err(_) => {
error!("Timeout waiting for action completed",);
Err(AppError::RequestTimeout("".to_string()))
},
}
}

#[instrument(level = "trace", skip(self, params), oid = %params.oid, ty = %params.collab_type, err)]
Expand All @@ -322,6 +332,7 @@ where
uid: &i64,
params: CollabParams,
transaction: &mut Transaction<'_, sqlx::Postgres>,
action_description: &str,
) -> AppResult<()> {
params.validate()?;
self
Expand All @@ -331,11 +342,24 @@ where
.access_control
.update_policy(uid, &params.object_id, AFAccessLevel::FullAccess)
.await?;
self
.cache
.insert_encode_collab_data(workspace_id, uid, &params, transaction)
.await?;
Ok(())

match tokio::time::timeout(
Duration::from_secs(120),
self
.cache
.insert_encode_collab_data(workspace_id, uid, &params, transaction),
)
.await
{
Ok(result) => result,
Err(_) => {
error!(
"Timeout waiting for action completed: {}",
action_description
);
Err(AppError::RequestTimeout(action_description.to_string()))
},
}
}

#[instrument(level = "trace", skip_all, fields(oid = %params.object_id, from_editing_collab = %from_editing_collab))]
Expand Down
4 changes: 3 additions & 1 deletion src/api/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,9 +589,11 @@ async fn create_collab_handler(
.await
.context("acquire transaction to upsert collab")
.map_err(AppError::from)?;

let action = format!("Create new collab: {}", params);
state
.collab_access_control_storage
.insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction)
.insert_new_collab_with_transaction(&workspace_id, &uid, params, &mut transaction, &action)
.await?;

transaction
Expand Down
4 changes: 4 additions & 0 deletions src/biz/user/user_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ where
embeddings: None,
},
txn,
"initialize workspace for user",
)
.await?;

Expand Down Expand Up @@ -143,6 +144,7 @@ pub(crate) async fn create_user_awareness(
embeddings: None,
},
txn,
"create user awareness",
)
.await?;
Ok(object_id)
Expand Down Expand Up @@ -179,6 +181,7 @@ pub(crate) async fn create_workspace_collab(
embeddings: None,
},
txn,
"create workspace collab",
)
.await?;
Ok(())
Expand Down Expand Up @@ -217,6 +220,7 @@ pub(crate) async fn create_workspace_database_collab(
embeddings: None,
},
txn,
"create database collab",
)
.await?;

Expand Down
Loading

0 comments on commit 33e63fa

Please sign in to comment.