diff --git a/lib/octopus/proxy.rb b/lib/octopus/proxy.rb index c3bac619..bea1f6a6 100644 --- a/lib/octopus/proxy.rb +++ b/lib/octopus/proxy.rb @@ -1,9 +1,12 @@ require 'set' require 'octopus/slave_group' require 'octopus/load_balancing/round_robin' +require 'octopus/query_analysis' module Octopus class Proxy + include ::Octopus::QueryAnalysis + attr_accessor :config, :sharded CURRENT_MODEL_KEY = 'octopus.current_model'.freeze @@ -293,11 +296,11 @@ def method_missing(method, *args, &block) self.last_current_shard = current_shard clean_connection_proxy conn.send(method, *args, &block) - elsif should_send_queries_to_shard_slave_group?(method) + elsif should_send_queries_to_shard_slave_group?(method, args) send_queries_to_shard_slave_group(method, *args, &block) - elsif should_send_queries_to_slave_group?(method) + elsif should_send_queries_to_slave_group?(method, args) send_queries_to_slave_group(method, *args, &block) - elsif should_send_queries_to_replicated_databases?(method) + elsif should_send_queries_to_replicated_databases?(method, args) send_queries_to_selected_slave(method, *args, &block) else select_connection.send(method, *args, &block) @@ -337,16 +340,16 @@ def connected? @shards.any? { |_k, v| v.connected? } end - def should_send_queries_to_shard_slave_group?(method) - should_use_slaves_for_method?(method) && @shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present? + def should_send_queries_to_shard_slave_group?(method, args) + should_use_slaves_for_method?(method, args) && @shards_slave_groups.try(:[], current_shard).try(:[], current_slave_group).present? end def send_queries_to_shard_slave_group(method, *args, &block) send_queries_to_balancer(@shards_slave_groups[current_shard][current_slave_group], method, *args, &block) end - def should_send_queries_to_slave_group?(method) - should_use_slaves_for_method?(method) && @slave_groups.try(:[], current_slave_group).present? + def should_send_queries_to_slave_group?(method, args) + should_use_slaves_for_method?(method, args) && @slave_groups.try(:[], current_slave_group).present? end def send_queries_to_slave_group(method, *args, &block) @@ -431,8 +434,8 @@ def should_clean_connection_proxy?(method) end # Try to use slaves if and only if `replicated: true` is specified in `shards.yml` and no slaves groups are defined - def should_send_queries_to_replicated_databases?(method) - @replicated && method.to_s =~ /select/ && !block && !slaves_grouped? + def should_send_queries_to_replicated_databases?(method, args) + @replicated && select?(method, args) && !block && !slaves_grouped? end def current_model_replicated? @@ -458,8 +461,23 @@ def send_queries_to_selected_slave(method, *args, &block) # (3) It's a SELECT query # while ensuring that we revert `current_shard` from the selected slave to the (shard's) master # not to make queries other than SELECT leak to the slave. - def should_use_slaves_for_method?(method) - current_model_replicated? && method.to_s =~ /select/ + def should_use_slaves_for_method?(method, args) + current_model_replicated? && select?(method, args) + end + + # Given an ActiveRecord::Base.connection method and its arguments, determine if it is a single select query + # suitable to send to a slave. + def select?(method, args) + is_single_select = method.to_s =~ /select/ + unless is_single_select + if method.to_s =~ /execute/ + query = args.first + if query.kind_of? String + is_single_select = definitely_select_query?(query) && !possibly_multiple_queries?(query) + end + end + end + is_single_select end def slaves_grouped? diff --git a/lib/octopus/query_analysis.rb b/lib/octopus/query_analysis.rb new file mode 100644 index 00000000..14a16f4a --- /dev/null +++ b/lib/octopus/query_analysis.rb @@ -0,0 +1,16 @@ +module Octopus + module QueryAnalysis + # Given a mysql query string, determines if it is definitely a select query. Due to the simple regex used, it will + # sometimes miss detecting valid select queries, hence why it only determines if something is definitely a select. + def definitely_select_query?( str ) + str =~ /^\s*select/i + end + + # Given a mysql query string, determines if the string might contain multiple queries. + # We are simply checking if it contains a semi colon with non whitespace to the right of it, so this check will + # sometimes falsely detect a string containing one query as sometimes having multiple queries. + def possibly_multiple_queries?( str ) + str =~ /;.*\S+.*$/ + end + end +end \ No newline at end of file diff --git a/spec/octopus/replicated_slave_grouped_spec.rb b/spec/octopus/replicated_slave_grouped_spec.rb index 49148cb7..a7eb9744 100644 --- a/spec/octopus/replicated_slave_grouped_spec.rb +++ b/spec/octopus/replicated_slave_grouped_spec.rb @@ -88,4 +88,42 @@ end end end + + it 'should send to slave group when performing select via execute' do + OctopusHelper.using_environment :replicated_slave_grouped do + Cat.create!(:name => 'Thiago1') + Cat.create!(:name => 'Thiago2') + + Octopus.using(:slave_group => :slaves1) do + Octopus.using(:slave_group => :slaves2) do + count = ActiveRecord::Base.connection.execute('select count(*) from cats').to_a.flatten.first + expect(count).to eq(2) + end + count = ActiveRecord::Base.connection.execute('select count(*) from cats').to_a.flatten.first + expect(count).to eq(0) + end + end + end + + + it 'should only send to master when running insert via execute' do + OctopusHelper.using_environment :replicated_slave_grouped do + Octopus.using(:slave_group => :slaves1) do + Octopus.using(:slave_group => :slaves2) do + expect{ ActiveRecord::Base.connection.execute("insert into cats (name) values ('Thiago1')").to_a.flatten.first }.not_to raise_error + end + expect{ ActiveRecord::Base.connection.execute("insert into cats (name) values ('Thiago2')").to_a.flatten.first }.not_to raise_error + end + + Octopus.using(:slave_group => :slaves1) do + Octopus.using(:slave_group => :slaves2) do + expect(Cat.count).to eq(2) + end + expect(Cat.count).to eq(0) + end + end + end + + + end