Skip to content

Commit

Permalink
feat: get location automatically and make `is not writable the region…
Browse files Browse the repository at this point in the history
…` retryable
  • Loading branch information
joker1007 committed Nov 22, 2024
1 parent f458399 commit 9315b6c
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 15 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ script/
.idea/

fluentd-0.12

integration/log
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ Because embedded gem dependency sometimes restricts ruby environment.
| private_key_path | string | yes (private_key) | no | nil | GCP Private Key file path |
| private_key_passphrase | string | yes (private_key) | no | nil | GCP Private Key Passphrase |
| json_key | string | yes (json_key) | no | nil | GCP JSON Key file path or JSON Key string |
| location | string | no | no | nil | BigQuery Data Location. The geographic location of the job. Required except for US and EU. |
| project | string | yes | yes | nil | |
| dataset | string | yes | yes | nil | |
| table | string | yes (either `tables`) | yes | nil | |
Expand Down
9 changes: 9 additions & 0 deletions lib/fluent/plugin/bigquery/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class Error < StandardError
RETRYABLE_ERROR_REASON = %w(backendError internalError rateLimitExceeded tableUnavailable).freeze
RETRYABLE_INSERT_ERRORS_REASON = %w(timeout backendError internalError rateLimitExceeded).freeze
RETRYABLE_STATUS_CODE = [500, 502, 503, 504]
REGION_NOT_WRITABLE_MESSAGE = -"is not writable in the region"

class << self
# @param e [Google::Apis::Error]
Expand All @@ -19,6 +20,10 @@ def wrap(e, message = nil)

# @param e [Google::Apis::Error]
def retryable_error?(e)
retryable_server_error?(e) || retryable_region_not_writable?(e)
end

def retryable_server_error?(e)
e.is_a?(Google::Apis::ServerError) && RETRYABLE_STATUS_CODE.include?(e.status_code)
end

Expand All @@ -30,6 +35,10 @@ def retryable_insert_errors_reason?(reason)
RETRYABLE_INSERT_ERRORS_REASON.include?(reason)
end

def retryable_region_not_writable?(e)
e.is_a?(Google::Apis::ClientError) && e.status_code == 400 && e.message.include?(REGION_NOT_WRITABLE_MESSAGE)
end

# Guard for instantiation
private :new
def inherited(subclass)
Expand Down
7 changes: 4 additions & 3 deletions lib/fluent/plugin/bigquery/writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
end
end
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
log.debug "insert error: #{e.message}", status_code: e.respond_to?(:status_code) ? e.status_code : nil, reason: e.respond_to?(:reason) ? e.reason : nil
error_data = { project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message }
wrapped = Fluent::BigQuery::Error.wrap(e)
if wrapped.retryable?
Expand All @@ -112,7 +113,7 @@ def insert_rows(project, dataset, table_id, rows, schema, template_suffix: nil)
raise wrapped
end

JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id) do
JobReference = Struct.new(:chunk_id, :chunk_id_hex, :project_id, :dataset_id, :table_id, :job_id, :location) do
def as_hash(*keys)
if keys.empty?
to_h
Expand Down Expand Up @@ -161,7 +162,7 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
upload_source: upload_source,
content_type: "application/octet-stream",
)
JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id)
JobReference.new(chunk_id, chunk_id_hex, project, dataset, table_id, res.job_reference.job_id, res.job_reference.location)
rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
log.error "job.load API", project_id: project, dataset: dataset, table: table_id, code: e.status_code, message: e.message

Expand All @@ -175,7 +176,7 @@ def create_load_job(chunk_id, chunk_id_hex, project, dataset, table_id, upload_s
def fetch_load_job(job_reference)
project = job_reference.project_id
job_id = job_reference.job_id
location = @options[:location]
location = job_reference.location

res = client.get_job(project, job_id, location: location)
log.debug "load job fetched", id: job_id, state: res.status.state, **job_reference.as_hash(:project_id, :dataset_id, :table_id)
Expand Down
4 changes: 0 additions & 4 deletions lib/fluent/plugin/out_bigquery_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ class BigQueryBaseOutput < Output
config_param :private_key_path, :string, default: nil
config_param :private_key_passphrase, :string, default: 'notasecret', secret: true
config_param :json_key, default: nil, secret: true
# The geographic location of the job. Required except for US and EU.
# https://github.com/googleapis/google-api-ruby-client/blob/master/generated/google/apis/bigquery_v2/service.rb#L350
config_param :location, :string, default: nil

# see as simple reference
# https://github.com/abronte/BigQuery/blob/master/lib/bigquery.rb
Expand Down Expand Up @@ -135,7 +132,6 @@ def writer
private_key_path: @private_key_path, private_key_passphrase: @private_key_passphrase,
email: @email,
json_key: @json_key,
location: @location,
source_format: @source_format,
skip_invalid_rows: @skip_invalid_rows,
ignore_unknown_values: @ignore_unknown_values,
Expand Down
29 changes: 22 additions & 7 deletions test/plugin/test_out_bigquery_load.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ def test_write
}
}
}, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
stub!.job_reference.stub! do |s|
s.job_id { "dummy_job_id" }
s.location { "us" }
end
end
end

Expand Down Expand Up @@ -118,7 +121,10 @@ def test_write_with_prevent_duplicate_load
},
job_reference: {project_id: 'yourproject_id', job_id: satisfy { |x| x =~ /fluentd_job_.*/}} ,
}, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
stub!.job_reference.stub! do |s|
s.job_id { "dummy_job_id" }
s.location { "us" }
end
end
end

Expand Down Expand Up @@ -155,10 +161,13 @@ def test_write_with_retryable_error
}
}
}, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
stub!.job_reference.stub! do |s|
s.job_id { "dummy_job_id" }
s.location { "us" }
end
end

mock(writer.client).get_job('yourproject_id', 'dummy_job_id', :location=>nil) do
mock(writer.client).get_job('yourproject_id', 'dummy_job_id', location: "us") do
stub! do |s|
s.id { 'dummy_job_id' }
s.configuration.stub! do |_s|
Expand Down Expand Up @@ -238,10 +247,13 @@ def test_write_with_not_retryable_error
}
}
}, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
stub!.job_reference.stub! do |s|
s.job_id { "dummy_job_id" }
s.location { "us" }
end
end

mock(writer.client).get_job('yourproject_id', 'dummy_job_id', :location=>nil) do
mock(writer.client).get_job('yourproject_id', 'dummy_job_id', location: "us") do
stub! do |s|
s.id { 'dummy_job_id' }
s.configuration.stub! do |_s|
Expand Down Expand Up @@ -318,7 +330,10 @@ def test_write_with_auto_create_table
}
}
}, upload_source: duck_type(:write, :sync, :rewind), content_type: "application/octet-stream") do
stub!.job_reference.stub!.job_id { "dummy_job_id" }
stub!.job_reference.stub! do |s|
s.job_id { "dummy_job_id" }
s.location { "us" }
end
end
end

Expand Down

0 comments on commit 9315b6c

Please sign in to comment.