Skip to content

Commit

Permalink
BigQuery Worker Wait Adjustments (#515)
Browse files Browse the repository at this point in the history
* Setting a base sleep for 5 seconds and if it's not done and there's no error then enqueue to check again in 30 seconds. Any kind of exponential backoff or wait period here is unnecessary and duplicative. Just let the queue handle the wait if the response is not more or less immediate (within 5 seconds).

* Adds location param to the BigQuery get_job method as it appears to be looking at multiple regions at times and this results in an error for the region where the job does not exist thus confusing the current logic.

---------

Co-authored-by: Robert McMahan <[email protected]>
  • Loading branch information
blackwire and Robert McMahan authored Aug 20, 2024
1 parent 32fe53d commit 26d1bd6
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 31 deletions.
11 changes: 7 additions & 4 deletions backend/jobs/workers/bigquery/bq_waiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ class BQWaiter(bq_worker.BQWorker):

def _execute(self):
client = self._get_client()
job = client.get_job(self._params['job_id'])
if job.error_result is not None:
job = client.get_job(
job_id=self._params['job_id'],
location=self._params['location'])
if job.error_result:
raise worker.WorkerException(job.error_result['message'])
if job.state != 'DONE':
self._enqueue('BQWaiter', {'job_id': self._params['job_id']}, 60)
if not job.done():
self.log_info(f'Current BigQuery job state: {job.state}')
self._enqueue('BQWaiter', {'job_id': job.job_id, 'location': job.location}, 60)
else:
self.log_info('Finished successfully!')
21 changes: 6 additions & 15 deletions backend/jobs/workers/bigquery/bq_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
# Param name used to specify a BQ project ID
BQ_PROJECT_ID_PARAM_NAME = 'bq_project_id'

# Param name used to specify a BQ data set name
# Param name used to specify a BQ dataset name
BQ_DATASET_NAME_PARAM_NAME = 'bq_dataset_id'

# Param name used to specify a BQ table name
Expand All @@ -50,8 +50,7 @@ def _get_client(self):
client_info = ClientInfo(user_agent='cloud-solutions/crmint-usage-v3')
return bigquery.Client(
client_options={'scopes': self._SCOPES},
client_info=client_info,
)
client_info=client_info)

def _get_prefix(self):
return f'{self._pipeline_id}_{self._job_id}_{self.__class__.__name__}'
Expand All @@ -68,16 +67,8 @@ def _generate_qualified_bq_table_name(self):

def _wait(self, job):
"""Waits for job completion and relays to BQWaiter if it takes too long."""
delay = 5
waiting_time = 5
time.sleep(delay)
while not job.done():
if waiting_time > 300: # Once 5 minutes have passed, spawn BQWaiter.
self._enqueue('BQWaiter', {'job_id': job.job_id}, 60)
return
if delay < 30:
delay = [5, 10, 15, 20, 30][int(waiting_time / 60)]
time.sleep(delay)
waiting_time += delay
if job.error_result is not None:
time.sleep(5)
if job.error_result:
raise worker.WorkerException(job.error_result['message'])
if not job.done():
self._enqueue('BQWaiter', {'job_id': job.job_id, 'location': job.location}, 30)
10 changes: 7 additions & 3 deletions backend/tests/jobs/unit/workers/bq_waiter_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ def test_execute_job_with_status(self, job_status, enqueue_called):
'logger_credentials': _make_credentials(),
}
worker_inst = bq_waiter.BQWaiter(
{'job_id': 'JOBID',}, 1, 1, **logging_creds)
{'job_id': 'JOBID', 'location': 'US'}, 1, 1, **logging_creds)
mock_job = mock.create_autospec(
bigquery.job.QueryJob, instance=True, spec_set=True)
mock_job.job_id = 'JOBID'
mock_job.location = 'US'
mock_job.error_result = None
mock_job.state = job_status
mock_job.done.return_value = not enqueue_called
mock_client = mock.create_autospec(
bigquery.Client, instance=True, spec_set=True)
mock_client.get_job.return_value = mock_job
Expand All @@ -52,10 +55,11 @@ def test_execute_job_with_status(self, job_status, enqueue_called):
spec_set=True))
self.enter_context(mock.patch.object(worker_inst, '_log', autospec=True))
worker_inst._execute()
mock_client.get_job.assert_called_with(job_id='JOBID', location='US')
if enqueue_called:
patched_enqueue.assert_called_once()
self.assertEqual(patched_enqueue.call_args[0][0], 'BQWaiter')
self.assertEqual(patched_enqueue.call_args[0][1], {'job_id': 'JOBID'})
self.assertEqual(patched_enqueue.call_args[0][1], {'job_id': 'JOBID', 'location': 'US'})
else:
patched_enqueue.assert_not_called()

Expand All @@ -76,7 +80,7 @@ def test_job_error_raises_worker_exception(self):
autospec=True,
spec_set=True))
worker_inst = bq_waiter.BQWaiter(
{'job_id': 'JOBID',}, 1, 1,
{'job_id': 'JOBID', 'location': 'US'}, 1, 1,
logger_project='PROJECT',
logger_credentials=_make_credentials())
worker_inst._execute()
Expand Down
18 changes: 9 additions & 9 deletions backend/tests/jobs/unit/workers/bq_worker_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,24 @@ def test_enqueues_bqwaiter_after_some_time(self, job_is_done, enqueue_called):
}
worker_inst = bq_worker.BQWorker(
{'bq_project_id': 'BQID'}, 1, 1, **logging_creds)
mock_client = mock.create_autospec(
bigquery.Client, instance=True, spec_set=False)
mock_client.project = 'PROJECT'
job = bigquery.job.QueryJob('JOBID', 'query', mock_client)
self.enter_context(
mock.patch.object(job, 'done', return_value=job_is_done, autospec=True))
mock_job = mock.create_autospec(
bigquery.job.QueryJob, instance=True, spec_set=False)
mock_job.done.return_value = job_is_done
mock_job.job_id = 'JOBID'
mock_job.location = 'US'
mock_job.error_result = None
patched_enqueue = self.enter_context(
mock.patch.object(
worker_inst,
'_enqueue',
return_value=True,
autospec=True,
spec_set=True))
worker_inst._wait(job)
worker_inst._wait(mock_job)
if enqueue_called:
patched_enqueue.assert_called_once()
self.assertEqual(patched_enqueue.call_args[0][0], 'BQWaiter')
self.assertEqual(patched_enqueue.call_args[0][1], {'job_id': 'JOBID'})
self.assertEqual(patched_enqueue.call_args[0][1], {'job_id': 'JOBID', 'location': 'US'})
else:
patched_enqueue.assert_not_called()

Expand Down Expand Up @@ -93,7 +93,7 @@ class BQWorkerGetClientTest(parameterized.TestCase):
'client_info_user_agent': 'cloud-solutions/crmint-usage-v3',
},
{
'report_usage_id_present': False,
'report_usage_id_present': False,
'client_info_user_agent': None
},
)
Expand Down

0 comments on commit 26d1bd6

Please sign in to comment.