Skip to content

Commit

Permalink
deferrable op finished
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelcv committed Aug 15, 2024
1 parent 2ad400f commit f2c3fa0
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 20 deletions.
6 changes: 3 additions & 3 deletions chapter08/dags/01_python.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import datetime as dt
from datetime import datetime, timedelta
import json
import logging
import os
Expand Down Expand Up @@ -69,8 +69,8 @@ def _get_with_pagination(session, url, params, batch_size=100):
with DAG(
dag_id="01_python",
description="Fetches ratings from the Movielens API using the Python Operator.",
start_date=dt.datetime(2023, 1, 1),
end_date=dt.datetime(2023, 1, 10),
start_date=datetime(2023, 1, 1),
end_date=datetime(2023, 1, 10),
schedule="@daily",
):

Expand Down
6 changes: 3 additions & 3 deletions chapter08/dags/02_hook.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import datetime as dt
from datetime import datetime, timedelta
import json
import logging
import os
Expand All @@ -10,8 +10,8 @@
with DAG(
dag_id="02_hook",
description="Fetches ratings from the Movielens API using a custom hook.",
start_date=dt.datetime(2023, 1, 1),
end_date=dt.datetime(2023, 1, 10),
start_date=datetime(2023, 1, 1),
end_date=datetime(2023, 1, 10),
schedule="@daily",
):

Expand Down
6 changes: 3 additions & 3 deletions chapter08/dags/03_operator.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import datetime as dt
from datetime import datetime, timedelta

from airflow import DAG
from custom.operators import MovielensFetchRatingsOperator

with DAG(
dag_id="03_operator",
description="Fetches ratings from the Movielens API using a custom operator.",
start_date=dt.datetime(2023, 1, 1),
end_date=dt.datetime(2023, 1, 10),
start_date=datetime(2023, 1, 1),
end_date=datetime(2023, 1, 10),
schedule="@daily",
):
MovielensFetchRatingsOperator(
Expand Down
6 changes: 3 additions & 3 deletions chapter08/dags/04_sensor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import datetime as dt
from datetime import datetime, timedelta

from airflow import DAG
from custom.operators import MovielensFetchRatingsOperator
Expand All @@ -7,8 +7,8 @@
with DAG(
dag_id="04_sensor",
description="Fetches ratings from the Movielens API, with a custom sensor.",
start_date=dt.datetime(2023, 1, 1),
end_date=dt.datetime(2023, 1, 10),
start_date=datetime(2023, 1, 1),
end_date=datetime(2023, 1, 10),
schedule="@daily",
):
wait_for_ratings = MovielensRatingsSensor(
Expand Down
8 changes: 4 additions & 4 deletions chapter08/dags/05_deferrable_sensor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import datetime as dt

from datetime import datetime, timedelta
from airflow import DAG
from custom.operators import MovielensFetchRatingsOperator

Expand All @@ -8,15 +7,16 @@
with DAG(
dag_id="05_deferrable_sensor",
description="Fetches ratings from the Movielens API, with a deferrable custom sensor.",
start_date=dt.datetime(2023, 1, 1),
end_date=dt.datetime(2023, 1, 10),
start_date=datetime(2023, 1, 1),
end_date=datetime(2023, 1, 10),
schedule="@daily",
):
wait_for_ratings = AwaitMovielensRatingsSensor(
task_id="wait_for_ratings",
conn_id="movielens",
start_date="{{data_interval_start | ds}}",
end_date="{{data_interval_end | ds}}",
timeout= timedelta(seconds=5),
)

fetch_ratings = MovielensFetchRatingsOperator(
Expand Down
7 changes: 3 additions & 4 deletions chapter08/dags/custom/deferrable_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self,
self._conn_id = conn_id
self._start_date = start_date
self._end_date = end_date

self._timeout = kwargs.get('timeout')

def execute(self, context: Context) -> None:

Expand All @@ -41,7 +41,8 @@ def execute(self, context: Context) -> None:
start_date=self._start_date,
end_date=self._end_date,
),
method_name='execute_completed'
method_name='execute_completed',
timeout = self._timeout
)

def execute_completed(
Expand All @@ -50,7 +51,6 @@ def execute_completed(
event: dict[str, Any] | None = None,
) -> None:
print(f"Movie Ratings are Available! for {self._start_date}-{self._end_date}")
# TODO: How do we handle failure (e.g. timeout?) In the trigger?
return True


Expand Down Expand Up @@ -88,7 +88,6 @@ async def run(self):
fetch_records = True
while fetch_records:
try:
# TODO: Also implement async get_ratings?
next(hook.get_ratings(start_date=self._start_date, end_date=self._end_date, batch_size=1))
# If no StopIteration is raised, the request returned at least one record.
# This means that there are records for the given period, which we indicate
Expand Down

0 comments on commit f2c3fa0

Please sign in to comment.