diff --git a/examples/multiprocess_dataset_conversion.ipynb b/examples/multiprocess_dataset_conversion.ipynb
index 7d44afe0c..d0ce9f134 100644
--- a/examples/multiprocess_dataset_conversion.ipynb
+++ b/examples/multiprocess_dataset_conversion.ipynb
@@ -413,93 +413,7 @@
    "source": [
     "### Merge meta data\n",
     "\n",
-    "The last step of the conversion process is to merge all the sub-directories `index.json` file. The content of the Shard files will remain as it is. \n",
-    "\n",
-    "**Steps:**\n",
-    "- Read the sub-directory `index.json` file.\n",
-    "- Modify a local `raw_data` shard file name to a global shard file name.\n",
-    "- Modify a local `zip_data` shard file name to a global shard file name if any.\n",
-    "- Rename the shard file path if shard present locally.\n",
-    "- Write the global shard information to a new `index.json` file.\n",
-    "\n",
-    "**[Optional] steps:**\n",
-    "- Delete the sub-directories"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {
-    "id": "RmCrcCq_NMN2"
-   },
-   "outputs": [],
-   "source": [
-    "def with_id(basename: str, shard_id: int) -> str:\n",
-    "    \"\"\"Get a new basename with the given shard_id.\n",
-    "\n",
-    "    Args:\n",
-    "        basename (str): Old basename of file.\n",
-    "        shard_id (int): New shard ID.\n",
-    "\n",
-    "    Returns:\n",
-    "        str: New basename of file.\n",
-    "    \"\"\"\n",
-    "    parts = basename.split('.')\n",
-    "    parts[1] = f'{shard_id:05}'\n",
-    "    return '.'.join(parts)\n",
-    "\n",
-    "\n",
-    "def merge_shard_groups(root: str) -> None:\n",
-    "    \"\"\"Merge ephemeral sub-datasets created in parallel into one dataset.\n",
-    "\n",
-    "    Args:\n",
-    "        root (str): Root directory.\n",
-    "    \"\"\"\n",
-    "    pattern = os.path.join(root, '*')\n",
-    "    subdirs = sorted(glob(pattern))\n",
-    "    shard_id = 0\n",
-    "    infos = []\n",
-    "    for subdir in subdirs:\n",
-    "        index_filename = os.path.join(subdir, 'index.json')\n",
-    "        obj = json.load(open(index_filename))\n",
-    "        for info in obj['shards']:\n",
-    "            old_basename = info['raw_data']['basename']\n",
-    "            new_basename = with_id(old_basename, shard_id)\n",
-    "            info['raw_data']['basename'] = new_basename\n",
-    "\n",
-    "            if info['zip_data'] is not None:\n",
-    "                old_basename = info['zip_data']['basename']\n",
-    "                new_basename = with_id(old_basename, shard_id)\n",
-    "                info['zip_data']['basename'] = new_basename\n",
-    "\n",
-    "            old_filename = os.path.join(subdir, old_basename)\n",
-    "            new_filename = os.path.join(root, new_basename)\n",
-    "            assert not os.rename(old_filename, new_filename)\n",
-    "\n",
-    "            shard_id += 1\n",
-    "            infos.append(info)\n",
-    "\n",
-    "        assert not os.remove(index_filename)\n",
-    "        assert not os.rmdir(subdir)\n",
-    "\n",
-    "    index_filename = os.path.join(root, 'index.json')\n",
-    "    obj = {\n",
-    "        'version': 2,\n",
-    "        'shards': infos,\n",
-    "    }\n",
-    "    text = json.dumps(obj, sort_keys=True)\n",
-    "    with open(index_filename, 'w') as out:\n",
-    "        out.write(text)"
-   ]
-  },
-  {
-   "attachments": {},
-   "cell_type": "markdown",
-   "metadata": {
-    "id": "Kj3FZWaBNQdk"
-   },
-   "source": [
-    "Merge the shards"
+    "The last step of the conversion process is to merge the `index.json` files of all the sub-directories. The contents of the shard files remain unchanged. Calling the `merge_index` utility function writes the global shard information to a new `index.json` file placed in `out`."
    ]
   },
   {
@@ -510,7 +424,8 @@
    },
    "outputs": [],
    "source": [
-    "merge_shard_groups(out_root)"
+    "from streaming.base.util import merge_index\n",
+    "merge_index(out_root, keep_local=True)"
    ]
   },
   {
@@ -626,7 +541,7 @@
    "provenance": []
   },
   "kernelspec": {
-   "display_name": "cloud_upload_streaming",
+   "display_name": "Python 3 (ipykernel)",
    "language": "python",
    "name": "python3"
   },
@@ -640,10 +555,9 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-  "version": "3.10.10"
- },
- "orig_nbformat": 4
+  "version": "3.10.12"
+ }
 },
 "nbformat": 4,
-"nbformat_minor": 0
+"nbformat_minor": 4
 }
diff --git a/streaming/base/storage/upload.py b/streaming/base/storage/upload.py
index 527823588..dab805bf5 100644
--- a/streaming/base/storage/upload.py
+++ b/streaming/base/storage/upload.py
@@ -999,8 +999,12 @@ def list_objects(self, prefix: Optional[str] = None) -> List[str]:
         """
         if prefix is None:
             prefix = ''
+        prefix = os.path.join(self.local, prefix)
+
         file_paths = []
-        for dirpath, _, files in sorted(os.walk(os.path.join(self.local, prefix))):
+        for dirpath, _, files in os.walk(self.local):
             for file in files:
-                file_paths.append(os.path.join(dirpath, file))
-        return file_paths
+                file_path = os.path.join(dirpath, file)
+                if file_path.startswith(prefix):
+                    file_paths.append(file_path)
+        return sorted(file_paths)
diff --git a/streaming/base/util.py b/streaming/base/util.py
index 545d6ce69..d8042eda0 100644
--- a/streaming/base/util.py
+++ b/streaming/base/util.py
@@ -12,6 +12,7 @@
 import shutil
 import tempfile
 import urllib.parse
+from collections import OrderedDict
 from multiprocessing.shared_memory import SharedMemory as BuiltinSharedMemory
 from pathlib import Path
 from time import sleep, time
@@ -286,7 +287,7 @@ def _merge_index_from_list(index_file_urls: List[Union[str, Tuple[str, str]]],
     cu = CloudUploader.get(out, keep_local=True, exist_ok=True)
 
     # Remove duplicates, and strip '/' from right if any
-    index_file_urls = list(set(index_file_urls))
+    index_file_urls = list(OrderedDict.fromkeys(index_file_urls))
     urls = []
     for url in index_file_urls:
         if isinstance(url, str):
diff --git a/tests/test_upload.py b/tests/test_upload.py
index dc4ae9201..57c0046f0 100644
--- a/tests/test_upload.py
+++ b/tests/test_upload.py
@@ -501,3 +501,38 @@ def test_upload_file_exception(self, local_remote_dir: Tuple[str, str]):
         lc = LocalUploader(out=(local, remote))
         with pytest.raises(FileNotFoundError, match=f'No such file or directory:.*'):
             lc.upload_file(filename)
+
+    def test_list_objects_no_prefix(self, local_remote_dir: Tuple[str, str]):
+        local, _ = local_remote_dir
+        lc = LocalUploader(out=local)
+
+        # Generate some local files for testing
+        test_dir = os.path.join(local, 'test_dir')
+        os.makedirs(test_dir, exist_ok=True)
+        with open(os.path.join(test_dir, 'file2.txt'),
+                  'w') as f2, open(os.path.join(test_dir, 'file1.txt'), 'w') as f1:
+            f1.write('Content of file1')
+            f2.write('Content of file2')
+
+        result = lc.list_objects()
+        expected = [os.path.join(test_dir, 'file1.txt'), os.path.join(test_dir, 'file2.txt')]
+        assert (result == expected)
+
+    def test_list_objects_with_prefix(self, local_remote_dir: Tuple[str, str]):
+        local, _ = local_remote_dir
+        lc = LocalUploader(out=local)
+
+        # Generate some local files for testing
+        test_dir = os.path.join(local, 'test_dir')
+        os.makedirs(test_dir, exist_ok=True)
+        with open(os.path.join(test_dir, 'prefix_file2.txt'),
+                  'w') as f2, open(os.path.join(test_dir, 'prefix_file1.txt'), 'w') as f1:
+            f1.write('Content of file1')
+            f2.write('Content of file2')
+
+        result = lc.list_objects(prefix='test_dir/prefix')
+        expected = [
+            os.path.join(test_dir, 'prefix_file1.txt'),
+            os.path.join(test_dir, 'prefix_file2.txt')
+        ]
+        assert (result == expected)
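
A note on the streaming/base/util.py hunk above: replacing list(set(index_file_urls)) with list(OrderedDict.fromkeys(index_file_urls)) still removes duplicates, but it also preserves the order in which the index files were passed in, so the merged index comes out deterministic. A minimal standalone sketch of the difference (not part of the patch; the example URLs are made up):

    from collections import OrderedDict

    # Hypothetical per-group index files, with one duplicate entry.
    index_file_urls = ['group_2/index.json', 'group_0/index.json',
                       'group_1/index.json', 'group_0/index.json']

    # set() de-duplicates but gives no ordering guarantee, so the result
    # (and therefore the merged shard order) can vary between runs.
    unordered = list(set(index_file_urls))

    # OrderedDict.fromkeys() de-duplicates while keeping first-seen order.
    ordered = list(OrderedDict.fromkeys(index_file_urls))
    print(ordered)  # ['group_2/index.json', 'group_0/index.json', 'group_1/index.json']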