diff --git a/docs/GettingStarted.md b/docs/GettingStarted.md index 224d18818bc..fbccb00ffdb 100644 --- a/docs/GettingStarted.md +++ b/docs/GettingStarted.md @@ -1847,6 +1847,21 @@ end | `on_error` | | `Proc` | Custom error handler invoked when a job raises an error. Provided `span` and `error` as arguments. Sets error on the span by default. Useful for ignoring transient errors. | `proc { \|span, error\| span.set_error(error) unless span.nil? }` | | `quantize` | | `Hash` | Hash containing options for quantization of job arguments. | `{}` | +#### Log Correlation + +To correlate Sidekiq logs with traces, you can configure [Sidekiq's logger](https://github.com/sidekiq/sidekiq/wiki/Logging) +to either use an [existing supported logger instance](#for-logging-in-rails-applications) or use Sidekiq's JSON logger formatter: + +```ruby +Sidekiq.configure_client do |config| + config.logger.formatter = Sidekiq::Logger::Formatters::JSON.new +end + +Sidekiq.configure_server do |config| + config.logger.formatter = Sidekiq::Logger::Formatters::JSON.new +end +``` + ### Sinatra The Sinatra integration traces requests and template rendering. diff --git a/lib/datadog/tracing/contrib/sidekiq/patcher.rb b/lib/datadog/tracing/contrib/sidekiq/patcher.rb index 3d30bc75a51..aa5ea002e03 100644 --- a/lib/datadog/tracing/contrib/sidekiq/patcher.rb +++ b/lib/datadog/tracing/contrib/sidekiq/patcher.rb @@ -35,9 +35,16 @@ def patch config.server_middleware do |chain| chain.add(Sidekiq::ServerTracer) + + # Patch late to ensure `Sidekiq::Processor` is loaded. + # Used for log correlation. + ::Sidekiq::Processor.prepend(Sidekiq::ServerTracer::Processor) end patch_server_internals if Integration.compatible_with_server_internal_tracing? + + # Patch for log correlation + ::Sidekiq::Logger::Formatters::JSON.prepend(Sidekiq::ServerTracer::JSONFormatter) end end diff --git a/lib/datadog/tracing/contrib/sidekiq/server_tracer.rb b/lib/datadog/tracing/contrib/sidekiq/server_tracer.rb index 6927fd5e8c7..b07284a76b7 100644 --- a/lib/datadog/tracing/contrib/sidekiq/server_tracer.rb +++ b/lib/datadog/tracing/contrib/sidekiq/server_tracer.rb @@ -18,18 +18,12 @@ class ServerTracer def initialize(options = {}) @sidekiq_service = options[:service_name] || configuration[:service_name] @on_error = options[:on_error] || configuration[:on_error] - @distributed_tracing = options[:distributed_tracing] || configuration[:distributed_tracing] @quantize = options[:quantize] || configuration[:quantize] end def call(worker, job, queue) resource = job_resource(job) - if @distributed_tracing - trace_digest = Sidekiq.extract(job) - Datadog::Tracing.continue_trace!(trace_digest) - end - Datadog::Tracing.trace( Ext::SPAN_JOB, service: @sidekiq_service, @@ -81,6 +75,48 @@ def propagation def configuration Datadog.configuration.tracing[:sidekiq] end + + # Since Sidekiq 5, the server logger runs before any middleware is run. + # (https://github.com/sidekiq/sidekiq/blob/40de8236e927d752fc1ec5d220f276a9b4b5c84b/lib/sidekiq/processor.rb#L135) + # Due of this, we cannot create a trace early enough using middlewares that allow log correlation to work + # A way around it is to create a TraceOperation early (and thus a `trace_id`), and let the middleware handle + # the span creation. + # This works because logs are correlated on the `trace_id`, not `span_id`. + module Processor + # Copy visibility from Sidekiq::Processor's class declaration, to ensure + # we are declaring `dispatch` with the correct visibility. Only applicable in testing mode. + # @see https://github.com/sidekiq/sidekiq/blob/40de8236e927d752fc1ec5d220f276a9b4b5c84b/lib/sidekiq/processor.rb#L68 + private if defined?($TESTING) && $TESTING # rubocop:disable Layout/EmptyLinesAroundAccessModifier, Style/GlobalVars + + # The main method used by Sidekiq to process jobs. + # The Sidekiq logger runs inside this method. + # @see Sidekiq::Processor#dispatch + def dispatch(*args, **kwargs, &block) + if Datadog.configuration.tracing[:sidekiq][:distributed_tracing] + trace_digest = Sidekiq.extract(args.first) rescue nil + end + + Datadog::Tracing.continue_trace!(trace_digest) + + super + end + end + + # Performs log correlation injecting for Sidekiq. + # Currently only supports Sidekiq's JSON formatter. + module JSONFormatter + SKIP_FIRST_STRING_CHAR = (1..-1).freeze + + def call(severity, time, program_name, message) + entry = super + + # Concatenate the correlation with the JSON string log entry, + # since there's no way to inject the correlation values into + # the original JSON. + correlation = ::Sidekiq.dump_json(Tracing.correlation.to_h) + "#{correlation.chop},#{entry[SKIP_FIRST_STRING_CHAR]}" + end + end end end end diff --git a/spec/datadog/tracing/contrib/sidekiq/distributed_tracing_spec.rb b/spec/datadog/tracing/contrib/sidekiq/distributed_tracing_spec.rb index 15d7575585b..1394c1772e2 100644 --- a/spec/datadog/tracing/contrib/sidekiq/distributed_tracing_spec.rb +++ b/spec/datadog/tracing/contrib/sidekiq/distributed_tracing_spec.rb @@ -5,32 +5,24 @@ require 'datadog/tracing/contrib/sidekiq/server_tracer' require 'sidekiq/testing' +require_relative 'support/helper' require_relative 'support/legacy_test_helpers' if Sidekiq::VERSION < '4' require 'sidekiq/api' RSpec.describe 'Sidekiq distributed tracing' do - around do |example| - Sidekiq::Testing.fake! do - Sidekiq::Testing.server_middleware.clear - Sidekiq::Testing.server_middleware do |chain| - chain.add(Datadog::Tracing::Contrib::Sidekiq::ServerTracer) - end + include_context 'Sidekiq server' - example.run - end - end - - after do - Datadog.configuration.tracing[:sidekiq].reset! - Sidekiq::Queues.clear_all - end - - let!(:empty_worker) do + before do stub_const( - 'EmptyWorker', + 'PropagationWorker', Class.new do include Sidekiq::Worker - def perform; end + def perform + # Save the trace digest in the job span for future inspection + data = {} + Datadog::Tracing::Contrib::Sidekiq.inject(Datadog::Tracing.active_trace.to_digest, data) + Datadog::Tracing.active_span.set_tag('digest', data.to_json) + end end ) end @@ -43,6 +35,12 @@ def perform; end end context 'when dispatching' do + before do + configure_sidekiq + Sidekiq::Testing.fake! + Sidekiq::Queues.clear_all + end + it 'propagates through serialized job' do EmptyWorker.perform_async @@ -65,59 +63,31 @@ def perform; end end end - context 'when receiving' do - let(:trace_id) { Datadog::Tracing::Utils::TraceId.next_id } - let(:span_id) { Datadog::Tracing::Utils.next_id } - let(:jid) { '123abc' } - - it 'continues trace from serialized job' do - Sidekiq::Queues.push( - EmptyWorker.queue, - EmptyWorker.to_s, - EmptyWorker.sidekiq_options.merge( - 'args' => [], - 'class' => EmptyWorker.to_s, - 'jid' => jid, - 'x-datadog-trace-id' => low_order_trace_id(trace_id).to_s, - 'x-datadog-parent-id' => span_id.to_s, - 'x-datadog-sampling-priority' => '2', - 'x-datadog-tags' => "_dd.p.dm=-99,_dd.p.tid=#{high_order_hex_trace_id(trace_id)}", - 'x-datadog-origin' => 'my-origin' - ) - ) - - EmptyWorker.perform_one - - expect(span.trace_id).to eq(trace_id) - expect(span.parent_id).to eq(span_id) - expect(span.service).to eq(tracer.default_service) - expect(span.resource).to eq('EmptyWorker') - expect(span.get_tag('sidekiq.job.queue')).to eq('default') - expect(span.status).to eq(0) - expect(span.get_tag('component')).to eq('sidekiq') - expect(span.get_tag('operation')).to eq('job') - expect(span.get_tag('span.kind')).to eq('consumer') - - expect(trace.send(:meta)['_dd.p.dm']).to eq('-99') - expect(trace.sampling_priority).to eq(2) - expect(trace.origin).to eq('my-origin') - end - end - context 'round trip' do it 'creates 2 spans for a distributed trace' do - EmptyWorker.perform_async - EmptyWorker.perform_one + expect_in_sidekiq_server do + Datadog::Tracing.trace('test setup') do |_span, trace| + trace.sampling_priority = 2 + trace.origin = 'my-origin' + trace.set_tag('_dd.p.dm', '-99') - expect(spans).to have(2).items + PropagationWorker.perform_async + end - job_span, push_span = spans + job_span = fetch_job_span + push_span = spans.find { |s| s.name == 'sidekiq.push' } - expect(push_span).to be_root_span - expect(push_span.get_tag('sidekiq.job.id')).to eq(job_span.get_tag('sidekiq.job.id')) + expect(push_span.get_tag('sidekiq.job.id')).to eq(job_span.get_tag('sidekiq.job.id')) - expect(job_span.trace_id).to eq(push_span.trace_id) - expect(job_span.parent_id).to eq(push_span.id) + expect(job_span.trace_id).to eq(push_span.trace_id) + expect(job_span.parent_id).to eq(push_span.id) + + digest = Datadog::Tracing::Contrib::Sidekiq.extract(JSON.parse(job_span.get_tag('digest'))) + + expect(digest.trace_distributed_tags['_dd.p.dm']).to eq('-99') + expect(digest.trace_sampling_priority).to eq(2) + expect(digest.trace_origin).to eq('my-origin') + end end end end @@ -130,6 +100,12 @@ def perform; end end context 'when dispatching' do + before do + configure_sidekiq + Sidekiq::Testing.fake! + Sidekiq::Queues.clear_all + end + it 'does not propagate through serialized job' do EmptyWorker.perform_async @@ -152,63 +128,34 @@ def perform; end end end - context 'when receiving' do - let(:trace_id) { Datadog::Tracing::Utils::TraceId.next_id } - let(:span_id) { Datadog::Tracing::Utils.next_id } - let(:jid) { '123abc' } - - it 'does not continue trace from serialized job' do - Sidekiq::Queues.push( - EmptyWorker.queue, - EmptyWorker.to_s, - EmptyWorker.sidekiq_options.merge( - 'args' => [], - 'class' => EmptyWorker.to_s, - 'jid' => jid, - 'x-datadog-trace-id' => trace_id.to_s, - 'x-datadog-parent-id' => span_id.to_s, - 'x-datadog-sampling-priority' => '2', - 'x-datadog-tags' => '_dd.p.dm=99', - 'x-datadog-origin' => 'my-origin' - ) - ) - - EmptyWorker.perform_one - - expect(span).to be_root_span - expect(span.trace_id).not_to eq(trace_id) - expect(span.parent_id).to eq(0) - expect(span.service).to eq(tracer.default_service) - expect(span.resource).to eq('EmptyWorker') - expect(span.get_tag('sidekiq.job.queue')).to eq('default') - expect(span.status).to eq(0) - expect(span.get_tag('component')).to eq('sidekiq') - expect(span.get_tag('operation')).to eq('job') - expect(span.get_tag('span.kind')).to eq('consumer') - - expect(trace.send(:meta)['_dd.p.dm']).to eq('-0') - expect(trace.sampling_priority).to eq(1) - expect(trace.origin).to be_nil - end - end - context 'round trip' do it 'creates 2 spans with separate traces' do - EmptyWorker.perform_async - EmptyWorker.perform_one + expect_in_sidekiq_server do + Datadog::Tracing.trace('test setup') do |_span, trace| + trace.sampling_priority = 2 + trace.origin = 'my-origin' + trace.set_tag('_dd.p.dm', '-99') + + PropagationWorker.perform_async + end + + job_span = fetch_job_span + push_span = spans.find { |s| s.name == 'sidekiq.push' } - expect(spans).to have(2).items + expect(push_span.trace_id).to_not eq(job_span.trace_id) + expect(push_span.get_tag('sidekiq.job.id')).to eq(job_span.get_tag('sidekiq.job.id')) - job_span, push_span = spans + expect(job_span.resource).to eq('PropagationWorker') - expect(push_span.trace_id).to_not eq(job_span.trace_id) - expect(push_span.get_tag('sidekiq.job.id')).to eq(job_span.get_tag('sidekiq.job.id')) + expect(job_span).to be_root_span + expect(job_span.resource).to eq('PropagationWorker') - expect(push_span).to be_root_span - expect(job_span.resource).to eq('EmptyWorker') + digest = Datadog::Tracing::Contrib::Sidekiq.extract(JSON.parse(job_span.get_tag('digest'))) - expect(job_span).to be_root_span - expect(job_span.resource).to eq('EmptyWorker') + expect(digest.trace_distributed_tags['_dd.p.dm']).to_not eq('-99') + expect(digest.trace_sampling_priority).to_not eq(2) + expect(digest.trace_origin).to_not eq('my-origin') + end end end end diff --git a/spec/datadog/tracing/contrib/sidekiq/logging_spec.rb b/spec/datadog/tracing/contrib/sidekiq/logging_spec.rb new file mode 100644 index 00000000000..74f06aec681 --- /dev/null +++ b/spec/datadog/tracing/contrib/sidekiq/logging_spec.rb @@ -0,0 +1,37 @@ +require 'datadog/tracing/contrib/support/spec_helper' +require_relative 'support/helper' + +RSpec.describe 'Sidekiq Logging' do + include_context 'Sidekiq server' + + before do + stub_const( + 'EmptyWorker', + Class.new do + include Sidekiq::Worker + def perform + logger.info('Running EmptyWorker') + end + end + ) + end + + it 'traces the looping job fetching' do + expect_in_sidekiq_server(log_level: Logger::INFO) do + EmptyWorker.perform_async + + span = try_wait_until { fetch_spans.find { |s| s.name == 'sidekiq.job' } } + + # Traces in propagation can get truncated to 64-bits by default + trace_id = Datadog::Tracing::Utils::TraceId.to_low_order(span.trace_id).to_s + stdout = File.read($stdout) + + expect(stdout).to match(/"trace_id":"#{trace_id}".*start/) + + expect(stdout).to match(/"trace_id":"#{trace_id}".*Running EmptyWorker/) + expect(stdout).to match(/"span_id":"#{span.id}".*Running EmptyWorker/) + + expect(stdout).to match(/"trace_id":"#{trace_id}".*done/) + end + end +end diff --git a/spec/datadog/tracing/contrib/sidekiq/support/helper.rb b/spec/datadog/tracing/contrib/sidekiq/support/helper.rb index 3627be072ff..4015541530c 100644 --- a/spec/datadog/tracing/contrib/sidekiq/support/helper.rb +++ b/spec/datadog/tracing/contrib/sidekiq/support/helper.rb @@ -20,8 +20,37 @@ def perform; end end end +# Non-testing Sidekiq Server, performing the same operations as the production deployment. +RSpec.shared_context 'Sidekiq server' do + include SidekiqServerExpectations + + before do + unless Datadog::Tracing::Contrib::Sidekiq::Integration.compatible_with_server_internal_tracing? + skip 'Sidekiq internal server tracing is not supported on this version.' + end + + skip 'Sidekiq server requires forking' unless Process.respond_to?(:fork) + + # Fetches block for 2 seconds when there is nothing in the queue: + # https://github.com/mperham/sidekiq/blob/v6.2.2/lib/sidekiq/fetch.rb#L7-L9 + # https://redis.io/commands/blpop#blocking-behavior + # + # We change the constant here to ensure test runs as fast possible. + require 'sidekiq/fetch' # Require late, as this is not available if `compatible_with_server_internal_tracing?` is false + stub_const('Sidekiq::BasicFetch::TIMEOUT', 0.0011) # A timeout lower than 0.0011 is rounded to zero + + stub_const( + 'EmptyWorker', + Class.new do + include Sidekiq::Worker + def perform; end + end + ) + end +end + module SidekiqTestingConfiguration - def configure_sidekiq + def configure_sidekiq(log_level: Logger::ERROR) Datadog.configure do |c| c.tracing.instrument :sidekiq end @@ -33,10 +62,14 @@ def configure_sidekiq Sidekiq.configure_client do |config| config.redis = { url: redis_url } + config.logger.level = log_level + config.logger.formatter = Sidekiq::Logger::Formatters::JSON.new end Sidekiq.configure_server do |config| config.redis = { url: redis_url } + config.logger.level = log_level + config.logger.formatter = Sidekiq::Logger::Formatters::JSON.new end Sidekiq::Testing.inline! @@ -46,7 +79,10 @@ def configure_sidekiq module SidekiqServerExpectations include SidekiqTestingConfiguration - def expect_in_sidekiq_server(wait_until: nil) + def expect_in_sidekiq_server( + wait_until: -> { fetch_spans.any? { |s| s.name == 'sidekiq.job_fetch' } }, + log_level: Logger::ERROR + ) app_tempfile = Tempfile.new(['sidekiq-server-app', '.rb']) expect_in_fork do @@ -59,7 +95,9 @@ def expect_in_sidekiq_server(wait_until: nil) require 'sidekiq/cli' - configure_sidekiq + configure_sidekiq(log_level: log_level) + + Sidekiq::Testing.disable! # Ensure the real Sidekiq server is used to process the job t = Thread.new do cli = Sidekiq::CLI.instance @@ -90,6 +128,10 @@ def expect_in_sidekiq_server(wait_until: nil) app_tempfile.unlink end + def fetch_job_span + try_wait_until { fetch_spans.find { |s| s.name == 'sidekiq.job' } } + end + def expect_after_stopping_sidekiq_server expect_in_fork do # NB: This is needed because we want to patch within a forked process.