Check swap #124

Closed · wants to merge 10 commits
2 changes: 1 addition & 1 deletion configs/debug/diloco.toml
@@ -15,5 +15,5 @@ total_steps = 4
fake = true

[diloco]
-inner_steps = 2
+inner_steps = 5

File renamed without changes.
19 changes: 19 additions & 0 deletions scripts/mem_use.py
@@ -0,0 +1,19 @@
import sys
import time
import torch


def allocate_memory(size_bytes):
    # Allocate a float32 tensor on CPU (4 bytes per element, hence the // 4)
    return torch.ones(size_bytes // 4, dtype=torch.float32)


if __name__ == "__main__":
    size_gb = float(sys.argv[1])
    size_bytes = int(size_gb * 1024 * 1024 * 1024)

    data = allocate_memory(size_bytes)
    print(f"Allocated {size_gb} GB of RAM using PyTorch")
    while True:
        time.sleep(1)
        print(f"Allocated {size_gb} GB of RAM using PyTorch, data.shape: {data.shape}")
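Not part of the diff, just a companion sketch: while scripts/mem_use.py holds its allocation, RAM and swap pressure can be watched from a second process with psutil (which swap_check.py below already depends on). The monitor below is hypothetical and only illustrates the idea:

import time

import psutil

# Hypothetical monitor: poll available RAM and swap usage once per second.
while True:
    vm = psutil.virtual_memory()
    sm = psutil.swap_memory()
    print(
        f"available RAM: {vm.available / 1024**3:.2f} GB | "
        f"swap used: {sm.used / 1024**3:.2f} GB ({sm.percent:.1f}%)"
    )
    time.sleep(1)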
File renamed without changes.
68 changes: 68 additions & 0 deletions scripts/swap_check.py
@@ -0,0 +1,68 @@
import torch
import psutil
import time


def get_total_ram():
    return psutil.virtual_memory().total


def get_available_ram():
    return psutil.virtual_memory().available


def allocate_memory(size_bytes):
    # Allocate a float32 tensor on CPU (4 bytes per element, hence the // 4)
    return torch.ones(size_bytes // 4, dtype=torch.float32)


def main():
    print("Starting memory allocation test...")

    total_ram = get_total_ram()
    print(f"Total physical RAM: {total_ram / (1024**3):.2f} GB")

    # Start at 50% of total RAM and grow by 10 percentage points per iteration
    initial_percentage = 50
    percentage_increment = 10
    current_percentage = initial_percentage

    tensors = []

    while True:
        try:
            available_ram = get_available_ram()
            allocation_size = int(total_ram * (current_percentage / 100))

            # Allocate memory and keep a reference so it stays resident
            tensor = allocate_memory(allocation_size)
            tensors.append(tensor)

            # Get current memory usage of this process
            process = psutil.Process()
            memory_info = process.memory_info()

            print(
                f"Allocated {allocation_size / (1024**2):.2f}MB ({current_percentage}% of total RAM). "
                f"Process memory used: {memory_info.rss / (1024**3):.2f}GB. "
                f"Available RAM: {available_ram / (1024**3):.2f}GB"
            )

            # Increase percentage for next iteration
            current_percentage += percentage_increment

            # Sleep to allow for monitoring
            time.sleep(1)

        except RuntimeError as e:
            print(f"Memory allocation failed: {e}")
            break
        except KeyboardInterrupt:
            print("Test stopped by user.")
            break

    print("Test completed. Check your system monitor to see if swap was used.")


if __name__ == "__main__":
    main()
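The defaults above grow the footprint quickly: every iteration allocates a fresh tensor sized at 50%, 60%, 70%, ... of total RAM while keeping all earlier tensors alive in tensors, so the cumulative allocation passes physical RAM on the second iteration and the OS is forced to swap. A small sketch of that arithmetic (not part of the diff, assumes the script's defaults):

# Percent of total RAM requested on each iteration: 50, 60, 70, 80, ...
percents = [50 + 10 * i for i in range(4)]
# Cumulative footprint after each iteration, as a percent of total RAM.
cumulative = [sum(percents[: i + 1]) for i in range(len(percents))]
print(cumulative)  # [50, 110, 180, 260] -> beyond physical RAM from the 2nd allocation on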
1 change: 1 addition & 0 deletions src/zeroband/comms.py
@@ -137,6 +137,7 @@ def _init_global_pg(self) -> None:
        )

        self._global_leader = self.world_info.global_rank == 0
+        self._logger.info(f"[{self.world_info.global_unique_id}] Global leader: {self._global_leader}")
        self.global_store = dist.TCPStore(
            host_name=self.world_info.global_addr,
            port=self.world_info.global_port + self.world_info.rank,
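The added log line records whether the local rank is the global leader (global rank 0), which is the rank expected to host the rendezvous store. For context, a stripped-down sketch (not part of the diff) of a leader/follower TCPStore setup of this kind; the address, port, timeout, and key are hypothetical placeholders, and tying is_master to the leader flag is an assumption about the surrounding code:

from datetime import timedelta

import torch.distributed as dist

global_rank = 0                # hypothetical rank; rank 0 plays the global leader
is_leader = global_rank == 0
store = dist.TCPStore(
    host_name="localhost",     # stands in for GLOBAL_ADDR
    port=12355,                # stands in for GLOBAL_PORT
    is_master=is_leader,       # only the leader hosts the store, followers connect to it
    timeout=timedelta(seconds=30),
)
store.set("status", "ready")   # ranks coordinate by reading and writing keys on the shared store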
29 changes: 16 additions & 13 deletions src/zeroband/train.py
@@ -330,19 +330,22 @@ def train(config: Config):
        loss_batch = 0
        z_loss_batch = 0

-        maybe_dest_rank = elastic_device_mesh.live_recovery.should_send_ckpt_to()
-        if maybe_dest_rank is not None:
-            logger.info(f"Start live recovery to rank {maybe_dest_rank}")
-            if config.train.log_model_hash:
-                logger.info(
-                    f"live recovery outer optimizer hash: {get_optimizer_signature(diloco.outer_optimizer)}"
-                )
-                logger.info(f"live recovery outer model hash: {get_tensor_list_signature(diloco.param_list_cpu)}")
-                logger.info(f"inner optimizer hash: {get_optimizer_signature(inner_optimizer)}")
-
-            ckpt_manager.send_ckpt_to_peer(elastic_device_mesh.global_pg, maybe_dest_rank)
-
-            elastic_device_mesh.live_recovery.reset()
+        if config.diloco is not None:
+            maybe_dest_rank = elastic_device_mesh.live_recovery.should_send_ckpt_to()
+            if maybe_dest_rank is not None:
+                logger.info(f"Start live recovery to rank {maybe_dest_rank}")
+                if config.train.log_model_hash:
+                    logger.info(
+                        f"live recovery outer optimizer hash: {get_optimizer_signature(diloco.outer_optimizer)}"
+                    )
+                    logger.info(
+                        f"live recovery outer model hash: {get_tensor_list_signature(diloco.param_list_cpu)}"
+                    )
+                    logger.info(f"inner optimizer hash: {get_optimizer_signature(inner_optimizer)}")
+
+                ckpt_manager.send_ckpt_to_peer(elastic_device_mesh.global_pg, maybe_dest_rank)
+
+                elastic_device_mesh.live_recovery.reset()

        for grad_acc_step in range(gradient_accumulation_steps):
            is_accumulating = grad_acc_step < gradient_accumulation_steps - 1
2 changes: 1 addition & 1 deletion tests/test_dist/test_comms.py
@@ -10,7 +10,7 @@
def test_elastic_device_mesh_no_global(world_size: int, random_available_port: int, mock_env):
    def foo(**kwargs):
        with mock_env(**kwargs):
-            edm = ElasticDeviceMesh()
+            edm = ElasticDeviceMesh(enable=False)

            rank = int(kwargs["RANK"])
            a = torch.arange(3) * (rank + 1)
27 changes: 19 additions & 8 deletions tests/test_torchrun/test_train.py
@@ -30,11 +30,12 @@ def gpus_to_use(num_nodes, num_gpu, rank):
return ",".join(map(str, range(rank * num_gpu, (rank + 1) * num_gpu)))


def _test_multi_gpu(num_gpus, config, extra_args=[]):
def _test_multi_gpu(num_gpus, config, extra_args=[], diloco=False):
num_nodes, num_gpu = num_gpus[0], num_gpus[1]

processes = []
ports = get_random_available_port_list(num_nodes)
new_port = get_random_available_port(1)
for i in range(num_nodes):
cmd = [
"torchrun",
Expand All @@ -47,7 +48,20 @@ def _test_multi_gpu(num_gpus, config, extra_args=[]):
        ]

        env = copy.deepcopy(os.environ)

+        if diloco:
+            new_env = {
+                "GLOBAL_RANK": str(i),
+                "GLOBAL_UNIQUE_ID": str(i),
+                "GLOBAL_ADDR": "localhost",
+                "GLOBAL_WORLD_SIZE": str(num_nodes),
+                "GLOBAL_PORT": str(new_port),
+            }
+            env.update(new_env)

        env["CUDA_VISIBLE_DEVICES"] = gpus_to_use(num_nodes, num_gpu, i)
+        env["ZERO_BAND_LOG_LEVEL"] = "DEBUG"

        process1 = subprocess.Popen(cmd, env=env)
        processes.append(process1)

@@ -62,10 +76,9 @@ def test_multi_gpu(num_gpus):
_test_multi_gpu(num_gpus, "debug/normal.toml")


@pytest.mark.parametrize("num_gpus", [[1, 2], [2, 2]])
@pytest.mark.parametrize("num_gpus", [[2, 1], [2, 2]])
def test_multi_gpu_diloco(num_gpus):
# we don't test 1,1 and 2,1 because 1 solo gpu failed with fsdp
_test_multi_gpu(num_gpus, "debug/diloco.toml")
_test_multi_gpu(num_gpus, "debug/diloco.toml", diloco=True)


def test_act_ckpt():
@@ -78,12 +91,10 @@ def test_act_ckpt_num():
_test_multi_gpu(num_gpus, "debug/normal.toml", extra_args=["--train.ac_ckpt", "2"])


@pytest.mark.parametrize(
"backend", [Compression.NO, Compression.UINT8]
) # not adding CINT8 because the compile is too slow
@pytest.mark.parametrize("backend", [Compression.NO, Compression.UINT8])
def test_all_reduce_diloco(backend: Compression):
num_gpus = [2, 1]
_test_multi_gpu(num_gpus, "debug/diloco.toml", extra_args=["--diloco.compression", backend.value])
_test_multi_gpu(num_gpus, "debug/diloco.toml", extra_args=["--diloco.compression", backend.value], diloco=True)


def test_z_loss():
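To make the new diloco branch in _test_multi_gpu concrete, here is a sketch (not part of the diff) of the global-rendezvous environment it injects for a hypothetical two-node run; the port value is made up, since the test draws a free one at runtime:

num_nodes = 2
new_port = 12356  # hypothetical; get_random_available_port(1) picks a real free port
for i in range(num_nodes):
    print({
        "GLOBAL_RANK": str(i),
        "GLOBAL_UNIQUE_ID": str(i),
        "GLOBAL_ADDR": "localhost",
        "GLOBAL_WORLD_SIZE": str(num_nodes),
        "GLOBAL_PORT": str(new_port),
    })

Both subprocesses share GLOBAL_ADDR and GLOBAL_PORT so they meet on the same global store, while GLOBAL_RANK and GLOBAL_UNIQUE_ID keep them distinct.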