Skip to content

Commit

Permalink
fixes airflow tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix committed Aug 13, 2023
1 parent a2b7d5b commit e455c8a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test_airflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ hashFiles('**/poetry.lock') }}-airflow-runner

- name: Install dependencies
run: poetry install --no-interaction --with airflow -E duckdb -E pyarrow -E cli
run: poetry install --no-interaction --with airflow -E duckdb -E pyarrow

- run: |
poetry run pytest tests/helpers/airflow_tests
Expand Down
37 changes: 28 additions & 9 deletions tests/helpers/airflow_tests/test_airflow_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from airflow import DAG
from airflow.decorators import dag
from airflow.operators.python import PythonOperator
from airflow.models import TaskInstance
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

import dlt
from dlt.common import pendulum
Expand Down Expand Up @@ -199,6 +202,7 @@ def dag_decomposed():
def test_run_with_retry() -> None:

retries = 2
now = pendulum.now()

@dlt.resource
def _fail_3():
Expand All @@ -223,9 +227,10 @@ def dag_fail_3():
tasks.add_run(pipeline_fail_3, _fail_3, trigger_rule="all_done", retries=0, provide_context=True)

dag_def: DAG = dag_fail_3()
# will fail on extract
ti = get_task_run(dag_def, "pipeline_fail_3.pipeline_fail_3", now)
with pytest.raises(PipelineStepFailed) as pip_ex:
dag_def.test()
# this is the way to get the exception from a task
ti._run_raw_task()
assert pip_ex.value.step == "extract"

@dag(
Expand All @@ -234,19 +239,20 @@ def dag_fail_3():
catchup=False,
default_args=default_args
)
def dag_fail_3():
def dag_fail_4():
# by default we do not retry extract so we fail
tasks = PipelineTasksGroup("pipeline_fail_3", retry_policy=DEFAULT_RETRY_BACKOFF, local_data_folder=TEST_STORAGE_ROOT, wipe_local_data=False)

pipeline_fail_3 = dlt.pipeline(
pipeline_name="pipeline_fail_3", dataset_name="mock_data_" + uniq_id(), destination="duckdb", credentials=":pipeline:")
tasks.add_run(pipeline_fail_3, _fail_3, trigger_rule="all_done", retries=0, provide_context=True)

dag_def: DAG = dag_fail_3()
dag_def: DAG = dag_fail_4()
ti = get_task_run(dag_def, "pipeline_fail_3.pipeline_fail_3", now)
# will fail on extract
with pytest.raises(PipelineStepFailed) as pip_ex:
retries = 2
dag_def.test()
ti._run_raw_task()
assert pip_ex.value.step == "extract"

@dag(
Expand All @@ -255,17 +261,18 @@ def dag_fail_3():
catchup=False,
default_args=default_args
)
def dag_fail_3():
def dag_fail_5():
# this will retry
tasks = PipelineTasksGroup("pipeline_fail_3", retry_policy=DEFAULT_RETRY_BACKOFF, retry_pipeline_steps=("load", "extract"), local_data_folder=TEST_STORAGE_ROOT, wipe_local_data=False)

pipeline_fail_3 = dlt.pipeline(
pipeline_name="pipeline_fail_3", dataset_name="mock_data_" + uniq_id(), destination="duckdb", credentials=":pipeline:")
tasks.add_run(pipeline_fail_3, _fail_3, trigger_rule="all_done", retries=0, provide_context=True)

dag_def: DAG = dag_fail_3()
dag_def: DAG = dag_fail_5()
ti = get_task_run(dag_def, "pipeline_fail_3.pipeline_fail_3", now)
retries = 2
dag_def.test()
ti._run_raw_task()
assert retries == 0


Expand Down Expand Up @@ -419,4 +426,16 @@ def dag_mixed():
assert counters_nst_tasks == counters_nst_tasks_par
assert pipeline_dag_serial.state["sources"] == pipeline_dag_mixed.state["sources"]

# TODO: here we create two pipelines in two separate task groups
# TODO: here we create two pipelines in two separate task groups


def get_task_run(dag_def: DAG, task_name: str, now: pendulum.DateTime) -> TaskInstance:
    """Prepare a dag run for `dag_def` and return a task instance for `task_name`.

    A manual dag run in the RUNNING state is created with `now` used as the
    execution date and as both ends of the data interval. The dag is then run
    with `start_date=now` and `run_at_least_once=True`, and finally the task
    registered under `task_name` in `dag_def.task_dict` is wrapped in a
    `TaskInstance` bound to the same execution date.

    Tests in this module call `_run_raw_task()` on the returned instance so
    that the exception raised inside the task reaches the test.

    Args:
        dag_def: The DAG that contains the task.
        task_name: Key of the task in `dag_def.task_dict`.
        now: Timestamp used as execution date, data interval and start date.

    Returns:
        A `TaskInstance` for the selected task at execution date `now`.
    """
    dag_run_settings = dict(
        state=DagRunState.RUNNING,
        execution_date=now,
        run_type=DagRunType.MANUAL,
        data_interval=(now, now),
    )
    dag_def.create_dagrun(**dag_run_settings)
    dag_def.run(start_date=now, run_at_least_once=True)
    # the task lookup happens only after the dag run above, as in the original flow
    return TaskInstance(task=dag_def.task_dict[task_name], execution_date=now)

0 comments on commit e455c8a

Please sign in to comment.