diff --git a/Conceptual_Guide/Part_9-gpu_semantic_caching/README.md b/Conceptual_Guide/Part_9-gpu_semantic_caching/README.md new file mode 100644 index 00000000..6b2c7725 --- /dev/null +++ b/Conceptual_Guide/Part_9-gpu_semantic_caching/README.md @@ -0,0 +1,337 @@ + + +# GPU-Accelerated Semantic Caching with cuVS CAGRA + +In [Part 8](../Part_8-semantic_caching/README.md), we introduced semantic +caching as a strategy to reduce LLM inference costs by reusing responses +for semantically similar queries. That implementation used CPU-based Faiss +for the vector similarity search. + +This tutorial extends that approach by moving the vector search to GPU +using [cuVS CAGRA](https://docs.rapids.ai/api/cuvs/stable/), NVIDIA's +GPU-accelerated approximate nearest neighbor library. We show that GPU +search provides dramatic speedups at scale, and demonstrate a tiered +response store that balances latency and capacity. + +## Why GPU Search? + +The Part 8 tutorial uses `faiss.IndexFlatL2` for exact nearest neighbor +search on CPU. This works well for small caches, but search latency +grows linearly with cache size: + +| Cache Size | CPU Faiss (mean) | GPU CAGRA single (mean) | GPU CAGRA batch-64 (mean) | +|------------|-----------------|------------------------|--------------------------| +| 1,000 | 108 us | 1,138 us | 21 us | +| 10,000 | 2,099 us | 1,179 us | 27 us | +| 100,000 | 20,878 us | 1,196 us | 26 us | + +*Measured on NVIDIA A10G (24GB), 1024-dim embeddings, 2000 queries. +RAPIDS 26.02, cuVS CAGRA, Python 3.12.* + +Key observations: + +1. **CAGRA has ~1.1ms fixed overhead** per GPU kernel launch. For small + caches (1K entries), CPU Faiss is actually faster. + +2. **CAGRA scales flat.** Latency barely changes from 1K to 100K entries + (1,138us to 1,196us), while Faiss CPU grows linearly + (108us to 20,878us). + +3. **Batching is the key.** Amortizing GPU kernel launch across 64 + queries gives 21-27us per query regardless of cache size, achieving + **817x speedup** over CPU Faiss at 100K entries. + +4. **Throughput.** At 100K entries, CPU Faiss delivers 48 QPS. CAGRA + batch-64 delivers 39,152 QPS. + +## Architecture + +This tutorial introduces two improvements over Part 8: + +1. **GPU-resident vector search** using cuVS CAGRA instead of CPU Faiss +2. **Tiered response storage** with an in-memory LRU hot tier and + optional Redis warm tier + +``` + Triton Inference Server ++------------------------------------------------------+ +| | +| +-------------+ +--------------------------+ | +| | Embedding |--->| cagra_cache (GPU) | | +| | Model | | | | +| | (GPU) | | +--------------------+ | | +| +-------------+ | | cuVS CAGRA Index | | | +| | | (GPU HBM) | | | +| | +--------------------+ | | +| | | | +| | +--------------------+ | | +| | | Tiered Response | | | +| | | Store | | | +| | | +------+ +------+ | | | +| | | | Hot | | Warm | | | | +| | | |(LRU) | |(Redis)| | | | +| | | +------+ +------+ | | | +| | +--------------------+ | | +| +--------------------------+ | +| | +| Cache Hit? | +| +-- YES (sim >= 0.85): Return cached response | +| +-- NO: Forward to LLM, cache response | +| | ++------------------------------------------------------+ +``` + +### Tiered Response Storage + +The response store uses two tiers: + +- **Hot tier** (in-memory LRU): Sub-microsecond access. Stores the + most recently accessed responses. When full, least-recently-used + entries are demoted to the warm tier. + +- **Warm tier** (Redis, optional): 1-5ms access. Stores demoted + responses with a configurable TTL. Warm hits are promoted back to + the hot tier. + +This creates a natural temperature gradient: frequently accessed +responses stay in the fast hot tier, while less popular entries +gracefully degrade to Redis before expiring. + +## Prerequisites + +- NVIDIA GPU (Ampere or newer: A10G, A100, H100) +- CUDA 12.x +- Docker with NVIDIA Container Toolkit +- Python 3.10+ + +## Quick Start + +### Running the Benchmark + +The simplest way to validate these results is with the standalone +benchmark script, which requires no Triton setup: + +```bash +# Clone this repository +git clone https://github.com/triton-inference-server/tutorials.git +cd tutorials/Conceptual_Guide/Part_9-gpu_semantic_caching + +# Install dependencies +pip install numpy faiss-cpu + +# CPU-only benchmark (works on any machine) +python artifacts/benchmark.py --faiss-only + +# Full CPU vs GPU comparison (requires cuVS) +pip install cuvs-cu12 cupy-cuda12x +python artifacts/benchmark.py +``` + +### Running with Triton + +```bash +# Copy model files to model repository +mkdir -p model_repository/cagra_cache/1 +cp artifacts/model.py model_repository/cagra_cache/1/model.py +cp artifacts/tiered_store.py model_repository/cagra_cache/1/tiered_store.py +cp artifacts/config.pbtxt model_repository/cagra_cache/config.pbtxt + +# Start Triton with GPU support +docker run --gpus all --rm -p 8000:8000 -p 8001:8001 -p 8002:8002 \ + -v $(pwd)/model_repository:/models \ + nvcr.io/nvidia/tritonserver:24.12-py3 \ + tritonserver --model-repository=/models +``` + +## How It Works + +### GPU Search with cuVS CAGRA + +CAGRA (CUDA Approximate Graph-based Nearest Neighbor) builds a +GPU-optimized proximity graph over the cached embeddings. Unlike +Faiss `IndexFlatL2` which performs exhaustive scan, CAGRA traverses +the graph to find approximate nearest neighbors in near-constant +time: + +```python +from cuvs.neighbors import cagra +import cupy as cp + +# Build index on GPU +dataset_gpu = cp.asarray(embeddings, dtype=cp.float32) +index_params = cagra.IndexParams( + intermediate_graph_degree=64, + graph_degree=32 +) +index = cagra.build(index_params, dataset_gpu) + +# Search on GPU +search_params = cagra.SearchParams(itopk_size=32) +query_gpu = cp.asarray(query_embedding, dtype=cp.float32) +distances, indices = cagra.search(search_params, index, query_gpu, k=1) +``` + +The search runs entirely in GPU HBM. If embeddings are already +generated on GPU (as they are in a Triton ensemble), no PCIe +transfer is needed in the hot path. + +### Distance to Similarity Conversion + +CAGRA returns L2 distances. For normalized embeddings, we convert +to cosine similarity: + +```python +# For unit-normalized vectors: +# L2_distance = 2 * (1 - cosine_similarity) +# cosine_similarity = 1 - L2_distance / 2 +similarity = 1.0 - l2_distance / 2.0 +``` + +### Tiered Storage + +The `TieredResponseStore` manages cached responses across two tiers: + +```python +from tiered_store import TieredResponseStore + +store = TieredResponseStore( + hot_capacity=10000, # Max entries in memory + redis_url="redis://localhost:6379", # Optional warm tier + redis_ttl_secs=86400, # 24h TTL for warm entries + promote_on_warm_hit=True # Promote warm hits to hot +) + +# Store a response +store.put(entry_index, "cached LLM response") + +# Retrieve (checks hot first, then warm) +response = store.get(entry_index) # Returns None on miss +``` + +## Configuration + +### Vector Search Parameters + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `similarity_threshold` | `0.85` | Minimum cosine similarity for cache hit | +| `max_entries` | `100000` | Maximum cached entries | +| `embedding_dim` | `1024` | Embedding dimension | +| `graph_degree` | `32` | CAGRA graph degree (higher = more accurate) | +| `use_gpu` | `true` | Use GPU CAGRA or fall back to CPU Faiss | + +### Tiered Storage Parameters + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `hot_capacity` | `10000` | Max entries in memory LRU | +| `redis_url` | (empty) | Redis URL for warm tier (empty = disabled) | +| `redis_ttl_secs` | `86400` | TTL for warm tier entries | +| `promote_on_warm_hit` | `true` | Promote warm hits to hot tier | + +## Benchmark Results + +Full benchmark results measured on NVIDIA A10G (g5.xlarge, EKS): + +### Latency + +| Cache Size | Backend | Mean | P50 | P95 | P99 | P99.9 | QPS | +|-----------|---------|------|-----|-----|-----|-------|-----| +| 1K | faiss-cpu | 108us | 108us | 113us | 128us | 152us | 9,221 | +| 1K | cagra-gpu | 1,138us | 1,137us | 1,155us | 1,168us | 1,231us | 879 | +| 1K | cagra-batch64 | 21us | 20us | 21us | 23us | 69us | 48,094 | +| 10K | faiss-cpu | 2,099us | 2,064us | 2,326us | 2,666us | 2,948us | 476 | +| 10K | cagra-gpu | 1,179us | 1,180us | 1,194us | 1,210us | 1,252us | 848 | +| 10K | cagra-batch64 | 27us | 26us | 28us | 29us | 84us | 37,575 | +| 100K | faiss-cpu | 20,878us | 20,655us | 22,435us | 24,532us | 26,586us | 48 | +| 100K | cagra-gpu | 1,196us | 1,197us | 1,211us | 1,245us | 1,371us | 836 | +| 100K | cagra-batch64 | 26us | 25us | 26us | 27us | 75us | 39,152 | + +### Speedup (vs CPU Faiss) + +| Cache Size | CAGRA Single | CAGRA Batch-64 | +|-----------|-------------|----------------| +| 1,000 | 0.1x | **5.2x** | +| 10,000 | **1.8x** | **79x** | +| 100,000 | **17.5x** | **817x** | + +### When to Use GPU vs CPU + +- **< 5K entries**: CPU Faiss is sufficient. GPU kernel launch overhead + (~1.1ms) dominates at small cache sizes. +- **5K-50K entries**: GPU CAGRA single-query matches or beats CPU. + Batch mode provides 10-80x speedup. +- **50K+ entries**: GPU CAGRA is strictly superior. CPU Faiss latency + becomes impractical (>10ms per query). +- **Batch workloads**: Always prefer GPU with batch sizes of 32-128 + for maximum throughput. + +## Comparison with Part 8 + +| Aspect | Part 8 (CPU) | Part 9 (GPU) | +|--------|-------------|-------------| +| Vector search | Faiss IndexFlatL2 | cuVS CAGRA | +| Hardware | CPU | GPU (Ampere+) | +| Search type | Exact | Approximate (>99% recall) | +| Scaling | O(n) linear | ~O(1) constant | +| Response store | theine LRU | Tiered (LRU + Redis) | +| Embedding model | all-MiniLM-L6-v2 (384-dim) | Any (1024-dim default) | +| Deployment | Single process | Docker Compose (+ Redis) | + +## Production Considerations + +This tutorial demonstrates GPU-accelerated cache search with tiered +response storage. For production deployments, additional considerations +include: + +- **Cache warming**: Pre-populate the cache with common queries +- **Multi-tenant isolation**: Separate caches per tenant +- **Monitoring**: Track hit rates, latency percentiles, tier sizes +- **Persistence**: Redis AOF/RDB for warm tier crash recovery +- **Horizontal scaling**: Multiple Triton replicas with shared Redis + +For a production implementation with multi-tier caching, +adaptive GPU memory management, multi-tenant isolation, and +enterprise SLA guarantees, see [Synapse](https://worldflowai.com). + +## Files + +| File | Description | +|------|-------------| +| `artifacts/benchmark.py` | Standalone CPU vs GPU benchmark | +| `artifacts/model.py` | Triton Python backend for cache model | +| `artifacts/tiered_store.py` | Tiered response store (LRU + Redis) | +| `artifacts/config.pbtxt` | Triton model configuration | + +## References + +- [cuVS CAGRA Documentation](https://docs.rapids.ai/api/cuvs/stable/) +- [Part 8 - Semantic Caching (CPU)](../Part_8-semantic_caching/README.md) +- [Triton Python Backend](https://github.com/triton-inference-server/python_backend) +- [RAPIDS AI](https://rapids.ai/) diff --git a/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/benchmark.py b/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/benchmark.py new file mode 100644 index 00000000..0a6daaf4 --- /dev/null +++ b/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/benchmark.py @@ -0,0 +1,488 @@ +# Copyright 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of NVIDIA CORPORATION nor the names of its +# contributors may be used to endorse or promote products derived +# from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +GPU Semantic Cache Benchmark: Faiss CPU vs cuVS CAGRA GPU + +Compares vector search performance for semantic cache lookup at +different cache sizes. Measures latency percentiles, throughput, +and hit rates. + +Usage: + # CPU only (no GPU required) + pip install numpy faiss-cpu + python benchmark.py --faiss-only + + # Full CPU vs GPU comparison + pip install numpy faiss-cpu cuvs-cu12 cupy-cuda12x + python benchmark.py + + # Custom parameters + python benchmark.py --entries 1000 10000 100000 --queries 2000 --dim 1024 +""" + +from __future__ import annotations + +import argparse +import json +import platform +import subprocess +import sys +import time +from dataclasses import asdict, dataclass, field + +import numpy as np + + +@dataclass +class LatencyStats: + mean: float = 0.0 + p50: float = 0.0 + p95: float = 0.0 + p99: float = 0.0 + p999: float = 0.0 + min: float = 0.0 + max: float = 0.0 + std: float = 0.0 + + @classmethod + def from_array(cls, arr): + if len(arr) == 0: + return cls() + return cls( + mean=float(np.mean(arr)), + p50=float(np.percentile(arr, 50)), + p95=float(np.percentile(arr, 95)), + p99=float(np.percentile(arr, 99)), + p999=float(np.percentile(arr, 99.9)), + min=float(np.min(arr)), + max=float(np.max(arr)), + std=float(np.std(arr)), + ) + + +@dataclass +class BenchmarkRun: + backend: str + cache_size: int + num_queries: int + dimension: int + similarity_threshold: float + latency: LatencyStats = field(default_factory=LatencyStats) + throughput_qps: float = 0.0 + hit_rate: float = 0.0 + avg_similarity: float = 0.0 + build_time_ms: float = 0.0 + + +def get_gpu_info(): + """Query GPU name and memory via nvidia-smi.""" + try: + result = subprocess.run( + [ + "nvidia-smi", + "--query-gpu=name,memory.total", + "--format=csv,noheader,nounits", + ], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode == 0: + parts = result.stdout.strip().split(", ") + name = parts[0] if parts else "Unknown" + mem = int(parts[1]) if len(parts) > 1 else 0 + return name, mem + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + return "Unknown", 0 + + +def generate_dataset(num_entries, dim, seed=42): + """Generate a normalized random dataset.""" + rng = np.random.RandomState(seed) + data = rng.randn(num_entries, dim).astype(np.float32) + norms = np.linalg.norm(data, axis=1, keepdims=True) + 1e-8 + return data / norms + + +def generate_queries(dataset, num_queries, noise=0.1, seed=123): + """Generate queries by adding noise to random dataset entries.""" + rng = np.random.RandomState(seed) + indices = rng.choice(len(dataset), size=num_queries, replace=True) + queries = dataset[indices].copy() + queries += rng.randn(num_queries, dataset.shape[1]).astype(np.float32) * noise + norms = np.linalg.norm(queries, axis=1, keepdims=True) + 1e-8 + return queries / norms, indices + + +def bench_faiss_cpu(dataset, queries, threshold, warmup=50): + """Benchmark Faiss CPU IndexFlatIP (exact inner product search).""" + import faiss + + dim = dataset.shape[1] + index = faiss.IndexFlatIP(dim) + + t0 = time.perf_counter() + index.add(dataset) + build_ms = (time.perf_counter() - t0) * 1000 + + # Warmup + for i in range(min(warmup, len(queries))): + index.search(queries[i : i + 1], 1) + + # Benchmark + latencies = [] + hits = 0 + total_sim = 0.0 + for i in range(len(queries)): + t = time.perf_counter_ns() + scores, _ = index.search(queries[i : i + 1], 1) + latencies.append((time.perf_counter_ns() - t) / 1000.0) + sim = float(scores[0][0]) + total_sim += sim + if sim >= threshold: + hits += 1 + + lat_arr = np.array(latencies) + total_s = lat_arr.sum() / 1e6 + return BenchmarkRun( + backend="faiss-cpu", + cache_size=len(dataset), + num_queries=len(queries), + dimension=dim, + similarity_threshold=threshold, + latency=LatencyStats.from_array(lat_arr), + throughput_qps=len(queries) / total_s if total_s > 0 else 0, + hit_rate=hits / len(queries), + avg_similarity=total_sim / len(queries), + build_time_ms=build_ms, + ) + + +def bench_cagra_single(dataset, queries, threshold, graph_degree=32, warmup=50): + """Benchmark cuVS CAGRA GPU single-query search.""" + from cuvs.neighbors import cagra + import cupy as cp + + dim = dataset.shape[1] + dataset_gpu = cp.asarray(dataset, dtype=cp.float32) + + index_params = cagra.IndexParams( + intermediate_graph_degree=graph_degree * 2, + graph_degree=graph_degree, + ) + t0 = time.perf_counter() + index = cagra.build(index_params, dataset_gpu) + cp.cuda.Device(0).synchronize() + build_ms = (time.perf_counter() - t0) * 1000 + + search_params = cagra.SearchParams(itopk_size=32) + + # Warmup + for i in range(min(warmup, len(queries))): + query_gpu = cp.asarray(queries[i : i + 1], dtype=cp.float32) + cagra.search(search_params, index, query_gpu, k=1) + cp.cuda.Device(0).synchronize() + + # Benchmark + latencies = [] + hits = 0 + total_sim = 0.0 + for i in range(len(queries)): + query_gpu = cp.asarray(queries[i : i + 1], dtype=cp.float32) + t = time.perf_counter_ns() + distances, _ = cagra.search(search_params, index, query_gpu, k=1) + cp.cuda.Device(0).synchronize() + latencies.append((time.perf_counter_ns() - t) / 1000.0) + dist = float(cp.asnumpy(distances)[0][0]) + sim = 1.0 - dist / 2.0 + total_sim += sim + if sim >= threshold: + hits += 1 + + lat_arr = np.array(latencies) + total_s = lat_arr.sum() / 1e6 + del index, dataset_gpu + cp.get_default_memory_pool().free_all_blocks() + return BenchmarkRun( + backend="cuvs-cagra-gpu", + cache_size=len(dataset), + num_queries=len(queries), + dimension=dim, + similarity_threshold=threshold, + latency=LatencyStats.from_array(lat_arr), + throughput_qps=len(queries) / total_s if total_s > 0 else 0, + hit_rate=hits / len(queries), + avg_similarity=total_sim / len(queries), + build_time_ms=build_ms, + ) + + +def bench_cagra_batch( + dataset, queries, threshold, batch_size=64, graph_degree=32, warmup=5 +): + """Benchmark cuVS CAGRA GPU batch search.""" + from cuvs.neighbors import cagra + import cupy as cp + + dim = dataset.shape[1] + dataset_gpu = cp.asarray(dataset, dtype=cp.float32) + + index_params = cagra.IndexParams( + intermediate_graph_degree=graph_degree * 2, + graph_degree=graph_degree, + ) + t0 = time.perf_counter() + index = cagra.build(index_params, dataset_gpu) + cp.cuda.Device(0).synchronize() + build_ms = (time.perf_counter() - t0) * 1000 + + search_params = cagra.SearchParams(itopk_size=32) + + # Warmup + for _ in range(warmup): + batch_gpu = cp.asarray(queries[:batch_size], dtype=cp.float32) + cagra.search(search_params, index, batch_gpu, k=1) + cp.cuda.Device(0).synchronize() + + # Benchmark + num_queries = len(queries) + latencies = [] + hits = 0 + total_sim = 0.0 + for start in range(0, num_queries, batch_size): + end = min(start + batch_size, num_queries) + batch = queries[start:end] + batch_gpu = cp.asarray(batch, dtype=cp.float32) + + t = time.perf_counter_ns() + distances, _ = cagra.search(search_params, index, batch_gpu, k=1) + cp.cuda.Device(0).synchronize() + elapsed_us = (time.perf_counter_ns() - t) / 1000.0 + + per_query = elapsed_us / len(batch) + latencies.extend([per_query] * len(batch)) + + dists = cp.asnumpy(distances).flatten() + sims = 1.0 - dists / 2.0 + total_sim += float(np.sum(sims)) + hits += int(np.sum(sims >= threshold)) + + lat_arr = np.array(latencies) + total_s = lat_arr.sum() / 1e6 + del index, dataset_gpu + cp.get_default_memory_pool().free_all_blocks() + return BenchmarkRun( + backend=f"cuvs-cagra-gpu-batch{batch_size}", + cache_size=len(dataset), + num_queries=num_queries, + dimension=dim, + similarity_threshold=threshold, + latency=LatencyStats.from_array(lat_arr), + throughput_qps=num_queries / total_s if total_s > 0 else 0, + hit_rate=hits / num_queries, + avg_similarity=total_sim / num_queries, + build_time_ms=build_ms, + ) + + +def print_results(all_runs, sizes, dim, num_queries): + """Print formatted results summary.""" + print("\n" + "=" * 100) + print(" RESULTS SUMMARY") + print("=" * 100) + for size in sizes: + runs = [r for r in all_runs if r.cache_size == size] + if not runs: + continue + print(f"\n Cache: {size:,} entries, {dim}-dim, {num_queries} queries") + print( + f" {'Backend':<25} {'Mean':>10} {'P50':>10} {'P95':>10} " + f"{'P99':>10} {'P99.9':>10} {'QPS':>12} {'Hits':>8}" + ) + print(" " + "-" * 97) + faiss_mean = None + for r in runs: + if r.backend == "faiss-cpu": + faiss_mean = r.latency.mean + speedup = "" + if faiss_mean and r.backend != "faiss-cpu" and r.latency.mean > 0: + speedup = f" ({faiss_mean / r.latency.mean:.0f}x)" + print( + f" {r.backend:<25} {r.latency.mean:>8.1f}us " + f"{r.latency.p50:>8.1f}us {r.latency.p95:>8.1f}us " + f"{r.latency.p99:>8.1f}us {r.latency.p999:>8.1f}us " + f"{r.throughput_qps:>10,.0f} {r.hit_rate:>7.1%}{speedup}" + ) + + +def main(): + parser = argparse.ArgumentParser( + description="Benchmark Faiss CPU vs cuVS CAGRA GPU for semantic cache search" + ) + parser.add_argument( + "--entries", + nargs="+", + type=int, + default=[1000, 10000, 100000], + help="Cache sizes to benchmark", + ) + parser.add_argument( + "--queries", type=int, default=2000, help="Number of queries per benchmark" + ) + parser.add_argument( + "--dim", type=int, default=1024, help="Embedding dimension" + ) + parser.add_argument( + "--threshold", type=float, default=0.85, help="Similarity threshold" + ) + parser.add_argument( + "--noise", type=float, default=0.1, help="Query noise level" + ) + parser.add_argument( + "--faiss-only", action="store_true", help="Only run Faiss CPU benchmark" + ) + parser.add_argument( + "--output", type=str, default=None, help="Output JSON file path" + ) + args = parser.parse_args() + + gpu_name, gpu_mem = get_gpu_info() + + print("=" * 70) + print(" GPU SEMANTIC CACHE BENCHMARK") + print("=" * 70) + print(f" GPU: {gpu_name} ({gpu_mem} MB)") + print(f" Python: {platform.python_version()}") + print(f" Host: {platform.node()}") + print(f" Dim: {args.dim}, Queries: {args.queries}") + print(f" Threshold: {args.threshold}") + print("=" * 70) + + # Check for cuVS + has_cuvs = False + if not args.faiss_only: + try: + from cuvs.neighbors import cagra # noqa: F401 + import cupy # noqa: F401 + + has_cuvs = True + print(" cuVS: AVAILABLE") + except ImportError as e: + print(f" cuVS: NOT AVAILABLE ({e})") + print(" Running Faiss CPU only. Install cuvs-cu12 for GPU benchmarks.") + + try: + import faiss # noqa: F401 + + print(" Faiss: AVAILABLE") + except ImportError: + print(" ERROR: faiss-cpu not installed!") + sys.exit(1) + + all_runs = [] + + for size in args.entries: + print(f"\n{'=' * 70}") + print(f" CACHE SIZE: {size:,}") + print("=" * 70) + + dataset = generate_dataset(size, args.dim) + queries, _ = generate_queries(dataset, args.queries, args.noise) + + # Faiss CPU + print(" [1/3] Faiss CPU...") + faiss_run = bench_faiss_cpu(dataset, queries, args.threshold) + all_runs.append(faiss_run) + print( + f" Mean:{faiss_run.latency.mean:.1f}us " + f"P99:{faiss_run.latency.p99:.1f}us " + f"QPS:{faiss_run.throughput_qps:,.0f} " + f"Hits:{faiss_run.hit_rate:.1%}" + ) + + if has_cuvs: + # CAGRA single-query + print(" [2/3] CAGRA single-query...") + cagra_run = bench_cagra_single(dataset, queries, args.threshold) + all_runs.append(cagra_run) + speedup = faiss_run.latency.mean / max(cagra_run.latency.mean, 0.001) + print( + f" Mean:{cagra_run.latency.mean:.1f}us " + f"P99:{cagra_run.latency.p99:.1f}us " + f"QPS:{cagra_run.throughput_qps:,.0f} " + f"Speedup:{speedup:.1f}x" + ) + + # CAGRA batch + print(" [3/3] CAGRA batch-64...") + batch_run = bench_cagra_batch(dataset, queries, args.threshold) + all_runs.append(batch_run) + batch_speedup = faiss_run.latency.mean / max( + batch_run.latency.mean, 0.001 + ) + print( + f" Mean:{batch_run.latency.mean:.1f}us " + f"P99:{batch_run.latency.p99:.1f}us " + f"QPS:{batch_run.throughput_qps:,.0f} " + f"Speedup:{batch_speedup:.1f}x" + ) + + # Print summary + print_results(all_runs, args.entries, args.dim, args.queries) + + # Build results dict + results = { + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + "hostname": platform.node(), + "gpu_name": gpu_name, + "gpu_memory_mb": gpu_mem, + "python_version": platform.python_version(), + "config": { + "dimension": args.dim, + "num_queries": args.queries, + "threshold": args.threshold, + "noise": args.noise, + "cache_sizes": args.entries, + }, + "runs": [asdict(r) for r in all_runs], + } + + # Save results + output_path = args.output or "/tmp/benchmark_results.json" + with open(output_path, "w") as f: + json.dump(results, f, indent=2) + print(f"\n Results saved to: {output_path}") + + # Print JSON for machine parsing + print("\n--- JSON_RESULTS_START ---") + print(json.dumps(results, indent=2)) + print("--- JSON_RESULTS_END ---") + + +if __name__ == "__main__": + main() diff --git a/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/config.pbtxt b/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/config.pbtxt new file mode 100644 index 00000000..bc82eb66 --- /dev/null +++ b/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/config.pbtxt @@ -0,0 +1,121 @@ +name: "cagra_cache" +backend: "python" +max_batch_size: 64 + +input [ + { + name: "query_embedding" + data_type: TYPE_FP32 + dims: [ -1 ] + }, + { + name: "operation" + data_type: TYPE_STRING + dims: [ 1 ] + }, + { + name: "response_text" + data_type: TYPE_STRING + dims: [ 1 ] + optional: true + } +] + +output [ + { + name: "cache_hit" + data_type: TYPE_BOOL + dims: [ 1 ] + }, + { + name: "similarity" + data_type: TYPE_FP32 + dims: [ 1 ] + }, + { + name: "cached_response" + data_type: TYPE_STRING + dims: [ 1 ] + }, + { + name: "latency_us" + data_type: TYPE_FP32 + dims: [ 1 ] + } +] + +# ------------------------------------------------------- +# Vector search parameters +# ------------------------------------------------------- +parameters: { + key: "similarity_threshold" + value: { string_value: "0.85" } +} + +parameters: { + key: "max_entries" + value: { string_value: "100000" } +} + +parameters: { + key: "embedding_dim" + value: { string_value: "1024" } +} + +parameters: { + key: "graph_degree" + value: { string_value: "32" } +} + +parameters: { + key: "use_gpu" + value: { string_value: "true" } +} + +# ------------------------------------------------------- +# Tiered response storage parameters +# ------------------------------------------------------- +# Hot tier: in-memory LRU cache for frequently accessed responses. +# Entries are evicted to the warm tier (Redis) when capacity is reached. +parameters: { + key: "hot_capacity" + value: { string_value: "10000" } +} + +# Warm tier: Redis for persistent response storage. +# Set to empty string to disable (responses evicted from hot are lost). +parameters: { + key: "redis_url" + value: { string_value: "redis://redis:6379" } +} + +# Key prefix for Redis entries (namespace isolation for multi-model). +parameters: { + key: "redis_prefix" + value: { string_value: "triton_cache:" } +} + +# TTL for warm tier entries in Redis (seconds). Default: 24 hours. +parameters: { + key: "redis_ttl_secs" + value: { string_value: "86400" } +} + +# Promote warm tier hits back to hot tier for faster subsequent access. +parameters: { + key: "promote_on_warm_hit" + value: { string_value: "true" } +} + +instance_group [ + { + count: 1 + kind: KIND_GPU + gpus: [ 0 ] + } +] + +dynamic_batching { + preferred_batch_size: [ 1, 4, 16 ] + max_queue_delay_microseconds: 500 +} diff --git a/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/model.py b/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/model.py new file mode 100644 index 00000000..952b8e92 --- /dev/null +++ b/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/model.py @@ -0,0 +1,459 @@ +""" +Triton Python backend for GPU-accelerated semantic caching using cuVS CAGRA. + +This backend maintains a CAGRA index in GPU memory for sub-2ms cache lookups, +with tiered response storage (Hot in-memory LRU -> Warm Redis -> Evicted). + +Architecture: + Query embedding (GPU) -> CAGRA search (GPU) -> threshold check -> response + Response storage: Hot (memory LRU) -> Warm (Redis) -> Evicted + +All vector operations stay in GPU HBM - no CPU-GPU transfers in the hot path. +Response retrieval uses the tiered store for optimal latency at any cache size. +""" + +import json +import time +from threading import Lock + +import numpy as np + +# Triton Python backend utilities +import triton_python_backend_utils as pb_utils + +# Tiered response store +from tiered_store import TieredResponseStore + +# cuVS CAGRA imports +try: + from cuvs.neighbors import cagra + import cupy as cp + + HAS_CUVS = True +except ImportError: + HAS_CUVS = False + # Fallback to faiss for comparison + import faiss + + +class TritonPythonModel: + """GPU-accelerated semantic cache with tiered response storage. + + This model maintains a GPU-resident CAGRA index for fast approximate + nearest neighbor search. Cached responses are stored in a tiered + hierarchy: hot (in-memory LRU) -> warm (Redis) -> evicted. + + Configuration (via config.pbtxt parameters): + Vector search: + - similarity_threshold: Minimum cosine similarity for cache hit (default: 0.85) + - max_entries: Maximum cache size (default: 100000) + - embedding_dim: Embedding dimension (default: 1024) + - graph_degree: CAGRA graph degree (default: 32) + - use_gpu: Use GPU CAGRA or CPU Faiss fallback (default: true) + + Tiered storage: + - hot_capacity: Max entries in hot in-memory tier (default: 10000) + - redis_url: Redis URL for warm tier (default: empty = disabled) + - redis_prefix: Key prefix for Redis entries (default: triton_cache:) + - redis_ttl_secs: TTL for warm tier entries (default: 86400) + - promote_on_warm_hit: Promote warm hits to hot (default: true) + """ + + def initialize(self, args): + """Initialize the CAGRA index and tiered response store.""" + self.model_config = json.loads(args["model_config"]) + + # Parse parameters + params = {} + for param in self.model_config.get("parameters", []): + params[param["key"]] = param["string_value"] + + # Vector search config + self.similarity_threshold = float( + params.get("similarity_threshold", "0.85") + ) + self.max_entries = int(params.get("max_entries", "100000")) + self.embedding_dim = int(params.get("embedding_dim", "1024")) + self.graph_degree = int(params.get("graph_degree", "32")) + self.use_gpu = params.get("use_gpu", "true").lower() == "true" + + # Tiered storage config + hot_capacity = int(params.get("hot_capacity", "10000")) + redis_url = params.get("redis_url", "") or None + redis_prefix = params.get("redis_prefix", "triton_cache:") + redis_ttl_secs = int(params.get("redis_ttl_secs", "86400")) + promote_on_warm_hit = ( + params.get("promote_on_warm_hit", "true").lower() == "true" + ) + + # Initialize tiered response store + self.store = TieredResponseStore( + hot_capacity=hot_capacity, + redis_url=redis_url, + redis_prefix=redis_prefix, + redis_ttl_secs=redis_ttl_secs, + promote_on_warm_hit=promote_on_warm_hit, + ) + + self.entry_count = 0 + self.lock = Lock() + + # Metrics + self.total_searches = 0 + self.cache_hits = 0 + self.total_search_latency_ns = 0 + + if self.use_gpu and HAS_CUVS: + self._init_cagra() + else: + self._init_faiss() + + # Log tiered storage configuration + warm_status = ( + f"Redis ({redis_url})" if redis_url else "disabled" + ) + pb_utils.Logger.log( + f"[cagra_cache] Tiered storage: " + f"hot_capacity={hot_capacity}, " + f"warm={warm_status}, " + f"ttl={redis_ttl_secs}s, " + f"promote_on_hit={promote_on_warm_hit}", + pb_utils.Logger.INFO, + ) + + def _init_cagra(self): + """Initialize GPU-resident CAGRA index.""" + self.backend = "cagra" + + # Pre-allocate dataset buffer on GPU + import cupy as cp + + self.dataset = cp.zeros( + (self.max_entries, self.embedding_dim), + dtype=cp.float32, + ) + + # CAGRA build parameters + self.build_params = cagra.IndexParams( + intermediate_graph_degree=self.graph_degree * 2, + graph_degree=self.graph_degree, + ) + + # Search parameters + self.search_params = cagra.SearchParams( + max_queries=256, + itopk_size=32, + ) + + self.index = None # Built after first N entries + self.index_dirty = True # Rebuild needed + self.min_build_size = max(self.graph_degree * 4, 64) + + pb_utils.Logger.log( + f"[cagra_cache] Initialized CAGRA backend: " + f"dim={self.embedding_dim}, max_entries={self.max_entries}, " + f"graph_degree={self.graph_degree}", + pb_utils.Logger.INFO, + ) + + def _init_faiss(self): + """Initialize CPU Faiss index (baseline comparison).""" + self.backend = "faiss" + # Inner product for cosine similarity on L2-normalized vectors + self.faiss_index = faiss.IndexFlatIP(self.embedding_dim) + + pb_utils.Logger.log( + f"[cagra_cache] Initialized Faiss CPU backend: " + f"dim={self.embedding_dim}", + pb_utils.Logger.INFO, + ) + + def _rebuild_cagra_index(self): + """Rebuild CAGRA index from current dataset.""" + if self.entry_count < self.min_build_size: + return + + active_data = self.dataset[: self.entry_count] + + # Build index on GPU + self.index = cagra.build( + self.build_params, + active_data, + ) + cp.cuda.Device(0).synchronize() + self.index_dirty = False + + def execute(self, requests): + """Process inference requests. + + Input tensors: + - query_embedding: float32 [batch_size, embedding_dim] + - operation: string [1] - "search", "insert", or "metrics" + - response_text: string [1] - cached response (for insert only) + + Output tensors: + - cache_hit: bool [batch_size] + - similarity: float32 [batch_size] + - cached_response: string [batch_size] + - latency_us: float32 [1] + """ + responses = [] + + for request in requests: + start_time = time.perf_counter_ns() + + # Get operation type + operation = ( + pb_utils.get_input_tensor_by_name(request, "operation") + .as_numpy()[0] + .decode("utf-8") + ) + + if operation == "metrics": + # Return tier metrics as JSON + metrics = self.store.get_metrics() + metrics["backend"] = self.backend + metrics["entry_count"] = self.entry_count + metrics["total_searches"] = self.total_searches + metrics["cache_hits"] = self.cache_hits + + elapsed_us = (time.perf_counter_ns() - start_time) / 1000.0 + + out_hit = pb_utils.Tensor( + "cache_hit", np.array([False]) + ) + out_sim = pb_utils.Tensor( + "similarity", + np.array([0.0], dtype=np.float32), + ) + out_resp = pb_utils.Tensor( + "cached_response", + np.array([json.dumps(metrics)], dtype=object), + ) + out_latency = pb_utils.Tensor( + "latency_us", + np.array([elapsed_us], dtype=np.float32), + ) + + responses.append( + pb_utils.InferenceResponse( + output_tensors=[ + out_hit, + out_sim, + out_resp, + out_latency, + ] + ) + ) + continue + + # Get embedding for search/insert + query_embedding = pb_utils.get_input_tensor_by_name( + request, "query_embedding" + ).as_numpy() + + if operation == "search": + cache_hit, similarity, cached_response = self._search( + query_embedding + ) + elif operation == "insert": + response_text = ( + pb_utils.get_input_tensor_by_name( + request, "response_text" + ) + .as_numpy()[0] + .decode("utf-8") + ) + cache_hit, similarity, cached_response = self._insert( + query_embedding, response_text + ) + else: + cache_hit = np.array([False]) + similarity = np.array([0.0], dtype=np.float32) + cached_response = np.array( + [f"Unknown operation: {operation}"], dtype=object + ) + + elapsed_us = (time.perf_counter_ns() - start_time) / 1000.0 + + # Create output tensors + out_hit = pb_utils.Tensor("cache_hit", cache_hit) + out_sim = pb_utils.Tensor("similarity", similarity) + out_resp = pb_utils.Tensor( + "cached_response", + np.array(cached_response, dtype=object), + ) + out_latency = pb_utils.Tensor( + "latency_us", + np.array([elapsed_us], dtype=np.float32), + ) + + responses.append( + pb_utils.InferenceResponse( + output_tensors=[out_hit, out_sim, out_resp, out_latency] + ) + ) + + return responses + + def _search(self, query_embedding): + """Search for similar cached entries using tiered store.""" + batch_size = query_embedding.shape[0] + + with self.lock: + self.total_searches += 1 + + if self.entry_count == 0: + return ( + np.array([False] * batch_size), + np.array([0.0] * batch_size, dtype=np.float32), + [""] * batch_size, + ) + + if self.backend == "cagra": + return self._search_cagra(query_embedding, batch_size) + return self._search_faiss(query_embedding, batch_size) + + def _search_cagra(self, query_embedding, batch_size): + """GPU CAGRA search - the fast path.""" + import cupy as cp + + if self.index is None or self.index_dirty: + self._rebuild_cagra_index() + + if self.index is None: + # Not enough entries to build index yet + return ( + np.array([False] * batch_size), + np.array([0.0] * batch_size, dtype=np.float32), + [""] * batch_size, + ) + + # Transfer query to GPU + query_gpu = cp.asarray(query_embedding, dtype=cp.float32) + + # CAGRA search on GPU + distances, indices = cagra.search( + self.search_params, + self.index, + query_gpu, + k=1, + ) + cp.cuda.Device(0).synchronize() + + # Transfer results back to CPU + distances_cpu = cp.asnumpy(distances).flatten() + indices_cpu = cp.asnumpy(indices).flatten() + + # Convert L2 distances to cosine similarity + # For L2-normalized vectors: similarity = 1 - distance/2 + similarities = 1.0 - distances_cpu / 2.0 + + hits = similarities >= self.similarity_threshold + cached_responses = [] + + for i in range(batch_size): + if hits[i]: + idx = int(indices_cpu[i]) + # Retrieve from tiered store + resp = self.store.get(idx) + if resp is not None: + cached_responses.append(resp) + self.cache_hits += 1 + else: + # Vector found but response evicted + hits[i] = False + cached_responses.append("") + else: + cached_responses.append("") + + return (hits, similarities.astype(np.float32), cached_responses) + + def _search_faiss(self, query_embedding, batch_size): + """CPU Faiss search - the baseline.""" + # Faiss inner product search + similarities, indices = self.faiss_index.search( + query_embedding.astype(np.float32), 1 + ) + similarities = similarities.flatten() + indices = indices.flatten() + + hits = similarities >= self.similarity_threshold + cached_responses = [] + + for i in range(batch_size): + if hits[i]: + idx = int(indices[i]) + # Retrieve from tiered store + resp = self.store.get(idx) + if resp is not None: + cached_responses.append(resp) + self.cache_hits += 1 + else: + # Vector found but response evicted + hits[i] = False + cached_responses.append("") + else: + cached_responses.append("") + + return (hits, similarities.astype(np.float32), cached_responses) + + def _insert(self, embedding, response_text): + """Insert a new entry into the cache with tiered storage.""" + with self.lock: + if self.entry_count >= self.max_entries: + return ( + np.array([False]), + np.array([0.0], dtype=np.float32), + ["Cache full"], + ) + + idx = self.entry_count + + if self.backend == "cagra": + import cupy as cp + + self.dataset[idx] = cp.asarray( + embedding[0], dtype=cp.float32 + ) + else: + # L2 normalize for inner product search + norm = np.linalg.norm(embedding, axis=1, keepdims=True) + normalized = embedding / (norm + 1e-8) + self.faiss_index.add(normalized.astype(np.float32)) + + # Store response in tiered store (not a plain dict) + self.store.put(idx, response_text) + self.entry_count += 1 + self.index_dirty = True + + return ( + np.array([True]), + np.array([1.0], dtype=np.float32), + ["Inserted"], + ) + + def finalize(self): + """Cleanup GPU resources and tiered store.""" + if hasattr(self, "index"): + del self.index + if hasattr(self, "dataset"): + del self.dataset + + # Log final metrics + metrics = self.store.get_metrics() + hit_rate = (self.cache_hits / max(self.total_searches, 1)) * 100 + + pb_utils.Logger.log( + f"[cagra_cache] Shutting down. " + f"Searches: {self.total_searches}, " + f"Hits: {self.cache_hits} ({hit_rate:.1f}%), " + f"Entries: {self.entry_count}, " + f"Hot: {metrics.get('hot_entries', 0)}, " + f"Warm promotions: {metrics.get('warm_promotions', 0)}, " + f"Warm demotions: {metrics.get('warm_demotions', 0)}", + pb_utils.Logger.INFO, + ) + + # Cleanup tiered store + self.store.finalize() diff --git a/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/tiered_store.py b/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/tiered_store.py new file mode 100644 index 00000000..40cbe5ec --- /dev/null +++ b/Conceptual_Guide/Part_9-gpu_semantic_caching/artifacts/tiered_store.py @@ -0,0 +1,452 @@ +""" +Tiered Response Store for GPU-accelerated semantic caching. + +Implements a three-tier response storage hierarchy: + + Hot (In-Memory LRU) -> Warm (Redis) -> Evicted + +Hot tier: + - OrderedDict with LRU eviction + - Sub-microsecond access for frequently-used responses + - Configurable max capacity (default: 10,000 entries) + +Warm tier: + - Redis with configurable TTL + - 1-5ms access latency + - Horizontal scaling via Redis Cluster + - Entries promoted to Hot on access + +Eviction flow: + - New inserts go to Hot tier + - When Hot tier is full, LRU entry demotes to Warm (Redis) + - When Warm tier TTL expires, entry is evicted entirely + - On cache hit from Warm tier, entry is promoted back to Hot + +Access tracking: + - Each entry tracks access count and last access time + - Eviction score = 1 / (access_count * recency_weight) + - Lower score = more likely to be evicted + +Architecture: + +-----------+ demote +-----------+ expire +-----------+ + | Hot | ------------> | Warm | ------------> | Evicted | + | (Memory) | <------------ | (Redis) | | (Gone) | + +-----------+ promote +-----------+ +-----------+ +""" + +from __future__ import annotations + +import json +import logging +import time +from collections import OrderedDict +from dataclasses import dataclass, field +from threading import Lock +from typing import Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class StoreEntry: + """A cached response with access metadata.""" + + response: str + access_count: int = 0 + last_access_time: float = 0.0 + created_time: float = 0.0 + size_bytes: int = 0 + + def touch(self) -> None: + """Record an access to this entry.""" + self.access_count += 1 + self.last_access_time = time.monotonic() + + def to_dict(self) -> dict: + """Serialize for Redis storage.""" + return { + "response": self.response, + "access_count": self.access_count, + "last_access_time": self.last_access_time, + "created_time": self.created_time, + "size_bytes": self.size_bytes, + } + + @classmethod + def from_dict(cls, data: dict) -> StoreEntry: + """Deserialize from Redis storage.""" + return cls( + response=data["response"], + access_count=data.get("access_count", 0), + last_access_time=data.get("last_access_time", 0.0), + created_time=data.get("created_time", 0.0), + size_bytes=data.get("size_bytes", 0), + ) + + +@dataclass +class TierMetrics: + """Per-tier performance metrics.""" + + gets: int = 0 + hits: int = 0 + puts: int = 0 + evictions: int = 0 + promotions: int = 0 + demotions: int = 0 + total_bytes: int = 0 + + @property + def hit_rate(self) -> float: + return self.hits / max(self.gets, 1) + + +@dataclass +class StoreMetrics: + """Aggregate metrics across all tiers.""" + + hot: TierMetrics = field(default_factory=TierMetrics) + warm: TierMetrics = field(default_factory=TierMetrics) + total_gets: int = 0 + total_hits: int = 0 + + @property + def overall_hit_rate(self) -> float: + return self.total_hits / max(self.total_gets, 1) + + def summary(self) -> dict: + """Return a serializable summary.""" + return { + "hot_entries": self.hot.puts - self.hot.evictions, + "hot_hit_rate": f"{self.hot.hit_rate:.2%}", + "hot_gets": self.hot.gets, + "hot_hits": self.hot.hits, + "hot_evictions": self.hot.evictions, + "warm_entries": self.warm.puts - self.warm.evictions, + "warm_hit_rate": f"{self.warm.hit_rate:.2%}", + "warm_gets": self.warm.gets, + "warm_hits": self.warm.hits, + "warm_promotions": self.warm.promotions, + "warm_demotions": self.warm.demotions, + "overall_hit_rate": f"{self.overall_hit_rate:.2%}", + "total_gets": self.total_gets, + "total_hits": self.total_hits, + } + + +class TieredResponseStore: + """Three-tier response store: Hot (memory) -> Warm (Redis) -> Evicted. + + Thread-safe via a single lock. The lock scope is kept minimal: + hot-tier dict operations are O(1) and Redis calls happen outside + the lock when possible. + + Parameters: + hot_capacity: Maximum entries in the hot (in-memory) tier. + redis_url: Redis connection URL (e.g., "redis://localhost:6379"). + If None, warm tier is disabled and evicted entries are lost. + redis_prefix: Key prefix for Redis entries (for namespace isolation). + redis_ttl_secs: TTL for warm-tier entries in Redis. + promote_on_warm_hit: Whether to promote entries from warm to hot + on access (default: True). + """ + + def __init__( + self, + hot_capacity: int = 10_000, + redis_url: Optional[str] = None, + redis_prefix: str = "triton_cache:", + redis_ttl_secs: int = 86400, + promote_on_warm_hit: bool = True, + ): + self._hot_capacity = hot_capacity + self._redis_prefix = redis_prefix + self._redis_ttl_secs = redis_ttl_secs + self._promote_on_warm_hit = promote_on_warm_hit + self._lock = Lock() + + # Hot tier: OrderedDict for O(1) LRU + # move_to_end() on access, popitem(last=False) for eviction + self._hot: OrderedDict[int, StoreEntry] = OrderedDict() + + # Warm tier: Redis connection (lazy) + self._redis = None + self._redis_url = redis_url + if redis_url: + self._connect_redis(redis_url) + + # Metrics + self.metrics = StoreMetrics() + + def _connect_redis(self, url: str) -> None: + """Establish Redis connection.""" + try: + import redis + + self._redis = redis.Redis.from_url( + url, + decode_responses=True, + socket_connect_timeout=5, + socket_timeout=2, + retry_on_timeout=True, + ) + # Test connection + self._redis.ping() + logger.info("Warm tier connected: %s", url) + except ImportError: + logger.warning( + "redis package not installed. Warm tier disabled. " + "Install with: pip install redis" + ) + self._redis = None + except Exception as e: + logger.warning( + "Failed to connect to Redis at %s: %s. " + "Warm tier disabled.", + url, + e, + ) + self._redis = None + + def get(self, idx: int) -> Optional[str]: + """Retrieve a cached response by index. + + Checks hot tier first, then warm tier. Promotes warm hits + to hot tier if configured. + + Returns: + The cached response string, or None if not found. + """ + self.metrics.total_gets += 1 + + # Check hot tier + with self._lock: + self.metrics.hot.gets += 1 + entry = self._hot.get(idx) + if entry is not None: + entry.touch() + self._hot.move_to_end(idx) + self.metrics.hot.hits += 1 + self.metrics.total_hits += 1 + return entry.response + + # Check warm tier (Redis) - outside lock for I/O + if self._redis is not None: + self.metrics.warm.gets += 1 + warm_entry = self._get_from_redis(idx) + if warm_entry is not None: + self.metrics.warm.hits += 1 + self.metrics.total_hits += 1 + + # Promote to hot tier + if self._promote_on_warm_hit: + warm_entry.touch() + self._promote(idx, warm_entry) + + return warm_entry.response + + return None + + def put(self, idx: int, response: str) -> None: + """Store a response in the hot tier. + + If the hot tier is full, the LRU entry is demoted to + the warm tier (Redis) before insertion. + + Args: + idx: Cache index (maps to CAGRA index position). + response: The cached response string. + """ + now = time.monotonic() + entry = StoreEntry( + response=response, + access_count=1, + last_access_time=now, + created_time=now, + size_bytes=len(response.encode("utf-8")), + ) + + demoted_idx = None + demoted_entry = None + + with self._lock: + # If already in hot, just update + if idx in self._hot: + self._hot[idx] = entry + self._hot.move_to_end(idx) + return + + # Evict LRU if at capacity + if len(self._hot) >= self._hot_capacity: + demoted_idx, demoted_entry = self._hot.popitem(last=False) + self.metrics.hot.evictions += 1 + + self._hot[idx] = entry + self._hot.move_to_end(idx) + self.metrics.hot.puts += 1 + + # Demote evicted entry to warm tier (outside lock) + if demoted_entry is not None and self._redis is not None: + self._demote(demoted_idx, demoted_entry) + + def delete(self, idx: int) -> bool: + """Remove an entry from all tiers. + + Returns: + True if the entry was found and removed. + """ + found = False + + with self._lock: + if idx in self._hot: + del self._hot[idx] + found = True + + if self._redis is not None: + key = f"{self._redis_prefix}{idx}" + try: + if self._redis.delete(key): + found = True + except Exception as e: + logger.warning("Redis delete failed for %s: %s", key, e) + + return found + + def clear(self) -> None: + """Remove all entries from all tiers.""" + with self._lock: + self._hot.clear() + + if self._redis is not None: + self._clear_redis() + + @property + def hot_size(self) -> int: + """Number of entries in the hot tier.""" + return len(self._hot) + + @property + def hot_capacity(self) -> int: + """Maximum hot tier capacity.""" + return self._hot_capacity + + @property + def has_warm_tier(self) -> bool: + """Whether the warm (Redis) tier is available.""" + return self._redis is not None + + def warm_size(self) -> int: + """Approximate number of entries in the warm tier. + + Uses Redis DBSIZE which includes all keys, so this is only + accurate when the Redis instance is dedicated to the cache. + """ + if self._redis is None: + return 0 + try: + # Count keys matching our prefix + count = 0 + cursor = 0 + while True: + cursor, keys = self._redis.scan( + cursor=cursor, + match=f"{self._redis_prefix}*", + count=1000, + ) + count += len(keys) + if cursor == 0: + break + return count + except Exception: + return 0 + + def get_metrics(self) -> dict: + """Return current metrics as a serializable dict.""" + return self.metrics.summary() + + # --------------------------------------------------------------- + # Internal: Redis operations + # --------------------------------------------------------------- + + def _get_from_redis(self, idx: int) -> Optional[StoreEntry]: + """Fetch an entry from Redis warm tier.""" + key = f"{self._redis_prefix}{idx}" + try: + data = self._redis.get(key) + if data is None: + return None + return StoreEntry.from_dict(json.loads(data)) + except Exception as e: + logger.warning("Redis get failed for %s: %s", key, e) + return None + + def _demote(self, idx: int, entry: StoreEntry) -> None: + """Demote an entry from hot to warm (Redis).""" + key = f"{self._redis_prefix}{idx}" + try: + self._redis.setex( + key, + self._redis_ttl_secs, + json.dumps(entry.to_dict()), + ) + self.metrics.warm.puts += 1 + self.metrics.warm.demotions += 1 + logger.debug( + "Demoted entry %d to warm tier (accesses=%d)", + idx, + entry.access_count, + ) + except Exception as e: + logger.warning("Redis demote failed for %s: %s", key, e) + + def _promote(self, idx: int, entry: StoreEntry) -> None: + """Promote an entry from warm (Redis) to hot.""" + demoted_idx = None + demoted_entry = None + + with self._lock: + # Evict LRU from hot if at capacity + if len(self._hot) >= self._hot_capacity: + demoted_idx, demoted_entry = self._hot.popitem(last=False) + self.metrics.hot.evictions += 1 + + self._hot[idx] = entry + self._hot.move_to_end(idx) + self.metrics.warm.promotions += 1 + + # Remove promoted entry from Redis + key = f"{self._redis_prefix}{idx}" + try: + self._redis.delete(key) + except Exception as e: + logger.warning("Redis delete (promote) failed for %s: %s", key, e) + + # Demote the evicted hot entry to warm + if demoted_entry is not None: + self._demote(demoted_idx, demoted_entry) + + def _clear_redis(self) -> None: + """Remove all cache entries from Redis.""" + try: + cursor = 0 + while True: + cursor, keys = self._redis.scan( + cursor=cursor, + match=f"{self._redis_prefix}*", + count=1000, + ) + if keys: + self._redis.delete(*keys) + if cursor == 0: + break + except Exception as e: + logger.warning("Redis clear failed: %s", e) + + def finalize(self) -> None: + """Clean up resources.""" + if self._redis is not None: + try: + self._redis.close() + except Exception: + pass + self._redis = None diff --git a/Conceptual_Guide/README.md b/Conceptual_Guide/README.md index d0a44b5c..a87dcd6a 100644 --- a/Conceptual_Guide/README.md +++ b/Conceptual_Guide/README.md @@ -41,3 +41,4 @@ Conceptual guides have been designed as an onboarding experience to Triton Infer * [Part 6: Using the BLS API to build complex pipelines](Part_6-building_complex_pipelines/): Often times there are scenarios where the pipeline requires control flows. Learn how to work with complex pipelines with models deployed on different backends. * [Part 7: Iterative Scheduling Tutorial](./Part_7-iterative_scheduling): Shows how to use the Triton Iterative Scheduler with a GPT2 model using HuggingFace Transformers. * [Part 8: Semantic Caching](./Part_8-semantic_caching/): Shows benefits of adding semantic caching to you LLM-based workflow. +* [Part 9: GPU-Accelerated Semantic Caching](./Part_9-gpu_semantic_caching/): Extends Part 8 with GPU-accelerated vector search using cuVS CAGRA and tiered response storage, achieving up to 817x speedup over CPU Faiss at 100K cache entries.