feat: add 'export_archive' & 'copy_archive' API endpoints
makkus committed Feb 20, 2024
1 parent 11f505d commit cdccf69
Showing 10 changed files with 310 additions and 50 deletions.
15 changes: 15 additions & 0 deletions src/kiara/context/__init__.py
@@ -344,10 +344,25 @@ def register_external_archive(

if archive_type == "data":
result["data"] = self.data_registry.register_data_archive(_archive_inst) # type: ignore
log_message(
"archive.registered",
archive=_archive_inst.archive_name,
archive_type="data",
)
elif archive_type == "alias":
result["alias"] = self.alias_registry.register_archive(_archive_inst) # type: ignore
log_message(
"archive.registered",
archive=_archive_inst.archive_name,
archive_type="alias",
)
elif archive_type == "job_record":
result["job_record"] = self.job_registry.register_job_archive(_archive_inst) # type: ignore
log_message(
"archive.registered",
archive=_archive_inst.archive_name,
archive_type="job_record",
)
else:
raise Exception(f"Can't register archive of type '{archive_type}'.")

1 change: 1 addition & 0 deletions src/kiara/defaults.py
@@ -89,6 +89,7 @@
"kiarchive",
]
"""List of reserved names, inputs/outputs can't use those."""
DEFAULT_STORE_MARKER = "default_store"
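"""Name of the marker that refers to the context's default store (used by the new archive export/import/copy endpoints)."""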

DEFAULT_DATA_STORE_MARKER = "default_data_store"
"""Name for the default context data store."""
49 changes: 39 additions & 10 deletions src/kiara/interfaces/cli/archive/commands.py
@@ -5,6 +5,10 @@
# Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/)
import rich_click as click

from kiara.defaults import (
CHUNK_COMPRESSION_TYPE,
DEFAULT_CHUNK_COMPRESSION,
)
from kiara.utils.cli import (
output_format_option,
terminal_print_model,
@@ -39,25 +43,50 @@ def explain_archive(
terminal_print_model(info, format=format, in_panel=f"Archive info: {archive}")


@archive.command("export")
@click.argument("path", nargs=1, required=True)
@click.option(
"--compression",
"-c",
help="The compression inside the archive. If not provided, 'zstd' will be used. Ignored if archive already exists and 'append' is used.",
type=click.Choice(["zstd", "lz4", "lzma", "none"]),
default=DEFAULT_CHUNK_COMPRESSION.ZSTD.name.lower(),
)
@click.option("--append", "-a", help="Append data to existing archive.", is_flag=True)
@click.option("--no-aliases", "-na", help="Do not store aliases.", is_flag=True)
@click.pass_context
@handle_exception()
def export_archive(ctx, path: str, compression: str, append: bool, no_aliases: bool):
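"""Export the data (and optionally the aliases) of the current context into an archive file."""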

from kiara.api import KiaraAPI

api: KiaraAPI = ctx.obj.kiara_api

target_store_params = {"compression": CHUNK_COMPRESSION_TYPE[compression.upper()]}
result = api.export_archive(
target_archive=path,
append=append,
target_store_params=target_store_params,
no_aliases=no_aliases,
)

render_config = {"add_field_column": False}
terminal_print_model(result, **render_config)
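
For reference, the '--compression' choice string is upper-cased and looked up in the CHUNK_COMPRESSION_TYPE enum before being handed to the API; a minimal sketch of that mapping (the choice values mirror the click option above):

from kiara.defaults import CHUNK_COMPRESSION_TYPE

compression = "zstd"  # one of: "zstd", "lz4", "lzma", "none"
target_store_params = {"compression": CHUNK_COMPRESSION_TYPE[compression.upper()]}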


@archive.command("import")
@click.argument("archive", nargs=1, required=True)
# @click.option(
# "--all-values",
# "-a",
# is_flag=True,
# default=False,
# help="Import all values from the archive, even if they don't have an alias associated with them.",
# )
@click.argument("path", nargs=1, required=True)
@click.option("--no-aliases", "-na", help="Do not store aliases.", is_flag=True)
@click.pass_context
@handle_exception()
def import_archive(ctx, archive: str, all_values: bool = False):
def import_archive(ctx, path: str, no_aliases: bool):
"""Import an archive file."""

from kiara.interfaces.python_api import KiaraAPI

kiara_api: KiaraAPI = ctx.obj.kiara_api

result = kiara_api.import_archive(archive)
result = kiara_api.import_archive(source_archive=path, no_aliases=no_aliases)

render_config = {"add_field_column": False}
terminal_print_model(result, **render_config)
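
Both commands are thin wrappers around the corresponding KiaraAPI endpoints. A hedged sketch of the equivalent Python calls (the '.kiarchive' path is made up for illustration, and 'KiaraAPI.instance()' is assumed to be the usual way of obtaining an API handle):

from kiara.api import KiaraAPI

api = KiaraAPI.instance()

# export the default context store into a (new) archive file
export_result = api.export_archive(target_archive="backup.kiarchive", append=False)

# import everything from an existing archive file into the current context
import_result = api.import_archive(source_archive="backup.kiarchive", no_aliases=False)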
202 changes: 167 additions & 35 deletions src/kiara/interfaces/python_api/__init__.py
@@ -29,8 +29,9 @@
from ruamel.yaml import YAML

from kiara.defaults import (
CHUNK_COMPRESSION_TYPE,
DATA_ARCHIVE_DEFAULT_VALUE_MARKER,
DEFAULT_CHUNK_COMPRESSION,
DEFAULT_STORE_MARKER,
OFFICIAL_KIARA_PLUGINS,
VALID_VALUE_QUERY_CATEGORIES,
VALUE_ATTR_DELIMITER,
@@ -1557,6 +1558,7 @@ def list_aliases(self, **matcher_params) -> ValueMapReadOnly:
if matcher_params:
matcher_params["has_alias"] = True
all_values = self.list_values(**matcher_params)

result: Dict[str, Value] = {}
for value in all_values.values():
aliases = self.context.alias_registry.find_aliases_for_value_id(
@@ -1884,6 +1886,8 @@ def register_archive(
allow_write_access: bool = False,
registered_name: Union[str, None] = None,
create_if_not_exists: bool = True,
existing_ok: bool = True,
**create_params: Any,
) -> str:
"""Register a kiarchive with the current context.
@@ -1899,12 +1903,19 @@
allow_write_access: whether to allow write access to the archive
registered_name: the name/alias that the archive is registered in the context, and which can be used in the 'store_value(s)' endpoint, if not provided, it will be auto-determined from the file name
create_if_not_exists: if the file does not exist, create it. If this is 'False', an exception will be raised if the file does not exist.
existing_ok: whether the file is allowed to exist already, if 'False', an exception will be raised if the file exists
create_params: additional parameters to pass to the 'create_kiarchive' method if the file does not exist yet
Returns:
the name/alias that the archive is registered in the context, and which can be used in the 'store_value(s)' endpoint
"""
from kiara.interfaces.python_api.models.archive import KiArchive

if not existing_ok and not create_if_not_exists:
raise KiaraException(
"Both 'existing_ok' and 'create_if_not_exists' cannot be 'False' at the same time."
)

if isinstance(archive, str):
archive = Path(archive)

@@ -1914,9 +1925,14 @@
if isinstance(archive, Path):

if archive.exists():
if not existing_ok:
raise KiaraException(
f"Archive file '{archive.as_posix()}' already exists."
)
archive = KiArchive.load_kiarchive(
kiara=self.context, path=archive, archive_name=registered_name
)
log_message("archive.loaded", archive_name=archive.archive_name)
else:
if not create_if_not_exists:
raise KiaraException(
@@ -1926,19 +1942,27 @@
if kiarchive_alias.endswith(".kiarchive"):
kiarchive_alias = kiarchive_alias[:-10]

if "compression" not in create_params.keys():
create_params["compression"] = DEFAULT_CHUNK_COMPRESSION

archive = KiArchive.create_kiarchive(
kiara=self.context,
kiarchive_uri=archive.as_posix(),
allow_existing=False,
archive_name=kiarchive_alias,
compression=CHUNK_COMPRESSION_TYPE.ZSTD,
allow_write_access=allow_write_access,
**create_params,
)
log_message("archive.created", archive_name=archive.archive_name)

else:
raise NotImplementedError("Only local files are supported for now.")

data_alias = self.context.register_external_archive(
archive.data_archive,
allow_write_access=allow_write_access,
)

alias_alias = self.context.register_external_archive(
archive.alias_archive, allow_write_access=allow_write_access
)
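
The new 'existing_ok' flag and the pass-through '**create_params' give callers explicit control over archive creation. A hedged usage sketch (the path is illustrative, and 'KiaraAPI.instance()' as well as the 'LZ4' enum member are assumptions):

from kiara.api import KiaraAPI
from kiara.defaults import CHUNK_COMPRESSION_TYPE

api = KiaraAPI.instance()

# create a brand-new archive file; fail if it already exists,
# and forward the compression setting to 'create_kiarchive'
archive_name = api.register_archive(
    "new_backup.kiarchive",
    allow_write_access=True,
    create_if_not_exists=True,
    existing_ok=False,
    compression=CHUNK_COMPRESSION_TYPE.LZ4,
)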
@@ -1996,15 +2020,54 @@ def retrieve_archive_info(
)
return kiarchive_info

def export_archive(
self,
target_archive: Union[None, str, Path],
target_registered_name: Union[str, None] = None,
append: bool = False,
no_aliases: bool = False,
target_store_params: Union[None, Mapping[str, Any]] = None,
) -> StoreValuesResult:
"""Export all data from the default context store into the specfied archive path.
The target archives will be registered into the context, either using the provided registered_name, or the name
will be auto-determined from the archive metadata.
Currently, this only works with an external archive file, not with an archive that is already registered into the context.
This will be added later on.
This method does not raise an error if the storing of the value fails, so you have to investigate the
'StoreValuesResult' instance that is returned to see if the storing was successful.
Arguments:
target_archive: the registered_name or uri of the target archive, defaults to the context default data/alias store
target_registered_name: the name/alias that the archive should be registered in the context (if necessary)
append: whether to append to an existing archive or error out if the target already exists
no_aliases: whether to skip importing aliases
target_store_params: additional parameters to pass to the 'create_kiarchive' method if the target file does not exist yet
Returns:
an object outlining which values (identified by the specified value key or an enumerated index) were stored and how
"""

result = self.copy_archive(
source_archive=DEFAULT_STORE_MARKER,
target_archive=target_archive,
append=append,
target_store_params=target_store_params,
no_aliases=no_aliases,
)
return result
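
'export_archive' simply forwards to 'copy_archive' with the default store as source, so a typical call only needs a target path. A hedged example (path and compression are illustrative, 'KiaraAPI.instance()' is assumed):

from kiara.api import KiaraAPI
from kiara.defaults import CHUNK_COMPRESSION_TYPE

api = KiaraAPI.instance()

result = api.export_archive(
    target_archive="context_backup.kiarchive",
    append=True,  # add to the archive if the file already exists
    no_aliases=False,
    target_store_params={"compression": CHUNK_COMPRESSION_TYPE.ZSTD},
)
# 'result' is a StoreValuesResult; inspect it, since per-value store failures do not raise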

def import_archive(
self,
archive: Union[str, Path],
# only_aliases: bool = True,
registered_name: Union[str, None] = None,
source_archive: Union[None, str, Path],
source_registered_name: Union[str, None] = None,
no_aliases: bool = False,
) -> StoreValuesResult:
"""Import all data from the specified archive into the current context.
"""Import all data from the specified archive into the current contexts default data & alias store.
The archive will be registered into the context, either ussing the provided registered_name, otherwise the name
The source archive will be registered into the context, either using the provided registered_name, or the name
will be auto-determined from the archive metadata.
Currently, this only works with an external archive file, not with an archive that is registered into the context.
@@ -2014,43 +2077,112 @@
'StoreValuesResult' instance that is returned to see if the storing was successful.
Arguments:
archive: the uri of the archive (file path)
only_aliases: whether to only import the aliases, or all values, even if they don't have an alias
registered_name: the name/alias that the archive is registered in the context
source_archive: the registered_name or uri of the source archive, if None, the context default data/alias store will be used
source_registered_name: the name/alias that the archive should be registered in the context (if necessary)
no_aliases: whether to skip importing aliases
Returns:
an object outlining which values (identified by the specified value key or an enumerated index) were stored and how
"""

archive_ref = self.register_archive(
archive, registered_name=registered_name, allow_write_access=False
result = self.copy_archive(
source_archive=source_archive,
target_archive=DEFAULT_STORE_MARKER,
no_aliases=no_aliases,
)
return result

only_aliases = False
if not only_aliases:
values = self.list_values(
in_data_archives=[archive_ref], allow_internal=True, has_alias=False
).values()
aliases = self.list_aliases(in_data_archives=[archive_ref])
def copy_archive(
self,
source_archive: Union[None, str, Path],
target_archive: Union[None, str, Path] = None,
source_registered_name: Union[str, None] = None,
target_registered_name: Union[str, None] = None,
append: bool = False,
no_aliases: bool = False,
target_store_params: Union[None, Mapping[str, Any]] = None,
) -> StoreValuesResult:
"""Import all data from the specified archive into the current context.
The archives will be registered into the context, either using the provided registered_name, or the name
will be auto-determined from the archive metadata.
Currently, this only works with an external archive file, not with an archive that is registered into the context.
This will be added later on.
This method does not raise an error if the storing of the value fails, so you have to investigate the
'StoreValuesResult' instance that is returned to see if the storing was successful.
Arguments:
source_archive: the registered_name or uri of the source archive, if None, the context default data/alias store will be used
target_archive: the registered_name or uri of the target archive, defaults to the context default data/alias store
source_registered_name: the name/alias that the archive should be registered in the context (if necessary)
target_registered_name: the name/alias that the archive should be registered in the context (if necessary)
append: whether to append to an existing archive or error out if the target already exists
no_aliases: whether to skip importing aliases
target_store_params: additional parameters to pass to the 'create_kiarchive' method if the target file does not exist yet
Returns:
an object outlining which values (identified by the specified value key or an enumerated index) were stored and how
"""

if source_archive in [None, DEFAULT_STORE_MARKER]:
source_archive_ref = DEFAULT_STORE_MARKER
else:
source_archive_ref = self.register_archive(
archive=source_archive,
registered_name=source_registered_name,
create_if_not_exists=False,
existing_ok=True,
)

if target_archive in [None, DEFAULT_STORE_MARKER]:
target_archive_ref = DEFAULT_STORE_MARKER
else:
if target_store_params is None:
target_store_params = {}
target_archive_ref = self.register_archive(
archive=target_archive,
registered_name=target_registered_name,
create_if_not_exists=True,
allow_write_access=True,
existing_ok=True if append else False,
**target_store_params,
)

if source_archive_ref == target_archive_ref:
raise KiaraException(
f"Source and target archive cannot be the same: {source_archive_ref} != {target_archive_ref}"
)

source_values = self.list_values(
in_data_archives=[source_archive_ref], allow_internal=True, has_alias=False
).values()

if not no_aliases:
aliases = self.list_aliases(in_data_archives=[source_archive_ref])
alias_map: Union[bool, Dict[str, List[str]]] = {}
for alias, value in aliases.items():
# TODO: maybe add a matcher arg to the list_aliases endpoint
if not alias.startswith(f"{archive_ref}#"):
continue
alias_map.setdefault(str(value.value_id), []).append(
alias[len(archive_ref) + 1 :]
)

if source_archive_ref != DEFAULT_STORE_MARKER:
# TODO: maybe add a matcher arg to the list_aliases endpoint
if not alias.startswith(f"{source_archive_ref}#"):
continue
alias_map.setdefault(str(value.value_id), []).append(
alias[len(source_archive_ref) + 1 :]
)
else:
if "#" in alias:
continue
alias_map.setdefault(str(value.value_id), []).append(alias)
else:
_values = self.list_aliases(
in_data_archives=[archive_ref], allow_internal=True
)
values = {}
for alias, value in _values.items():
# TODO: maybe add a matcher arg to the list_aliases endpoint
if not alias.startswith(f"{archive_ref}#"):
continue
values[alias[len(archive_ref) + 1 :]] = value
alias_map = True
alias_map = False

result = self.store_values(values, alias_map=alias_map)
result = self.store_values(
source_values, alias_map=alias_map, store=target_archive_ref
)
return result
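
A hedged end-to-end sketch of 'copy_archive', copying everything from one archive file into another (both paths are illustrative, an existing source file and 'KiaraAPI.instance()' are assumed):

from kiara.api import KiaraAPI
from kiara.defaults import DEFAULT_STORE_MARKER

api = KiaraAPI.instance()

result = api.copy_archive(
    source_archive="old_backup.kiarchive",  # or DEFAULT_STORE_MARKER / None for the context's default store
    target_archive="new_backup.kiarchive",  # created on the fly if it does not exist yet
    append=False,       # error out if the target file already exists
    no_aliases=False,   # also copy aliases (namespaced as '<archive_name>#<alias>' in the source)
)
# inspect the returned StoreValuesResult: per-value store failures do not raise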

# ------------------------------------------------------------------------------------------------------------------