diff --git a/crates/openquant/Cargo.toml b/crates/openquant/Cargo.toml
index fda9eb8..398b24d 100644
--- a/crates/openquant/Cargo.toml
+++ b/crates/openquant/Cargo.toml
@@ -39,3 +39,7 @@ harness = false
 [[bench]]
 name = "hpc_parallel_scaling"
 harness = false
+
+[[bench]]
+name = "streaming_hpc_scaling"
+harness = false
diff --git a/crates/openquant/benches/streaming_hpc_scaling.rs b/crates/openquant/benches/streaming_hpc_scaling.rs
new file mode 100644
index 0000000..08a82c7
--- /dev/null
+++ b/crates/openquant/benches/streaming_hpc_scaling.rs
@@ -0,0 +1,79 @@
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use openquant::hpc_parallel::{ExecutionMode, HpcParallelConfig, PartitionStrategy};
+use openquant::streaming_hpc::{
+    generate_synthetic_flash_crash_stream, run_streaming_pipeline_parallel, AlertThresholds,
+    HhiConfig, StreamEvent, StreamingPipelineConfig, SyntheticStreamConfig, VpinConfig,
+};
+use std::thread;
+
+fn pipeline_cfg() -> StreamingPipelineConfig {
+    StreamingPipelineConfig {
+        vpin: VpinConfig { bucket_volume: 1_000.0, support_buckets: 20 },
+        hhi: HhiConfig { lookback_events: 200 },
+        thresholds: AlertThresholds { vpin: 0.45, hhi: 0.30 },
+    }
+}
+
+fn make_streams(streams: usize, events: usize) -> Vec<Vec<StreamEvent>> {
+    (0..streams)
+        .map(|k| {
+            let mut s = generate_synthetic_flash_crash_stream(SyntheticStreamConfig {
+                events,
+                crash_start_fraction: 0.70,
+                calm_venues: 8,
+                shock_venue: k % 2,
+            })
+            .expect("synthetic stream");
+            for event in &mut s {
+                event.price *= 1.0 + k as f64 * 1e-6;
+            }
+            s
+        })
+        .collect()
+}
+
+fn bench_streaming_hpc_scaling(c: &mut Criterion) {
+    let streams = make_streams(48, 4_000);
+    let available = thread::available_parallelism().map_or(4, |n| n.get());
+    let mut thread_options = vec![1usize, 2, 4, 8];
+    thread_options.retain(|n| *n <= available.max(1));
+    if !thread_options.contains(&available) && available <= 16 {
+        thread_options.push(available);
+    }
+    thread_options.sort_unstable();
+    thread_options.dedup();
+
+    let mut group = c.benchmark_group("streaming_hpc/scaling");
+    for num_threads in thread_options {
+        for mp_batches in [1usize, 2, 4, 8] {
+            let mode = if num_threads == 1 {
+                ExecutionMode::Serial
+            } else {
+                ExecutionMode::Threaded { num_threads }
+            };
+            let bench_id = BenchmarkId::new(
+                format!("threads_{num_threads}"),
+                format!("mp_batches_{mp_batches}"),
+            );
+            group.bench_with_input(bench_id, &(mode, mp_batches), |b, (mode, mp_batches)| {
+                b.iter(|| {
+                    let _ = run_streaming_pipeline_parallel(
+                        &streams,
+                        pipeline_cfg(),
+                        HpcParallelConfig {
+                            mode: *mode,
+                            partition: PartitionStrategy::Linear,
+                            mp_batches: *mp_batches,
+                            progress_every: 16,
+                        },
+                    )
+                    .expect("parallel streaming run should succeed");
+                });
+            });
+        }
+    }
+    group.finish();
+}
+
+criterion_group!(streaming_hpc_scaling, bench_streaming_hpc_scaling);
+criterion_main!(streaming_hpc_scaling);
diff --git a/crates/openquant/src/lib.rs b/crates/openquant/src/lib.rs
index 54e2acc..4b03826 100644
--- a/crates/openquant/src/lib.rs
+++ b/crates/openquant/src/lib.rs
@@ -27,6 +27,7 @@ pub mod sample_weights;
 pub mod sampling;
 pub mod sb_bagging;
 pub mod strategy_risk;
+pub mod streaming_hpc;
 pub mod structural_breaks;
 pub mod synthetic_backtesting;
 pub mod util;
diff --git a/crates/openquant/src/streaming_hpc.rs b/crates/openquant/src/streaming_hpc.rs
new file mode 100644
index 0000000..7e393a7
--- /dev/null
+++ b/crates/openquant/src/streaming_hpc.rs
@@ -0,0 +1,461 @@
+//! AFML Chapter 22: streaming analytics utilities for low-latency early warning.
+//!
+//! This module emphasizes bounded-memory, incremental updates suitable for
+//! near-real-time decision workflows. It includes:
+//! - VPIN-like flow-toxicity tracking with rolling volume buckets,
+//! - HHI-style market fragmentation concentration tracking over rolling windows,
+//! - an event-by-event early-warning pipeline, and
+//! - serial/parallel execution helpers for multi-stream workloads.
+
+use crate::hpc_parallel::{run_parallel, HpcParallelConfig, HpcParallelError, ParallelRunReport};
+use std::collections::{HashMap, VecDeque};
+use std::fmt::{Display, Formatter};
+use std::time::{Duration, Instant};
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum StreamingHpcError {
+    InvalidConfig(&'static str),
+    InvalidEvent(&'static str),
+    Parallel(HpcParallelError),
+}
+
+impl Display for StreamingHpcError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::InvalidConfig(msg) => write!(f, "invalid streaming HPC config: {msg}"),
+            Self::InvalidEvent(msg) => write!(f, "invalid streaming event: {msg}"),
+            Self::Parallel(err) => write!(f, "{err}"),
+        }
+    }
+}
+
+impl std::error::Error for StreamingHpcError {}
+
+impl From<HpcParallelError> for StreamingHpcError {
+    fn from(value: HpcParallelError) -> Self {
+        Self::Parallel(value)
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct StreamEvent {
+    pub timestamp_ns: i64,
+    pub price: f64,
+    pub buy_volume: f64,
+    pub sell_volume: f64,
+    pub venue_id: usize,
+}
+
+impl StreamEvent {
+    pub fn total_volume(self) -> f64 {
+        self.buy_volume + self.sell_volume
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct VpinConfig {
+    /// Volume in each bucket.
+    pub bucket_volume: f64,
+    /// Number of completed buckets in rolling VPIN window.
+    pub support_buckets: usize,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct HhiConfig {
+    /// Number of events in rolling concentration window.
+    pub lookback_events: usize,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct AlertThresholds {
+    pub vpin: f64,
+    pub hhi: f64,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct StreamingPipelineConfig {
+    pub vpin: VpinConfig,
+    pub hhi: HhiConfig,
+    pub thresholds: AlertThresholds,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct EarlyWarningSnapshot {
+    pub timestamp_ns: i64,
+    pub price: f64,
+    pub vpin: Option<f64>,
+    pub hhi: Option<f64>,
+    /// Simple normalized alert score for operations dashboards.
+    pub normalized_risk_score: Option<f64>,
+    pub is_alert: bool,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct StreamingRunMetrics {
+    pub processed_events: usize,
+    pub events_per_sec: f64,
+    pub avg_event_latency_micros: f64,
+    pub max_event_latency_micros: f64,
+    pub runtime: Duration,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct StreamingRunReport {
+    pub snapshots: Vec<EarlyWarningSnapshot>,
+    pub metrics: StreamingRunMetrics,
+    pub alert_count: usize,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct StreamSummary {
+    pub processed_events: usize,
+    pub alert_count: usize,
+    pub latest_vpin: Option<f64>,
+    pub latest_hhi: Option<f64>,
+    pub latest_risk_score: Option<f64>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct ParallelStreamingReport {
+    pub stream_summaries: Vec<StreamSummary>,
+    pub parallel_metrics: crate::hpc_parallel::HpcParallelMetrics,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct VpinState {
+    cfg: VpinConfig,
+    window: VecDeque<f64>,
+    window_sum: f64,
+    current_bucket_abs_imbalance: f64,
+    current_bucket_volume: f64,
+}
+
+impl VpinState {
+    pub fn new(cfg: VpinConfig) -> Result<Self, StreamingHpcError> {
+        if !cfg.bucket_volume.is_finite() || cfg.bucket_volume <= 0.0 {
+            return Err(StreamingHpcError::InvalidConfig(
+                "vpin.bucket_volume must be finite and > 0",
+            ));
+        }
+        if cfg.support_buckets == 0 {
+            return Err(StreamingHpcError::InvalidConfig("vpin.support_buckets must be > 0"));
+        }
+        Ok(Self {
+            cfg,
+            window: VecDeque::with_capacity(cfg.support_buckets),
+            window_sum: 0.0,
+            current_bucket_abs_imbalance: 0.0,
+            current_bucket_volume: 0.0,
+        })
+    }
+
+    pub fn update(
+        &mut self,
+        mut buy_volume: f64,
+        mut sell_volume: f64,
+    ) -> Result<Option<f64>, StreamingHpcError> {
+        validate_non_negative_finite("buy_volume", buy_volume)?;
+        validate_non_negative_finite("sell_volume", sell_volume)?;
+        let mut remaining = buy_volume + sell_volume;
+        if remaining == 0.0 {
+            return Ok(self.current());
+        }
+        while remaining > 0.0 {
+            let capacity = self.cfg.bucket_volume - self.current_bucket_volume;
+            let take = remaining.min(capacity);
+            if take <= 0.0 {
+                break;
+            }
+            // Preserve buy/sell ratio within partial fill.
+            let ratio_buy = if remaining > 0.0 { buy_volume / remaining } else { 0.5 };
+            let used_buy = take * ratio_buy;
+            let used_sell = take - used_buy;
+
+            self.current_bucket_volume += take;
+            self.current_bucket_abs_imbalance += (used_buy - used_sell).abs();
+
+            buy_volume -= used_buy;
+            sell_volume -= used_sell;
+            remaining -= take;
+
+            if self.current_bucket_volume >= self.cfg.bucket_volume - 1e-12 {
+                let toxicity = self.current_bucket_abs_imbalance / self.cfg.bucket_volume;
+                self.window.push_back(toxicity);
+                self.window_sum += toxicity;
+                if self.window.len() > self.cfg.support_buckets {
+                    if let Some(expired) = self.window.pop_front() {
+                        self.window_sum -= expired;
+                    }
+                }
+                self.current_bucket_volume = 0.0;
+                self.current_bucket_abs_imbalance = 0.0;
+            }
+        }
+        Ok(self.current())
+    }
+
+    pub fn current(&self) -> Option<f64> {
+        if self.window.len() < self.cfg.support_buckets {
+            None
+        } else {
+            Some(self.window_sum / self.window.len() as f64)
+        }
+    }
+
+    pub fn completed_buckets(&self) -> usize {
+        self.window.len()
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct HhiState {
+    cfg: HhiConfig,
+    window: VecDeque<usize>,
+    venue_counts: HashMap<usize, usize>,
+    sum_sq_counts: usize,
+}
+
+impl HhiState {
+    pub fn new(cfg: HhiConfig) -> Result<Self, StreamingHpcError> {
+        if cfg.lookback_events == 0 {
+            return Err(StreamingHpcError::InvalidConfig("hhi.lookback_events must be > 0"));
+        }
+        Ok(Self {
+            cfg,
+            window: VecDeque::with_capacity(cfg.lookback_events),
+            venue_counts: HashMap::new(),
+            sum_sq_counts: 0,
+        })
+    }
+
+    pub fn update(&mut self, venue_id: usize) -> Option<f64> {
+        self.window.push_back(venue_id);
+        let count_before = *self.venue_counts.get(&venue_id).unwrap_or(&0);
+        self.sum_sq_counts += 2 * count_before + 1;
+        self.venue_counts.insert(venue_id, count_before + 1);
+
+        if self.window.len() > self.cfg.lookback_events {
+            if let Some(expired) = self.window.pop_front() {
+                let old_count = *self.venue_counts.get(&expired).unwrap_or(&0);
+                if old_count > 0 {
+                    self.sum_sq_counts = self.sum_sq_counts.saturating_sub(2 * old_count - 1);
+                    if old_count == 1 {
+                        self.venue_counts.remove(&expired);
+                    } else {
+                        self.venue_counts.insert(expired, old_count - 1);
+                    }
+                }
+            }
+        }
+        self.current()
+    }
+
+    pub fn current(&self) -> Option<f64> {
+        let n = self.window.len();
+        if n < self.cfg.lookback_events || n == 0 {
+            return None;
+        }
+        let denom = (n * n) as f64;
+        Some(self.sum_sq_counts as f64 / denom)
+    }
+
+    pub fn window_len(&self) -> usize {
+        self.window.len()
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct StreamingEarlyWarningEngine {
+    cfg: StreamingPipelineConfig,
+    vpin_state: VpinState,
+    hhi_state: HhiState,
+}
+
+impl StreamingEarlyWarningEngine {
+    pub fn new(cfg: StreamingPipelineConfig) -> Result<Self, StreamingHpcError> {
+        if !cfg.thresholds.vpin.is_finite() || cfg.thresholds.vpin <= 0.0 {
+            return Err(StreamingHpcError::InvalidConfig("thresholds.vpin must be finite and > 0"));
+        }
+        if !cfg.thresholds.hhi.is_finite() || cfg.thresholds.hhi <= 0.0 {
+            return Err(StreamingHpcError::InvalidConfig("thresholds.hhi must be finite and > 0"));
+        }
+        Ok(Self { vpin_state: VpinState::new(cfg.vpin)?, hhi_state: HhiState::new(cfg.hhi)?, cfg })
+    }
+
+    pub fn on_event(
+        &mut self,
+        event: StreamEvent,
+    ) -> Result<EarlyWarningSnapshot, StreamingHpcError> {
+        validate_event(event)?;
+        let vpin = self.vpin_state.update(event.buy_volume, event.sell_volume)?;
+        let hhi = self.hhi_state.update(event.venue_id);
+        let normalized_risk_score = match (vpin, hhi) {
+            (Some(v), Some(h)) => {
+                Some(0.5 * (v / self.cfg.thresholds.vpin + h / self.cfg.thresholds.hhi))
+            }
+            _ => None,
+        };
+        let is_alert = match (vpin, hhi) {
+            (Some(v), Some(h)) => v >= self.cfg.thresholds.vpin && h >= self.cfg.thresholds.hhi,
+            _ => false,
+        };
+        Ok(EarlyWarningSnapshot {
+            timestamp_ns: event.timestamp_ns,
+            price: event.price,
+            vpin,
+            hhi,
+            normalized_risk_score,
+            is_alert,
+        })
+    }
+}
+
+pub fn run_streaming_pipeline(
+    events: &[StreamEvent],
+    cfg: StreamingPipelineConfig,
+) -> Result<StreamingRunReport, StreamingHpcError> {
+    let mut engine = StreamingEarlyWarningEngine::new(cfg)?;
+    let mut snapshots = Vec::with_capacity(events.len());
+    let mut alert_count = 0usize;
+    let mut total_event_latency = Duration::ZERO;
+    let mut max_event_latency = Duration::ZERO;
+    let started = Instant::now();
+
+    for event in events {
+        let t0 = Instant::now();
+        let snapshot = engine.on_event(*event)?;
+        let elapsed = t0.elapsed();
+        total_event_latency += elapsed;
+        if elapsed > max_event_latency {
+            max_event_latency = elapsed;
+        }
+        if snapshot.is_alert {
+            alert_count += 1;
+        }
+        snapshots.push(snapshot);
+    }
+
+    let runtime = started.elapsed();
+    let processed = events.len();
+    let runtime_secs = runtime.as_secs_f64();
+    let events_per_sec = if runtime_secs > 0.0 { processed as f64 / runtime_secs } else { 0.0 };
+    let avg_event_latency_micros = if processed > 0 {
+        total_event_latency.as_secs_f64() * 1_000_000.0 / processed as f64
+    } else {
+        0.0
+    };
+    let max_event_latency_micros = max_event_latency.as_secs_f64() * 1_000_000.0;
+
+    Ok(StreamingRunReport {
+        snapshots,
+        metrics: StreamingRunMetrics {
+            processed_events: processed,
+            events_per_sec,
+            avg_event_latency_micros,
+            max_event_latency_micros,
+            runtime,
+        },
+        alert_count,
+    })
+}
+
+pub fn run_streaming_pipeline_parallel(
+    streams: &[Vec<StreamEvent>],
+    pipeline_cfg: StreamingPipelineConfig,
+    parallel_cfg: HpcParallelConfig,
+) -> Result<ParallelStreamingReport, StreamingHpcError> {
+    let report: ParallelRunReport<Vec<StreamSummary>> =
+        run_parallel(streams, parallel_cfg, |chunk| {
+            let mut summaries = Vec::with_capacity(chunk.len());
+            for stream in chunk {
+                let run = run_streaming_pipeline(stream, pipeline_cfg)
+                    .map_err(|err| format!("stream pipeline failed: {err}"))?;
+                let last = run.snapshots.last();
+                summaries.push(StreamSummary {
+                    processed_events: run.metrics.processed_events,
+                    alert_count: run.alert_count,
+                    latest_vpin: last.and_then(|s| s.vpin),
+                    latest_hhi: last.and_then(|s| s.hhi),
+                    latest_risk_score: last.and_then(|s| s.normalized_risk_score),
+                });
+            }
+            Ok::<Vec<StreamSummary>, String>(summaries)
+        })?;
+
+    let mut stream_summaries = Vec::new();
+    for batch in report.outputs {
+        stream_summaries.extend(batch);
+    }
+    Ok(ParallelStreamingReport { stream_summaries, parallel_metrics: report.metrics })
+}
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct SyntheticStreamConfig {
+    pub events: usize,
+    pub crash_start_fraction: f64,
+    pub calm_venues: usize,
+    pub shock_venue: usize,
+}
+
+pub fn generate_synthetic_flash_crash_stream(
+    cfg: SyntheticStreamConfig,
+) -> Result<Vec<StreamEvent>, StreamingHpcError> {
+    if cfg.events == 0 {
+        return Err(StreamingHpcError::InvalidConfig("synthetic events must be > 0"));
+    }
+    if !cfg.crash_start_fraction.is_finite()
+        || cfg.crash_start_fraction <= 0.0
+        || cfg.crash_start_fraction >= 1.0
+    {
+        return Err(StreamingHpcError::InvalidConfig(
+            "crash_start_fraction must be finite and in (0, 1)",
+        ));
+    }
+    if cfg.calm_venues == 0 {
+        return Err(StreamingHpcError::InvalidConfig("calm_venues must be > 0"));
+    }
+
+    let crash_start = (cfg.events as f64 * cfg.crash_start_fraction).round() as usize;
+    let mut events = Vec::with_capacity(cfg.events);
+    let mut price = 100.0;
+    for i in 0..cfg.events {
+        let in_shock = i >= crash_start;
+        let venue = if in_shock { cfg.shock_venue } else { i % cfg.calm_venues };
+        let (buy_volume, sell_volume, drift) = if in_shock {
+            // Toxic order flow and downside pressure during flash-crash regime.
+            (80.0, 320.0, -0.0025)
+        } else {
+            (120.0, 130.0, 0.0001)
+        };
+        price *= 1.0 + drift;
+        events.push(StreamEvent {
+            timestamp_ns: i as i64 * 1_000_000,
+            price,
+            buy_volume,
+            sell_volume,
+            venue_id: venue,
+        });
+    }
+    Ok(events)
+}
+
+fn validate_non_negative_finite(name: &'static str, value: f64) -> Result<(), StreamingHpcError> {
+    if !value.is_finite() || value < 0.0 {
+        return Err(StreamingHpcError::InvalidEvent(name));
+    }
+    Ok(())
+}
+
+fn validate_event(event: StreamEvent) -> Result<(), StreamingHpcError> {
+    if !event.price.is_finite() || event.price <= 0.0 {
+        return Err(StreamingHpcError::InvalidEvent("price must be finite and > 0"));
+    }
+    validate_non_negative_finite("buy_volume", event.buy_volume)?;
+    validate_non_negative_finite("sell_volume", event.sell_volume)?;
+    if event.total_volume() <= 0.0 {
+        return Err(StreamingHpcError::InvalidEvent(
+            "event must have strictly positive total volume",
+        ));
+    }
+    Ok(())
+}
diff --git a/crates/openquant/tests/streaming_hpc.rs b/crates/openquant/tests/streaming_hpc.rs
new file mode 100644
index 0000000..5351045
--- /dev/null
+++ b/crates/openquant/tests/streaming_hpc.rs
@@ -0,0 +1,122 @@
+use openquant::hpc_parallel::{ExecutionMode, HpcParallelConfig, PartitionStrategy};
+use openquant::streaming_hpc::{
+    generate_synthetic_flash_crash_stream, run_streaming_pipeline, run_streaming_pipeline_parallel,
+    AlertThresholds, HhiConfig, StreamEvent, StreamingPipelineConfig, SyntheticStreamConfig,
+    VpinConfig, VpinState,
+};
+
+fn pipeline_cfg() -> StreamingPipelineConfig {
+    StreamingPipelineConfig {
+        vpin: VpinConfig { bucket_volume: 1_000.0, support_buckets: 10 },
+        hhi: HhiConfig { lookback_events: 120 },
+        thresholds: AlertThresholds { vpin: 0.45, hhi: 0.30 },
+    }
+}
+
+#[test]
+fn bounded_memory_vpin_window_size() {
+    let mut state =
+        VpinState::new(VpinConfig { bucket_volume: 100.0, support_buckets: 8 }).expect("valid");
+    for _ in 0..10_000 {
+        let _ = state.update(40.0, 60.0).expect("update");
+    }
+    assert!(state.completed_buckets() <= 8);
+}
+
+#[test]
+fn flash_crash_segment_raises_early_warning_metrics() {
+    let events = generate_synthetic_flash_crash_stream(SyntheticStreamConfig {
+        events: 4_000,
+        crash_start_fraction: 0.70,
+        calm_venues: 8,
+        shock_venue: 0,
+    })
+    .expect("synthetic stream");
+    let report = run_streaming_pipeline(&events, pipeline_cfg()).expect("pipeline run");
+
+    let split = (events.len() as f64 * 0.70) as usize;
+    let pre = &report.snapshots[split / 2..split];
+    let post = &report.snapshots[split..(split + split / 4)];
+
+    let pre_vpin = pre.iter().filter_map(|s| s.vpin).sum::<f64>()
+        / pre.iter().filter(|s| s.vpin.is_some()).count() as f64;
+    let post_vpin = post.iter().filter_map(|s| s.vpin).sum::<f64>()
+        / post.iter().filter(|s| s.vpin.is_some()).count() as f64;
+    let pre_hhi = pre.iter().filter_map(|s| s.hhi).sum::<f64>()
+        / pre.iter().filter(|s| s.hhi.is_some()).count() as f64;
+    let post_hhi = post.iter().filter_map(|s| s.hhi).sum::<f64>()
+        / post.iter().filter(|s| s.hhi.is_some()).count() as f64;
+
+    assert!(post_vpin > pre_vpin);
+    assert!(post_hhi > pre_hhi);
+    assert!(report.alert_count > 0);
+}
+
+#[test]
+fn serial_and_parallel_grouped_runs_agree_on_terminal_state() {
+    let mut streams = Vec::new();
+    for k in 0..12 {
+        let mut stream = generate_synthetic_flash_crash_stream(SyntheticStreamConfig {
+            events: 1_500,
+            crash_start_fraction: 0.65,
+            calm_venues: 6,
+            shock_venue: k % 3,
+        })
+        .expect("stream");
+        // Make streams non-identical while preserving deterministic order.
+        for event in &mut stream {
+            event.price *= 1.0 + k as f64 * 1e-6;
+        }
+        streams.push(stream);
+    }
+
+    let serial: Vec<_> = streams
+        .iter()
+        .map(|s| run_streaming_pipeline(s, pipeline_cfg()).expect("serial run"))
+        .collect();
+
+    let parallel = run_streaming_pipeline_parallel(
+        &streams,
+        pipeline_cfg(),
+        HpcParallelConfig {
+            mode: ExecutionMode::Threaded { num_threads: 4 },
+            partition: PartitionStrategy::Linear,
+            mp_batches: 3,
+            progress_every: 2,
+        },
+    )
+    .expect("parallel run");
+
+    assert_eq!(parallel.stream_summaries.len(), streams.len());
+    for (lhs, rhs) in serial.iter().zip(parallel.stream_summaries.iter()) {
+        let last = lhs.snapshots.last().expect("non-empty stream");
+        assert_eq!(lhs.metrics.processed_events, rhs.processed_events);
+        assert_eq!(lhs.alert_count, rhs.alert_count);
+        assert_eq!(last.vpin, rhs.latest_vpin);
+        assert_eq!(last.hhi, rhs.latest_hhi);
+    }
+}
+
+#[test]
+fn supports_large_synthetic_stream_incrementally() {
+    let mut events = Vec::with_capacity(50_000);
+    let mut price = 100.0;
+    for i in 0..50_000 {
+        let venue = i % 10;
+        let (buy, sell, drift) =
+            if i % 5000 >= 4200 { (90.0, 280.0, -0.0012) } else { (140.0, 150.0, 0.00005) };
+        price *= 1.0 + drift;
+        events.push(StreamEvent {
+            timestamp_ns: i as i64 * 500_000,
+            price,
+            buy_volume: buy,
+            sell_volume: sell,
+            venue_id: venue,
+        });
+    }
+    let report = run_streaming_pipeline(&events, pipeline_cfg()).expect("large run should succeed");
+    assert_eq!(report.metrics.processed_events, 50_000);
+    assert!(report.metrics.events_per_sec > 0.0);
+    assert!(report.snapshots.last().and_then(|s| s.vpin).is_some());
+    assert!(report.snapshots.last().and_then(|s| s.hhi).is_some());
+}
diff --git a/docs-site/src/data/afmlDocsState.ts b/docs-site/src/data/afmlDocsState.ts
index d91f4ca..8d38685 100644
--- a/docs-site/src/data/afmlDocsState.ts
+++ b/docs-site/src/data/afmlDocsState.ts
@@ -359,6 +359,20 @@ export const afmlDocsState = {
           "status": "done"
         }
       ]
+    },
+    {
+      "chapter": "CHAPTER 22",
+      "theme": "High-performance computing for streaming analytics",
+      "status": "done",
+      "chunkCount": 0,
+      "sections": [
+        {
+          "id": "chapter-22-streaming_hpc",
+          "module": "streaming_hpc",
+          "slug": "streaming-hpc",
+          "status": "done"
+        }
+      ]
+    }
   ]
 } as const;
diff --git a/docs-site/src/data/moduleDocs.ts b/docs-site/src/data/moduleDocs.ts
index b66e7d2..acc88d1 100644
--- a/docs-site/src/data/moduleDocs.ts
+++ b/docs-site/src/data/moduleDocs.ts
@@ -783,6 +783,52 @@ export const moduleDocs: ModuleDoc[] = [
       "Use adapter interfaces to compare heuristic/external solvers against exact solutions on small calibration instances before production deployment.",
     ],
   },
+  {
+    slug: "streaming-hpc",
+    module: "streaming_hpc",
+    subject: "Scaling, HPC and Infrastructure",
+    summary:
+      "AFML Chapter 22 streaming analytics utilities for low-latency early-warning metrics with bounded-memory incremental state.",
+    whyItExists:
+      "Streaming decisions are turnaround-time constrained; this module maintains VPIN/HHI-style indicators incrementally and supports multi-stream scaling across cores/chunk sizes.",
+    keyApis: [
+      "StreamEvent",
+      "VpinState",
+      "HhiState",
+      "StreamingEarlyWarningEngine",
+      "run_streaming_pipeline",
+      "run_streaming_pipeline_parallel",
+      "generate_synthetic_flash_crash_stream",
+      "StreamingPipelineConfig",
+      "StreamingRunMetrics",
+    ],
+    formulas: [
+      {
+        label: "VPIN (Rolling Buckets)",
+        latex: "\\mathrm{VPIN}_t=\\frac{1}{N}\\sum_{i=t-N+1}^{t}\\frac{|V_i^B-V_i^S|}{V_i}",
+      },
+      {
+        label: "Market Fragmentation HHI",
+        latex: "\\mathrm{HHI}_t=\\sum_{v=1}^{K}\\left(\\frac{n_{v,t}}{\\sum_j n_{j,t}}\\right)^2",
+      },
+      {
+        label: "Streaming Throughput",
+        latex: "\\mathrm{throughput}=\\frac{\\#\\mathrm{events\\ processed}}{\\mathrm{runtime\\ seconds}}",
+      },
+    ],
+    examples: [
+      {
+        title: "Incremental early-warning pipeline on streaming trades",
+        language: "rust",
+        code: `use openquant::hpc_parallel::{ExecutionMode, HpcParallelConfig, PartitionStrategy};\nuse openquant::streaming_hpc::{\n    run_streaming_pipeline_parallel, AlertThresholds, HhiConfig, StreamingPipelineConfig,\n    SyntheticStreamConfig, VpinConfig, generate_synthetic_flash_crash_stream,\n};\n\nlet streams: Vec<_> = (0..16)\n    .map(|k| generate_synthetic_flash_crash_stream(SyntheticStreamConfig {\n        events: 2_000,\n        crash_start_fraction: 0.7,\n        calm_venues: 8,\n        shock_venue: k % 2,\n    }))\n    .collect::<Result<Vec<_>, _>>()?;\n\nlet report = run_streaming_pipeline_parallel(\n    &streams,\n    StreamingPipelineConfig {\n        vpin: VpinConfig { bucket_volume: 1_000.0, support_buckets: 20 },\n        hhi: HhiConfig { lookback_events: 200 },\n        thresholds: AlertThresholds { vpin: 0.45, hhi: 0.30 },\n    },\n    HpcParallelConfig {\n        mode: ExecutionMode::Threaded { num_threads: 8 },\n        partition: PartitionStrategy::Linear,\n        mp_batches: 4,\n        progress_every: 8,\n    },\n)?;\n\nprintln!(\"streams={} molecules={} events/s={:.0}\",\n    report.stream_summaries.len(),\n    report.parallel_metrics.molecules_total,\n    report.parallel_metrics.throughput_atoms_per_sec\n);`,
+      },
+    ],
+    notes: [
+      "Chapter 22 stresses turnaround-time over pure throughput: bounded rolling windows avoid unbounded latency/memory growth.",
+      "For low-latency alerts, keep stream partitioning stable and calibrate `mp_batches` against scheduling overhead and cache locality.",
+      "Use synthetic flash-crash replays to validate that warning thresholds react early without excessive false positives.",
+    ],
+  },
   {
     slug: "sample-weights",
     module: "sample_weights",