diff --git a/ext/scheduler/airflow/__lib.py b/ext/scheduler/airflow/__lib.py index 7346a5bf9d..15242aa77e 100644 --- a/ext/scheduler/airflow/__lib.py +++ b/ext/scheduler/airflow/__lib.py @@ -69,8 +69,9 @@ def execute(self, context): class OptimusAPIClient: - def __init__(self, optimus_host): + def __init__(self, optimus_host, timeout): self.host = self._add_connection_adapter_if_absent(optimus_host) + self.timeout = timeout def _add_connection_adapter_if_absent(self, host): if host.startswith("http://") or host.startswith("https://"): @@ -88,7 +89,7 @@ def get_job_run(self, project_name: str, job_name: str, start_date: str, end_dat 'end_date': end_date, 'downstream_project_name': downstream_project_name, 'downstream_job_name': downstream_job_name - }) + }, timeout=self.timeout) self._raise_error_if_request_failed(response) return response.json() @@ -192,6 +193,7 @@ def __init__( upstream_optimus_project: str, upstream_optimus_namespace: str, upstream_optimus_job: str, + timeout: int, *args, **kwargs) -> None: kwargs['mode'] = kwargs.get('mode', 'reschedule') @@ -201,8 +203,8 @@ def __init__( self.upstream_optimus_project = upstream_optimus_project self.upstream_optimus_namespace = upstream_optimus_namespace self.upstream_optimus_job = upstream_optimus_job - self._optimus_client = OptimusAPIClient(optimus_hostname) - self._upstream_optimus_client = OptimusAPIClient(upstream_optimus_hostname) + self._optimus_client = OptimusAPIClient(optimus_hostname, timeout) + self._upstream_optimus_client = OptimusAPIClient(upstream_optimus_hostname, timeout) def poke(self, context): job_cron_iter = croniter(context.get("dag").schedule_interval, context.get('execution_date'))