Skip to content

Commit

Permalink
Fix error handling edge cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Sep 16, 2024
1 parent 0361015 commit 1d85bf2
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 32 deletions.
4 changes: 2 additions & 2 deletions lib/protocol/http/body/stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,8 @@ def close_write(error = nil)

# Close the input and output bodies.
def close(error = nil)
self.close_read
self.close_write
self.close_read(error)
self.close_write(error)

return nil
ensure
Expand Down
45 changes: 21 additions & 24 deletions lib/protocol/http/body/streamable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,12 @@ module Body
#
# When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server.
module Streamable
def self.new(*arguments)
if arguments.size == 1
DeferredBody.new(*arguments)
else
Body.new(*arguments)
end
end

def self.request(&block)
DeferredBody.new(block)
RequestBody.new(block)
end

def self.response(request, &block)
Body.new(block, request.body)
ResponseBody.new(block, request.body)
end

class Output
Expand Down Expand Up @@ -109,32 +101,37 @@ def call(stream)
raise
end

# Closing a stream indicates we are no longer interested in reading from it.
def close(error = nil)
if output = @output
@output = nil
# Closing the output here may take some time, as it may need to finish handling the stream:
output.close(error)
end

def close_input(error = nil)
if input = @input
@input = nil
input.close(error)
end
end

def close_output(error = nil)
@output&.close(error)
end
end

# A response body is used on the server side to generate the response body using a block.
class ResponseBody < Body
def close(error = nil)
# Close will be invoked when all the output is written.
self.close_output(error)
end
end

# A deferred body has an extra `stream` method which can be used to stream data into the body, as the response body won't be available until the request has been sent.
class DeferredBody < Body
# A request body is used on the client side to generate the request body using a block.
#
# As the response body isn't available until the request is sent, the response body must be {stream}ed into the request body.
class RequestBody < Body
def initialize(block)
super(block, Writable.new)
end

# Closing a stream indicates we are no longer interested in reading from it, but in this case that does not mean that the output block is finished generating data.
def close(error = nil)
if error
super
end
# Close will be invoked when all the input is read.
self.close_input(error)
end

# Stream the response body into the block's input.
Expand Down
15 changes: 13 additions & 2 deletions lib/protocol/http/body/writable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ def read
raise @error
end

@queue.pop
# This operation may result in @error being set.
chunk = @queue.pop

if @error
raise @error
end

return chunk
end

# Write a single chunk to the body. Signal completion by calling `#finish`.
Expand Down Expand Up @@ -118,7 +125,11 @@ def output
end

def inspect
"\#<#{self.class} #{@count} chunks written, #{status}>"
if @error
"\#<#{self.class} #{@count} chunks written, #{status}, error=#{@error}>"
else
"\#<#{self.class} #{@count} chunks written, #{status}>"
end
end

private
Expand Down
45 changes: 41 additions & 4 deletions test/protocol/http/body/streamable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,23 @@
end
end

let(:body) {subject.new(block)}
let(:body) {subject.request(&block)}

with ".request" do
it "can create a new body" do
body = subject.request(&block)
expect(body).to be_a(Protocol::HTTP::Body::Streamable::RequestBody)
end
end

with ".response" do
let(:request) {Protocol::HTTP::Request.new("GET", "/")}

it "can create a new body" do
body = subject.response(request, &block)
expect(body).to be_a(Protocol::HTTP::Body::Streamable::Body)
end
end

with "#stream?" do
it "should be streamable" do
Expand All @@ -40,8 +56,6 @@
end
end

let(:body) {subject.new(block)}

it "can close the output body" do
expect(body.read).to be == nil
end
Expand Down Expand Up @@ -141,7 +155,8 @@
end
end

let(:body) {subject.new(block, input)}
let(:response) {Protocol::HTTP::Response[200, body: input]}
let(:body) {subject.response(response, &block)}

it "can read from input" do
expect(body.read).to be == "Hello"
Expand All @@ -160,6 +175,8 @@

with '#close' do
it "can close the body" do
expect(input).not.to receive(:close)

expect(body.read).to be == "Hello"
body.close
end
Expand All @@ -172,6 +189,8 @@
while chunk = stream.read_partial
stream.write(chunk)
end
rescue => error
stream.close(error)
end
end

Expand All @@ -186,5 +205,23 @@

body.close
end

it "can stream to output with an error" do
input = Protocol::HTTP::Body::Buffered.new(["Hello", " ", "World"])

mock(input) do |mock|
mock.replace(:read) do
raise "Oh no!"
end
end

expect do
body.stream(input)
end.to raise_exception(RuntimeError, message: be =~ /Oh no!/)

expect do
body.read
end.to raise_exception(RuntimeError, message: be =~ /Oh no!/)
end
end
end

0 comments on commit 1d85bf2

Please sign in to comment.