docs/docs/concepts/backends.md (2 changes: 1 addition & 1 deletion)
@@ -1049,7 +1049,7 @@ projects:
verbs: ["get", "create"]
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "create", "delete"]
verbs: ["get", "create", "delete", "list"]
- apiGroups: [""]
resources: ["services"]
verbs: ["get", "create", "delete"]
Expand Down
src/dstack/_internal/core/backends/kubernetes/compute.py (256 changes: 48 additions & 208 deletions)
@@ -6,7 +6,7 @@
from enum import Enum
from typing import List, Optional

from gpuhunt import KNOWN_AMD_GPUS, KNOWN_NVIDIA_GPUS, AcceleratorVendor
from gpuhunt import AcceleratorVendor
from kubernetes import client

from dstack._internal.core.backends.base.compute import (
@@ -19,85 +19,67 @@
generate_unique_instance_name_for_job,
get_docker_commands,
get_dstack_gateway_commands,
normalize_arch,
)
from dstack._internal.core.backends.base.offers import filter_offers_by_requirements
from dstack._internal.core.backends.kubernetes.models import (
KubernetesConfig,
KubernetesProxyJumpConfig,
)
from dstack._internal.core.backends.kubernetes.resources import (
AMD_GPU_DEVICE_ID_LABEL_PREFIX,
AMD_GPU_NAME_TO_DEVICE_IDS,
AMD_GPU_NODE_TAINT,
AMD_GPU_RESOURCE,
DUMMY_REGION,
NVIDIA_GPU_NAME_TO_GPU_INFO,
NVIDIA_GPU_NODE_TAINT,
NVIDIA_GPU_PRODUCT_LABEL,
NVIDIA_GPU_RESOURCE,
TaintEffect,
format_memory,
get_amd_gpu_from_node_labels,
get_gpu_request_from_gpu_spec,
get_instance_offer_from_node,
get_instance_offers,
get_node_labels,
get_nvidia_gpu_from_node_labels,
is_hard_taint,
is_taint_tolerated,
)
from dstack._internal.core.backends.kubernetes.utils import (
call_api_method,
get_api_from_config_data,
get_cluster_public_ip,
)
from dstack._internal.core.consts import DSTACK_RUNNER_SSH_PORT
from dstack._internal.core.errors import ComputeError
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.gateways import (
GatewayComputeConfiguration,
GatewayProvisioningData,
)
from dstack._internal.core.models.instances import (
Disk,
Gpu,
InstanceAvailability,
InstanceOfferWithAvailability,
InstanceRuntime,
InstanceType,
Resources,
SSHConnectionParams,
)
from dstack._internal.core.models.placement import PlacementGroup
from dstack._internal.core.models.resources import CPUSpec, GPUSpec, Memory
from dstack._internal.core.models.resources import CPUSpec, GPUSpec
from dstack._internal.core.models.routers import AnyRouterConfig
from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
from dstack._internal.core.models.volumes import Volume
from dstack._internal.utils.common import get_or_error, parse_memory
from dstack._internal.utils.common import get_or_error
from dstack._internal.utils.logging import get_logger

logger = get_logger(__name__)

JUMP_POD_IMAGE = "testcontainers/sshd:1.3.0@sha256:c50c0f59554dcdb2d9e5e705112144428ae9d04ac0af6322b365a18e24213a6a"
JUMP_POD_SSH_PORT = 22
DUMMY_REGION = "-"

NVIDIA_GPU_RESOURCE = "nvidia.com/gpu"
NVIDIA_GPU_NODE_TAINT = NVIDIA_GPU_RESOURCE
NVIDIA_GPU_PRODUCT_LABEL = f"{NVIDIA_GPU_RESOURCE}.product"

AMD_GPU_RESOURCE = "amd.com/gpu"
AMD_GPU_NODE_TAINT = AMD_GPU_RESOURCE
# The oldest but still supported label format, the safest option, see the commit message:
# https://github.com/ROCm/k8s-device-plugin/commit/c0b0231b391a56bc9da4f362d561e25e960d7a48
# E.g., beta.amd.com/gpu.device-id.74b5=4 - A node with four MI300X VF (0x74b5) GPUs
# We cannot rely on the beta.amd.com/gpu.product-name.* label, as it may be missing, see the issue:
# https://github.com/ROCm/k8s-device-plugin/issues/112
AMD_GPU_DEVICE_ID_LABEL_PREFIX = f"beta.{AMD_GPU_RESOURCE}.device-id."

# Taints we know and tolerate when creating our objects, e.g., the jump pod.
TOLERATED_NODE_TAINTS = (NVIDIA_GPU_NODE_TAINT, AMD_GPU_NODE_TAINT)

NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS}
NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys()

AMD_GPU_DEVICE_ID_TO_GPU_INFO = {
device_id: gpu_info for gpu_info in KNOWN_AMD_GPUS for device_id in gpu_info.device_ids
}
AMD_GPU_NAME_TO_DEVICE_IDS = {gpu.name: gpu.device_ids for gpu in KNOWN_AMD_GPUS}


class Operator(str, Enum):
EXISTS = "Exists"
IN = "In"


class TaintEffect(str, Enum):
NO_EXECUTE = "NoExecute"
NO_SCHEDULE = "NoSchedule"
PREFER_NO_SCHEDULE = "PreferNoSchedule"


class KubernetesCompute(
ComputeWithFilteredOffersCached,
ComputeWithPrivilegedSupport,
Expand All @@ -117,16 +99,7 @@ def __init__(self, config: KubernetesConfig):
def get_offers_by_requirements(
self, requirements: Requirements
) -> list[InstanceOfferWithAvailability]:
gpu_request = 0
if (gpu_spec := requirements.resources.gpu) is not None:
gpu_request = _get_gpu_request_from_gpu_spec(gpu_spec)
instance_offers: list[InstanceOfferWithAvailability] = []
for node in self.api.list_node().items:
if (instance_offer := _get_instance_offer_from_node(node, gpu_request)) is not None:
instance_offers.extend(
filter_offers_by_requirements([instance_offer], requirements)
)
return instance_offers
return get_instance_offers(self.api, requirements)

def run_job(
self,
Expand Down Expand Up @@ -191,7 +164,7 @@ def run_job(
if (cpu_max := resources_spec.cpu.count.max) is not None:
resources_limits["cpu"] = str(cpu_max)
if (gpu_spec := resources_spec.gpu) is not None:
if (gpu_request := _get_gpu_request_from_gpu_spec(gpu_spec)) > 0:
if (gpu_request := get_gpu_request_from_gpu_spec(gpu_spec)) > 0:
gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu(
self.api, gpu_spec
)
@@ -208,22 +181,22 @@
)
)
if (memory_min := resources_spec.memory.min) is not None:
resources_requests["memory"] = _render_memory(memory_min)
resources_requests["memory"] = format_memory(memory_min)
if (memory_max := resources_spec.memory.max) is not None:
resources_limits["memory"] = _render_memory(memory_max)
resources_limits["memory"] = format_memory(memory_max)
if (disk_spec := resources_spec.disk) is not None:
if (disk_min := disk_spec.size.min) is not None:
resources_requests["ephemeral-storage"] = _render_memory(disk_min)
resources_requests["ephemeral-storage"] = format_memory(disk_min)
if (disk_max := disk_spec.size.max) is not None:
resources_limits["ephemeral-storage"] = _render_memory(disk_max)
resources_limits["ephemeral-storage"] = format_memory(disk_max)
if (shm_size := resources_spec.shm_size) is not None:
shm_volume_name = "dev-shm"
volumes_.append(
client.V1Volume(
name=shm_volume_name,
empty_dir=client.V1EmptyDirVolumeSource(
medium="Memory",
size_limit=_render_memory(shm_size),
size_limit=format_memory(shm_size),
),
)
)
@@ -338,10 +311,17 @@ def update_provisioning_data(
provisioning_data.hostname = get_or_error(service_spec.cluster_ip)
pod_spec = get_or_error(pod.spec)
node = self.api.read_node(name=get_or_error(pod_spec.node_name))
# The original offer has a list of GPUs already sliced according to pod spec's GPU resource
# request, which is inferred from dstack's GPUSpec, see _get_gpu_request_from_gpu_spec
gpu_request = len(provisioning_data.instance_type.resources.gpus)
if (instance_offer := _get_instance_offer_from_node(node, gpu_request)) is not None:
# In the original offer, the resources have already been adjusted according to
# the run configuration resource requirements, see get_offers_by_requirements()
original_resources = provisioning_data.instance_type.resources
instance_offer = get_instance_offer_from_node(
node=node,
cpu_request=original_resources.cpus,
memory_mib_request=original_resources.memory_mib,
gpu_request=len(original_resources.gpus),
disk_mib_request=original_resources.disk.size_mib,
)
if instance_offer is not None:
provisioning_data.instance_type = instance_offer.instance
provisioning_data.region = instance_offer.region
provisioning_data.price = instance_offer.price
@@ -481,146 +461,6 @@ def terminate_gateway(
)


def _get_gpu_request_from_gpu_spec(gpu_spec: GPUSpec) -> int:
return gpu_spec.count.min or 0


def _get_instance_offer_from_node(
node: client.V1Node, gpu_request: int
) -> Optional[InstanceOfferWithAvailability]:
try:
node_name = get_or_error(get_or_error(node.metadata).name)
node_status = get_or_error(node.status)
allocatable = get_or_error(node_status.allocatable)
_cpu_arch: Optional[str] = None
if node_status.node_info is not None:
_cpu_arch = node_status.node_info.architecture
cpu_arch = normalize_arch(_cpu_arch).to_cpu_architecture()
cpus = _parse_cpu(allocatable["cpu"])
memory_mib = _parse_memory(allocatable["memory"])
disk_size_mib = _parse_memory(allocatable["ephemeral-storage"])
gpus = _get_node_gpus(node)
except (ValueError, KeyError) as e:
logger.exception("Failed to process node: %s: %s", type(e).__name__, e)
return None
return InstanceOfferWithAvailability(
backend=BackendType.KUBERNETES,
instance=InstanceType(
name=node_name,
resources=Resources(
cpus=cpus,
cpu_arch=cpu_arch,
memory_mib=memory_mib,
gpus=gpus[:gpu_request],
spot=False,
disk=Disk(size_mib=disk_size_mib),
),
),
price=0,
region=DUMMY_REGION,
availability=InstanceAvailability.AVAILABLE,
instance_runtime=InstanceRuntime.RUNNER,
)


def _parse_cpu(cpu: str) -> int:
if cpu.endswith("m"):
# "m" means millicpu (1/1000 CPU), e.g., 7900m -> 7.9 -> 7
return int(float(cpu[:-1]) / 1000)
return int(cpu)


def _parse_memory(memory: str) -> int:
if memory.isdigit():
# no suffix means that the value is in bytes
return int(memory) // 2**20
return int(parse_memory(memory, as_untis="M"))


def _render_memory(memory: Memory) -> str:
return f"{float(memory)}Gi"


def _get_node_labels(node: client.V1Node) -> dict[str, str]:
if (metadata := node.metadata) is None:
return {}
if (labels := metadata.labels) is None:
return {}
return labels


def _get_node_gpus(node: client.V1Node) -> list[Gpu]:
node_name = get_or_error(get_or_error(node.metadata).name)
allocatable = get_or_error(get_or_error(node.status).allocatable)
labels = _get_node_labels(node)
for gpu_resource, gpu_getter in (
(NVIDIA_GPU_RESOURCE, _get_nvidia_gpu_from_node_labels),
(AMD_GPU_RESOURCE, _get_amd_gpu_from_node_labels),
):
_gpu_count = allocatable.get(gpu_resource)
if not _gpu_count:
continue
gpu_count = int(_gpu_count)
if gpu_count < 1:
continue
gpu = gpu_getter(labels)
if gpu is None:
logger.warning(
"Node %s: GPU resource found, but failed to detect its model: %s=%d",
node_name,
gpu_resource,
gpu_count,
)
return []
return [gpu] * gpu_count
logger.debug("Node %s: no GPU resource found", node_name)
return []


def _get_nvidia_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
# We rely on https://github.com/NVIDIA/k8s-device-plugin/tree/main/docs/gpu-feature-discovery
# to detect gpus. Note that "nvidia.com/gpu.product" is not a short gpu name like "T4" or
# "A100" but a product name like "Tesla-T4" or "A100-SXM4-40GB".
# Thus, we convert the product name to a known gpu name.
gpu_product = labels.get(NVIDIA_GPU_PRODUCT_LABEL)
if gpu_product is None:
return None
gpu_product = gpu_product.replace("RTX-", "RTX")
for gpu_name in NVIDIA_GPU_NAMES:
if gpu_name.lower() in gpu_product.lower().split("-"):
break
else:
return None
gpu_info = NVIDIA_GPU_NAME_TO_GPU_INFO[gpu_name]
gpu_memory = gpu_info.memory * 1024
# A100 may come in two variants
if "40GB" in gpu_product:
gpu_memory = 40 * 1024
return Gpu(vendor=AcceleratorVendor.NVIDIA, name=gpu_name, memory_mib=gpu_memory)


def _get_amd_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
# (AMDGPUInfo.name, AMDGPUInfo.memory) pairs
gpus: set[tuple[str, int]] = set()
for label in labels:
if not label.startswith(AMD_GPU_DEVICE_ID_LABEL_PREFIX):
continue
_, _, _device_id = label.rpartition(".")
device_id = int(_device_id, 16)
gpu_info = AMD_GPU_DEVICE_ID_TO_GPU_INFO.get(device_id)
if gpu_info is None:
logger.warning("Unknown AMD GPU device id: %X", device_id)
continue
gpus.add((gpu_info.name, gpu_info.memory))
if not gpus:
return None
if len(gpus) == 1:
gpu_name, gpu_memory_gib = next(iter(gpus))
return Gpu(vendor=AcceleratorVendor.AMD, name=gpu_name, memory_mib=gpu_memory_gib * 1024)
logger.warning("Multiple AMD GPU models detected: %s, ignoring all GPUs", gpus)
return None


def _get_pod_spec_parameters_for_gpu(
api: client.CoreV1Api, gpu_spec: GPUSpec
) -> tuple[str, client.V1NodeAffinity, str]:
@@ -643,8 +483,8 @@ def _get_nvidia_gpu_node_affinity(
) -> client.V1NodeAffinity:
matching_gpu_label_values: set[str] = set()
for node in nodes:
labels = _get_node_labels(node)
gpu = _get_nvidia_gpu_from_node_labels(labels)
labels = get_node_labels(node)
gpu = get_nvidia_gpu_from_node_labels(labels)
if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec):
matching_gpu_label_values.add(labels[NVIDIA_GPU_PRODUCT_LABEL])
if not matching_gpu_label_values:
@@ -676,8 +516,8 @@ def _get_amd_gpu_node_affinity(
) -> client.V1NodeAffinity:
matching_device_ids: set[int] = set()
for node in nodes:
labels = _get_node_labels(node)
gpu = _get_amd_gpu_from_node_labels(labels)
labels = get_node_labels(node)
gpu = get_amd_gpu_from_node_labels(labels)
if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec):
matching_device_ids.update(AMD_GPU_NAME_TO_DEVICE_IDS[gpu.name])
return client.V1NodeAffinity(
@@ -828,10 +668,10 @@ def _create_jump_pod_service(
taints = node_spec.taints or []
for taint in taints:
# A "soft" taint, ignore.
if taint.effect == TaintEffect.PREFER_NO_SCHEDULE:
if not is_hard_taint(taint):
continue
has_hard_taint = True
if taint.key in TOLERATED_NODE_TAINTS:
if is_taint_tolerated(taint):
tolerated_taints.add((taint.key, taint.effect))
if not has_hard_taint:
toleration_required = False
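For orientation, below is a minimal usage sketch (not part of the PR) of how the helpers relocated to the new `resources` module are consumed from `compute.py`, reconstructed only from the imports and call sites visible in the hunks above. The helper signatures and the surrounding function names in the sketch are inferred for illustration, not taken from the module itself.

```python
# Hypothetical sketch reconstructed from the call sites visible in this diff;
# the helper signatures are inferred, not copied from the resources module.
from dstack._internal.core.backends.kubernetes.resources import (
    get_instance_offer_from_node,
    get_instance_offers,
)


def offers_by_requirements(api, requirements):
    # Offer discovery is now delegated entirely to the resources module,
    # replacing the per-node loop that previously lived in compute.py.
    return get_instance_offers(api, requirements)


def refresh_offer(api, pod_spec, provisioning_data):
    # When provisioning data is updated, the node's offer is re-derived using
    # the resource amounts already recorded on the original offer.
    node = api.read_node(name=pod_spec.node_name)
    original = provisioning_data.instance_type.resources
    return get_instance_offer_from_node(
        node=node,
        cpu_request=original.cpus,
        memory_mib_request=original.memory_mib,
        gpu_request=len(original.gpus),
        disk_mib_request=original.disk.size_mib,
    )
```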