Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

actix-ws: use BigBytes to avoid copying ws messages between various buffers #480

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,9 @@ actix-web-httpauth = { path = "./actix-web-httpauth" }
# uncomment to quickly test against local actix-web repo
# actix-http = { path = "../actix-web/actix-http" }
# actix-router = { path = "../actix-web/actix-router" }
# actix-web = { path = "../actix-web" }
# actix-web = { path = "../actix-web/actix-web" }
# awc = { path = "../actix-web/awc" }
actix-http = { git = "https://github.com/asonix/actix-web", branch = "asonix/play-with-h1-encoding" }
actix-router = { git = "https://github.com/asonix/actix-web", branch = "asonix/play-with-h1-encoding" }
actix-web = { git = "https://github.com/asonix/actix-web", branch = "asonix/play-with-h1-encoding" }
awc = { git = "https://github.com/asonix/actix-web", branch = "asonix/play-with-h1-encoding" }
4 changes: 2 additions & 2 deletions actix-ws/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn ws(

loop {
interval.tick().await;
if session2.ping(b"").await.is_err() {
if session2.ping(&b""[..]).await.is_err() {
break;
}

Expand All @@ -97,7 +97,7 @@ async fn ws(
while let Some(Ok(msg)) = stream.recv().await {
match msg {
AggregatedMessage::Ping(bytes) => {
if session.pong(&bytes).await.is_err() {
if session.pong(bytes).await.is_err() {
return;
}
}
Expand Down
2 changes: 1 addition & 1 deletion actix-ws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use self::{
/// while let Some(Ok(msg)) = msg_stream.next().await {
/// match msg {
/// Message::Ping(bytes) => {
/// if session.pong(&bytes).await.is_err() {
/// if session.pong(bytes).await.is_err() {
/// return;
/// }
/// }
Expand Down
12 changes: 6 additions & 6 deletions actix-ws/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,16 @@ impl Session {
/// ```no_run
/// # use actix_ws::Session;
/// # async fn test(mut session: Session) {
/// if session.ping(b"").await.is_err() {
/// if session.ping(&b""[..]).await.is_err() {
/// // session is closed
/// }
/// # }
/// ```
pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> {
pub async fn ping(&mut self, msg: impl Into<Bytes>) -> Result<(), Closed> {
self.pre_check();
if let Some(inner) = self.inner.as_mut() {
inner
.send(Message::Ping(Bytes::copy_from_slice(msg)))
.send(Message::Ping(msg.into()))
.await
.map_err(|_| Closed)
} else {
Expand All @@ -122,16 +122,16 @@ impl Session {
/// # async fn test(mut session: Session, msg: Message) {
/// match msg {
/// Message::Ping(bytes) => {
/// let _ = session.pong(&bytes).await;
/// let _ = session.pong(bytes).await;
/// }
/// _ => (),
/// }
/// # }
pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> {
pub async fn pong(&mut self, msg: impl Into<Bytes>) -> Result<(), Closed> {
self.pre_check();
if let Some(inner) = self.inner.as_mut() {
inner
.send(Message::Pong(Bytes::copy_from_slice(msg)))
.send(Message::Pong(msg.into()))
.await
.map_err(|_| Closed)
} else {
Expand Down
35 changes: 17 additions & 18 deletions actix-ws/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{
collections::VecDeque,
future::poll_fn,
io, mem,
io,
pin::Pin,
task::{Context, Poll},
};

use actix_codec::{Decoder, Encoder};
use actix_codec::Decoder;
use actix_http::{
big_bytes::BigBytes,
ws::{Codec, Frame, Message, ProtocolError},
Payload,
};
Expand All @@ -24,8 +25,7 @@ use crate::AggregatedMessageStream;
/// Response body for a WebSocket.
pub struct StreamingBody {
session_rx: Receiver<Message>,
messages: VecDeque<Message>,
buf: BytesMut,
buf: BigBytes,
codec: Codec,
closing: bool,
}
Expand All @@ -34,8 +34,7 @@ impl StreamingBody {
pub(super) fn new(session_rx: Receiver<Message>) -> Self {
StreamingBody {
session_rx,
messages: VecDeque::new(),
buf: BytesMut::new(),
buf: BigBytes::with_capacity(0),
codec: Codec::new(),
closing: false,
}
Expand Down Expand Up @@ -118,14 +117,12 @@ impl Stream for StreamingBody {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

if this.closing {
return Poll::Ready(None);
}

loop {
while !this.closing {
match Pin::new(&mut this.session_rx).poll_recv(cx) {
Poll::Ready(Some(msg)) => {
this.messages.push_back(msg);
if let Err(err) = this.codec.encode_bigbytes(msg, &mut this.buf) {
return Poll::Ready(Some(Err(err.into())));
}
}
Poll::Ready(None) => {
this.closing = true;
Expand All @@ -135,16 +132,18 @@ impl Stream for StreamingBody {
}
}

while let Some(msg) = this.messages.pop_front() {
if let Err(err) = this.codec.encode(msg, &mut this.buf) {
return Poll::Ready(Some(Err(err.into())));
}
if let Some(bytes) = this.buf.pop_front() {
return Poll::Ready(Some(Ok(bytes)));
}

if !this.buf.is_empty() {
return Poll::Ready(Some(Ok(mem::take(&mut this.buf).freeze())));
if this.closing {
return Poll::Ready(None);
}

// When we have a moment (pending) allow the BigBytes to release memory
// arbitrary 8KB (page size)
this.buf.clear(1024 * 8);

Poll::Pending
}
}
Expand Down
Loading