diff --git a/docs/docs/concepts/backends.md b/docs/docs/concepts/backends.md
index 2ed49bda37..10dcf67eee 100644
--- a/docs/docs/concepts/backends.md
+++ b/docs/docs/concepts/backends.md
@@ -1049,7 +1049,7 @@ projects:
         verbs: ["get", "create"]
       - apiGroups: [""]
         resources: ["pods"]
-        verbs: ["get", "create", "delete"]
+        verbs: ["get", "create", "delete", "list"]
      - apiGroups: [""]
         resources: ["services"]
         verbs: ["get", "create", "delete"]
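The added `list` verb is what the reworked offer logic in this PR needs: `_get_nodes_allocated_resources()` in the new `resources.py` below sums the resource requests of all non-finished pods across namespaces. A minimal sketch of the call the role must now permit, using the official `kubernetes` Python client (the kubeconfig loading is illustrative; dstack builds its client from backend config data):

    from kubernetes import client, config

    # Illustrative setup only; dstack constructs CoreV1Api from the backend's
    # stored kubeconfig data, not from a local kubeconfig file.
    config.load_kube_config()
    api = client.CoreV1Api()

    # This is the call that the added "list" verb on "pods" authorizes.
    pods = api.list_pod_for_all_namespaces()
    print(f"visible pods: {len(pods.items)}")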
diff --git a/src/dstack/_internal/core/backends/kubernetes/compute.py b/src/dstack/_internal/core/backends/kubernetes/compute.py
index 7f8ef9123f..10b8e5366f 100644
--- a/src/dstack/_internal/core/backends/kubernetes/compute.py
+++ b/src/dstack/_internal/core/backends/kubernetes/compute.py
@@ -6,7 +6,7 @@
 from enum import Enum
 from typing import List, Optional
 
-from gpuhunt import KNOWN_AMD_GPUS, KNOWN_NVIDIA_GPUS, AcceleratorVendor
+from gpuhunt import AcceleratorVendor
 from kubernetes import client
 
 from dstack._internal.core.backends.base.compute import (
@@ -19,13 +19,32 @@
     generate_unique_instance_name_for_job,
     get_docker_commands,
     get_dstack_gateway_commands,
-    normalize_arch,
 )
-from dstack._internal.core.backends.base.offers import filter_offers_by_requirements
 from dstack._internal.core.backends.kubernetes.models import (
     KubernetesConfig,
     KubernetesProxyJumpConfig,
 )
+from dstack._internal.core.backends.kubernetes.resources import (
+    AMD_GPU_DEVICE_ID_LABEL_PREFIX,
+    AMD_GPU_NAME_TO_DEVICE_IDS,
+    AMD_GPU_NODE_TAINT,
+    AMD_GPU_RESOURCE,
+    DUMMY_REGION,
+    NVIDIA_GPU_NAME_TO_GPU_INFO,
+    NVIDIA_GPU_NODE_TAINT,
+    NVIDIA_GPU_PRODUCT_LABEL,
+    NVIDIA_GPU_RESOURCE,
+    TaintEffect,
+    format_memory,
+    get_amd_gpu_from_node_labels,
+    get_gpu_request_from_gpu_spec,
+    get_instance_offer_from_node,
+    get_instance_offers,
+    get_node_labels,
+    get_nvidia_gpu_from_node_labels,
+    is_hard_taint,
+    is_taint_tolerated,
+)
 from dstack._internal.core.backends.kubernetes.utils import (
     call_api_method,
     get_api_from_config_data,
@@ -33,58 +52,27 @@
 )
 from dstack._internal.core.consts import DSTACK_RUNNER_SSH_PORT
 from dstack._internal.core.errors import ComputeError
-from dstack._internal.core.models.backends.base import BackendType
 from dstack._internal.core.models.gateways import (
     GatewayComputeConfiguration,
     GatewayProvisioningData,
 )
 from dstack._internal.core.models.instances import (
-    Disk,
     Gpu,
-    InstanceAvailability,
     InstanceOfferWithAvailability,
-    InstanceRuntime,
-    InstanceType,
-    Resources,
     SSHConnectionParams,
 )
 from dstack._internal.core.models.placement import PlacementGroup
-from dstack._internal.core.models.resources import CPUSpec, GPUSpec, Memory
+from dstack._internal.core.models.resources import CPUSpec, GPUSpec
 from dstack._internal.core.models.routers import AnyRouterConfig
 from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
 from dstack._internal.core.models.volumes import Volume
-from dstack._internal.utils.common import get_or_error, parse_memory
+from dstack._internal.utils.common import get_or_error
 from dstack._internal.utils.logging import get_logger
 
 logger = get_logger(__name__)
 
 JUMP_POD_IMAGE = "testcontainers/sshd:1.3.0@sha256:c50c0f59554dcdb2d9e5e705112144428ae9d04ac0af6322b365a18e24213a6a"
 JUMP_POD_SSH_PORT = 22
-DUMMY_REGION = "-"
-
-NVIDIA_GPU_RESOURCE = "nvidia.com/gpu"
-NVIDIA_GPU_NODE_TAINT = NVIDIA_GPU_RESOURCE
-NVIDIA_GPU_PRODUCT_LABEL = f"{NVIDIA_GPU_RESOURCE}.product"
-
-AMD_GPU_RESOURCE = "amd.com/gpu"
-AMD_GPU_NODE_TAINT = AMD_GPU_RESOURCE
-# The oldest but still supported label format, the safest option, see the commit message:
-# https://github.com/ROCm/k8s-device-plugin/commit/c0b0231b391a56bc9da4f362d561e25e960d7a48
-# E.g., beta.amd.com/gpu.device-id.74b5=4 - A node with four MI300X VF (0x74b5) GPUs
-# We cannot rely on the beta.amd.com/gpu.product-name.* label, as it may be missing, see the issue:
-# https://github.com/ROCm/k8s-device-plugin/issues/112
-AMD_GPU_DEVICE_ID_LABEL_PREFIX = f"beta.{AMD_GPU_RESOURCE}.device-id."
-
-# Taints we know and tolerate when creating our objects, e.g., the jump pod.
-TOLERATED_NODE_TAINTS = (NVIDIA_GPU_NODE_TAINT, AMD_GPU_NODE_TAINT)
-
-NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS}
-NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys()
-
-AMD_GPU_DEVICE_ID_TO_GPU_INFO = {
-    device_id: gpu_info for gpu_info in KNOWN_AMD_GPUS for device_id in gpu_info.device_ids
-}
-AMD_GPU_NAME_TO_DEVICE_IDS = {gpu.name: gpu.device_ids for gpu in KNOWN_AMD_GPUS}
 
 
 class Operator(str, Enum):
@@ -92,12 +80,6 @@ class Operator(str, Enum):
     IN = "In"
 
 
-class TaintEffect(str, Enum):
-    NO_EXECUTE = "NoExecute"
-    NO_SCHEDULE = "NoSchedule"
-    PREFER_NO_SCHEDULE = "PreferNoSchedule"
-
-
 class KubernetesCompute(
     ComputeWithFilteredOffersCached,
     ComputeWithPrivilegedSupport,
@@ -117,16 +99,7 @@ def __init__(self, config: KubernetesConfig):
     def get_offers_by_requirements(
         self, requirements: Requirements
     ) -> list[InstanceOfferWithAvailability]:
-        gpu_request = 0
-        if (gpu_spec := requirements.resources.gpu) is not None:
-            gpu_request = _get_gpu_request_from_gpu_spec(gpu_spec)
-        instance_offers: list[InstanceOfferWithAvailability] = []
-        for node in self.api.list_node().items:
-            if (instance_offer := _get_instance_offer_from_node(node, gpu_request)) is not None:
-                instance_offers.extend(
-                    filter_offers_by_requirements([instance_offer], requirements)
-                )
-        return instance_offers
+        return get_instance_offers(self.api, requirements)
 
     def run_job(
         self,
@@ -191,7 +164,7 @@ def run_job(
         if (cpu_max := resources_spec.cpu.count.max) is not None:
             resources_limits["cpu"] = str(cpu_max)
         if (gpu_spec := resources_spec.gpu) is not None:
-            if (gpu_request := _get_gpu_request_from_gpu_spec(gpu_spec)) > 0:
+            if (gpu_request := get_gpu_request_from_gpu_spec(gpu_spec)) > 0:
                 gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu(
                     self.api, gpu_spec
                 )
@@ -208,14 +181,14 @@
                 )
             )
         if (memory_min := resources_spec.memory.min) is not None:
-            resources_requests["memory"] = _render_memory(memory_min)
+            resources_requests["memory"] = format_memory(memory_min)
         if (memory_max := resources_spec.memory.max) is not None:
-            resources_limits["memory"] = _render_memory(memory_max)
+            resources_limits["memory"] = format_memory(memory_max)
         if (disk_spec := resources_spec.disk) is not None:
             if (disk_min := disk_spec.size.min) is not None:
-                resources_requests["ephemeral-storage"] = _render_memory(disk_min)
+                resources_requests["ephemeral-storage"] = format_memory(disk_min)
             if (disk_max := disk_spec.size.max) is not None:
-                resources_limits["ephemeral-storage"] = _render_memory(disk_max)
+                resources_limits["ephemeral-storage"] = format_memory(disk_max)
         if (shm_size := resources_spec.shm_size) is not None:
             shm_volume_name = "dev-shm"
             volumes_.append(
@@ -223,7 +196,7 @@
                 name=shm_volume_name,
                 empty_dir=client.V1EmptyDirVolumeSource(
                     medium="Memory",
-                    size_limit=_render_memory(shm_size),
+                    size_limit=format_memory(shm_size),
                 ),
             )
         )
@@ -338,10 +311,17 @@ def update_provisioning_data(
         provisioning_data.hostname = get_or_error(service_spec.cluster_ip)
         pod_spec = get_or_error(pod.spec)
         node = self.api.read_node(name=get_or_error(pod_spec.node_name))
-        # The original offer has a list of GPUs already sliced according to pod spec's GPU resource
-        # request, which is inferred from dstack's GPUSpec, see _get_gpu_request_from_gpu_spec
-        gpu_request = len(provisioning_data.instance_type.resources.gpus)
-        if (instance_offer := _get_instance_offer_from_node(node, gpu_request)) is not None:
+        # In the original offer, the resources have already been adjusted according to
+        # the run configuration resource requirements, see get_offers_by_requirements()
+        original_resources = provisioning_data.instance_type.resources
+        instance_offer = get_instance_offer_from_node(
+            node=node,
+            cpu_request=original_resources.cpus,
+            memory_mib_request=original_resources.memory_mib,
+            gpu_request=len(original_resources.gpus),
+            disk_mib_request=original_resources.disk.size_mib,
+        )
+        if instance_offer is not None:
             provisioning_data.instance_type = instance_offer.instance
             provisioning_data.region = instance_offer.region
             provisioning_data.price = instance_offer.price
@@ -481,146 +461,6 @@ def terminate_gateway(
     )
 
 
-def _get_gpu_request_from_gpu_spec(gpu_spec: GPUSpec) -> int:
-    return gpu_spec.count.min or 0
-
-
-def _get_instance_offer_from_node(
-    node: client.V1Node, gpu_request: int
-) -> Optional[InstanceOfferWithAvailability]:
-    try:
-        node_name = get_or_error(get_or_error(node.metadata).name)
-        node_status = get_or_error(node.status)
-        allocatable = get_or_error(node_status.allocatable)
-        _cpu_arch: Optional[str] = None
-        if node_status.node_info is not None:
-            _cpu_arch = node_status.node_info.architecture
-        cpu_arch = normalize_arch(_cpu_arch).to_cpu_architecture()
-        cpus = _parse_cpu(allocatable["cpu"])
-        memory_mib = _parse_memory(allocatable["memory"])
-        disk_size_mib = _parse_memory(allocatable["ephemeral-storage"])
-        gpus = _get_node_gpus(node)
-    except (ValueError, KeyError) as e:
-        logger.exception("Failed to process node: %s: %s", type(e).__name__, e)
-        return None
-    return InstanceOfferWithAvailability(
-        backend=BackendType.KUBERNETES,
-        instance=InstanceType(
-            name=node_name,
-            resources=Resources(
-                cpus=cpus,
-                cpu_arch=cpu_arch,
-                memory_mib=memory_mib,
-                gpus=gpus[:gpu_request],
-                spot=False,
-                disk=Disk(size_mib=disk_size_mib),
-            ),
-        ),
-        price=0,
-        region=DUMMY_REGION,
-        availability=InstanceAvailability.AVAILABLE,
-        instance_runtime=InstanceRuntime.RUNNER,
-    )
-
-
-def _parse_cpu(cpu: str) -> int:
-    if cpu.endswith("m"):
-        # "m" means millicpu (1/1000 CPU), e.g., 7900m -> 7.9 -> 7
-        return int(float(cpu[:-1]) / 1000)
-    return int(cpu)
-
-
-def _parse_memory(memory: str) -> int:
-    if memory.isdigit():
-        # no suffix means that the value is in bytes
-        return int(memory) // 2**20
-    return int(parse_memory(memory, as_untis="M"))
-
-
-def _render_memory(memory: Memory) -> str:
-    return f"{float(memory)}Gi"
-
-
-def _get_node_labels(node: client.V1Node) -> dict[str, str]:
-    if (metadata := node.metadata) is None:
-        return {}
-    if (labels := metadata.labels) is None:
-        return {}
-    return labels
-
-
-def _get_node_gpus(node: client.V1Node) -> list[Gpu]:
-    node_name = get_or_error(get_or_error(node.metadata).name)
-    allocatable = get_or_error(get_or_error(node.status).allocatable)
-    labels = _get_node_labels(node)
-    for gpu_resource, gpu_getter in (
-        (NVIDIA_GPU_RESOURCE, _get_nvidia_gpu_from_node_labels),
-        (AMD_GPU_RESOURCE, _get_amd_gpu_from_node_labels),
-    ):
-        _gpu_count = allocatable.get(gpu_resource)
-        if not _gpu_count:
-            continue
-        gpu_count = int(_gpu_count)
-        if gpu_count < 1:
-            continue
-        gpu = gpu_getter(labels)
-        if gpu is None:
-            logger.warning(
-                "Node %s: GPU resource found, but failed to detect its model: %s=%d",
-                node_name,
-                gpu_resource,
-                gpu_count,
-            )
-            return []
-        return [gpu] * gpu_count
-    logger.debug("Node %s: no GPU resource found", node_name)
-    return []
-
-
-def _get_nvidia_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
-    # We rely on https://github.com/NVIDIA/k8s-device-plugin/tree/main/docs/gpu-feature-discovery
-    # to detect gpus. Note that "nvidia.com/gpu.product" is not a short gpu name like "T4" or
-    # "A100" but a product name like "Tesla-T4" or "A100-SXM4-40GB".
-    # Thus, we convert the product name to a known gpu name.
-    gpu_product = labels.get(NVIDIA_GPU_PRODUCT_LABEL)
-    if gpu_product is None:
-        return None
-    gpu_product = gpu_product.replace("RTX-", "RTX")
-    for gpu_name in NVIDIA_GPU_NAMES:
-        if gpu_name.lower() in gpu_product.lower().split("-"):
-            break
-    else:
-        return None
-    gpu_info = NVIDIA_GPU_NAME_TO_GPU_INFO[gpu_name]
-    gpu_memory = gpu_info.memory * 1024
-    # A100 may come in two variants
-    if "40GB" in gpu_product:
-        gpu_memory = 40 * 1024
-    return Gpu(vendor=AcceleratorVendor.NVIDIA, name=gpu_name, memory_mib=gpu_memory)
-
-
-def _get_amd_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
-    # (AMDGPUInfo.name, AMDGPUInfo.memory) pairs
-    gpus: set[tuple[str, int]] = set()
-    for label in labels:
-        if not label.startswith(AMD_GPU_DEVICE_ID_LABEL_PREFIX):
-            continue
-        _, _, _device_id = label.rpartition(".")
-        device_id = int(_device_id, 16)
-        gpu_info = AMD_GPU_DEVICE_ID_TO_GPU_INFO.get(device_id)
-        if gpu_info is None:
-            logger.warning("Unknown AMD GPU device id: %X", device_id)
-            continue
-        gpus.add((gpu_info.name, gpu_info.memory))
-    if not gpus:
-        return None
-    if len(gpus) == 1:
-        gpu_name, gpu_memory_gib = next(iter(gpus))
-        return Gpu(vendor=AcceleratorVendor.AMD, name=gpu_name, memory_mib=gpu_memory_gib * 1024)
-    logger.warning("Multiple AMD GPU models detected: %s, ignoring all GPUs", gpus)
-    return None
-
-
 def _get_pod_spec_parameters_for_gpu(
     api: client.CoreV1Api, gpu_spec: GPUSpec
 ) -> tuple[str, client.V1NodeAffinity, str]:
@@ -643,8 +483,8 @@ def _get_nvidia_gpu_node_affinity(
 ) -> client.V1NodeAffinity:
     matching_gpu_label_values: set[str] = set()
     for node in nodes:
-        labels = _get_node_labels(node)
-        gpu = _get_nvidia_gpu_from_node_labels(labels)
+        labels = get_node_labels(node)
+        gpu = get_nvidia_gpu_from_node_labels(labels)
         if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec):
             matching_gpu_label_values.add(labels[NVIDIA_GPU_PRODUCT_LABEL])
     if not matching_gpu_label_values:
@@ -676,8 +516,8 @@ def _get_amd_gpu_node_affinity(
 ) -> client.V1NodeAffinity:
     matching_device_ids: set[int] = set()
     for node in nodes:
-        labels = _get_node_labels(node)
-        gpu = _get_amd_gpu_from_node_labels(labels)
+        labels = get_node_labels(node)
+        gpu = get_amd_gpu_from_node_labels(labels)
         if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec):
             matching_device_ids.update(AMD_GPU_NAME_TO_DEVICE_IDS[gpu.name])
     return client.V1NodeAffinity(
@@ -828,10 +668,10 @@ def _create_jump_pod_service(
         taints = node_spec.taints or []
         for taint in taints:
             # A "soft" taint, ignore.
-            if taint.effect == TaintEffect.PREFER_NO_SCHEDULE:
+            if not is_hard_taint(taint):
                 continue
             has_hard_taint = True
-            if taint.key in TOLERATED_NODE_TAINTS:
+            if is_taint_tolerated(taint):
                 tolerated_taints.add((taint.key, taint.effect))
     if not has_hard_taint:
         toleration_required = False
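The taint handling above now delegates to two helpers defined in `resources.py` (shown in full below). A quick sketch of their semantics, assuming a dev environment where `dstack` and the `kubernetes` client are importable; the taint key in the first example is an illustrative value, not one dstack emits:

    from kubernetes.client import V1Taint

    from dstack._internal.core.backends.kubernetes.resources import (
        is_hard_taint,
        is_taint_tolerated,
    )

    # PreferNoSchedule is "soft": pods can still be scheduled, so it does not
    # force a toleration.
    soft = V1Taint(key="example.com/maintenance", effect="PreferNoSchedule")
    assert not is_hard_taint(soft)

    # The NVIDIA GPU taint is hard, but it is one of the taints dstack
    # tolerates, so a toleration is emitted instead of skipping the node.
    hard = V1Taint(key="nvidia.com/gpu", effect="NoSchedule")
    assert is_hard_taint(hard) and is_taint_tolerated(hard)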
diff --git a/src/dstack/_internal/core/backends/kubernetes/resources.py b/src/dstack/_internal/core/backends/kubernetes/resources.py
new file mode 100644
index 0000000000..018ff5fb62
--- /dev/null
+++ b/src/dstack/_internal/core/backends/kubernetes/resources.py
@@ -0,0 +1,363 @@
+import dataclasses
+from collections.abc import Mapping
+from decimal import Decimal
+from enum import Enum
+from typing import Callable, Optional, Union, cast
+
+from gpuhunt import KNOWN_AMD_GPUS, KNOWN_NVIDIA_GPUS, AcceleratorVendor
+
+# XXX: kubernetes.utils is missing in the stubs package
+from kubernetes import utils as _kubernetes_utils  # pyright: ignore[reportAttributeAccessIssue]
+from kubernetes.client import CoreV1Api, V1Node, V1Taint
+from typing_extensions import Self
+
+from dstack._internal.core.backends.base.compute import normalize_arch
+from dstack._internal.core.backends.base.offers import filter_offers_by_requirements
+from dstack._internal.core.models.backends.base import BackendType
+from dstack._internal.core.models.instances import (
+    Disk,
+    Gpu,
+    InstanceAvailability,
+    InstanceOfferWithAvailability,
+    InstanceRuntime,
+    InstanceType,
+    Resources,
+)
+from dstack._internal.core.models.resources import CPUSpec, GPUSpec, Memory
+from dstack._internal.core.models.runs import Requirements
+from dstack._internal.utils.common import get_or_error
+from dstack._internal.utils.logging import get_logger
+
+logger = get_logger(__name__)
+
+DUMMY_REGION = "-"
+
+NVIDIA_GPU_RESOURCE = "nvidia.com/gpu"
+NVIDIA_GPU_NODE_TAINT = NVIDIA_GPU_RESOURCE
+NVIDIA_GPU_PRODUCT_LABEL = f"{NVIDIA_GPU_RESOURCE}.product"
+
+AMD_GPU_RESOURCE = "amd.com/gpu"
+AMD_GPU_NODE_TAINT = AMD_GPU_RESOURCE
+# The oldest but still supported label format, the safest option, see the commit message:
+# https://github.com/ROCm/k8s-device-plugin/commit/c0b0231b391a56bc9da4f362d561e25e960d7a48
+# E.g., beta.amd.com/gpu.device-id.74b5=4 - A node with four MI300X VF (0x74b5) GPUs
+# We cannot rely on the beta.amd.com/gpu.product-name.* label, as it may be missing, see the issue:
+# https://github.com/ROCm/k8s-device-plugin/issues/112
+AMD_GPU_DEVICE_ID_LABEL_PREFIX = f"beta.{AMD_GPU_RESOURCE}.device-id."
+
+NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS}
+NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys()
+
+AMD_GPU_DEVICE_ID_TO_GPU_INFO = {
+    device_id: gpu_info for gpu_info in KNOWN_AMD_GPUS for device_id in gpu_info.device_ids
+}
+AMD_GPU_NAME_TO_DEVICE_IDS = {gpu.name: gpu.device_ids for gpu in KNOWN_AMD_GPUS}
+
+
+class PodPhase(str, Enum):
+    PENDING = "Pending"
+    RUNNING = "Running"
+    SUCCEEDED = "Succeeded"
+    FAILED = "Failed"
+    UNKNOWN = "Unknown"  # Deprecated: it hasn't been set since 2015
+
+    @classmethod
+    def finished_statuses(cls) -> list["PodPhase"]:
+        return [cls.SUCCEEDED, cls.FAILED]
+
+    def is_finished(self):
+        return self in self.finished_statuses()
+
+
+class TaintEffect(str, Enum):
+    NO_EXECUTE = "NoExecute"
+    NO_SCHEDULE = "NoSchedule"
+    PREFER_NO_SCHEDULE = "PreferNoSchedule"
+
+
+class KubernetesResource(str, Enum):
+    CPU = "cpu"
+    MEMORY = "memory"
+    EPHEMERAL_STORAGE = "ephemeral-storage"
+    NVIDIA_GPU = NVIDIA_GPU_RESOURCE
+    AMD_GPU = AMD_GPU_RESOURCE
+
+
+@dataclasses.dataclass
+class KubernetesResources:
+    cpu: Decimal = Decimal("0")
+    memory: Decimal = Decimal("0")
+    ephemeral_storage: Decimal = Decimal("0")
+    nvidia_gpu: Decimal = Decimal("0")
+    amd_gpu: Decimal = Decimal("0")
+
+    @classmethod
+    def from_kubernetes_map(cls, map_: Mapping[str, str]) -> Self:
+        dct: dict[str, Decimal] = {}
+        for resource in KubernetesResource:
+            if (qty := map_.get(resource.value)) is not None:
+                dct[resource.name.lower()] = parse_quantity(qty)
+        return cls(**dct)
+
+    def __getitem__(self, key: str) -> Decimal:
+        try:
+            resource = KubernetesResource(key)
+        except ValueError:
+            raise KeyError(key)
+        return getattr(self, resource.name.lower())
+
+    def __add__(self, other: Self) -> Self:
+        dct: dict[str, Decimal] = dataclasses.asdict(self)
+        qty: Decimal
+        for field, qty in dataclasses.asdict(other).items():
+            dct[field] += qty
+        return type(self)(**dct)
+
+    def __sub__(self, other: Self) -> Self:
+        dct: dict[str, Decimal] = dataclasses.asdict(self)
+        qty: Decimal
+        for field, qty in dataclasses.asdict(other).items():
+            dct[field] -= qty
+        return type(self)(**dct)
+
+
+parse_quantity = cast(
+    Callable[[Union[str, int, float, Decimal]], Decimal], _kubernetes_utils.parse_quantity
+)
+
+
+def format_memory(memory: Memory) -> str:
+    return f"{float(memory)}Gi"
+
+
+def get_gpu_request_from_gpu_spec(gpu_spec: GPUSpec) -> int:
+    return gpu_spec.count.min or 0
+
+
+def get_node_name(node: V1Node) -> Optional[str]:
+    if (metadata := node.metadata) is None:
+        return None
+    return metadata.name
+
+
+def get_node_labels(node: V1Node) -> dict[str, str]:
+    if (metadata := node.metadata) is None:
+        return {}
+    if (labels := metadata.labels) is None:
+        return {}
+    return labels
+
+
+def is_hard_taint(taint: V1Taint) -> bool:
+    if taint.effect == TaintEffect.PREFER_NO_SCHEDULE:
+        return False
+    if taint.effect not in TaintEffect:
+        logger.warning(
+            "Unexpected taint %s=%s effect: %s", taint.key, taint.value or "", taint.effect
+        )
+    return True
+
+
+def is_taint_tolerated(taint: V1Taint) -> bool:
+    return taint.key in (NVIDIA_GPU_NODE_TAINT, AMD_GPU_NODE_TAINT)
+
+
+def get_instance_offers(
+    api: CoreV1Api, requirements: Requirements
+) -> list[InstanceOfferWithAvailability]:
+    resources_spec = requirements.resources
+    assert isinstance(resources_spec.cpu, CPUSpec)
+    cpu_request = resources_spec.cpu.count.min or 0
+    memory_mib_request = round((resources_spec.memory.min or 0) * 1024)
+    gpu_request = 0
+    if (gpu_spec := resources_spec.gpu) is not None:
+        gpu_request = get_gpu_request_from_gpu_spec(gpu_spec)
+    disk_mib_request = 0
+    if (disk_spec := resources_spec.disk) is not None:
+        disk_mib_request = round((disk_spec.size.min or 0) * 1024)
+
+    nodes_allocated_resources = _get_nodes_allocated_resources(api)
+    offers: list[InstanceOfferWithAvailability] = []
+    for node in api.list_node().items:
+        if (node_name := get_node_name(node)) is None:
+            continue
+        offer = _get_instance_offer_from_node(
+            node=node,
+            node_name=node_name,
+            node_allocated_resources=nodes_allocated_resources.get(node_name),
+            cpu_request=cpu_request,
+            memory_mib_request=memory_mib_request,
+            gpu_request=gpu_request,
+            disk_mib_request=disk_mib_request,
+        )
+        if offer is not None:
+            offers.extend(filter_offers_by_requirements([offer], requirements))
+    return offers
+
+
+def get_instance_offer_from_node(
+    node: V1Node,
+    *,
+    cpu_request: int,
+    memory_mib_request: int,
+    gpu_request: int,
+    disk_mib_request: int,
+) -> Optional[InstanceOfferWithAvailability]:
+    node_name = get_node_name(node)
+    if node_name is None:
+        return None
+    return _get_instance_offer_from_node(
+        node=node,
+        node_name=node_name,
+        node_allocated_resources=None,
+        cpu_request=cpu_request,
+        memory_mib_request=memory_mib_request,
+        gpu_request=gpu_request,
+        disk_mib_request=disk_mib_request,
+    )
+
+
+def get_nvidia_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
+    # We rely on https://github.com/NVIDIA/k8s-device-plugin/tree/main/docs/gpu-feature-discovery
+    # to detect gpus. Note that "nvidia.com/gpu.product" is not a short gpu name like "T4" or
+    # "A100" but a product name like "Tesla-T4" or "A100-SXM4-40GB".
+    # Thus, we convert the product name to a known gpu name.
+    gpu_product = labels.get(NVIDIA_GPU_PRODUCT_LABEL)
+    if gpu_product is None:
+        return None
+    gpu_product = gpu_product.replace("RTX-", "RTX")
+    for gpu_name in NVIDIA_GPU_NAMES:
+        if gpu_name.lower() in gpu_product.lower().split("-"):
+            break
+    else:
+        return None
+    gpu_info = NVIDIA_GPU_NAME_TO_GPU_INFO[gpu_name]
+    gpu_memory = gpu_info.memory * 1024
+    # A100 may come in two variants
+    if "40GB" in gpu_product:
+        gpu_memory = 40 * 1024
+    return Gpu(vendor=AcceleratorVendor.NVIDIA, name=gpu_name, memory_mib=gpu_memory)
+
+
+def get_amd_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
+    # (AMDGPUInfo.name, AMDGPUInfo.memory) pairs
+    gpus: set[tuple[str, int]] = set()
+    for label in labels:
+        if not label.startswith(AMD_GPU_DEVICE_ID_LABEL_PREFIX):
+            continue
+        _, _, _device_id = label.rpartition(".")
+        device_id = int(_device_id, 16)
+        gpu_info = AMD_GPU_DEVICE_ID_TO_GPU_INFO.get(device_id)
+        if gpu_info is None:
+            logger.warning("Unknown AMD GPU device id: %X", device_id)
+            continue
+        gpus.add((gpu_info.name, gpu_info.memory))
+    if not gpus:
+        return None
+    if len(gpus) == 1:
+        gpu_name, gpu_memory_gib = next(iter(gpus))
+        return Gpu(vendor=AcceleratorVendor.AMD, name=gpu_name, memory_mib=gpu_memory_gib * 1024)
+    logger.warning("Multiple AMD GPU models detected: %s, ignoring all GPUs", gpus)
+    return None
+
+
+def _get_instance_offer_from_node(
+    node: V1Node,
+    node_name: str,
+    node_allocated_resources: Optional[KubernetesResources],
+    cpu_request: int,
+    memory_mib_request: int,
+    gpu_request: int,
+    disk_mib_request: int,
+) -> Optional[InstanceOfferWithAvailability]:
+    try:
+        node_spec = get_or_error(node.spec)
+        if any(is_hard_taint(t) and not is_taint_tolerated(t) for t in node_spec.taints or []):
+            logger.debug("Node %s: untolerated taint(s) found, skipping", node_name)
+            return None
+        node_status = get_or_error(node.status)
+        allocatable = get_or_error(node_status.allocatable)
+        _cpu_arch: Optional[str] = None
+        if node_status.node_info is not None:
+            _cpu_arch = node_status.node_info.architecture
+        cpu_arch = normalize_arch(_cpu_arch).to_cpu_architecture()
+    except ValueError as e:
+        logger.exception("Failed to process node %s: %s: %s", node_name, type(e).__name__, e)
+        return None
+
+    node_resources = KubernetesResources.from_kubernetes_map(allocatable)
+    if node_allocated_resources is not None:
+        node_resources = node_resources - node_allocated_resources
+    cpu = max(0, int(node_resources.cpu))
+    memory_mib = max(0, int(node_resources.memory / 2**20))
+    disk_mib = max(0, int(node_resources.ephemeral_storage / 2**20))
+    gpus = _get_gpus_from_node(node, node_name, node_resources)
+
+    return InstanceOfferWithAvailability(
+        backend=BackendType.KUBERNETES,
+        instance=InstanceType(
+            name=node_name,
+            resources=Resources(
+                cpus=min(cpu_request, cpu),
+                cpu_arch=cpu_arch,
+                memory_mib=min(memory_mib_request, memory_mib),
+                gpus=gpus[:gpu_request],
+                disk=Disk(size_mib=min(disk_mib_request, disk_mib)),
+                spot=False,
+            ),
+        ),
+        price=0,
+        region=DUMMY_REGION,
+        availability=InstanceAvailability.AVAILABLE,
+        instance_runtime=InstanceRuntime.RUNNER,
+    )
+
+
+def _get_gpus_from_node(
+    node: V1Node, node_name: str, node_resources: KubernetesResources
+) -> list[Gpu]:
+    labels = get_node_labels(node)
+    for gpu_resource, gpu_getter in (
+        (NVIDIA_GPU_RESOURCE, get_nvidia_gpu_from_node_labels),
+        (AMD_GPU_RESOURCE, get_amd_gpu_from_node_labels),
+    ):
+        gpu_count = int(node_resources[gpu_resource])
+        if gpu_count < 1:
+            continue
+        gpu = gpu_getter(labels)
+        if gpu is None:
+            logger.warning(
+                "Node %s: GPU resource found, but failed to detect its model: %s=%d",
+                node_name,
+                gpu_resource,
+                gpu_count,
+            )
+            return []
+        return [gpu] * gpu_count
+    logger.debug("Node %s: no available GPU resource found", node_name)
+    return []
+
+
+def _get_nodes_allocated_resources(api: CoreV1Api) -> dict[str, KubernetesResources]:
+    nodes_allocated_resources: dict[str, KubernetesResources] = {}
+    for pod in api.list_pod_for_all_namespaces().items:
+        pod_status = get_or_error(pod.status)
+        pod_phase = PodPhase(get_or_error(pod_status.phase))
+        if pod_phase.is_finished():
+            continue
+        pod_spec = get_or_error(pod.spec)
+        node_name = pod_spec.node_name
+        if node_name is None:
+            continue
+        pod_requests = KubernetesResources()
+        # TODO: Should we also check PodSpec.resources? As of 2026-01-21, it's in alpha
+        for container in pod_spec.containers:
+            if container.resources is not None and container.resources.requests:
+                pod_requests += KubernetesResources.from_kubernetes_map(
+                    container.resources.requests
+                )
+        try:
+            nodes_allocated_resources[node_name] += pod_requests
+        except KeyError:
+            nodes_allocated_resources[node_name] = pod_requests
+    return nodes_allocated_resources
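`KubernetesResources` above replaces the old `_parse_cpu`/`_parse_memory` helpers with the client's `parse_quantity`, which turns Kubernetes quantity strings such as "7900m" or "32Gi" into `Decimal`s, so free capacity is simply allocatable minus allocated. A small sketch of that arithmetic with made-up values:

    from dstack._internal.core.backends.kubernetes.resources import KubernetesResources

    # What a node reports as allocatable vs. what running pods already request.
    allocatable = KubernetesResources.from_kubernetes_map(
        {"cpu": "7900m", "memory": "32Gi", "nvidia.com/gpu": "8"}
    )
    allocated = KubernetesResources.from_kubernetes_map({"cpu": "1500m", "memory": "4Gi"})

    free = allocatable - allocated
    print(free.cpu)                     # Decimal('6.4'), offered as 6 whole CPUs
    print(int(free.memory / 2**20))     # 28672 MiB (28 GiB) of memory left
    print(int(free["nvidia.com/gpu"]))  # __getitem__ accepts resource names: 8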
diff --git a/src/tests/_internal/core/backends/kubernetes/test_compute.py b/src/tests/_internal/core/backends/kubernetes/test_resources.py
similarity index 71%
rename from src/tests/_internal/core/backends/kubernetes/test_compute.py
rename to src/tests/_internal/core/backends/kubernetes/test_resources.py
index f0bcbbc174..6a74233a53 100644
--- a/src/tests/_internal/core/backends/kubernetes/test_compute.py
+++ b/src/tests/_internal/core/backends/kubernetes/test_resources.py
@@ -3,51 +3,51 @@
 import pytest
 from gpuhunt import AcceleratorVendor
 
-from dstack._internal.core.backends.kubernetes.compute import (
-    _get_amd_gpu_from_node_labels,
-    _get_nvidia_gpu_from_node_labels,
+from dstack._internal.core.backends.kubernetes.resources import (
+    get_amd_gpu_from_node_labels,
+    get_nvidia_gpu_from_node_labels,
 )
 from dstack._internal.core.models.instances import Gpu
 
 
 class TestGetNvidiaGPUFromNodeLabels:
     def test_returns_none_if_no_labels(self):
-        assert _get_nvidia_gpu_from_node_labels({}) is None
+        assert get_nvidia_gpu_from_node_labels({}) is None
 
     def test_returns_correct_memory_for_different_A100(self):
-        assert _get_nvidia_gpu_from_node_labels(
+        assert get_nvidia_gpu_from_node_labels(
             {"nvidia.com/gpu.product": "A100-SXM4-40GB"}
         ) == Gpu(vendor=AcceleratorVendor.NVIDIA, name="A100", memory_mib=40 * 1024)
-        assert _get_nvidia_gpu_from_node_labels(
+        assert get_nvidia_gpu_from_node_labels(
             {"nvidia.com/gpu.product": "A100-SXM4-80GB"}
         ) == Gpu(vendor=AcceleratorVendor.NVIDIA, name="A100", memory_mib=80 * 1024)
 
 
 class TestGetAMDGPUFromNodeLabels:
     def test_returns_no_gpus_if_no_labels(self):
-        assert _get_amd_gpu_from_node_labels({}) is None
+        assert get_amd_gpu_from_node_labels({}) is None
 
     def test_returns_known_gpu(self):
-        assert _get_amd_gpu_from_node_labels({"beta.amd.com/gpu.device-id.74b5": "4"}) == Gpu(
+        assert get_amd_gpu_from_node_labels({"beta.amd.com/gpu.device-id.74b5": "4"}) == Gpu(
             vendor=AcceleratorVendor.AMD, name="MI300X", memory_mib=192 * 1024
         )
 
     def test_returns_known_gpu_if_multiple_device_ids_match_the_same_gpu(self):
         # 4x AMD Instinct MI300X VF + 4x AMD Instinct MI300X
         labels = {"beta.amd.com/gpu.device-id.74b5": "4", "beta.amd.com/gpu.device-id.74a1": "4"}
-        assert _get_amd_gpu_from_node_labels(labels) == Gpu(
+        assert get_amd_gpu_from_node_labels(labels) == Gpu(
             vendor=AcceleratorVendor.AMD, name="MI300X", memory_mib=192 * 1024
         )
 
     def test_returns_none_if_device_id_is_unknown(self, caplog: pytest.LogCaptureFixture):
         caplog.set_level(logging.WARNING)
-        assert _get_amd_gpu_from_node_labels({"beta.amd.com/gpu.device-id.ffff": "4"}) is None
+        assert get_amd_gpu_from_node_labels({"beta.amd.com/gpu.device-id.ffff": "4"}) is None
         assert "Unknown AMD GPU device id: FFFF" in caplog.text
 
     def test_returns_none_if_multiple_gpu_models(self, caplog: pytest.LogCaptureFixture):
         caplog.set_level(logging.WARNING)
         # 4x AMD Instinct MI300X VF + 4x AMD Instinct MI325X
         labels = {"beta.amd.com/gpu.device-id.74b5": "4", "beta.amd.com/gpu.device-id.74a5": "4"}
-        assert _get_amd_gpu_from_node_labels(labels) is None
+        assert get_amd_gpu_from_node_labels(labels) is None
         assert "Multiple AMD GPU models detected" in caplog.text