Skip to content

Commit

Permalink
chore: fix &#@^%@%*$&^@ sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
JasperHG90 committed Mar 3, 2024
1 parent 6c80047 commit d94c06d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ def __init__(
self._logger: logging.Logger = logging.getLogger(
"dagster_utils.factories.sensors.MultiToSinglePartitionJobTriggerSensorFactory"
)
self.run_key_requests_this_sensor: typing.List[str] = []
self.unfinished_downstream_partitions: typing.Dict[str, int] = {}
self.callback_fn = callable_fn

def _get_mapped_downstream_partition_key_from_upstream_partition_key(
Expand Down Expand Up @@ -176,6 +174,8 @@ def _sensor(context: SensorEvaluationContext):
run_requests = 0
job_name = self.monitored_job.name
onetime_logs_emitted: bool = False
run_key_requests_this_sensor: typing.List[str] = []
unfinished_downstream_partitions: typing.Dict[str, int] = {}
context.log.debug(f"Job name: {job_name}")
run_records = self._get_run_records_for_job(context.instance)
context.log.debug(
Expand Down Expand Up @@ -222,9 +222,23 @@ def _sensor(context: SensorEvaluationContext):
)
run_key = f"{downstream_partition_key}_{backfill_name if backfill_name is not None else scheduled_run_name}"
if self._increment_unfinished_downstream_partitions(
run_key
) or self._sensor_already_triggered_with_run_key(run_key):
context.log.debug(f"Skipping run with key {run_key} . . .")
run_key, unfinished_downstream_partitions=unfinished_downstream_partitions
):
skip_number = unfinished_downstream_partitions[run_key]
if skip_number == self.skip_when_unfinished_count:
del unfinished_downstream_partitions[run_key]
else:
unfinished_downstream_partitions[run_key] += 1
context.log.debug(
f"Skipping run with key {run_key} because it has known unfinished partitions . . ."
)
continue
if self._sensor_already_triggered_with_run_key(
run_key, run_key_requests_this_sensor
):
context.log.debug(
f"Skipping run with key {run_key} because the run has already been requested . . ."
)
continue
# NB: this calls the postgres db, so we don't want to call it before the checks above, since those are just lookups
# and as such much faster
Expand Down Expand Up @@ -252,7 +266,7 @@ def _sensor(context: SensorEvaluationContext):
context.log.debug(
f"Only {num_done} out of {num_total} partitions have been materialized for partition {downstream_partition_key}. Skipping . . ."
)
self.unfinished_downstream_partitions[run_key] = 0
unfinished_downstream_partitions[run_key] = 0
continue
else:
if num_failed > 0:
Expand All @@ -264,7 +278,7 @@ def _sensor(context: SensorEvaluationContext):
run_key=run_key if self.use_run_key else None,
partition_key=downstream_partition_key,
)
self.run_key_requests_this_sensor.append(run_key)
run_key_requests_this_sensor.append(run_key)
run_requests += 1
if self.callback_fn is not None:
context.log.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,26 +140,25 @@ def _get_backfill_partitions(
if partition in all_upstream_partitions
]

def _increment_unfinished_downstream_partitions(self, run_key: str) -> bool:
def _increment_unfinished_downstream_partitions(
self, run_key: str, unfinished_downstream_partitions: typing.Dict[str, str]
) -> bool:
# If we already know that run_key is unfinished (it still requires materializations), then keep skipping it
# until the skip number reaches self.skip_when_unfinished_count, then remove it from the dict and try again.
# This keeps us from doing expensive computations and timing out the sensor.
if run_key in self.unfinished_downstream_partitions:
if run_key in unfinished_downstream_partitions:
self._logger.debug(
"Run key has recentely been reported as unfinished. Skipping this partition ..."
)
skip_number = self.unfinished_downstream_partitions[run_key]
if skip_number == self.skip_when_unfinished_count:
del self.unfinished_downstream_partitions[run_key]
else:
self.unfinished_downstream_partitions[run_key] += 1
return True
else:
return False

def _sensor_already_triggered_with_run_key(self, run_key: str) -> bool:
def _sensor_already_triggered_with_run_key(
self, run_key: str, run_key_requests_this_sensor: typing.List[str]
) -> bool:
# If this run key has already been requested during this evaluation, skip
if run_key in self.run_key_requests_this_sensor:
if run_key in run_key_requests_this_sensor:
self._logger.debug(f"Run key {run_key} already requested. Skipping . . .")
return True
else:
Expand Down

0 comments on commit d94c06d

Please sign in to comment.