Skip to content

vllm.utils.numa_utils

NUMA binding utilities for vLLM worker processes.

Adapted in part from SGLang's NUMA helper implementation: https://github.com/sgl-project/sglang/blob/ba6d54d0f08f82f42b8224908ae2459a496b31b3/python/sglang/srt/utils/numa_utils.py

_PctSku

Bases: NamedTuple

Per-SKU config used by the PCT auto-detection gate.

Source code in vllm/utils/numa_utils.py
class _PctSku(NamedTuple):
    """Per-SKU config used by the PCT auto-detection gate."""

    # CPPC ``highest_perf`` value expected for this SKU; ``_pct_sku_config``
    # compares it against the integer read from ``_PCT_HIGHEST_PERF_PATH``.
    highest_perf: int
    # Modulus used by ``_maybe_get_pct_cpu_binding``: a CPU id counts as a
    # priority core when ``cpu_id % priority_stride`` is 0 or 1.
    priority_stride: int

_can_set_mempolicy

_can_set_mempolicy() -> bool

Check whether the current process can use NUMA memory policy syscalls.

Source code in vllm/utils/numa_utils.py
def _can_set_mempolicy() -> bool:
    """Report whether NUMA memory-policy syscalls work for this process.

    Probes by issuing a ``get_mempolicy`` query through libnuma. Every
    failure mode (library not loaded, ``numa_available()`` negative, a
    non-zero return from the query, or any exception raised along the way)
    is reported as ``False``.
    """
    try:
        lib = get_libnuma()
        numa_usable = lib is not None and lib.numa_available() >= 0
        if not numa_usable:
            return False

        policy_mode = ctypes.c_int()
        # Null nodemask / null address with zero maxnode and zero flags:
        # only the policy mode is requested.
        status = lib.get_mempolicy(
            ctypes.byref(policy_mode),
            None,
            ctypes.c_ulong(0),
            None,
            ctypes.c_ulong(0),
        )
        return status == 0
    except Exception:
        # Deliberately broad: this is a best-effort capability probe.
        return False

_get_cpu_binding

_get_cpu_binding(
    parallel_config, gpu_index: int, numa_nodes: list[int]
) -> str | None

Return the CPU list a process should be pinned to (or None).

Source code in vllm/utils/numa_utils.py
def _get_cpu_binding(
    parallel_config, gpu_index: int, numa_nodes: list[int]
) -> str | None:
    """Pick the CPU list string a process should be pinned to, if any.

    An explicit ``parallel_config.numa_bind_cpus`` entry at ``gpu_index``
    takes precedence. When no explicit bindings are configured, the PCT
    priority cores detected for ``numa_nodes`` are returned as a
    comma-separated string, or ``None`` when PCT detection yields nothing.

    Raises:
        ValueError: ``numa_bind_cpus`` is set but has no entry at
            ``gpu_index``.
    """
    cpu_bindings = parallel_config.numa_bind_cpus
    if cpu_bindings is not None:
        # User-supplied bindings: one entry per GPU index.
        if gpu_index < len(cpu_bindings):
            return cpu_bindings[gpu_index]
        raise ValueError(
            f"GPU index {gpu_index} exceeds numa_bind_cpus size "
            f"{len(cpu_bindings)}. Ensure the binding lists cover every visible GPU."
        )

    # No explicit bindings: fall back to PCT auto-detection.
    priority_cores = _maybe_get_pct_cpu_binding(numa_nodes)
    if priority_cores is None:
        return None
    return ",".join(map(str, priority_cores))

_get_enginecore_numa_nodes

_get_enginecore_numa_nodes(
    parallel_config, dp_local_rank: int | None = None
) -> list[int]

Return the sorted, unique NUMA nodes of the EngineCore's DP shard.

Source code in vllm/utils/numa_utils.py
def _get_enginecore_numa_nodes(
    parallel_config, dp_local_rank: int | None = None
) -> list[int]:
    """Collect the distinct NUMA nodes, ascending, of the EngineCore's DP shard.

    ``parallel_config.numa_bind_nodes`` is indexed per GPU. When the executor
    backend is neither ``"ray"`` nor ``"external_launcher"``, the DP backend
    is not ``"ray"`` and ``nnodes_within_dp`` is 1, only the slice of
    ``pipeline_parallel_size * tensor_parallel_size`` entries belonging to
    the DP rank is considered; otherwise every entry is. If the slice turns
    out empty, the first entry of ``numa_bind_nodes`` is returned on its own.

    Args:
        parallel_config: Config object providing the attributes read below.
        dp_local_rank: DP rank to use; when ``None`` it is taken from
            ``data_parallel_rank_local`` and then ``data_parallel_index``.
    """
    if parallel_config.numa_bind_nodes is None:
        # Auto-detection stores its result on parallel_config as a side effect.
        _get_numa_node(parallel_config, 0)
    per_gpu_nodes = parallel_config.numa_bind_nodes

    shards_by_dp_rank = (
        parallel_config.distributed_executor_backend not in ("ray", "external_launcher")
        and parallel_config.data_parallel_backend != "ray"
        and parallel_config.nnodes_within_dp == 1
    )

    if not shards_by_dp_rank:
        first, stop = 0, len(per_gpu_nodes)
    else:
        dp_rank = dp_local_rank
        if dp_rank is None:
            dp_rank = parallel_config.data_parallel_rank_local
        if dp_rank is None:
            dp_rank = parallel_config.data_parallel_index

        gpus_per_shard = (
            parallel_config.pipeline_parallel_size
            * parallel_config.tensor_parallel_size
        )
        first = dp_rank * gpus_per_shard
        # Clamp so a short node list cannot be indexed out of range.
        stop = min(first + gpus_per_shard, len(per_gpu_nodes))

    shard_nodes = {per_gpu_nodes[i] for i in range(first, stop)}
    if shard_nodes:
        return sorted(shard_nodes)
    # Empty shard range: fall back to the first listed node.
    return [per_gpu_nodes[0]]

_get_gpu_index

_get_gpu_index(
    parallel_config,
    local_rank: int,
    dp_local_rank: int | None = None,
) -> int

Compute the physical GPU index used for NUMA lookup.

Source code in vllm/utils/numa_utils.py
def _get_gpu_index(
    parallel_config, local_rank: int, dp_local_rank: int | None = None
) -> int:
    """Map a worker's local rank to the GPU index used for NUMA lookup.

    When the executor backend is neither ``"ray"`` nor
    ``"external_launcher"``, the DP backend is not ``"ray"`` and
    ``nnodes_within_dp`` is 1, the index is offset by the DP rank times
    ``pipeline_parallel_size * tensor_parallel_size``. In every other
    configuration ``local_rank`` is returned unchanged.

    Args:
        parallel_config: Config object providing the attributes read below.
        local_rank: Rank of the worker within its DP replica.
        dp_local_rank: DP rank to use; when ``None`` it is taken from
            ``data_parallel_rank_local`` and then ``data_parallel_index``.
    """
    offsets_by_dp_rank = (
        parallel_config.distributed_executor_backend not in ("ray", "external_launcher")
        and parallel_config.data_parallel_backend != "ray"
        and parallel_config.nnodes_within_dp == 1
    )
    if not offsets_by_dp_rank:
        return local_rank

    dp_rank = dp_local_rank
    if dp_rank is None:
        dp_rank = parallel_config.data_parallel_rank_local
    if dp_rank is None:
        dp_rank = parallel_config.data_parallel_index

    ranks_per_replica = (
        parallel_config.pipeline_parallel_size * parallel_config.tensor_parallel_size
    )
    return local_rank + dp_rank * ranks_per_replica

_get_numactl_enginecore_args

_get_numactl_enginecore_args(
    parallel_config,
    local_rank: int,
    dp_local_rank: int | None = None,
) -> str

Compute the numactl args for an EngineCore subprocess.

--numa-bind-cpus is deliberately ignored here: the user provides a per-worker CPU list, and binding EngineCore to any one of those entries would shrink its cpus_allowed below the strict superset that spawning the workers under --physcpubind requires. We fall back to --cpunodebind=<shard nodes> instead, which is always a safe superset. PCT auto-detection still applies when the user did not pass --numa-bind-cpus (its priority-core union across the shard nodes is also a safe superset by construction).

Source code in vllm/utils/numa_utils.py
def _get_numactl_enginecore_args(
    parallel_config, local_rank: int, dp_local_rank: int | None = None
) -> str:
    """Build the numactl argument string for an EngineCore subprocess.

    Memory is bound to the NUMA nodes of the EngineCore's DP shard. CPU
    placement follows one of two paths:

    * ``--numa-bind-cpus`` was given: it is intentionally not applied to
      the EngineCore. Those entries are per-worker CPU lists, and pinning
      the EngineCore to one of them would leave its ``cpus_allowed``
      narrower than what the workers' ``--physcpubind`` launches need.
      ``--cpunodebind=<shard nodes>`` is used instead.
    * ``--numa-bind-cpus`` was not given: PCT auto-detection is consulted,
      and when it finds priority cores on the shard nodes their union is
      passed to ``--physcpubind``. Otherwise ``--cpunodebind`` is used.

    ``local_rank`` is used only in the log message.
    """
    shard_nodes = _get_enginecore_numa_nodes(parallel_config, dp_local_rank)
    membind_arg = ",".join(map(str, shard_nodes))

    if parallel_config.numa_bind_cpus is None:
        priority_cpus = _maybe_get_pct_cpu_binding(shard_nodes)
    else:
        # Explicit per-worker CPU lists are never applied to the EngineCore.
        priority_cpus = None

    if priority_cpus is None:
        logger.info(
            "Binding EngineCore subprocess (local_rank=%s) to NUMA nodes %s",
            local_rank,
            membind_arg,
        )
        return f"--cpunodebind={membind_arg} --membind={membind_arg}"

    cpu_binding = ",".join(map(str, priority_cpus))
    logger.info(
        "Binding EngineCore subprocess (local_rank=%s) to CPUs %s "
        "and NUMA nodes %s",
        local_rank,
        cpu_binding,
        membind_arg,
    )
    return f"--physcpubind={cpu_binding} --membind={membind_arg}"

_get_numactl_executable

_get_numactl_executable() -> tuple[str, str]

Return the fixed wrapper executable used to launch numactl.

Source code in vllm/utils/numa_utils.py
def _get_numactl_executable() -> tuple[str, str]:
    """Locate the wrapper script through which numactl is launched.

    Returns:
        A ``(path, description)`` pair: the path of ``numa_wrapper.sh``
        located next to this module, and a human-readable string that also
        names the ``_NUMACTL_ARGS_ENV`` variable.

    Raises:
        RuntimeError: ``numactl`` cannot be found on ``PATH``.
    """
    import shutil

    numactl_missing = shutil.which("numactl") is None
    if numactl_missing:
        raise RuntimeError(
            "numactl is required for NUMA binding but is not installed or "
            "not available on PATH."
        )

    # The wrapper script lives in the same directory as this module.
    script_path = Path(__file__).parent / "numa_wrapper.sh"
    return str(script_path), f"{script_path} via {_NUMACTL_ARGS_ENV}"

_get_numactl_worker_args

_get_numactl_worker_args(
    parallel_config,
    local_rank: int,
    dp_local_rank: int | None = None,
) -> str

Compute the numactl args for a single TP/PP worker subprocess.

Source code in vllm/utils/numa_utils.py
def _get_numactl_worker_args(
    parallel_config, local_rank: int, dp_local_rank: int | None = None
) -> str:
    """Build the numactl argument string for one TP/PP worker subprocess.

    Memory is always bound to the NUMA node looked up for the worker's GPU
    index. CPUs are pinned with ``--physcpubind`` when ``_get_cpu_binding``
    yields a CPU list, and with ``--cpunodebind`` on that same node
    otherwise.
    """
    gpu_index = _get_gpu_index(parallel_config, local_rank, dp_local_rank)
    numa_node = _get_numa_node(parallel_config, gpu_index)
    cpu_binding = _get_cpu_binding(parallel_config, gpu_index, [numa_node])

    if cpu_binding is None:
        # No CPU list available: bind to every CPU of the GPU's NUMA node.
        logger.info(
            "Binding worker subprocess (local_rank=%s, gpu_index=%s) to NUMA node %s",
            local_rank,
            gpu_index,
            numa_node,
        )
        return f"--cpunodebind={numa_node} --membind={numa_node}"

    logger.info(
        "Binding worker subprocess (local_rank=%s, gpu_index=%s) to CPUs %s and NUMA node %s",  # noqa: E501
        local_rank,
        gpu_index,
        cpu_binding,
        numa_node,
    )
    return f"--physcpubind={cpu_binding} --membind={numa_node}"

_is_auto_numa_available

_is_auto_numa_available() -> bool

Check whether automatic GPU-to-NUMA detection should be attempted.

Source code in vllm/utils/numa_utils.py
def _is_auto_numa_available() -> bool:
    """Decide whether automatic GPU-to-NUMA detection should be attempted.

    All of the following gates must pass, checked in this order:

    1. the current platform reports itself as CUDA-alike;
    2. ``/sys/devices/system/node/node1`` exists;
    3. the process CPU affinity equals ``list(range(psutil.cpu_count()))``
       (the gate is skipped when psutil cannot report affinity or count);
    4. NUMA memory-policy syscalls are usable;
    5. the platform object has a ``get_all_device_numa_nodes`` attribute.

    Gates 3-5 log a warning when they fail.
    """
    from vllm.platforms import current_platform

    if not current_platform.is_cuda_alike():
        return False

    # Presumably a node1 directory signals a host with more than one NUMA node.
    if not os.path.isdir("/sys/devices/system/node/node1"):
        return False

    try:
        allowed_cpus = psutil.Process(os.getpid()).cpu_affinity()
        total_cpus = psutil.cpu_count()
        already_constrained = total_cpus is not None and allowed_cpus != list(
            range(total_cpus)
        )
        if already_constrained:
            logger.warning(
                "CPU affinity is already constrained for this process. "
                "Skipping automatic NUMA binding; pass --numa-bind-nodes "
                "explicitly to override."
            )
            return False
    except (AttributeError, NotImplementedError, psutil.Error):
        # Affinity cannot be queried here; do not let that block detection.
        pass

    if not _can_set_mempolicy():
        logger.warning(
            "User lacks permission to set NUMA memory policy. "
            "Automatic NUMA detection may not work; if you are using Docker, "
            "try adding --cap-add SYS_NICE."
        )
        return False

    if hasattr(current_platform, "get_all_device_numa_nodes"):
        return True

    logger.warning(
        "Platform %s does not support automatic NUMA detection",
        type(current_platform).__name__,
    )
    return False

_maybe_get_pct_cpu_binding

_maybe_get_pct_cpu_binding(
    numa_nodes: list[int],
) -> list[int] | None

Return the union of PCT priority cores across numa_nodes (or None).

PCT (Priority Core Turbo) lets a subset of cores boost above the rest; we want workers and the EngineCore on those cores. The Linux kernel does not expose PCT membership without root, so we use the empirical heuristic documented above _PCT_CAPABLE_SKUS: priority cores within each NUMA node satisfy cpu_id % stride in (0, 1) intersected with the node's cpulist, where stride is the SKU's logical CPUs per priority group (16 on 64-core SKUs, 18 on 72-core SKUs). Only triggers on the SKUs in _PCT_CAPABLE_SKUS with the expected CPPC highest_perf signal; on any other host it returns None and the caller falls back to the default NUMA-node bind.

Returns the sorted CPU ids as a list[int]; the caller is expected to format them for the chosen tool (e.g. comma-joined for numactl --physcpubind).

Source code in vllm/utils/numa_utils.py
def _maybe_get_pct_cpu_binding(numa_nodes: list[int]) -> list[int] | None:
    """Gather the PCT priority cores found on ``numa_nodes``, or None.

    With PCT (Priority Core Turbo) a subset of cores can boost higher than
    the others, and both the workers and the EngineCore should run there.
    Because the kernel does not reveal PCT membership to unprivileged
    users, membership is approximated with the heuristic described next to
    ``_PCT_CAPABLE_SKUS``: a CPU listed in a node's ``cpulist`` counts as a
    priority core when ``cpu_id % stride in (0, 1)``, where ``stride`` is
    the SKU's number of logical CPUs per priority group (16 on 64-core
    SKUs, 18 on 72-core SKUs).

    ``None`` is returned when ``_pct_sku_config()`` does not recognise the
    host, or when no priority core is found on any of the nodes; callers
    then fall back to the default NUMA-node bind. Nodes whose ``cpulist``
    cannot be read, is empty, or fails to parse are skipped.

    Returns:
        The priority CPU ids as a sorted ``list[int]``. Formatting for a
        particular tool (for example comma-joining for
        ``numactl --physcpubind``) is left to the caller.
    """
    sku = _pct_sku_config()
    if sku is None:
        return None

    from vllm.utils.cpu_resource_utils import parse_id_list

    stride = sku.priority_stride
    selected: set[int] = set()
    for numa_node in numa_nodes:
        node_cpulist = Path(f"/sys/devices/system/node/node{numa_node}/cpulist")
        try:
            raw_cpulist = node_cpulist.read_text().strip()
        except OSError:
            # Node directory missing or unreadable.
            continue
        if not raw_cpulist:
            continue
        try:
            node_cpus = parse_id_list(raw_cpulist)
        except ValueError:
            continue

        priority = [cpu for cpu in node_cpus if cpu % stride in (0, 1)]
        if priority:
            selected.update(priority)
            logger.info(
                "Detected PCT-capable Granite Rapids Xeon (stride=%d); "
                "NUMA node %d priority cores: %s",
                stride,
                numa_node,
                ",".join(map(str, priority)),
            )

    return sorted(selected) if selected else None

_pct_sku_config cached

_pct_sku_config() -> _PctSku | None

Detect a PCT-capable Granite Rapids Xeon with PCT enabled.

See the comment block above _PCT_CAPABLE_SKUS for the full context (why we hard-code SKUs, why we read CPPC highest_perf, etc.).

Returns the matching _PctSku config when both gates hold: (1) the /proc/cpuinfo model name contains an SKU listed in _PCT_CAPABLE_SKUS, and (2) /sys/devices/system/cpu/cpu0/acpi_cppc/highest_perf matches that SKU's expected highest_perf. Otherwise returns None and the caller falls back to the default NUMA-node bind.

Source code in vllm/utils/numa_utils.py
@cache
def _pct_sku_config() -> _PctSku | None:
    """Identify a PCT-capable Granite Rapids Xeon that has PCT switched on.

    The rationale (hard-coded SKUs, use of CPPC ``highest_perf``) is given
    in the comment block above ``_PCT_CAPABLE_SKUS``.

    Two conditions must both hold for a ``_PctSku`` to be returned:

    1. ``_pct_sku_from_cpuinfo()`` recognises the ``model name`` in
       ``/proc/cpuinfo`` as one of the SKUs in ``_PCT_CAPABLE_SKUS``.
    2. The integer stored in
       ``/sys/devices/system/cpu/cpu0/acpi_cppc/highest_perf`` equals the
       ``highest_perf`` expected for that SKU.

    In all other cases, including an unreadable or non-numeric
    ``highest_perf`` file, ``None`` is returned and callers use the default
    NUMA-node bind. The result is memoised by ``@cache``.
    """
    candidate = _pct_sku_from_cpuinfo()
    if candidate is None:
        return None

    try:
        with open(_PCT_HIGHEST_PERF_PATH) as perf_file:
            observed = int(perf_file.read().strip())
    except (OSError, ValueError):
        return None

    return candidate if observed == candidate.highest_perf else None

_pct_sku_from_cpuinfo

_pct_sku_from_cpuinfo() -> _PctSku | None

Return the _PctSku config for this host's SKU, or None.

Reads /proc/cpuinfo's model name and looks the SKU up in _PCT_CAPABLE_SKUS. Returns None when the host is not a known PCT-capable Granite Rapids Xeon (or when /proc/cpuinfo is unreadable).

Source code in vllm/utils/numa_utils.py
def _pct_sku_from_cpuinfo() -> _PctSku | None:
    """Look up this host's CPU SKU in ``_PCT_CAPABLE_SKUS``.

    Scans the ``model name`` lines of ``/proc/cpuinfo`` (matched
    case-insensitively at the start of the line) and returns the config of
    the first ``_PCT_CAPABLE_SKUS`` key that occurs as a substring of such
    a line. ``None`` is returned when no line matches, meaning the host is
    not a known PCT-capable Granite Rapids Xeon, or when the file cannot be
    opened or read.
    """
    try:
        with open(_PROC_CPUINFO_PATH) as cpuinfo:
            for entry in cpuinfo:
                if entry.lstrip().lower().startswith("model name"):
                    for sku_name, sku_config in _PCT_CAPABLE_SKUS.items():
                        if sku_name in entry:
                            return sku_config
    except OSError:
        pass
    return None

configure_subprocess

configure_subprocess(
    vllm_config: VllmConfig,
    local_rank: int,
    dp_local_rank: int | None = None,
    process_kind: str = "worker",
)

Temporarily replace the multiprocessing executable with a numactl wrapper.

Source code in vllm/utils/numa_utils.py
@contextmanager
def configure_subprocess(
    vllm_config: "VllmConfig",
    local_rank: int,
    dp_local_rank: int | None = None,
    process_kind: str = "worker",
):
    """Swap the multiprocessing executable for a numactl wrapper for a while.

    While the context is active, the NUMA wrapper environment is set and
    the multiprocessing executable points at the wrapper script; both are
    handled by the ``_set_numa_wrapper_env`` and ``_mp_set_executable``
    context managers. When ``parallel_config.numa_bind`` is falsy the
    context does nothing.

    Args:
        vllm_config: Config whose ``parallel_config`` drives the binding.
        local_rank: Local rank of the process about to be spawned.
        dp_local_rank: Optional DP rank forwarded to the argument builders.
        process_kind: ``"worker"`` or ``"EngineCore"``; selects which
            numactl argument builder is used.

    Raises:
        ValueError: ``process_kind`` is neither of the two accepted values.
    """
    parallel_config = vllm_config.parallel_config
    if not parallel_config.numa_bind:
        yield
        return

    if process_kind == "worker":
        build_args = _get_numactl_worker_args
    elif process_kind == "EngineCore":
        build_args = _get_numactl_enginecore_args
    else:
        raise ValueError(
            f"Unknown process_kind {process_kind!r}; expected 'worker' or 'EngineCore'."
        )
    numactl_args = build_args(parallel_config, local_rank, dp_local_rank)

    executable, debug_str = _get_numactl_executable()
    # Capture the interpreter path before the executable is replaced below.
    python_executable = os.fsdecode(multiprocessing.spawn.get_executable())
    with _set_numa_wrapper_env(numactl_args, python_executable):
        with _mp_set_executable(executable, debug_str):
            yield

get_auto_numa_nodes cached

get_auto_numa_nodes() -> list[int] | None

Auto-detect NUMA nodes for all visible GPUs.

Source code in vllm/utils/numa_utils.py
@cache
def get_auto_numa_nodes() -> list[int] | None:
    """Detect the NUMA node of every visible GPU, when that is possible.

    Returns ``None`` when ``_is_auto_numa_available()`` rejects
    auto-detection; otherwise returns whatever the platform's
    ``get_all_device_numa_nodes()`` reports, which may itself be ``None``.
    A successful detection is logged. The result is memoised by ``@cache``.
    """
    from vllm.platforms import current_platform

    if _is_auto_numa_available():
        detected = current_platform.get_all_device_numa_nodes()
        if detected is not None:
            logger.info("Auto-detected NUMA nodes for GPUs: %s", detected)
        return detected
    return None

log_current_affinity_state

log_current_affinity_state(label: str) -> None

Log the process's effective NUMA affinity state.

Source code in vllm/utils/numa_utils.py
def log_current_affinity_state(label: str) -> None:
    """Log the process's effective NUMA affinity state.

    Args:
        label: Forwarded unchanged to ``_log_numactl_show``; presumably it
            identifies the process or call site in the log output (confirm
            against that helper).
    """
    # Thin wrapper: all of the work is delegated to ``_log_numactl_show``.
    _log_numactl_show(label)