Skip to content

Commit

Permalink
Merge pull request #62 from neuro-ml/dev close #60
Browse files Browse the repository at this point in the history
move to s3fs
  • Loading branch information
STNLd2 authored Oct 18, 2023
2 parents b97bd38 + 55eb5f0 commit f871bf5
Show file tree
Hide file tree
Showing 19 changed files with 203 additions and 251 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pyyaml
humanfriendly
pydantic<2.0.0
requests
boto3>=1.23.0,<2.0.0
s3fs>=2023.1.0
2 changes: 1 addition & 1 deletion tarn/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.12.0'
__version__ = '0.13.0'
9 changes: 2 additions & 7 deletions tarn/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,9 @@
from typing import Self
except ImportError:
Self = Any
try:
# just a convenience lib for typing
from mypy_boto3_s3 import S3Client
except ImportError:
S3Client = Any
# we will try to support both versions 1 and 2 while they are more or less popular
try:
from pydantic import field_validator as _field_validator, model_validator, BaseModel
from pydantic import BaseModel, field_validator as _field_validator, model_validator


def field_validator(*args, always=None, **kwargs):
Expand All @@ -55,7 +50,7 @@ class NoExtra(BaseModel):


except ImportError:
from pydantic import root_validator, validator as _field_validator, BaseModel
from pydantic import BaseModel, root_validator, validator as _field_validator


def model_validator(mode: str):
Expand Down
4 changes: 2 additions & 2 deletions tarn/location/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from .disk_dict import DiskDict
from .fanout import Fanout
from .interface import Location, Writable
from .interface import Location, ReadOnly
from .levels import Level, Levels
from .nginx import Nginx
from .redis import RedisLocation
from .s3 import S3
from .ssh import SCP, SFTP
from .small import Small
from .ssh import SCP, SFTP

# TODO: deprecated
SmallLocation = Small
2 changes: 1 addition & 1 deletion tarn/location/disk_dict/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pydantic import Field
from yaml import safe_dump, safe_load

from ...compat import field_validator, get_path_group, model_validator, model_validate, model_dump, NoExtra
from ...compat import NoExtra, field_validator, get_path_group, model_dump, model_validate, model_validator
from ...interface import PathOrStr
from ...tools import DummyLabels, DummyLocker, DummySize, DummyUsage, LabelsStorage, Locker, SizeTracker, UsageTracker
from ...utils import mkdir
Expand Down
23 changes: 15 additions & 8 deletions tarn/location/disk_dict/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
from ...digest import key_to_relative
from ...exceptions import CollisionError, StorageCorruption
from ...interface import Key, MaybeLabels, MaybeValue, PathOrStr, Value
from ...tools import Locker, SizeTracker, UsageTracker, LabelsStorage
from ...tools import LabelsStorage, Locker, SizeTracker, UsageTracker
from ...utils import adjust_permissions, create_folders, get_size, match_buffers, match_files
from ..interface import Meta, Writable
from .config import StorageConfig, init_storage, load_config, root_params, CONFIG_NAME
from ..interface import Location, Meta
from .config import CONFIG_NAME, StorageConfig, init_storage, load_config, root_params

logger = logging.getLogger(__name__)
MaybePath = Optional[Path]


class DiskDict(Writable):
class DiskDict(Location):
def __init__(self, root: PathOrStr, levels: Optional[Sequence[int]] = None):
root = Path(root)
config = root / CONFIG_NAME
Expand Down Expand Up @@ -65,7 +65,7 @@ def contents(self) -> Iterable[Tuple[Key, Self, Meta]]:

key = bytes.fromhex(''.join(file.relative_to(self.root).parts))
with self.locker.read(key):
yield key, self, str(self.root)
yield key, self, DiskDictMeta(key, self.usage_tracker, self.labels)

@contextmanager
def read(self, key: Key, return_labels: bool) -> ContextManager[Union[None, Value, Tuple[Value, MaybeLabels]]]:
Expand All @@ -77,7 +77,7 @@ def read(self, key: Key, return_labels: bool) -> ContextManager[Union[None, Valu
if file.is_dir():
file = file / 'data'

self.usage_tracker.update(key)
self.touch(key)
try:
if return_labels:
yield file, self.labels.get(key)
Expand Down Expand Up @@ -142,7 +142,7 @@ def write(self, key: Key, value: Value, labels: MaybeLabels) -> ContextManager[M

# metadata
self.size_tracker.inc(get_size(file))
self.usage_tracker.update(key)
self.touch(key)
self.labels.update(key, labels)

yield file
Expand Down Expand Up @@ -172,6 +172,13 @@ def delete(self, key: Key) -> bool:

return True

def touch(self, key: Key) -> bool:
    """
    Refresh the usage record of `key`.

    The usage tracker is updated only when the path derived from `key` exists
    on disk. Returns True in that case and False otherwise.
    """
    if self._key_to_path(key).exists():
        self.usage_tracker.update(key)
        return True
    return False

def _key_to_path(self, key: Key):
assert key, 'The key must be non-empty'
return self.root / key_to_relative(key, self.levels)
Expand All @@ -195,7 +202,7 @@ def __eq__(self, other):


class DiskDictMeta(Meta):
def __init__(self, key, usage, labels):
def __init__(self, key: Key, usage: UsageTracker, labels: LabelsStorage):
self._key, self._usage, self._labels = key, usage, labels

@property
Expand Down
40 changes: 22 additions & 18 deletions tarn/location/fanout.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from ..compat import Self
from ..interface import Key, Keys, MaybeLabels, MaybeValue, Meta, Value
from ..utils import is_binary_io
from .interface import Location, Writable
from .interface import Location


class Fanout(Writable):
class Fanout(Location):
def __init__(self, *locations: Location):
hashes = _get_not_none(locations, 'hash')
assert len(hashes) <= 1, hashes
Expand All @@ -33,27 +33,25 @@ def read(self, key: Key, return_labels: bool) -> ContextManager[Union[None, Valu
@contextmanager
def write(self, key: Key, value: Value, labels: MaybeLabels) -> ContextManager[MaybeValue]:
for location in self._locations:
if isinstance(location, Writable):
if is_binary_io(value):
offset = value.tell()
leave = False
with location.write(key, value, labels) as written:
if written is not None:
leave = True
yield written
# see more info on the "leave" trick in `Levels`
if leave:
return
if is_binary_io(value) and offset != value.tell():
value.seek(offset)
if is_binary_io(value):
offset = value.tell()
leave = False
with location.write(key, value, labels) as written:
if written is not None:
leave = True
yield written
# see more info on the "leave" trick in `Levels`
if leave:
return
if is_binary_io(value) and offset != value.tell():
value.seek(offset)
yield None

def delete(self, key: Key) -> bool:
deleted = False
for location in self._locations:
if isinstance(location, Writable):
if location.delete(key):
deleted = True
if location.delete(key):
deleted = True

return deleted

Expand All @@ -78,6 +76,12 @@ def contents(self) -> Iterable[Tuple[Key, Self, Meta]]:
for location in self._locations:
yield from location.contents()

def touch(self, key: Key) -> bool:
    """
    Update the usage date of `key` in every underlying location.

    Returns True if at least one location reported the key as touched and
    False otherwise (including the case of no locations at all).
    """
    touched = False
    for location in self._locations:
        # `touch` is called on every location unconditionally, so each copy gets
        # refreshed; the results are accumulated (like in `delete`) rather than
        # keeping only the answer of the last location
        if location.touch(key):
            touched = True
    return touched


def _get_not_none(seq, name):
result = set()
Expand Down
20 changes: 18 additions & 2 deletions tarn/location/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ def read_batch(self, keys: Keys) -> Iterable[Tuple[Key, Union[None, Tuple[Value,
def contents(self) -> Iterable[Tuple[Key, Self, Meta]]:
pass


class Writable(Location, ABC):
@abstractmethod
def write(self, key: Key, value: Value, labels: MaybeLabels) -> ContextManager[MaybeValue]:
pass
Expand All @@ -51,5 +49,23 @@ def write(self, key: Key, value: Value, labels: MaybeLabels) -> ContextManager[M
def delete(self, key: Key) -> bool:
pass

@abstractmethod
def touch(self, key: Key) -> bool:
    """
    Update the usage date for a given `key`.

    Returns a flag telling whether the usage date was updated. The
    implementations visible alongside this interface return False when the
    key is absent from the location or when the location is read-only.
    """
    pass


class ReadOnly(Location):
    """
    Base class for locations that cannot be modified.

    All mutating operations are no-ops that report failure: `write` yields
    None, while `delete` and `touch` return False.
    """

    def write(self, key: Key, value: Value, labels: MaybeLabels) -> ContextManager[MaybeValue]:
        # imported locally, because the module-level imports are not guaranteed to provide it
        from contextlib import nullcontext

        # callers use `with location.write(...) as written`, so an actual context
        # manager must be returned - a bare generator has no `__enter__`/`__exit__`.
        # Entering it gives None, i.e. "nothing was written"
        return nullcontext(None)

    def delete(self, key: Key) -> bool:
        # nothing can be removed from a read-only location
        return False

    def touch(self, key: Key) -> bool:
        # usage dates are never updated in a read-only location
        return False


Locations = Sequence[Location]
44 changes: 24 additions & 20 deletions tarn/location/levels.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from ..compat import Self
from ..interface import Key, Keys, MaybeLabels, MaybeValue, Meta, Value
from ..location import Location, Writable
from ..location import Location
from ..utils import is_binary_io
from .fanout import _get_not_none

Expand All @@ -17,7 +17,7 @@ class Level(NamedTuple):
name: Optional[str] = None


class Levels(Writable):
class Levels(Location):
def __init__(self, *levels: Union[Level, Location]):
levels = [
level if isinstance(level, Level) else Level(level, write=True, replicate=True)
Expand Down Expand Up @@ -54,29 +54,27 @@ def read(self, key: Key, return_labels: bool) -> ContextManager[Union[None, Valu
def write(self, key: Key, value: Value, labels: MaybeLabels) -> ContextManager[MaybeValue]:
for config in self._levels:
location = config.location
if config.write and isinstance(location, Writable):
if is_binary_io(value):
offset = value.tell()
leave = False
with location.write(key, value, labels) as written:
if written is not None:
# we must leave the loop after the first successful write
leave = True
yield written
# but the context manager might have silenced the error, so we need an extra return here
if leave:
return
if is_binary_io(value) and offset != value.tell():
value.seek(offset)
if is_binary_io(value):
offset = value.tell()
leave = False
with location.write(key, value, labels) as written:
if written is not None:
# we must leave the loop after the first successful write
leave = True
yield written
# but the context manager might have silenced the error, so we need an extra return here
if leave:
return
if is_binary_io(value) and offset != value.tell():
value.seek(offset)

yield None

def delete(self, key: Key) -> bool:
deleted = False
for config in self._levels:
if config.write and isinstance(config.location, Writable):
if config.location.delete(key):
deleted = True
if config.location.delete(key):
deleted = True

return deleted

Expand All @@ -102,11 +100,17 @@ def contents(self) -> Iterable[Tuple[Key, Self, Meta]]:
for level in self._levels:
yield from level.location.contents()

def touch(self, key: Key) -> bool:
    """
    Update the usage date of `key` in the location of every level.

    Returns True if at least one level's location reported the key as touched
    and False otherwise (including the case of no levels at all).
    """
    touched = False
    for level in self._levels:
        # every level is visited unconditionally, and the results are accumulated
        # (like in `delete`) rather than keeping only the answer of the last level
        if level.location.touch(key):
            touched = True
    return touched

@contextmanager
def _replicate(self, key: Key, value: Value, labels: MaybeLabels, index: int):
for config in islice(self._levels, index):
location = config.location
if config.replicate and isinstance(location, Writable):
if config.replicate:
if is_binary_io(value):
offset = value.tell()
with _propagate_exception(location.write(key, value, labels)) as written:
Expand Down
4 changes: 2 additions & 2 deletions tarn/location/nginx.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from ..config import load_config_buffer
from ..digest import key_to_relative
from ..interface import MaybeLabels, Meta
from .interface import Key, Keys, Location, MaybeValue
from .interface import Key, Keys, MaybeValue, ReadOnly


class Nginx(Location):
class Nginx(ReadOnly):
def __init__(self, url: str):
if not url.endswith('/'):
url += '/'
Expand Down
Loading

0 comments on commit f871bf5

Please sign in to comment.