Skip to content

Commit

Permalink
chore: auto save everything
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 12, 2024
1 parent 391b703 commit efd02fc
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 28 deletions.
17 changes: 10 additions & 7 deletions src/kiara/interfaces/python_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
)
from kiara.interfaces.python_api.workflow import Workflow
from kiara.models.archives import KiArchiveInfo
from kiara.models.metadata import CommentMetadata
from kiara.models.module.jobs import ActiveJob, JobRecord
from kiara.models.module.pipeline import PipelineConfig, PipelineStructure
from kiara.models.module.pipeline.pipeline import PipelineGroupInfo, PipelineInfo
Expand Down Expand Up @@ -2802,6 +2801,10 @@ def queue_manifest(
if "comment" not in job_metadata.keys():
raise KiaraException(msg="You need to provide a 'comment' for the job.")

save_values = True
else:
save_values = False

if inputs is None:
inputs = {}

Expand All @@ -2810,7 +2813,7 @@ def queue_manifest(
)

job_id = self.context.job_registry.execute_job(
job_config=job_config, wait=False
job_config=job_config, wait=False, auto_save_result=save_values
)

if job_metadata:
Expand Down Expand Up @@ -2847,7 +2850,7 @@ def queue_job(
operation: Union[str, Path, Manifest, OperationInfo, JobDesc],
inputs: Union[Mapping[str, Any], None],
operation_config: Union[None, Mapping[str, Any]] = None,
**job_metadata,
**job_metadata: Any,
) -> uuid.UUID:
"""
Queue a job from an operation id, module_name (and config), or pipeline file, wait for the job to finish and retrieve the result.
Expand Down Expand Up @@ -2994,7 +2997,7 @@ def get_job_result(self, job_id: Union[str, uuid.UUID]) -> ValueMapReadOnly:
result = self.context.job_registry.retrieve_result(job_id=job_id)
return result

def list_job_record_ids(self, **matcher_params):
def list_job_record_ids(self, **matcher_params) -> List[uuid.UUID]:
"""List all available job ids in this kiara context, ordered from newest to oldest.
This method exists mainly so frontends can retrieve a list of all job ids in order, without having
Expand All @@ -3011,7 +3014,7 @@ def list_job_record_ids(self, **matcher_params):
"""

if matcher_params:
records = list(self.list_job_records(**matcher_params).keys())
job_ids = list(self.list_job_records(**matcher_params).keys())
else:
job_ids = self.context.job_registry.retrieve_all_job_record_ids()

Expand Down Expand Up @@ -3045,7 +3048,7 @@ def list_job_records(self, **matcher_params) -> Mapping[uuid.UUID, "JobRecord"]:

return job_records

def get_job_record(self, job_id: Union[str, uuid.UUID]) -> "JobRecord":
def get_job_record(self, job_id: Union[str, uuid.UUID]) -> Union["JobRecord", None]:

if isinstance(job_id, str):
job_id = uuid.UUID(job_id)
Expand All @@ -3072,7 +3075,7 @@ def get_job_comment(self, job_id: Union[str, uuid.UUID]) -> Union[str, None]:

metadata: Union[
None, CommentMetadata
] = self.context.metadata_registry.retrieve_job_metadata_item(
] = self.context.metadata_registry.retrieve_job_metadata_item( # type: ignore
job_id=job_id, key="comment"
)

Expand Down
37 changes: 29 additions & 8 deletions src/kiara/processing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

import abc
import uuid
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Protocol, Union
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Protocol, Set, Union

import structlog
from pydantic import BaseModel

from kiara.exceptions import KiaraProcessingException
from kiara.exceptions import KiaraException, KiaraProcessingException
from kiara.models.module.jobs import (
ActiveJob,
JobConfig,
Expand Down Expand Up @@ -63,6 +63,7 @@ def __init__(self, kiara: "Kiara"):
self._finished_jobs: Dict[uuid.UUID, ActiveJob] = {}
self._output_refs: Dict[uuid.UUID, ValueMapWritable] = {}
self._job_records: Dict[uuid.UUID, JobRecord] = {}
self._auto_save_jobs: Set[uuid.UUID] = set()

self._listeners: List[JobStatusListener] = []

Expand Down Expand Up @@ -105,7 +106,9 @@ def get_job_record(self, job_id: uuid.UUID) -> JobRecord:
else:
raise Exception(f"No job record for job with id '{job_id}' registered.")

def create_job(self, job_config: JobConfig) -> uuid.UUID:
def create_job(
self, job_config: JobConfig, auto_save_result: bool = False
) -> uuid.UUID:

environments = {
env_name: env.instance_id
Expand Down Expand Up @@ -138,7 +141,7 @@ def create_job(self, job_config: JobConfig) -> uuid.UUID:
ID_REGISTRY.update_metadata(job_id, obj=job)
job.job_log.add_log("job created")

job_details = {
job_details: Dict[str, Any] = {
"job_config": job_config,
"job": job,
"module": module,
Expand Down Expand Up @@ -186,6 +189,9 @@ def create_job(self, job_config: JobConfig) -> uuid.UUID:
)
log_dev_message(table, title=title)

if auto_save_result:
self._auto_save_jobs.add(job_id)

return job_id

def queue_job(self, job_id: uuid.UUID) -> ActiveJob:
Expand Down Expand Up @@ -254,18 +260,20 @@ def job_status_updated(

old_status = job.status

result_values = None

if status == JobStatus.SUCCESS:
self._active_jobs.pop(job_id)
job.job_log.add_log("job finished successfully")
job.status = JobStatus.SUCCESS
job.finished = get_current_time_incl_timezone()
values = self._output_refs[job_id]
result_values = self._output_refs[job_id]
try:
values.sync_values()
for field, val in values.items():
result_values.sync_values()
for field, val in result_values.items():
val.job_id = job_id

value_ids = values.get_all_value_ids()
value_ids = result_values.get_all_value_ids()
job.results = value_ids
job.job_log.percent_finished = 100
job_record = JobRecord.from_active_job(
Expand Down Expand Up @@ -368,6 +376,19 @@ def job_status_updated(
job_id=job_id, old_status=old_status, new_status=job.status
)

if status is JobStatus.SUCCESS:
if job_id in self._auto_save_jobs:
assert result_values is not None
try:
for val in result_values.values():
self._kiara.data_registry.store_value(val)
except Exception as e:
log_exception(e)
raise KiaraException(
msg=f"Failed to auto-save job results for job: {job_id}",
parent=e,
)

def wait_for(self, *job_ids: uuid.UUID):
"""Wait for the jobs with the specified ids, also optionally sync their outputs with the pipeline value state."""
self._wait_for(*job_ids)
Expand Down
22 changes: 13 additions & 9 deletions src/kiara/registries/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def get_job_record(self, job_id: uuid.UUID) -> Union[JobRecord, None]:

def find_job_records(self, matcher: JobMatcher) -> Mapping[uuid.UUID, JobRecord]:

pass
raise NotImplementedError("Job matching is Not implemented yet.")

def retrieve_all_job_record_ids(self) -> List[uuid.UUID]:
"""Retrieve a list of all available job record ids, sorted from latest to earliest."""
Expand Down Expand Up @@ -389,14 +389,14 @@ def retrieve_all_job_records(self) -> Mapping[uuid.UUID, JobRecord]:
assert job_record is not None
all_records[r] = job_record

all_records_sorted = {
job_id: job
for job_id, job in sorted(
all_records_sorted = dict(
sorted(
all_records.items(),
key=lambda item: item[1].job_submitted,
reverse=True,
)
}
)

return all_records_sorted

def find_job_record_for_manifest(
Expand Down Expand Up @@ -475,15 +475,14 @@ def execute(
return self.execute_job(job_config, wait=wait)

def execute_job(
self,
job_config: JobConfig,
wait: bool = False,
self, job_config: JobConfig, wait: bool = False, auto_save_result=False
) -> uuid.UUID:
"""Execute the job specified by the job config.
Arguments:
job_config: the job config
wait: whether to wait for the job to finish
auto_save_result: whether to automatically save the job's outputs to the data registry once the job finished successfully
"""

# from kiara.models.metadata import CommentMetadata
Expand Down Expand Up @@ -524,6 +523,7 @@ def execute_job(
else:
pipeline_step_id = None
pipeline_id = None

if stored_job is not None:
log.debug(
"job.use_cached",
Expand Down Expand Up @@ -558,6 +558,8 @@ def execute_job(
panel = Group(table, table_job_record)
log_dev_message(panel, title=title)

# TODO: in this case, and if 'auto_save_result' is True, we should also verify the outputs are saved?

return stored_job

dbg_data = {
Expand All @@ -570,7 +572,9 @@ def execute_job(

log.debug("job.execute", **dbg_data)

job_id = self._processor.create_job(job_config=job_config)
job_id = self._processor.create_job(
job_config=job_config, auto_save_result=auto_save_result
)
self._active_jobs[job_config.job_hash] = job_id

try:
Expand Down
2 changes: 1 addition & 1 deletion src/kiara/registries/metadata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def retrieve_metadata_item(
)

model_instance = model_cls(**data)
return model_instance
return model_instance # type: ignore

def register_metadata_item(
self,
Expand Down
11 changes: 10 additions & 1 deletion src/kiara/registries/metadata/metadata_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import uuid
from typing import Any, Dict, Generic, Iterable, Mapping, Tuple, Union

from kiara.exceptions import KiaraException
from kiara.models.metadata import KiaraMetadata
from kiara.registries import ARCHIVE_CONFIG_CLS, BaseArchive

Expand Down Expand Up @@ -37,7 +38,15 @@ def retrieve_metadata_item(
reference_id: Union[str, None] = None,
) -> Union[Tuple[str, Mapping[str, Any]], None]:

if reference_id and not reference_type:
raise ValueError(
"If reference_id is set, reference_type must be set as well."
)
if reference_type:
if reference_id is None:
raise KiaraException(
msg="reference_id must set also if reference_type is set."
)
result = self._retrieve_referenced_metadata_item_data(
key=key, reference_type=reference_type, reference_id=reference_id
)
Expand All @@ -55,7 +64,6 @@ def _retrieve_referenced_metadata_item_data(
self, key: str, reference_type: str, reference_id: str
) -> Union[Tuple[str, Mapping[str, Any]], None]:
"""Return the model type id and model data for the specified referenced metadata item."""
pass


class MetadataStore(MetadataArchive):
Expand Down Expand Up @@ -131,6 +139,7 @@ def store_metadata_item(
)

if reference_item_type:
assert reference_item_id is not None
self._store_metadata_reference(
reference_item_type, reference_item_id, str(metadata_item_id)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path
from typing import Any, Dict, Mapping, Tuple, Union

from orjson import orjson
import orjson
from sqlalchemy import text
from sqlalchemy.engine import Engine, create_engine

Expand Down
4 changes: 3 additions & 1 deletion src/kiara/utils/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,9 @@ def execute_job(
if comment is not None:
job_metadata["comment"] = comment

job_id = api.queue_job(operation=operation, inputs=inputs, **job_metadata)
job_id = api.queue_job(
operation=operation, inputs=inputs, operation_config=None, **job_metadata
)

try:
outputs = api.get_job_result(job_id=job_id)
Expand Down

0 comments on commit efd02fc

Please sign in to comment.