From 7bed198304cea9ec3e90242f87430bd23205ecef Mon Sep 17 00:00:00 2001
From: Sean Koval
Date: Fri, 13 Feb 2026 19:12:52 -0500
Subject: [PATCH] feat(openquant): add AFML Ch20 hpc_parallel execution module

---
 crates/openquant/Cargo.toml                   |   4 +
 .../openquant/benches/hpc_parallel_scaling.rs |  82 ++++
 crates/openquant/src/hpc_parallel.rs          | 457 ++++++++++++++++++
 crates/openquant/src/lib.rs                   |   1 +
 crates/openquant/tests/hpc_parallel.rs        | 132 +++++
 docs-site/src/data/afmlDocsState.ts           |  14 +
 docs-site/src/data/moduleDocs.ts              |  43 ++
 7 files changed, 733 insertions(+)
 create mode 100644 crates/openquant/benches/hpc_parallel_scaling.rs
 create mode 100644 crates/openquant/src/hpc_parallel.rs
 create mode 100644 crates/openquant/tests/hpc_parallel.rs

diff --git a/crates/openquant/Cargo.toml b/crates/openquant/Cargo.toml
index 053b1c5..fda9eb8 100644
--- a/crates/openquant/Cargo.toml
+++ b/crates/openquant/Cargo.toml
@@ -35,3 +35,7 @@ harness = false
 [[bench]]
 name = "synthetic_ticker_pipeline"
 harness = false
+
+[[bench]]
+name = "hpc_parallel_scaling"
+harness = false
diff --git a/crates/openquant/benches/hpc_parallel_scaling.rs b/crates/openquant/benches/hpc_parallel_scaling.rs
new file mode 100644
index 0000000..0c66bdb
--- /dev/null
+++ b/crates/openquant/benches/hpc_parallel_scaling.rs
@@ -0,0 +1,82 @@
+use criterion::{criterion_group, criterion_main, Criterion};
+use openquant::hpc_parallel::{run_parallel, ExecutionMode, HpcParallelConfig, PartitionStrategy};
+use std::thread;
+
+fn heavy_atoms(n: usize) -> Vec<(usize, f64)> {
+    (0..n)
+        .map(|i| {
+            let x = i as f64;
+            (i, (x / 7.0).sin() + (x / 23.0).cos())
+        })
+        .collect()
+}
+
+fn workload(chunk: &[(usize, f64)]) -> f64 {
+    let mut acc = 0.0;
+    for (idx, x) in chunk {
+        // Cost increases with atom index to make partition choice matter.
+        let reps = 16 + (*idx / 32);
+        let mut local = *x;
+        for k in 0..reps {
+            local = (local + (k as f64 + 1.0)).sin().abs();
+        }
+        acc += local;
+    }
+    acc
+}
+
+fn bench_hpc_parallel_scaling(c: &mut Criterion) {
+    let atoms = heavy_atoms(4_096);
+    let threaded = thread::available_parallelism().map_or(4, |n| n.get().clamp(2, 8));
+
+    c.bench_function("hpc_parallel/serial_linear", |b| {
+        b.iter(|| {
+            let _ = run_parallel(
+                &atoms,
+                HpcParallelConfig {
+                    mode: ExecutionMode::Serial,
+                    partition: PartitionStrategy::Linear,
+                    mp_batches: 2,
+                    progress_every: 64,
+                },
+                |chunk| Ok::<f64, String>(workload(chunk)),
+            )
+            .expect("serial run should succeed");
+        });
+    });
+
+    c.bench_function("hpc_parallel/threaded_linear", |b| {
+        b.iter(|| {
+            let _ = run_parallel(
+                &atoms,
+                HpcParallelConfig {
+                    mode: ExecutionMode::Threaded { num_threads: threaded },
+                    partition: PartitionStrategy::Linear,
+                    mp_batches: 4,
+                    progress_every: 64,
+                },
+                |chunk| Ok::<f64, String>(workload(chunk)),
+            )
+            .expect("threaded linear run should succeed");
+        });
+    });
+
+    c.bench_function("hpc_parallel/threaded_nested", |b| {
+        b.iter(|| {
+            let _ = run_parallel(
+                &atoms,
+                HpcParallelConfig {
+                    mode: ExecutionMode::Threaded { num_threads: threaded },
+                    partition: PartitionStrategy::Nested,
+                    mp_batches: 4,
+                    progress_every: 64,
+                },
+                |chunk| Ok::<f64, String>(workload(chunk)),
+            )
+            .expect("threaded nested run should succeed");
+        });
+    });
+}
+
+criterion_group!(hpc_parallel_scaling, bench_hpc_parallel_scaling);
+criterion_main!(hpc_parallel_scaling);
diff --git a/crates/openquant/src/hpc_parallel.rs b/crates/openquant/src/hpc_parallel.rs
new file mode 100644
index 0000000..b9fd56e
--- /dev/null
+++ b/crates/openquant/src/hpc_parallel.rs
@@ -0,0 +1,457 @@
+//! AFML Chapter 20: multiprocessing and vectorization utilities.
+//!
+//! The core abstraction maps a collection of atoms into contiguous molecules and runs
+//! user callbacks over those molecules in either serial or threaded mode.
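+//!
+//! # Example
+//!
+//! A minimal sketch of a serial run that sums each molecule (the `u64`/`String`
+//! types below are only illustrative):
+//!
+//! ```
+//! use openquant::hpc_parallel::{run_parallel, ExecutionMode, HpcParallelConfig, PartitionStrategy};
+//!
+//! let atoms: Vec<u64> = (0..100).collect();
+//! let report = run_parallel(
+//!     &atoms,
+//!     HpcParallelConfig {
+//!         mode: ExecutionMode::Serial,
+//!         partition: PartitionStrategy::Linear,
+//!         mp_batches: 2,
+//!         progress_every: 1,
+//!     },
+//!     |chunk| Ok::<u64, String>(chunk.iter().sum()),
+//! )
+//! .unwrap();
+//! assert_eq!(report.outputs.iter().sum::<u64>(), 4950);
+//! ```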
+
+use std::fmt::{Display, Formatter};
+use std::sync::mpsc;
+use std::sync::{Arc, Mutex};
+use std::thread::{self, JoinHandle};
+use std::time::{Duration, Instant};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ExecutionMode {
+    Serial,
+    Threaded { num_threads: usize },
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum PartitionStrategy {
+    /// Equal-size contiguous chunks.
+    Linear,
+    /// Square-root-spaced boundaries to balance workloads that get heavier with atom index.
+    Nested,
+}
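+
+// Worked illustration of the two strategies (values follow directly from the
+// boundary formulas in `partition_atoms` below): splitting 100 atoms into 4
+// molecules gives
+//   Linear: boundaries [0, 25, 50, 75, 100]  (b_i = i * N / M, integer division),
+//   Nested: boundaries [0, 50, 71, 87, 100]  (b_i = round(N * sqrt(i / M))),
+// so nested molecules shrink as the atom index grows, which suits callbacks
+// whose per-atom cost increases with the index.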
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct HpcParallelConfig {
+    pub mode: ExecutionMode,
+    pub partition: PartitionStrategy,
+    /// Oversubscription factor, analogous to `mpBatches` in AFML.
+    pub mp_batches: usize,
+    /// Emit a progress snapshot every N completed molecules.
+    pub progress_every: usize,
+}
+
+impl Default for HpcParallelConfig {
+    fn default() -> Self {
+        Self {
+            mode: ExecutionMode::Threaded { num_threads: default_threads() },
+            partition: PartitionStrategy::Linear,
+            mp_batches: 1,
+            progress_every: 1,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct MoleculePartition {
+    pub molecule_id: usize,
+    pub start: usize,
+    pub end: usize,
+}
+
+impl MoleculePartition {
+    pub fn len(self) -> usize {
+        self.end.saturating_sub(self.start)
+    }
+
+    pub fn is_empty(self) -> bool {
+        self.len() == 0
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub struct ProgressSnapshot {
+    pub completed_molecules: usize,
+    pub total_molecules: usize,
+    pub elapsed: Duration,
+    pub throughput_molecules_per_sec: f64,
+    pub throughput_atoms_per_sec: f64,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct HpcParallelMetrics {
+    pub atoms_total: usize,
+    pub molecules_total: usize,
+    pub runtime: Duration,
+    pub throughput_atoms_per_sec: f64,
+    pub throughput_molecules_per_sec: f64,
+    /// Max molecule size divided by mean molecule size.
+    pub partition_imbalance_ratio: f64,
+    pub progress: Vec<ProgressSnapshot>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct ParallelRunReport<R> {
+    pub outputs: Vec<R>,
+    pub metrics: HpcParallelMetrics,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum HpcParallelError {
+    InvalidConfig(&'static str),
+    CallbackFailed { molecule_id: usize, message: String },
+    WorkerPanic,
+    ChannelClosed(&'static str),
+}
+
+impl Display for HpcParallelError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::InvalidConfig(msg) => write!(f, "invalid HPC parallel config: {msg}"),
+            Self::CallbackFailed { molecule_id, message } => {
+                write!(f, "callback failed in molecule {molecule_id}: {message}")
+            }
+            Self::WorkerPanic => write!(f, "parallel worker panicked"),
+            Self::ChannelClosed(ctx) => write!(f, "channel unexpectedly closed while {ctx}"),
+        }
+    }
+}
+
+impl std::error::Error for HpcParallelError {}
+
+#[derive(Debug)]
+pub struct AsyncParallelHandle<R> {
+    join_handle: JoinHandle<()>,
+    result_rx: mpsc::Receiver<Result<ParallelRunReport<R>, HpcParallelError>>,
+}
+
+impl<R> AsyncParallelHandle<R> {
+    pub fn is_finished(&self) -> bool {
+        self.join_handle.is_finished()
+    }
+
+    pub fn wait(self) -> Result<ParallelRunReport<R>, HpcParallelError> {
+        if self.join_handle.join().is_err() {
+            return Err(HpcParallelError::WorkerPanic);
+        }
+        self.result_rx
+            .recv()
+            .map_err(|_| HpcParallelError::ChannelClosed("waiting for async coordinator result"))?
+    }
+}
+
+pub fn partition_atoms(
+    atom_count: usize,
+    target_molecules: usize,
+    strategy: PartitionStrategy,
+) -> Result<Vec<MoleculePartition>, HpcParallelError> {
+    if atom_count == 0 {
+        return Ok(Vec::new());
+    }
+    if target_molecules == 0 {
+        return Err(HpcParallelError::InvalidConfig("target_molecules must be > 0"));
+    }
+    let molecules = target_molecules.min(atom_count);
+    let mut boundaries = Vec::with_capacity(molecules + 1);
+    boundaries.push(0usize);
+    for i in 1..molecules {
+        let b = match strategy {
+            PartitionStrategy::Linear => i * atom_count / molecules,
+            PartitionStrategy::Nested => {
+                ((atom_count as f64) * (i as f64 / molecules as f64).sqrt()).round() as usize
+            }
+        };
+        let last = *boundaries.last().unwrap_or(&0);
+        boundaries.push(b.clamp(last + 1, atom_count));
+    }
+    boundaries.push(atom_count);
+
+    let mut partitions = Vec::with_capacity(molecules);
+    for i in 0..molecules {
+        let start = boundaries[i];
+        let end = boundaries[i + 1];
+        if end > start {
+            partitions.push(MoleculePartition { molecule_id: partitions.len(), start, end });
+        }
+    }
+    Ok(partitions)
+}
+
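+/// Runs `callback` over contiguous molecules of `atoms` (at most `num_threads * mp_batches`
+/// of them in threaded mode, `mp_batches` in serial mode) and returns the outputs in
+/// molecule order together with runtime metrics.
+///
+/// A small threaded sketch (the thread count and callback types below are illustrative):
+///
+/// ```
+/// use openquant::hpc_parallel::{run_parallel, ExecutionMode, HpcParallelConfig, PartitionStrategy};
+///
+/// let atoms: Vec<f64> = (0..1_000).map(|i| i as f64).collect();
+/// let report = run_parallel(
+///     &atoms,
+///     HpcParallelConfig {
+///         mode: ExecutionMode::Threaded { num_threads: 2 },
+///         partition: PartitionStrategy::Nested,
+///         mp_batches: 2,
+///         progress_every: 1,
+///     },
+///     |chunk| Ok::<f64, String>(chunk.iter().sum()),
+/// )
+/// .unwrap();
+/// assert_eq!(report.metrics.molecules_total, 4);
+/// ```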
+pub fn run_parallel<A, R, E, F>(
+    atoms: &[A],
+    cfg: HpcParallelConfig,
+    callback: F,
+) -> Result<ParallelRunReport<R>, HpcParallelError>
+where
+    A: Sync,
+    R: Send,
+    F: Fn(&[A]) -> Result<R, E> + Send + Sync,
+    E: Display,
+{
+    validate_config(cfg)?;
+    if atoms.is_empty() {
+        return Ok(ParallelRunReport {
+            outputs: Vec::new(),
+            metrics: HpcParallelMetrics {
+                atoms_total: 0,
+                molecules_total: 0,
+                runtime: Duration::ZERO,
+                throughput_atoms_per_sec: 0.0,
+                throughput_molecules_per_sec: 0.0,
+                partition_imbalance_ratio: 0.0,
+                progress: Vec::new(),
+            },
+        });
+    }
+
+    let worker_count = match cfg.mode {
+        ExecutionMode::Serial => 1,
+        ExecutionMode::Threaded { num_threads } => num_threads,
+    };
+    let target_molecules = worker_count.saturating_mul(cfg.mp_batches).max(1);
+    let partitions = partition_atoms(atoms.len(), target_molecules, cfg.partition)?;
+    let started = Instant::now();
+
+    let (outputs, progress) = match cfg.mode {
+        ExecutionMode::Serial => run_serial(atoms, &partitions, cfg.progress_every, &callback)?,
+        ExecutionMode::Threaded { num_threads } => {
+            run_threaded(atoms, &partitions, cfg.progress_every, num_threads, callback)?
+        }
+    };
+
+    Ok(ParallelRunReport {
+        outputs,
+        metrics: build_metrics(atoms.len(), &partitions, started.elapsed(), progress),
+    })
+}
+
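+/// Runs [`run_parallel`] on a background coordinator thread and returns a handle;
+/// `wait()` joins the coordinator and yields the finished report.
+///
+/// A sketch of dispatch followed by a blocking wait (the closure and sizes are illustrative):
+///
+/// ```no_run
+/// use openquant::hpc_parallel::{dispatch_async, HpcParallelConfig};
+///
+/// let atoms: Vec<f64> = (0..10_000).map(|i| i as f64).collect();
+/// let handle = dispatch_async(atoms, HpcParallelConfig::default(), |chunk| {
+///     Ok::<f64, String>(chunk.iter().map(|x| x.sqrt()).sum())
+/// });
+/// // ... do other work here ...
+/// let report = handle.wait().expect("async run should succeed");
+/// println!("processed {} atoms", report.metrics.atoms_total);
+/// ```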
+pub fn dispatch_async<A, R, E, F>(
+    atoms: Vec<A>,
+    cfg: HpcParallelConfig,
+    callback: F,
+) -> AsyncParallelHandle<R>
+where
+    A: Send + Sync + 'static,
+    R: Send + 'static,
+    F: Fn(&[A]) -> Result<R, E> + Send + Sync + 'static,
+    E: Display + Send + 'static,
+{
+    let (tx, rx) = mpsc::channel();
+    let join_handle = thread::spawn(move || {
+        let report = run_parallel(&atoms, cfg, callback);
+        let _ = tx.send(report);
+    });
+    AsyncParallelHandle { join_handle, result_rx: rx }
+}
+
+fn run_serial<A, R, E, F>(
+    atoms: &[A],
+    partitions: &[MoleculePartition],
+    progress_every: usize,
+    callback: &F,
+) -> Result<(Vec<R>, Vec<ProgressSnapshot>), HpcParallelError>
+where
+    F: Fn(&[A]) -> Result<R, E>,
+    E: Display,
+{
+    let total = partitions.len();
+    let started = Instant::now();
+    let mut outputs = Vec::with_capacity(total);
+    let mut progress = Vec::new();
+    let mut completed_atoms = 0usize;
+
+    for part in partitions {
+        let out = callback(&atoms[part.start..part.end]).map_err(|err| {
+            HpcParallelError::CallbackFailed {
+                molecule_id: part.molecule_id,
+                message: err.to_string(),
+            }
+        })?;
+        outputs.push(out);
+        completed_atoms += part.len();
+        maybe_record_progress(
+            &mut progress,
+            started.elapsed(),
+            outputs.len(),
+            total,
+            completed_atoms,
+            progress_every,
+        );
+    }
+    Ok((outputs, progress))
+}
+
+fn run_threaded<A, R, E, F>(
+    atoms: &[A],
+    partitions: &[MoleculePartition],
+    progress_every: usize,
+    num_threads: usize,
+    callback: F,
+) -> Result<(Vec<R>, Vec<ProgressSnapshot>), HpcParallelError>
+where
+    A: Sync,
+    R: Send,
+    F: Fn(&[A]) -> Result<R, E> + Send + Sync,
+    E: Display,
+{
+    let started = Instant::now();
+    let total = partitions.len();
+    let callback = Arc::new(callback);
+
+    let (job_tx, job_rx) = mpsc::channel::<MoleculePartition>();
+    let job_rx = Arc::new(Mutex::new(job_rx));
+    let (result_tx, result_rx) =
+        mpsc::channel::<(MoleculePartition, Result<R, HpcParallelError>)>();
+
+    thread::scope(|scope| {
+        let mut workers = Vec::with_capacity(num_threads);
+        for _ in 0..num_threads {
+            let rx = Arc::clone(&job_rx);
+            let tx = result_tx.clone();
+            let cb = Arc::clone(&callback);
+            workers.push(scope.spawn(move || loop {
+                let next = {
+                    let guard = rx.lock().expect("job receiver mutex should not be poisoned");
+                    guard.recv()
+                };
+                let part = match next {
+                    Ok(part) => part,
+                    Err(_) => break,
+                };
+                let res = cb(&atoms[part.start..part.end]).map_err(|err| {
+                    HpcParallelError::CallbackFailed {
+                        molecule_id: part.molecule_id,
+                        message: err.to_string(),
+                    }
+                });
+                if tx.send((part, res)).is_err() {
+                    break;
+                }
+            }));
+        }
+        drop(result_tx);
+
+        for part in partitions {
+            if job_tx.send(*part).is_err() {
+                return Err(HpcParallelError::ChannelClosed("queueing jobs"));
+            }
+        }
+        drop(job_tx);
+
+        let mut ordered: Vec<Option<R>> = (0..total).map(|_| None).collect();
+        let mut progress = Vec::new();
+        let mut first_error: Option<HpcParallelError> = None;
+        let mut completed = 0usize;
+        let mut completed_atoms = 0usize;
+        for _ in 0..total {
+            let (part, outcome) = result_rx
+                .recv()
+                .map_err(|_| HpcParallelError::ChannelClosed("receiving worker results"))?;
+            completed += 1;
+            completed_atoms += part.len();
+            match outcome {
+                Ok(out) => ordered[part.molecule_id] = Some(out),
+                Err(err) => {
+                    if first_error.is_none() {
+                        first_error = Some(err);
+                    }
+                }
+            }
+            maybe_record_progress(
+                &mut progress,
+                started.elapsed(),
+                completed,
+                total,
+                completed_atoms,
+                progress_every,
+            );
+        }
+
+        for worker in workers {
+            if worker.join().is_err() {
+                return Err(HpcParallelError::WorkerPanic);
+            }
+        }
+        if let Some(err) = first_error {
+            return Err(err);
+        }
+
+        let mut outputs = Vec::with_capacity(total);
+        for maybe in ordered {
+            outputs.push(
+                maybe
+                    .ok_or(HpcParallelError::ChannelClosed("assembling ordered worker results"))?,
+            );
+        }
+        Ok((outputs, progress))
+    })
+}
+
+fn maybe_record_progress(
+    progress: &mut Vec<ProgressSnapshot>,
+    elapsed: Duration,
+    completed_molecules: usize,
+    total_molecules: usize,
+    completed_atoms: usize,
+    progress_every: usize,
+) {
+    let should_record =
+        completed_molecules == total_molecules || completed_molecules % progress_every == 0;
+    if !should_record {
+        return;
+    }
+    let seconds = elapsed.as_secs_f64();
+    let throughput_molecules_per_sec =
+        if seconds > 0.0 { completed_molecules as f64 / seconds } else { 0.0 };
+    let throughput_atoms_per_sec =
+        if seconds > 0.0 { completed_atoms as f64 / seconds } else { 0.0 };
+    progress.push(ProgressSnapshot {
+        completed_molecules,
+        total_molecules,
+        elapsed,
+        throughput_molecules_per_sec,
+        throughput_atoms_per_sec,
+    });
+}
+
+fn build_metrics(
+    atom_count: usize,
+    partitions: &[MoleculePartition],
+    runtime: Duration,
+    progress: Vec<ProgressSnapshot>,
+) -> HpcParallelMetrics {
+    let molecules = partitions.len();
+    let seconds = runtime.as_secs_f64();
+    let throughput_atoms_per_sec = if seconds > 0.0 { atom_count as f64 / seconds } else { 0.0 };
+    let throughput_molecules_per_sec = if seconds > 0.0 { molecules as f64 / seconds } else { 0.0 };
+
+    let sizes: Vec<usize> = partitions.iter().map(|p| p.len()).collect();
+    let mean = if sizes.is_empty() {
+        0.0
+    } else {
+        sizes.iter().sum::<usize>() as f64 / sizes.len() as f64
+    };
+    let max = sizes.iter().copied().max().unwrap_or(0) as f64;
+    let partition_imbalance_ratio = if mean > 0.0 { max / mean } else { 0.0 };
+
+    HpcParallelMetrics {
+        atoms_total: atom_count,
+        molecules_total: molecules,
+        runtime,
+        throughput_atoms_per_sec,
+        throughput_molecules_per_sec,
+        partition_imbalance_ratio,
+        progress,
+    }
+}
+
+fn validate_config(cfg: HpcParallelConfig) -> Result<(), HpcParallelError> {
+    if cfg.mp_batches == 0 {
+        return Err(HpcParallelError::InvalidConfig("mp_batches must be > 0"));
+    }
+    if cfg.progress_every == 0 {
+        return Err(HpcParallelError::InvalidConfig("progress_every must be > 0"));
+    }
+    if let ExecutionMode::Threaded { num_threads } = cfg.mode {
+        if num_threads == 0 {
+            return Err(HpcParallelError::InvalidConfig("num_threads must be > 0"));
+        }
+    }
+    Ok(())
+}
+
+fn default_threads() -> usize {
+    thread::available_parallelism().map_or(1, |n| n.get().max(1))
+}
diff --git a/crates/openquant/src/lib.rs b/crates/openquant/src/lib.rs
index a0cae40..4b2dd6c 100644
--- a/crates/openquant/src/lib.rs
+++ b/crates/openquant/src/lib.rs
@@ -13,6 +13,7 @@ pub mod filters;
 pub mod fingerprint;
 pub mod fracdiff;
 pub mod hcaa;
+pub mod hpc_parallel;
 pub mod hrp;
 pub mod hyperparameter_tuning;
 pub mod labeling;
diff --git a/crates/openquant/tests/hpc_parallel.rs b/crates/openquant/tests/hpc_parallel.rs
new file mode 100644
index 0000000..855e78d
--- /dev/null
+++ b/crates/openquant/tests/hpc_parallel.rs
@@ -0,0 +1,132 @@
+use openquant::hpc_parallel::{
+    dispatch_async, partition_atoms, run_parallel, ExecutionMode, HpcParallelConfig,
+    HpcParallelError, PartitionStrategy,
+};
+
+fn config(
+    mode: ExecutionMode,
+    partition: PartitionStrategy,
+    mp_batches: usize,
+) -> HpcParallelConfig {
+    HpcParallelConfig { mode, partition, mp_batches, progress_every: 1 }
+}
+
+#[test]
+fn linear_partition_covers_all_atoms_without_gaps() {
+    let parts = partition_atoms(25, 4, PartitionStrategy::Linear).expect("valid partition");
+    assert_eq!(parts.len(), 4);
+    assert_eq!(parts.first().expect("non-empty").start, 0);
+    assert_eq!(parts.last().expect("non-empty").end, 25);
+    for w in parts.windows(2) {
+        assert_eq!(w[0].end, w[1].start);
+    }
+}
+
+#[test]
+fn nested_partition_biases_molecule_sizes() {
+    let parts = partition_atoms(100, 4, PartitionStrategy::Nested).expect("valid partition");
+    assert_eq!(parts.len(), 4);
+    assert!(parts[0].len() > parts[3].len());
+}
+
+#[test]
+fn serial_and_threaded_results_match() {
+    let atoms: Vec<u64> = (1..=512).collect();
+    let serial = run_parallel(
+        &atoms,
+        config(ExecutionMode::Serial, PartitionStrategy::Linear, 8),
+        |chunk| Ok::<u64, String>(chunk.iter().copied().sum()),
+    )
+    .expect("serial run");
+
+    let threaded = run_parallel(
+        &atoms,
+        config(ExecutionMode::Threaded { num_threads: 4 }, PartitionStrategy::Linear, 2),
+        |chunk| Ok::<u64, String>(chunk.iter().copied().sum()),
+    )
+    .expect("threaded run");
+
+    assert_eq!(serial.outputs.iter().sum::<u64>(), threaded.outputs.iter().sum::<u64>());
+    assert_eq!(threaded.metrics.molecules_total, 8);
+    assert_eq!(
+        threaded.metrics.progress.last().expect("progress snapshots").completed_molecules,
+        threaded.metrics.molecules_total
+    );
+}
+
+#[test]
+fn mp_batches_controls_work_granularity() {
+    let atoms: Vec<usize> = (0..1000).collect();
+    let low = run_parallel(
+        &atoms,
+        config(ExecutionMode::Threaded { num_threads: 4 }, PartitionStrategy::Linear, 1),
+        |chunk| Ok::<usize, String>(chunk.len()),
+    )
+    .expect("low batches");
+    let high = run_parallel(
+        &atoms,
+        config(ExecutionMode::Threaded { num_threads: 4 }, PartitionStrategy::Linear, 4),
+        |chunk| Ok::<usize, String>(chunk.len()),
+    )
+    .expect("high batches");
+
+    assert_eq!(low.metrics.molecules_total, 4);
+    assert_eq!(high.metrics.molecules_total, 16);
+}
+
+#[test]
+fn async_dispatch_returns_report() {
+    let atoms: Vec<f64> = (0..2048).map(|v| v as f64).collect();
+    let handle = dispatch_async(
+        atoms,
+        config(ExecutionMode::Threaded { num_threads: 4 }, PartitionStrategy::Nested, 2),
+        |chunk| {
+            let mut acc = 0.0;
+            for x in chunk {
+                acc += x.sin().abs();
+            }
+            Ok::<f64, String>(acc)
+        },
+    );
+    let report = handle.wait().expect("async report");
+    assert_eq!(report.metrics.molecules_total, 8);
+    assert_eq!(report.outputs.len(), 8);
+}
+
+#[test]
+fn callback_error_is_reported_with_molecule_context() {
+    let atoms: Vec<i32> = (0..40).collect();
+    let err = run_parallel(
+        &atoms,
+        config(ExecutionMode::Serial, PartitionStrategy::Linear, 4),
+        |chunk| {
+            if chunk.iter().any(|v| *v >= 20) {
+                return Err("synthetic failure");
+            }
+            Ok::<i32, &str>(chunk.iter().sum())
+        },
+    )
+    .expect_err("expected callback failure");
+
+    match err {
+        HpcParallelError::CallbackFailed { molecule_id, .. } => assert!(molecule_id >= 2),
+        other => panic!("unexpected error variant: {other:?}"),
+    }
+}
+
+#[test]
+fn invalid_config_rejected() {
+    let atoms = vec![1, 2, 3];
+    let err = run_parallel(
+        &atoms,
+        HpcParallelConfig {
+            mode: ExecutionMode::Threaded { num_threads: 0 },
+            partition: PartitionStrategy::Linear,
+            mp_batches: 1,
+            progress_every: 1,
+        },
+        |_chunk| Ok::<i32, String>(1),
+    )
+    .expect_err("invalid thread count");
+    assert!(matches!(err, HpcParallelError::InvalidConfig(_)));
+}
diff --git a/docs-site/src/data/afmlDocsState.ts b/docs-site/src/data/afmlDocsState.ts
index bac0324..559bc06 100644
--- a/docs-site/src/data/afmlDocsState.ts
+++ b/docs-site/src/data/afmlDocsState.ts
@@ -331,6 +331,20 @@ export const afmlDocsState = {
         "status": "done"
       }
     ]
+  },
+  {
+    "chapter": "CHAPTER 20",
+    "theme": "Multiprocessing and vectorization",
+    "status": "done",
+    "chunkCount": 0,
+    "sections": [
+      {
+        "id": "chapter-20-hpc_parallel",
+        "module": "hpc_parallel",
+        "slug": "hpc-parallel",
+        "status": "done"
+      }
+    ]
+  }
 ]
 } as const;
diff --git a/docs-site/src/data/moduleDocs.ts b/docs-site/src/data/moduleDocs.ts
index 5475039..624a5f2 100644
--- a/docs-site/src/data/moduleDocs.ts
+++ b/docs-site/src/data/moduleDocs.ts
@@ -691,6 +691,49 @@ export const moduleDocs: ModuleDoc[] = [
       "Use this module for strategy-level viability and probability-of-failure diagnostics; use `risk_metrics` for portfolio-tail and drawdown risk.",
     ],
   },
+  {
+    slug: "hpc-parallel",
+    module: "hpc_parallel",
+    subject: "Scaling, HPC and Infrastructure",
+    summary: "AFML Chapter 20 atom/molecule execution utilities with serial/threaded modes and partition diagnostics.",
+    whyItExists:
+      "Research pipelines bottleneck on repeated independent computations; this module exposes reproducible partitioning and dispatch controls to scale those workloads safely.",
+    keyApis: [
+      "partition_atoms",
+      "run_parallel",
+      "dispatch_async",
+      "ExecutionMode",
+      "PartitionStrategy",
+      "HpcParallelConfig",
+      "ParallelRunReport",
+      "HpcParallelMetrics",
+    ],
+    formulas: [
+      {
+        label: "Linear Partition Boundary",
+        latex: "b_i=\\left\\lfloor\\frac{iN}{M}\\right\\rfloor,\\;i=0,\\dots,M",
+      },
+      {
+        label: "Nested Partition Boundary",
+        latex: "b_i=\\operatorname{round}\\left(N\\sqrt{\\frac{i}{M}}\\right),\\;i=0,\\dots,M",
+      },
+      {
+        label: "Throughput",
+        latex: "\\text{throughput}=\\frac{\\text{atoms processed}}{\\text{runtime seconds}}",
+      },
+    ],
+    examples: [
+      {
+        title: "Run atom->molecule callback in threaded mode",
+        language: "rust",
+        code: `use openquant::hpc_parallel::{run_parallel, ExecutionMode, HpcParallelConfig, PartitionStrategy};\n\nlet atoms: Vec<f64> = (0..10_000).map(|i| i as f64).collect();\nlet report = run_parallel(\n    &atoms,\n    HpcParallelConfig {\n        mode: ExecutionMode::Threaded { num_threads: 8 },\n        partition: PartitionStrategy::Nested,\n        mp_batches: 4,\n        progress_every: 4,\n    },\n    |chunk| Ok::<f64, String>(chunk.iter().map(|x| x.sqrt()).sum()),\n)?;\n\nprintln!(\"molecules={} atoms/s={:.0}\", report.metrics.molecules_total, report.metrics.throughput_atoms_per_sec);`,
+      },
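+      // Illustrative companion example (same public API as above); serial mode mirrors
+      // the threaded call so a run can be reproduced deterministically while debugging.
+      {
+        title: "Reproduce the same callback serially for debugging",
+        language: "rust",
+        code: `use openquant::hpc_parallel::{run_parallel, ExecutionMode, HpcParallelConfig, PartitionStrategy};\n\nlet atoms: Vec<f64> = (0..10_000).map(|i| i as f64).collect();\nlet report = run_parallel(\n    &atoms,\n    HpcParallelConfig {\n        mode: ExecutionMode::Serial,\n        partition: PartitionStrategy::Linear,\n        mp_batches: 8,\n        progress_every: 1,\n    },\n    |chunk| Ok::<f64, String>(chunk.iter().map(|x| x.sqrt()).sum()),\n)?;\n\nprintln!(\"molecules={} imbalance={:.2}\", report.metrics.molecules_total, report.metrics.partition_imbalance_ratio);`,
+      },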
+    ],
+    notes: [
+      "Use `ExecutionMode::Serial` for deterministic debugging with identical callback semantics.",
+      "If per-atom cost rises with atom index (e.g., expanding windows), nested partitioning can reduce tail stragglers versus linear chunking.",
+    ],
+  },
   {
     slug: "sample-weights",
     module: "sample_weights",