From 20e0b91345d9485134916bf0e42604eb75af79e8 Mon Sep 17 00:00:00 2001 From: Dmitry Vorotilin Date: Thu, 4 Jan 2024 12:54:57 +0300 Subject: [PATCH] ref: Get rid of concurrent async We can easily work around async with a queue and a thread and, by the way, simplify the subscriber. Swallowing errors is not good, closes #396 --- .../gemfiles/websocket-driver-0.6.x.gemfile | 9 ---- .../gemfiles/websocket-driver-0.7.x.gemfile | 9 ---- .github/workflows/tests.yml | 2 - CHANGELOG.md | 8 +++- ferrum.gemspec | 2 +- lib/ferrum.rb | 1 + lib/ferrum/browser/client.rb | 47 ++++++++----------- lib/ferrum/browser/subscriber.rb | 46 +++++++++++++++--- lib/ferrum/browser/web_socket.rb | 31 ++++++------ lib/ferrum/context.rb | 2 +- lib/ferrum/page/frames.rb | 2 +- lib/ferrum/proxy.rb | 2 +- lib/ferrum/utils/thread.rb | 18 +++++++ spec/spec_helper.rb | 2 +- 14 files changed, 106 insertions(+), 75 deletions(-) delete mode 100644 .github/gemfiles/websocket-driver-0.6.x.gemfile delete mode 100644 .github/gemfiles/websocket-driver-0.7.x.gemfile create mode 100644 lib/ferrum/utils/thread.rb diff --git a/.github/gemfiles/websocket-driver-0.6.x.gemfile b/.github/gemfiles/websocket-driver-0.6.x.gemfile deleted file mode 100644 index a970283f..00000000 --- a/.github/gemfiles/websocket-driver-0.6.x.gemfile +++ /dev/null @@ -1,9 +0,0 @@ -# frozen_string_literal: true - -source "https://rubygems.org" - -gem "websocket-driver", "~> 0.6.5" - -eval_gemfile "../../Gemfile" - -gemspec path: "../../" diff --git a/.github/gemfiles/websocket-driver-0.7.x.gemfile b/.github/gemfiles/websocket-driver-0.7.x.gemfile deleted file mode 100644 index 2f57a0e0..00000000 --- a/.github/gemfiles/websocket-driver-0.7.x.gemfile +++ /dev/null @@ -1,9 +0,0 @@ -# frozen_string_literal: true - -source "https://rubygems.org" - -gem "websocket-driver", "~> 0.7.1" - -eval_gemfile "../../Gemfile" - -gemspec path: "../../" diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 90b72d1d..244d57ab 100644 --- 
a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -11,13 +11,11 @@ jobs: strategy: fail-fast: false matrix: - gemfile: [websocket-driver-0.6.x, websocket-driver-0.7.x] ruby: [2.7, "3.0", 3.1, 3.2, 3.3] runs-on: ubuntu-latest env: FERRUM_PROCESS_TIMEOUT: 25 FERRUM_DEFAULT_TIMEOUT: 15 - BUNDLE_GEMFILE: .github/gemfiles/${{ matrix.gemfile }}.gemfile steps: - name: Checkout code uses: actions/checkout@v4 diff --git a/CHANGELOG.md b/CHANGELOG.md index d330754e..751b840a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,11 +12,15 @@ - `Ferrum::Page#screeshot` accepts :area option [#410] - Resizing page on creation is gone and moved to Cuprite [#427] - Min Ruby version is 2.7 -- Refactored internal API of `Browser`, `Page`, `Context`, `Contexts`, `Target` instead of passing browser and making -cyclic dependency on the browser instance, we pass now a simple client [#431] +- Refactored internal API of `Ferrum::Browser`, `Ferrum::Page`, `Ferrum::Context`, `Ferrum::Contexts`, `Ferrum::Target` +instead of passing browser and making cyclic dependency on the browser instance, we pass now a simple client [#431] +- Bump `websocket-driver` to `~> 0.7` [#432] +- Got rid of `Concurrent::Async` in `Ferrum::Browser::Subscriber` [#432] ### Fixed +- Exceptions within `.on()` were swallowed by a thread pool of `Concurrent::Async` [#432] + ### Removed diff --git a/ferrum.gemspec b/ferrum.gemspec index ed7f0a71..753e3580 100644 --- a/ferrum.gemspec +++ b/ferrum.gemspec @@ -28,5 +28,5 @@ Gem::Specification.new do |s| s.add_runtime_dependency "addressable", "~> 2.5" s.add_runtime_dependency "concurrent-ruby", "~> 1.1" s.add_runtime_dependency "webrick", "~> 1.7" - s.add_runtime_dependency "websocket-driver", ">= 0.6", "< 0.8" + s.add_runtime_dependency "websocket-driver", "~> 0.7" end diff --git a/lib/ferrum.rb b/lib/ferrum.rb index 6354f2b5..5278288c 100644 --- a/lib/ferrum.rb +++ b/lib/ferrum.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +require "ferrum/utils/thread" 
require "ferrum/utils/platform" require "ferrum/utils/elapsed_time" require "ferrum/utils/attempt" diff --git a/lib/ferrum/browser/client.rb b/lib/ferrum/browser/client.rb index 9faf6b9f..2cbb729e 100644 --- a/lib/ferrum/browser/client.rb +++ b/lib/ferrum/browser/client.rb @@ -7,8 +7,6 @@ module Ferrum class Browser class Client - INTERRUPTIONS = %w[Fetch.requestPaused Fetch.authRequired].freeze - extend Forwardable delegate %i[timeout timeout=] => :options @@ -19,25 +17,9 @@ def initialize(ws_url, options) @options = options @pendings = Concurrent::Hash.new @ws = WebSocket.new(ws_url, options.ws_max_receive_size, options.logger) - @subscriber, @interrupter = Subscriber.build(2) - - @thread = Thread.new do - Thread.current.abort_on_exception = true - Thread.current.report_on_exception = true if Thread.current.respond_to?(:report_on_exception=) - - loop do - message = @ws.messages.pop - break unless message + @subscriber = Subscriber.new - if INTERRUPTIONS.include?(message["method"]) - @interrupter.async.call(message) - elsif message.key?("method") - @subscriber.async.call(message) - else - @pendings[message["id"]]&.set(message) - end - end - end + start end def command(method, params = {}) @@ -57,16 +39,11 @@ def command(method, params = {}) end def on(event, &block) - case event - when *INTERRUPTIONS - @interrupter.on(event, &block) - else - @subscriber.on(event, &block) - end + @subscriber.on(event, &block) end def subscribed?(event) - [@interrupter, @subscriber].any? 
{ |s| s.subscribed?(event) } + @subscriber.subscribed?(event) end def close @@ -74,6 +51,7 @@ def close # Give a thread some time to handle a tail of messages @pendings.clear @thread.kill unless @thread.join(1) + @subscriber.close end def inspect @@ -85,6 +63,21 @@ def inspect private + def start + @thread = Utils::Thread.spawn do + loop do + message = @ws.messages.pop + break unless message + + if message.key?("method") + @subscriber << message + else + @pendings[message["id"]]&.set(message) + end + end + end + end + def build_message(method, params) { method: method, params: params }.merge(id: next_command_id) end diff --git a/lib/ferrum/browser/subscriber.rb b/lib/ferrum/browser/subscriber.rb index 239117f7..1a9fe7ee 100644 --- a/lib/ferrum/browser/subscriber.rb +++ b/lib/ferrum/browser/subscriber.rb @@ -3,15 +3,22 @@ module Ferrum class Browser class Subscriber - include Concurrent::Async - - def self.build(size) - (0..size).map { new } - end + INTERRUPTIONS = %w[Fetch.requestPaused Fetch.authRequired].freeze def initialize - super + @regular = Queue.new + @priority = Queue.new @on = Concurrent::Hash.new { |h, k| h[k] = Concurrent::Array.new } + + start + end + + def <<(message) + if INTERRUPTIONS.include?(message["method"]) + @priority.push(message) + else + @regular.push(message) + end end def on(event, &block) @@ -23,6 +30,33 @@ def subscribed?(event) @on.key?(event) end + def close + @regular_thread&.kill + @priority_thread&.kill + end + + private + + def start + @regular_thread = Utils::Thread.spawn(abort_on_exception: false) do + loop do + message = @regular.pop + break unless message + + call(message) + end + end + + @priority_thread = Utils::Thread.spawn(abort_on_exception: false) do + loop do + message = @priority.pop + break unless message + + call(message) + end + end + end + def call(message) method, params = message.values_at("method", "params") total = @on[method].size diff --git a/lib/ferrum/browser/web_socket.rb 
b/lib/ferrum/browser/web_socket.rb index 6954b6a3..97af3010 100644 --- a/lib/ferrum/browser/web_socket.rb +++ b/lib/ferrum/browser/web_socket.rb @@ -27,21 +27,7 @@ def initialize(url, max_receive_size, logger) @driver.on(:message, &method(:on_message)) @driver.on(:close, &method(:on_close)) - @thread = Thread.new do - Thread.current.abort_on_exception = true - Thread.current.report_on_exception = true if Thread.current.respond_to?(:report_on_exception=) - - begin - loop do - data = @sock.readpartial(512) - break unless data - - @driver.parse(data) - end - rescue EOFError, Errno::ECONNRESET, Errno::EPIPE - @messages.close - end - end + start @driver.start end @@ -86,6 +72,21 @@ def write(data) def close @driver.close end + + private + + def start + @thread = Utils::Thread.spawn do + loop do + data = @sock.readpartial(512) + break unless data + + @driver.parse(data) + end + rescue EOFError, Errno::ECONNRESET, Errno::EPIPE + @messages.close + end + end end end end diff --git a/lib/ferrum/context.rb b/lib/ferrum/context.rb index 954a7315..a22c583f 100644 --- a/lib/ferrum/context.rb +++ b/lib/ferrum/context.rb @@ -64,7 +64,7 @@ def add_target(params) end def update_target(target_id, params) - @targets[target_id].update(params) + @targets[target_id]&.update(params) end def delete_target(target_id) diff --git a/lib/ferrum/page/frames.rb b/lib/ferrum/page/frames.rb index 8c399ba2..04f07359 100644 --- a/lib/ferrum/page/frames.rb +++ b/lib/ferrum/page/frames.rb @@ -134,7 +134,7 @@ def subscribe_frame_stopped_loading end frame = @frames[params["frameId"]] - frame.state = :stopped_loading + frame&.state = :stopped_loading @event.set if idling? 
end diff --git a/lib/ferrum/proxy.rb b/lib/ferrum/proxy.rb index 62974db3..13049bc0 100644 --- a/lib/ferrum/proxy.rb +++ b/lib/ferrum/proxy.rb @@ -52,7 +52,7 @@ def rotate(host:, port:, user: nil, password: nil) end def stop - @file&.unlink + @file&.close(true) @server.shutdown end diff --git a/lib/ferrum/utils/thread.rb b/lib/ferrum/utils/thread.rb new file mode 100644 index 00000000..6f4fa072 --- /dev/null +++ b/lib/ferrum/utils/thread.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Ferrum + module Utils + module Thread + module_function + + def spawn(abort_on_exception: true) + ::Thread.new(abort_on_exception) do + ::Thread.current.abort_on_exception = abort_on_exception + ::Thread.current.report_on_exception = true if ::Thread.current.respond_to?(:report_on_exception=) + + yield + end + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9f94527b..4c944a29 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -46,7 +46,7 @@ end config.after(:all) do - @browser.quit + @browser&.quit end config.before(:each) do