diff --git a/dlt/helpers/airflow_helper.py b/dlt/helpers/airflow_helper.py index aaa19ea97d..6b95a810da 100644 --- a/dlt/helpers/airflow_helper.py +++ b/dlt/helpers/airflow_helper.py @@ -18,7 +18,7 @@ from airflow.configuration import conf from airflow.models import TaskInstance from airflow.utils.task_group import TaskGroup - from airflow.operators.dummy import DummyOperator + from airflow.operators.empty import EmptyOperator from airflow.operators.python import PythonOperator, get_current_context except ModuleNotFoundError: raise MissingDependencyException("Airflow", ["apache-airflow>=2.5"]) @@ -448,10 +448,10 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator tasks = [] sources = data.decompose("scc") t_name = self._task_name(pipeline, data) - start = make_task(pipeline, sources[0]) + start = EmptyOperator(task_id=f"{t_name}_start") # parallel tasks - for source in sources[1:]: + for source in sources: for resource in source.resources.values(): if resource.incremental: logger.warn( @@ -466,7 +466,7 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator tasks.append(make_task(pipeline, source)) - end = DummyOperator(task_id=f"{t_name}_end") + end = EmptyOperator(task_id=f"{t_name}_end") if tasks: start >> tasks >> end @@ -485,14 +485,10 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator tasks = [] naming = SnakeCaseNamingConvention() sources = data.decompose("scc") - start = make_task( - pipeline, - sources[0], - naming.normalize_identifier(self._task_name(pipeline, sources[0])), - ) + start = EmptyOperator(task_id=f"{t_name}_start") # parallel tasks - for source in sources[1:]: + for source in sources: # name pipeline the same as task new_pipeline_name = naming.normalize_identifier( self._task_name(pipeline, source) @@ -500,7 +496,7 @@ def make_task(pipeline: Pipeline, data: Any, name: str = None) -> PythonOperator tasks.append(make_task(pipeline, source, new_pipeline_name)) t_name = self._task_name(pipeline, data) - end = DummyOperator(task_id=f"{t_name}_end") + end = EmptyOperator(task_id=f"{t_name}_end") if tasks: start >> tasks >> end