Skip to content

Commit

Permalink
chore: added locks with timeouts (#765)
Browse files Browse the repository at this point in the history
* chore: added locks with timeouts

* chore: roll back collab locks in client api
  • Loading branch information
Horusiath authored Aug 29, 2024
1 parent 3b79ac5 commit 2af1999
Show file tree
Hide file tree
Showing 25 changed files with 161 additions and 147 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,12 @@ debug = true
[patch.crates-io]
# It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate.
# So using patch to workaround this issue.
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" }
collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }
collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" }

[features]
history = []
Expand Down
8 changes: 4 additions & 4 deletions libs/client-api-test/src/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use collab::core::collab::DataSource;
use collab::core::collab_state::SyncState;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::entity::EncodedCollab;
use collab::lock::{Mutex, RwLock};
use collab::preclude::{Collab, Prelim};
use collab_entity::CollabType;
use collab_folder::Folder;
use collab_user::core::UserAwareness;
use mime::Mime;
use serde::Deserialize;
use serde_json::{json, Value};
use tokio::sync::{Mutex, RwLock};
use tokio::time::{sleep, timeout, Duration};
use tokio_stream::StreamExt;
use tracing::trace;
Expand Down Expand Up @@ -583,7 +583,7 @@ impl TestClient {
.await
.unwrap();

let collab = Arc::new(RwLock::new(collab)) as CollabRef;
let collab = Arc::new(RwLock::from(collab)) as CollabRef;
#[cfg(feature = "collab-sync")]
{
let handler = self
Expand Down Expand Up @@ -655,7 +655,7 @@ impl TestClient {
)
.unwrap();
collab.emit_awareness_state();
let collab = Arc::new(RwLock::new(collab)) as CollabRef;
let collab = Arc::new(RwLock::from(collab)) as CollabRef;

#[cfg(feature = "collab-sync")]
{
Expand Down Expand Up @@ -818,7 +818,7 @@ pub async fn assert_server_collab(
let duration = Duration::from_secs(timeout_secs);
let collab_type = collab_type.clone();
let object_id = object_id.to_string();
let final_json = Arc::new(Mutex::new(json!({})));
let final_json = Arc::new(Mutex::from(json!({})));

// Use tokio::time::timeout to apply a timeout to the entire operation
let cloned_final_json = final_json.clone();
Expand Down
25 changes: 14 additions & 11 deletions libs/client-api/src/collab_sync/collab_sink.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
use crate::af_spawn;
use crate::collab_sync::collab_stream::SeqNumCounter;

use crate::collab_sync::{SinkConfig, SyncError, SyncObject};
use anyhow::Error;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab_rt_entity::{ClientCollabMessage, MsgId, ServerCollabMessage, SinkMessage};
use futures_util::SinkExt;
use std::collections::BinaryHeap;
use std::collections::{HashMap, HashSet};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, watch, Mutex};

use anyhow::Error;
use collab::core::origin::{CollabClient, CollabOrigin};
use collab::lock::Mutex;
use futures_util::SinkExt;
use tokio::sync::{broadcast, watch};
use tokio::time::{interval, sleep};
use tracing::{error, trace, warn};

use collab_rt_entity::{ClientCollabMessage, MsgId, ServerCollabMessage, SinkMessage};

use crate::af_spawn;
use crate::collab_sync::collab_stream::SeqNumCounter;
use crate::collab_sync::{SinkConfig, SyncError, SyncObject};

pub(crate) const SEND_INTERVAL: Duration = Duration::from_secs(8);
pub const COLLAB_SINK_DELAY_MILLIS: u64 = 500;

Expand Down Expand Up @@ -63,7 +66,7 @@ where
config: SinkConfig,
) -> Self {
let notifier = Arc::new(notifier);
let sender = Arc::new(Mutex::new(sink));
let sender = Arc::new(Mutex::from(sink));
let message_queue = Arc::new(parking_lot::Mutex::new(SinkQueue::new()));
let sending_messages = Arc::new(parking_lot::Mutex::new(HashSet::new()));
let state = Arc::new(CollabSinkState::new());
Expand Down Expand Up @@ -513,7 +516,7 @@ impl SyncTimestamp {
fn new() -> Self {
let now = Instant::now();
SyncTimestamp {
last_sync: Mutex::new(now.checked_sub(Duration::from_secs(60)).unwrap_or(now)),
last_sync: Mutex::from(now.checked_sub(Duration::from_secs(60)).unwrap_or(now)),
}
}

Expand Down
2 changes: 1 addition & 1 deletion libs/client-api/src/collab_sync/collab_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use std::time::Duration;

use arc_swap::ArcSwap;
use collab::core::origin::CollabOrigin;
use collab::lock::RwLock;
use collab::preclude::Collab;
use futures_util::{SinkExt, StreamExt};
use tokio::select;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, trace, warn};
use yrs::encoding::read::Cursor;
Expand Down
26 changes: 14 additions & 12 deletions libs/client-api/src/ws/client.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
use futures_util::{SinkExt, StreamExt};
use parking_lot::RwLock;
use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::{Arc, Weak};
use std::time::Duration;

use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use parking_lot::RwLock;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use semver::Version;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::sync::oneshot;
use tokio::sync::Mutex;
use tracing::{error, info, trace, warn};

use crate::ws::msg_queue::{AggregateMessageQueue, AggregateMessagesReceiver};
use crate::ws::{ConnectState, ConnectStateNotify, WSError, WebSocketChannel};
use crate::ServerFixIntervalPing;
use crate::{af_spawn, retry_connect};
use client_websocket::{CloseCode, CloseFrame, Message, WebSocketStream};
use collab_rt_entity::user::UserMessage;
use collab_rt_entity::ClientCollabMessage;
use collab_rt_entity::ServerCollabMessage;
use collab_rt_entity::{RealtimeMessage, SystemMessage};
use tokio::sync::{oneshot, Mutex};
use tracing::{error, info, trace, warn};

use crate::ws::msg_queue::{AggregateMessageQueue, AggregateMessagesReceiver};
use crate::ws::{ConnectState, ConnectStateNotify, WSError, WebSocketChannel};
use crate::ServerFixIntervalPing;
use crate::{af_spawn, retry_connect};

pub struct WSClientConfig {
/// specifies the number of messages that the channel can hold at any given
Expand Down Expand Up @@ -91,7 +93,7 @@ impl WSClient {
let (ws_msg_sender, _) = channel(config.buffer_capacity);
let state_notify = Arc::new(parking_lot::Mutex::new(ConnectStateNotify::new()));
let channels = Arc::new(RwLock::new(HashMap::new()));
let ping = Arc::new(Mutex::new(None));
let ping = Arc::new(Mutex::from(None));
let http_sender = Arc::new(http_sender);
let (user_channel, _) = channel(1);
let (rt_msg_sender, _) = channel(config.buffer_capacity);
Expand All @@ -106,7 +108,7 @@ impl WSClient {
user_channel: Arc::new(user_channel),
channels,
ping,
stop_ws_msg_loop_tx: Mutex::new(None),
stop_ws_msg_loop_tx: Mutex::from(None),
aggregate_queue,

#[cfg(debug_assertions)]
Expand Down
11 changes: 7 additions & 4 deletions libs/client-api/src/ws/msg_queue.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use client_websocket::Message;
use collab_rt_entity::RealtimeMessage;
use collab_rt_entity::{ClientCollabMessage, MsgId};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};

use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::time::{sleep_until, Instant};
use tracing::{error, trace};

use client_websocket::Message;
use collab_rt_entity::RealtimeMessage;
use collab_rt_entity::{ClientCollabMessage, MsgId};

pub type AggregateMessagesSender = mpsc::Sender<Message>;
pub type AggregateMessagesReceiver = mpsc::Receiver<Message>;

Expand Down
2 changes: 1 addition & 1 deletion libs/collab-rt-protocol/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::sync::Arc;
use collab::core::awareness::{Awareness, AwarenessUpdate};
use collab::core::collab::{TransactionExt, TransactionMutExt};
use collab::core::origin::CollabOrigin;
use collab::lock::RwLock;
use collab::preclude::Collab;
use tokio::sync::RwLock;
use yrs::updates::decoder::Decode;
use yrs::updates::encoder::{Encode, Encoder};
use yrs::{ReadTxn, StateVector, Transact, Update};
Expand Down
3 changes: 1 addition & 2 deletions libs/workspace-template/src/document/getting_started.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;

use std::sync::Arc;

use anyhow::Error;
use async_trait::async_trait;
use collab::core::origin::CollabOrigin;
use collab::lock::RwLock;
use collab::preclude::Collab;
use collab_database::database::{timestamp, DatabaseData};
use collab_database::entity::CreateDatabaseParams;
Expand All @@ -13,7 +13,6 @@ use collab_document::document::Document;
use collab_entity::CollabType;
use collab_folder::ViewLayout;
use serde_json::Value;
use tokio::sync::RwLock;

use crate::database::database_collab::create_database_collab;
use crate::document::parser::JsonToDocumentParser;
Expand Down
4 changes: 2 additions & 2 deletions libs/workspace-template/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ pub use anyhow::Result;
use async_trait::async_trait;
use collab::core::origin::CollabOrigin;
use collab::entity::EncodedCollab;
use collab::lock::RwLock;
use collab::preclude::Collab;
use collab_entity::CollabType;
use collab_folder::{
timestamp, Folder, FolderData, RepeatedViewIdentifier, ViewIdentifier, ViewLayout, Workspace,
};
use tokio::sync::RwLock;

use crate::hierarchy_builder::{FlattedViews, WorkspaceViewBuilder};

Expand Down Expand Up @@ -91,7 +91,7 @@ impl WorkspaceTemplateBuilder {
}

pub async fn build(&self) -> Result<Vec<TemplateData>> {
let workspace_view_builder = Arc::new(RwLock::new(WorkspaceViewBuilder::new(
let workspace_view_builder = Arc::new(RwLock::from(WorkspaceViewBuilder::new(
self.workspace_id.clone(),
self.uid,
)));
Expand Down
19 changes: 11 additions & 8 deletions libs/workspace-template/src/tests/getting_started_tests.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
use crate::document::getting_started::*;
use crate::TemplateData;
use crate::TemplateObjectId;
use crate::{hierarchy_builder::WorkspaceViewBuilder, WorkspaceTemplate};
use std::collections::HashMap;
use std::sync::Arc;

use collab::lock::RwLock;
use collab::preclude::uuid_v4;
use collab_database::database::DatabaseData;
use collab_database::entity::CreateDatabaseParams;
use collab_document::document_data::generate_id;
use collab_entity::CollabType;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

use crate::document::getting_started::*;
use crate::TemplateData;
use crate::TemplateObjectId;
use crate::{hierarchy_builder::WorkspaceViewBuilder, WorkspaceTemplate};

#[cfg(test)]
mod tests {
use super::*;
use collab_database::database::gen_database_view_id;

use super::*;

#[tokio::test]
async fn create_document_from_desktop_guide_json_test() {
let json_str = include_str!("../../assets/desktop_guide.json");
Expand Down
6 changes: 3 additions & 3 deletions services/appflowy-collaborate/src/collab/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context};
use collab::lock::Mutex;
use collab_entity::CollabType;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tokio::sync::Mutex;
use tokio::time::{interval, sleep, sleep_until, Instant};
use tracing::{error, instrument, trace, warn};

use crate::collab::cache::CollabCache;
use app_error::AppError;
use database_entity::dto::{AFCollabEmbeddings, CollabParams, QueryCollab, QueryCollabResult};

use crate::collab::cache::CollabCache;
use crate::collab::queue_redis_ops::{
get_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, WritePriority,
PENDING_WRITE_META_EXPIRE_SECS,
Expand Down Expand Up @@ -52,7 +52,7 @@ impl StorageQueue {
queue_name: &str,
metrics: Option<Arc<CollabMetrics>>,
) -> Self {
let next_duration = Arc::new(Mutex::new(Duration::from_secs(1)));
let next_duration = Arc::new(Mutex::from(Duration::from_secs(1)));
let pending_id_counter = Arc::new(AtomicI64::new(0));
let pending_write_set = Arc::new(RedisSortedSet::new(connection_manager.clone(), queue_name));

Expand Down
Loading

0 comments on commit 2af1999

Please sign in to comment.