Airflow deployment options (chapter 16) #30

Draft · wants to merge 21 commits into base: master
2 changes: 2 additions & 0 deletions chapter16/.env
@@ -0,0 +1,2 @@
K3S_VERSION=v1.31.4-k3s1
KUBECONFIG=.k3s/kubeconfig.yaml
12 changes: 0 additions & 12 deletions chapter16/.env.template

This file was deleted.

2 changes: 2 additions & 0 deletions chapter16/.gitignore
@@ -0,0 +1,2 @@
# K3s generated config files
.k3s/*.yaml
4 changes: 4 additions & 0 deletions chapter16/.registryconfig/registries.yaml
@@ -0,0 +1,4 @@
mirrors:
  "registry:5000":
    endpoint:
      - "http://registry:5000"
3 changes: 3 additions & 0 deletions chapter16/Dockerfile.dags-in-image
@@ -0,0 +1,3 @@
FROM apache/airflow:2.9.3

COPY dags ${AIRFLOW_HOME}/dags/
7 changes: 7 additions & 0 deletions chapter16/Dockerfile.deps-in-image
@@ -0,0 +1,7 @@
FROM apache/airflow:2.9.3

COPY dags-dependencies ${AIRFLOW_HOME}/dags/

COPY requirements.txt /requirements.txt

RUN pip install -r /requirements.txt
163 changes: 138 additions & 25 deletions chapter16/README.md
@@ -1,43 +1,156 @@
# Chapter 16

Code accompanying Chapter 16 of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).

The chapter explains the different options for deploying Airflow on Kubernetes. For executing these deployment commands, a Docker Compose based Kubernetes (k3s) cluster is provided with this chapter. To start this cluster, use the following command:

```bash
docker compose up -d
```

**Note: this setup requires more resources; give Docker at least 4 CPUs and 8 GB of memory.**

## More information

### Kubectl and helm

To work with the Kubernetes cluster, a separate container is available for executing `kubectl` and `helm` commands against the cluster. It is important to start this container with a login shell (`-l`), because we need the `kubectl` and `helm` aliases (defined in `bash_aliases.sh`) that add the `--server`/`--kube-apiserver` option pointing at the k3s API server.

```bash
docker exec -ti chapter16-k3s-cli-1 /bin/bash -l
```

#### K9s or local kubectl as an alternative

Alternatively, you can use [k9s](https://k9scli.io/) or install `kubectl` locally. To connect to the k3s cluster, you need to use the generated cluster config (`KUBECONFIG=.k3s/kubeconfig.yaml`).
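
For example, with a locally installed `kubectl` (a minimal sketch; run it from the `chapter16` directory so the relative kubeconfig path resolves):

```bash
# Point a local kubectl at the k3s cluster started by Docker Compose.
export KUBECONFIG=.k3s/kubeconfig.yaml
kubectl get nodes
```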

### Deployment of default airflow in K8S

Inside the k3s-cli container, we can deploy Airflow with the following commands:

```bash
/enable-external-dns # make sure the other docker services can be reached from within the k3s cluster
helm repo add apache-airflow https://airflow.apache.org
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace --set webserver.service.type=LoadBalancer
```

To verify the running services/pods, use the following command:

```bash
kubectl --namespace airflow get pods
```

Access the webserver at http://localhost:8080 (or http://localhost:8081 if the webserver pod ends up on the agent node; you can check this with `kubectl --namespace airflow get pods -o wide`). In the rest of this README we refer to the webserver as http://localhost:8080.


### 01 - Overriding the default user

In values/01-user-values.yaml we create a different admin user, both to use the same login as the other chapters and as an easy introduction to customizing your Airflow deployment:

```bash
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --set webserver.service.type=LoadBalancer -f /etc/helm/values/01-user-values.yaml
```
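
The contents of `values/01-user-values.yaml` are not shown here; a minimal sketch, assuming it overrides the chart's `webserver.defaultUser` block (the email address and names below are placeholders), could look like:

```yaml
webserver:
  defaultUser:
    enabled: true
    role: Admin
    username: airflow            # matches the airflow/airflow login used below
    password: airflow
    email: airflow@example.com   # placeholder
    firstName: Airflow
    lastName: Admin
```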

You can verify that the admin user has changed by logging in at http://localhost:8080 with airflow/airflow and going to http://localhost:8080/users/userinfo/ to see the updated values.
You can also see that the affected pods (at least the webserver) have been replaced (their age is more recent):

```bash
kubectl --namespace airflow get pods
```

### 02 - Providing the webserver secret

In values/02-webserversecret-values.yaml we provide our own webserver secret key to prevent the UI warning about a non-static secret key. First create the secret:

```bash
kubectl create secret generic my-webserver-secret --namespace airflow --from-literal="webserver-secret-key=$(python3 -c 'import secrets; print(secrets.token_hex(16))')"
```

```bash
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace --set webserver.service.type=LoadBalancer -f /etc/helm/values/02-webserversecret-values.yaml
```
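
A minimal sketch of `values/02-webserversecret-values.yaml`, assuming it only references the secret created above via the chart's `webserverSecretKeySecretName` value:

```yaml
webserverSecretKeySecretName: my-webserver-secret
```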

You can verify this by logging in at http://localhost:8080 with airflow/airflow.

### 03 - Using an external database

In values/03-external-database-values.yaml we configure the deployment to use an external database. This database is already provided in the Docker Compose file. The connection info is again passed to the Helm chart via a Kubernetes secret:

```bash
kubectl create secret generic mydatabase --namespace airflow --from-literal=connection=postgresql://airflow:airflow@postgres:5432/airflow
```

```bash
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace --set webserver.service.type=LoadBalancer -f /etc/helm/values/03-external-database-values.yaml
```
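
The contents of `values/03-external-database-values.yaml` are not shown here; a minimal sketch, assuming it disables the bundled PostgreSQL and points the chart at the `mydatabase` secret, could look like:

```yaml
postgresql:
  enabled: false
data:
  metadataSecretName: mydatabase
```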

The PostgreSQL instance that was deployed by the chart is no longer needed and can be cleaned up:

```bash
kubectl delete statefulset airflow-postgresql --namespace airflow
```

You can verify this by logging in at http://localhost:8080 with airflow/airflow. (The original admin/admin user is no longer available.)

### 04 - DAG deployment options

#### 04a - Baking the DAGs into the Airflow image

In values/04-dags-in-image-values.yaml we configure the deployment to use a custom container image. This image contains the DAG files, which are added while building the image. The image is pushed to the registry (available in the Docker Compose setup) so it can be pulled by the Helm deployment.

```bash
# on your local machine
./publish-custom-images.sh
```

```bash
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace --set webserver.service.type=LoadBalancer -f /etc/helm/values/04-dags-in-image-values.yaml
```
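
The contents of `values/04-dags-in-image-values.yaml` are not shown here; a minimal sketch, assuming it points the chart at a custom image in the in-cluster registry (the image name and tag below are placeholders; `publish-custom-images.sh` determines the real ones), could look like:

```yaml
images:
  airflow:
    repository: registry:5000/airflow-dags-in-image   # placeholder name
    tag: latest                                        # placeholder tag
    pullPolicy: Always
```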

Now when you log in at http://localhost:8080 with airflow/airflow, you can see that the DAG `01_dag_in_image` is available.

#### 04b - DAGs in a persistent volume

In values/04-dags-in-persistent-vol-values.yaml we configure the deployment to use a persistent volume. This volume contains the DAG files and is used by all Airflow services.

First we need to create the persistent volume and a volume claim.

```bash
kubectl -n airflow apply -f /etc/helm/values/dag-pvc.yaml
```

Then we can update the deployment to make use of this persistent volume:

```bash
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace --set webserver.service.type=LoadBalancer -f /etc/helm/values/04-dags-in-persistent-vol-values.yaml
```
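
The contents of `values/04-dags-in-persistent-vol-values.yaml` are not shown here; a minimal sketch, assuming it mounts an existing claim (the claim name below is a placeholder that should match the one defined in `dag-pvc.yaml`), could look like:

```yaml
dags:
  persistence:
    enabled: true
    existingClaim: dag-pvc   # placeholder; must match the PVC created above
```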

Now when you log in at http://localhost:8080 with airflow/airflow, you can see that the DAGs `02_teamA_dag_from_pvc` and `02_teamB_dag_from_pvc` are available.

#### 04c - DAGs in a Git repository

In values/04-dags-in-git-values.yaml we configure the deployment to use a git-sync sidecar container that syncs the DAGs from a Git repository. For this example we use the DAGs from chapter02.

We can update the deployment to make use of this method:

```bash
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace --set webserver.service.type=LoadBalancer -f /etc/helm/values/04-dags-in-git-values.yaml
```
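
The contents of `values/04-dags-in-git-values.yaml` are not shown here; a minimal sketch, assuming the chart's `dags.gitSync` settings are used (the repository URL, branch, and subPath below are placeholders), could look like:

```yaml
dags:
  gitSync:
    enabled: true
    repo: https://github.com/<your-org>/<your-repo>.git   # placeholder
    branch: master                                         # placeholder
    subPath: chapter02/dags                                # placeholder
```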

Now when you log in at http://localhost:8080 with airflow/airflow, you can see that the DAGs synced from the chapter02 folder are available.

### 05 - DAG dependencies

#### 05a - Baking the dependencies into the Airflow image

In values/05-dependencies-in-image-values.yaml we configure the deployment to use a custom container image. This image contains the DAG dependency libraries, which are added while building the image. The image is pushed to the registry (available in the Docker Compose setup) so it can be pulled by the Helm deployment.

```bash
# on your local machine
./publish-custom-images.sh
```

```bash
helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace --set webserver.service.type=LoadBalancer -f /etc/helm/values/05-dependencies-in-image-values.yaml
```
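
The contents of `values/05-dependencies-in-image-values.yaml` are not shown here; a minimal sketch, analogous to 04a but pointing at the image built from `Dockerfile.deps-in-image` (name and tag are placeholders), could look like:

```yaml
images:
  airflow:
    repository: registry:5000/airflow-deps-in-image   # placeholder name
    tag: latest                                        # placeholder tag
    pullPolicy: Always
```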

Now when you log in at http://localhost:8080 with airflow/airflow, you can see that the DAG `01_dag_dependencies_in_image` is available. The `version` task should succeed and print the TensorFlow version in the logs.
2 changes: 2 additions & 0 deletions chapter16/bash_aliases.sh
@@ -0,0 +1,2 @@
alias kubectl='kubectl -s https://k3s-server:6443 "${@}"'
alias helm='helm --kube-apiserver https://k3s-server:6443 "${@}"'
104 changes: 104 additions & 0 deletions chapter16/compose.yaml
@@ -0,0 +1,104 @@
---
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: airflow
      PGUSER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  k3s-server:
    image: "rancher/k3s:${K3S_VERSION:-latest}"
    command:
      - server
      - "--tls-san"
      - "k3s-server"
    tmpfs:
      - /run
      - /var/run
    ulimits:
      nproc: 65535
      nofile:
        soft: 65535
        hard: 65535
    privileged: true
    restart: always
    environment:
      - K3S_NODE_NAME=k3s-server
      - K3S_TOKEN=26576147927211
      - K3S_KUBECONFIG_OUTPUT=/output/kubeconfig.yaml
      - K3S_KUBECONFIG_MODE=666
    volumes:
      - k3s-server:/var/lib/rancher/k3s
      # This is just so that we get the kubeconfig file out
      - .k3s:/output
      - .registryconfig/registries.yaml:/etc/rancher/k3s/registries.yaml
      - type: bind
        source: dags-pvc
        target: /data
        bind:
          propagation: shared
    ports:
      - 6443:6443 # Kubernetes API Server
      - 80:80 # Ingress controller port 80
      - 443:443 # Ingress controller port 443
      - 8080:8080 # Airflow web UI

  k3s-agent:
    image: "rancher/k3s:${K3S_VERSION:-latest}"
    tmpfs:
      - /run
      - /var/run
    ulimits:
      nproc: 65535
      nofile:
        soft: 65535
        hard: 65535
    privileged: true
    restart: always
    environment:
      - K3S_NODE_NAME=k3s-agent
      - K3S_URL=https://k3s-server:6443
      - K3S_TOKEN=26576147927211
    ports:
      - 8081:8080 # Airflow web UI if loadbalancer is on agent server
    volumes:
      - .registryconfig/registries.yaml:/etc/rancher/k3s/registries.yaml
      - type: bind
        source: dags-pvc
        target: /data
        bind:
          propagation: shared

  k3s-cli:
    image: alpine/k8s:1.32.0
    tty: true
    privileged: true
    environment:
      - KUBECONFIG=/input/kubeconfig.yaml
    volumes:
      - .k3s:/input
      - ./enable-external-dns.sh:/enable-external-dns
      - ./bash_aliases.sh:/etc/profile.d/bash_aliases.sh
      - ./values:/etc/helm/values

  registry:
    image: registry:2
    ports:
      - 3632:5000
    volumes:
      - container-registry-volume:/var/lib/registry

volumes:
  k3s-server:
  postgres-db-volume:
  container-registry-volume:
24 changes: 24 additions & 0 deletions chapter16/dags-dependencies/01_dag_dependencies_in_image.py
@@ -0,0 +1,24 @@
"""DAG demonstrating the umbrella use case with empty operators."""

import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator


def _tf_version():
import tensorflow as tf
print("TensorFlow version:", tf.__version__)

with DAG(
dag_id="01_dag_dependencies_in_image",
description="Dag dependencies in custom image example.",
start_date=pendulum.today("UTC").add(days=-5),
schedule="@daily",
):
some_init_task = EmptyOperator(task_id="init")
version = PythonOperator(task_id="version", python_callable=_tf_version)
finish = EmptyOperator(task_id="finish")

# Set dependencies between all tasks
some_init_task >> version >> finish
22 changes: 22 additions & 0 deletions chapter16/dags-pvc/teamA/02_teamA_dag_from_pvc.py
@@ -0,0 +1,22 @@
"""DAG demonstrating the umbrella use case with empty operators."""

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
dag_id="02_teamA_dag_from_pvc",
description="Dag persistence in PVC example.",
start_date=pendulum.today("UTC").add(days=-5),
schedule="@daily",
):
teamA_init = EmptyOperator(task_id="teamA_init")

echo_some = BashOperator(
task_id="echo_some",
bash_command='echo "Hello teamA from $(hostname)"', # noqa: E501
)

# Set dependencies between all tasks
teamA_init >> echo_some
22 changes: 22 additions & 0 deletions chapter16/dags-pvc/teamB/02_teamB_dag_from_pvc.py
@@ -0,0 +1,22 @@
"""DAG demonstrating the umbrella use case with empty operators."""

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

with DAG(
dag_id="02_teamB_dag_from_pvc",
description="Dag persistence in PVC example.",
start_date=pendulum.today("UTC").add(days=-5),
schedule="@daily",
):
teamB_init = EmptyOperator(task_id="teamB_init")

echo_some = BashOperator(
task_id="echo_some",
bash_command='echo "Hello teamB from $(hostname)"', # noqa: E501
)

# Set dependencies between all tasks
teamB_init >> echo_some