Create workflow for checking if a recipe has already been published
Will be used for control flow. We should avoid overwriting existing, published
data unless an overwrite flag is given (TODO).
trey-stafford committed Jan 16, 2025
1 parent 793f523 commit 2e9f997
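
A possible shape for the control flow described in the commit message, as it might sit in simple.py (a hedged sketch: the `overwrite` flag and the `ensure_not_published` name are hypothetical, not part of this commit):

def ensure_not_published(recipe_config: RecipeConfig, overwrite: bool = False) -> None:
    """Raise unless it is safe to publish data for the given recipe.

    Hypothetical sketch: `overwrite` is the flag the commit message marks as
    a TODO.
    """
    if data_already_published(recipe_config) and not overwrite:
        err_msg = f"Data for recipe {recipe_config.id} have already been published."
        raise RuntimeError(err_msg)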
Showing 1 changed file with 75 additions and 1 deletion.
76 changes: 75 additions & 1 deletion src/ogdc_runner/recipe/simple.py
@@ -4,12 +4,13 @@
from hera.workflows import (
    Artifact,
    Container,
    Parameter,
    Steps,
    Workflow,
    models,
)

-from ogdc_runner.argo import ARGO_WORKFLOW_SERVICE
+from ogdc_runner.argo import ARGO_WORKFLOW_SERVICE, submit_workflow
from ogdc_runner.constants import SIMPLE_RECIPE_FILENAME
from ogdc_runner.models.recipe_config import RecipeConfig
from ogdc_runner.recipe import get_recipe_config
@@ -87,11 +88,84 @@ def _cmds_from_simple_recipe(recipe_dir: str) -> list[str]:
    return commands


def data_already_published(recipe_config: RecipeConfig) -> bool:
    """Submit a workflow that checks for the existence of published data for
    the given recipe, returning True if found."""
    with Workflow(
        generate_name=f"{recipe_config.id}-check-published",
        entrypoint="steps",
        workflows_service=ARGO_WORKFLOW_SERVICE,
        volumes=[
            models.Volume(
                name="workflow-volume",
                # TODO: parameterize this!
                persistent_volume_claim={"claim_name": "qgnet-ogdc-workflow-pvc"},
            )
        ],
    ) as w:
        check_dir_template = Container(
            name="check-already-published",
            command=["sh", "-c"],
            # Check for the existence of the recipe-specific subpath. If it
            # exists, write out a file containing "yes"; otherwise write out a
            # file containing "no". This file becomes an Argo output parameter
            # that we can check later.
            args=[
                f'test -d /mnt/{recipe_config.id} && echo "yes" > /tmp/published.txt || echo "no" > /tmp/published.txt',
            ],
            outputs=[
                Parameter(
                    name="data-published",
                    value_from=models.ValueFrom(path="/tmp/published.txt"),
                ),
            ],
            volume_mounts=[
                models.VolumeMount(
                    name="workflow-volume",
                    mount_path="/mnt/",
                ),
            ],
        )

        with Steps(name="steps"):
            check_dir_template()

    # Wait for the workflow to complete.
    workflow_name = submit_workflow(workflow=w, wait=True)

    # Check the result. Get an updated instance of the workflow, with the
    # latest state for all nodes. Then iterate through the nodes, find the
    # template defined above ("check-already-published"), and extract its
    # output parameter.
    completed_workflow = ARGO_WORKFLOW_SERVICE.get_workflow(name=workflow_name)
    result = None
    for node in completed_workflow.status.nodes.values():
        if node.template_name == "check-already-published":
            result = node.outputs.parameters[0].value
    if not result:
        err_msg = "Failed to check if data have been published"
        raise RuntimeError(err_msg)

    assert result in ("yes", "no")

    return result == "yes"


def make_simple_workflow(recipe_dir: str) -> Workflow:
    """Create a workflow from the simple recipe in `recipe_dir` and return it."""
    commands = _cmds_from_simple_recipe(recipe_dir)
    recipe_config = get_recipe_config(recipe_dir)

    # TODO: consider moving this into a higher-level function. This function
    # should just make a simple workflow from a simple recipe, not submit
    # workflows of its own.
    # First, check if the data are already published. If so, raise an error
    # (or, once an overwrite flag exists, overwrite on request).
    if data_already_published(recipe_config):
        # TODO: better error handling (raise `OGDCRecipeError` or something similar)
        err_msg = f"Data for recipe {recipe_config.id} have already been published."
        raise RuntimeError(err_msg)

    with Workflow(
        generate_name=f"{recipe_config.id}-",
        entrypoint="steps",
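
Following the TODO in make_simple_workflow above, the published-data check could live in a higher-level function so that make_simple_workflow only constructs the Workflow. A hedged sketch of that orchestration (the submit_simple_recipe name is hypothetical; ensure_not_published is the sketch near the top of this page):

def submit_simple_recipe(recipe_dir: str, overwrite: bool = False) -> str:
    """Check, build, and submit a simple-recipe workflow; return its name.

    Hypothetical sketch: refuses to overwrite already-published data unless
    `overwrite` is set.
    """
    recipe_config = get_recipe_config(recipe_dir)
    # Guard against clobbering published data (see ensure_not_published above).
    ensure_not_published(recipe_config, overwrite=overwrite)
    workflow = make_simple_workflow(recipe_dir)
    # submit_workflow returns the workflow name as a str (per its use above).
    return submit_workflow(workflow=workflow, wait=True)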