Encoder does not support intermediate flushes before the reader has ended #154

Open
Geal opened this issue Aug 24, 2022 · 11 comments · May be fixed by #178

Comments

@Geal

Geal commented Aug 24, 2022

Disclaimer: this is about fixing apollographql/router#1572, for which I'm under tight time constraints, so for now the goal of this issue (and the related PR) is to discuss and find a quick fix that we can use directly in the router. After that, I'll help find a proper solution.

I'm encountering an issue with the following setup:

  • I use axum with tower-http's CompressionLayer, which uses async-compression
  • the router sends an HTTP multipart response
  • one multipart element is sent right away, the next one is sent after 5s
  • if the response is compressed, the router will wait for 5s to send both parts at the same time, while without compression, the first part comes immediately, and the second one after 5s

I tracked this down to async-compression: in the tokio-based Encoder, because of the use of ready!(), whenever the underlying reader returns Poll::Pending, that Pending is propagated directly to the caller, so nothing is flushed until all of the input has been read:

https://github.com/Nemo157/async-compression/blob/ada65c660bcea83dc6a0c3d6149e5fbcd039f739/src/tokio/bufread/generic/encoder.rs#L63-L74
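To make the failure mode concrete, here is a minimal synchronous model. It is not the crate's code: ScriptedReader and ToyCodec are hypothetical stand-ins (the codec does no compression, it only holds input until it is finished), and poll_read_current only mirrors the ready!() pattern described above. The first poll buffers the first part and then hands Pending back to the caller; the first part only comes out together with the second one at EOF.

```rust
use std::collections::VecDeque;
use std::task::{ready, Poll};

/// Hypothetical stand-in for the inner reader: a scripted sequence of
/// Ready(Some(chunk)), Pending and Ready(None) (EOF) results.
struct ScriptedReader {
    script: VecDeque<Poll<Option<Vec<u8>>>>,
}

impl ScriptedReader {
    fn new(script: Vec<Poll<Option<Vec<u8>>>>) -> Self {
        Self { script: script.into() }
    }

    fn poll_chunk(&mut self) -> Poll<Option<Vec<u8>>> {
        // Once the script runs out, behave like a reader at EOF.
        self.script.pop_front().unwrap_or(Poll::Ready(None))
    }
}

/// Hypothetical codec: keeps all input internally until it is finished.
#[derive(Default)]
struct ToyCodec {
    buffered: Vec<u8>,
}

impl ToyCodec {
    fn encode(&mut self, input: &[u8]) {
        self.buffered.extend_from_slice(input);
    }

    fn finish(&mut self) -> Vec<u8> {
        std::mem::take(&mut self.buffered)
    }
}

/// Models the behaviour described in the issue: ready!() returns Pending
/// to the caller as soon as the reader blocks, even if the codec holds data.
fn poll_read_current(reader: &mut ScriptedReader, codec: &mut ToyCodec) -> Poll<Vec<u8>> {
    loop {
        match ready!(reader.poll_chunk()) {
            Some(chunk) => codec.encode(&chunk),
            // EOF: everything buffered so far comes out in one go.
            None => return Poll::Ready(codec.finish()),
        }
    }
}
```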

I would like the encoder to send the data it has already compressed when the reader returns Poll::Pending, and let the upper layer decide whether to buffer it or send the HTTP chunk right away.

@Nemo157
Member

Nemo157 commented Aug 24, 2022

It sounds like there's 2 parts to this:

  1. Similar to Decoder might block if there is data remaining to be flushed from the codec and the stream blocks #123: if we have data available from the encoder but the input isn't ready, we won't send that data out. (EDIT: this is less of an issue here, since you don't really control when the encoder will have data available, whereas when decoding, the other end may have purposefully synchronized at this point to ensure you will be able to read up to it.)
  2. You also need to be able to force the encoder to flush any stored data at the end of your first multipart element.

For the second part, there's no real way to signal that through BufRead. Flushing every time a Pending is received would reduce encoding efficiency: encoders generally build up state across multiple input chunks to reduce overhead, and a flush forces them to output everything they have buffered internally so far.

This would be easy to do with an impl Write-based API:

```rust
// Force the first element out before waiting on the second one.
response.write_all(first_element).await?;
response.flush().await?;
response.write_all(second_element).await?;
```

I'm not sure how to support it for BufRead.
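For comparison, a blocking std::io::Write sketch of the write-side idea: the caller, not the encoder, decides where the flush boundaries fall. ToyEncoderWriter is hypothetical and does no compression; it holds written bytes the way a codec holds internal state and forwards them to the inner writer only on flush.

```rust
use std::io::{self, Write};

/// Hypothetical encoder wrapper: buffers everything written to it (as a real
/// codec's internal state would) and only forwards it on flush.
struct ToyEncoderWriter<W: Write> {
    inner: W,
    buffered: Vec<u8>,
}

impl<W: Write> ToyEncoderWriter<W> {
    fn new(inner: W) -> Self {
        Self { inner, buffered: Vec::new() }
    }
}

impl<W: Write> Write for ToyEncoderWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.buffered.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Emit everything held so far, then flush the inner writer too.
        self.inner.write_all(&self.buffered)?;
        self.buffered.clear();
        self.inner.flush()
    }
}
```

Writing the first element and flushing puts it in the inner writer immediately; a second write stays buffered until the next flush.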

@Geal
Author

Geal commented Aug 24, 2022

Right, BufRead does not really support such hints. I'm in a special case here: I know exactly when I have enough data to send, and must send it as soon as possible, so I don't care much about compression efficiency. That's not the general case, though. Still, there should be a way to flush from time to time; otherwise the entire compressed body ends up stored in a buffer before being sent, which defeats the purpose of a streaming API.

@Nemo157
Member

Nemo157 commented Aug 25, 2022

Maybe an option to enable this behavior would be the best approach: fn flush_on_pending(&mut self, bool).

@Geal
Author

Geal commented Aug 26, 2022

Flushing on pending is only one case; we might want to produce data depending on other conditions, like:

  • there's enough data in the encoder to produce 1kB of output
  • no data has been produced for 1ms

So maybe the right way would be to add a poll_flush method to Encoder and let the calling code decide if it should produce data.
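The two conditions above could live in a small caller-side policy that decides, before each poll, whether to ask the encoder for a flush. FlushPolicy and its fields are hypothetical. Since the compressed size isn't known until the encoder actually flushes, this sketch approximates "enough data to produce 1kB" by counting buffered input bytes.

```rust
use std::time::{Duration, Instant};

/// Hypothetical caller-side policy for deciding when to flush the encoder.
struct FlushPolicy {
    /// Flush once at least this many input bytes are waiting in the encoder.
    max_buffered: usize,
    /// Flush if nothing has been emitted for at least this long.
    max_idle: Duration,
}

impl FlushPolicy {
    fn should_flush(&self, buffered: usize, last_output: Instant, now: Instant) -> bool {
        // Never flush an empty encoder; otherwise either trigger is enough.
        buffered > 0
            && (buffered >= self.max_buffered
                || now.saturating_duration_since(last_output) >= self.max_idle)
    }
}
```

Passing now explicitly keeps the policy deterministic and easy to test; a caller would pass Instant::now().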

@Nemo157
Copy link
Member

Nemo157 commented Aug 26, 2022

poll_flush seems plausible. Then, if you want flush-on-pending, you would just always call poll_flush instead of poll_read (I imagine poll_flush being identical to poll_read except that it flushes on pending); for other use cases you would switch between them as needed.
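A sketch of that split under a toy model: poll_read keeps the current behaviour, while poll_flush is identical except that a Pending from the reader makes it return whatever is buffered. ToyEncoder and its scripted input are hypothetical (each scripted Pending is consumed once, and there is no real compression or waker handling); this illustrates the proposal, not an existing async-compression API.

```rust
use std::collections::VecDeque;
use std::task::Poll;

/// Hypothetical encoder over scripted input; it buffers instead of compressing.
struct ToyEncoder {
    input: VecDeque<Poll<Option<Vec<u8>>>>,
    buffered: Vec<u8>,
}

impl ToyEncoder {
    fn new(input: Vec<Poll<Option<Vec<u8>>>>) -> Self {
        Self { input: input.into(), buffered: Vec::new() }
    }

    /// Current behaviour: a blocked reader means Pending for the caller.
    fn poll_read(&mut self) -> Poll<Vec<u8>> {
        self.poll_inner(false)
    }

    /// Same as poll_read, but a blocked reader flushes buffered output.
    fn poll_flush(&mut self) -> Poll<Vec<u8>> {
        self.poll_inner(true)
    }

    fn poll_inner(&mut self, flush_on_pending: bool) -> Poll<Vec<u8>> {
        loop {
            match self.input.pop_front().unwrap_or(Poll::Ready(None)) {
                Poll::Ready(Some(chunk)) => self.buffered.extend_from_slice(&chunk),
                // EOF: hand back whatever is left.
                Poll::Ready(None) => return Poll::Ready(std::mem::take(&mut self.buffered)),
                Poll::Pending if flush_on_pending && !self.buffered.is_empty() => {
                    return Poll::Ready(std::mem::take(&mut self.buffered));
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
```

Calling poll_flush on every poll gives flush-on-pending; calling poll_read instead lets output keep accumulating across a Pending, which preserves compression efficiency when latency matters less.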

@Geal
Author

Geal commented Oct 12, 2022

So, thinking a bit more about this, the write-oriented API would be a bit cumbersome. For HTTP, I have a Stream of Bytes buffers going in, and must have a Stream of (compressed) Bytes buffers going out. With the write-oriented approach, I must write into an AsyncWrite that somehow outputs a Stream of buffers. IIRC that requires running the encoding part in a spawned task, with a channel carrying the buffers to the other end. Doable but annoying (are there other solutions for that?).
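A blocking, std-only analogue of the spawned-task-plus-channel plumbing described here. ChannelWriter and spawn_producer are hypothetical; the thread stands in for a spawned task, std::sync::mpsc for an async channel, and the Receiver for the outgoing Stream of buffers. There is no encoder in this sketch; in the real setup one would sit between the incoming parts and ChannelWriter, and each flush would produce one outgoing chunk.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical sink: collects writes and sends them as one chunk per flush.
struct ChannelWriter {
    pending: Vec<u8>,
    tx: Sender<Vec<u8>>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.pending.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        if self.pending.is_empty() {
            return Ok(());
        }
        let chunk = std::mem::take(&mut self.pending);
        // A dropped receiver means nobody is reading the output anymore.
        self.tx
            .send(chunk)
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))
    }
}

/// Runs the producing side on its own thread and returns the chunk "stream".
fn spawn_producer(parts: Vec<Vec<u8>>) -> Receiver<Vec<u8>> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        let mut out = ChannelWriter { pending: Vec::new(), tx };
        for part in parts {
            if out.write_all(&part).is_err() || out.flush().is_err() {
                break;
            }
        }
        // Dropping `out` here drops the Sender, which ends the receiver's iteration.
    });
    rx
}
```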

Now, taking a step back: if we want to go from a Stream of Bytes to a Stream of Bytes, would it be possible to avoid going through AsyncRead and AsyncWrite, and instead feed each chunk to the encoder directly and flush after compressing it? The structures in src/codec could be used for that. Should I add some kind of map_compress method for Stream implementations, instead of trying out poll_flush?
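Roughly, such an adapter would encode each incoming chunk and flush right away, so every input chunk yields exactly one output chunk. The sketch below uses a plain Iterator instead of Stream to stay std-only; map_compress, the Codec trait and TaggingCodec are hypothetical (not the types in src/codec), and TaggingCodec's '|' marker merely stands in for the bytes a real compressor emits on each flush.

```rust
/// Hypothetical minimal codec interface: feed input, then force output out.
trait Codec {
    fn encode(&mut self, input: &[u8], out: &mut Vec<u8>);
    fn flush(&mut self, out: &mut Vec<u8>);
}

/// Toy codec that copies bytes and appends a marker at each flush point.
struct TaggingCodec;

impl Codec for TaggingCodec {
    fn encode(&mut self, input: &[u8], out: &mut Vec<u8>) {
        out.extend_from_slice(input);
    }

    fn flush(&mut self, out: &mut Vec<u8>) {
        out.push(b'|');
    }
}

/// Encodes and flushes each chunk, yielding one output chunk per input chunk.
fn map_compress<I, C>(chunks: I, mut codec: C) -> impl Iterator<Item = Vec<u8>>
where
    I: IntoIterator<Item = Vec<u8>>,
    C: Codec,
{
    chunks.into_iter().map(move |chunk| {
        let mut out = Vec::new();
        codec.encode(&chunk, &mut out);
        codec.flush(&mut out);
        out
    })
}
```

The trade-off is the one raised earlier in the thread: flushing after every chunk gives the lowest latency but pays flush overhead on each chunk.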

@Nemo157
Copy link
Member

Nemo157 commented Oct 12, 2022

> For HTTP, I have a Stream of Bytes buffers going in, and must have a Stream of (compressed) Bytes buffers going out.

For hyper, yep. I've never understood why it is fn(Readable) -> Readable instead of fn(Readable, Writeable), which to me is the more natural mapping of HTTP semantics (especially once you get into streaming/long-polling) and can be trivially converted to request/response.

The stream module is deprecated because implementing it directly is no more efficient than using the bufread module plus utilities to convert to/from Stream<Bytes>. I'd prefer the bufread::*Encoder::poll_flush approach.

@Geal
Author

Geal commented Oct 12, 2022

Alright, I'll look into poll_flush then.

@Geal
Author

Geal commented Oct 12, 2022

It feels like the real complexity here is fitting poll_flush into tower-http's compression layer. It has to interact with WrapBody, which is used for both compression and decompression: https://github.com/tower-rs/tower-http/blob/master/tower-http/src/compression_utils.rs#L123-L210

@Geal
Author

Geal commented Oct 12, 2022

Now that I am looking at other crates that use async-compression (and there are a LOT 😀), maybe the right approach would be to provide wrappers for encoders and decoders with configurable triggers, like flushing when enough data has been received, or after a period without activity. That would keep the AsyncRead interface and could be a drop-in replacement for HTTP-related crates.

@Nemo157
Member

Nemo157 commented Oct 12, 2022

That could be implemented on top of poll_flush by having the wrapper forward AsyncRead::poll_read to either poll_read or poll_flush as it chooses.
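A simplified sketch of that layering. The real AsyncRead::poll_read takes Pin<&mut Self>, a Context and a ReadBuf; here the signatures are reduced to returning Poll<Vec<u8>>. FlushableEncoder, PolicyWrapper and Recorder are hypothetical. The wrapper consults a caller-supplied should_flush closure on every read, which is where triggers such as "enough data received" or "idle too long" would live.

```rust
use std::task::Poll;

/// Hypothetical interface exposing the two methods discussed in this thread.
trait FlushableEncoder {
    fn poll_read(&mut self) -> Poll<Vec<u8>>;
    fn poll_flush(&mut self) -> Poll<Vec<u8>>;
}

/// Wrapper that decides, on every read, which of the two methods to forward to.
struct PolicyWrapper<E, F> {
    inner: E,
    should_flush: F,
}

impl<E: FlushableEncoder, F: FnMut() -> bool> PolicyWrapper<E, F> {
    fn poll_read(&mut self) -> Poll<Vec<u8>> {
        if (self.should_flush)() {
            self.inner.poll_flush()
        } else {
            self.inner.poll_read()
        }
    }
}

/// Toy inner encoder that only reports which method the wrapper picked.
struct Recorder;

impl FlushableEncoder for Recorder {
    fn poll_read(&mut self) -> Poll<Vec<u8>> {
        Poll::Ready(b"read".to_vec())
    }

    fn poll_flush(&mut self) -> Poll<Vec<u8>> {
        Poll::Ready(b"flush".to_vec())
    }
}
```

Keeping the policy in a closure leaves the wrapper itself agnostic about which triggers a given HTTP crate wants.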

Geal linked a pull request on Oct 17, 2022 that will close this issue
Development

Successfully merging a pull request may close this issue.

2 participants