diff --git a/dds/_api.py b/dds/_api.py index 388ba3e..74cc471 100644 --- a/dds/_api.py +++ b/dds/_api.py @@ -6,6 +6,7 @@ import logging import pathlib import time +import tempfile from collections import OrderedDict from typing import TypeVar, Tuple, Callable, Dict, Any, Optional, Union, Set, List @@ -36,7 +37,7 @@ # TODO: set up in the use temporary space -_store: Store = LocalFileStore("/tmp/dds/internal/", "/tmp/dds/data/") +_store_var: Optional[Store] = None _eval_ctx: Optional[EvalContext] = None @@ -64,11 +65,11 @@ def eval( def load(path: Union[str, DDSPath, pathlib.Path]) -> Any: path_ = DDSPathUtils.create(path) - key = _store.fetch_paths([path_]).get(path_) + key = _store().fetch_paths([path_]).get(path_) if key is None: - raise DDSException(f"The store {_store} did not return path {path_}") + raise DDSException(f"The store {_store()} did not return path {path_}") else: - return _store.fetch_blob(key) + return _store().fetch_blob(key) def set_store( @@ -84,21 +85,23 @@ def set_store( store: either a store, or 'local' or 'dbfs' """ - global _store + global _store_var if isinstance(store, Store): if cache_objects is not None: raise DDSException( f"Cannot provide a caching option and a store object of type 'Store' at the same time" ) # Directly setting the store - _store = store + _store_var = store return elif store == "local": if not internal_dir: - internal_dir = "/tmp" + internal_dir = str( + pathlib.Path(tempfile.gettempdir()).joinpath("dds", "store") + ) if not data_dir: - data_dir = "/tmp/data" - _store = LocalFileStore(internal_dir, data_dir) + data_dir = str(pathlib.Path(tempfile.gettempdir()).joinpath("dds", "data")) + _store_var = LocalFileStore(internal_dir, data_dir) elif store == "dbfs": if data_dir is None: raise DDSException("Missing data_dir argument") @@ -115,7 +118,7 @@ def set_store( commit_type = str(commit_type or CommitType.FULL.name).upper() commit_type_ = CommitType[commit_type] - _store = DBFSStore( + _store_var = DBFSStore( DBFSURI.parse(internal_dir), DBFSURI.parse(data_dir), dbutils, commit_type_ ) else: @@ -136,8 +139,8 @@ def set_store( elif cache_objects > 0: num_objects = cache_objects if num_objects is not None: - _store = LRUCacheStore(_store, num_elem=num_objects) - _logger.debug(f"Setting the store to {_store}") + _store_var = LRUCacheStore(_store(), num_elem=num_objects) + _logger.debug(f"Setting the store to {_store()}") def _parse_stages( @@ -196,9 +199,9 @@ def _eval( ) key = None if path is None else _eval_ctx.requested_paths[path] t = _time() - if key is not None and _store.has_blob(key): + if key is not None and _store().has_blob(key): _logger.debug(f"_eval:Return cached {path} from {key}") - blob = _store.fetch_blob(key) + blob = _store().fetch_blob(key) _add_delta(t, ProcessingStage.STORE_COMMIT) return blob else: @@ -217,11 +220,27 @@ def _eval( if key is not None: _logger.info(f"_eval:Storing blob into key {key}") t = _time() - _store.store_blob(key, res, codec=None) + _store().store_blob(key, res, codec=None) _add_delta(t, ProcessingStage.STORE_COMMIT) return res +def _store() -> Store: + """ + Gets the current store (or initializes it to the local default store if necessary) + """ + global _store_var + if _store_var is None: + p = pathlib.Path(tempfile.gettempdir()).joinpath("dds") + store_path = p.joinpath("store") + data_path = p.joinpath("data") + _logger.info( + f"Initializing default store. store dir: {store_path} data dir: {data_path}" + ) + _store_var = LocalFileStore(str(store_path), str(data_path)) + return _store_var + + def _time() -> float: return time.monotonic() @@ -272,7 +291,7 @@ def _eval_new_ctx( _logger.debug( f"_eval_new_ctx: need to resolve indirect references: {loads_to_check}" ) - resolved_indirect_refs = _store.fetch_paths(loads_to_check) + resolved_indirect_refs = _store().fetch_paths(loads_to_check) _logger.debug( f"_eval_new_ctx: fetched indirect references: {resolved_indirect_refs}" ) @@ -296,7 +315,7 @@ def _eval_new_ctx( present_blobs: Optional[Set[PyHash]] if extra_debug: present_blobs = set( - [key for key in set(store_paths.values()) if _store.has_blob(key)] + [key for key in set(store_paths.values()) if _store().has_blob(key)] ) _logger.debug(f"_eval_new_ctx: {len(present_blobs)} present blobs") else: @@ -327,9 +346,9 @@ def _eval_new_ctx( current_sig = inters.fun_return_sig _logger.debug(f"_eval_new_ctx:current_sig: {current_sig}") t = _time() - if _store.has_blob(current_sig): + if _store().has_blob(current_sig): _logger.debug(f"_eval_new_ctx:Return cached signature {current_sig}") - res = _store.fetch_blob(current_sig) + res = _store().fetch_blob(current_sig) _add_delta(t, ProcessingStage.STORE_COMMIT) else: arg_repr = [str(type(arg)) for arg in args] @@ -349,13 +368,13 @@ def _eval_new_ctx( # TODO: add a phase for storing the blobs _logger.info(f"_eval:Storing blob into key {obj_key}") t = _time() - _store.store_blob(obj_key, res, codec=None) + _store().store_blob(obj_key, res, codec=None) _add_delta(t, ProcessingStage.STORE_COMMIT) if ProcessingStage.PATH_COMMIT in stages: _logger.debug(f"Starting stage {ProcessingStage.PATH_COMMIT}") t = _time() - _store.sync_paths(store_paths) + _store().sync_paths(store_paths) _add_delta(t, ProcessingStage.PATH_COMMIT) _logger.debug(f"Stage {ProcessingStage.PATH_COMMIT} done") else: diff --git a/dds/_version.py b/dds/_version.py index 691a48c..58dc983 100644 --- a/dds/_version.py +++ b/dds/_version.py @@ -1 +1 @@ -version = "0.7.1" +version = "0.7.2" diff --git a/dds/store.py b/dds/store.py index 5cbab87..530681e 100644 --- a/dds/store.py +++ b/dds/store.py @@ -86,6 +86,9 @@ def __init__(self, internal_dir: str, data_dir: str, create_dirs: bool = True): if not os.path.exists(p_blobs): os.makedirs(p_blobs) + def __repr__(self): + return f"LocalFileStore(internal_dir={self._root} data_dir={self._data_root})" + def fetch_blob(self, key: PyHash) -> Any: p = os.path.join(self._root, "blobs", key) meta_p = os.path.join(self._root, "blobs", key + ".meta") diff --git a/doc_source/changelog.md b/doc_source/changelog.md index e3b09c3..e14c48e 100644 --- a/doc_source/changelog.md +++ b/doc_source/changelog.md @@ -1,5 +1,13 @@ # Changelog +## v0.7.2 + +Small usability fixes in this release: + +* delaying the creation of a default store (and all its side effects) to better support highly concurrent environments +* fix to the type signature of `dds.keep` and `dds.eval` +* improves debugging messages (with a potential extra round trip to the store) + ## v0.7.0 Adds a major feature: caching in memory of most recently used objects. See the documentation of diff --git a/doc_source/tut_custom_types.ipynb b/doc_source/tut_custom_types.ipynb index 65de56a..cb76c67 100644 --- a/doc_source/tut_custom_types.ipynb +++ b/doc_source/tut_custom_types.ipynb @@ -196,7 +196,7 @@ "metadata": {}, "outputs": [], "source": [ - "dds._api._store.codec_registry().add_file_codec(PilFileCodec())" + "dds._api._store().codec_registry().add_file_codec(PilFileCodec())" ] }, { diff --git a/docs/changelog/changelog.md b/docs/changelog/changelog.md index e3b09c3..e14c48e 100644 --- a/docs/changelog/changelog.md +++ b/docs/changelog/changelog.md @@ -1,5 +1,13 @@ # Changelog +## v0.7.2 + +Small usability fixes in this release: + +* delaying the creation of a default store (and all its side effects) to better support highly concurrent environments +* fix to the type signature of `dds.keep` and `dds.eval` +* improves debugging messages (with a potential extra round trip to the store) + ## v0.7.0 Adds a major feature: caching in memory of most recently used objects. See the documentation of diff --git a/docs/changelog/index.html b/docs/changelog/index.html index 7ee36db..1472497 100644 --- a/docs/changelog/index.html +++ b/docs/changelog/index.html @@ -87,6 +87,8 @@