From 2af1999375ca780c721e9ad8229359ffa8fb9f95 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Thu, 29 Aug 2024 10:13:27 +0200 Subject: [PATCH] chore: added locks with timeouts (#765) * chore: added locks with timeouts * chore: roll back collab locks in client api --- Cargo.lock | 12 +-- Cargo.toml | 12 +-- libs/client-api-test/src/test_client.rs | 8 +- .../client-api/src/collab_sync/collab_sink.rs | 25 +++--- .../src/collab_sync/collab_stream.rs | 2 +- libs/client-api/src/ws/client.rs | 26 +++--- libs/client-api/src/ws/msg_queue.rs | 11 ++- libs/collab-rt-protocol/src/protocol.rs | 2 +- .../src/document/getting_started.rs | 3 +- libs/workspace-template/src/lib.rs | 4 +- .../src/tests/getting_started_tests.rs | 19 +++-- .../appflowy-collaborate/src/collab/queue.rs | 6 +- .../src/group/broadcast.rs | 19 ++--- .../src/group/group_init.rs | 3 +- .../appflowy-collaborate/src/group/manager.rs | 7 +- .../src/group/persistence.rs | 3 +- .../src/group/plugin/history_plugin.rs | 2 +- .../src/snapshot/cache.rs | 11 ++- .../src/snapshot/snapshot_control.rs | 39 ++++----- services/appflowy-history/src/biz/history.rs | 2 +- services/appflowy-history/src/biz/snapshot.rs | 2 +- .../appflowy-history/src/core/open_handle.rs | 2 +- src/application.rs | 80 +++++++++---------- src/state.rs | 6 +- tests/collab/storage_test.rs | 2 +- 25 files changed, 161 insertions(+), 147 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d5a8f416c..476288766 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2086,7 +2086,7 @@ dependencies = [ [[package]] name = "collab" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4" dependencies = [ "anyhow", "arc-swap", @@ -2111,7 +2111,7 @@ dependencies = [ [[package]] name = "collab-database" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4" dependencies = [ "anyhow", "async-trait", @@ -2140,7 +2140,7 @@ dependencies = [ [[package]] name = "collab-document" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4" dependencies = [ "anyhow", "arc-swap", @@ -2160,7 +2160,7 @@ dependencies = [ [[package]] name = "collab-entity" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4" dependencies = [ "anyhow", "bytes", @@ -2179,7 +2179,7 @@ dependencies = [ [[package]] name = "collab-folder" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4" dependencies = [ "anyhow", "arc-swap", @@ -2263,7 +2263,7 @@ dependencies = [ [[package]] name = "collab-user" version = "0.2.0" -source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=af8ffdab672e4eccfc426525dcf8daa42cbe7087#af8ffdab672e4eccfc426525dcf8daa42cbe7087" +source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=b864751dc54c6873d72b368e5cff5e72d9f54da4#b864751dc54c6873d72b368e5cff5e72d9f54da4" dependencies = [ "anyhow", "collab", diff --git a/Cargo.toml b/Cargo.toml index 7fc4ff90e..d89c3cc39 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -285,12 +285,12 @@ debug = true [patch.crates-io] # It's diffcult to resovle different version with the same crate used in AppFlowy Frontend and the Client-API crate. # So using patch to workaround this issue. -collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" } -collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" } -collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" } -collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" } -collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" } -collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "af8ffdab672e4eccfc426525dcf8daa42cbe7087" } +collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" } +collab-entity = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" } +collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" } +collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" } +collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" } +collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "b864751dc54c6873d72b368e5cff5e72d9f54da4" } [features] history = [] diff --git a/libs/client-api-test/src/test_client.rs b/libs/client-api-test/src/test_client.rs index abd84d53a..2a4b674cb 100644 --- a/libs/client-api-test/src/test_client.rs +++ b/libs/client-api-test/src/test_client.rs @@ -13,6 +13,7 @@ use collab::core::collab::DataSource; use collab::core::collab_state::SyncState; use collab::core::origin::{CollabClient, CollabOrigin}; use collab::entity::EncodedCollab; +use collab::lock::{Mutex, RwLock}; use collab::preclude::{Collab, Prelim}; use collab_entity::CollabType; use collab_folder::Folder; @@ -20,7 +21,6 @@ use collab_user::core::UserAwareness; use mime::Mime; use serde::Deserialize; use serde_json::{json, Value}; -use tokio::sync::{Mutex, RwLock}; use tokio::time::{sleep, timeout, Duration}; use tokio_stream::StreamExt; use tracing::trace; @@ -583,7 +583,7 @@ impl TestClient { .await .unwrap(); - let collab = Arc::new(RwLock::new(collab)) as CollabRef; + let collab = Arc::new(RwLock::from(collab)) as CollabRef; #[cfg(feature = "collab-sync")] { let handler = self @@ -655,7 +655,7 @@ impl TestClient { ) .unwrap(); collab.emit_awareness_state(); - let collab = Arc::new(RwLock::new(collab)) as CollabRef; + let collab = Arc::new(RwLock::from(collab)) as CollabRef; #[cfg(feature = "collab-sync")] { @@ -818,7 +818,7 @@ pub async fn assert_server_collab( let duration = Duration::from_secs(timeout_secs); let collab_type = collab_type.clone(); let object_id = object_id.to_string(); - let final_json = Arc::new(Mutex::new(json!({}))); + let final_json = Arc::new(Mutex::from(json!({}))); // Use tokio::time::timeout to apply a timeout to the entire operation let cloned_final_json = final_json.clone(); diff --git a/libs/client-api/src/collab_sync/collab_sink.rs b/libs/client-api/src/collab_sync/collab_sink.rs index c26fc75a3..ceb83f911 100644 --- a/libs/client-api/src/collab_sync/collab_sink.rs +++ b/libs/client-api/src/collab_sync/collab_sink.rs @@ -1,21 +1,24 @@ -use crate::af_spawn; -use crate::collab_sync::collab_stream::SeqNumCounter; - -use crate::collab_sync::{SinkConfig, SyncError, SyncObject}; -use anyhow::Error; -use collab::core::origin::{CollabClient, CollabOrigin}; -use collab_rt_entity::{ClientCollabMessage, MsgId, ServerCollabMessage, SinkMessage}; -use futures_util::SinkExt; use std::collections::BinaryHeap; use std::collections::{HashMap, HashSet}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; -use tokio::sync::{broadcast, watch, Mutex}; + +use anyhow::Error; +use collab::core::origin::{CollabClient, CollabOrigin}; +use collab::lock::Mutex; +use futures_util::SinkExt; +use tokio::sync::{broadcast, watch}; use tokio::time::{interval, sleep}; use tracing::{error, trace, warn}; +use collab_rt_entity::{ClientCollabMessage, MsgId, ServerCollabMessage, SinkMessage}; + +use crate::af_spawn; +use crate::collab_sync::collab_stream::SeqNumCounter; +use crate::collab_sync::{SinkConfig, SyncError, SyncObject}; + pub(crate) const SEND_INTERVAL: Duration = Duration::from_secs(8); pub const COLLAB_SINK_DELAY_MILLIS: u64 = 500; @@ -63,7 +66,7 @@ where config: SinkConfig, ) -> Self { let notifier = Arc::new(notifier); - let sender = Arc::new(Mutex::new(sink)); + let sender = Arc::new(Mutex::from(sink)); let message_queue = Arc::new(parking_lot::Mutex::new(SinkQueue::new())); let sending_messages = Arc::new(parking_lot::Mutex::new(HashSet::new())); let state = Arc::new(CollabSinkState::new()); @@ -513,7 +516,7 @@ impl SyncTimestamp { fn new() -> Self { let now = Instant::now(); SyncTimestamp { - last_sync: Mutex::new(now.checked_sub(Duration::from_secs(60)).unwrap_or(now)), + last_sync: Mutex::from(now.checked_sub(Duration::from_secs(60)).unwrap_or(now)), } } diff --git a/libs/client-api/src/collab_sync/collab_stream.rs b/libs/client-api/src/collab_sync/collab_stream.rs index c7f629940..fdfadf376 100644 --- a/libs/client-api/src/collab_sync/collab_stream.rs +++ b/libs/client-api/src/collab_sync/collab_stream.rs @@ -6,10 +6,10 @@ use std::time::Duration; use arc_swap::ArcSwap; use collab::core::origin::CollabOrigin; +use collab::lock::RwLock; use collab::preclude::Collab; use futures_util::{SinkExt, StreamExt}; use tokio::select; -use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use tracing::{error, instrument, trace, warn}; use yrs::encoding::read::Cursor; diff --git a/libs/client-api/src/ws/client.rs b/libs/client-api/src/ws/client.rs index 2e1d4c188..2fa55dab0 100644 --- a/libs/client-api/src/ws/client.rs +++ b/libs/client-api/src/ws/client.rs @@ -1,27 +1,29 @@ -use futures_util::{SinkExt, StreamExt}; -use parking_lot::RwLock; use std::borrow::Cow; use std::collections::HashMap; use std::fmt::Display; +use std::sync::{Arc, Weak}; +use std::time::Duration; use futures_util::stream::{SplitSink, SplitStream}; +use futures_util::{SinkExt, StreamExt}; +use parking_lot::RwLock; use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION}; use semver::Version; -use std::sync::{Arc, Weak}; -use std::time::Duration; use tokio::sync::broadcast::{channel, Receiver, Sender}; +use tokio::sync::oneshot; +use tokio::sync::Mutex; +use tracing::{error, info, trace, warn}; -use crate::ws::msg_queue::{AggregateMessageQueue, AggregateMessagesReceiver}; -use crate::ws::{ConnectState, ConnectStateNotify, WSError, WebSocketChannel}; -use crate::ServerFixIntervalPing; -use crate::{af_spawn, retry_connect}; use client_websocket::{CloseCode, CloseFrame, Message, WebSocketStream}; use collab_rt_entity::user::UserMessage; use collab_rt_entity::ClientCollabMessage; use collab_rt_entity::ServerCollabMessage; use collab_rt_entity::{RealtimeMessage, SystemMessage}; -use tokio::sync::{oneshot, Mutex}; -use tracing::{error, info, trace, warn}; + +use crate::ws::msg_queue::{AggregateMessageQueue, AggregateMessagesReceiver}; +use crate::ws::{ConnectState, ConnectStateNotify, WSError, WebSocketChannel}; +use crate::ServerFixIntervalPing; +use crate::{af_spawn, retry_connect}; pub struct WSClientConfig { /// specifies the number of messages that the channel can hold at any given @@ -91,7 +93,7 @@ impl WSClient { let (ws_msg_sender, _) = channel(config.buffer_capacity); let state_notify = Arc::new(parking_lot::Mutex::new(ConnectStateNotify::new())); let channels = Arc::new(RwLock::new(HashMap::new())); - let ping = Arc::new(Mutex::new(None)); + let ping = Arc::new(Mutex::from(None)); let http_sender = Arc::new(http_sender); let (user_channel, _) = channel(1); let (rt_msg_sender, _) = channel(config.buffer_capacity); @@ -106,7 +108,7 @@ impl WSClient { user_channel: Arc::new(user_channel), channels, ping, - stop_ws_msg_loop_tx: Mutex::new(None), + stop_ws_msg_loop_tx: Mutex::from(None), aggregate_queue, #[cfg(debug_assertions)] diff --git a/libs/client-api/src/ws/msg_queue.rs b/libs/client-api/src/ws/msg_queue.rs index 7394589fc..6601c741c 100644 --- a/libs/client-api/src/ws/msg_queue.rs +++ b/libs/client-api/src/ws/msg_queue.rs @@ -1,13 +1,16 @@ -use client_websocket::Message; -use collab_rt_entity::RealtimeMessage; -use collab_rt_entity::{ClientCollabMessage, MsgId}; use std::collections::{BinaryHeap, HashMap, HashSet}; use std::sync::{Arc, Weak}; use std::time::Duration; -use tokio::sync::{mpsc, Mutex}; + +use tokio::sync::mpsc; +use tokio::sync::Mutex; use tokio::time::{sleep_until, Instant}; use tracing::{error, trace}; +use client_websocket::Message; +use collab_rt_entity::RealtimeMessage; +use collab_rt_entity::{ClientCollabMessage, MsgId}; + pub type AggregateMessagesSender = mpsc::Sender; pub type AggregateMessagesReceiver = mpsc::Receiver; diff --git a/libs/collab-rt-protocol/src/protocol.rs b/libs/collab-rt-protocol/src/protocol.rs index b8ff0738a..7ae934216 100644 --- a/libs/collab-rt-protocol/src/protocol.rs +++ b/libs/collab-rt-protocol/src/protocol.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use collab::core::awareness::{Awareness, AwarenessUpdate}; use collab::core::collab::{TransactionExt, TransactionMutExt}; use collab::core::origin::CollabOrigin; +use collab::lock::RwLock; use collab::preclude::Collab; -use tokio::sync::RwLock; use yrs::updates::decoder::Decode; use yrs::updates::encoder::{Encode, Encoder}; use yrs::{ReadTxn, StateVector, Transact, Update}; diff --git a/libs/workspace-template/src/document/getting_started.rs b/libs/workspace-template/src/document/getting_started.rs index 0140e9def..e368b6465 100644 --- a/libs/workspace-template/src/document/getting_started.rs +++ b/libs/workspace-template/src/document/getting_started.rs @@ -1,10 +1,10 @@ use std::collections::HashMap; - use std::sync::Arc; use anyhow::Error; use async_trait::async_trait; use collab::core::origin::CollabOrigin; +use collab::lock::RwLock; use collab::preclude::Collab; use collab_database::database::{timestamp, DatabaseData}; use collab_database::entity::CreateDatabaseParams; @@ -13,7 +13,6 @@ use collab_document::document::Document; use collab_entity::CollabType; use collab_folder::ViewLayout; use serde_json::Value; -use tokio::sync::RwLock; use crate::database::database_collab::create_database_collab; use crate::document::parser::JsonToDocumentParser; diff --git a/libs/workspace-template/src/lib.rs b/libs/workspace-template/src/lib.rs index d6eba8578..135074e08 100644 --- a/libs/workspace-template/src/lib.rs +++ b/libs/workspace-template/src/lib.rs @@ -5,12 +5,12 @@ pub use anyhow::Result; use async_trait::async_trait; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; +use collab::lock::RwLock; use collab::preclude::Collab; use collab_entity::CollabType; use collab_folder::{ timestamp, Folder, FolderData, RepeatedViewIdentifier, ViewIdentifier, ViewLayout, Workspace, }; -use tokio::sync::RwLock; use crate::hierarchy_builder::{FlattedViews, WorkspaceViewBuilder}; @@ -91,7 +91,7 @@ impl WorkspaceTemplateBuilder { } pub async fn build(&self) -> Result> { - let workspace_view_builder = Arc::new(RwLock::new(WorkspaceViewBuilder::new( + let workspace_view_builder = Arc::new(RwLock::from(WorkspaceViewBuilder::new( self.workspace_id.clone(), self.uid, ))); diff --git a/libs/workspace-template/src/tests/getting_started_tests.rs b/libs/workspace-template/src/tests/getting_started_tests.rs index 5d8456581..1a83f8ae4 100644 --- a/libs/workspace-template/src/tests/getting_started_tests.rs +++ b/libs/workspace-template/src/tests/getting_started_tests.rs @@ -1,22 +1,25 @@ -use crate::document::getting_started::*; -use crate::TemplateData; -use crate::TemplateObjectId; -use crate::{hierarchy_builder::WorkspaceViewBuilder, WorkspaceTemplate}; +use std::collections::HashMap; +use std::sync::Arc; + +use collab::lock::RwLock; use collab::preclude::uuid_v4; use collab_database::database::DatabaseData; use collab_database::entity::CreateDatabaseParams; use collab_document::document_data::generate_id; use collab_entity::CollabType; use serde_json::json; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::RwLock; + +use crate::document::getting_started::*; +use crate::TemplateData; +use crate::TemplateObjectId; +use crate::{hierarchy_builder::WorkspaceViewBuilder, WorkspaceTemplate}; #[cfg(test)] mod tests { - use super::*; use collab_database::database::gen_database_view_id; + use super::*; + #[tokio::test] async fn create_document_from_desktop_guide_json_test() { let json_str = include_str!("../../assets/desktop_guide.json"); diff --git a/services/appflowy-collaborate/src/collab/queue.rs b/services/appflowy-collaborate/src/collab/queue.rs index b2089c6a1..99958aaf0 100644 --- a/services/appflowy-collaborate/src/collab/queue.rs +++ b/services/appflowy-collaborate/src/collab/queue.rs @@ -5,17 +5,17 @@ use std::sync::Arc; use std::time::Duration; use anyhow::{anyhow, Context}; +use collab::lock::Mutex; use collab_entity::CollabType; use serde::{Deserialize, Serialize}; use sqlx::PgPool; -use tokio::sync::Mutex; use tokio::time::{interval, sleep, sleep_until, Instant}; use tracing::{error, instrument, trace, warn}; -use crate::collab::cache::CollabCache; use app_error::AppError; use database_entity::dto::{AFCollabEmbeddings, CollabParams, QueryCollab, QueryCollabResult}; +use crate::collab::cache::CollabCache; use crate::collab::queue_redis_ops::{ get_pending_meta, remove_pending_meta, storage_cache_key, PendingWrite, WritePriority, PENDING_WRITE_META_EXPIRE_SECS, @@ -52,7 +52,7 @@ impl StorageQueue { queue_name: &str, metrics: Option>, ) -> Self { - let next_duration = Arc::new(Mutex::new(Duration::from_secs(1))); + let next_duration = Arc::new(Mutex::from(Duration::from_secs(1))); let pending_id_counter = Arc::new(AtomicI64::new(0)); let pending_write_set = Arc::new(RedisSortedSet::new(connection_manager.clone(), queue_name)); diff --git a/services/appflowy-collaborate/src/group/broadcast.rs b/services/appflowy-collaborate/src/group/broadcast.rs index 8b4b379fc..a63ac5102 100644 --- a/services/appflowy-collaborate/src/group/broadcast.rs +++ b/services/appflowy-collaborate/src/group/broadcast.rs @@ -5,19 +5,11 @@ use std::sync::{Arc, Weak}; use anyhow::anyhow; use bytes::Bytes; use collab::core::origin::CollabOrigin; +use collab::lock::RwLock; use collab::preclude::Collab; -use collab_rt_entity::user::RealtimeUser; -use collab_rt_entity::MessageByObjectId; -use collab_rt_entity::{AckCode, MsgId}; -use collab_rt_entity::{ - AwarenessSync, BroadcastSync, ClientCollabMessage, CollabAck, CollabMessage, -}; -use collab_rt_protocol::{handle_message_follow_protocol, RTProtocolError, SyncMessage}; -use collab_rt_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; use futures_util::{SinkExt, StreamExt}; use tokio::select; use tokio::sync::broadcast::{channel, Sender}; -use tokio::sync::RwLock; use tokio::time::Instant; use tracing::{error, trace, warn}; use yrs::encoding::write::Write; @@ -25,6 +17,15 @@ use yrs::updates::decoder::DecoderV1; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; use yrs::Subscription as YrsSubscription; +use collab_rt_entity::user::RealtimeUser; +use collab_rt_entity::MessageByObjectId; +use collab_rt_entity::{AckCode, MsgId}; +use collab_rt_entity::{ + AwarenessSync, BroadcastSync, ClientCollabMessage, CollabAck, CollabMessage, +}; +use collab_rt_protocol::{handle_message_follow_protocol, RTProtocolError, SyncMessage}; +use collab_rt_protocol::{Message, MessageReader, MSG_SYNC, MSG_SYNC_UPDATE}; + use crate::error::RealtimeError; use crate::group::group_init::EditState; use crate::group::protocol::ServerSyncProtocol; diff --git a/services/appflowy-collaborate/src/group/group_init.rs b/services/appflowy-collaborate/src/group/group_init.rs index d759e2347..d5b669d55 100644 --- a/services/appflowy-collaborate/src/group/group_init.rs +++ b/services/appflowy-collaborate/src/group/group_init.rs @@ -6,11 +6,12 @@ use std::time::Duration; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; +use collab::lock::RwLock; use collab::preclude::Collab; use collab_entity::CollabType; use dashmap::DashMap; use futures_util::{SinkExt, StreamExt}; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::mpsc; use tracing::{error, event, info, trace}; use yrs::updates::decoder::Decode; use yrs::updates::encoder::Encode; diff --git a/services/appflowy-collaborate/src/group/manager.rs b/services/appflowy-collaborate/src/group/manager.rs index caff78786..a2a81577c 100644 --- a/services/appflowy-collaborate/src/group/manager.rs +++ b/services/appflowy-collaborate/src/group/manager.rs @@ -4,9 +4,9 @@ use std::time::Duration; use collab::core::collab::DataSource; use collab::core::origin::CollabOrigin; use collab::entity::EncodedCollab; +use collab::lock::{Mutex, RwLock}; use collab::preclude::Collab; use collab_entity::CollabType; -use tokio::sync::{Mutex, RwLock}; use tracing::{error, instrument, trace}; use access_control::collab::RealtimeAccessControl; @@ -22,7 +22,6 @@ use database_entity::dto::QueryCollabParams; use crate::client::client_msg_router::ClientMessageRouter; use crate::error::{CreateGroupFailedReason, RealtimeError}; use crate::group::group_init::CollabGroup; - use crate::group::state::GroupManagementState; use crate::indexer::IndexerProvider; use crate::metrics::CollabMetricsCalculate; @@ -61,7 +60,7 @@ where .collab_control_stream(CONTROL_STREAM_KEY, "collaboration") .await .map_err(|err| RealtimeError::Internal(err.into()))?; - let control_event_stream = Arc::new(Mutex::new(control_event_stream)); + let control_event_stream = Arc::new(Mutex::from(control_event_stream)); Ok(Self { state: GroupManagementState::new(metrics_calculate.clone()), storage, @@ -195,7 +194,7 @@ where }; collab.initialize(); - let collab = Arc::new(RwLock::new(collab)); + let collab = Arc::new(RwLock::from(collab)); (collab, encode_collab) }; diff --git a/services/appflowy-collaborate/src/group/persistence.rs b/services/appflowy-collaborate/src/group/persistence.rs index 8fd185b80..083d1001d 100644 --- a/services/appflowy-collaborate/src/group/persistence.rs +++ b/services/appflowy-collaborate/src/group/persistence.rs @@ -2,9 +2,10 @@ use std::sync::{Arc, Weak}; use std::time::Duration; use anyhow::anyhow; +use collab::lock::RwLock; use collab::preclude::Collab; use collab_entity::{validate_data_for_folder, CollabType}; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::mpsc; use tokio::time::interval; use tracing::{trace, warn}; diff --git a/services/appflowy-collaborate/src/group/plugin/history_plugin.rs b/services/appflowy-collaborate/src/group/plugin/history_plugin.rs index 1ec6d6ff0..0ac505ede 100644 --- a/services/appflowy-collaborate/src/group/plugin/history_plugin.rs +++ b/services/appflowy-collaborate/src/group/plugin/history_plugin.rs @@ -1,9 +1,9 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::{Arc, Weak}; +use collab::lock::RwLock; use collab::preclude::{Collab, CollabPlugin}; use collab_entity::CollabType; -use tokio::sync::RwLock; use tokio::time::sleep; use tracing::{error, trace}; use yrs::TransactionMut; diff --git a/services/appflowy-collaborate/src/snapshot/cache.rs b/services/appflowy-collaborate/src/snapshot/cache.rs index 9227e024a..acb0c8ee2 100644 --- a/services/appflowy-collaborate/src/snapshot/cache.rs +++ b/services/appflowy-collaborate/src/snapshot/cache.rs @@ -1,9 +1,12 @@ -use crate::state::RedisConnectionManager; +use std::sync::Arc; + use anyhow::anyhow; -use app_error::AppError; +use collab::lock::Mutex; use redis::AsyncCommands; -use std::sync::Arc; -use tokio::sync::Mutex; + +use app_error::AppError; + +use crate::state::RedisConnectionManager; #[derive(Clone)] pub(crate) struct SnapshotCache { diff --git a/services/appflowy-collaborate/src/snapshot/snapshot_control.rs b/services/appflowy-collaborate/src/snapshot/snapshot_control.rs index a6f8333e3..118c36b59 100644 --- a/services/appflowy-collaborate/src/snapshot/snapshot_control.rs +++ b/services/appflowy-collaborate/src/snapshot/snapshot_control.rs @@ -1,28 +1,29 @@ -use crate::metrics::CollabMetrics; -use crate::snapshot::cache::SnapshotCache; -use crate::snapshot::queue::PendingQueue; -use crate::state::RedisConnectionManager; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; + use anyhow::anyhow; -use app_error::AppError; use async_stream::stream; +use chrono::{DateTime, Utc}; +use collab::lock::{Mutex, RwLock}; +use futures_util::StreamExt; +use sqlx::PgPool; +use tokio::time::interval; +use tracing::{debug, error, trace, warn}; +use validator::Validate; + +use app_error::AppError; +use collab_rt_protocol::validate_encode_collab; use database::collab::{ create_snapshot_and_maintain_limit, get_all_collab_snapshot_meta, latest_snapshot_time, select_snapshot, AppResult, COLLAB_SNAPSHOT_LIMIT, SNAPSHOT_PER_HOUR, }; use database_entity::dto::{AFSnapshotMeta, AFSnapshotMetas, InsertSnapshotParams, SnapshotData}; -use futures_util::StreamExt; - -use chrono::{DateTime, Utc}; -use collab_rt_protocol::validate_encode_collab; -use sqlx::PgPool; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{Mutex, RwLock}; -use tokio::time::interval; -use tracing::{debug, error, trace, warn}; -use validator::Validate; +use crate::metrics::CollabMetrics; +use crate::snapshot::cache::SnapshotCache; +use crate::snapshot::queue::PendingQueue; +use crate::state::RedisConnectionManager; pub type SnapshotCommandReceiver = tokio::sync::mpsc::Receiver; pub type SnapshotCommandSender = tokio::sync::mpsc::Sender; @@ -48,7 +49,7 @@ impl SnapshotControl { pg_pool: PgPool, collab_metrics: Arc, ) -> Self { - let redis_client = Arc::new(Mutex::new(redis_client)); + let redis_client = Arc::new(Mutex::from(redis_client)); let (command_sender, rx) = tokio::sync::mpsc::channel(2000); let cache = SnapshotCache::new(redis_client); @@ -199,7 +200,7 @@ impl SnapshotCommandRunner { let queue = PendingQueue::new(); Self { pg_pool, - queue: RwLock::new(queue), + queue: RwLock::from(queue), cache, recv: Some(recv), success_attempts: Default::default(), diff --git a/services/appflowy-history/src/biz/history.rs b/services/appflowy-history/src/biz/history.rs index a29dd496b..167f4e05d 100644 --- a/services/appflowy-history/src/biz/history.rs +++ b/services/appflowy-history/src/biz/history.rs @@ -1,11 +1,11 @@ use std::sync::Arc; +use collab::lock::RwLock; use collab::preclude::updates::encoder::{Encoder, EncoderV2}; use collab::preclude::{Collab, CollabPlugin, ReadTxn, Snapshot, StateVector, TransactionMut}; use collab_entity::CollabType; use serde_json::Value; use sqlx::PgPool; -use tokio::sync::RwLock; use tracing::trace; use database::history::ops::get_snapshot_meta_list; diff --git a/services/appflowy-history/src/biz/snapshot.rs b/services/appflowy-history/src/biz/snapshot.rs index e030e282f..cc89b8a81 100644 --- a/services/appflowy-history/src/biz/snapshot.rs +++ b/services/appflowy-history/src/biz/snapshot.rs @@ -2,10 +2,10 @@ use std::ops::Deref; use std::sync::atomic::AtomicU32; use std::sync::{Arc, Weak}; +use collab::lock::{Mutex, RwLock}; use collab::preclude::updates::encoder::Encode; use collab::preclude::{Collab, ReadTxn, Snapshot, StateVector}; use collab_entity::CollabType; -use tokio::sync::{Mutex, RwLock}; use tracing::{trace, warn}; use tonic_proto::history::SnapshotMetaPb; diff --git a/services/appflowy-history/src/core/open_handle.rs b/services/appflowy-history/src/core/open_handle.rs index 7f0fa7bd6..6451b1429 100644 --- a/services/appflowy-history/src/core/open_handle.rs +++ b/services/appflowy-history/src/core/open_handle.rs @@ -4,10 +4,10 @@ use std::time::Duration; use collab::core::collab::DataSource; use collab::core::origin::CollabOrigin; use collab::error::CollabError; +use collab::lock::RwLock; use collab::preclude::updates::decoder::Decode; use collab::preclude::{Collab, Update}; use collab_entity::CollabType; -use tokio::sync::RwLock; use tokio::time::interval; use tracing::{error, trace}; diff --git a/src/application.rs b/src/application.rs index c4e85215d..6086c9c88 100644 --- a/src/application.rs +++ b/src/application.rs @@ -1,27 +1,8 @@ -use crate::api::metrics::metrics_scope; - -use crate::api::file_storage::file_storage_scope; -use crate::api::template::template_scope; -use crate::api::user::user_scope; -use crate::api::workspace::{collab_scope, workspace_scope}; -use crate::api::ws::ws_scope; -use crate::mailer::Mailer; -use access_control::access::{enable_access_control, AccessControl}; -use gotrue::grant::{Grant, PasswordGrant}; +use std::net::TcpListener; +use std::sync::Arc; +use std::time::Duration; -use crate::api::chat::chat_scope; -use crate::api::history::history_scope; -use crate::biz::collab::access_control::CollabMiddlewareAccessControl; -use crate::biz::pg_listener::PgListeners; -use crate::biz::workspace::access_control::WorkspaceMiddlewareAccessControl; -use crate::config::config::{Config, DatabaseSetting, GoTrueSetting, S3Setting}; -use crate::middleware::access_control_mw::MiddlewareAccessControlTransform; -use crate::middleware::metrics_mw::MetricsMiddleware; -use crate::middleware::request_id::RequestIdMiddleware; -use crate::self_signed::create_self_signed_certificate; -use crate::state::{AppMetrics, AppState, GoTrueAdmin, UserCache}; use actix::Supervisor; - use actix_identity::IdentityMiddleware; use actix_session::storage::RedisSessionStore; use actix_session::SessionMiddleware; @@ -29,6 +10,20 @@ use actix_web::cookie::Key; use actix_web::middleware::NormalizePath; use actix_web::{dev::Server, web::Data, App, HttpServer}; use anyhow::{Context, Error}; +use aws_sdk_s3::config::{Credentials, Region, SharedCredentialsProvider}; +use aws_sdk_s3::operation::create_bucket::CreateBucketError; +use aws_sdk_s3::types::{ + BucketInfo, BucketLocationConstraint, BucketType, CreateBucketConfiguration, +}; +use collab::lock::Mutex; +use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod}; +use openssl::x509::X509; +use secrecy::{ExposeSecret, Secret}; +use sqlx::{postgres::PgPoolOptions, PgPool}; +use tokio::sync::RwLock; +use tracing::{error, info, warn}; + +use access_control::access::{enable_access_control, AccessControl}; use appflowy_ai_client::client::AppFlowyAIClient; use appflowy_collaborate::actix_ws::server::RealtimeServerActor; use appflowy_collaborate::collab::access_control::{ @@ -37,33 +32,36 @@ use appflowy_collaborate::collab::access_control::{ use appflowy_collaborate::collab::cache::CollabCache; use appflowy_collaborate::collab::storage::CollabStorageImpl; use appflowy_collaborate::command::{CLCommandReceiver, CLCommandSender}; +use appflowy_collaborate::indexer::IndexerProvider; use appflowy_collaborate::shared_state::RealtimeSharedState; use appflowy_collaborate::snapshot::SnapshotControl; use appflowy_collaborate::CollaborationServer; - -use aws_sdk_s3::config::{Credentials, Region, SharedCredentialsProvider}; -use aws_sdk_s3::operation::create_bucket::CreateBucketError; -use aws_sdk_s3::types::{ - BucketInfo, BucketLocationConstraint, BucketType, CreateBucketConfiguration, -}; -use openssl::ssl::{SslAcceptor, SslAcceptorBuilder, SslFiletype, SslMethod}; -use openssl::x509::X509; -use secrecy::{ExposeSecret, Secret}; +use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage}; +use gotrue::grant::{Grant, PasswordGrant}; use snowflake::Snowflake; -use sqlx::{postgres::PgPoolOptions, PgPool}; -use std::net::TcpListener; - -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{Mutex, RwLock}; use tonic_proto::history::history_client::HistoryClient; +use workspace_access::WorkspaceAccessControlImpl; use crate::api::ai::ai_completion_scope; +use crate::api::chat::chat_scope; +use crate::api::file_storage::file_storage_scope; +use crate::api::history::history_scope; +use crate::api::metrics::metrics_scope; use crate::api::search::search_scope; -use appflowy_collaborate::indexer::IndexerProvider; -use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage}; -use tracing::{error, info, warn}; -use workspace_access::WorkspaceAccessControlImpl; +use crate::api::template::template_scope; +use crate::api::user::user_scope; +use crate::api::workspace::{collab_scope, workspace_scope}; +use crate::api::ws::ws_scope; +use crate::biz::collab::access_control::CollabMiddlewareAccessControl; +use crate::biz::pg_listener::PgListeners; +use crate::biz::workspace::access_control::WorkspaceMiddlewareAccessControl; +use crate::config::config::{Config, DatabaseSetting, GoTrueSetting, S3Setting}; +use crate::mailer::Mailer; +use crate::middleware::access_control_mw::MiddlewareAccessControlTransform; +use crate::middleware::metrics_mw::MetricsMiddleware; +use crate::middleware::request_id::RequestIdMiddleware; +use crate::self_signed::create_self_signed_certificate; +use crate::state::{AppMetrics, AppState, GoTrueAdmin, UserCache}; pub struct Application { port: u16, diff --git a/src/state.rs b/src/state.rs index e56da8b8c..45bb34a72 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,16 +1,17 @@ use std::sync::Arc; -use appflowy_ai_client::client::AppFlowyAIClient; +use collab::lock::Mutex; use dashmap::DashMap; use secrecy::{ExposeSecret, Secret}; use sqlx::PgPool; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::RwLock; use tokio_stream::StreamExt; use uuid::Uuid; use access_control::access::AccessControl; use access_control::metrics::AccessControlMetrics; use app_error::AppError; +use appflowy_ai_client::client::AppFlowyAIClient; use appflowy_collaborate::collab::access_control::CollabAccessControlImpl; use appflowy_collaborate::collab::cache::CollabCache; use appflowy_collaborate::collab::storage::CollabAccessControlStorage; @@ -19,7 +20,6 @@ use appflowy_collaborate::metrics::CollabMetrics; use appflowy_collaborate::shared_state::RealtimeSharedState; use appflowy_collaborate::CollabRealtimeMetrics; use database::file::s3_client_impl::{AwsS3BucketClientImpl, S3BucketStorage}; - use database::user::{select_all_uid_uuid, select_uid_from_uuid}; use gotrue::grant::{Grant, PasswordGrant}; use snowflake::Snowflake; diff --git a/tests/collab/storage_test.rs b/tests/collab/storage_test.rs index 38dc1db4b..f2f59be1e 100644 --- a/tests/collab/storage_test.rs +++ b/tests/collab/storage_test.rs @@ -4,11 +4,11 @@ use std::time::Duration; use collab::core::transaction::DocTransactionExtension; use collab::entity::EncodedCollab; +use collab::lock::Mutex; use collab::preclude::{Doc, Transact}; use collab_entity::CollabType; use sqlx::types::Uuid; use sqlx::PgPool; -use tokio::sync::Mutex; use tokio::time::sleep; use app_error::ErrorCode;