[WIP] Apply obstore as storage backend #3033
base: master
Conversation
Code Review Agent Run #39883a
Actionable Suggestions: 7
Additional Suggestions: 3
Review Details
Changelist by Bito: This pull request implements the following key changes.
driver_pod=self.driver_pod,
executor_pod=self.executor_pod,
Consider adding driver_pod and executor_pod to the with_overrides method to maintain consistency with the constructor parameters.
Code suggestion
Check the AI-generated fix before applying
@@ -56,6 +56,8 @@ def with_overrides(
     new_spark_conf: Optional[Dict[str, str]] = None,
     new_hadoop_conf: Optional[Dict[str, str]] = None,
     new_databricks_conf: Optional[Dict[str, Dict]] = None,
+    driver_pod: Optional[K8sPod] = None,
+    executor_pod: Optional[K8sPod] = None,
 ) -> "SparkJob":
     if not new_spark_conf:
         new_spark_conf = self.spark_conf
@@ -65,6 +67,12 @@ def with_overrides(
     if not new_databricks_conf:
         new_databricks_conf = self.databricks_conf
+    if not driver_pod:
+        driver_pod = self.driver_pod
+
+    if not executor_pod:
+        executor_pod = self.executor_pod
+
     return SparkJob(
         spark_type=self.spark_type,
         application_file=self.application_file,
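For context, the None-coalescing override pattern at issue generalizes like the following minimal, runnable sketch. Only the field names (spark_conf, driver_pod, executor_pod) come from the diff above; the Job dataclass and the string pod values are illustrative stand-ins:

from dataclasses import dataclass, replace
from typing import Dict, Optional

@dataclass
class Job:
    # Toy stand-in for SparkJob; pods are plain strings instead of K8sPod.
    spark_conf: Dict[str, str]
    driver_pod: Optional[str] = None
    executor_pod: Optional[str] = None

    def with_overrides(
        self,
        new_spark_conf: Optional[Dict[str, str]] = None,
        driver_pod: Optional[str] = None,
        executor_pod: Optional[str] = None,
    ) -> "Job":
        # Fall back to the current value when no override is supplied,
        # mirroring the suggestion's if-not-set pattern.
        return replace(
            self,
            spark_conf=new_spark_conf or self.spark_conf,
            driver_pod=driver_pod or self.driver_pod,
            executor_pod=executor_pod or self.executor_pod,
        )

job = Job(spark_conf={"spark.executor.memory": "2g"})
assert job.with_overrides(driver_pod="custom-pod").driver_pod == "custom-pod"
assert job.with_overrides(driver_pod="custom-pod").spark_conf == job.spark_conf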
Code Review Run #39883a
driverPod=self.driver_pod.to_flyte_idl() if self.driver_pod else None,
executorPod=self.executor_pod.to_flyte_idl() if self.executor_pod else None,
Consider adding null checks for to_flyte_idl() calls on driver_pod and executor_pod to avoid potential NoneType errors.
Code suggestion
Check the AI-generated fix before applying
driverPod=self.driver_pod.to_flyte_idl() if self.driver_pod else None,
executorPod=self.executor_pod.to_flyte_idl() if self.executor_pod else None,

driverPod=self.driver_pod.to_flyte_idl() if self.driver_pod and hasattr(self.driver_pod, 'to_flyte_idl') else None,
executorPod=self.executor_pod.to_flyte_idl() if self.executor_pod and hasattr(self.executor_pod, 'to_flyte_idl') else None,
Code Review Run #39883a
flytekit/core/data_persistence.py
Outdated
if "file" in path: | ||
# no bucket for file | ||
return "", path |
The condition if "file" in path may match paths containing "file" anywhere in the string, not just the protocol. Consider using if get_protocol(path) == "file" for more precise protocol checking.
Code suggestion
Check the AI-generated fix before applying
if "file" in path: | |
# no bucket for file | |
return "", path | |
if get_protocol(path) == "file": | |
# no bucket for file | |
return "", path |
Code Review Run #39883a
flytekit/core/data_persistence.py
Outdated
support_types = ["s3", "gs", "abfs"]
if protocol in support_types:
    file_path = "/".join(path_li[1:])
    return (bucket, file_path)
else:
    return bucket, path
The list of supported storage types, support_types = ["s3", "gs", "abfs"], could be defined as a module-level constant since it's used for validation. Consider moving it outside the function to improve maintainability.
Code suggestion
Check the AI-generated fix before applying
@@ -53,1 +53,2 @@
_ANON = "anon"
+SUPPORTED_STORAGE_TYPES = ["s3", "gs", "abfs"]
@@ -136,2 +136,1 @@
- support_types = ["s3", "gs", "abfs"]
- if protocol in support_types:
+ if protocol in SUPPORTED_STORAGE_TYPES:
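As a self-contained sketch of what split_path does once the constant is hoisted, with semantics inferred from the diffs in this PR (the actual flytekit implementation may differ in details):

from typing import Tuple
from fsspec.utils import get_protocol

SUPPORTED_STORAGE_TYPES = ["s3", "gs", "abfs"]

def split_path(path: str) -> Tuple[str, str]:
    # Split an object-store URI into (bucket, key); pass other paths through.
    protocol = get_protocol(path)
    if protocol not in SUPPORTED_STORAGE_TYPES:
        # Local files and unknown schemes keep their full path and no bucket.
        return "", path
    path_li = path.split("/")  # e.g. ["s3:", "", "bucket", "dir", "f.txt"]
    bucket = path_li[2]
    file_path = "/".join(path_li[3:])
    return bucket, file_path

assert split_path("s3://bucket/dir/f.txt") == ("bucket", "dir/f.txt")
assert split_path("/tmp/f.txt") == ("", "/tmp/f.txt")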
Code Review Run #39883a
flytekit/core/data_persistence.py
Outdated
kwargs["store"] = store | ||
|
||
if anonymous: | ||
kwargs[_ANON] = True |
Consider using kwargs[_ANON] = anonymous instead of hardcoding True to maintain consistency with the input parameter value.
Code suggestion
Check the AI-generated fix before applying
kwargs[_ANON] = True

kwargs[_ANON] = anonymous
Code Review Run #39883a
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Consider validating the bucket parameter before passing it to get_async_filesystem_for_path(). An empty bucket could cause issues with certain storage backends. Similar issues were also found in:
- flytekit/core/data_persistence.py (line 318)
- flytekit/core/data_persistence.py (line 521)
- flytekit/core/data_persistence.py (line 308)
Code suggestion
Check the AI-generated fix before applying
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)

bucket, to_path_file_only = split_path(to_path)
protocol = get_protocol(to_path)
if protocol in ['s3', 'gs', 'abfs'] and not bucket:
    raise ValueError(f'Bucket cannot be empty for {protocol} protocol')
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Code Review Run #39883a
Code Review Agent Run #8926b7
Actionable Suggestions: 0
Review Details
Successfully ran it locally; not yet tested on remote. Signed-off-by: machichima <[email protected]>
Force-pushed from 7c76cc6 to 17bde4a.
Code Review Agent Run #0b7f4d
Actionable Suggestions: 4
Additional Suggestions: 1
Review Details
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
Consider extracting the bucket and path splitting logic into a separate method to improve code reusability and maintainability. The split_path function is used in multiple places and could be encapsulated better.
Code suggestion
Check the AI-generated fix before applying
bucket, to_path_file_only = split_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)

bucket, path = self._split_and_get_bucket_path(to_path)
file_system = await self.get_async_filesystem_for_path(to_path, bucket)
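Note that _split_and_get_bucket_path is the agent's invention, not an existing flytekit method. A hedged sketch of what such a wrapper could look like, with stubs standing in for the real split_path and filesystem lookup:

import asyncio

def split_path(path: str):
    # Stub of the split_path under review: bucket/key for URIs, pass-through otherwise.
    parts = path.split("/", 3)
    return (parts[2], parts[3]) if "://" in path else ("", path)

class FileAccessSketch:
    async def get_async_filesystem_for_path(self, path: str, bucket: str):
        return f"<filesystem for {bucket or 'local'}>"  # stub

    async def _split_and_get_bucket_path(self, path: str):
        # One helper pairing split_path() with the filesystem lookup,
        # instead of repeating both lines at every call site.
        bucket, file_only = split_path(path)
        fs = await self.get_async_filesystem_for_path(path, bucket)
        return fs, bucket, file_only

fs, bucket, key = asyncio.run(
    FileAccessSketch()._split_and_get_bucket_path("s3://bucket/dir/f.txt")
)
assert (bucket, key) == ("bucket", "dir/f.txt")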
Code Review Run #0b7f4d
bucket, from_path_file_only = split_path(from_path)
file_system = await self.get_async_filesystem_for_path(from_path, bucket)
Consider handling the case where split_path() returns an empty bucket for non-file protocols. Currently, passing an empty bucket to get_async_filesystem_for_path() could cause issues with cloud storage access.
Code suggestion
Check the AI-generated fix before applying
bucket, from_path_file_only = split_path(from_path)
file_system = await self.get_async_filesystem_for_path(from_path, bucket)

bucket, from_path_file_only = split_path(from_path)
protocol = get_protocol(from_path)
if protocol not in ['file'] and not bucket:
    raise ValueError(f'Empty bucket not allowed for protocol {protocol}')
file_system = await self.get_async_filesystem_for_path(from_path, bucket)
Code Review Run #0b7f4d
flytekit/core/data_persistence.py
Outdated
fsspec.register_implementation("s3", AsyncFsspecStore) | ||
fsspec.register_implementation("gs", AsyncFsspecStore) | ||
fsspec.register_implementation("abfs", AsyncFsspecStore) |
Consider moving the fsspec implementation registrations to a more appropriate initialization location, such as a module-level __init__.py or a dedicated setup function. This would improve code organization and make the registrations more discoverable.
Code suggestion
Check the AI-generated fix before applying
fsspec.register_implementation("s3", AsyncFsspecStore) | |
fsspec.register_implementation("gs", AsyncFsspecStore) | |
fsspec.register_implementation("abfs", AsyncFsspecStore) | |
def register_fsspec_implementations(): | |
fsspec.register_implementation("s3", AsyncFsspecStore) | |
fsspec.register_implementation("gs", AsyncFsspecStore) | |
fsspec.register_implementation("abfs", AsyncFsspecStore) | |
# Call during module initialization | |
register_fsspec_implementations() |
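For reference, fsspec.register_implementation is the standard fsspec registry hook: once a class is registered for a scheme, fsspec.filesystem() resolves to it. A minimal sketch (this AsyncFsspecStore is a bare stand-in for the PR's class):

import fsspec
from fsspec.spec import AbstractFileSystem

class AsyncFsspecStore(AbstractFileSystem):
    # Stand-in for the obstore-backed filesystem in this PR.
    protocol = "s3"

# clobber=True replaces whatever implementation fsspec already associates
# with "s3" (e.g. s3fs); without it, re-registering a taken scheme raises.
fsspec.register_implementation("s3", AsyncFsspecStore, clobber=True)

fs = fsspec.filesystem("s3")
assert isinstance(fs, AsyncFsspecStore)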
Code Review Run #0b7f4d
Specify the class properties for each file storage. Signed-off-by: machichima <[email protected]>
Do not remove the protocol from paths other than s3, gs, and abfs. Signed-off-by: machichima <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files:

@@            Coverage Diff            @@
##           master    #3033       +/-   ##
===========================================
- Coverage   82.79%   47.06%    -35.73%
===========================================
  Files           3      202       +199
  Lines         186    21277     +21091
  Branches        0     2740      +2740
===========================================
+ Hits          154    10015      +9861
- Misses         32    10773     +10741
- Partials        0      489       +489

☔ View full report in Codecov by Sentry.
Code Review Agent Run #e101cd
Actionable Suggestions: 3
Review Details
flytekit/core/obstore_filesystem.py
Outdated
connect_timeout = 5
retries = 5
read_timeout = 15
default_block_size = 5 * 2**20
Consider using the DEFAULT_BLOCK_SIZE constant defined on line 9 instead of duplicating the value 5 * 2**20 in ObstoreS3FileSystem. This would improve maintainability and reduce the risk of inconsistencies.
Code suggestion
Check the AI-generated fix before applying
default_block_size = 5 * 2**20

default_block_size = DEFAULT_BLOCK_SIZE
Code Review Run #e101cd
we can remove this and use the one defined at line 9, right?
Sure, sorry, I forgot to use the one on line 9. Just fixed it to default_block_size = DEFAULT_BLOCK_SIZE.
    },
)

kwargs["retries"] = s3_cfg.retries
Consider validating the retries value before assigning it to kwargs. A negative or extremely large value could cause issues.
Code suggestion
Check the AI-generated fix before applying
kwargs["retries"] = s3_cfg.retries | |
if s3_cfg.retries is not None and 0 <= s3_cfg.retries <= 10: | |
kwargs["retries"] = s3_cfg.retries |
Code Review Run #e101cd
support_types = ["s3", "gs", "abfs"]
protocol = get_protocol(path)
if protocol not in support_types:
Consider moving the support_types list to a module-level constant since it represents static configuration data. This would improve maintainability and reusability.
Code suggestion
Check the AI-generated fix before applying
@@ -1,1 +1,3 @@
+SUPPORTED_PROTOCOLS = ["s3", "gs", "abfs"]
+
 def split_path(path: str) -> Tuple[str, str]:
-    support_types = ["s3", "gs", "abfs"]
-    protocol = get_protocol(path)
-    if protocol not in support_types:
+    protocol = get_protocol(path)
+    if protocol not in SUPPORTED_PROTOCOLS:
Code Review Run #e101cd
flytekit/core/data_persistence.py
Outdated
@@ -46,47 +48,128 @@

# Refer to https://github.com/fsspec/s3fs/blob/50bafe4d8766c3b2a4e1fc09669cf02fb2d71454/s3fs/core.py#L198
let's update this link if we're going to change the args.
Sure! I updated the link in the new commit.
store_kwargs["endpoint_url"] = s3_cfg.endpoint | ||
# kwargs["client_kwargs"] = {"endpoint_url": s3_cfg.endpoint} | ||
|
||
store = S3Store.from_env( |
Should we cache these setup-args functions? I think each call to S3Store creates a new client under the hood in the object store library. Let's add lru_cache to this call? @pingsutw
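A sketch of that caching idea, assuming functools.lru_cache over a small factory: S3Store.from_env mirrors the call in this PR's diff, and the config dict must first be frozen into a tuple because lru_cache cannot key on dicts:

from functools import lru_cache

@lru_cache(maxsize=None)
def _cached_s3_store(bucket: str, config_items: tuple):
    # Import deferred so the sketch stays importable without obstore installed.
    from obstore.store import S3Store
    # Re-inflate the hashable tuple back into the config dict obstore expects.
    return S3Store.from_env(bucket, config=dict(config_items))

def get_s3_store(bucket: str, config: dict):
    # Same (bucket, config) pairs now reuse one store/client instance.
    return _cached_s3_store(bucket, tuple(sorted(config.items())))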
Assert that the specific function is called with the provided parameters. Signed-off-by: machichima <[email protected]>
Code Review Agent Run #ffca15
Actionable Suggestions: 6
Additional Suggestions: 4
Review Details
assert (
    "file:///abc/happy/"
), "s3://my-s3-bucket/bucket1/" == local_raw_fp.recursive_paths(
The assertion syntax appears incorrect: the tuple construction and comparison operator placement are malformed, so Python treats the first string as the (always truthy) assert expression and the comparison as the failure message. Consider restructuring the assertion to properly compare the tuple values. A similar issue was also found in tests/flytekit/unit/core/test_data.py (lines 69-71).
Code suggestion
Check the AI-generated fix before applying
assert (
    "file:///abc/happy/"
), "s3://my-s3-bucket/bucket1/" == local_raw_fp.recursive_paths(

assert ("file:///abc/happy/", "s3://my-s3-bucket/bucket1/") == \
    local_raw_fp.recursive_paths(
        "file:///abc/happy/", "s3://my-s3-bucket/bucket1/")
Code Review Run #ffca15
mock_from_env.return_value = mock.Mock()
mock_from_env.assert_called_with(
    "",
    config={
        "aws_allow_http": "true",  # Allow HTTP connections
        "aws_virtual_hosted_style_request": "false",  # Use path-style addressing
    },
)
Consider moving the mock setup before the s3_setup_args call, since mocks should be configured before exercising the code under test.
Code suggestion
Check the AI-generated fix before applying
@@ -242,14 +242,14 @@
 def test_s3_setup_args_env_empty(mock_from_env, mock_os, mock_get_config_file):
     mock_get_config_file.return_value = None
     mock_os.get.return_value = None
+    mock_from_env.return_value = mock.Mock()
     s3c = S3Config.auto()
     kwargs = s3_setup_args(s3c)
-
-    mock_from_env.return_value = mock.Mock()
     mock_from_env.assert_called_with(
         "",
         config={
             "aws_allow_http": "true",  # Allow HTTP connections
             "aws_virtual_hosted_style_request": "false",  # Use path-style addressing
         },
     )
Code Review Run #ffca15
mock_from_env.assert_called_with(
    "",
Consider providing a meaningful value for the empty string parameter in mock_from_env.assert_called_with(). An empty string for what appears to be a path/endpoint parameter may not properly test the intended behavior.
Code suggestion
Check the AI-generated fix before applying
mock_from_env.assert_called_with(
    "",

mock_from_env.assert_called_with(
    "s3://test-bucket",
Code Review Run #ffca15
_ANON = "anon" | ||
_FSSPEC_S3_KEY_ID = "access_key_id" | ||
_FSSPEC_S3_SECRET = "secret_access_key" | ||
_ANON = "skip_signature" |
Consider if changing the _ANON constant from "anon" to "skip_signature" might affect existing code that relies on this value. This appears to be a breaking change in the S3 authentication configuration.
Code suggestion
Check the AI-generated fix before applying
_ANON = "skip_signature" | |
# TODO: Deprecate "anon" in future versions | |
_ANON = "anon" # or support both: _ANON = ("anon", "skip_signature") |
Code Review Run #ffca15
if anonymous:
    store_kwargs[_ANON] = "true"
The _ANON value is being set to the string 'true' in s3_setup_args(), but was previously set to the boolean True. This type inconsistency could cause issues with S3 authentication.
Code suggestion
Check the AI-generated fix before applying
if anonymous:
    store_kwargs[_ANON] = "true"

if anonymous:
    store_kwargs[_ANON] = True
Code Review Run #ffca15
kwargs[_ANON] = anonymous
store_kwargs["tenant_id"] = azure_cfg.tenant_id
if anonymous:
    kwargs[_ANON] = "true"
Consider using a boolean value directly instead of the string 'true' for the _ANON parameter to maintain type consistency. Many systems interpret the string 'true' differently than the boolean True.
Code suggestion
Check the AI-generated fix before applying
kwargs[_ANON] = "true" | |
kwargs[_ANON] = True |
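The type-consistency concern is concrete: every non-empty string is truthy in Python, so a string flag stops behaving like a boolean as soon as anything re-checks it:

# Both strings are truthy, so a naive bool() check cannot tell them apart.
assert bool("true") is True
assert bool("false") is True   # surprising if you expected False

# Booleans round-trip unambiguously.
assert bool(True) is True
assert bool(False) is False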
Code Review Run #ffca15
Tracking issue
Related to flyteorg/flyte#4081
Why are the changes needed?
Use a Rust/PyO3 package, obstore, as the storage backend for cloud storage. This gives a smaller dependency footprint and enables users to use their own versions of s3fs, gsfs, abfs, and so on.
What changes were proposed in this pull request?
Use obstore as the storage backend to replace s3fs, gsfs, and abfs.
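At a high level, the proposed wiring looks roughly like this sketch. S3Store.from_env and the store kwarg mirror code shown elsewhere in this PR; the bucket name and config values are placeholders, and it assumes the PR's AsyncFsspecStore has been registered for the s3 scheme:

import fsspec
from obstore.store import S3Store

# Build an obstore-backed store from environment credentials; config keys
# follow obstore's format (e.g. aws_allow_http), per this PR.
store = S3Store.from_env(
    "my-bucket",
    config={
        "aws_allow_http": "true",
        "aws_virtual_hosted_style_request": "false",
    },
)

# Existing fsspec call sites keep working, now served by obstore underneath.
fs = fsspec.filesystem("s3", store=store)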
How was this patch tested?
Setup process
Screenshots
Performance
Check all the applicable boxes
Related PRs
Docs link
Summary by Bito
Implementation of obstore as the new storage backend for cloud services in Flytekit, replacing direct cloud storage implementations with obstore-based filesystem classes. The changes include enhanced path splitting functionality, S3 retry support, and updated Azure storage configuration parameters. The implementation provides robust bucket handling and async filesystem support while maintaining backward compatibility with existing storage protocols through obstore's configuration format.
Unit tests added: True
Estimated effort to review (1-5, lower is better): 4