Skip to content

Commit

Permalink
chore: list job ids in stores
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 12, 2024
1 parent 6b8a5f5 commit a565b61
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 17 deletions.
86 changes: 83 additions & 3 deletions src/kiara/interfaces/python_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
from kiara.interfaces.python_api.models.job import JobDesc
from kiara.interfaces.python_api.value import StoreValueResult, StoreValuesResult
from kiara.models.context import ContextInfo, ContextInfos
from kiara.models.module.jobs import ActiveJob
from kiara.models.module.manifest import Manifest
from kiara.models.module.operation import Operation
from kiara.models.rendering import RenderValueResult
Expand Down Expand Up @@ -101,6 +100,7 @@
)
from kiara.interfaces.python_api.workflow import Workflow
from kiara.models.archives import KiArchiveInfo
from kiara.models.module.jobs import ActiveJob, JobRecord
from kiara.models.module.pipeline import PipelineConfig, PipelineStructure
from kiara.models.module.pipeline.pipeline import PipelineGroupInfo, PipelineInfo
from kiara.registries import KiaraArchive
Expand Down Expand Up @@ -1322,7 +1322,7 @@ def list_value_ids(self, **matcher_params) -> List[uuid.UUID]:
"""
List all available value ids for this kiara context.
This method exists mainly so frontend can retrieve a list of all value_ids that exists on the backend without
This method exists mainly so frontends can retrieve a list of all value_ids that exists on the backend without
having to look up the details of each value (like [list_values][kiara.interfaces.python_api.KiaraAPI.list_values]
does). This method can also be used with a matcher, but in this case the [list_values][kiara.interfaces.python_api.KiaraAPI.list_values]
would be preferable in most cases, because it is called under the hood, and the performance advantage of not
Expand Down Expand Up @@ -1385,6 +1385,16 @@ def get_value(self, value: Union[str, Value, uuid.UUID, Path]) -> Value:
return self.context.data_registry.get_value(value=value)

def get_values(self, **values: Union[str, Value, uuid.UUID]) -> ValueMapReadOnly:
"""Retrieve Value instances for the specified value ids or aliases.
This is a convenience method to get fully 'hydrated' `Value` objects from references to them.
Arguments:
values: a dictionary with value ids or aliases as keys, and value instances as values
Returns:
a mapping with value_id as key, and [kiara.models.values.value.Value] as value
"""

return self.context.data_registry.load_values(values=values)

Expand Down Expand Up @@ -2962,7 +2972,7 @@ def run_job(
)
return self.context.job_registry.retrieve_result(job_id=job_id)

def get_job(self, job_id: Union[str, uuid.UUID]) -> ActiveJob:
def get_job(self, job_id: Union[str, uuid.UUID]) -> "ActiveJob":
"""Retrieve the status of the job with the provided id."""
if isinstance(job_id, str):
job_id = uuid.UUID(job_id)
Expand All @@ -2978,6 +2988,76 @@ def get_job_result(self, job_id: Union[str, uuid.UUID]) -> ValueMapReadOnly:
result = self.context.job_registry.retrieve_result(job_id=job_id)
return result

def list_job_record_ids(self, **matcher_params):
"""List all available job ids in this kiara context, ordered from newest to oldest.
This method exists mainly so frontends can retrieve a list of all job ids in order, without having
to retrieve all job details as well (in the case where no matcher_params exist. Otherwise, you could
also just use `list_jobs` and take the keys from the result.
You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class.
Arguments:
matcher_params: additional parameters to pass to the job matcher
Returns:
a list of job ids, ordered from latest to earliest
"""

if matcher_params:
records = list(self.list_job_records(**matcher_params).keys())
else:
job_ids = self.context.job_registry.retrieve_all_job_record_ids()

return job_ids

def list_job_records(self, **matcher_params) -> Mapping[uuid.UUID, "JobRecord"]:
"""List all available job ids in this kiara context, ordered from newest to oldest.
This method exists mainly so frontends can retrieve a list of all job ids in order, without having
to retrieve all job details as well (in the case where no matcher_params exist. Otherwise, you could
also just use `list_jobs` and take the keys from the result.
You can look up the supported matcher parameter arguments via the [JobMatcher][kiara.models.module.jobs.JobMatcher] class.
Arguments:
matcher_params: additional parameters to pass to the job matcher
Returns:
a list of job details, ordered from latest to earliest
"""

if matcher_params:
raise NotImplementedError("Job matching is not implemented yet")
from kiara.models.module.jobs import JobMatcher

matcher = JobMatcher(**matcher_params)
job_records = self.context.job_registry.find_job_records(matcher=matcher)
else:
job_records = self.context.job_registry.retrieve_all_job_records()

return job_records

def get_job_record(self, job_id: Union[str, uuid.UUID]) -> "JobRecord":

if isinstance(job_id, str):
job_id = uuid.UUID(job_id)

job_record = self.context.job_registry.get_job_record(job_id=job_id)
return job_record

def get_job_metadata(self, job_id: Union[str, uuid.UUID]) -> Mapping[str, Any]:
"""Retrieve the metadata for the specified job."""

if isinstance(job_id, str):
job_id = uuid.UUID(job_id)

metadata = self.context.metadata_registry.retrieve_job_metadata_items(
job_id=job_id
)
return metadata

def render_value(
self,
value: Union[str, uuid.UUID, Value],
Expand Down
2 changes: 1 addition & 1 deletion src/kiara/interfaces/python_api/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ def _apply_inputs(self) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]
is_resolved=step_details.step.module.manifest.is_resolved,
inputs=step_details.inputs,
)
match = self._kiara.job_registry.find_matching_job_record(
match = self._kiara.job_registry.find_job_record_for_manifest(
inputs_manifest=job_config
)
if match:
Expand Down
33 changes: 32 additions & 1 deletion src/kiara/models/module/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from kiara.exceptions import InvalidValuesException, KiaraException
from kiara.models import KiaraModel
from kiara.models.module.manifest import InputsManifest
from kiara.utils.dates import get_current_time_incl_timezone

if TYPE_CHECKING:
from kiara.context import DataRegistry, Kiara
Expand Down Expand Up @@ -138,7 +139,8 @@ class ActiveJob(KiaraModel):
)
job_log: JobLog = Field(description="The lob jog.")
submitted: datetime = Field(
description="When the job was submitted.", default_factory=datetime.now
description="When the job was submitted.",
default_factory=get_current_time_incl_timezone,
)
started: Union[datetime, None] = Field(
description="When the job was started.", default=None
Expand Down Expand Up @@ -250,6 +252,7 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob):

job_record = JobRecord(
job_id=active_job.job_id,
job_submitted=active_job.submitted,
module_type=active_job.job_config.module_type,
module_config=active_job.job_config.module_config,
is_resolved=active_job.job_config.is_resolved,
Expand All @@ -267,6 +270,7 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob):
return job_record

job_id: uuid.UUID = Field(description="The globally unique id for this job.")
job_submitted: datetime = Field(description="When the job was submitted.")
environment_hashes: Mapping[str, Mapping[str, str]] = Field(
description="Hashes for the environments this value was created in."
)
Expand Down Expand Up @@ -336,3 +340,30 @@ def create_renderable(self, **config: Any) -> RenderableType:
# h = DeepHash(obj, hasher=KIARA_HASH_FUNCTION)
# self._outputs_hash = h[obj]
# return self._outputs_hash


class JobMatcher(KiaraModel):
@classmethod
def create_matcher(self, **match_options: Any):

m = JobMatcher(**match_options)
return m

job_ids: List[uuid.UUID] = Field(
description="A list of job ids, if specified, only jobs with one of these ids will be included.",
default_factory=list,
)
earliest: Union[None, datetime] = Field(
description="The earliest time when the job was created.", default=None
)
latest: Union[None, datetime] = Field(
description="The latest time when the job was created.", default=None
)
operation_inputs: List[uuid.UUID] = Field(
description="A list of value ids, if specified, only jobs that use one of them will be included.",
default_factory=list,
)
produced_outputs: List[uuid.UUID] = Field(
description="A list of value ids, if specified, only jobs that produced one of them will be included.",
default_factory=list,
)
54 changes: 44 additions & 10 deletions src/kiara/registries/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

import abc
import uuid
from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Type, Union
from datetime import datetime
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Type, Union

import structlog
from bidict import bidict
Expand Down Expand Up @@ -350,22 +351,55 @@ def get_job_record(self, job_id: uuid.UUID) -> Union[JobRecord, None]:

raise NotImplementedError()

def retrieve_all_job_records(self) -> Mapping[str, JobRecord]:
def find_job_records(self, matcher: JobMatcher) -> Mapping[uuid.UUID, JobRecord]:

all_records: Dict[str, JobRecord] = {}
pass

def retrieve_all_job_record_ids(self) -> List[uuid.UUID]:
"""Retrieve a list of all available job record ids, sorted from latest to earliest."""

all_records: Dict[uuid.UUID, datetime] = {}
for archive in self.job_archives.values():
all_record_ids = archive.retrieve_all_job_hashes()
if all_record_ids is None:
return {}
all_record_ids = archive.retrieve_all_job_ids()
# TODO: check for duplicates and mismatching datetimes
all_records.update(all_record_ids)

all_ids_sorted = [
uuid
for uuid, _ in sorted(
all_records.items(), key=lambda item: item[1], reverse=True
)
]

return all_ids_sorted

def retrieve_all_job_records(self) -> Mapping[uuid.UUID, JobRecord]:
"""Retrieves all job records from all job archives.
Returns:
a map of job-id/job-record pairs, sorted by job submission time, from latest to earliest
"""

all_records: Dict[uuid.UUID, JobRecord] = {}
for archive in self.job_archives.values():
all_record_ids = archive.retrieve_all_job_ids().keys()
for r in all_record_ids:
assert r not in all_records.keys()
job_record = archive.retrieve_record_for_job_hash(r)
job_record = archive.retrieve_record_for_job_id(r)
assert job_record is not None
all_records[r] = job_record

return all_records
all_records_sorted = {
job_id: job
for job_id, job in sorted(
all_records.items(),
key=lambda item: item[1].job_submitted,
reverse=True,
)
}
return all_records_sorted

def find_matching_job_record(
def find_job_record_for_manifest(
self, inputs_manifest: InputsManifest
) -> Union[uuid.UUID, None]:
"""
Expand Down Expand Up @@ -481,7 +515,7 @@ def execute_job(
job_hash=job_config.job_hash,
)

stored_job = self.find_matching_job_record(inputs_manifest=job_config)
stored_job = self.find_job_record_for_manifest(inputs_manifest=job_config)

is_pipeline_step = False if job_config.pipeline_metadata is None else True
if is_pipeline_step:
Expand Down
22 changes: 21 additions & 1 deletion src/kiara/registries/jobs/job_store/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# -*- coding: utf-8 -*-
import abc
from typing import Iterable, Union
import uuid
from datetime import datetime
from typing import Iterable, Mapping, Union

from kiara.models.module.jobs import JobRecord
from kiara.registries import BaseArchive
Expand Down Expand Up @@ -32,6 +34,24 @@ def retrieve_all_job_hashes(
If the job archive retrieves its jobs in a dynamic way, this will return 'None'.
"""

@abc.abstractmethod
def _retrieve_all_job_ids(self) -> Mapping[uuid.UUID, datetime]:
"""
Retrieve a list of all job record ids in the archive, along with when they where submitted.
"""

def retrieve_all_job_ids(self) -> Mapping[uuid.UUID, datetime]:
"""Retrieve a list of all job ids in the archive, along with when they where submitted."""
return self._retrieve_all_job_ids()

@abc.abstractmethod
def _retrieve_record_for_job_id(self, job_id: uuid.UUID) -> Union[JobRecord, None]:
pass

def retrieve_record_for_job_id(self, job_id: uuid.UUID) -> Union[JobRecord, None]:
job_record = self._retrieve_record_for_job_id(job_id=job_id)
return job_record

@abc.abstractmethod
def _retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]:
pass
Expand Down
38 changes: 37 additions & 1 deletion src/kiara/registries/jobs/job_store/sqlite_store.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# -*- coding: utf-8 -*-
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, Mapping, Union

Expand Down Expand Up @@ -114,6 +116,7 @@ def sqlite_engine(self) -> "Engine":
CREATE TABLE IF NOT EXISTS job_records (
job_id TEXT PRIMARY KEY,
job_hash TEXT TEXT NOT NULL,
job_submitted TEXT NOT NULL,
manifest_hash TEXT NOT NULL,
input_ids_hash TEXT NOT NULL,
inputs_data_hash TEXT NOT NULL,
Expand Down Expand Up @@ -146,6 +149,36 @@ def _retrieve_record_for_job_hash(self, job_hash: str) -> Union[JobRecord, None]
job_record = JobRecord(**job_record_data)
return job_record

def _retrieve_all_job_ids(self) -> Mapping[uuid.UUID, datetime]:
"""
Retrieve a list of all job record ids in the archive.
"""

sql = text(
"SELECT job_id, job_submitted FROM job_records ORDER BY job_submitted DESC;"
)

with self.sqlite_engine.connect() as connection:
result = connection.execute(sql)
return {uuid.UUID(row[0]): datetime.fromisoformat(row[1]) for row in result}

def _retrieve_record_for_job_id(self, job_id: uuid.UUID) -> Union[JobRecord, None]:

sql = text("SELECT job_metadata FROM job_records WHERE job_id = :job_id")

params = {"job_id": str(job_id)}

with self.sqlite_engine.connect() as connection:
result = connection.execute(sql, params)
row = result.fetchone()
if not row:
return None

job_record_json = row[0]
job_record_data = orjson.loads(job_record_json)
job_record = JobRecord(**job_record_data)
return job_record

def retrieve_all_job_hashes(
self,
manifest_hash: Union[str, None] = None,
Expand Down Expand Up @@ -222,11 +255,14 @@ def store_job_record(self, job_record: JobRecord):

job_record_json = job_record.model_dump_json()

job_submitted = job_record.job_submitted.isoformat()

sql = text(
"INSERT OR IGNORE INTO job_records(job_id, job_hash, manifest_hash, input_ids_hash, inputs_data_hash, job_metadata) VALUES (:job_id, :job_hash, :manifest_hash, :input_ids_hash, :inputs_data_hash, :job_metadata)"
"INSERT OR IGNORE INTO job_records(job_id, job_submitted, job_hash, manifest_hash, input_ids_hash, inputs_data_hash, job_metadata) VALUES (:job_id, :job_submitted, :job_hash, :manifest_hash, :input_ids_hash, :inputs_data_hash, :job_metadata)"
)
params = {
"job_id": str(job_record.job_id),
"job_submitted": job_submitted,
"job_hash": job_hash,
"manifest_hash": manifest_hash,
"input_ids_hash": input_ids_hash,
Expand Down
Loading

0 comments on commit a565b61

Please sign in to comment.