Skip to content

Commit

Permalink
refactor: decouple hook from gcp metric
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Mar 21, 2024
1 parent a78c350 commit 4edcf32
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 61 deletions.
28 changes: 21 additions & 7 deletions dags/luchtmeetnet_ingestion/src/luchtmeetnet_ingestion/jobs.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@
import os
import typing

from dagster import define_asset_job, multiprocess_executor
from dagster_utils.factories.hooks import gcp_metric_job_success_hook_factory
from dagster import HookContext, define_asset_job, multiprocess_executor
from dagster_utils.factories.hooks import metric_job_success_hook_factory
from luchtmeetnet_ingestion.assets import air_quality_data, daily_air_quality_data
from luchtmeetnet_ingestion.partitions import daily_partition, daily_station_partition

environment = os.getenv("ENVIRONMENT", "dev")


gcp_metric_on_success = gcp_metric_job_success_hook_factory(
def post_metric(context: HookContext, value: int, labels: typing.Dict[str, str]):
context.log.debug(labels)
if environment == "dev":
context.log.info("Skipping posting metric in dev environment")
else:
context.resources.gcp_metrics.post_time_series(
series_type="custom.googleapis.com/dagster/job_success",
value={"bool_value": value},
metric_labels=labels,
)


gcp_metric_on_success = metric_job_success_hook_factory(
name="gcp_metric_on_success",
description="GCP metric hook for success",
on_success=True,
gcp_resource_name="gcp_metrics",
callable_fn=post_metric,
required_resource_keys={"gcp_metrics"},
)


gcp_metric_on_failure = gcp_metric_job_success_hook_factory(
gcp_metric_on_failure = metric_job_success_hook_factory(
name="gcp_metric_on_failure",
description="GCP metric hook for failure",
on_success=False,
gcp_resource_name="gcp_metrics",
callable_fn=post_metric,
required_resource_keys={"gcp_metrics"},
)


Expand Down
90 changes: 36 additions & 54 deletions shared/dagster_utils/src/dagster_utils/factories/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,29 @@
from dagster_utils.factories.base import DagsterObjectFactory


def gcp_metric_job_success_hook_factory(
name: str, description: str, on_success: bool, gcp_resource_name: typing.Optional[str] = None
class CallableHookFn(typing.Protocol):
def __call__(
self, context: HookContext, value: typing.Dict[str, bool], labels: typing.Dict[str, str]
) -> None:
...


def metric_job_success_hook_factory(
name: str,
description: str,
on_success: bool,
callable_fn: CallableHookFn,
required_resource_keys: typing.Optional[typing.Set[str]] = None,
) -> typing.Callable:
return GcpMetricJobSuccessHookFactory(
return MetricJobSuccessHookFactory(
name=name,
description=description,
on_success=on_success,
gcp_resource_name=gcp_resource_name,
callable_fn=callable_fn,
required_resource_keys=required_resource_keys,
)()


class GcpMetricLabels:
def __init__(self):
self.__labels = {}

def add(self, key: str, value: str):
self.__labels[key] = value
return self

@property
def labels(self):
return self.__labels


def parse_tags(tags: typing.Mapping[str, str]) -> typing.Iterator[typing.Dict[str, str]]:
for k, v in tags.items():
if k == "dagster/partition":
Expand All @@ -38,70 +37,53 @@ def parse_tags(tags: typing.Mapping[str, str]) -> typing.Iterator[typing.Dict[st
continue


def generate_gcp_metric_labels(context: HookContext, labels: GcpMetricLabels) -> GcpMetricLabels:
def generate_metric_labels(context: HookContext) -> typing.Dict[str, str]:
labels = {}
run = context.instance.get_run_by_id(context.run_id)
labels.add("location", run.external_job_origin.location_name)
labels["location"] = run.external_job_origin.location_name
for tag_dict in parse_tags(run.tags):
for k, v in tag_dict.items():
labels.add(k, v)
labels[k] = v
return labels


def post_metric(context: HookContext, value: int, labels: typing.Dict[str, str]):
_labels = generate_gcp_metric_labels(context, GcpMetricLabels())
for k, v in labels.items():
_labels.add(k, v)
context.log.debug(_labels.labels)
context.resources.gcp_metrics.post_time_series(
series_type="custom.googleapis.com/dagster/job_success",
value={"bool_value": value},
metric_labels=_labels.labels,
)


class GcpMetricJobSuccessHookFactory(DagsterObjectFactory):
class MetricJobSuccessHookFactory(DagsterObjectFactory):
def __init__(
self,
name: str,
description: str,
on_success: bool,
gcp_resource_name: typing.Optional[str] = None,
callable_fn: CallableHookFn,
required_resource_keys: typing.Optional[typing.Set[str]] = None,
):
super().__init__(name, description)
self.on_success = on_success
self.gcp_resource_name = gcp_resource_name
self.required_resource_keys = required_resource_keys
self.callable_fn = callable_fn

def _generate_labels(self, context: HookContext) -> typing.Dict[str, str]:
labels = generate_metric_labels(context)
labels["job_name"] = context.job_name
labels["run_id"] = context.run_id
return labels

def __call__(self) -> typing.Callable:
if self.on_success:

@success_hook(
name=self.name,
required_resource_keys={self.gcp_resource_name},
required_resource_keys=self.required_resource_keys,
)
def _function(context: HookContext):
post_metric(
context,
1,
{
"job_name": context.job_name,
"run_id": context.run_id,
},
)
self.callable_fn(context, 1, self._generate_labels(context))

else:

@failure_hook(
name="job_failure_gcp_metric",
required_resource_keys={self.gcp_resource_name},
name=self.name,
required_resource_keys=self.required_resource_keys,
)
def _function(context: HookContext):
post_metric(
context,
0,
{
"job_name": context.job_name,
"run_id": context.run_id,
},
)
self.callable_fn(context, 0, self._generate_labels(context))

return _function

0 comments on commit 4edcf32

Please sign in to comment.