From 73cb2333505e6ffe703e66769f84313159263d16 Mon Sep 17 00:00:00 2001 From: sdc50 Date: Thu, 10 Oct 2024 16:24:07 -0600 Subject: [PATCH] docs --- docs/tethys_sdk/jobs.rst | 24 +++++++++++++++++++-- tethys_compute/tasks.py | 30 +++++++++++++++++++++++++++ tethys_compute/views/update_status.py | 30 +++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/docs/tethys_sdk/jobs.rst b/docs/tethys_sdk/jobs.rst index 3e8bd2d0e..67131d8b9 100644 --- a/docs/tethys_sdk/jobs.rst +++ b/docs/tethys_sdk/jobs.rst @@ -228,9 +228,9 @@ For example, a URL may look something like this: http://example.com/update-job-status/27/ -The output would look something like this: +The response would look something like this: -.. code-block:: python +.. code-block:: javascript {"success": true} @@ -241,6 +241,26 @@ This URL can be retrieved from the job manager with the ``get_job_status_callbac job_manager = App.get_job_manager() callback_url = job_manager.get_job_status_callback_url(request, job_id) +The callback URL can be used to update the jobs status after a specified delay by passing the ``delay`` query parameter: + +.. code-block:: + + http:///update-job-status//?delay= + +For example, to schedule a job update in 30 seconds: + +.. code-block:: + + http:///update-job-status/27/?delay=30 + +In this case the response would look like this: + +.. code-block:: javascript + + {"success": "scheduled"} + +This delay can be useful so the job itself can hit the endpoint just before completing to trigger the Tethys Portal to check its status after it has time to complete and exit. This will allow the portal to register that the job has completed and start any data transfer that is triggered upon job completion. + Custom Statuses --------------- Custom statuses can be given to jobs simply by assigning the ``status`` attribute: diff --git a/tethys_compute/tasks.py b/tethys_compute/tasks.py index 1a4119216..a2a461208 100644 --- a/tethys_compute/tasks.py +++ b/tethys_compute/tasks.py @@ -15,6 +15,22 @@ def create_task(func, /, *args, delay=0, periodic=False, count=None, **kwargs): + """ + Schedules a task to be executed after some delay. This is run asynchronously and must be called from a context + where there is an active event loop (e.g. from a controller). + + Can be set to run periodically (i.e. it will be rescheduled after it is run) either indefinitely or for a + specified number of times. + Args: + func (callable): the function to schedule + *args: args to pass to the function when it is called + delay (int): number of seconds to wait before executing `func` or between calls if `periodic=True` + periodic (bool): if `True` the function will be rescheduled after each execution until `count=0` or + indefinitely if `count=None` + count (int): the number of times to execute the function if `periodic=True`. If `periodic=False` then + this argument is ignored + **kwargs: key-word arguments to pass to `func` + """ asyncio.create_task( _run_after_delay( func, *args, delay=delay, periodic=periodic, count=count, **kwargs @@ -23,6 +39,20 @@ def create_task(func, /, *args, delay=0, periodic=False, count=None, **kwargs): async def _run_after_delay(func, /, *args, delay, periodic, count, **kwargs): + """ + Helper function to `create_task` that delays before executing a function. It is called recursively to handle + `periodic` tasks. + + Args: + func (callable): the function to schedule + *args: args to pass to the function when it is called + delay (int): number of seconds to wait before executing `func` or between calls if `periodic=True` + periodic (bool): if `True` the function will be rescheduled after each execution until `count=0` or + indefinitely if `count=None` + count (int): the number of times to execute the function if `periodic=True`. If `periodic=False` then + this argument is ignored + **kwargs: key-word arguments to pass to `func` + """ await asyncio.sleep(delay) try: logger.info(f'Running task "{func}" with args="{args}" and kwargs="{kwargs}".') diff --git a/tethys_compute/views/update_status.py b/tethys_compute/views/update_status.py index cdc73ed86..786708009 100644 --- a/tethys_compute/views/update_status.py +++ b/tethys_compute/views/update_status.py @@ -23,6 +23,16 @@ @database_sync_to_async def get_job(job_id, user=None): + """ + Helper method to query a `TethysJob` object safely from an asynchronous context. + + Args: + job_id: database ID of a `TethysJob` + user: django user object. If `None` then permission are not checked. Default=None + + Returns: `TethysJob` object + + """ if ( user is None or user.is_staff @@ -33,6 +43,17 @@ def get_job(job_id, user=None): async def do_job_action(job, action): + """ + Helper function to call job actions from an asynchronous context. + Handles both sync methods and coroutine job actions. + + Args: + job: `TethysJob` object + action (str): name of method to call (without arguments) on the `job` + + Returns: return value of `action` + + """ func = getattr(job, action) if asyncio.iscoroutinefunction(func): ret = await func() @@ -43,6 +64,15 @@ async def do_job_action(job, action): async def _update_job_status(job_id): + """ + Helper method to update a jobs status as a task (with delayed execution). + + Args: + job_id: database ID for a `TethysJob` + + Returns: `True` if status was successfully updated, `False` otherwise. + + """ try: job = await get_job(job_id) await do_job_action(job, "update_status")