Skip to content

Commit

Permalink
Improved streamable body implementation. (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix authored Sep 9, 2024
1 parent e9a5ffc commit 0bb3b0a
Show file tree
Hide file tree
Showing 23 changed files with 761 additions and 116 deletions.
56 changes: 56 additions & 0 deletions examples/streaming/bidirectional.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require 'async'
require 'async/http/client'
require 'async/http/server'
require 'async/http/endpoint'

require 'protocol/http/body/streamable'
require 'protocol/http/body/writable'
require 'protocol/http/body/stream'

endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')

Async do
server = Async::HTTP::Server.for(endpoint) do |request|
output = Protocol::HTTP::Body::Streamable.response(request) do |stream|
# Simple echo server:
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end

Protocol::HTTP::Response[200, {}, output]
end

server_task = Async{server.run}

client = Async::HTTP::Client.new(endpoint)

streamable = Protocol::HTTP::Body::Streamable.request do |stream|
stream.write("Hello, ")
stream.write("World!")
stream.close_write

while chunk = stream.readpartial(1024)
puts chunk
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end

response = client.get("/", body: streamable)
streamable.stream(response.body)
ensure
server_task.stop
end
57 changes: 57 additions & 0 deletions examples/streaming/gems.locked
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
PATH
remote: ../..
specs:
protocol-http (0.33.0)

GEM
remote: https://rubygems.org/
specs:
async (2.17.0)
console (~> 1.26)
fiber-annotation
io-event (~> 1.6, >= 1.6.5)
async-http (0.75.0)
async (>= 2.10.2)
async-pool (~> 0.7)
io-endpoint (~> 0.11)
io-stream (~> 0.4)
protocol-http (~> 0.30)
protocol-http1 (~> 0.20)
protocol-http2 (~> 0.18)
traces (>= 0.10)
async-pool (0.8.1)
async (>= 1.25)
metrics
traces
console (1.27.0)
fiber-annotation
fiber-local (~> 1.1)
json
fiber-annotation (0.2.0)
fiber-local (1.1.0)
fiber-storage
fiber-storage (1.0.0)
io-endpoint (0.13.1)
io-event (1.6.5)
io-stream (0.4.0)
json (2.7.2)
metrics (0.10.2)
protocol-hpack (1.5.0)
protocol-http1 (0.22.0)
protocol-http (~> 0.22)
protocol-http2 (0.18.0)
protocol-hpack (~> 1.4)
protocol-http (~> 0.18)
traces (0.13.1)

PLATFORMS
ruby
x86_64-linux

DEPENDENCIES
async
async-http
protocol-http!

BUNDLED WITH
2.5.16
10 changes: 10 additions & 0 deletions examples/streaming/gems.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

source "https://rubygems.org"

gem "async"
gem "async-http"
gem "protocol-http", path: "../../"
60 changes: 60 additions & 0 deletions examples/streaming/unidirectional.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

require 'async'
require 'async/http/client'
require 'async/http/server'
require 'async/http/endpoint'

require 'protocol/http/body/stream'
require 'protocol/http/body/writable'

endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')

Async do
server = Async::HTTP::Server.for(endpoint) do |request|
output = Protocol::HTTP::Body::Writable.new
stream = Protocol::HTTP::Body::Stream.new(request.body, output)

Async do
# Simple echo server:
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end

Protocol::HTTP::Response[200, {}, output]
end

server_task = Async{server.run}

client = Async::HTTP::Client.new(endpoint)

input = Protocol::HTTP::Body::Writable.new
response = client.get("/", body: input)

begin
stream = Protocol::HTTP::Body::Stream.new(response.body, input)

stream.write("Hello, ")
stream.write("World!")
stream.close_write

while chunk = stream.readpartial(1024)
puts chunk
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end
ensure
server_task.stop
end
28 changes: 28 additions & 0 deletions fixtures/protocol/http/body/a_readable_body.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

module Protocol
module HTTP
module Body
AReadableBody = Sus::Shared("a readable body") do
with "#read" do
it "after closing, returns nil" do
body.close

expect(body.read).to be_nil
end
end

with "empty?" do
it "returns true after closing" do
body.close

expect(body).to be(:empty?)
end
end
end
end
end
end
41 changes: 41 additions & 0 deletions fixtures/protocol/http/body/a_writable_body.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2024, by Samuel Williams.

module Protocol
module HTTP
module Body
AWritableBody = Sus::Shared("a readable body") do
with "#read" do
it "after closing the write end, returns all chunks" do
body.write("Hello ")
body.write("World!")
body.close_write

expect(body.read).to be == "Hello "
expect(body.read).to be == "World!"
expect(body.read).to be_nil
end
end

with "empty?" do
it "returns false before writing" do
expect(body).not.to be(:empty?)
end

it "returns true after all chunks are consumed" do
body.write("Hello")
body.close_write

expect(body).not.to be(:empty?)
expect(body.read).to be == "Hello"
expect(body.read).to be_nil

expect(body).to be(:empty?)
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion guides/links.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
getting-started:
order: 1
design-overview:
order: 2
order: 10
131 changes: 131 additions & 0 deletions guides/streaming/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Streaming

This guide gives an overview of how to implement streaming requests and responses.

## Independent Uni-directional Streaming

The request and response body work independently of each other can stream data in both directions. {ruby Protocol::HTTP::Body::Stream} provides an interface to merge these independent streams into an IO-like interface.

```ruby
#!/usr/bin/env ruby

require 'async'
require 'async/http/client'
require 'async/http/server'
require 'async/http/endpoint'

require 'protocol/http/body/stream'
require 'protocol/http/body/writable'

endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')

Async do
server = Async::HTTP::Server.for(endpoint) do |request|
output = Protocol::HTTP::Body::Writable.new
stream = Protocol::HTTP::Body::Stream.new(request.body, output)

Async do
# Simple echo server:
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end

Protocol::HTTP::Response[200, {}, output]
end

server_task = Async{server.run}

client = Async::HTTP::Client.new(endpoint)

input = Protocol::HTTP::Body::Writable.new
response = client.get("/", body: input)

begin
stream = Protocol::HTTP::Body::Stream.new(response.body, input)

stream.write("Hello, ")
stream.write("World!")
stream.close_write

while chunk = stream.readpartial(1024)
puts chunk
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end
ensure
server_task.stop
end
```

This approach works quite well, especially when the input and output bodies are independently compressed, decompressed, or chunked. However, some protocols, notably, WebSockets operate on the raw connection and don't require this level of abstraction.

## Bi-directional Streaming

While WebSockets can work on the above streaming interface, it's a bit more convenient to use the streaming interface directly, which gives raw access to the underlying stream where possible.

```ruby
#!/usr/bin/env ruby

require 'async'
require 'async/http/client'
require 'async/http/server'
require 'async/http/endpoint'

require 'protocol/http/body/stream'
require 'protocol/http/body/writable'

endpoint = Async::HTTP::Endpoint.parse('http://localhost:3000')

Async do
server = Async::HTTP::Server.for(endpoint) do |request|
streamable = Protocol::HTTP::Body::Streamable.
output = Protocol::HTTP::Body::Writable.new
stream = Protocol::HTTP::Body::Stream.new(request.body, output)

Async do
# Simple echo server:
while chunk = stream.readpartial(1024)
stream.write(chunk)
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end

Protocol::HTTP::Response[200, {}, output]
end

server_task = Async{server.run}

client = Async::HTTP::Client.new(endpoint)

input = Protocol::HTTP::Body::Writable.new
response = client.get("/", body: input)

begin
stream = Protocol::HTTP::Body::Stream.new(response.body, input)

stream.write("Hello, ")
stream.write("World!")
stream.close_write

while chunk = stream.readpartial(1024)
puts chunk
end
rescue EOFError
# Ignore EOF errors.
ensure
stream.close
end
ensure
server_task.stop
end
Loading

0 comments on commit 0bb3b0a

Please sign in to comment.