Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Init work on publishing final output of "simple" workflow to volume #50

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Docker image for running gdal/ogr and fetch (wget) commands for ogdc recipes.
FROM ghcr.io/osgeo/gdal:alpine-normal-latest
# We use wget to fetch data from remote sources.
RUN apk update && apk add wget
RUN apk update && apk add wget rsync
115 changes: 114 additions & 1 deletion src/ogdc_runner/recipe/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
from hera.workflows import (
Artifact,
Container,
Parameter,
Steps,
Workflow,
models,
)

from ogdc_runner.argo import ARGO_WORKFLOW_SERVICE
from ogdc_runner.argo import ARGO_WORKFLOW_SERVICE, submit_workflow
from ogdc_runner.constants import SIMPLE_RECIPE_FILENAME
from ogdc_runner.models.recipe_config import RecipeConfig
from ogdc_runner.recipe import get_recipe_config
Expand Down Expand Up @@ -45,6 +47,28 @@ def _make_fetch_url_template(recipe_config: RecipeConfig) -> Container:
return template


def _make_publish_template(recipe_id: str) -> Container:
    """Create a container template that publishes final output data.

    The returned template rsyncs everything from the input artifact into the
    OGDC data storage volume, under a subpath named for `recipe_id`.
    """
    # Mount the shared workflow volume at /output_dir/, scoped to this
    # recipe's own subdirectory so recipes do not clobber each other.
    output_volume_mount = models.VolumeMount(
        name="workflow-volume",
        mount_path="/output_dir/",
        sub_path=recipe_id,
    )

    return Container(
        name="publish-data",
        command=["sh", "-c"],
        args=["rsync --progress /input_dir/* /output_dir/"],
        inputs=[Artifact(name="input-dir", path="/input_dir/")],
        volume_mounts=[output_volume_mount],
    )


def _cmds_from_simple_recipe(recipe_dir: str) -> list[str]:
"""Read commands from a 'simple' OGDC recipe.

Expand All @@ -64,27 +88,116 @@ def _cmds_from_simple_recipe(recipe_dir: str) -> list[str]:
return commands


def data_already_published(recipe_config: RecipeConfig) -> bool:
    """Check whether published data already exist for the given recipe.

    Submits a short-lived Argo workflow that tests for the recipe-specific
    subpath on the OGDC data storage volume, waits for it to complete, then
    reads the workflow's output parameter to determine the result.

    Args:
        recipe_config: The recipe configuration; its ``id`` names the subpath
            checked on the storage volume.

    Returns:
        True if data for this recipe have already been published, else False.

    Raises:
        RuntimeError: If the check workflow's output parameter cannot be
            found, or holds an unexpected value.
    """
    with Workflow(
        generate_name=f"{recipe_config.id}-check-published",
        entrypoint="steps",
        workflows_service=ARGO_WORKFLOW_SERVICE,
        volumes=[
            models.Volume(
                name="workflow-volume",
                # TODO: parameterize this!
                persistent_volume_claim={"claim_name": "qgnet-ogdc-workflow-pvc"},
            )
        ],
    ) as w:
        check_dir_template = Container(
            name="check-already-published",
            command=["sh", "-c"],
            # Check for the existence of the recipe-specific subpath. If it
            # exists, write out a file with "yes". Otherwise write out a file
            # with "no". This file becomes an argo parameter that we can check
            # later.
            args=[
                f'test -d /mnt/{recipe_config.id} && echo "yes" > /tmp/published.txt || echo "no" > /tmp/published.txt',
            ],
            outputs=[
                Parameter(
                    name="data-published",
                    value_from=models.ValueFrom(path="/tmp/published.txt"),
                ),
            ],
            volume_mounts=[
                models.VolumeMount(
                    name="workflow-volume",
                    mount_path="/mnt/",
                ),
            ],
        )

        with Steps(name="steps"):
            check_dir_template()

    # Wait for the workflow to complete.
    workflow_name = submit_workflow(workflow=w, wait=True)

    # Check the result. Get an updated instance of the workflow, with the
    # latest states for all nodes. Then iterate through the nodes, find the
    # template we defined above ("check-already-published"), and extract its
    # output parameter.
    completed_workflow = ARGO_WORKFLOW_SERVICE.get_workflow(name=workflow_name)
    result = None
    for node in completed_workflow.status.nodes.values():
        if node.template_name == "check-already-published":
            result = node.outputs.parameters[0].value
            # Only one node runs this template; stop at the first match.
            break
    if not result:
        err_msg = "Failed to check if data have been published"
        raise RuntimeError(err_msg)

    # Validate with an explicit raise rather than `assert`: asserts are
    # stripped under `python -O`, and an unexpected value here should always
    # surface as an error.
    if result not in ("yes", "no"):
        err_msg = f"Unexpected value for data-published parameter: {result!r}"
        raise RuntimeError(err_msg)

    return result == "yes"


def make_simple_workflow(recipe_dir: str) -> Workflow:
    """Build (but do not submit) an Argo Workflow for a 'simple' OGDC recipe.

    The workflow fetches the recipe's input data, runs each recipe command in
    sequence (each step consuming the previous step's output artifact), and
    finally publishes the last step's output to the OGDC data storage volume.

    Args:
        recipe_dir: Path to the directory containing the simple recipe.

    Returns:
        The constructed (unsubmitted) hera ``Workflow``.

    Raises:
        RuntimeError: If data for this recipe have already been published.
    """
    commands = _cmds_from_simple_recipe(recipe_dir)
    recipe_config = get_recipe_config(recipe_dir)

    # TODO: consider moving this into a higher-level function. This should just
    # make a simple workflow from a simple recipe, and not submit workflows of
    # its own.
    # First, check if the data are already published. If so, we should raise an
    # error or overwrite if requested.
    if data_already_published(recipe_config):
        # TODO: better error handling (raise `OGDCRecipeError` or something similar)
        err_msg = f"Data for recipe {recipe_config.id} have already been published."
        raise RuntimeError(err_msg)

    with Workflow(
        generate_name=f"{recipe_config.id}-",
        entrypoint="steps",
        workflows_service=ARGO_WORKFLOW_SERVICE,
        volumes=[
            models.Volume(
                name="workflow-volume",
                # TODO: parameterize this!
                persistent_volume_claim={"claim_name": "qgnet-ogdc-workflow-pvc"},
            )
        ],
    ) as w:
        # One container template per recipe command, executed in order below.
        cmd_templates = [
            _make_cmd_template(name=f"run-cmd-{idx}", command=command)
            for idx, command in enumerate(commands)
        ]
        fetch_template = _make_fetch_url_template(recipe_config)

        # create publication template
        publish_template = _make_publish_template(recipe_config.id)

        with Steps(name="steps"):
            # Chain the steps: each step's "output-dir" artifact becomes the
            # next step's "input-dir" artifact.
            step = fetch_template()
            for idx, cmd_template in enumerate(cmd_templates):
                step = cmd_template(
                    name=f"step-{idx}",
                    arguments=step.get_artifact("output-dir").with_name("input-dir"),  # type: ignore[union-attr]
                )
            # publish final data
            publish_template(
                name="publish-data",
                arguments=step.get_artifact("output-dir").with_name("input-dir"),  # type: ignore[union-attr]
            )

    return w
2 changes: 1 addition & 1 deletion tests/integration/test_recipe/meta.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: "Test argo workflow"
id: "test-wargo-workflow"
id: "test-argo-workflow"
input:
url: "https://raw.githubusercontent.com/nsidc/qgreenland/refs/heads/main/qgreenland/assets/arctic_circle.geojson"
output:
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_argo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def test__configure_argo_settings_envvar_override(monkeypatch):
):
monkeypatch.setenv(envvar, f"{envvar.lower()}_test")

monkeypatch.setenv("ENVIRONMENT", "prod")
workflow_service = _configure_argo_settings()

assert workflow_service.host == "argo_workflows_service_url_test"
Expand Down
Loading