Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Terry.dry run mode #19

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dagfactory/__version__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""Module contains the version of dag-factory"""
__version__ = "0.17.1.post9"
__version__ = "0.17.1.post10"
265 changes: 145 additions & 120 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def __init__(
self.default_config: Dict[str, Any] = deepcopy(default_config)

# pylint: disable=too-many-branches
def get_dag_params(self) -> Dict[str, Any]:
def get_dag_params(self, dry_run: bool = False) -> Dict[str, Any]:
"""
Merges default config with dag config, sets dag_id, and extropolates dag_start_date

Expand All @@ -131,142 +131,144 @@ def get_dag_params(self) -> Dict[str, Any]:
raise Exception("Failed to merge config with default config") from err
dag_params["dag_id"]: str = dag_params.get("dag_id_prefix", "") + self.dag_name

if dag_params.get("task_groups") and version.parse(
AIRFLOW_VERSION
) < version.parse("2.0.0"):
raise Exception("`task_groups` key can only be used with Airflow 2.x.x")

if (
utils.check_dict_key(dag_params, "schedule_interval")
and dag_params["schedule_interval"] == "None"
):
dag_params["schedule_interval"] = None

# Convert from 'dagrun_timeout_sec: int' to 'dagrun_timeout: timedelta'
if utils.check_dict_key(dag_params, "dagrun_timeout_sec"):
dag_params["dagrun_timeout"]: timedelta = timedelta(
seconds=dag_params["dagrun_timeout_sec"]
)
del dag_params["dagrun_timeout_sec"]
if not dry_run:

# Convert from 'end_date: Union[str, datetime, date]' to 'end_date: datetime'
if utils.check_dict_key(dag_params["default_args"], "end_date"):
dag_params["default_args"]["end_date"]: datetime = utils.get_datetime(
date_value=dag_params["default_args"]["end_date"],
timezone=dag_params["default_args"].get("timezone", "UTC"),
)
if dag_params.get("task_groups") and version.parse(
AIRFLOW_VERSION
) < version.parse("2.0.0"):
raise Exception("`task_groups` key can only be used with Airflow 2.x.x")

if utils.check_dict_key(dag_params["default_args"], "retry_delay_sec"):
dag_params["default_args"]["retry_delay"]: timedelta = timedelta(
seconds=dag_params["default_args"]["retry_delay_sec"]
)
del dag_params["default_args"]["retry_delay_sec"]
if (
utils.check_dict_key(dag_params, "schedule_interval")
and dag_params["schedule_interval"] == "None"
):
dag_params["schedule_interval"] = None

if utils.check_dict_key(dag_params["default_args"], "sla_secs"):
dag_params["default_args"]["sla"]: timedelta = timedelta(
seconds=dag_params["default_args"]["sla_secs"]
)
del dag_params["default_args"]["sla_secs"]

if utils.check_dict_key(dag_params["default_args"], "sla_miss_callback"):
if isinstance(dag_params["default_args"]["sla_miss_callback"], str):
dag_params["default_args"][
"sla_miss_callback"
]: Callable = import_string(
dag_params["default_args"]["sla_miss_callback"]
# Convert from 'dagrun_timeout_sec: int' to 'dagrun_timeout: timedelta'
if utils.check_dict_key(dag_params, "dagrun_timeout_sec"):
dag_params["dagrun_timeout"]: timedelta = timedelta(
seconds=dag_params["dagrun_timeout_sec"]
)
del dag_params["dagrun_timeout_sec"]

# Convert from 'end_date: Union[str, datetime, date]' to 'end_date: datetime'
if utils.check_dict_key(dag_params["default_args"], "end_date"):
dag_params["default_args"]["end_date"]: datetime = utils.get_datetime(
date_value=dag_params["default_args"]["end_date"],
timezone=dag_params["default_args"].get("timezone", "UTC"),
)

if utils.check_dict_key(dag_params["default_args"], "retry_delay_sec"):
dag_params["default_args"]["retry_delay"]: timedelta = timedelta(
seconds=dag_params["default_args"]["retry_delay_sec"]
)
del dag_params["default_args"]["retry_delay_sec"]

if utils.check_dict_key(dag_params["default_args"], "on_success_callback"):
if isinstance(dag_params["default_args"]["on_success_callback"], str):
dag_params["default_args"][
"on_success_callback"
]: Callable = import_string(
dag_params["default_args"]["on_success_callback"]
if utils.check_dict_key(dag_params["default_args"], "sla_secs"):
dag_params["default_args"]["sla"]: timedelta = timedelta(
seconds=dag_params["default_args"]["sla_secs"]
)
if utils.check_dict_key(dag_params["default_args"], "on_success_callback_kwargs"):
on_success_callback_kwargs = dag_params["default_args"]["on_success_callback_kwargs"]
del dag_params["default_args"]["sla_secs"]

if utils.check_dict_key(dag_params["default_args"], "sla_miss_callback"):
if isinstance(dag_params["default_args"]["sla_miss_callback"], str):
dag_params["default_args"][
"sla_miss_callback"
]: Callable = import_string(
dag_params["default_args"]["sla_miss_callback"]
)

if utils.check_dict_key(dag_params["default_args"], "on_success_callback"):
if isinstance(dag_params["default_args"]["on_success_callback"], str):
dag_params["default_args"][
"on_success_callback"
]: Callable = dag_params["default_args"]["on_success_callback"](**on_success_callback_kwargs)
del dag_params["default_args"]["on_success_callback_kwargs"]

if utils.check_dict_key(dag_params["default_args"], "on_failure_callback"):
if isinstance(dag_params["default_args"]["on_failure_callback"], str):
dag_params["default_args"][
"on_failure_callback"
]: Callable = import_string(
dag_params["default_args"]["on_failure_callback"]
)
if utils.check_dict_key(dag_params["default_args"], "on_failure_callback_kwargs"):
on_failure_callback_kwargs = dag_params["default_args"]["on_failure_callback_kwargs"]
]: Callable = import_string(
dag_params["default_args"]["on_success_callback"]
)
if utils.check_dict_key(dag_params["default_args"], "on_success_callback_kwargs"):
on_success_callback_kwargs = dag_params["default_args"]["on_success_callback_kwargs"]
dag_params["default_args"][
"on_success_callback"
]: Callable = dag_params["default_args"]["on_success_callback"](**on_success_callback_kwargs)
del dag_params["default_args"]["on_success_callback_kwargs"]

if utils.check_dict_key(dag_params["default_args"], "on_failure_callback"):
if isinstance(dag_params["default_args"]["on_failure_callback"], str):
dag_params["default_args"][
"on_failure_callback"
]: Callable = dag_params["default_args"]["on_failure_callback"](**on_failure_callback_kwargs)
del dag_params["default_args"]["on_failure_callback_kwargs"]

if utils.check_dict_key(dag_params["default_args"], "on_retry_callback"):
if isinstance(dag_params["default_args"]["on_retry_callback"], str):
dag_params["default_args"][
"on_retry_callback"
]: Callable = import_string(
dag_params["default_args"]["on_retry_callback"]
)
]: Callable = import_string(
dag_params["default_args"]["on_failure_callback"]
)
if utils.check_dict_key(dag_params["default_args"], "on_failure_callback_kwargs"):
on_failure_callback_kwargs = dag_params["default_args"]["on_failure_callback_kwargs"]
dag_params["default_args"][
"on_failure_callback"
]: Callable = dag_params["default_args"]["on_failure_callback"](**on_failure_callback_kwargs)
del dag_params["default_args"]["on_failure_callback_kwargs"]

if utils.check_dict_key(dag_params["default_args"], "on_retry_callback"):
if isinstance(dag_params["default_args"]["on_retry_callback"], str):
dag_params["default_args"][
"on_retry_callback"
]: Callable = import_string(
dag_params["default_args"]["on_retry_callback"]
)

if utils.check_dict_key(dag_params, "sla_miss_callback"):
if isinstance(dag_params["sla_miss_callback"], str):
dag_params["sla_miss_callback"]: Callable = import_string(
dag_params["sla_miss_callback"]
)
if utils.check_dict_key(dag_params, "sla_miss_callback"):
if isinstance(dag_params["sla_miss_callback"], str):
dag_params["sla_miss_callback"]: Callable = import_string(
dag_params["sla_miss_callback"]
)

if utils.check_dict_key(dag_params, "on_success_callback"):
if isinstance(dag_params["on_success_callback"], str):
dag_params["on_success_callback"]: Callable = import_string(
dag_params["on_success_callback"]
)
if utils.check_dict_key(dag_params, "on_success_callback_kwargs"):
on_success_callback_kwargs = dag_params["on_success_callback_kwargs"]
dag_params["on_success_callback"]: Callable = dag_params["on_success_callback"](
**on_success_callback_kwargs
if utils.check_dict_key(dag_params, "on_success_callback"):
if isinstance(dag_params["on_success_callback"], str):
dag_params["on_success_callback"]: Callable = import_string(
dag_params["on_success_callback"]
)
del dag_params["on_success_callback_kwargs"]
if utils.check_dict_key(dag_params, "on_success_callback_kwargs"):
on_success_callback_kwargs = dag_params["on_success_callback_kwargs"]
dag_params["on_success_callback"]: Callable = dag_params["on_success_callback"](
**on_success_callback_kwargs
)
del dag_params["on_success_callback_kwargs"]

if utils.check_dict_key(dag_params, "on_failure_callback"):
if isinstance(dag_params["on_failure_callback"], str):
dag_params["on_failure_callback"]: Callable = import_string(
dag_params["on_failure_callback"]
)
if utils.check_dict_key(dag_params, "on_failure_callback_kwargs"):
on_failure_callback_kwargs = dag_params["on_failure_callback_kwargs"]
dag_params["on_failure_callback"]: Callable = dag_params["on_failure_callback"](
**on_failure_callback_kwargs
if utils.check_dict_key(dag_params, "on_failure_callback"):
if isinstance(dag_params["on_failure_callback"], str):
dag_params["on_failure_callback"]: Callable = import_string(
dag_params["on_failure_callback"]
)
del dag_params["on_failure_callback_kwargs"]

if utils.check_dict_key(
dag_params, "on_success_callback_name"
) and utils.check_dict_key(dag_params, "on_success_callback_file"):
dag_params["on_success_callback"]: Callable = utils.get_python_callable(
dag_params["on_success_callback_name"],
dag_params["on_success_callback_file"],
)
if utils.check_dict_key(dag_params, "on_failure_callback_kwargs"):
on_failure_callback_kwargs = dag_params["on_failure_callback_kwargs"]
dag_params["on_failure_callback"]: Callable = dag_params["on_failure_callback"](
**on_failure_callback_kwargs
)
del dag_params["on_failure_callback_kwargs"]

if utils.check_dict_key(
dag_params, "on_failure_callback_name"
) and utils.check_dict_key(dag_params, "on_failure_callback_file"):
dag_params["on_failure_callback"]: Callable = utils.get_python_callable(
dag_params["on_failure_callback_name"],
dag_params["on_failure_callback_file"],
)
if utils.check_dict_key(
dag_params, "on_success_callback_name"
) and utils.check_dict_key(dag_params, "on_success_callback_file"):
dag_params["on_success_callback"]: Callable = utils.get_python_callable(
dag_params["on_success_callback_name"],
dag_params["on_success_callback_file"],
)

try:
# ensure that default_args dictionary contains key "start_date"
# with "datetime" value in specified timezone
dag_params["default_args"]["start_date"]: datetime = utils.get_datetime(
date_value=dag_params["default_args"]["start_date"],
timezone=dag_params["default_args"].get("timezone", "UTC"),
)
except KeyError as err:
raise Exception(f"{self.dag_name} config is missing start_date") from err
if utils.check_dict_key(
dag_params, "on_failure_callback_name"
) and utils.check_dict_key(dag_params, "on_failure_callback_file"):
dag_params["on_failure_callback"]: Callable = utils.get_python_callable(
dag_params["on_failure_callback_name"],
dag_params["on_failure_callback_file"],
)

try:
# ensure that default_args dictionary contains key "start_date"
# with "datetime" value in specified timezone
dag_params["default_args"]["start_date"]: datetime = utils.get_datetime(
date_value=dag_params["default_args"]["start_date"],
timezone=dag_params["default_args"].get("timezone", "UTC"),
)
except KeyError as err:
raise Exception(f"{self.dag_name} config is missing start_date") from err
return dag_params

@staticmethod
Expand Down Expand Up @@ -614,6 +616,29 @@ def set_dependencies(
] = tasks_and_task_groups_instances[dep]
source.set_upstream(dep)

def compile(self) -> Dict[str, Any]:
"""
Generates manifest of a dag in dictionary.

:returns: dict representing dag manifest
:type: Dict[str, Any]
"""
dag_params = self.get_dag_params(dry_run=True)
tasks: Dict[str, Dict[str, Any]] = dag_params["tasks"]

operator_defaults: Optional[Dict] = None
if utils.check_dict_key(dag_params, "operator_defaults"):
operator_defaults = dag_params.pop("operator_defaults")

# create dictionary to track tasks and set dependencies
for task_name, task_conf in tasks.items():
operator: str = task_conf["operator"]
# merge task configs with global task configs if there's any
if operator_defaults and operator in set(operator_defaults.keys()):
task_conf = merge_configs(task_conf, operator_defaults[operator])

return {dag_params['dag_id']: dag_params}

def build(self) -> Dict[str, Union[str, DAG]]:
"""
Generates a DAG from the DAG parameters.
Expand Down
Loading
Loading