Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions checkpoint_engine/ps.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class MemoryBuffer(BaseModel):
buffer: _TorchTensor
size: int
metas: list[ParameterMeta]
manually_pinned: bool = False


class MemoryBufferMetaList(BaseModel):
Expand Down Expand Up @@ -520,7 +521,7 @@ def _pin(t: torch.Tensor):
logger.info(
f"[rank{rank}] inplace pin memory for file {file_path} finished, size {buffer.nbytes / 1024 / 1024:.2f}MiB"
)
return MemoryBuffer(buffer=buffer, size=buffer.nbytes, metas=metas)
return MemoryBuffer(buffer=buffer, size=buffer.nbytes, metas=metas, manually_pinned=True)

memory_buffers: list[MemoryBuffer] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
Expand Down Expand Up @@ -965,12 +966,12 @@ def register_checkpoint(
files: list[str] | None = None,
named_tensors: dict[str, torch.Tensor] | None = None,
use_shared_memory_pool: bool = False,
use_inplace_pin_memory: bool = False,
use_inplace_pin_memory: bool = True,
Comment thread
specture724 marked this conversation as resolved.
) -> None:
"""
Register a checkpoint to the parameter server. Both files and named_tensors will be registered together.
Warning: if `use_inplace_pin_memory` is True, .safetensors files in /dev/shm/ will be pinned in-place, and the files will be REMOVED after pinning.
Please make sure to copy the files to disks if you need to keep them.
Please make sure to copy the files to disks if you need to keep them. NPU does not support inplace pin memory.

Args:
checkpoint_name: The name of the checkpoint.
Expand All @@ -981,9 +982,14 @@ def register_checkpoint(
cannot accommodate checkpoints with different memory requirements.
To free the actual memory of the shared pool or to modify its shape,
please unregister the current user of the shared memory pool using `unregister_checkpoint` with `force=True`.
use_inplace_pin_memory: If True, allows inplace pin memory for /dev/shm/ safetensors files. This option is ignored when ``use_shared_memory_pool`` is True.
Currently, this feature is experimental and may crash.
use_inplace_pin_memory: If True (default), allows inplace pin memory for /dev/shm/ safetensors files.
This option is ignored when ``use_shared_memory_pool`` is True.
Comment thread
specture724 marked this conversation as resolved.
"""
if self.device_manager.device_type != "cuda" and use_inplace_pin_memory:
logger.warning(
f"[rank{self._rank}] Only cuda devices support in-place pin memory, set use_inplace_pin_memory to False"
)
use_inplace_pin_memory = False
try:
if use_shared_memory_pool:
logger.info(
Expand All @@ -1002,6 +1008,7 @@ def register_checkpoint(
named_tensors=named_tensors or {},
rank=self._rank,
shared_pin_memory=self._memory_pool[self.shared_memory_pool_name],
inplace_pin=False, # inplace pin memory is not compatible with shared memory pool
)
self._current_shared_memory_pool_user = checkpoint_name
if self._p2p_store is not None and _is_first_time:
Expand Down Expand Up @@ -1060,6 +1067,46 @@ def unregister_checkpoint(self, checkpoint_name: str, force: bool = False) -> No
del self._memory_pool[self.shared_memory_pool_name]
self._memory_pool[self.shared_memory_pool_name] = []
else:

def _unpin(t: torch.Tensor):
"""
Un-pin the pinned memory.
"""
p_flags = ctypes.c_uint()
try:
libc = ctypes.CDLL(None) # get all symbols from the current process
cuda_host_get_flags = libc.cudaHostGetFlags
cuda_host_get_flags.argtypes = [ctypes.POINTER(ctypes.c_uint), ctypes.c_void_p]
cuda_host_get_flags.restype = ctypes.c_int
except AttributeError:
logger.error("cudaHostGetFlags not found in libc, cannot unpin memory manually")
raise
r = cuda_host_get_flags(ctypes.byref(p_flags), ctypes.c_void_p(t.data_ptr()))
assert r == 0, f"get pin flags error, error code: {r}"
# p_flags value meaning from cuda/include/driver_types.h
# cudaHostRegisterDefault 0x00 /**< Default host memory registration flag */
# cudaHostRegisterPortable 0x01 /**< Pinned memory accessible by all CUDA contexts */
# cudaHostRegisterMapped 0x02 /**< Map registered memory into device space */
# cudaHostRegisterIoMemory 0x04 /**< Memory-mapped I/O space */
# cudaHostRegisterReadOnly 0x08 /**< Memory-mapped read-only */
assert p_flags.value == 0x02, (
f"pin memory flag error, expected: 0x02 (cudaHostRegisterMapped), got flag: {p_flags.value}"
)
cudart = torch.cuda.cudart()
r = cudart.cudaHostUnregister(t.data_ptr())
assert r == 0, f"unpin memory error, error code: {r}"

# if the checkpoint is pinned by cudaHostRegister manually, we need to unpin it manually
try:
for memory_buffer in self._memory_pool.get(checkpoint_name, []):
if memory_buffer.manually_pinned:
_unpin(memory_buffer.buffer)
except Exception as e:
logger.error(
f"[rank{self._rank}] fail to unpin memory for checkpoint {checkpoint_name}: {e}"
)
raise
# we won't delete the memory pool if unpinning fails.
del self._memory_pool[checkpoint_name]
# see https://github.com/pytorch/pytorch/blob/31d5c675394705f8a6bc767f80ae14bf4f01246b/torch/csrc/cuda/Module.cpp#L2018
# this works by using torch>=2.5.0
Expand Down
81 changes: 81 additions & 0 deletions tests/test_inplace_unpin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import os
import subprocess
import time

import pytest
import torch.distributed as dist
from test_update import device_manager, gen_test_tensors, get_world_size

from checkpoint_engine.ps import ParameterServer


dev_shm_dir = "/dev/shm/checkpoint_engine_tests" # noqa: S108


def get_files() -> list[str]:
rank = int(os.getenv("RANK"))
named_tensors = dict(gen_test_tensors(rank))
import safetensors.torch

files = []
os.makedirs(dev_shm_dir, exist_ok=True)
tensors_in_dev_shm = named_tensors
time.sleep(1)
dev_shm_files = [
os.path.join(dev_shm_dir, f"rank{rank}_checkpoint.safetensors")
for _ in range(get_world_size())
]
safetensors.torch.save_file(tensors_in_dev_shm, dev_shm_files[rank])
time.sleep(1)
files.append(dev_shm_files[rank])
return files


def run_pin_and_unpin(num_runs: int):
ps = ParameterServer(auto_pg=True)
checkpoint_name = "test_with_files"
for _ in range(num_runs):
files = get_files()
ps.register_checkpoint(checkpoint_name, files=files)
ps.gather_metas(checkpoint_name)
dist.barrier()
ps.unregister_checkpoint(checkpoint_name)
Comment thread
specture724 marked this conversation as resolved.
if ps._rank == 0:
import shutil

shutil.rmtree(dev_shm_dir)

dist.destroy_process_group()


@pytest.mark.gpu
def test_unpin_files():
world_size = device_manager.device_module.device_count()
assert world_size >= 2, "This test requires at least 2 GPUs."
master_addr = "localhost"
master_port = 25400
cmd = [
"torchrun",
"--nproc_per_node",
str(world_size),
"--master_addr",
master_addr,
"--master_port",
str(master_port),
__file__,
]

result = subprocess.run( # noqa: S603
cmd,
capture_output=False,
text=True,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
shell=False,
check=False,
)

assert result.returncode == 0


if __name__ == "__main__":
run_pin_and_unpin(3)
File renamed without changes.
2 changes: 0 additions & 2 deletions tests/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ def run_with_files(
os.makedirs(dev_shm_dir, exist_ok=True)
os.makedirs(disk_dir, exist_ok=True)
tensors_items = list(named_tensors.items())
tensors_in_dev_shm = named_tensors
tensors_in_dev_shm = dict(tensors_items[: len(tensors_items) // 2])
tensors_in_disk = dict(tensors_items[len(tensors_items) // 3 : 2 * len(tensors_items) // 3])
tensors_in_memory = dict(tensors_items[1 * len(tensors_items) // 2 :])
Expand Down Expand Up @@ -218,7 +217,6 @@ def run_with_files(
if rank == 0:
import shutil

# this test should be run under use_inplace_pin_memory=False. Otherwise, the files in /dev/shm/ will be deleted.
shutil.rmtree(dev_shm_dir)
shutil.rmtree(disk_dir)
assert proc.exitcode == 0
Expand Down