Skip to content

Commit

Permalink
feat: add slack sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Jan 17, 2024
1 parent ddde695 commit 813cd60
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 3 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ __pycache__

# Tempdir
.tmp

.env
2 changes: 2 additions & 0 deletions dags/luchtmeetnet_ingestion/.justfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set dotenv-load

PROJECT := "jasper-ginn-dagster"
ARTIFACT_STORE := "dags-areg-euw4-jgdag-prd"
DOCKER_BASE := "europe-west4-docker.pkg.dev/" + PROJECT + "/" + ARTIFACT_STORE
Expand Down
14 changes: 11 additions & 3 deletions dags/luchtmeetnet_ingestion/src/luchtmeetnet_ingestion/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import os
from importlib import metadata

from dagster import Definitions
from dagster import Definitions, EnvVar
from dagster_slack import SlackResource
from luchtmeetnet_ingestion.assets import air_quality_data
from luchtmeetnet_ingestion.IO.duckdb_io_manager import duckdb_parquet_io_manager
from luchtmeetnet_ingestion.IO.resources import LuchtMeetNetResource
from luchtmeetnet_ingestion.jobs import ingestion_job
from luchtmeetnet_ingestion.sensors import (
slack_message_on_failure,
slack_message_on_success,
)

try:
__version__ = metadata.version("luchtmeetnet_ingestion")
Expand All @@ -17,7 +22,8 @@

env_resources = {
"dev": shared_resources
| {"landing_zone": duckdb_parquet_io_manager.configured({"path": ".tmp/landing_zone"})},
| {"landing_zone": duckdb_parquet_io_manager.configured({"path": ".tmp/landing_zone"})}
| {"slack": SlackResource(token=EnvVar("DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN"))},
"prd": shared_resources
| {
"landing_zone": duckdb_parquet_io_manager.configured(
Expand All @@ -28,7 +34,8 @@
"aws_endpoint": "storage.googleapis.com",
}
)
},
}
| {"slack": SlackResource(token=EnvVar("DAGSTER_SECRET_SLACK_BOT_OAUTH_TOKEN"))},
}

environment = os.getenv("ENVIRONMENT", "dev")
Expand All @@ -39,4 +46,5 @@
],
resources=env_resources[environment],
jobs=[ingestion_job],
sensors=[slack_message_on_failure, slack_message_on_success],
)
35 changes: 35 additions & 0 deletions dags/luchtmeetnet_ingestion/src/luchtmeetnet_ingestion/sensors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from dagster import (
DagsterRunStatus,
DefaultSensorStatus,
RunFailureSensorContext,
RunStatusSensorContext,
run_status_sensor,
)
from dagster_slack import SlackResource
from luchtmeetnet_ingestion.jobs import ingestion_job


def my_message_fn(slack: SlackResource, message: str) -> str:
slack.get_client().chat_postMessage(channel="#dagster-notifications", text=message)


@run_status_sensor(
run_status=DagsterRunStatus.SUCCESS,
monitored_jobs=[ingestion_job],
default_status=DefaultSensorStatus.RUNNING,
)
def slack_message_on_success(context: RunStatusSensorContext, slack: SlackResource):
message = f"Job **{context.dagster_run.job_name}** succeeded!"
my_message_fn(slack, message)


@run_status_sensor(
run_status=DagsterRunStatus.FAILURE,
monitored_jobs=[ingestion_job],
default_status=DefaultSensorStatus.RUNNING,
)
def slack_message_on_failure(context: RunFailureSensorContext, slack: SlackResource):
message = (
f"Job **{context.dagster_run.job_name}** failed!" f"Error: {context.failure_event.message}"
)
my_message_fn(slack, message)
1 change: 1 addition & 0 deletions deployment/values.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ deployments:
envSecrets:
- name: gcs-access-key-id
- name: gcs-secret-access-key
- name: dagster-secret-slack-bot-oauth-token

annotations: {}
nodeSelector: {}
Expand Down

0 comments on commit 813cd60

Please sign in to comment.