Skip to content

Commit

Permalink
refactor: change metadata property keys
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 20, 2024
1 parent 7aabc08 commit df51b8a
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 65 deletions.
3 changes: 3 additions & 0 deletions src/kiara/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ class SpecialValue(Enum):
KIARA_MODEL_DATA_KEY = "data"
KIARA_MODEL_SCHEMA_KEY = "schema"

ENVIRONMENT_MARKER_KEY = "environment"
"""Constant string to indicate this is a metadata entry of type 'environment'."""

SYMLINK_ISSUE_MSG = """Your operating system does not support symlinks, which is a requirement for kiara to work.
You can enable developer mode to fix this issue:
Expand Down
17 changes: 11 additions & 6 deletions src/kiara/models/module/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob):
module = kiara.module_registry.create_module(active_job.job_config)
is_internal = module.characteristics.is_internal

env_hashes = {
env.model_type_id: str(env.instance_cid)
for env in kiara.current_environments.values()
}

job_record = JobRecord(
job_id=active_job.job_id,
job_submitted=active_job.submitted,
Expand All @@ -264,7 +269,7 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob):
inputs=active_job.job_config.inputs,
outputs=active_job.results,
runtime_details=job_details,
environment_hashes=kiara.environment_registry.environment_hashes,
environment_hashes=env_hashes,
# input_ids_hash=active_job.job_config.input_ids_hash,
inputs_data_hash=inputs_data_hash,
)
Expand All @@ -276,13 +281,13 @@ def from_active_job(self, kiara: "Kiara", active_job: ActiveJob):

job_id: uuid.UUID = Field(description="The globally unique id for this job.")
job_submitted: datetime = Field(description="When the job was submitted.")
environment_hashes: Mapping[str, Mapping[str, str]] = Field(
environment_hashes: Mapping[str, str] = Field(
description="Hashes for the environments this value was created in."
)
enviroments: Union[Mapping[str, Mapping[str, Any]], None] = Field(
description="Information about the environments this value was created in.",
default=None,
)
# enviroments: Union[Mapping[str, Mapping[str, Any]], None] = Field(
# description="Information about the environments this value was created in.",
# default=None,
# )
is_internal: bool = Field(description="Whether this job was created by the system.")
# job_hash: str = Field(description="The hash of the job. Calculated from manifest & input_ids hashes.")
# manifest_hash: str = Field(description="The hash of the manifest.")
Expand Down
4 changes: 2 additions & 2 deletions src/kiara/models/runtime_environment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
from rich.table import Table

from kiara.defaults import DEFAULT_ENV_HASH_KEY, ENVIRONMENT_TYPE_CATEGORY_ID
from kiara.models import KiaraModel
from kiara.models.metadata import KiaraMetadata
from kiara.utils.hashing import compute_cid
from kiara.utils.json import orjson_dumps
from kiara.utils.output import extract_renderable

logger = structlog.get_logger()


class RuntimeEnvironment(KiaraModel):
class RuntimeEnvironment(KiaraMetadata):
model_config = ConfigDict(frozen=True)

@classmethod
Expand Down
4 changes: 2 additions & 2 deletions src/kiara/processing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ def create_job(
) -> uuid.UUID:

environments = {
env_name: env.instance_id
for env_name, env in self._kiara.current_environments.items()
env.model_type_id: str(env.instance_cid)
for env in self._kiara.current_environments.values()
}

result_pedigree = ValuePedigree(
Expand Down
61 changes: 13 additions & 48 deletions src/kiara/registries/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
NOT_SET_VALUE_ID,
ORPHAN_PEDIGREE_OUTPUT_NAME,
STRICT_CHECKS,
SpecialValue,
SpecialValue, ENVIRONMENT_MARKER_KEY,
)
from kiara.exceptions import (
InvalidValuesException,
Expand Down Expand Up @@ -85,6 +85,7 @@
from kiara.context import Kiara
from kiara.models.module.destiny import Destiny
from kiara.models.module.manifest import Manifest
from kiara.models.runtime_environment import RuntimeEnvironment


logger = structlog.getLogger()
Expand Down Expand Up @@ -259,7 +260,7 @@ def __init__(self, kiara: "Kiara"):
self._cached_data[NOT_SET_VALUE_ID] = SpecialValue.NOT_SET
self._registered_values[NOT_SET_VALUE_ID] = self._not_set_value
self._persisted_value_descs[NOT_SET_VALUE_ID] = NONE_PERSISTED_DATA
self._env_cache: Dict[str, Dict[str, Mapping[str, Any]]] = {}
self._env_cache: Dict[str, Dict[str, RuntimeEnvironment]] = {}

self._none_value: Value = Value(
value_id=NONE_VALUE_ID,
Expand Down Expand Up @@ -503,30 +504,16 @@ def get_value(self, value: Union[uuid.UUID, ValueLink, str, Path]) -> Value:

def _persist_environment(self, env_type: str, env_hash: str):

cached = self._env_cache.get(env_type, {}).get(env_hash, None)
if cached is not None:
return

environment = self._kiara.environment_registry.get_environment_for_cid(
env_hash
)
# env_type = environment.get_environment_type_name()
# env_hash = str(environment.instance_cid)
#
# env = self._env_cache.get(env_type, {}).get(env_hash, None)
# if env is not None:
# return

env_data = environment.as_dict_with_schema()
ENVIRONMENT_MARKER_KEY = "environment"
self._kiara.metadata_registry.register_metadata_item(key = ENVIRONMENT_MARKER_KEY, item=environment)
# self._persist_environment_details(
# env_type=env_type, env_hash=env_hash, env_data=env_data
# )
self._env_cache.setdefault(env_type, {})[env_hash] = env_data

cached = self._env_cache.get(env_type, {}).get(env_hash, None)
if cached is not None:
return

environment = self._kiara.environment_registry.get_environment_for_cid(env_hash)

self._kiara.metadata_registry.register_metadata_item(
key=ENVIRONMENT_MARKER_KEY, item=environment
)
self._env_cache.setdefault(env_type, {})[env_hash] = environment

def store_value(
self,
Expand All @@ -540,35 +527,13 @@ def store_value(
again, the archive_id is used, if not, the string is used as the archive alias.
"""

_value = self.get_value(value)

# first, persist environment information
for env_type, env_hash in _value.pedigree.environments.items():

self._persist_environment(env_type, env_hash, _value)
# cached = self._env_cache.get(env_type, {}).get(env_hash, None)
# if cached is not None:
# continue
#
# environment = self.kiara_context.environment_registry.get_environment_for_cid(
# env_hash
# )
# env_type = environment.get_environment_type_name()
# env_hash = str(environment.instance_cid)
#
# env = self._env_cache.get(env_type, {}).get(env_hash, None)
# if env is not None:
# return
#
# env_data = environment.as_dict_with_schema()
# self._persist_environment_details(
# env_type=env_type, env_hash=env_hash, env_data=env_data
# )
# self._env_cache.setdefault(env_type, {})[env_hash] = env_data
#
# self.persist_environment(env)

self._persist_environment(env_type, env_hash)


store: DataStore = self.get_archive(archive_id_or_alias=data_store) # type: ignore
Expand Down
1 change: 0 additions & 1 deletion src/kiara/registries/data/data_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import structlog
from rich.console import RenderableType

from kiara.models.runtime_environment import RuntimeEnvironment
from kiara.models.values.matchers import ValueMatcher
from kiara.models.values.value import (
SERIALIZE_TYPES,
Expand Down
1 change: 0 additions & 1 deletion src/kiara/registries/data/data_store/sqlite_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from kiara.registries.data import DataArchive
from kiara.registries.data.data_store import BaseDataStore
from kiara.utils.hashfs import shard
from kiara.utils.json import orjson_dumps

if TYPE_CHECKING:
from multiformats import CID
Expand Down
2 changes: 1 addition & 1 deletion src/kiara/registries/environment/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(

def get_environment_for_cid(self, env_cid: str) -> RuntimeEnvironment:

envs = [env for env in self.environments.values() if env.instance_id == env_cid]
envs = [env for env in self.environments.values() if str(env.instance_cid) == env_cid]
if len(envs) == 0:
raise Exception(f"No environment with id '{env_cid}' available.")
elif len(envs) > 1:
Expand Down
20 changes: 20 additions & 0 deletions src/kiara/registries/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from bidict import bidict
from rich.console import Group

from kiara.defaults import ENVIRONMENT_MARKER_KEY
from kiara.exceptions import FailedJobException
from kiara.models.events import KiaraEvent
from kiara.models.events.job_registry import (
Expand All @@ -38,6 +39,7 @@
if TYPE_CHECKING:
from kiara.context import Kiara
from kiara.context.runtime_config import JobCacheStrategy
from kiara.models.runtime_environment import RuntimeEnvironment

logger = structlog.getLogger()

Expand Down Expand Up @@ -171,6 +173,9 @@ def __init__(self, kiara: "Kiara"):

self._event_callback = self._kiara.event_registry.add_producer(self)

self._env_cache: Dict[str, Dict[str, RuntimeEnvironment]] = {}


# default_archive = FileSystemJobStore.create_from_kiara_context(self._kiara)
# self.register_job_archive(default_archive, store_alias=DEFAULT_STORE_MARKER)

Expand Down Expand Up @@ -282,12 +287,27 @@ def job_status_changed(
self._finished_jobs[job_hash] = job_id
self._archived_records[job_id] = job_record

def _persist_environment(self, env_type: str, env_hash: str):

cached = self._env_cache.get(env_type, {}).get(env_hash, None)
if cached is not None:
return

environment = self._kiara.environment_registry.get_environment_for_cid(env_hash)
self._kiara.metadata_registry.register_metadata_item(
key=ENVIRONMENT_MARKER_KEY, item=environment
)
self._env_cache.setdefault(env_type, {})[env_hash] = environment

def store_job_record(self, job_id: uuid.UUID):

# TODO: allow to store job record to external store

job_record = self.get_job_record(job_id=job_id)

for env_type, env_hash in job_record.environment_hashes.items():
self._persist_environment(env_type, env_hash)

if job_record._is_stored:
logger.debug(
"ignore.store.job_record", reason="already stored", job_id=str(job_id)
Expand Down
2 changes: 1 addition & 1 deletion src/kiara/registries/metadata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, kiara: "Kiara"):
self._metadata_archives: Dict[str, MetadataArchive] = {}
self._default_data_store: Union[str, None] = None

self._env_registry: EnvironmentRegistry = self._kiara.environment_registry
# self._env_registry: EnvironmentRegistry = self._kiara.environment_registry

@property
def kiara_id(self) -> uuid.UUID:
Expand Down
5 changes: 3 additions & 2 deletions src/kiara/registries/metadata/metadata_store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def __init__(
self._schema_stored_cache: Dict[str, Any] = {}
self._schema_stored_item: Dict[str, Any] = {}


def retrieve_metadata_item(
self,
key: str,
Expand Down Expand Up @@ -133,7 +132,8 @@ def store_metadata_item(
data_json = item.model_dump_json()
data_hash = str(item.instance_cid)

if data_hash not in self._schema_stored_item.keys():
metadata_item_id = self._schema_stored_item.get(data_hash, None)
if not metadata_item_id:

metadata_item_id = self._store_metadata_item(
key=key,
Expand All @@ -145,6 +145,7 @@ def store_metadata_item(
)
self._schema_stored_item[data_hash] = metadata_item_id


if (reference_item_id and not reference_item_type) or (
reference_item_type and not reference_item_id
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def sqlite_engine(self) -> "Engine":
model_schema_hash TEXT NOT NULL,
metadata_value TEXT NOT NULL,
FOREIGN KEY (model_schema_hash) REFERENCES metadata_schemas (model_schema_hash),
UNIQUE (metadata_item_key, metadata_item_hash, model_type_id, model_schema_hash)
UNIQUE (metadata_item_key, metadata_item_hash)
);
CREATE TABLE IF NOT EXISTS metadata_references (
reference_item_type TEXT NOT NULL,
Expand Down

0 comments on commit df51b8a

Please sign in to comment.