From dd3b233d3d932b133d4f5a49b93bb962c5b9c3bf Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 28 Nov 2024 17:20:22 +1300 Subject: [PATCH] Improved documentatio for `Protocol::HTTP::Body`. --- lib/protocol/http/body.rb | 16 ++++++++ lib/protocol/http/body/buffered.rb | 31 ++++++++++++++- lib/protocol/http/body/completable.rb | 13 +++++++ lib/protocol/http/body/deflate.rb | 33 ++++++++++++++++ lib/protocol/http/body/digestable.rb | 23 +++++++++-- lib/protocol/http/body/file.rb | 38 +++++++++++++++++- lib/protocol/http/body/head.rb | 8 ++++ lib/protocol/http/body/inflate.rb | 12 +++++- lib/protocol/http/body/readable.rb | 43 +++++++++++++++------ lib/protocol/http/body/reader.rb | 17 +++++++++ lib/protocol/http/body/rewindable.rb | 20 +++++++++- lib/protocol/http/body/stream.rb | 40 ++++++++++++++++--- lib/protocol/http/body/streamable.rb | 51 ++++++++++++++++++++++--- lib/protocol/http/body/wrapper.rb | 28 ++++++++++++-- lib/protocol/http/body/writable.rb | 55 +++++++++++++++++++++++---- lib/protocol/http/headers.rb | 4 ++ 16 files changed, 390 insertions(+), 42 deletions(-) create mode 100644 lib/protocol/http/body.rb diff --git a/lib/protocol/http/body.rb b/lib/protocol/http/body.rb new file mode 100644 index 0000000..719bed8 --- /dev/null +++ b/lib/protocol/http/body.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2024, by Samuel Williams. + +require_relative "body/readable" +require_relative "body/writable" +require_relative "body/wrapper" + +module Protocol + module HTTP + # @namespace + module Body + end + end +end diff --git a/lib/protocol/http/body/buffered.rb b/lib/protocol/http/body/buffered.rb index ea66557..c1bc958 100644 --- a/lib/protocol/http/body/buffered.rb +++ b/lib/protocol/http/body/buffered.rb @@ -43,6 +43,10 @@ def self.read(body) self.new(chunks) end + # Initialize the buffered body with some chunks. + # + # @parameter chunks [Array(String)] the chunks to buffer. + # @parameter length [Integer] the length of the body, if known. def initialize(chunks = [], length = nil) @chunks = chunks @length = length @@ -50,6 +54,7 @@ def initialize(chunks = [], length = nil) @index = 0 end + # @attribute [Array(String)] chunks the buffered chunks. attr :chunks # A rewindable body wraps some other body. Convert it to a buffered body. The buffered body will share the same chunks as the rewindable body. @@ -59,36 +64,48 @@ def buffered self.class.new(@chunks) end + # Finish the body, this is a no-op. + # + # @returns [Buffered] self. def finish self end - # Ensure that future reads return nil, but allow for rewinding. + # Ensure that future reads return `nil`, but allow for rewinding. + # + # @parameter error [Exception | Nil] the error that caused the body to be closed, if any. def close(error = nil) @index = @chunks.length return nil end + # Clear the buffered chunks. def clear @chunks = [] @length = 0 @index = 0 end + # The length of the body. Will compute and cache the length of the body, if it was not provided. def length @length ||= @chunks.inject(0) {|sum, chunk| sum + chunk.bytesize} end + # @returns [Boolean] if the body is empty. def empty? @index >= @chunks.length end - # A buffered response is always ready. + # Whether the body is ready to be read. + # @returns [Boolean] a buffered response is always ready. def ready? true end + # Read the next chunk from the buffered body. + # + # @returns [String | Nil] the next chunk or nil if there are no more chunks. def read return nil unless @chunks @@ -99,23 +116,30 @@ def read end end + # Discard the body. Invokes {#close}. def discard # It's safe to call close here because there is no underlying stream to close: self.close end + # Write a chunk to the buffered body. def write(chunk) @chunks << chunk end + # Close the body for writing. This is a no-op. def close_write(error) # Nothing to do. end + # Whether the body can be rewound. + # + # @returns [Boolean] if the body has chunks. def rewindable? @chunks != nil end + # Rewind the body to the beginning, causing a subsequent read to return the first chunk. def rewind return false unless @chunks @@ -124,6 +148,9 @@ def rewind return true end + # Inspect the buffered body. + # + # @returns [String] a string representation of the buffered body. def inspect if @chunks "\#<#{self.class} #{@chunks.size} chunks, #{self.length} bytes>" diff --git a/lib/protocol/http/body/completable.rb b/lib/protocol/http/body/completable.rb index 0d83a41..b111de3 100644 --- a/lib/protocol/http/body/completable.rb +++ b/lib/protocol/http/body/completable.rb @@ -10,6 +10,10 @@ module HTTP module Body # Invokes a callback once the body has completed, either successfully or due to an error. class Completable < Wrapper + # Wrap a message body with a callback. If the body is empty, the callback is invoked immediately. + # + # @parameter message [Request | Response] the message body. + # @parameter block [Proc] the callback to invoke when the body is closed. def self.wrap(message, &block) if body = message&.body and !body.empty? message.body = self.new(message.body, block) @@ -18,20 +22,29 @@ def self.wrap(message, &block) end end + # Initialize the completable body with a callback. + # + # @parameter body [Readable] the body to wrap. + # @parameter callback [Proc] the callback to invoke when the body is closed. def initialize(body, callback) super(body) @callback = callback end + # @returns [Boolean] completable bodies are not rewindable. def rewindable? false end + # Rewind the body, is not supported. def rewind false end + # Close the body and invoke the callback. If an error is given, it is passed to the callback. + # + # The calback is only invoked once, and before `super` is invoked. def close(error = nil) if @callback @callback.call(error) diff --git a/lib/protocol/http/body/deflate.rb b/lib/protocol/http/body/deflate.rb index aef35cc..f7bddcf 100644 --- a/lib/protocol/http/body/deflate.rb +++ b/lib/protocol/http/body/deflate.rb @@ -10,17 +10,27 @@ module Protocol module HTTP module Body + # A body which compresses or decompresses the contents using the DEFLATE or GZIP algorithm. class ZStream < Wrapper + # The default compression level. DEFAULT_LEVEL = 7 + # The DEFLATE window size. DEFLATE = -Zlib::MAX_WBITS + + # The GZIP window size. GZIP = Zlib::MAX_WBITS | 16 + # The supported encodings. ENCODINGS = { "deflate" => DEFLATE, "gzip" => GZIP, } + # Initialize the body with the given stream. + # + # @parameter body [Readable] the body to wrap. + # @parameter stream [Zlib::Deflate | Zlib::Inflate] the stream to use for compression or decompression. def initialize(body, stream) super(body) @@ -30,6 +40,9 @@ def initialize(body, stream) @output_length = 0 end + # Close the stream. + # + # @parameter error [Exception | Nil] the error that caused the stream to be closed. def close(error = nil) if stream = @stream @stream = nil @@ -39,14 +52,21 @@ def close(error = nil) super end + # The length of the output, if known. Generally, this is not known due to the nature of compression. def length # We don't know the length of the output until after it's been compressed. nil end + # @attribute [Integer] input_length the total number of bytes read from the input. attr :input_length + + # @attribute [Integer] output_length the total number of bytes written to the output. attr :output_length + # The compression ratio, according to the input and output lengths. + # + # @returns [Float] the compression ratio, e.g. 0.5 for 50% compression. def ratio if @input_length != 0 @output_length.to_f / @input_length.to_f @@ -55,16 +75,29 @@ def ratio end end + # Inspect the body, including the compression ratio. + # + # @returns [String] a string representation of the body. def inspect "#{super} | \#<#{self.class} #{(ratio*100).round(2)}%>" end end + # A body which compresses the contents using the DEFLATE or GZIP algorithm. class Deflate < ZStream + # Create a new body which compresses the given body using the GZIP algorithm by default. + # + # @parameter body [Readable] the body to wrap. + # @parameter window_size [Integer] the window size to use for compression. + # @parameter level [Integer] the compression level to use. + # @returns [Deflate] the wrapped body. def self.for(body, window_size = GZIP, level = DEFAULT_LEVEL) self.new(body, Zlib::Deflate.new(level, window_size)) end + # Read a chunk from the underlying body and compress it. If the body is finished, the stream is flushed and finished, and the remaining data is returned. + # + # @returns [String | Nil] the compressed chunk or `nil` if the stream is closed. def read return if @stream.finished? diff --git a/lib/protocol/http/body/digestable.rb b/lib/protocol/http/body/digestable.rb index f174868..56cd5b6 100644 --- a/lib/protocol/http/body/digestable.rb +++ b/lib/protocol/http/body/digestable.rb @@ -12,12 +12,21 @@ module HTTP module Body # Invokes a callback once the body has finished reading. class Digestable < Wrapper + # Wrap a message body with a callback. If the body is empty, the callback is not invoked, as there is no data to digest. + # + # @parameter message [Request | Response] the message body. + # @parameter digest [Digest] the digest to use. + # @parameter block [Proc] the callback to invoke when the body is closed. def self.wrap(message, digest = Digest::SHA256.new, &block) if body = message&.body and !body.empty? message.body = self.new(message.body, digest, block) end end + # Initialize the digestable body with a callback. + # + # @parameter body [Readable] the body to wrap. + # @parameter digest [Digest] the digest to use. # @parameter callback [Block] The callback is invoked when the digest is complete. def initialize(body, digest = Digest::SHA256.new, callback = nil) super(body) @@ -25,11 +34,14 @@ def initialize(body, digest = Digest::SHA256.new, callback = nil) @digest = digest @callback = callback end + + # @attribute [Digest] digest the digest object. + attr :digest - def digest - @digest - end - + # Generate an appropriate ETag for the digest, assuming it is complete. If you call this method before the body is fully read, the ETag will be incorrect. + # + # @parameter weak [Boolean] If true, the ETag is marked as weak. + # @returns [String] the ETag. def etag(weak: false) if weak "W/\"#{digest.hexdigest}\"" @@ -38,6 +50,9 @@ def etag(weak: false) end end + # Read the body and update the digest. When the body is fully read, the callback is invoked with `self` as the argument. + # + # @returns [String | Nil] the next chunk of data, or nil if the body is fully read. def read if chunk = super @digest.update(chunk) diff --git a/lib/protocol/http/body/file.rb b/lib/protocol/http/body/file.rb index 1320e18..e7517a3 100644 --- a/lib/protocol/http/body/file.rb +++ b/lib/protocol/http/body/file.rb @@ -8,14 +8,27 @@ module Protocol module HTTP module Body + # A body which reads from a file. class File < Readable - BLOCK_SIZE = 4096 + # The default block size. + BLOCK_SIZE = 64*1024 + + # The default mode for opening files. MODE = ::File::RDONLY | ::File::BINARY + # Open a file at the given path. + # + # @parameter path [String] the path to the file. def self.open(path, *arguments, **options) self.new(::File.open(path, MODE), *arguments, **options) end + # Initialize the file body with the given file. + # + # @parameter file [::File] the file to read from. + # @parameter range [Range] the range of bytes to read from the file. + # @parameter size [Integer] the size of the file, if known. + # @parameter block_size [Integer] the block size to use when reading from the file. def initialize(file, range = nil, size: file.size, block_size: BLOCK_SIZE) @file = file @range = range @@ -33,6 +46,9 @@ def initialize(file, range = nil, size: file.size, block_size: BLOCK_SIZE) end end + # Close the file. + # + # @parameter error [Exception | Nil] the error that caused the file to be closed. def close(error = nil) @file.close @remaining = 0 @@ -40,32 +56,46 @@ def close(error = nil) super end + # @attribute [::File] file the file to read from. attr :file + # @attribute [Integer] the offset to read from. attr :offset + + # @attribute [Integer] the number of bytes to read. attr :length + # @returns [Boolean] whether more data should be read. def empty? @remaining == 0 end + # @returns [Boolean] whether the body is ready to be read, always true for files. def ready? true end + # Returns a copy of the body, by duplicating the file descriptor, including the same range if specified. + # + # @returns [File] the duplicated body. def buffered self.class.new(@file.dup, @range, block_size: @block_size) end + # Rewind the file to the beginning of the range. def rewind @file.seek(@offset) @remaining = @length end + # @returns [Boolean] whether the body is rewindable, generally always true for seekable files. def rewindable? true end + # Read the next chunk of data from the file. + # + # @returns [String | Nil] the next chunk of data, or nil if the file is fully read. def read if @remaining > 0 amount = [@remaining, @block_size].min @@ -88,6 +118,9 @@ def read # stream.close # end + # Read all the remaining data from the file and return it as a single string. + # + # @returns [String] the remaining data. def join return "" if @remaining == 0 @@ -98,6 +131,9 @@ def join return buffer end + # Inspect the file body. + # + # @returns [String] a string representation of the file body. def inspect "\#<#{self.class} file=#{@file.inspect} offset=#{@offset} remaining=#{@remaining}>" end diff --git a/lib/protocol/http/body/head.rb b/lib/protocol/http/body/head.rb index 8ca4b68..65171e0 100644 --- a/lib/protocol/http/body/head.rb +++ b/lib/protocol/http/body/head.rb @@ -8,7 +8,9 @@ module Protocol module HTTP module Body + # Represents a body suitable for HEAD requests, in other words, a body that is empty and has a known length. class Head < Readable + # Create a head body for the given body, capturing it's length and then closing it. def self.for(body) head = self.new(body.length) @@ -17,18 +19,24 @@ def self.for(body) return head end + # Initialize the head body with the given length. + # + # @parameter length [Integer] the length of the body. def initialize(length) @length = length end + # @returns [Boolean] the body is empty. def empty? true end + # @returns [Boolean] the body is ready. def ready? true end + # @returns [Integer] the length of the body, if known. def length @length end diff --git a/lib/protocol/http/body/inflate.rb b/lib/protocol/http/body/inflate.rb index 5c46453..61c817a 100644 --- a/lib/protocol/http/body/inflate.rb +++ b/lib/protocol/http/body/inflate.rb @@ -10,11 +10,19 @@ module Protocol module HTTP module Body + # A body which decompresses the contents using the DEFLATE or GZIP algorithm. class Inflate < ZStream - def self.for(body, encoding = GZIP) - self.new(body, Zlib::Inflate.new(encoding)) + # Create a new body which decompresses the given body using the GZIP algorithm by default. + # + # @parameter body [Readable] the body to wrap. + # @parameter window_size [Integer] the window size to use for decompression. + def self.for(body, window_size = GZIP) + self.new(body, Zlib::Inflate.new(window_size)) end + # Read from the underlying stream and inflate it. + # + # @returns [String | Nil] the inflated data, or nil if the stream is finished. def read if stream = @stream # Read from the underlying stream and inflate it: diff --git a/lib/protocol/http/body/readable.rb b/lib/protocol/http/body/readable.rb index 0ac2a8c..21b04b0 100644 --- a/lib/protocol/http/body/readable.rb +++ b/lib/protocol/http/body/readable.rb @@ -9,42 +9,51 @@ module HTTP module Body # Represents a readable input streams. # - # Typically, you'd override `#read` to return chunks of data. + # There are two major modes of operation: # - # In general, you read chunks of data from a body until it is empty and returns `nil`. Upon reading `nil`, the body is considered consumed and should not be read from again. + # 1. Reading chunks using {read} (or {each}/{join}), until the body is empty, or + # 2. Streaming chunks using {call}, which writes chunks to a provided output stream. # - # Reading can also fail, for example if the body represents a streaming upload, and the connection is lost. In this case, `#read` will raise some kind of error. + # In both cases, reading can fail, for example if the body represents a streaming upload, and the connection is lost. In this case, {read} will raise some kind of error, or the stream will be closed with an error. # - # If you don't want to read from a stream, and instead want to close it immediately, you can call `close` on the body. If the body is already completely consumed, `close` will do nothing, but if there is still data to be read, it will cause the underlying stream to be reset (and possibly closed). + # At any point, you can use {close} to close the stream and release any resources, or {discard} to read all remaining data without processing it which may allow the underlying connection to be reused (but can be slower). class Readable # Close the stream immediately. After invoking this method, the stream should be considered closed, and all internal resources should be released. # # If an error occured while handling the output, it can be passed as an argument. This may be propagated to the client, for example the client may be informed that the stream was not fully read correctly. # - # Invoking `#read` after `#close` will return `nil`. + # Invoking {read} after {close} will return `nil`. + # + # @parameter error [Exception | Nil] The error that caused this stream to be closed, if any. def close(error = nil) end # Optimistically determine whether read (may) return any data. - # If this returns true, then calling read will definitely return nil. - # If this returns false, then calling read may return nil. + # + # - If this returns true, then calling read will definitely return nil. + # - If this returns false, then calling read may return nil. + # + # @return [Boolean] Whether the stream is empty. def empty? false end - # Whether calling read will return a chunk of data without blocking. - # We prefer pessimistic implementation, and thus default to `false`. - # @return [Boolean] + # Whether calling read will return a chunk of data without blocking. We prefer pessimistic implementation, and thus default to `false`. + # + # @return [Boolean] Whether the stream is ready (read will not block). def ready? false end # Whether the stream can be rewound using {rewind}. + # + # @return [Boolean] Whether the stream is rewindable. def rewindable? false end # Rewind the stream to the beginning. + # # @returns [Boolean] Whether the stream was successfully rewound. def rewind false @@ -60,19 +69,21 @@ def buffered end # The total length of the body, if known. + # # @returns [Integer | Nil] The total length of the body, or `nil` if the length is unknown. def length nil end # Read the next available chunk. + # # @returns [String | Nil] The chunk of data, or `nil` if the stream has finished. # @raises [StandardError] If an error occurs while reading. def read nil end - # Enumerate all chunks until finished, then invoke `#close`. + # Enumerate all chunks until finished, then invoke {close}. # # Closes the stream when finished or if an error occurs. # @@ -109,6 +120,9 @@ def join end end + # Whether to prefer streaming the body using {call} rather than reading it using {read} or {each}. + # + # @returns [Boolean] Whether the body should be streamed. def stream? false end @@ -131,6 +145,7 @@ def call(stream) end end ensure + # TODO Should this invoke close_write(error) instead? stream.close end @@ -152,6 +167,9 @@ def discard end end + # Convert the body to a hash suitable for serialization. This won't include the contents of the body, but will include metadata such as the length, streamability, and readiness, etc. + # + # @returns [Hash] The body as a hash. def as_json(...) { class: self.class.name, @@ -162,6 +180,9 @@ def as_json(...) } end + # Convert the body to JSON. + # + # @returns [String] The body as JSON. def to_json(...) as_json.to_json(...) end diff --git a/lib/protocol/http/body/reader.rb b/lib/protocol/http/body/reader.rb index 8859db9..c04fbc3 100644 --- a/lib/protocol/http/body/reader.rb +++ b/lib/protocol/http/body/reader.rb @@ -8,8 +8,11 @@ module Protocol module HTTP module Body # General operations for interacting with a request or response body. + # + # This module is included in both {Request} and {Response}. module Reader # Read chunks from the body. + # # @yields {|chunk| ...} chunks from the body. def each(&block) if @body @@ -19,6 +22,7 @@ def each(&block) end # Reads the entire request/response body. + # # @returns [String] the entire body as a string. def read if @body @@ -30,6 +34,7 @@ def read end # Gracefully finish reading the body. This will buffer the remainder of the body. + # # @returns [Buffered] buffers the entire body. def finish if @body @@ -46,19 +51,27 @@ def discard @body = nil body.discard end + + return nil end # Buffer the entire request/response body. + # # @returns [Reader] itself. def buffered! if @body @body = @body.finish end + # TODO Should this return @body instead? It seems more useful. return self end # Write the body of the response to the given file path. + # + # @parameter path [String] the path to write the body to. + # @parameter mode [Integer] the mode to open the file with. + # @parameter options [Hash] additional options to pass to `File.open`. def save(path, mode = ::File::WRONLY|::File::CREAT|::File::TRUNC, **options) if @body ::File.open(path, mode, **options) do |file| @@ -70,6 +83,8 @@ def save(path, mode = ::File::WRONLY|::File::CREAT|::File::TRUNC, **options) end # Close the connection as quickly as possible. Discards body. May close the underlying connection if necessary to terminate the stream. + # + # @parameter error [Exception | Nil] the error that caused the stream to be closed, if any. def close(error = nil) if @body @body.close(error) @@ -78,6 +93,8 @@ def close(error = nil) end # Whether there is a body? + # + # @returns [Boolean] whether there is a body. def body? @body and !@body.empty? end diff --git a/lib/protocol/http/body/rewindable.rb b/lib/protocol/http/body/rewindable.rb index 80f93a4..716a52e 100644 --- a/lib/protocol/http/body/rewindable.rb +++ b/lib/protocol/http/body/rewindable.rb @@ -9,8 +9,13 @@ module Protocol module HTTP module Body - # A body which buffers all it's contents as it is `#read`. + # A body which buffers all it's contents as it is read. + # + # As the body is buffered in memory, you may want to ensure your server has sufficient (virtual) memory available to buffer the entire body. class Rewindable < Wrapper + # Wrap the given message body in a rewindable body, if it is not already rewindable. + # + # @parameter message [Request | Response] the message to wrap. def self.wrap(message) if body = message.body if body.rewindable? @@ -21,6 +26,9 @@ def self.wrap(message) end end + # Initialize the body with the given body. + # + # @parameter body [Readable] the body to wrap. def initialize(body) super(body) @@ -28,10 +36,12 @@ def initialize(body) @index = 0 end + # @returns [Boolean] Whether the body is empty. def empty? (@index >= @chunks.size) && super end + # @returns [Boolean] Whether the body is ready to be read. def ready? (@index < @chunks.size) || super end @@ -43,6 +53,9 @@ def buffered Buffered.new(@chunks) end + # Read the next available chunk. This may return a buffered chunk if the stream has been rewound, or a chunk from the underlying stream, if available. + # + # @returns [String | Nil] The chunk of data, or `nil` if the stream has finished. def read if @index < @chunks.size chunk = @chunks[@index] @@ -58,14 +71,19 @@ def read return chunk end + # Rewind the stream to the beginning. def rewind @index = 0 end + # @returns [Boolean] Whether the stream is rewindable, which it is. def rewindable? true end + # Inspect the rewindable body. + # + # @returns [String] a string representation of the body. def inspect "\#<#{self.class} #{@index}/#{@chunks.size} chunks read>" end diff --git a/lib/protocol/http/body/stream.rb b/lib/protocol/http/body/stream.rb index bccce6a..62b0314 100644 --- a/lib/protocol/http/body/stream.rb +++ b/lib/protocol/http/body/stream.rb @@ -11,8 +11,13 @@ module HTTP module Body # The input stream is an IO-like object which contains the raw HTTP POST data. When applicable, its external encoding must be “ASCII-8BIT” and it must be opened in binary mode, for Ruby 1.9 compatibility. The input stream must respond to gets, each, read and rewind. class Stream + # The default line separator, used by {gets}. NEWLINE = "\n" + # Initialize the stream with the given input and output. + # + # @parameter input [Readable] The input stream. + # @parameter output [Writable] The output stream. def initialize(input = nil, output = Buffered.new) @input = input @output = output @@ -26,7 +31,10 @@ def initialize(input = nil, output = Buffered.new) @closed_read = false end + # @attribute [Readable] The input stream. attr :input + + # @attribute [Writable] The output stream. attr :output # This provides a read-only interface for data, which is surprisingly tricky to implement correctly. @@ -39,9 +47,9 @@ module Reader # # If buffer is given, then the read data will be placed into buffer instead of a newly created String object. # - # @param length [Integer] the amount of data to read - # @param buffer [String] the buffer which will receive the data - # @return a buffer containing the data + # @parameterlength [Integer] the amount of data to read + # @parameter buffer [String] the buffer which will receive the data + # @returns [String] a buffer containing the data def read(length = nil, buffer = nil) return "" if length == 0 @@ -125,11 +133,14 @@ def read_partial(length = nil, buffer = nil) end # Similar to {read_partial} but raises an `EOFError` if the stream is at EOF. + # + # @parameter length [Integer] The maximum number of bytes to read. + # @parameter buffer [String] The buffer to read into. def readpartial(length, buffer = nil) read_partial(length, buffer) or raise EOFError, "End of file reached!" end - # Iterate over each chunk of data in the stream. + # Iterate over each chunk of data from the input stream. # # @yields {|chunk| ...} Each chunk of data. def each(&block) @@ -146,6 +157,9 @@ def each(&block) end # Read data from the stream without blocking if possible. + # + # @parameter length [Integer] The maximum number of bytes to read. + # @parameter buffer [String | Nil] The buffer to read into. def read_nonblock(length, buffer = nil, exception: nil) @buffer ||= read_next chunk = nil @@ -323,6 +337,10 @@ def flush end # Close the input body. + # + # If, while processing the data that was read from this stream, an error is encountered, it should be passed to this method. + # + # @parameter error [Exception | Nil] The error that was encountered, if any. def close_read(error = nil) if input = @input @input = nil @@ -334,6 +352,10 @@ def close_read(error = nil) end # Close the output body. + # + # If, while generating the data that is written to this stream, an error is encountered, it should be passed to this method. + # + # @parameter error [Exception | Nil] The error that was encountered, if any. def close_write(error = nil) if output = @output @output = nil @@ -343,6 +365,8 @@ def close_write(error = nil) end # Close the input and output bodies. + # + # @parameter error [Exception | Nil] The error that caused this stream to be closed, if any. def close(error = nil) self.close_read(error) self.close_write(error) @@ -352,18 +376,22 @@ def close(error = nil) @closed = true end - # Whether the stream has been closed. + # @returns [Boolean] Whether the stream has been closed. def closed? @closed end - # Whether there are any output chunks remaining? + # @returns [Boolean] Whether there are any output chunks remaining. def empty? @output.empty? end private + # Read the next chunk of data from the input stream. + # + # @returns [String] The next chunk of data. + # @raises [IOError] If the input stream was explicitly closed. def read_next if @input return @input.read diff --git a/lib/protocol/http/body/streamable.rb b/lib/protocol/http/body/streamable.rb index c213730..957d2a1 100644 --- a/lib/protocol/http/body/streamable.rb +++ b/lib/protocol/http/body/streamable.rb @@ -13,39 +13,65 @@ module HTTP module Body # A body that invokes a block that can read and write to a stream. # - # In some cases, it's advantageous to directly read and write to the underlying stream if possible. For example, HTTP/1 upgrade requests, WebSockets, and similar. To handle that case, response bodies can implement `stream?` and return `true`. When `stream?` returns true, the body **should** be consumed by calling `call(stream)`. Server implementations may choose to always invoke `call(stream)` if it's efficient to do so. Bodies that don't support it will fall back to using `#each`. + # In some cases, it's advantageous to directly read and write to the underlying stream if possible. For example, HTTP/1 upgrade requests, WebSockets, and similar. To handle that case, response bodies can implement {stream?} and return `true`. When {stream?} returns true, the body **should** be consumed by calling `call(stream)`. Server implementations may choose to always invoke `call(stream)` if it's efficient to do so. Bodies that don't support it will fall back to using {each}. # # When invoking `call(stream)`, the stream can be read from and written to, and closed. However, the stream is only guaranteed to be open for the duration of the `call(stream)` call. Once the method returns, the stream **should** be closed by the server. module Streamable + # Generate a new streaming request body using the given block to generate the body. + # + # @parameter block [Proc] The block that generates the body. + # @returns [RequestBody] The streaming request body. def self.request(&block) RequestBody.new(block) end + # Generate a new streaming response body using the given block to generate the body. + # + # @parameter request [Request] The request. + # @parameter block [Proc] The block that generates the body. + # @returns [ResponseBody] The streaming response body. def self.response(request, &block) ResponseBody.new(block, request.body) end + # A output stream that can be written to by a block. class Output + # Schedule the block to be executed in a fiber. + # + # @parameter input [Readable] The input stream. + # @parameter block [Proc] The block that generates the output. + # @returns [Output] The output stream. def self.schedule(input, block) self.new(input, block).tap(&:schedule) end + # Initialize the output stream with the given input and block. + # + # @parameter input [Readable] The input stream. + # @parameter block [Proc] The block that generates the output. def initialize(input, block) @output = Writable.new @stream = Stream.new(input, @output) @block = block end + # Schedule the block to be executed in a fiber. + # + # @returns [Fiber] The fiber. def schedule @fiber ||= Fiber.schedule do @block.call(@stream) end end + # Read from the output stream (may block). def read @output.read end + # Close the output stream. + # + # @parameter error [Exception | Nil] The error that caused this stream to be closed, if any. def close(error = nil) @output.close_write(error) end @@ -55,13 +81,19 @@ def close(error = nil) class ConsumedError < StandardError end + # A streaming body that can be read from and written to. class Body < Readable + # Initialize the body with the given block and input. + # + # @parameter block [Proc] The block that generates the body. + # @parameter input [Readable] The input stream, if known. def initialize(block, input = nil) @block = block @input = input @output = nil end + # @returns [Boolean] Whether the body can be streamed, which is true. def stream? true end @@ -81,9 +113,9 @@ def read @output.read end - # Invoke the block with the given stream. + # Invoke the block with the given stream. The block can read and write to the stream, and must close the stream when finishing. # - # The block can read and write to the stream, and must close the stream when finishing. + # @parameter stream [Stream] The stream to read and write to. def call(stream) if @block.nil? raise ConsumedError, "Streaming block has already been consumed!" @@ -101,6 +133,9 @@ def call(stream) raise end + # Close the input. The streaming body will eventually read all the input. + # + # @parameter error [Exception | Nil] The error that caused this stream to be closed, if any. def close_input(error = nil) if input = @input @input = nil @@ -108,6 +143,9 @@ def close_input(error = nil) end end + # Close the output, the streaming body will be unable to write any more output. + # + # @parameter error [Exception | Nil] The error that caused this stream to be closed, if any. def close_output(error = nil) @output&.close(error) end @@ -115,8 +153,8 @@ def close_output(error = nil) # A response body is used on the server side to generate the response body using a block. class ResponseBody < Body + # Close will be invoked when all the output is written. def close(error = nil) - # Close will be invoked when all the output is written. self.close_output(error) end end @@ -125,12 +163,15 @@ def close(error = nil) # # As the response body isn't available until the request is sent, the response body must be {stream}ed into the request body. class RequestBody < Body + # Initialize the request body with the given block. + # + # @parameter block [Proc] The block that generates the body. def initialize(block) super(block, Writable.new) end + # Close will be invoked when all the input is read. def close(error = nil) - # Close will be invoked when all the input is read. self.close_input(error) end diff --git a/lib/protocol/http/body/wrapper.rb b/lib/protocol/http/body/wrapper.rb index 3be26b7..c4a22e7 100644 --- a/lib/protocol/http/body/wrapper.rb +++ b/lib/protocol/http/body/wrapper.rb @@ -13,20 +13,26 @@ class Wrapper < Readable # Wrap the body of the given message in a new instance of this class. # # @parameter message [Request | Response] the message to wrap. - # @returns [Wrapper | nil] the wrapped body or nil if the body was nil. + # @returns [Wrapper | Nil] the wrapped body or `nil`` if the body was `nil`. def self.wrap(message) if body = message.body message.body = self.new(body) end end + # Initialize the wrapper with the given body. + # + # @parameter body [Readable] The body to wrap. def initialize(body) @body = body end - # The wrapped body. + # @attribute [Readable] The wrapped body. attr :body + # Close the body. + # + # @parameter error [Exception | Nil] The error that caused this stream to be closed, if any. def close(error = nil) @body.close(error) @@ -34,39 +40,49 @@ def close(error = nil) # super end + # Forwards to the wrapped body. def empty? @body.empty? end + # Forwards to the wrapped body. def ready? @body.ready? end + # Forwards to the wrapped body. def buffered @body.buffered end + # Forwards to the wrapped body. def rewind @body.rewind end + # Forwards to the wrapped body. def rewindable? @body.rewindable? end + # Forwards to the wrapped body. def length @body.length end - # Read the next available chunk. + # Forwards to the wrapped body. def read @body.read end + # Forwards to the wrapped body. def discard @body.discard end + # Convert the body to a hash suitable for serialization. + # + # @returns [Hash] The body as a hash. def as_json(...) { class: self.class.name, @@ -74,10 +90,16 @@ def as_json(...) } end + # Convert the body to JSON. + # + # @returns [String] The body as JSON. def to_json(...) as_json.to_json(...) end + # Inspect the wrapped body. The wrapper, by default, is transparent. + # + # @returns [String] a string representation of the wrapped body. def inspect @body.inspect end diff --git a/lib/protocol/http/body/writable.rb b/lib/protocol/http/body/writable.rb index 68a780d..f73635e 100644 --- a/lib/protocol/http/body/writable.rb +++ b/lib/protocol/http/body/writable.rb @@ -10,11 +10,14 @@ module HTTP module Body # A dynamic body which you can write to and read from. class Writable < Readable + # An error indicating that the body has been closed and no further writes are allowed. class Closed < StandardError end - # @param [Integer] length The length of the response body if known. - # @param [Async::Queue] queue Specify a different queue implementation, e.g. `Async::LimitedQueue.new(8)` to enable back-pressure streaming. + # Initialize the writable body. + # + # @parameter length [Integer] The length of the response body if known. + # @parameter queue [Thread::Queue] Specify a different queue implementation, e.g. `Thread::SizedQueue` to enable back-pressure. def initialize(length = nil, queue: Thread::Queue.new) @length = length @queue = queue @@ -22,11 +25,12 @@ def initialize(length = nil, queue: Thread::Queue.new) @error = nil end - def length - @length - end + # @attribute [Integer] The length of the response body if known. + attr :length # Stop generating output; cause the next call to write to fail with the given error. Does not prevent existing chunks from being read. In other words, this indicates both that no more data will be or should be written to the body. + # + # @parameter error [Exception] The error that caused this body to be closed, if any. Will be raised on the next call to {read}. def close(error = nil) @error ||= error @@ -36,20 +40,29 @@ def close(error = nil) super end + # Whether the body is closed. A closed body can not be written to or read from. + # + # @returns [Boolean] Whether the body is closed. def closed? @queue.closed? end + # @returns [Boolean] Whether the body is ready to be read from, without blocking. def ready? !@queue.empty? || @queue.closed? end - # Has the producer called #finish and has the reader consumed the nil token? + # Indicates whether the body is empty. This can occur if the body has been closed, or if the producer has invoked {close_write} and the reader has consumed all available chunks. + # + # @returns [Boolean] Whether the body is empty. def empty? @queue.empty? && @queue.closed? end # Read the next available chunk. + # + # @returns [String | Nil] The next chunk, or `nil` if the body is finished. + # @raises [Exception] If the body was closed due to an error. def read if @error raise @error @@ -65,7 +78,11 @@ def read return chunk end - # Write a single chunk to the body. Signal completion by calling `#finish`. + # Write a single chunk to the body. Signal completion by calling {close_write}. + # + # @parameter chunk [String] The chunk to write. + # @raises [Closed] If the body has been closed without error. + # @raises [Exception] If the body has been closed due to an error. def write(chunk) if @queue.closed? raise(@error || Closed) @@ -75,27 +92,41 @@ def write(chunk) @count += 1 end + # Signal that no more data will be written to the body. + # + # @parameter error [Exception] The error that caused this body to be closed, if any. def close_write(error = nil) @error ||= error @queue.close end + # The output interface for writing chunks to the body. class Output + # Initialize the output with the given writable body. + # + # @parameter writable [Writable] The writable body. def initialize(writable) @writable = writable @closed = false end + # @returns [Boolean] Whether the output is closed for writing only. def closed? @closed || @writable.closed? end + # Write a chunk to the body. def write(chunk) @writable.write(chunk) end alias << write + # Close the output stream. + # + # If an error is given, the error will be used to close the body by invoking {close} with the error. Otherwise, only the write side of the body will be closed. + # + # @parameter error [Exception | Nil] The error that caused this stream to be closed, if any. def close(error = nil) @closed = true @@ -108,6 +139,12 @@ def close(error = nil) end # Create an output wrapper which can be used to write chunks to the body. + # + # If a block is given, and the block raises an error, the error will used to close the body by invoking {close} with the error. + # + # @yields {|output| ...} if a block is given. + # @parameter output [Output] The output wrapper. + # @returns [Output] The output wrapper. def output output = Output.new(self) @@ -124,6 +161,9 @@ def output end end + # Inspect the body. + # + # @returns [String] A string representation of the body. def inspect if @error "\#<#{self.class} #{@count} chunks written, #{status}, error=#{@error}>" @@ -134,6 +174,7 @@ def inspect private + # @returns [String] A string representation of the body's status. def status if @queue.empty? if @queue.closed? diff --git a/lib/protocol/http/headers.rb b/lib/protocol/http/headers.rb index c882430..c91e11e 100644 --- a/lib/protocol/http/headers.rb +++ b/lib/protocol/http/headers.rb @@ -17,6 +17,10 @@ module Protocol module HTTP + # @namespace + module Header + end + # Headers are an array of key-value pairs. Some header keys represent multiple values. class Headers Split = Header::Split