diff --git a/src/kiara/interfaces/python_api/__init__.py b/src/kiara/interfaces/python_api/__init__.py index de22f4fd1..55c7090fb 100644 --- a/src/kiara/interfaces/python_api/__init__.py +++ b/src/kiara/interfaces/python_api/__init__.py @@ -2759,7 +2759,10 @@ def assemble_render_pipeline( # ------------------------------------------------------------------------------------------------------------------ # job-related methods def queue_manifest( - self, manifest: Manifest, inputs: Union[None, Mapping[str, Any]] = None + self, + manifest: Manifest, + inputs: Union[None, Mapping[str, Any]] = None, + **job_metadata: Any, ) -> uuid.UUID: """ Queue a job using the provided manifest to describe the module and config that should be executed. @@ -2786,7 +2789,10 @@ def queue_manifest( return job_id def run_manifest( - self, manifest: Manifest, inputs: Union[None, Mapping[str, Any]] = None + self, + manifest: Manifest, + inputs: Union[None, Mapping[str, Any]] = None, + **job_metadata: Any, ) -> ValueMapReadOnly: """ Run a job using the provided manifest to describe the module and config that should be executed. @@ -2796,11 +2802,12 @@ def run_manifest( Arguments: manifest: the manifest inputs: the job inputs (can be either references to values, or raw inputs + job_metadata: additional metadata to store with the job Returns: a result value map instance """ - job_id = self.queue_manifest(manifest=manifest, inputs=inputs) + job_id = self.queue_manifest(manifest=manifest, inputs=inputs, **job_metadata) return self.context.job_registry.retrieve_result(job_id=job_id) def queue_job( @@ -2822,19 +2829,12 @@ def queue_job( operation: a module name, operation id, or a path to a pipeline file (resolved in this order, until a match is found).. inputs: the operation inputs operation_config: the (optional) module config in case 'operation' is a module name - **job_metadata: additional metadata to store with the job + job_metadata: additional metadata to store with the job Returns: the queued job id """ - if "comment" not in job_metadata.keys(): - raise KiaraException("You need to provide a 'comment' for the job.") - - comment = job_metadata.get("comment") - if not isinstance(comment, str): - raise KiaraException("The 'comment' must be a string.") - if inputs is None: inputs = {} @@ -2903,14 +2903,8 @@ def queue_job( else: manifest = _operation - job_id = self.queue_manifest(manifest=manifest, inputs=inputs) + job_id = self.queue_manifest(manifest=manifest, inputs=inputs, **job_metadata) - from kiara.models.metadata import CommentMetadata - - comment_metadata = CommentMetadata(comment=comment) - self.context.metadata_registry.register_metadata_item( - key="comment", item=comment_metadata, force=False, store=None - ) return job_id def run_job( diff --git a/src/kiara/models/module/pipeline/controller.py b/src/kiara/models/module/pipeline/controller.py index 0f329a7de..4db188105 100644 --- a/src/kiara/models/module/pipeline/controller.py +++ b/src/kiara/models/module/pipeline/controller.py @@ -134,9 +134,13 @@ def process_step(self, step_id: str, wait: bool = False) -> uuid.UUID: """ job_config = self.pipeline.create_job_config_for_step(step_id) - job_metadata = {"is_pipeline_step": True, "step_id": step_id} + job_metadata = { + "is_pipeline_step": True, + "step_id": step_id, + "pipeline_id": self.pipeline.pipeline_id, + } job_id = self._job_registry.execute_job( - job_config=job_config, job_metadata=job_metadata + job_config=job_config, pipeline_metadata=job_metadata ) # job_id = self._processor.create_job(job_config=job_config) # self._processor.queue_job(job_id=job_id) diff --git a/src/kiara/processing/__init__.py b/src/kiara/processing/__init__.py index 5e605070a..f13635f40 100644 --- a/src/kiara/processing/__init__.py +++ b/src/kiara/processing/__init__.py @@ -100,7 +100,7 @@ def get_job_record(self, job_id: uuid.UUID) -> JobRecord: raise Exception(f"No job record for job with id '{job_id}' registered.") def create_job( - self, job_config: JobConfig, job_metadata: Union[None, Mapping[str, Any]] + self, job_config: JobConfig, pipeline_metadata: Union[None, Mapping[str, Any]] ) -> uuid.UUID: environments = { @@ -108,8 +108,8 @@ def create_job( for env_name, env in self._kiara.current_environments.items() } - if job_metadata is None: - job_metadata = {} + if pipeline_metadata is None: + pipeline_metadata = {} result_pedigree = ValuePedigree( kiara_id=self._kiara.id, @@ -142,7 +142,7 @@ def create_job( "job": job, "module": module, "outputs": outputs, - "job_metadata": job_metadata, + "job_metadata": pipeline_metadata, } self._created_jobs[job_id] = job_details @@ -159,10 +159,10 @@ def create_job( or dev_settings.log.pre_run.internal_modules ): - is_pipeline_step = job_metadata.get("is_pipeline_step", False) + is_pipeline_step = pipeline_metadata.get("is_pipeline_step", False) if is_pipeline_step: if dev_settings.log.pre_run.pipeline_steps: - step_id = job_metadata.get("step_id", None) + step_id = pipeline_metadata.get("step_id", None) assert step_id is not None title = ( f"Pre-run information for pipeline step: [i]{step_id}[/i]" diff --git a/src/kiara/registries/jobs/__init__.py b/src/kiara/registries/jobs/__init__.py index 026ac419b..280957ca8 100644 --- a/src/kiara/registries/jobs/__init__.py +++ b/src/kiara/registries/jobs/__init__.py @@ -430,19 +430,31 @@ def execute( manifest: Manifest, inputs: Mapping[str, Any], wait: bool = False, - job_metadata: Union[None, Any] = None, ) -> uuid.UUID: job_config = self.prepare_job_config(manifest=manifest, inputs=inputs) - return self.execute_job(job_config, wait=wait, job_metadata=job_metadata) + return self.execute_job(job_config, wait=wait) def execute_job( self, job_config: JobConfig, wait: bool = False, - job_metadata: Union[None, Any] = None, + pipeline_metadata: Union[None, Any] = None, ) -> uuid.UUID: + # from kiara.models.metadata import CommentMetadata + # if "comment" not in job_metadata.keys(): + # raise KiaraException("You need to provide a 'comment' for the job.") + # + # comment = job_metadata.get("comment") + # if not isinstance(comment, str): + # raise KiaraException("The 'comment' must be a string.") + # + # comment_metadata = CommentMetadata(comment=comment) + # self.context.metadata_registry.register_metadata_item( + # key="comment", item=comment_metadata, force=False, store=None + # ) + if job_config.module_type != "pipeline": log = logger.bind( module_type=job_config.module_type, @@ -469,8 +481,10 @@ def execute_job( if is_develop(): module = self._kiara.module_registry.create_module(manifest=job_config) - if job_metadata and job_metadata.get("is_pipeline_step", True): - step_id = job_metadata.get("step_id", None) + if pipeline_metadata and pipeline_metadata.get( + "is_pipeline_step", True + ): + step_id = pipeline_metadata.get("step_id", None) title = f"Using cached pipeline step: {step_id}" else: title = f"Using cached job for: {module.module_type_name}" @@ -497,20 +511,20 @@ def execute_job( return stored_job - if job_metadata is None: - job_metadata = {} + if pipeline_metadata is None: + pipeline_metadata = {} - is_pipeline_step = job_metadata.get("is_pipeline_step", False) + is_pipeline_step = pipeline_metadata.get("is_pipeline_step", False) dbg_data = { "module_type": job_config.module_type, "is_pipeline_step": is_pipeline_step, } if is_pipeline_step: - dbg_data["step_id"] = job_metadata["step_id"] + dbg_data["step_id"] = pipeline_metadata["step_id"] log.debug("job.execute", **dbg_data) job_id = self._processor.create_job( - job_config=job_config, job_metadata=job_metadata + job_config=job_config, pipeline_metadata=pipeline_metadata ) self._active_jobs[job_config.job_hash] = job_id diff --git a/src/kiara/registries/metadata/metadata_store/sqlite_store.py b/src/kiara/registries/metadata/metadata_store/sqlite_store.py index 470e9d3fd..315ea9631 100644 --- a/src/kiara/registries/metadata/metadata_store/sqlite_store.py +++ b/src/kiara/registries/metadata/metadata_store/sqlite_store.py @@ -239,3 +239,8 @@ def _store_metadata_item( conn.commit() return metadata_item_id + + def _store_metadata_reference( + self, reference_item_type: str, reference_item_id: str, metadata_item_id: str + ) -> None: + raise NotImplementedError()