[WIP] Apply obstore as storage backend #3033

Open · wants to merge 15 commits into base: master
Changes from 11 commits
1 change: 1 addition & 0 deletions Dockerfile.dev
@@ -40,6 +40,7 @@ RUN SETUPTOOLS_SCM_PRETEND_VERSION_FOR_FLYTEKIT=$PSEUDO_VERSION \
-e /flytekit \
-e /flytekit/plugins/flytekit-deck-standard \
-e /flytekit/plugins/flytekit-flyteinteractive \
obstore==0.3.0b9 \
markdown \
pandas \
pillow \
165 changes: 133 additions & 32 deletions flytekit/core/data_persistence.py
@@ -25,18 +25,20 @@
import tempfile
import typing
from time import sleep
from typing import Any, Dict, Optional, Union, cast
from typing import Any, Dict, Optional, Tuple, Union, cast
from uuid import UUID

import fsspec
from decorator import decorator
from fsspec.asyn import AsyncFileSystem
from fsspec.utils import get_protocol
from obstore.store import AzureStore, GCSStore, S3Store
from typing_extensions import Unpack

from flytekit import configuration
from flytekit.configuration import DataConfig
from flytekit.core.local_fsspec import FlyteLocalFileSystem
from flytekit.core.obstore_filesystem import ObstoreAzureBlobFileSystem, ObstoreGCSFileSystem, ObstoreS3FileSystem
from flytekit.core.utils import timeit
from flytekit.exceptions.system import FlyteDownloadDataException, FlyteUploadDataException
from flytekit.exceptions.user import FlyteAssertion, FlyteDataNotFoundException
@@ -46,47 +48,128 @@

# Refer to https://github.com/fsspec/s3fs/blob/50bafe4d8766c3b2a4e1fc09669cf02fb2d71454/s3fs/core.py#L198
Contributor: let's update this link if we're going to change the args.

Contributor Author: Sure! I updated the link in the new commit

# for key and secret
_FSSPEC_S3_KEY_ID = "key"
_FSSPEC_S3_SECRET = "secret"
_FSSPEC_S3_KEY_ID = "access_key_id"
_FSSPEC_S3_SECRET = "secret_access_key"
_ANON = "anon"

Uploadable = typing.Union[str, os.PathLike, pathlib.Path, bytes, io.BufferedReader, io.BytesIO, io.StringIO]


def s3_setup_args(s3_cfg: configuration.S3Config, anonymous: bool = False) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {
"cache_regions": True,
}
def s3_setup_args(s3_cfg: configuration.S3Config, bucket: str = "", anonymous: bool = False) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {}
store_kwargs: Dict[str, Any] = {}


if s3_cfg.access_key_id:
kwargs[_FSSPEC_S3_KEY_ID] = s3_cfg.access_key_id
store_kwargs[_FSSPEC_S3_KEY_ID] = s3_cfg.access_key_id


if s3_cfg.secret_access_key:
kwargs[_FSSPEC_S3_SECRET] = s3_cfg.secret_access_key
store_kwargs[_FSSPEC_S3_SECRET] = s3_cfg.secret_access_key


# S3fs takes this as a special arg
if s3_cfg.endpoint is not None:
kwargs["client_kwargs"] = {"endpoint_url": s3_cfg.endpoint}
store_kwargs["endpoint_url"] = s3_cfg.endpoint

# kwargs["client_kwargs"] = {"endpoint_url": s3_cfg.endpoint}

store = S3Store.from_env(

Contributor: should we cache these setup args functions? i think each call to S3Store is creating a new client underneath the hood in the object store library. let's add lru_cache to this call? @pingsutw

bucket,
config={
**store_kwargs,
"aws_allow_http": "true", # Allow HTTP connections
"aws_virtual_hosted_style_request": "false", # Use path-style addressing
},
)

kwargs["retries"] = s3_cfg.retries

Contributor: Consider validating the retries value before assigning it to kwargs; a negative or extremely large value could cause issues.
Suggested change:
    if s3_cfg.retries is not None and 0 <= s3_cfg.retries <= 10:
        kwargs["retries"] = s3_cfg.retries
Code Review Run #e101cd (resolution: incorrectly flagged)

if anonymous:
kwargs[_ANON] = True

kwargs["store"] = store


return kwargs
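The reviewer's lru_cache suggestion above could look roughly like the sketch below. This is an illustration only, keyed on the two values that vary per call in this diff (endpoint and bucket); the helper name is hypothetical and is not part of the PR.

    from functools import lru_cache

    from obstore.store import S3Store

    # Hedged sketch of the reviewer's lru_cache idea: memoize store creation so
    # repeated calls for the same (endpoint, bucket) pair reuse one S3Store
    # client instead of building a new one each time. The cache key uses plain
    # strings because lru_cache requires hashable arguments.
    @lru_cache(maxsize=None)
    def _cached_s3_store(endpoint: str, bucket: str) -> S3Store:
        config = {
            "aws_allow_http": "true",                     # allow HTTP connections, as in the diff
            "aws_virtual_hosted_style_request": "false",  # path-style addressing, as in the diff
        }
        if endpoint:
            config["endpoint_url"] = endpoint
        return S3Store.from_env(bucket, config=config)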


def azure_setup_args(azure_cfg: configuration.AzureBlobStorageConfig, anonymous: bool = False) -> Dict[str, Any]:
def gs_setup_args(gcs_cfg: configuration.GCSConfig, bucket: str = "", anonymous: bool = False) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {}

store = GCSStore.from_env(

bucket,
)

if anonymous:
kwargs["token"] = _ANON


kwargs["store"] = store


return kwargs



def split_path(path: str) -> Tuple[str, str]:
"""
Split bucket and file path

Parameters
----------
path : string
Input path, like `s3://mybucket/path/to/file`

Examples
--------
>>> split_path("s3://mybucket/path/to/file")
('mybucket', 'path/to/file')
"""
support_types = ["s3", "gs", "abfs"]
protocol = get_protocol(path)
if protocol not in support_types:
Comment on lines +119 to +121
Contributor: Consider moving the support_types list to a module-level constant since it represents static configuration data; this would improve maintainability and reusability.
Suggested change:
    SUPPORTED_PROTOCOLS = ["s3", "gs", "abfs"]

    def split_path(path: str) -> Tuple[str, str]:
        protocol = get_protocol(path)
        if protocol not in SUPPORTED_PROTOCOLS:
Code Review Run #e101cd (resolution: incorrectly flagged)

# no bucket for file
return "", path

if path.startswith(protocol + "://"):
path = path[len(protocol) + 3 :]

elif path.startswith(protocol + "::"):
path = path[len(protocol) + 2 :]
path = path.strip("/")


if "/" not in path:
return path, ""

else:
path_li = path.split("/")
bucket = path_li[0]

# use obstore for s3 and gcs only now, no need to split
# bucket out of path for other storage
file_path = "/".join(path_li[1:])
return (bucket, file_path)

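For reference, the parsing rules above give the following results (illustrative calls matching the logic in this hunk):

    # Illustrative behavior of split_path as implemented above:
    split_path("s3://mybucket/path/to/file")  # ('mybucket', 'path/to/file')
    split_path("gs://mybucket")               # ('mybucket', '') : bucket only, no key
    split_path("/tmp/local/file")             # ('', '/tmp/local/file') : unsupported protocol, path returned whole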


def azure_setup_args(
azure_cfg: configuration.AzureBlobStorageConfig, container: str = "", anonymous: bool = False
) -> Dict[str, Any]:
kwargs: Dict[str, Any] = {}
store_kwargs: Dict[str, Any] = {}


if azure_cfg.account_name:
kwargs["account_name"] = azure_cfg.account_name
store_kwargs["account_name"] = azure_cfg.account_name

if azure_cfg.account_key:
kwargs["account_key"] = azure_cfg.account_key
store_kwargs["account_key"] = azure_cfg.account_key

if azure_cfg.client_id:
kwargs["client_id"] = azure_cfg.client_id
store_kwargs["client_id"] = azure_cfg.client_id

if azure_cfg.client_secret:
kwargs["client_secret"] = azure_cfg.client_secret
store_kwargs["client_secret"] = azure_cfg.client_secret

if azure_cfg.tenant_id:
kwargs["tenant_id"] = azure_cfg.tenant_id
kwargs[_ANON] = anonymous
store_kwargs["tenant_id"] = azure_cfg.tenant_id


store = AzureStore.from_env(

container,
config={
**store_kwargs,
},
)

kwargs["store"] = store


if anonymous:
kwargs[_ANON] = True

Contributor: Consider using kwargs[_ANON] = anonymous instead of hardcoding True, to stay consistent with the input parameter value.
Suggested change:
    kwargs[_ANON] = anonymous
Code Review Run #39883a (resolution: incorrectly flagged)


return kwargs
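With this change azure_setup_args returns a pre-built AzureStore under the "store" key plus the anon flag. A minimal sketch of the equivalent direct call, assuming the obstore beta API used in this diff; the container and account names are hypothetical:

    from obstore.store import AzureStore

    # Sketch only: mirrors azure_setup_args above for a hypothetical container.
    store = AzureStore.from_env(
        "my-container",                        # hypothetical container name
        config={"account_name": "myaccount"},  # config keys as wired up in this diff
    )
    kwargs = {"store": store, "anon": False}   # what fsspec.filesystem("abfs", ...) receives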


@@ -189,21 +272,27 @@
protocol: typing.Optional[str] = None,
anonymous: bool = False,
path: typing.Optional[str] = None,
bucket: str = "",
**kwargs,
) -> fsspec.AbstractFileSystem:
# TODO: add bucket to adlfs
if not protocol:
return self._default_remote
if protocol == "file":
kwargs["auto_mkdir"] = True
return FlyteLocalFileSystem(**kwargs)
elif protocol == "s3":
s3kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous)
s3kwargs = s3_setup_args(self._data_config.s3, bucket, anonymous=anonymous)

s3kwargs.update(kwargs)
return fsspec.filesystem(protocol, **s3kwargs) # type: ignore
elif protocol == "gs":
if anonymous:
kwargs["token"] = _ANON
return fsspec.filesystem(protocol, **kwargs) # type: ignore
gskwargs = gs_setup_args(self._data_config.gcs, bucket, anonymous=anonymous)
gskwargs.update(kwargs)
return fsspec.filesystem(protocol, **gskwargs) # type: ignore

elif protocol == "abfs":
azkwargs = azure_setup_args(self._data_config.azure, bucket, anonymous=anonymous)
azkwargs.update(kwargs)
return fsspec.filesystem(protocol, **azkwargs) # type: ignore

elif protocol == "ftp":
kwargs.update(fsspec.implementations.ftp.FTPFileSystem._get_kwargs_from_urls(path))
return fsspec.filesystem(protocol, **kwargs)
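End to end, a remote path now resolves through split_path and the bucket-aware get_filesystem. A hedged usage sketch follows; the paths and bucket are illustrative, and the constructor arguments match the FileAccessProvider instantiation shown later in this diff:

    # Sketch: how a caller threads the bucket through to the obstore-backed filesystem.
    fp = FileAccessProvider(
        local_sandbox_dir="/tmp/sandbox",        # illustrative paths
        raw_output_prefix="s3://my-bucket/raw",
    )
    bucket, key = split_path("s3://my-bucket/data/input.parquet")  # ('my-bucket', 'data/input.parquet')
    fs = fp.get_filesystem("s3", bucket=bucket)  # fsspec filesystem backed by an obstore S3Store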
@@ -216,16 +305,20 @@
return fsspec.filesystem(protocol, **kwargs)

async def get_async_filesystem_for_path(
self, path: str = "", anonymous: bool = False, **kwargs
self, path: str = "", bucket: str = "", anonymous: bool = False, **kwargs
) -> Union[AsyncFileSystem, fsspec.AbstractFileSystem]:
protocol = get_protocol(path)
loop = asyncio.get_running_loop()

return self.get_filesystem(protocol, anonymous=anonymous, path=path, asynchronous=True, loop=loop, **kwargs)
return self.get_filesystem(
protocol, anonymous=anonymous, path=path, bucket=bucket, asynchronous=True, loop=loop, **kwargs
)

def get_filesystem_for_path(self, path: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem:
def get_filesystem_for_path(
self, path: str = "", bucket: str = "", anonymous: bool = False, **kwargs
) -> fsspec.AbstractFileSystem:
protocol = get_protocol(path)
return self.get_filesystem(protocol, anonymous=anonymous, path=path, **kwargs)
return self.get_filesystem(protocol, anonymous=anonymous, path=path, bucket=bucket, **kwargs)


@staticmethod
def is_remote(path: Union[str, os.PathLike]) -> bool:
@@ -295,7 +388,8 @@

@retry_request
async def get(self, from_path: str, to_path: str, recursive: bool = False, **kwargs):
file_system = await self.get_async_filesystem_for_path(from_path)
bucket, from_path_file_only = split_path(from_path)
file_system = await self.get_async_filesystem_for_path(from_path, bucket)
Comment on lines +389 to +390
Contributor: Consider handling the case where split_path() returns an empty bucket for non-file protocols; currently, passing an empty bucket to get_async_filesystem_for_path() could cause issues with cloud storage access.
Suggested change:
    bucket, from_path_file_only = split_path(from_path)
    protocol = get_protocol(from_path)
    if protocol not in ['file'] and not bucket:
        raise ValueError(f'Empty bucket not allowed for protocol {protocol}')
    file_system = await self.get_async_filesystem_for_path(from_path, bucket)
Code Review Run #0b7f4d (resolution: incorrectly flagged)

if recursive:
from_path, to_path = self.recursive_paths(from_path, to_path)
try:
@@ -307,7 +401,7 @@
)
logger.info(f"Getting {from_path} to {to_path}")
if isinstance(file_system, AsyncFileSystem):
dst = await file_system._get(from_path, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212
dst = await file_system._get(from_path_file_only, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212

else:
dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs)
if isinstance(dst, (str, pathlib.Path)):
@@ -336,7 +430,8 @@
More of an internal function to be called by put_data and put_raw_data
This does not need a separate sync function.
"""
file_system = await self.get_async_filesystem_for_path(to_path)
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Comment on lines +431 to +432
Contributor: Consider validating the bucket parameter before passing it to get_async_filesystem_for_path(); an empty bucket could cause issues with certain storage backends. Similar issues were also found in flytekit/core/data_persistence.py at lines 308, 318, and 521.
Suggested change:
    bucket, to_path_file_only = split_path(to_path)
    protocol = get_protocol(to_path)
    if protocol in ['s3', 'gs', 'abfs'] and not bucket:
        raise ValueError(f'Bucket cannot be empty for {protocol} protocol')
    file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Code Review Run #39883a (resolution: incorrectly flagged)

Comment on lines +431 to +432
Contributor: Consider extracting the bucket and path splitting logic into a separate method to improve code reusability and maintainability; split_path is used in multiple places and could be encapsulated better.
Code Review Run #0b7f4d (resolution: incorrectly flagged)

from_path = self.strip_file_header(from_path)
if recursive:
# Only check this for the local filesystem
@@ -354,7 +449,7 @@
kwargs["metadata"] = {}
kwargs["metadata"].update(self._execution_metadata)
if isinstance(file_system, AsyncFileSystem):
dst = await file_system._put(from_path, to_path, recursive=recursive, **kwargs) # pylint: disable=W0212
dst = await file_system._put(from_path, to_path_file_only, recursive=recursive, **kwargs) # pylint: disable=W0212

else:
dst = file_system.put(from_path, to_path, recursive=recursive, **kwargs)
if isinstance(dst, (str, pathlib.Path)):
@@ -423,11 +518,13 @@
r = await self._put(from_path, to_path, **kwargs)
return r or to_path

bucket, _ = split_path(to_path)


# See https://github.com/fsspec/s3fs/issues/871 for more background and pending work on the fsspec side to
# support effectively async open(). For now these use-cases below will revert to sync calls.
# raw bytes
if isinstance(lpath, bytes):
fs = self.get_filesystem_for_path(to_path)
fs = self.get_filesystem_for_path(to_path, bucket)

with fs.open(to_path, "wb", **kwargs) as s:
s.write(lpath)
return to_path
@@ -436,7 +533,7 @@
if isinstance(lpath, io.BufferedReader) or isinstance(lpath, io.BytesIO):
if not lpath.readable():
raise FlyteAssertion("Buffered reader must be readable")
fs = self.get_filesystem_for_path(to_path)
fs = self.get_filesystem_for_path(to_path, bucket)

lpath.seek(0)
with fs.open(to_path, "wb", **kwargs) as s:
while data := lpath.read(read_chunk_size_bytes):
@@ -446,7 +543,7 @@
if isinstance(lpath, io.StringIO):
if not lpath.readable():
raise FlyteAssertion("Buffered reader must be readable")
fs = self.get_filesystem_for_path(to_path)
fs = self.get_filesystem_for_path(to_path, bucket)

lpath.seek(0)
with fs.open(to_path, "wb", **kwargs) as s:
while data_str := lpath.read(read_chunk_size_bytes):
@@ -635,6 +732,10 @@
put_data = loop_manager.synced(async_put_data)


fsspec.register_implementation("s3", ObstoreS3FileSystem)
fsspec.register_implementation("gs", ObstoreGCSFileSystem)
fsspec.register_implementation("abfs", ObstoreAzureBlobFileSystem)

flyte_tmp_dir = tempfile.mkdtemp(prefix="flyte-")
default_local_file_access_provider = FileAccessProvider(
local_sandbox_dir=os.path.join(flyte_tmp_dir, "sandbox"),
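These register_implementation calls make the obstore-backed classes the global fsspec handlers for s3, gs, and abfs as soon as this module is imported. A minimal sketch of the effect, using the store kwarg that the setup-args functions above inject; the bucket name is hypothetical:

    import fsspec
    from obstore.store import S3Store

    # After the registrations above, protocol "s3" resolves to ObstoreS3FileSystem.
    store = S3Store.from_env("my-bucket")      # hypothetical bucket
    fs = fsspec.filesystem("s3", store=store)  # returns an ObstoreS3FileSystem
    print(type(fs).__name__)                   # expected: ObstoreS3FileSystem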
56 changes: 56 additions & 0 deletions flytekit/core/obstore_filesystem.py
@@ -0,0 +1,56 @@
"""
Classes that override AsyncFsspecStore and specify filesystem-specific parameters
"""

from typing import Optional

from obstore.fsspec import AsyncFsspecStore

DEFAULT_BLOCK_SIZE = 5 * 2**20


class ObstoreS3FileSystem(AsyncFsspecStore):
"""
Add the following properties used in S3FileSystem
"""

root_marker = ""
connect_timeout = 5
retries = 5
read_timeout = 15
default_block_size = 5 * 2**20
Contributor: Consider using the DEFAULT_BLOCK_SIZE constant defined on line 9 instead of duplicating the value 5 * 2**20 in ObstoreS3FileSystem. This would improve maintainability and reduce the risk of inconsistencies.
Suggested change:
    default_block_size = DEFAULT_BLOCK_SIZE
Code Review Run #e101cd (resolution: incorrectly flagged)

Member: we can remove this and use the one defined at line 9, right

Contributor Author: Sure, sorry I forgot to use the one in line 9, just fixed it to default_block_size = DEFAULT_BLOCK_SIZE

protocol = ("s3", "s3a")
_extra_tokenize_attributes = ("default_block_size",)

def __init__(self, retries: Optional[int] = None, **kwargs):
"""
Initialize the ObstoreS3FileSystem with optional retries.

Args:
retries (int): Number of retries for requests
**kwargs: Other keyword arguments passed to the parent class
"""
if retries is not None:
self.retries = retries


super().__init__(**kwargs)



class ObstoreGCSFileSystem(AsyncFsspecStore):
"""
Add the following properties used in GCSFileSystem
"""

scopes = {"read_only", "read_write", "full_control"}
retries = 6 # number of retries on http failure
default_block_size = DEFAULT_BLOCK_SIZE
protocol = "gcs", "gs"
async_impl = True


class ObstoreAzureBlobFileSystem(AsyncFsspecStore):
"""
Add the following property used in AzureBlobFileSystem
"""

protocol = "abfs"
1 change: 1 addition & 0 deletions pyproject.toml
@@ -39,6 +39,7 @@ dependencies = [
"marshmallow-jsonschema>=0.12.0",
"mashumaro>=3.15",
"msgpack>=1.1.0",
"obstore==0.3.0b9",
"protobuf!=4.25.0",
"pygments",
"python-json-logger>=2.0.0",
6 changes: 3 additions & 3 deletions tests/flytekit/unit/core/test_flyte_directory.py
@@ -318,11 +318,11 @@ def test_directory_guess():
assert fft.extension() == ""


@mock.patch("s3fs.core.S3FileSystem._lsdir")
@mock.patch("flytekit.core.obstore_filesystem.ObstoreS3FileSystem._ls")
@mock.patch("flytekit.core.data_persistence.FileAccessProvider.get_data")
def test_list_dir(mock_get_data, mock_lsdir):
def test_list_dir(mock_get_data, mock_ls):
remote_dir = "s3://test-flytedir"
mock_lsdir.return_value = [
mock_ls.return_value = [
{"name": os.path.join(remote_dir, "file1.txt"), "type": "file"},
{"name": os.path.join(remote_dir, "file2.txt"), "type": "file"},
{"name": os.path.join(remote_dir, "subdir"), "type": "directory"},