Maintain order for merge_index_from_list (#472)
* Maintain order for merge_index_from_list

* Update doc

* Fix lints
XiaohanZhangCMU authored Oct 17, 2023
1 parent d74af48 commit 35c5d36
Showing 4 changed files with 51 additions and 97 deletions.
100 changes: 7 additions & 93 deletions examples/multiprocess_dataset_conversion.ipynb
@@ -413,93 +413,7 @@
"source": [
"### Merge meta data\n",
"\n",
"The last step of the conversion process is to merge all the sub-directories `index.json` file. The content of the Shard files will remain as it is. \n",
"\n",
"**Steps:**\n",
"- Read the sub-directory `index.json` file.\n",
"- Modify a local `raw_data` shard file name to a global shard file name.\n",
"- Modify a local `zip_data` shard file name to a global shard file name if any.\n",
"- Rename the shard file path if shard present locally.\n",
"- Write the global shard information to a new `index.json` file.\n",
"\n",
"**[Optional] steps:**\n",
"- Delete the sub-directories"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "RmCrcCq_NMN2"
},
"outputs": [],
"source": [
"def with_id(basename: str, shard_id: int) -> str:\n",
" \"\"\"Get a new basename with the given shard_id.\n",
"\n",
" Args:\n",
" basename (str): Old basename of file.\n",
" shard_id (int): New shard ID.\n",
"\n",
" Returns:\n",
" str: New basename of file.\n",
" \"\"\"\n",
" parts = basename.split('.')\n",
" parts[1] = f'{shard_id:05}'\n",
" return '.'.join(parts)\n",
"\n",
"\n",
"def merge_shard_groups(root: str) -> None:\n",
" \"\"\"Merge ephemeral sub-datasets created in parallel into one dataset.\n",
"\n",
" Args:\n",
" root (str): Root directory.\n",
" \"\"\"\n",
" pattern = os.path.join(root, '*')\n",
" subdirs = sorted(glob(pattern))\n",
" shard_id = 0\n",
" infos = []\n",
" for subdir in subdirs:\n",
" index_filename = os.path.join(subdir, 'index.json')\n",
" obj = json.load(open(index_filename))\n",
" for info in obj['shards']:\n",
" old_basename = info['raw_data']['basename']\n",
" new_basename = with_id(old_basename, shard_id)\n",
" info['raw_data']['basename'] = new_basename\n",
"\n",
" if info['zip_data'] is not None:\n",
" old_basename = info['zip_data']['basename']\n",
" new_basename = with_id(old_basename, shard_id)\n",
" info['zip_data']['basename'] = new_basename\n",
"\n",
" old_filename = os.path.join(subdir, old_basename)\n",
" new_filename = os.path.join(root, new_basename)\n",
" assert not os.rename(old_filename, new_filename)\n",
"\n",
" shard_id += 1\n",
" infos.append(info)\n",
"\n",
" assert not os.remove(index_filename)\n",
" assert not os.rmdir(subdir)\n",
"\n",
" index_filename = os.path.join(root, 'index.json')\n",
" obj = {\n",
" 'version': 2,\n",
" 'shards': infos,\n",
" }\n",
" text = json.dumps(obj, sort_keys=True)\n",
" with open(index_filename, 'w') as out:\n",
" out.write(text)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {
"id": "Kj3FZWaBNQdk"
},
"source": [
"Merge the shards"
"The last step of the conversion process is to merge all the sub-directories `index.json` file. The content of the Shard files will remain as it is. By calling the merge_index utility function, the global shard information will be written to a new `index.json` file placed in `out`."
]
},
{
@@ -510,7 +424,8 @@
},
"outputs": [],
"source": [
"merge_shard_groups(out_root)"
"from streaming.base.util import merge_index\n",
"merge_index(out_root, keep_local=True)"
]
},
{
@@ -626,7 +541,7 @@
"provenance": []
},
"kernelspec": {
"display_name": "cloud_upload_streaming",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
@@ -640,10 +555,9 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.10"
},
"orig_nbformat": 4
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 0
"nbformat_minor": 4
}
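For readers following the notebook, a minimal sketch of the simplified merge step is shown below. It assumes the per-process conversion cells earlier in the notebook each wrote their shards and an `index.json` into a sub-directory of `out_root`; the only call the notebook now needs is the `merge_index` utility used in the new cell above.

```python
# Sketch only: out_root is assumed to be the local output root used by the
# earlier conversion cells, with one sub-directory (and index.json) per worker.
from streaming.base.util import merge_index

out_root = './mds_out'  # hypothetical path for illustration

# Merge the per-worker index.json files into a single out_root/index.json.
# keep_local=True keeps the merged index on local disk, as in the cell above.
merge_index(out_root, keep_local=True)
```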
10 changes: 7 additions & 3 deletions streaming/base/storage/upload.py
@@ -999,8 +999,12 @@ def list_objects(self, prefix: Optional[str] = None) -> List[str]:
"""
if prefix is None:
prefix = ''
prefix = os.path.join(self.local, prefix)

file_paths = []
for dirpath, _, files in sorted(os.walk(os.path.join(self.local, prefix))):
for dirpath, _, files in os.walk(self.local):
for file in files:
file_paths.append(os.path.join(dirpath, file))
return file_paths
file_path = os.path.join(dirpath, file)
if file_path.startswith(prefix):
file_paths.append(file_path)
return sorted(file_paths)
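After this change, listing resolves the prefix against the local root, walks the whole root once, filters by the joined prefix, and returns the matches in sorted order, so results no longer depend on `os.walk` traversal order. A standalone sketch of the same logic (function name and paths are illustrative, not part of the library):

```python
import os
from typing import List, Optional

def list_local_objects(root: str, prefix: Optional[str] = None) -> List[str]:
    """Sketch mirroring the updated list_objects logic above."""
    if prefix is None:
        prefix = ''
    # Resolve the prefix against the root, so 'test_dir/prefix' matches files
    # such as <root>/test_dir/prefix_file1.txt.
    prefix = os.path.join(root, prefix)

    file_paths = []
    for dirpath, _, files in os.walk(root):
        for file in files:
            file_path = os.path.join(dirpath, file)
            if file_path.startswith(prefix):
                file_paths.append(file_path)
    # Sorting makes the returned order deterministic.
    return sorted(file_paths)
```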
3 changes: 2 additions & 1 deletion streaming/base/util.py
@@ -12,6 +12,7 @@
import shutil
import tempfile
import urllib.parse
from collections import OrderedDict
from multiprocessing.shared_memory import SharedMemory as BuiltinSharedMemory
from pathlib import Path
from time import sleep, time
@@ -286,7 +287,7 @@ def _merge_index_from_list(index_file_urls: List[Union[str, Tuple[str, str]]],
cu = CloudUploader.get(out, keep_local=True, exist_ok=True)

# Remove duplicates, and strip '/' from right if any
index_file_urls = list(set(index_file_urls))
index_file_urls = list(OrderedDict.fromkeys(index_file_urls))
urls = []
for url in index_file_urls:
if isinstance(url, str):
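This one-line change is what gives the commit its title: deduplicating with `set` can reorder the index URLs, while `OrderedDict.fromkeys` removes duplicates but keeps the caller's first-seen order. A small illustration (the URLs are made up):

```python
from collections import OrderedDict

index_file_urls = [
    's3://bucket/dataset/2/index.json',
    's3://bucket/dataset/0/index.json',
    's3://bucket/dataset/2/index.json',  # duplicate
    's3://bucket/dataset/1/index.json',
]

# set() drops duplicates, but its iteration order is arbitrary, so the merged
# index could list shards in a different order than the caller provided.
maybe_shuffled = list(set(index_file_urls))

# OrderedDict.fromkeys() drops duplicates while preserving first-seen order.
ordered = list(OrderedDict.fromkeys(index_file_urls))
assert ordered == [
    's3://bucket/dataset/2/index.json',
    's3://bucket/dataset/0/index.json',
    's3://bucket/dataset/1/index.json',
]
```

On Python 3.7+ a plain `dict.fromkeys(...)` would preserve insertion order as well; `OrderedDict` simply makes the intent explicit.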
35 changes: 35 additions & 0 deletions tests/test_upload.py
@@ -501,3 +501,38 @@ def test_upload_file_exception(self, local_remote_dir: Tuple[str, str]):
lc = LocalUploader(out=(local, remote))
with pytest.raises(FileNotFoundError, match=f'No such file or directory:.*'):
lc.upload_file(filename)

def test_list_objects_no_prefix(self, local_remote_dir: Tuple[str, str]):
local, _ = local_remote_dir
lc = LocalUploader(out=local)

# Generate some local files for testing
test_dir = os.path.join(local, 'test_dir')
os.makedirs(test_dir, exist_ok=True)
with open(os.path.join(test_dir, 'file2.txt'),
'w') as f2, open(os.path.join(test_dir, 'file1.txt'), 'w') as f1:
f1.write('Content of file1')
f2.write('Content of file2')

result = lc.list_objects()
expected = [os.path.join(test_dir, 'file1.txt'), os.path.join(test_dir, 'file2.txt')]
assert (result == expected)

def test_list_objects_with_prefix(self, local_remote_dir: Tuple[str, str]):
local, _ = local_remote_dir
lc = LocalUploader(out=local)

# Generate some local files for testing
test_dir = os.path.join(local, 'test_dir')
os.makedirs(test_dir, exist_ok=True)
with open(os.path.join(test_dir, 'prefix_file2.txt'),
'w') as f2, open(os.path.join(test_dir, 'prefix_file1.txt'), 'w') as f1:
f1.write('Content of file1')
f2.write('Content of file2')

result = lc.list_objects(prefix='test_dir/prefix')
expected = [
os.path.join(test_dir, 'prefix_file1.txt'),
os.path.join(test_dir, 'prefix_file2.txt')
]
assert (result == expected)
