Skip to content

Commit

Permalink
Merge branch 'apache:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulgoyal2987 authored Dec 6, 2024
2 parents 17acc46 + 7418b53 commit aea9b4f
Show file tree
Hide file tree
Showing 205 changed files with 4,878 additions and 2,249 deletions.
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,13 @@ repos:
pass_filenames: true
files: ^providers/src/airflow/providers/.*/(operators|transfers|sensors)/.*\.py$
additional_dependencies: [ 'rich>=12.4.4' ]
- id: update-providers-init-py
name: Update providers __init__.py files
entry: ./scripts/ci/pre_commit/update_providers_init.py
language: python
pass_filenames: true
files: ^providers/.*/__init__.py$|^providers/.*/provider.yaml$|^airflow_breeze/templates/PROVIDER__INIT__PY_TEMPLATE.py.jinja2$
additional_dependencies: ['rich>=12.4.4','requests']
- id: ruff
name: Run 'ruff' for extremely fast Python linting
description: "Run 'ruff' for extremely fast Python linting"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ARG PYTHON_BASE_IMAGE="python:3.9-slim-bookworm"
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.5
ARG AIRFLOW_UV_VERSION=0.5.6
ARG AIRFLOW_USE_UV="false"
ARG UV_HTTP_TIMEOUT="300"
ARG AIRFLOW_IMAGE_REPOSITORY="https://github.com/apache/airflow"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,7 @@ RUN bash /scripts/docker/install_packaging_tools.sh; \
# Also use `force pip` label on your PR to swap all places we use `uv` to `pip`
ARG AIRFLOW_PIP_VERSION=24.3.1
# ARG AIRFLOW_PIP_VERSION="git+https://github.com/pypa/pip.git@main"
ARG AIRFLOW_UV_VERSION=0.5.5
ARG AIRFLOW_UV_VERSION=0.5.6
# TODO(potiuk): automate with upgrade check (possibly)
ARG AIRFLOW_PRE_COMMIT_VERSION="4.0.1"
ARG AIRFLOW_PRE_COMMIT_UV_VERSION="4.1.4"
Expand Down
7 changes: 4 additions & 3 deletions airflow/api_fastapi/core_api/datamodels/ui/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ class NodeResponse(BaseModel):
"""Node serializer for responses."""

children: list[NodeResponse] | None = None
id: str | None
id: str
is_mapped: bool | None = None
label: str | None = None
label: str
tooltip: str | None = None
setup_teardown_type: Literal["setup", "teardown"] | None = None
type: Literal["join", "sensor", "task", "task_group"]
type: Literal["join", "task", "asset_condition"]
operator: str | None = None


class StructureDataResponse(BaseModel):
Expand Down
17 changes: 9 additions & 8 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7782,19 +7782,15 @@ components:
- type: 'null'
title: Children
id:
anyOf:
- type: string
- type: 'null'
type: string
title: Id
is_mapped:
anyOf:
- type: boolean
- type: 'null'
title: Is Mapped
label:
anyOf:
- type: string
- type: 'null'
type: string
title: Label
tooltip:
anyOf:
Expand All @@ -7813,13 +7809,18 @@ components:
type: string
enum:
- join
- sensor
- task
- task_group
- asset_condition
title: Type
operator:
anyOf:
- type: string
- type: 'null'
title: Operator
type: object
required:
- id
- label
- type
title: NodeResponse
description: Node serializer for responses.
Expand Down
4 changes: 3 additions & 1 deletion airflow/api_fastapi/execution_api/datamodels/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from pydantic import Field

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.base import BaseModel, ConfigDict


class VariableResponse(BaseModel):
Expand All @@ -32,5 +32,7 @@ class VariableResponse(BaseModel):
class VariablePostBody(BaseModel):
    """Request body schema for creating variables."""

    # Reject unknown keys so clients get a validation error instead of
    # silently dropped fields.
    model_config = ConfigDict(extra="forbid")

    # Serialized under the alias "val" — presumably to match the storage/API
    # field name; TODO confirm against the consumer of this model.
    value: str | None = Field(serialization_alias="val")
    description: str | None = Field(default=None)
5 changes: 3 additions & 2 deletions airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ def string_lower_type(val):

ARG_ASSET_NAME = Arg(("--name",), default="", help="Asset name")
ARG_ASSET_URI = Arg(("--uri",), default="", help="Asset URI")
ARG_ASSET_ALIAS = Arg(("--alias",), default=False, action="store_true", help="Show asset alias")

ALTERNATIVE_CONN_SPECS_ARGS = [
ARG_CONN_TYPE,
Expand Down Expand Up @@ -978,13 +979,13 @@ class GroupCommand(NamedTuple):
name="list",
help="List assets",
func=lazy_load_command("airflow.cli.commands.remote_commands.asset_command.asset_list"),
args=(ARG_OUTPUT, ARG_VERBOSE, ARG_ASSET_LIST_COLUMNS),
args=(ARG_ASSET_ALIAS, ARG_OUTPUT, ARG_VERBOSE, ARG_ASSET_LIST_COLUMNS),
),
ActionCommand(
name="details",
help="Show asset details",
func=lazy_load_command("airflow.cli.commands.remote_commands.asset_command.asset_details"),
args=(ARG_ASSET_NAME, ARG_ASSET_URI, ARG_OUTPUT, ARG_VERBOSE),
args=(ARG_ASSET_ALIAS, ARG_ASSET_NAME, ARG_ASSET_URI, ARG_OUTPUT, ARG_VERBOSE),
),
ActionCommand(
name="materialize",
Expand Down
67 changes: 50 additions & 17 deletions airflow/cli/commands/remote_commands/asset_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
from sqlalchemy import select

from airflow.api.common.trigger_dag import trigger_dag
from airflow.api_fastapi.core_api.datamodels.assets import AssetResponse
from airflow.api_fastapi.core_api.datamodels.assets import AssetAliasSchema, AssetResponse
from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
from airflow.cli.simple_table import AirflowConsole
from airflow.models.asset import AssetModel, TaskOutletAssetReference
from airflow.models.asset import AssetAliasModel, AssetModel, TaskOutletAssetReference
from airflow.utils import cli as cli_utils
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.types import DagRunTriggeredByType
Expand All @@ -36,30 +36,51 @@

from sqlalchemy.orm import Session

from airflow.api_fastapi.core_api.base import BaseModel

log = logging.getLogger(__name__)


def _list_asset_aliases(args, *, session: Session) -> tuple[Any, type[BaseModel]]:
    """Fetch all asset aliases ordered by name, paired with their serializer model."""
    stmt = select(AssetAliasModel).order_by(AssetAliasModel.name)
    return session.scalars(stmt), AssetAliasSchema


def _list_assets(args, *, session: Session) -> tuple[Any, type[BaseModel]]:
    """Fetch all assets ordered by name, paired with their serializer model."""
    rows = session.scalars(select(AssetModel).order_by(AssetModel.name))
    return rows, AssetResponse


@cli_utils.action_cli
@provide_session
def asset_list(args, *, session: Session = NEW_SESSION) -> None:
    """Display assets (or asset aliases, with ``--alias``) in the command line.

    Row fetching is delegated to ``_list_asset_aliases`` / ``_list_assets``,
    which also choose the pydantic model used to serialize each row.
    """
    # Fix: the span previously interleaved the pre-change implementation
    # (direct AssetModel query and an AssetResponse-only mapper) with the new
    # one; only the alias-aware version is kept.
    if args.alias:
        data, model_cls = _list_asset_aliases(args, session=session)
    else:
        data, model_cls = _list_assets(args, session=session)

    def detail_mapper(asset: Any) -> dict[str, Any]:
        # mode="json" keeps all values printable regardless of model field types.
        model = model_cls.model_validate(asset)
        return model.model_dump(mode="json", include=args.columns)

    AirflowConsole().print_as(data=data, output=args.output, mapper=detail_mapper)


@cli_utils.action_cli
@provide_session
def asset_details(args, *, session: Session = NEW_SESSION) -> None:
"""Display details of an asset."""
def _detail_asset_alias(args, *, session: Session) -> BaseModel:
    """Look up a single asset alias by ``--name`` and return its serialized form.

    Exits with an error if --name is missing, --uri is given, or no alias matches.
    """
    if not args.name:
        raise SystemExit("Required --name with --alias")
    if args.uri:
        raise SystemExit("Cannot use --uri with --alias")

    stmt = select(AssetAliasModel).where(AssetAliasModel.name == args.name)
    found = session.scalar(stmt)
    if found is None:
        raise SystemExit(f"Asset alias with name {args.name} does not exist.")
    return AssetAliasSchema.model_validate(found)


def _detail_asset(args, *, session: Session) -> BaseModel:
if not args.name and not args.uri:
raise SystemExit("Either --name or --uri is required")

Expand All @@ -79,7 +100,19 @@ def asset_details(args, *, session: Session = NEW_SESSION) -> None:
if next(asset_it, None) is not None:
raise SystemExit(f"More than one asset exists with {select_message}.")

model_data = AssetResponse.model_validate(asset).model_dump()
return AssetResponse.model_validate(asset)


@cli_utils.action_cli
@provide_session
def asset_details(args, *, session: Session = NEW_SESSION) -> None:
"""Display details of an asset."""
if args.alias:
model = _detail_asset_alias(args, session=session)
else:
model = _detail_asset(args, session=session)

model_data = model.model_dump(mode="json")
if args.output in ["table", "plain"]:
data = [{"property_name": key, "property_value": value} for key, value in model_data.items()]
else:
Expand Down Expand Up @@ -118,7 +151,7 @@ def asset_materialize(args, *, session: Session = NEW_SESSION) -> None:

dagrun = trigger_dag(dag_id=dag_id, triggered_by=DagRunTriggeredByType.CLI, session=session)
if dagrun is not None:
data = [DAGRunResponse.model_validate(dagrun).model_dump()]
data = [DAGRunResponse.model_validate(dagrun).model_dump(mode="json")]
else:
data = []

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand All @@ -15,3 +14,15 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from airflow import settings
from airflow.dag_processing.bundles.local import LocalDagBundle


class DagsFolderDagBundle(LocalDagBundle):
    """A bundle for the DAGs folder."""

    def __init__(self, **kwargs):
        # Resolve the folder at instantiation time so the configured
        # DAGS_FOLDER is honored; all other options pass through unchanged.
        dags_folder = settings.DAGS_FOLDER
        super().__init__(local_folder=dags_folder, **kwargs)
11 changes: 9 additions & 2 deletions airflow/dag_processing/bundles/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,15 @@ def _ensure_version_in_bare_repo(self) -> None:
if not self._has_version(self.bare_repo, self.version):
raise AirflowException(f"Version {self.version} not found in the repository")

def __hash__(self) -> int:
return hash((self.name, self.get_current_version()))
def __repr__(self):
    """Debug representation showing the bundle's identifying attributes."""
    attrs = ", ".join(
        f"{field}={getattr(self, field)!r}"
        for field in ("name", "tracking_ref", "subdir", "version")
    )
    return f"<GitDagBundle({attrs})>"

def get_current_version(self) -> str:
    """Return the commit SHA currently checked out at the repository HEAD."""
    head_commit = self.repo.head.commit
    return head_commit.hexsha
Expand Down
29 changes: 14 additions & 15 deletions airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,6 @@
log = logging.getLogger(__name__)


def _find_orm_dags(dag_ids: Iterable[str], *, session: Session) -> dict[str, DagModel]:
    """Find existing DagModel objects from DAG objects."""
    # Eager-load the relationships touched during sync to avoid N+1 queries.
    query = (
        select(DagModel)
        .where(DagModel.dag_id.in_(dag_ids))
        .options(
            joinedload(DagModel.tags, innerjoin=False),
            joinedload(DagModel.schedule_asset_references),
            joinedload(DagModel.schedule_asset_alias_references),
            joinedload(DagModel.task_outlet_asset_references),
        )
    )
    # Row locks serialize concurrent DAG-processing writers on these rows.
    locked = with_row_locks(query, of=DagModel, session=session)
    return {row.dag_id: row for row in session.scalars(locked).unique()}


def _create_orm_dags(dags: Iterable[DAG], *, session: Session) -> Iterator[DagModel]:
for dag in dags:
orm_dag = DagModel(dag_id=dag.dag_id)
Expand Down Expand Up @@ -181,8 +167,21 @@ class DagModelOperation(NamedTuple):

dags: dict[str, DAG]

def find_orm_dags(self, *, session: Session) -> dict[str, DagModel]:
    """Find existing DagModel objects from DAG objects."""
    # self.dags is a dict keyed by dag_id; iterating it yields the ids.
    query = select(DagModel).where(DagModel.dag_id.in_(self.dags))
    # Eager-load related collections used during reconciliation.
    query = query.options(joinedload(DagModel.tags, innerjoin=False))
    query = query.options(joinedload(DagModel.schedule_asset_references))
    query = query.options(joinedload(DagModel.schedule_asset_alias_references))
    query = query.options(joinedload(DagModel.task_outlet_asset_references))
    # Lock matching rows so concurrent processors don't race on updates.
    query = with_row_locks(query, of=DagModel, session=session)
    return {orm_dag.dag_id: orm_dag for orm_dag in session.scalars(query).unique()}

def add_dags(self, *, session: Session) -> dict[str, DagModel]:
orm_dags = _find_orm_dags(self.dags, session=session)
orm_dags = self.find_orm_dags(session=session)
orm_dags.update(
(model.dag_id, model)
for model in _create_orm_dags(
Expand Down
60 changes: 60 additions & 0 deletions airflow/example_dags/example_asset_with_watchers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAG for demonstrating the usage of event driven scheduling using assets and triggers.
"""

from __future__ import annotations

import os
import tempfile

from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.standard.triggers.file import FileTrigger
from airflow.sdk.definitions.asset import Asset

# NOTE(review): NamedTemporaryFile() is used only for its .name — the handle is
# dropped immediately, so the underlying file is deleted as soon as the object
# is garbage-collected and only the path string survives. Relying on GC timing
# is fragile on non-CPython interpreters; TODO confirm this is intentional.
file_path = tempfile.NamedTemporaryFile().name

with DAG(
    dag_id="example_create_file",
    catchup=False,
):

    @task
    def create_file():
        # Produces the file that the FileTrigger below watches for.
        with open(file_path, "w") as file:
            file.write("This is an example file.\n")

    chain(create_file())

# Event-driven scheduling: the asset is marked updated when the trigger fires
# (the watched file appears), which in turn schedules the DAG below.
trigger = FileTrigger(filepath=file_path, poke_interval=10)
asset = Asset("example_asset", watchers=[trigger])

with DAG(
    dag_id="example_asset_with_watchers",
    schedule=[asset],
    catchup=False,
):

    @task
    def delete_file():
        # Clean up so the trigger can fire again on the next file creation.
        if os.path.exists(file_path):
            os.remove(file_path)  # Delete the file

    chain(delete_file())
Loading

0 comments on commit aea9b4f

Please sign in to comment.