From 9d69d80a99e01315dbadb7edc51dd3c15e6d6587 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 12 Mar 2026 20:51:57 +0100 Subject: [PATCH 1/8] Fuse pipeline-connected operator chains in SubgraphBuilder::build() During subgraph construction, detect maximal chains of pipeline-connected operators (single input/output, local, notify=false, no fan-in/fan-out) and replace each chain with a single ChainScheduler that schedules members sequentially. Intermediate pointstamps are hidden from the reachability tracker, reducing progress tracking overhead. Configurable via WorkerConfig::min_chain_length (default: 2, 0 disables). Co-Authored-By: Claude Opus 4.6 --- timely/src/progress/subgraph.rs | 386 +++++++++++++++++++++++++++++++- timely/src/worker.rs | 26 ++- timely/tests/chain_fusion.rs | 138 ++++++++++++ 3 files changed, 545 insertions(+), 5 deletions(-) create mode 100644 timely/tests/chain_fusion.rs diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index e11011368..8d34f9451 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -7,7 +7,7 @@ use std::rc::Rc; use std::cell::RefCell; -use std::collections::BinaryHeap; +use std::collections::{BinaryHeap, HashMap}; use std::cmp::Reverse; use crate::logging::TimelyLogger as Logger; @@ -16,14 +16,14 @@ use crate::logging::TimelySummaryLogger as SummaryLogger; use crate::scheduling::Schedule; use crate::scheduling::activate::Activations; -use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter}; +use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter, Antichain}; use crate::progress::{Timestamp, Operate, operate::SharedProgress}; use crate::progress::{Location, Port, Source, Target}; use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity}; use crate::progress::ChangeBatch; use crate::progress::broadcast::Progcaster; use crate::progress::reachability; -use crate::progress::timestamp::Refines; +use 
crate::progress::timestamp::{Refines, PathSummary}; use crate::worker::ProgressMode; @@ -166,12 +166,25 @@ where // Create empty child zero representative. self.children[0] = PerOperatorState::empty(outputs, inputs); + // Pipeline chain fusion: detect and fuse chains of pipeline-connected operators. + let min_chain_length = worker.config().min_chain_length; + if min_chain_length >= 2 { + let chains = detect_chains(&self.children, &self.edge_stash, min_chain_length); + for chain in chains { + fuse_chain::(&mut self.children, &mut self.edge_stash, &chain); + } + } + let mut builder = reachability::Builder::new(); // Child 0 has `inputs` outputs and `outputs` inputs, not yet connected. let summary = (0..outputs).map(|_| PortConnectivity::default()).collect(); builder.add_node(0, outputs, inputs, summary); for (index, child) in self.children.iter().enumerate().skip(1) { + // Skip tombstoned children (absorbed into chains). + if child.inputs == 0 && child.outputs == 0 && child.operator.is_none() { + continue; + } builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone()); } @@ -192,7 +205,13 @@ where let mut incomplete = vec![true; self.children.len()]; incomplete[0] = false; - let incomplete_count = incomplete.len() - 1; + // Tombstoned children are not incomplete. + for (i, child) in self.children.iter().enumerate().skip(1) { + if child.inputs == 0 && child.outputs == 0 && child.operator.is_none() { + incomplete[i] = false; + } + } + let incomplete_count = incomplete.iter().filter(|&&b| b).count(); let activations = worker.activations(); @@ -823,3 +842,362 @@ impl Drop for PerOperatorState { self.shut_down(); } } + +// --- Pipeline chain fusion --- + +/// Detects maximal fusible chains from the edge topology. 
+/// +/// A chain is a sequence of operators `[A, B, C, ...]` where each link satisfies: +/// * Both operators have exactly 1 input and 1 output +/// * A's sole output targets B's sole input (no fan-out) +/// * B's sole input comes from A's output (no fan-in) +/// * Both are `local == true` (pipeline edges only) +/// * Both have `notify == false` (no frontier observation) +/// +/// Returns chains of at least `min_length` operators, identified by index. +fn detect_chains( + children: &[PerOperatorState], + edge_stash: &[(Source, Target)], + min_length: usize, +) -> Vec> { + // Build forward map: source_node -> target_node (only for single-input/single-output pipeline edges). + // Build reverse map: target_node -> source_node. + let mut forward: HashMap = HashMap::new(); + let mut reverse: HashMap = HashMap::new(); + + // Count edges per source and target to detect fan-out/fan-in. + let mut source_edge_count: HashMap = HashMap::new(); + let mut target_edge_count: HashMap = HashMap::new(); + + for (source, target) in edge_stash.iter() { + *source_edge_count.entry(source.node).or_insert(0) += 1; + *target_edge_count.entry(target.node).or_insert(0) += 1; + } + + for (source, target) in edge_stash.iter() { + let src = source.node; + let tgt = target.node; + + // Skip child 0 (the subgraph boundary). + if src == 0 || tgt == 0 { continue; } + + let src_child = &children[src]; + let tgt_child = &children[tgt]; + + // Both must have exactly 1 input and 1 output. + if src_child.inputs != 1 || src_child.outputs != 1 { continue; } + if tgt_child.inputs != 1 || tgt_child.outputs != 1 { continue; } + + // Both must be local (pipeline edges) and not frontier-observing. + if !src_child.local || !tgt_child.local { continue; } + if src_child.notify || tgt_child.notify { continue; } + + // No fan-out from source, no fan-in to target. 
+ if source_edge_count.get(&src) != Some(&1) { continue; } + if target_edge_count.get(&tgt) != Some(&1) { continue; } + + // Must have an operator (not already tombstoned). + if src_child.operator.is_none() || tgt_child.operator.is_none() { continue; } + + // Edge must connect port 0 -> port 0. + if source.port != 0 || target.port != 0 { continue; } + + forward.insert(src, tgt); + reverse.insert(tgt, src); + } + + // Walk chains starting from heads (nodes with no predecessor in a chain). + let mut visited = vec![false; children.len()]; + let mut chains = Vec::new(); + + for &head in forward.keys() { + // A head is a node that is not a successor in any chain link. + if reverse.contains_key(&head) { continue; } + if visited[head] { continue; } + + let mut chain = vec![head]; + visited[head] = true; + let mut current = head; + while let Some(&next) = forward.get(¤t) { + if visited[next] { break; } + chain.push(next); + visited[next] = true; + current = next; + } + + if chain.len() >= min_length { + chains.push(chain); + } + } + + chains +} + +/// Composes internal summaries for a chain of single-input/single-output operators. +/// +/// For chain `[A, B, C]`, the composed summary is: +/// `A.summary[0→0] followed_by B.summary[0→0] followed_by C.summary[0→0]` +fn compose_summaries( + summaries: &[&Connectivity], +) -> Connectivity { + if summaries.is_empty() { + return vec![PortConnectivity::default()]; + } + + // Start with the first operator's summary (input 0 -> output 0). + let mut result_antichain: Antichain = Antichain::new(); + + // Extract the first summary's 0→0 antichain. + if let Some(first) = summaries[0].get(0) { + if let Some(ac) = first.get(0) { + result_antichain = ac.clone(); + } + } + + // Compose with each subsequent operator's summary. + for summary in &summaries[1..] 
{ + let next_antichain = summary.get(0).and_then(|pc| pc.get(0)); + let next_antichain = match next_antichain { + Some(ac) => ac, + None => { + // No path through this operator; result is empty. + result_antichain = Antichain::new(); + break; + } + }; + + let mut composed = Antichain::new(); + for a in result_antichain.elements().iter() { + for b in next_antichain.elements().iter() { + if let Some(c) = a.followed_by(b) { + composed.insert(c); + } + } + } + result_antichain = composed; + } + + let mut port_connectivity = PortConnectivity::default(); + for elem in result_antichain.elements().iter() { + port_connectivity.insert_ref(0, elem); + } + vec![port_connectivity] +} + +/// A member of a fused chain, holding the original operator and its progress handle. +struct ChainMember { + operator: Box, + shared_progress: Rc>>, +} + +/// Schedules a chain of pipeline-connected operators as a single unit. +/// +/// The chain presents as a single operator to the subgraph: it has the first +/// member's input and the last member's output. Intermediate progress is +/// hidden from the reachability tracker. +struct ChainScheduler { + name: String, + path: Vec, + /// Progress visible to the subgraph (first op's inputs, last op's outputs). + chain_progress: Rc>>, + /// Operators in execution order, with their individual SharedProgress handles. + members: Vec>, +} + +impl Schedule for ChainScheduler { + fn name(&self) -> &str { &self.name } + fn path(&self) -> &[usize] { &self.path } + + fn schedule(&mut self) -> bool { + let n = self.members.len(); + assert!(n > 0); + + // Step 1: Copy chain's input frontier to first member's frontier. 
+ { + let mut chain_sp = self.chain_progress.borrow_mut(); + let mut first_sp = self.members[0].shared_progress.borrow_mut(); + for (port, frontier) in chain_sp.frontiers.iter_mut().enumerate() { + for (time, diff) in frontier.iter() { + first_sp.frontiers[port].update(time.clone(), *diff); + } + } + } + + // Step 2: Schedule each member in sequence, propagating frontiers forward. + let mut any_incomplete = false; + for i in 0..n { + let incomplete = self.members[i].operator.schedule(); + any_incomplete = any_incomplete || incomplete; + + // Propagate: current member's produceds become next member's frontiers + // (conceptually, records flow through the pipeline channel). + // The data actually flows through the ThreadPusher/ThreadPuller pipeline + // channels that already exist between the operators. We only need to + // propagate frontier information for notify-capable operators, but since + // we only fuse notify=false operators, we skip frontier propagation between + // members. The intermediate consumeds/produceds are simply discarded. + } + + // Step 3: Aggregate progress. 
+ { + let mut chain_sp = self.chain_progress.borrow_mut(); + + // consumeds = first member's consumeds + let mut first_sp = self.members[0].shared_progress.borrow_mut(); + for (port, consumed) in first_sp.consumeds.iter_mut().enumerate() { + for (time, diff) in consumed.iter() { + chain_sp.consumeds[port].update(time.clone(), *diff); + } + } + drop(first_sp); + + // produceds = last member's produceds + let mut last_sp = self.members[n-1].shared_progress.borrow_mut(); + for (port, produced) in last_sp.produceds.iter_mut().enumerate() { + for (time, diff) in produced.iter() { + chain_sp.produceds[port].update(time.clone(), *diff); + } + } + drop(last_sp); + + // internals = aggregate from all members + for member in self.members.iter() { + let mut member_sp = member.shared_progress.borrow_mut(); + for (port, internal) in member_sp.internals.iter_mut().enumerate() { + for (time, diff) in internal.iter() { + // All internals map to the chain's single output (port 0). + chain_sp.internals[port].update(time.clone(), *diff); + } + } + } + } + + // Step 4: Clear all members' SharedProgress to prevent accumulation. + for member in self.members.iter() { + let mut sp = member.shared_progress.borrow_mut(); + for batch in sp.frontiers.iter_mut() { batch.clear(); } + for batch in sp.consumeds.iter_mut() { batch.clear(); } + for batch in sp.internals.iter_mut() { batch.clear(); } + for batch in sp.produceds.iter_mut() { batch.clear(); } + } + + // Clear the chain's own frontiers (consumed by step 1). + { + let mut chain_sp = self.chain_progress.borrow_mut(); + for batch in chain_sp.frontiers.iter_mut() { batch.clear(); } + } + + any_incomplete + } +} + +/// Fuses a detected chain into a single operator within `children`, rewriting `edge_stash`. +/// +/// The head of the chain retains its slot and becomes the fused operator. +/// All other chain members become tombstones. 
+fn fuse_chain( + children: &mut [PerOperatorState], + edge_stash: &mut Vec<(Source, Target)>, + chain: &[usize], +) { + assert!(chain.len() >= 2); + let head_idx = chain[0]; + let tail_idx = *chain.last().unwrap(); + + // Compose internal summaries. + let summary_refs: Vec<&Connectivity> = chain.iter() + .map(|&idx| &children[idx].internal_summary) + .collect(); + let composed_summary = compose_summaries::(&summary_refs); + + // Collect members by extracting operators and shared_progress from children. + let mut members = Vec::with_capacity(chain.len()); + let mut chain_name_parts = Vec::new(); + let mut head_path = Vec::new(); + + for (i, &idx) in chain.iter().enumerate() { + let child = &mut children[idx]; + let operator = child.operator.take().expect("chain member must have an operator"); + let shared_progress = Rc::clone(&child.shared_progress); + + if i == 0 { + head_path = operator.path().to_vec(); + } + chain_name_parts.push(child.name.clone()); + + members.push(ChainMember { + operator, + shared_progress, + }); + } + + let chain_name = format!("Chain[{}]", chain_name_parts.join(" -> ")); + + // Create the chain's SharedProgress: 1 input, 1 output (matching head's input, tail's output). + let chain_progress = Rc::new(RefCell::new(SharedProgress::new(1, 1))); + + // Transfer initial internal capabilities: the chain's initial internals should reflect + // the last member's initial capabilities (at the chain's output port). + // All members' internals need to be cleared so they don't pollute Subgraph::initialize(). + // However, for correctness, only the *last* member's internals matter (those are at the + // chain's output), and we move them to the chain's shared_progress. 
+ { + let mut last_sp = members.last().unwrap().shared_progress.borrow_mut(); + let mut chain_sp = chain_progress.borrow_mut(); + for (port, internal) in last_sp.internals.iter_mut().enumerate() { + for (time, diff) in internal.iter() { + chain_sp.internals[port].update(time.clone(), *diff); + } + } + } + + // Clear all members' internals to prevent double-counting during initialize(). + for member in members.iter() { + let mut sp = member.shared_progress.borrow_mut(); + for batch in sp.internals.iter_mut() { batch.clear(); } + } + + let chain_scheduler: Box = Box::new(ChainScheduler { + name: chain_name.clone(), + path: head_path, + chain_progress: Rc::clone(&chain_progress), + members, + }); + + // Install the fused operator at the head slot. + let head = &mut children[head_idx]; + head.name = chain_name; + head.operator = Some(chain_scheduler); + head.shared_progress = chain_progress; + head.internal_summary = composed_summary; + // Head keeps inputs=1, outputs=1, local=true, notify=false. + + // Tombstone all other chain members. + for &idx in &chain[1..] { + let child = &mut children[idx]; + child.name = format!("Tombstone({})", child.name); + child.operator = None; + child.shared_progress = Rc::new(RefCell::new(SharedProgress::new(0, 0))); + child.edges = Vec::new(); + child.inputs = 0; + child.outputs = 0; + child.internal_summary = Vec::new(); + } + + // Rewrite edge_stash: + // 1. Remove internal chain edges (edges between consecutive chain members). + // 2. Rewrite edges from the tail's output to use the head's index as source. + let chain_set: std::collections::HashSet = chain.iter().cloned().collect(); + + edge_stash.retain(|(source, target)| { + // Remove edges internal to the chain. + !(chain_set.contains(&source.node) && chain_set.contains(&target.node)) + }); + + // Rewrite edges from tail's output to come from head's output instead. 
+ for (source, _target) in edge_stash.iter_mut() { + if source.node == tail_idx { + source.node = head_idx; + } + } +} diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 70051214a..9e5b368fd 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -78,14 +78,29 @@ impl FromStr for ProgressMode { } /// Worker configuration. -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] pub struct Config { /// The progress mode to use. pub(crate) progress_mode: ProgressMode, + /// Minimum chain length for pipeline chain fusion. + /// + /// Chains of pipeline-connected operators shorter than this threshold + /// will not be fused. Set to 0 to disable fusion entirely. + pub(crate) min_chain_length: usize, /// A map from parameter name to typed parameter values. registry: HashMap>, } +impl Default for Config { + fn default() -> Self { + Config { + progress_mode: ProgressMode::default(), + min_chain_length: 2, + registry: HashMap::new(), + } + } +} + impl Config { /// Installs options into a [getopts::Options] struct that correspond /// to the parameters in the configuration. @@ -122,6 +137,15 @@ impl Config { self } + /// Sets the minimum chain length for pipeline chain fusion. + /// + /// Chains of pipeline-connected operators shorter than this threshold + /// will not be fused. Set to 0 to disable fusion entirely. Default: 2. + pub fn min_chain_length(mut self, min_chain_length: usize) -> Self { + self.min_chain_length = min_chain_length; + self + } + /// Sets a typed configuration parameter for the given `key`. /// /// It is recommended to install a single configuration struct using a key diff --git a/timely/tests/chain_fusion.rs b/timely/tests/chain_fusion.rs new file mode 100644 index 000000000..c5358f906 --- /dev/null +++ b/timely/tests/chain_fusion.rs @@ -0,0 +1,138 @@ +//! Tests for pipeline chain fusion. 
+ +use std::sync::{Arc, Mutex}; +use timely::dataflow::operators::{ToStream, Inspect, Probe}; +use timely::dataflow::operators::vec::Map; + +/// Verifies that a chain of map operators produces correct output. +#[test] +fn chain_fusion_correctness() { + let result = Arc::new(Mutex::new(Vec::new())); + let result2 = Arc::clone(&result); + + timely::execute_from_args(std::env::args(), move |worker| { + let result3 = Arc::clone(&result2); + worker.dataflow::(|scope| { + (0..10u64) + .to_stream(scope) + .map(|x| x + 1) + .map(|x| x * 2) + .map(|x| x + 10) + .inspect(move |x| { + result3.lock().unwrap().push(*x); + }); + }); + }).unwrap(); + + let mut got = result.lock().unwrap().clone(); + got.sort(); + let expected: Vec = (0..10).map(|x| (x + 1) * 2 + 10).collect(); + assert_eq!(got, expected); +} + +/// Verifies that a longer chain of maps (5 operators) produces correct output. +#[test] +fn chain_fusion_long_chain() { + let result = Arc::new(Mutex::new(Vec::new())); + let result2 = Arc::clone(&result); + + timely::execute_from_args(std::env::args(), move |worker| { + let result3 = Arc::clone(&result2); + worker.dataflow::(|scope| { + (0..5u64) + .to_stream(scope) + .map(|x| x + 1) + .map(|x| x * 2) + .map(|x| x + 3) + .map(|x| x * 4) + .map(|x| x + 5) + .inspect(move |x| { + result3.lock().unwrap().push(*x); + }); + }); + }).unwrap(); + + let mut got = result.lock().unwrap().clone(); + got.sort(); + let expected: Vec = (0..5).map(|x| ((x + 1) * 2 + 3) * 4 + 5).collect(); + assert_eq!(got, expected); +} + +/// Verifies that fusion works with probe (which tests that the dataflow completes). 
+#[test] +fn chain_fusion_with_probe() { + timely::execute_from_args(std::env::args(), move |worker| { + let probe = worker.dataflow::(|scope| { + (0..100u64) + .to_stream(scope) + .map(|x| x + 1) + .map(|x| x * 2) + .map(|x| x + 10) + .probe() + .0 + }); + + worker.step_while(|| probe.less_than(&usize::MAX)); + }).unwrap(); +} + +/// Verifies that fusion is disabled when min_chain_length is 0. +#[test] +fn chain_fusion_disabled() { + let result = Arc::new(Mutex::new(Vec::new())); + let result2 = Arc::clone(&result); + + let config = timely::Config { + communication: timely::CommunicationConfig::Thread, + worker: timely::WorkerConfig::default().min_chain_length(0), + }; + + timely::execute(config, move |worker| { + let result3 = Arc::clone(&result2); + worker.dataflow::(|scope| { + (0..10u64) + .to_stream(scope) + .map(|x| x + 1) + .map(|x| x * 2) + .inspect(move |x| { + result3.lock().unwrap().push(*x); + }); + }); + }).unwrap(); + + let mut got = result.lock().unwrap().clone(); + got.sort(); + let expected: Vec = (0..10).map(|x| (x + 1) * 2).collect(); + assert_eq!(got, expected); +} + +/// Verifies that flat_map (single in/out, pipeline, notify=false) is also fused. 
+#[test] +fn chain_fusion_flat_map() { + let result = Arc::new(Mutex::new(Vec::new())); + let result2 = Arc::clone(&result); + + timely::execute_from_args(std::env::args(), move |worker| { + let result3 = Arc::clone(&result2); + worker.dataflow::(|scope| { + (0..5u64) + .to_stream(scope) + .map(|x| x + 1) + .flat_map(|x| vec![x, x * 10]) + .map(|x| x + 100) + .inspect(move |x| { + result3.lock().unwrap().push(*x); + }); + }); + }).unwrap(); + + let mut got = result.lock().unwrap().clone(); + got.sort(); + let mut expected: Vec = (0..5u64) + .map(|x| x + 1) + .flat_map(|x| vec![x, x * 10]) + .map(|x| x + 100) + .collect(); + expected.sort(); + assert_eq!(got, expected); +} From dcc55d7d4b0068746c982dfeef923584b9991635 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 12 Mar 2026 21:57:46 +0100 Subject: [PATCH 2/8] Fuse notify=true operators and add rounds parameter to event_driven MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lift the notify=false restriction from chain fusion so operators like inspect, probe, and unary_notify can be fused into chains. Frontier changes are propagated to notify=true members via sparse notify_frontiers (only allocated for members that need notification), avoiding O(N²) overhead for chains where few members observe frontiers. Tombstoned children are now added to the reachability builder with (0, 0) inputs/outputs to preserve index positions. The event_driven example gains a rounds parameter for benchmarking. 
Co-Authored-By: Claude Opus 4.6 --- timely/examples/event_driven.rs | 25 ++++++-- timely/src/progress/subgraph.rs | 101 +++++++++++++++++++++++++------- timely/tests/chain_fusion.rs | 98 +++++++++++++++++++++++++++++++ 3 files changed, 197 insertions(+), 27 deletions(-) diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs index 7a392d54e..a28f0447c 100644 --- a/timely/examples/event_driven.rs +++ b/timely/examples/event_driven.rs @@ -8,12 +8,25 @@ fn main() { let timer = std::time::Instant::now(); - let mut args = std::env::args(); - args.next(); + // Collect positional arguments, skipping flags consumed by timely (-w, -n, -p, -h). + let positional: Vec = { + let mut pos = Vec::new(); + let mut args = std::env::args(); + args.next(); // skip binary name + while let Some(arg) = args.next() { + if arg.starts_with('-') { + args.next(); // skip flag value + } else { + pos.push(arg); + } + } + pos + }; - let dataflows = args.next().unwrap().parse::().unwrap(); - let length = args.next().unwrap().parse::().unwrap(); - let record = args.next() == Some("record".to_string()); + let dataflows = positional[0].parse::().unwrap(); + let length = positional[1].parse::().unwrap(); + let record = positional.get(2).map(|s| s.as_str()) == Some("record"); + let rounds: usize = positional.get(3).map(|s| s.parse().unwrap()).unwrap_or(usize::MAX); let mut inputs = Vec::new(); let mut probes = Vec::new(); @@ -37,7 +50,7 @@ fn main() { println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length); - for round in 0 .. { + for round in 0 .. 
rounds { let dataflow = round % dataflows; if record { inputs[dataflow].send(()); diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 8d34f9451..f02c78b65 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -181,10 +181,8 @@ where let summary = (0..outputs).map(|_| PortConnectivity::default()).collect(); builder.add_node(0, outputs, inputs, summary); for (index, child) in self.children.iter().enumerate().skip(1) { - // Skip tombstoned children (absorbed into chains). - if child.inputs == 0 && child.outputs == 0 && child.operator.is_none() { - continue; - } + // Tombstoned children are added with (0, 0) inputs/outputs and empty summary + // to preserve index positions in the reachability tracker. builder.add_node(index, child.inputs, child.outputs, child.internal_summary.clone()); } @@ -888,9 +886,8 @@ fn detect_chains( if src_child.inputs != 1 || src_child.outputs != 1 { continue; } if tgt_child.inputs != 1 || tgt_child.outputs != 1 { continue; } - // Both must be local (pipeline edges) and not frontier-observing. + // Both must be local (pipeline edges). if !src_child.local || !tgt_child.local { continue; } - if src_child.notify || tgt_child.notify { continue; } // No fan-out from source, no fan-in to target. if source_edge_count.get(&src) != Some(&1) { continue; } @@ -988,6 +985,7 @@ fn compose_summaries( struct ChainMember { operator: Box, shared_progress: Rc>>, + notify: bool, } /// Schedules a chain of pipeline-connected operators as a single unit. @@ -1002,6 +1000,13 @@ struct ChainScheduler { chain_progress: Rc>>, /// Operators in execution order, with their individual SharedProgress handles. members: Vec>, + /// Tracks the input frontier at notify=true members' input positions. + /// + /// Each entry `(member_index, frontier)` corresponds to a notify=true member. + /// The frontier reflects the accumulated capabilities of all upstream members + /// plus external frontier changes. 
With identity summaries, a capability at + /// timestamp `t` in member[i] contributes to all notify frontiers at positions j > i. + notify_frontiers: Vec<(usize, MutableAntichain)>, } impl Schedule for ChainScheduler { @@ -1012,30 +1017,52 @@ impl Schedule for ChainScheduler { let n = self.members.len(); assert!(n > 0); - // Step 1: Copy chain's input frontier to first member's frontier. + // Step 1: Copy chain's input frontier changes to first member + // and propagate to notify=true members' frontiers. { let mut chain_sp = self.chain_progress.borrow_mut(); let mut first_sp = self.members[0].shared_progress.borrow_mut(); for (port, frontier) in chain_sp.frontiers.iter_mut().enumerate() { for (time, diff) in frontier.iter() { first_sp.frontiers[port].update(time.clone(), *diff); + // Propagate external frontier changes to notify frontiers. + for &mut (j, ref mut nf) in self.notify_frontiers.iter_mut() { + let frontier_changes = nf.update_iter(Some((time.clone(), *diff))); + let mut downstream_sp = self.members[j].shared_progress.borrow_mut(); + for (t, d) in frontier_changes { + downstream_sp.frontiers[port].update(t, d); + } + } } } } - // Step 2: Schedule each member in sequence, propagating frontiers forward. + // Step 2: Schedule each member in sequence. + // For chains with notify=true members, propagate capability changes + // to downstream notify frontiers after each member runs. let mut any_incomplete = false; for i in 0..n { let incomplete = self.members[i].operator.schedule(); any_incomplete = any_incomplete || incomplete; - // Propagate: current member's produceds become next member's frontiers - // (conceptually, records flow through the pipeline channel). - // The data actually flows through the ThreadPusher/ThreadPuller pipeline - // channels that already exist between the operators. 
We only need to - // propagate frontier information for notify-capable operators, but since - // we only fuse notify=false operators, we skip frontier propagation between - // members. The intermediate consumeds/produceds are simply discarded. + // Propagate capability changes to downstream notify members. + if !self.notify_frontiers.is_empty() { + let mut member_sp = self.members[i].shared_progress.borrow_mut(); + for internal in member_sp.internals.iter_mut() { + for (time, diff) in internal.iter() { + // Only update notify frontiers at positions j > i. + for &mut (j, ref mut nf) in self.notify_frontiers.iter_mut() { + if j <= i { continue; } + let frontier_changes = nf.update_iter(Some((time.clone(), *diff))); + let mut downstream_sp = self.members[j].shared_progress.borrow_mut(); + for (t, d) in frontier_changes { + downstream_sp.frontiers[0].update(t, d); + } + } + } + } + drop(member_sp); + } } // Step 3: Aggregate progress. @@ -1115,10 +1142,13 @@ fn fuse_chain( let mut chain_name_parts = Vec::new(); let mut head_path = Vec::new(); + let mut has_notify = false; for (i, &idx) in chain.iter().enumerate() { let child = &mut children[idx]; let operator = child.operator.take().expect("chain member must have an operator"); let shared_progress = Rc::clone(&child.shared_progress); + let notify = child.notify; + has_notify = has_notify || notify; if i == 0 { head_path = operator.path().to_vec(); @@ -1128,6 +1158,7 @@ fn fuse_chain( members.push(ChainMember { operator, shared_progress, + notify, }); } @@ -1136,11 +1167,18 @@ fn fuse_chain( // Create the chain's SharedProgress: 1 input, 1 output (matching head's input, tail's output). let chain_progress = Rc::new(RefCell::new(SharedProgress::new(1, 1))); - // Transfer initial internal capabilities: the chain's initial internals should reflect - // the last member's initial capabilities (at the chain's output port). - // All members' internals need to be cleared so they don't pollute Subgraph::initialize(). 
- // However, for correctness, only the *last* member's internals matter (those are at the - // chain's output), and we move them to the chain's shared_progress. + // Transfer initial internal capabilities from all members to chain_progress. + // Each member has initial capabilities at T::minimum() (added during initialize()). + // The chain's output capabilities = last member's capabilities. + // But all members' internals must be accounted for in the chain's internals, + // because dropping a capability in any member produces a (-1) internal change + // that the chain must report. + // + // For the chain's initial internals reported to the subgraph, we only report + // the *last* member's initial capabilities (those are at the chain's output port). + // The intermediate members' capabilities are internal to the chain and cancel out: + // each intermediate member's capability implies it can produce data, which the next + // member will consume — but this is hidden from the reachability tracker. { let mut last_sp = members.last().unwrap().shared_progress.borrow_mut(); let mut chain_sp = chain_progress.borrow_mut(); @@ -1151,6 +1189,26 @@ fn fuse_chain( } } + // Initialize notify frontiers for frontier propagation to notify=true members. + // Each notify frontier tracks accumulated capabilities from upstream members + // plus external frontier changes. Initialized with upstream members' initial caps. + let n = members.len(); + let mut notify_frontiers = Vec::new(); + for j in 1..n { + if !members[j].notify { continue; } + let mut frontier = MutableAntichain::new(); + // Each upstream member [0..j) contributes initial capabilities. 
+ for i in 0..j { + let mut sp = members[i].shared_progress.borrow_mut(); + for internal in sp.internals.iter_mut() { + for (time, diff) in internal.iter() { + frontier.update_iter(Some((time.clone(), *diff))); + } + } + } + notify_frontiers.push((j, frontier)); + } + // Clear all members' internals to prevent double-counting during initialize(). for member in members.iter() { let mut sp = member.shared_progress.borrow_mut(); @@ -1162,6 +1220,7 @@ fn fuse_chain( path: head_path, chain_progress: Rc::clone(&chain_progress), members, + notify_frontiers, }); // Install the fused operator at the head slot. @@ -1170,7 +1229,7 @@ fn fuse_chain( head.operator = Some(chain_scheduler); head.shared_progress = chain_progress; head.internal_summary = composed_summary; - // Head keeps inputs=1, outputs=1, local=true, notify=false. + head.notify = has_notify; // Tombstone all other chain members. for &idx in &chain[1..] { diff --git a/timely/tests/chain_fusion.rs b/timely/tests/chain_fusion.rs index c5358f906..e65c398e0 100644 --- a/timely/tests/chain_fusion.rs +++ b/timely/tests/chain_fusion.rs @@ -106,6 +106,104 @@ fn chain_fusion_disabled() { assert_eq!(got, expected); } +/// Verifies that fusion works with notify=true operators (inspect uses unary_frontier). +/// This test drives multiple rounds to exercise frontier propagation within the fused chain. 
+#[test] +fn chain_fusion_notify_operator() { + let result = Arc::new(Mutex::new(Vec::new())); + let result2 = Arc::clone(&result); + + timely::execute_from_args(std::env::args(), move |worker| { + let result3 = Arc::clone(&result2); + let (mut input, probe) = worker.dataflow::(|scope| { + use timely::dataflow::operators::Input; + let (input, stream) = scope.new_input(); + let probe = stream + .map(|x: u64| x + 1) + .map(|x| x * 2) + .inspect(move |x| { + result3.lock().unwrap().push(*x); + }) + .probe() + .0; + (input, probe) + }); + + for round in 0..5usize { + input.send(round as u64); + input.advance_to(round + 1); + worker.step_while(|| probe.less_than(&(round + 1))); + } + }).unwrap(); + + let mut got = result.lock().unwrap().clone(); + got.sort(); + let expected: Vec = (0..5).map(|x: u64| (x + 1) * 2).collect(); + assert_eq!(got, expected); +} + +/// Verifies that fusion works with a unary_notify operator that buffers data +/// and emits on frontier notification. This exercises frontier propagation within +/// the fused chain. +#[test] +fn chain_fusion_unary_notify() { + use timely::dataflow::channels::pact::Pipeline; + use timely::dataflow::operators::generic::operator::Operator; + + let result = Arc::new(Mutex::new(Vec::new())); + let result2 = Arc::clone(&result); + + timely::execute_from_args(std::env::args(), move |worker| { + let result3 = Arc::clone(&result2); + let (mut input, probe) = worker.dataflow::(|scope| { + use timely::dataflow::operators::Input; + let (input, stream) = scope.new_input(); + let probe = stream + .map(|x: u64| x + 1) + // A unary_notify operator that buffers data and emits on notification. + // This is a notify=true operator with 1 input and 1 output. 
+ .unary_notify(Pipeline, "Buffer", vec![], { + let mut stash: std::collections::HashMap> = std::collections::HashMap::new(); + move |input, output, notificator| { + input.for_each(|time, data| { + stash.entry(time.time().clone()) + .or_insert_with(Vec::new) + .extend(data.drain(..)); + notificator.notify_at(time.retain(0)); + }); + notificator.for_each(|time, _count, _notify| { + if let Some(data) = stash.remove(time.time()) { + let mut session = output.session(&time); + for datum in data { + session.give(datum); + } + } + }); + } + }) + .map(|x: u64| x * 10) + .inspect(move |x| { + result3.lock().unwrap().push(*x); + }) + .probe() + .0; + (input, probe) + }); + + for round in 0..5usize { + input.send(round as u64); + input.advance_to(round + 1); + worker.step_while(|| probe.less_than(&(round + 1))); + } + }).unwrap(); + + let mut got = result.lock().unwrap().clone(); + got.sort(); + // Each value: (x + 1) buffered by unary_notify, then * 10 + let expected: Vec = (0..5).map(|x: u64| (x + 1) * 10).collect(); + assert_eq!(got, expected); +} + /// Verifies that flat_map (single in/out, pipeline, notify=false) is also fused. #[test] fn chain_fusion_flat_map() { From 91534e6499f967e5f1d271656f15e65537427e78 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 12 Mar 2026 22:34:50 +0100 Subject: [PATCH 3/8] Restrict chain fusion to notify=false operators with identity summaries Revert V2 notify=true fusion which had a correctness bug: frontier propagation within fused chains did not correctly track capabilities for notify=true operators, causing iterative computations (e.g., differential-dataflow's reduce) to fail to converge. Add identity summary check to exclude operators with non-trivial internal summaries (e.g., feedback operators with Product(0, 1)) from chains, even though they otherwise meet all fusion criteria. Validated against differential-dataflow's full test suite. 
Co-Authored-By: Claude Opus 4.6 --- timely/src/progress/subgraph.rs | 107 +++++++++----------------------- 1 file changed, 29 insertions(+), 78 deletions(-) diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index f02c78b65..054a3fe92 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -889,6 +889,11 @@ fn detect_chains( // Both must be local (pipeline edges). if !src_child.local || !tgt_child.local { continue; } + // Both must not require notifications. + // Fusing notify=true operators requires propagating frontier changes + // to each member within the chain, which is not yet implemented correctly. + if src_child.notify || tgt_child.notify { continue; } + // No fan-out from source, no fan-in to target. if source_edge_count.get(&src) != Some(&1) { continue; } if target_edge_count.get(&tgt) != Some(&1) { continue; } @@ -899,6 +904,18 @@ fn detect_chains( // Edge must connect port 0 -> port 0. if source.port != 0 || target.port != 0 { continue; } + // Both must have identity internal summaries. + // Non-identity summaries (e.g., feedback operators with iteration steps) + // require per-member timestamp transformation that fusion does not support. + fn has_identity_summary(child: &PerOperatorState) -> bool { + if child.internal_summary.len() != 1 { return false; } + match child.internal_summary[0].get(0) { + Some(ac) => ac.len() == 1 && ac.elements()[0] == Default::default(), + None => false, + } + } + if !has_identity_summary(src_child) || !has_identity_summary(tgt_child) { continue; } + forward.insert(src, tgt); reverse.insert(tgt, src); } @@ -985,7 +1002,6 @@ fn compose_summaries( struct ChainMember { operator: Box, shared_progress: Rc>>, - notify: bool, } /// Schedules a chain of pipeline-connected operators as a single unit. @@ -993,6 +1009,11 @@ struct ChainMember { /// The chain presents as a single operator to the subgraph: it has the first /// member's input and the last member's output. 
Intermediate progress is /// hidden from the reachability tracker. +/// +/// Only `notify=false` operators with identity internal summaries are fused. +/// This means members don't observe frontiers, so we only need to propagate +/// the chain's external frontier to the first member and aggregate progress +/// from all members. struct ChainScheduler { name: String, path: Vec, @@ -1000,13 +1021,6 @@ struct ChainScheduler { chain_progress: Rc>>, /// Operators in execution order, with their individual SharedProgress handles. members: Vec>, - /// Tracks the input frontier at notify=true members' input positions. - /// - /// Each entry `(member_index, frontier)` corresponds to a notify=true member. - /// The frontier reflects the accumulated capabilities of all upstream members - /// plus external frontier changes. With identity summaries, a capability at - /// timestamp `t` in member[i] contributes to all notify frontiers at positions j > i. - notify_frontiers: Vec<(usize, MutableAntichain)>, } impl Schedule for ChainScheduler { @@ -1017,52 +1031,22 @@ impl Schedule for ChainScheduler { let n = self.members.len(); assert!(n > 0); - // Step 1: Copy chain's input frontier changes to first member - // and propagate to notify=true members' frontiers. + // Step 1: Copy chain's input frontier changes to first member. { let mut chain_sp = self.chain_progress.borrow_mut(); let mut first_sp = self.members[0].shared_progress.borrow_mut(); for (port, frontier) in chain_sp.frontiers.iter_mut().enumerate() { for (time, diff) in frontier.iter() { first_sp.frontiers[port].update(time.clone(), *diff); - // Propagate external frontier changes to notify frontiers. 
- for &mut (j, ref mut nf) in self.notify_frontiers.iter_mut() { - let frontier_changes = nf.update_iter(Some((time.clone(), *diff))); - let mut downstream_sp = self.members[j].shared_progress.borrow_mut(); - for (t, d) in frontier_changes { - downstream_sp.frontiers[port].update(t, d); - } - } } } } // Step 2: Schedule each member in sequence. - // For chains with notify=true members, propagate capability changes - // to downstream notify frontiers after each member runs. let mut any_incomplete = false; for i in 0..n { let incomplete = self.members[i].operator.schedule(); any_incomplete = any_incomplete || incomplete; - - // Propagate capability changes to downstream notify members. - if !self.notify_frontiers.is_empty() { - let mut member_sp = self.members[i].shared_progress.borrow_mut(); - for internal in member_sp.internals.iter_mut() { - for (time, diff) in internal.iter() { - // Only update notify frontiers at positions j > i. - for &mut (j, ref mut nf) in self.notify_frontiers.iter_mut() { - if j <= i { continue; } - let frontier_changes = nf.update_iter(Some((time.clone(), *diff))); - let mut downstream_sp = self.members[j].shared_progress.borrow_mut(); - for (t, d) in frontier_changes { - downstream_sp.frontiers[0].update(t, d); - } - } - } - } - drop(member_sp); - } } // Step 3: Aggregate progress. @@ -1092,7 +1076,6 @@ impl Schedule for ChainScheduler { let mut member_sp = member.shared_progress.borrow_mut(); for (port, internal) in member_sp.internals.iter_mut().enumerate() { for (time, diff) in internal.iter() { - // All internals map to the chain's single output (port 0). 
chain_sp.internals[port].update(time.clone(), *diff); } } @@ -1142,13 +1125,10 @@ fn fuse_chain( let mut chain_name_parts = Vec::new(); let mut head_path = Vec::new(); - let mut has_notify = false; for (i, &idx) in chain.iter().enumerate() { let child = &mut children[idx]; let operator = child.operator.take().expect("chain member must have an operator"); let shared_progress = Rc::clone(&child.shared_progress); - let notify = child.notify; - has_notify = has_notify || notify; if i == 0 { head_path = operator.path().to_vec(); @@ -1158,7 +1138,6 @@ fn fuse_chain( members.push(ChainMember { operator, shared_progress, - notify, }); } @@ -1167,18 +1146,10 @@ fn fuse_chain( // Create the chain's SharedProgress: 1 input, 1 output (matching head's input, tail's output). let chain_progress = Rc::new(RefCell::new(SharedProgress::new(1, 1))); - // Transfer initial internal capabilities from all members to chain_progress. - // Each member has initial capabilities at T::minimum() (added during initialize()). - // The chain's output capabilities = last member's capabilities. - // But all members' internals must be accounted for in the chain's internals, - // because dropping a capability in any member produces a (-1) internal change - // that the chain must report. - // - // For the chain's initial internals reported to the subgraph, we only report - // the *last* member's initial capabilities (those are at the chain's output port). - // The intermediate members' capabilities are internal to the chain and cancel out: - // each intermediate member's capability implies it can produce data, which the next - // member will consume — but this is hidden from the reachability tracker. + // Transfer initial internal capabilities from the last member to chain_progress. + // Only the last member's initial capabilities are visible at the chain's output. 
+ // Intermediate members' capabilities are internal to the chain: their +1/-1 + // deltas flow through the chain's aggregate internals in Step 3. { let mut last_sp = members.last().unwrap().shared_progress.borrow_mut(); let mut chain_sp = chain_progress.borrow_mut(); @@ -1189,26 +1160,6 @@ fn fuse_chain( } } - // Initialize notify frontiers for frontier propagation to notify=true members. - // Each notify frontier tracks accumulated capabilities from upstream members - // plus external frontier changes. Initialized with upstream members' initial caps. - let n = members.len(); - let mut notify_frontiers = Vec::new(); - for j in 1..n { - if !members[j].notify { continue; } - let mut frontier = MutableAntichain::new(); - // Each upstream member [0..j) contributes initial capabilities. - for i in 0..j { - let mut sp = members[i].shared_progress.borrow_mut(); - for internal in sp.internals.iter_mut() { - for (time, diff) in internal.iter() { - frontier.update_iter(Some((time.clone(), *diff))); - } - } - } - notify_frontiers.push((j, frontier)); - } - // Clear all members' internals to prevent double-counting during initialize(). for member in members.iter() { let mut sp = member.shared_progress.borrow_mut(); @@ -1220,7 +1171,6 @@ fn fuse_chain( path: head_path, chain_progress: Rc::clone(&chain_progress), members, - notify_frontiers, }); // Install the fused operator at the head slot. @@ -1229,7 +1179,8 @@ fn fuse_chain( head.operator = Some(chain_scheduler); head.shared_progress = chain_progress; head.internal_summary = composed_summary; - head.notify = has_notify; + // All chain members are notify=false (enforced by detect_chains). + head.notify = false; // Tombstone all other chain members. for &idx in &chain[1..] 
{ From 6c20d1828807716fd94b0b8caa26b2c3d3830166 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 12 Mar 2026 23:20:43 +0100 Subject: [PATCH 4/8] Fix initial capability accounting and remove notify=true scaffolding Report ALL chain members' initial +peers capabilities to the chain's SharedProgress, not just the last member's. Each member independently drops its capability during execution, so all N members' initial caps must be visible to the reachability tracker. Remove dead notify=true fusion scaffolding (notify_frontiers, frontier propagation in schedule()) since detect_chains excludes notify=true operators. See commit message of 214e57fe for rationale. Co-Authored-By: Claude Opus 4.6 --- timely/src/progress/subgraph.rs | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 054a3fe92..984f6a753 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -890,8 +890,6 @@ fn detect_chains( if !src_child.local || !tgt_child.local { continue; } // Both must not require notifications. - // Fusing notify=true operators requires propagating frontier changes - // to each member within the chain, which is not yet implemented correctly. if src_child.notify || tgt_child.notify { continue; } // No fan-out from source, no fan-in to target. @@ -1009,11 +1007,6 @@ struct ChainMember { /// The chain presents as a single operator to the subgraph: it has the first /// member's input and the last member's output. Intermediate progress is /// hidden from the reachability tracker. -/// -/// Only `notify=false` operators with identity internal summaries are fused. -/// This means members don't observe frontiers, so we only need to propagate -/// the chain's external frontier to the first member and aggregate progress -/// from all members. 
struct ChainScheduler { name: String, path: Vec, @@ -1146,16 +1139,21 @@ fn fuse_chain( // Create the chain's SharedProgress: 1 input, 1 output (matching head's input, tail's output). let chain_progress = Rc::new(RefCell::new(SharedProgress::new(1, 1))); - // Transfer initial internal capabilities from the last member to chain_progress. - // Only the last member's initial capabilities are visible at the chain's output. - // Intermediate members' capabilities are internal to the chain: their +1/-1 - // deltas flow through the chain's aggregate internals in Step 3. + // Transfer initial internal capabilities from ALL members to chain_progress. + // + // Each member declared +peers capabilities at T::minimum() during initialize(). + // All N members' capabilities must be visible to the reachability tracker at the + // chain's single output port, because each member independently drops its + // capability during execution. If we only reported one member's initial caps, + // the tracker would go negative after N members drop their capabilities. { - let mut last_sp = members.last().unwrap().shared_progress.borrow_mut(); let mut chain_sp = chain_progress.borrow_mut(); - for (port, internal) in last_sp.internals.iter_mut().enumerate() { - for (time, diff) in internal.iter() { - chain_sp.internals[port].update(time.clone(), *diff); + for member in members.iter() { + let mut member_sp = member.shared_progress.borrow_mut(); + for (port, internal) in member_sp.internals.iter_mut().enumerate() { + for (time, diff) in internal.iter() { + chain_sp.internals[port].update(time.clone(), *diff); + } } } } @@ -1179,8 +1177,6 @@ fn fuse_chain( head.operator = Some(chain_scheduler); head.shared_progress = chain_progress; head.internal_summary = composed_summary; - // All chain members are notify=false (enforced by detect_chains). - head.notify = false; // Tombstone all other chain members. for &idx in &chain[1..] 
{ From 9a54ecb5877ad0fbf35595c1646bf739aa531865 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 12 Mar 2026 23:25:05 +0100 Subject: [PATCH 5/8] Rename min_chain_length to fuse_chain_length and expose via getopts Wire fuse_chain_length into install_options/from_matches so it can be set from the command line via --fuse-chain-length N. Co-Authored-By: Claude Opus 4.6 --- timely/src/progress/subgraph.rs | 6 +++--- timely/src/worker.rs | 20 ++++++++++++-------- timely/tests/chain_fusion.rs | 4 ++-- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 984f6a753..6fda3bd62 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -167,9 +167,9 @@ where self.children[0] = PerOperatorState::empty(outputs, inputs); // Pipeline chain fusion: detect and fuse chains of pipeline-connected operators. - let min_chain_length = worker.config().min_chain_length; - if min_chain_length >= 2 { - let chains = detect_chains(&self.children, &self.edge_stash, min_chain_length); + let fuse_chain_length = worker.config().fuse_chain_length; + if fuse_chain_length >= 2 { + let chains = detect_chains(&self.children, &self.edge_stash, fuse_chain_length); for chain in chains { fuse_chain::(&mut self.children, &mut self.edge_stash, &chain); } diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 9e5b368fd..bf8f531e5 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -82,11 +82,11 @@ impl FromStr for ProgressMode { pub struct Config { /// The progress mode to use. pub(crate) progress_mode: ProgressMode, - /// Minimum chain length for pipeline chain fusion. + /// Minimum chain length for pipeline chain fusion (default: 2). /// /// Chains of pipeline-connected operators shorter than this threshold /// will not be fused. Set to 0 to disable fusion entirely. 
- pub(crate) min_chain_length: usize, + pub(crate) fuse_chain_length: usize, /// A map from parameter name to typed parameter values. registry: HashMap>, } @@ -95,7 +95,7 @@ impl Default for Config { fn default() -> Self { Config { progress_mode: ProgressMode::default(), - min_chain_length: 2, + fuse_chain_length: 2, registry: HashMap::new(), } } @@ -114,6 +114,7 @@ impl Config { #[cfg(feature = "getopts")] pub fn install_options(opts: &mut getopts::Options) { opts.optopt("", "progress-mode", "progress tracking mode (eager or demand)", "MODE"); + opts.optopt("", "fuse-chain-length", "minimum chain length for pipeline fusion (0 disables, default: 2)", "N"); } /// Instantiates a configuration based upon the parsed options in `matches`. @@ -128,7 +129,10 @@ impl Config { pub fn from_matches(matches: &getopts::Matches) -> Result { let progress_mode = matches .opt_get_default("progress-mode", ProgressMode::Demand)?; - Ok(Config::default().progress_mode(progress_mode)) + let fuse_chain_length: usize = matches + .opt_get_default("fuse-chain-length", 2) + .map_err(|e: std::num::ParseIntError| e.to_string())?; + Ok(Config::default().progress_mode(progress_mode).fuse_chain_length(fuse_chain_length)) } /// Sets the progress mode to `progress_mode`. @@ -137,12 +141,12 @@ impl Config { self } - /// Sets the minimum chain length for pipeline chain fusion. + /// Sets the minimum chain length for pipeline chain fusion (default: 2). /// /// Chains of pipeline-connected operators shorter than this threshold - /// will not be fused. Set to 0 to disable fusion entirely. Default: 2. - pub fn min_chain_length(mut self, min_chain_length: usize) -> Self { - self.min_chain_length = min_chain_length; + /// will not be fused. Set to 0 to disable fusion entirely. 
+ pub fn fuse_chain_length(mut self, fuse_chain_length: usize) -> Self { + self.fuse_chain_length = fuse_chain_length; self } diff --git a/timely/tests/chain_fusion.rs b/timely/tests/chain_fusion.rs index e65c398e0..d04eb0629 100644 --- a/timely/tests/chain_fusion.rs +++ b/timely/tests/chain_fusion.rs @@ -76,7 +76,7 @@ fn chain_fusion_with_probe() { }).unwrap(); } -/// Verifies that fusion is disabled when min_chain_length is 0. +/// Verifies that fusion is disabled when fuse_chain_length is 0. #[test] fn chain_fusion_disabled() { let result = Arc::new(Mutex::new(Vec::new())); @@ -84,7 +84,7 @@ fn chain_fusion_disabled() { let config = timely::Config { communication: timely::CommunicationConfig::Thread, - worker: timely::WorkerConfig::default().min_chain_length(0), + worker: timely::WorkerConfig::default().fuse_chain_length(0), }; timely::execute(config, move |worker| { From 6fe6deae315a6ea7bb4e3cb3a0801b25efc1aff5 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 13 Mar 2026 08:41:49 +0100 Subject: [PATCH 6/8] Check for pipeline pact instead of local in chain detection The `local` field on operators indicates whether progress information is pre-circulated, not whether data flows through thread-local channels. All regular operators return `local = true` regardless of their pact, so the previous check didn't actually verify pipeline connectivity. Add `is_pipeline()` to `ParallelizationContract` (default: false, overridden to true for `Pipeline`), track it in `OperatorShape`, and expose it through `Operate::pipeline()`. Chain detection checks the target operator's `pipeline` in each link; the head's input pact is irrelevant since it receives data from outside the chain. 
Co-Authored-By: Claude Opus 4.6 --- timely/src/dataflow/channels/pact.rs | 3 +++ .../src/dataflow/operators/generic/builder_raw.rs | 4 ++++ timely/src/progress/operate.rs | 8 ++++++++ timely/src/progress/subgraph.rs | 13 +++++++++---- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 296e87a8f..5499ba956 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -25,6 +25,8 @@ pub trait ParallelizationContract { type Puller: Pull>+'static; /// Allocates a matched pair of push and pull endpoints implementing the pact. fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller); + /// Indicates whether this pact uses a thread-local channel (no inter-worker exchange). + fn is_pipeline(&self) -> bool { false } } /// A direct connection @@ -34,6 +36,7 @@ pub struct Pipeline; impl ParallelizationContract for Pipeline { type Pusher = LogPusher>>; type Puller = LogPuller>>; + fn is_pipeline(&self) -> bool { true } fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { let (pusher, puller) = allocator.pipeline::>(identifier, address); (LogPusher::new(pusher, allocator.index(), allocator.index(), identifier, logging.clone()), diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 7839241bc..52a228f2e 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -27,6 +27,7 @@ pub struct OperatorShape { peers: usize, // The total number of workers in the computation. Needed to initialize pointstamp counts with the correct magnitude. inputs: usize, // The number of input ports. outputs: usize, // The number of output ports. 
+ pipeline: bool, // Whether all inputs use Pipeline pact (thread-local channels). } /// Core data for the structure of an operator, minus scope and logic. @@ -38,6 +39,7 @@ impl OperatorShape { peers, inputs: 0, outputs: 0, + pipeline: true, } } @@ -110,6 +112,7 @@ impl OperatorBuilder { { let channel_id = self.scope.new_identifier(); let logging = self.scope.logging(); + if !pact.is_pipeline() { self.shape.pipeline = false; } let (sender, receiver) = pact.connect(&mut self.scope, channel_id, Rc::clone(&self.address), logging); let target = Target::new(self.index, self.shape.inputs); stream.connect_to(target, sender, channel_id); @@ -224,4 +227,5 @@ where } fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify } + fn pipeline(&self) -> bool { self.shape.pipeline } } diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 6c3f12955..92dbf8017 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -61,6 +61,14 @@ pub trait Operate { /// frontier changes on that input should cause the operator to be scheduled. The conservative /// default is `Always` for each input. fn notify_me(&self) -> &[FrontierInterest];// { &vec![FrontierInterest::Always; self.inputs()] } + + /// Indicates whether all inputs use thread-local (pipeline) channels. + /// + /// Operators with pipeline inputs receive data through thread-local channels, + /// meaning data pushed by an upstream operator on the same worker is immediately + /// available. This property is used by chain fusion to determine whether + /// consecutive operators can be scheduled as a single unit. + fn pipeline(&self) -> bool { true } } /// The ways in which an operator can express interest in activation when an input frontier changes. 
diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 6fda3bd62..d2cc63e97 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -629,8 +629,9 @@ struct PerOperatorState { index: usize, // index of the operator within its parent scope id: usize, // worker-unique identifier - local: bool, // indicates whether the operator will exchange data or not + local: bool, // indicates whether progress information is pre-circulated or not notify: Vec, + pipeline: bool, // indicates whether all inputs use thread-local (pipeline) channels inputs: usize, // number of inputs to the operator outputs: usize, // number of outputs from the operator @@ -655,6 +656,7 @@ impl PerOperatorState { id: usize::MAX, local: false, notify: vec![FrontierInterest::IfCapability; inputs], + pipeline: false, inputs, outputs, @@ -679,6 +681,7 @@ impl PerOperatorState { let inputs = scope.inputs(); let outputs = scope.outputs(); let notify = scope.notify_me().to_vec(); + let pipeline = scope.pipeline(); let (internal_summary, shared_progress, operator) = scope.initialize(); @@ -708,6 +711,7 @@ impl PerOperatorState { id: identifier, local, notify, + pipeline, inputs, outputs, edges: vec![vec![]; outputs], @@ -849,7 +853,7 @@ impl Drop for PerOperatorState { /// * Both operators have exactly 1 input and 1 output /// * A's sole output targets B's sole input (no fan-out) /// * B's sole input comes from A's output (no fan-in) -/// * Both are `local == true` (pipeline edges only) +/// * The target uses pipeline (thread-local) channels on its input /// * Both have `notify == false` (no frontier observation) /// /// Returns chains of at least `min_length` operators, identified by index. @@ -886,8 +890,9 @@ fn detect_chains( if src_child.inputs != 1 || src_child.outputs != 1 { continue; } if tgt_child.inputs != 1 || tgt_child.outputs != 1 { continue; } - // Both must be local (pipeline edges). 
- if !src_child.local || !tgt_child.local { continue; } + // The target must use pipeline (thread-local) channels on its input. + // The source's input pact doesn't matter: it receives data from outside the chain. + if !tgt_child.pipeline { continue; } // Both must not require notifications. if src_child.notify || tgt_child.notify { continue; } From 6f705b327430cfb350bc73807c7587bb63842eb1 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 13 Mar 2026 10:52:32 +0100 Subject: [PATCH 7/8] Generalize chain fusion to DAG subgraph fusion Replace the linear-chain-only fusion (1-input/1-output operators) with general DAG subgraph fusion that handles arbitrary topologies: diamonds, fan-in (concat), fan-out (branch), and mixed patterns. Detection uses union-find over fusible edges instead of forward/reverse chain walking. Fusibility constraints are unchanged per-operator (!notify, identity summaries, has operator) but the 1-input/1-output restriction is lifted. GroupScheduler replaces ChainScheduler, using topological sort for member execution order and port maps for the group's external interface. Activation forwarding (forward_to on tombstoned members) ensures pipeline channel activations reach the group representative even when data arrives at a non-representative member through feedback edges. The reachability computation uses a single reverse-topological pass over the group's internal DAG, replacing the previous per-node BFS that was O(n^2) in group size. Adds event_driven_diamond benchmark and DAG fusion tests (diamond, multi-input merge, branch, repeated diamonds, Collatz mutual recursion).
Co-Authored-By: Claude Opus 4.6 --- timely/examples/event_driven_diamond.rs | 67 +++ timely/src/progress/subgraph.rs | 700 ++++++++++++++++-------- timely/tests/chain_fusion.rs | 198 ++++++- 3 files changed, 729 insertions(+), 236 deletions(-) create mode 100644 timely/examples/event_driven_diamond.rs diff --git a/timely/examples/event_driven_diamond.rs b/timely/examples/event_driven_diamond.rs new file mode 100644 index 000000000..4045a6f17 --- /dev/null +++ b/timely/examples/event_driven_diamond.rs @@ -0,0 +1,67 @@ +use timely::dataflow::operators::{Input, Concat, Probe}; +use timely::dataflow::operators::vec::{Map, Filter}; + +fn main() { + timely::execute_from_args(std::env::args(), |worker| { + + let timer = std::time::Instant::now(); + + // Collect positional arguments, skipping flags consumed by timely (-w, -n, -p, -h). + let positional: Vec = { + let mut pos = Vec::new(); + let mut args = std::env::args(); + args.next(); // skip binary name + while let Some(arg) = args.next() { + if arg.starts_with('-') { + args.next(); // skip flag value + } else { + pos.push(arg); + } + } + pos + }; + + let dataflows = positional[0].parse::().unwrap(); + let diamonds = positional[1].parse::().unwrap(); + let record = positional.get(2).map(|s| s.as_str()) == Some("record"); + let rounds: usize = positional.get(3).map(|s| s.parse().unwrap()).unwrap_or(usize::MAX); + + let mut inputs = Vec::new(); + let mut probes = Vec::new(); + + // Each dataflow builds a chain of diamond patterns: + // input -> map (left) + map (right) -> concat -> ... -> probe + // Each diamond has 3 operators (map, map, concat). + // The clone/branch doesn't create an operator — it reuses the stream's Tee. 
+ for _dataflow in 0..dataflows { + worker.dataflow(|scope| { + let (input, mut stream) = scope.new_input(); + for _diamond in 0..diamonds { + let left = stream.clone().map(|x: ()| x); + let right = stream.filter(|_| false).map(|x: ()| x); + stream = left.concat(right).container::>(); + } + let (probe, _stream) = stream.probe(); + inputs.push(input); + probes.push(probe); + }); + } + + println!("{:?}\tdataflows built ({} x {} diamonds)", timer.elapsed(), dataflows, diamonds); + + for round in 0..rounds { + let dataflow = round % dataflows; + if record { + inputs[dataflow].send(()); + } + inputs[dataflow].advance_to(round); + let mut steps = 0; + while probes[dataflow].less_than(&round) { + worker.step(); + steps += 1; + } + println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); + } + + }).unwrap(); +} diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index d2cc63e97..a2075b6eb 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -16,14 +16,14 @@ use crate::logging::TimelySummaryLogger as SummaryLogger; use crate::scheduling::Schedule; use crate::scheduling::activate::Activations; -use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter, Antichain}; +use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter}; use crate::progress::{Timestamp, Operate, operate::SharedProgress}; use crate::progress::{Location, Port, Source, Target}; use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity}; use crate::progress::ChangeBatch; use crate::progress::broadcast::Progcaster; use crate::progress::reachability; -use crate::progress::timestamp::{Refines, PathSummary}; +use crate::progress::timestamp::Refines; use crate::worker::ProgressMode; @@ -166,12 +166,12 @@ where // Create empty child zero representative. 
self.children[0] = PerOperatorState::empty(outputs, inputs); - // Pipeline chain fusion: detect and fuse chains of pipeline-connected operators. + // Pipeline group fusion: detect and fuse groups of pipeline-connected operators. let fuse_chain_length = worker.config().fuse_chain_length; if fuse_chain_length >= 2 { - let chains = detect_chains(&self.children, &self.edge_stash, fuse_chain_length); - for chain in chains { - fuse_chain::(&mut self.children, &mut self.edge_stash, &chain); + let groups = detect_groups(&self.children, &self.edge_stash, fuse_chain_length); + for group in groups { + fuse_group::(&mut self.children, &mut self.edge_stash, &group); } } @@ -338,13 +338,16 @@ where // // We should be able to schedule arbitrary subsets of children, as // long as we eventually schedule all children that need to do work. - let mut previous = 0; + let mut scheduled = std::collections::HashSet::new(); + scheduled.insert(0); // Child 0 is the subgraph boundary, never scheduled. while let Some(Reverse(index)) = self.temp_active.pop() { - // De-duplicate, and don't revisit. - if index > previous { + if !scheduled.insert(index) { continue; } + // Tombstoned group members forward activations to their representative. + if let Some(fwd) = self.children[index].forward_to { + self.temp_active.push(Reverse(fwd)); + } else { // TODO: This is a moment where a scheduling decision happens. self.activate_child(index); - previous = index; } } @@ -644,6 +647,9 @@ struct PerOperatorState { internal_summary: Connectivity, // cached result from initialize. logging: Option, + + /// For tombstoned group members: forward activations to the group representative. 
+ forward_to: Option, } impl PerOperatorState { @@ -666,6 +672,7 @@ impl PerOperatorState { shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs,outputs))), internal_summary: Vec::new(), + forward_to: None, } } @@ -720,6 +727,7 @@ impl PerOperatorState { shared_progress, internal_summary, + forward_to: None, } } @@ -845,183 +853,170 @@ impl Drop for PerOperatorState { } } -// --- Pipeline chain fusion --- +// --- Pipeline group fusion --- + +/// Returns true if an operator has identity internal summaries on all (input, output) pairs. +/// That is, every connected (input, output) pair has summary `Antichain::from_elem(Default::default())`. +fn has_identity_summary(child: &PerOperatorState) -> bool { + for input_pc in child.internal_summary.iter() { + for (_port, ac) in input_pc.iter_ports() { + if ac.len() != 1 || ac.elements()[0] != Default::default() { + return false; + } + } + } + // Must have at least one connection (empty summary means no paths). + child.internal_summary.iter().any(|pc| pc.iter_ports().next().is_some()) +} + +/// Returns true if an operator is eligible for group fusion. +fn is_fusible(child: &PerOperatorState) -> bool { + child.operator.is_some() + && !child.notify + && has_identity_summary(child) +} -/// Detects maximal fusible chains from the edge topology. +/// Detects fusible groups of operators connected by pipeline edges. /// -/// A chain is a sequence of operators `[A, B, C, ...]` where each link satisfies: -/// * Both operators have exactly 1 input and 1 output -/// * A's sole output targets B's sole input (no fan-out) -/// * B's sole input comes from A's output (no fan-in) -/// * The target uses pipeline (thread-local) channels on its input -/// * Both have `notify == false` (no frontier observation) +/// Uses union-find to group operators connected by fusible edges into components. +/// An edge is fusible when both endpoints are fusible operators and the target +/// uses pipeline (thread-local) channels. 
No fan-in/fan-out or 1-in/1-out restriction. /// -/// Returns chains of at least `min_length` operators, identified by index. -fn detect_chains( +/// Returns groups of at least `min_length` operators, identified by child index. +fn detect_groups( children: &[PerOperatorState], edge_stash: &[(Source, Target)], min_length: usize, ) -> Vec> { - // Build forward map: source_node -> target_node (only for single-input/single-output pipeline edges). - // Build reverse map: target_node -> source_node. - let mut forward: HashMap = HashMap::new(); - let mut reverse: HashMap = HashMap::new(); - - // Count edges per source and target to detect fan-out/fan-in. - let mut source_edge_count: HashMap = HashMap::new(); - let mut target_edge_count: HashMap = HashMap::new(); + // Mark fusible operators. + let fusible: Vec = children.iter().enumerate().map(|(i, child)| { + i != 0 && is_fusible(child) + }).collect(); + + // Union-Find structure. + let n = children.len(); + let mut parent: Vec = (0..n).collect(); + let mut rank: Vec = vec![0; n]; + + fn find(parent: &mut [usize], x: usize) -> usize { + if parent[x] != x { + parent[x] = find(parent, parent[x]); + } + parent[x] + } - for (source, target) in edge_stash.iter() { - *source_edge_count.entry(source.node).or_insert(0) += 1; - *target_edge_count.entry(target.node).or_insert(0) += 1; + fn union(parent: &mut [usize], rank: &mut [usize], a: usize, b: usize) { + let ra = find(parent, a); + let rb = find(parent, b); + if ra == rb { return; } + if rank[ra] < rank[rb] { + parent[ra] = rb; + } else if rank[ra] > rank[rb] { + parent[rb] = ra; + } else { + parent[rb] = ra; + rank[ra] += 1; + } } + // For each edge, if both endpoints are fusible and target uses pipeline pact, union them. for (source, target) in edge_stash.iter() { let src = source.node; let tgt = target.node; - - // Skip child 0 (the subgraph boundary). 
if src == 0 || tgt == 0 { continue; } - - let src_child = &children[src]; - let tgt_child = &children[tgt]; - - // Both must have exactly 1 input and 1 output. - if src_child.inputs != 1 || src_child.outputs != 1 { continue; } - if tgt_child.inputs != 1 || tgt_child.outputs != 1 { continue; } - - // The target must use pipeline (thread-local) channels on its input. - // The source's input pact doesn't matter: it receives data from outside the chain. - if !tgt_child.pipeline { continue; } - - // Both must not require notifications. - if src_child.notify || tgt_child.notify { continue; } - - // No fan-out from source, no fan-in to target. - if source_edge_count.get(&src) != Some(&1) { continue; } - if target_edge_count.get(&tgt) != Some(&1) { continue; } - - // Must have an operator (not already tombstoned). - if src_child.operator.is_none() || tgt_child.operator.is_none() { continue; } - - // Edge must connect port 0 -> port 0. - if source.port != 0 || target.port != 0 { continue; } - - // Both must have identity internal summaries. - // Non-identity summaries (e.g., feedback operators with iteration steps) - // require per-member timestamp transformation that fusion does not support. - fn has_identity_summary(child: &PerOperatorState) -> bool { - if child.internal_summary.len() != 1 { return false; } - match child.internal_summary[0].get(0) { - Some(ac) => ac.len() == 1 && ac.elements()[0] == Default::default(), - None => false, - } - } - if !has_identity_summary(src_child) || !has_identity_summary(tgt_child) { continue; } - - forward.insert(src, tgt); - reverse.insert(tgt, src); + if !fusible[src] || !fusible[tgt] { continue; } + if !children[tgt].pipeline { continue; } + union(&mut parent, &mut rank, src, tgt); } - // Walk chains starting from heads (nodes with no predecessor in a chain). 
- let mut visited = vec![false; children.len()]; - let mut chains = Vec::new(); - - for &head in forward.keys() { - // A head is a node that is not a successor in any chain link. - if reverse.contains_key(&head) { continue; } - if visited[head] { continue; } - - let mut chain = vec![head]; - visited[head] = true; - let mut current = head; - while let Some(&next) = forward.get(¤t) { - if visited[next] { break; } - chain.push(next); - visited[next] = true; - current = next; - } - - if chain.len() >= min_length { - chains.push(chain); + // Collect components. + let mut components: HashMap> = HashMap::new(); + for i in 1..n { + if fusible[i] { + let root = find(&mut parent, i); + components.entry(root).or_default().push(i); } } - chains + // Filter by minimum size and sort members for determinism. + components.into_values() + .filter(|group| group.len() >= min_length) + .map(|mut group| { group.sort(); group }) + .collect() } -/// Composes internal summaries for a chain of single-input/single-output operators. -/// -/// For chain `[A, B, C]`, the composed summary is: -/// `A.summary[0→0] followed_by B.summary[0→0] followed_by C.summary[0→0]` -fn compose_summaries( - summaries: &[&Connectivity], -) -> Connectivity { - if summaries.is_empty() { - return vec![PortConnectivity::default()]; - } +/// Topological sort of group members using Kahn's algorithm on internal edges. +fn topological_sort( + members: &[usize], + edge_stash: &[(Source, Target)], +) -> Vec { + let member_set: std::collections::HashSet = members.iter().cloned().collect(); + let member_to_pos: HashMap = members.iter().enumerate().map(|(i, &m)| (m, i)).collect(); + let n = members.len(); - // Start with the first operator's summary (input 0 -> output 0). - let mut result_antichain: Antichain = Antichain::new(); + let mut in_degree = vec![0usize; n]; + let mut adj: Vec> = vec![Vec::new(); n]; - // Extract the first summary's 0→0 antichain. 
- if let Some(first) = summaries[0].get(0) { - if let Some(ac) = first.get(0) { - result_antichain = ac.clone(); + for (source, target) in edge_stash.iter() { + if member_set.contains(&source.node) && member_set.contains(&target.node) { + let from = member_to_pos[&source.node]; + let to = member_to_pos[&target.node]; + // Avoid counting duplicate edges for the same (from, to) pair multiple times + // for in-degree. We track adjacency; Kahn's handles it correctly. + adj[from].push(to); + in_degree[to] += 1; } } - // Compose with each subsequent operator's summary. - for summary in &summaries[1..] { - let next_antichain = summary.get(0).and_then(|pc| pc.get(0)); - let next_antichain = match next_antichain { - Some(ac) => ac, - None => { - // No path through this operator; result is empty. - result_antichain = Antichain::new(); - break; - } - }; + let mut queue: std::collections::VecDeque = std::collections::VecDeque::new(); + for i in 0..n { + if in_degree[i] == 0 { + queue.push_back(i); + } + } - let mut composed = Antichain::new(); - for a in result_antichain.elements().iter() { - for b in next_antichain.elements().iter() { - if let Some(c) = a.followed_by(b) { - composed.insert(c); - } + let mut order = Vec::with_capacity(n); + while let Some(pos) = queue.pop_front() { + order.push(members[pos]); + for &next in &adj[pos] { + in_degree[next] -= 1; + if in_degree[next] == 0 { + queue.push_back(next); } } - result_antichain = composed; } - let mut port_connectivity = PortConnectivity::default(); - for elem in result_antichain.elements().iter() { - port_connectivity.insert_ref(0, elem); - } - vec![port_connectivity] + assert_eq!(order.len(), n, "group contains a cycle, which should be impossible with identity summaries"); + order } -/// A member of a fused chain, holding the original operator and its progress handle. -struct ChainMember { +/// A member of a fused group, holding the original operator and its progress handle. 
+struct GroupMember { operator: Box, shared_progress: Rc>>, } -/// Schedules a chain of pipeline-connected operators as a single unit. +/// Schedules a DAG of pipeline-connected operators as a single unit. /// -/// The chain presents as a single operator to the subgraph: it has the first -/// member's input and the last member's output. Intermediate progress is -/// hidden from the reachability tracker. -struct ChainScheduler { +/// The group presents as a single operator to the subgraph with `input_map.len()` inputs +/// and `output_map.len()` outputs. Members are scheduled in topological order. +/// Intermediate progress is hidden from the reachability tracker. +struct GroupScheduler { name: String, path: Vec, - /// Progress visible to the subgraph (first op's inputs, last op's outputs). - chain_progress: Rc>>, - /// Operators in execution order, with their individual SharedProgress handles. - members: Vec>, + /// Progress visible to the subgraph. + group_progress: Rc>>, + /// Operators in topological order, with their individual SharedProgress handles. + members: Vec>, + /// Group input i -> (member index in members vec, member input port) + input_map: Vec<(usize, usize)>, + /// Group output j -> (member index in members vec, member output port) + output_map: Vec<(usize, usize)>, + /// capability_map[member_idx][output_port] -> list of group output indices + capability_map: Vec>>, } -impl Schedule for ChainScheduler { +impl Schedule for GroupScheduler { fn name(&self) -> &str { &self.name } fn path(&self) -> &[usize] { &self.path } @@ -1029,18 +1024,18 @@ impl Schedule for ChainScheduler { let n = self.members.len(); assert!(n > 0); - // Step 1: Copy chain's input frontier changes to first member. + // Step 1: Forward group's input frontier changes to the appropriate members. 
{ - let mut chain_sp = self.chain_progress.borrow_mut(); - let mut first_sp = self.members[0].shared_progress.borrow_mut(); - for (port, frontier) in chain_sp.frontiers.iter_mut().enumerate() { - for (time, diff) in frontier.iter() { - first_sp.frontiers[port].update(time.clone(), *diff); + let mut group_sp = self.group_progress.borrow_mut(); + for (i, &(member_idx, member_port)) in self.input_map.iter().enumerate() { + let mut member_sp = self.members[member_idx].shared_progress.borrow_mut(); + for (time, diff) in group_sp.frontiers[i].iter() { + member_sp.frontiers[member_port].update(time.clone(), *diff); } } } - // Step 2: Schedule each member in sequence. + // Step 2: Schedule each member in topological order. let mut any_incomplete = false; for i in 0..n { let incomplete = self.members[i].operator.schedule(); @@ -1049,32 +1044,32 @@ impl Schedule for ChainScheduler { // Step 3: Aggregate progress. { - let mut chain_sp = self.chain_progress.borrow_mut(); + let mut group_sp = self.group_progress.borrow_mut(); - // consumeds = first member's consumeds - let mut first_sp = self.members[0].shared_progress.borrow_mut(); - for (port, consumed) in first_sp.consumeds.iter_mut().enumerate() { - for (time, diff) in consumed.iter() { - chain_sp.consumeds[port].update(time.clone(), *diff); + // consumeds: for each group input, take from the corresponding member. 
+ for (i, &(member_idx, member_port)) in self.input_map.iter().enumerate() { + let mut member_sp = self.members[member_idx].shared_progress.borrow_mut(); + for (time, diff) in member_sp.consumeds[member_port].iter() { + group_sp.consumeds[i].update(time.clone(), *diff); } } - drop(first_sp); - // produceds = last member's produceds - let mut last_sp = self.members[n-1].shared_progress.borrow_mut(); - for (port, produced) in last_sp.produceds.iter_mut().enumerate() { - for (time, diff) in produced.iter() { - chain_sp.produceds[port].update(time.clone(), *diff); + // produceds: for each group output, take from the corresponding member. + for (j, &(member_idx, member_port)) in self.output_map.iter().enumerate() { + let mut member_sp = self.members[member_idx].shared_progress.borrow_mut(); + for (time, diff) in member_sp.produceds[member_port].iter() { + group_sp.produceds[j].update(time.clone(), *diff); } } - drop(last_sp); - // internals = aggregate from all members - for member in self.members.iter() { + // internals: for each member's output port, report at mapped group outputs. + for (m, member) in self.members.iter().enumerate() { let mut member_sp = member.shared_progress.borrow_mut(); for (port, internal) in member_sp.internals.iter_mut().enumerate() { for (time, diff) in internal.iter() { - chain_sp.internals[port].update(time.clone(), *diff); + for &group_out in &self.capability_map[m][port] { + group_sp.internals[group_out].update(time.clone(), *diff); + } } } } @@ -1089,75 +1084,280 @@ impl Schedule for ChainScheduler { for batch in sp.produceds.iter_mut() { batch.clear(); } } - // Clear the chain's own frontiers (consumed by step 1). + // Clear the group's own frontiers (consumed by step 1). 
{ - let mut chain_sp = self.chain_progress.borrow_mut(); - for batch in chain_sp.frontiers.iter_mut() { batch.clear(); } + let mut group_sp = self.group_progress.borrow_mut(); + for batch in group_sp.frontiers.iter_mut() { batch.clear(); } } any_incomplete } } -/// Fuses a detected chain into a single operator within `children`, rewriting `edge_stash`. +/// Computes reachability for all (member, output_port) pairs in a single reverse-topological pass. +/// +/// Returns `capability_map[topo_pos][output_port] -> sorted Vec`. +/// Since all summaries are identity, timestamps don't change along any path. +fn compute_all_reachability( + topo_order: &[usize], + children: &[PerOperatorState], + internal_edges: &HashMap<(usize, usize), Vec<(usize, usize)>>, + member_summaries: &HashMap>, + output_port_to_group_output: &HashMap<(usize, usize), Vec>, +) -> Vec>> { + let n = topo_order.len(); + + // Build reverse lookup: node -> topo_pos. + let node_to_topo: HashMap = topo_order.iter().enumerate() + .map(|(i, &node)| (node, i)) + .collect(); + + // reachable[(node, output_port)] -> set of group output indices + // Use a flat Vec indexed by (topo_pos, port) for fast access. + // First, compute a port offset table. + let mut port_offset = Vec::with_capacity(n); + let mut total_ports = 0usize; + for &node in topo_order.iter() { + port_offset.push(total_ports); + total_ports += children[node].outputs; + } + + // Each entry is a sorted Vec of reachable group outputs. + let mut reachable: Vec> = vec![Vec::new(); total_ports]; + + // Seed: output ports that are directly group outputs. + for (&(node, port), group_outs) in output_port_to_group_output.iter() { + if let Some(&topo_pos) = node_to_topo.get(&node) { + let idx = port_offset[topo_pos] + port; + reachable[idx] = group_outs.clone(); + reachable[idx].sort(); + reachable[idx].dedup(); + } + } + + // Reverse topological pass: propagate reachability backward through edges. 
+ for rev_pos in (0..n).rev() { + let node = topo_order[rev_pos]; + let num_outputs = children[node].outputs; + + // For each output port of this node, follow internal edges forward + // and union the reachability of the downstream ports. + for port in 0..num_outputs { + if let Some(targets) = internal_edges.get(&(node, port)) { + for &(next_node, next_input_port) in targets { + // Use next node's summary to find which output ports are reachable from this input. + if let Some(connections) = member_summaries.get(&next_node) { + if let Some(&next_topo) = node_to_topo.get(&next_node) { + for &(inp, outp) in connections.iter() { + if inp == next_input_port { + // Merge reachable[next_topo][outp] into reachable[rev_pos][port]. + let src_idx = port_offset[next_topo] + outp; + let dst_idx = port_offset[rev_pos] + port; + if src_idx != dst_idx { + // Clone to avoid double borrow. + let to_add = reachable[src_idx].clone(); + let dst = &mut reachable[dst_idx]; + dst.extend_from_slice(&to_add); + } + } + } + } + } + } + } + // Deduplicate after merging all edges for this port. + let idx = port_offset[rev_pos] + port; + reachable[idx].sort(); + reachable[idx].dedup(); + } + } + + // Reshape into capability_map[topo_pos][output_port]. + let mut capability_map = Vec::with_capacity(n); + for (topo_pos, &node) in topo_order.iter().enumerate() { + let num_outputs = children[node].outputs; + let mut port_map = Vec::with_capacity(num_outputs); + for port in 0..num_outputs { + let idx = port_offset[topo_pos] + port; + port_map.push(std::mem::take(&mut reachable[idx])); + } + capability_map.push(port_map); + } + + capability_map +} + +/// Fuses a detected group into a single operator within `children`, rewriting `edge_stash`. /// -/// The head of the chain retains its slot and becomes the fused operator. -/// All other chain members become tombstones. -fn fuse_chain( +/// The representative (lowest index in group) retains its slot and becomes the fused operator. 
+/// All other group members become tombstones. +fn fuse_group( children: &mut [PerOperatorState], edge_stash: &mut Vec<(Source, Target)>, - chain: &[usize], + group: &[usize], ) { - assert!(chain.len() >= 2); - let head_idx = chain[0]; - let tail_idx = *chain.last().unwrap(); + assert!(group.len() >= 2); + let group_set: std::collections::HashSet = group.iter().cloned().collect(); + let representative = *group.iter().min().unwrap(); + + // Step 1: Compute topological order. + let topo_order = topological_sort(group, edge_stash); + let node_to_topo: HashMap = topo_order.iter().enumerate().map(|(i, &n)| (n, i)).collect(); + + // Step 2: Compute input_map and output_map by scanning edges. + // Group inputs: (member_node, input_port) pairs where at least one incoming edge originates outside the group. + // Group outputs: (member_node, output_port) pairs where at least one outgoing edge targets outside the group, + // OR the port has no outgoing edges at all within the edge_stash. + let mut group_input_set: std::collections::HashSet<(usize, usize)> = std::collections::HashSet::new(); + let mut group_output_set: std::collections::HashSet<(usize, usize)> = std::collections::HashSet::new(); + let mut has_outgoing: std::collections::HashSet<(usize, usize)> = std::collections::HashSet::new(); + + // Collect all output ports of group members. 
+ let mut all_output_ports: std::collections::HashSet<(usize, usize)> = std::collections::HashSet::new(); + for &node in group.iter() { + for port in 0..children[node].outputs { + all_output_ports.insert((node, port)); + } + } + + // Build internal edges map: (src_node, src_port) -> [(tgt_node, tgt_port)] + let mut internal_edges: HashMap<(usize, usize), Vec<(usize, usize)>> = HashMap::new(); + + for (source, target) in edge_stash.iter() { + let src_in = group_set.contains(&source.node); + let tgt_in = group_set.contains(&target.node); + + if src_in { + has_outgoing.insert((source.node, source.port)); + } + + if src_in && tgt_in { + // Internal edge + internal_edges.entry((source.node, source.port)) + .or_default() + .push((target.node, target.port)); + } else if !src_in && tgt_in { + // Incoming edge from outside + group_input_set.insert((target.node, target.port)); + } else if src_in && !tgt_in { + // Outgoing edge to outside + group_output_set.insert((source.node, source.port)); + } + } + + // Output ports with no outgoing edges at all are also group outputs. + for &(node, port) in &all_output_ports { + if !has_outgoing.contains(&(node, port)) { + group_output_set.insert((node, port)); + } + } + + // Sort and assign indices for determinism. + let mut input_map: Vec<(usize, usize)> = group_input_set.into_iter() + .map(|(node, port)| (node_to_topo[&node], port)) + .collect(); + input_map.sort(); + // Convert back: input_map elements are (topo_position, port) - // Compose internal summaries. - let summary_refs: Vec<&Connectivity> = chain.iter() - .map(|&idx| &children[idx].internal_summary) + let mut output_map: Vec<(usize, usize)> = group_output_set.into_iter() + .map(|(node, port)| (node_to_topo[&node], port)) .collect(); - let composed_summary = compose_summaries::(&summary_refs); + output_map.sort(); - // Collect members by extracting operators and shared_progress from children. 
- let mut members = Vec::with_capacity(chain.len()); - let mut chain_name_parts = Vec::new(); - let mut head_path = Vec::new(); + // Build reverse lookups. + // (node, input_port) -> group input index + let input_port_to_group_input: HashMap<(usize, usize), usize> = input_map.iter().enumerate() + .map(|(i, &(topo_pos, port))| ((topo_order[topo_pos], port), i)) + .collect(); + + // (node, output_port) -> group output indices + let mut output_port_to_group_output: HashMap<(usize, usize), Vec> = HashMap::new(); + for (j, &(topo_pos, port)) in output_map.iter().enumerate() { + output_port_to_group_output.entry((topo_order[topo_pos], port)) + .or_default() + .push(j); + } + + // Step 3: Compute member summaries (input_port, output_port) connections for each node. + let mut member_summaries: HashMap> = HashMap::new(); + for &node in group.iter() { + let mut connections = Vec::new(); + for (inp_idx, pc) in children[node].internal_summary.iter().enumerate() { + for (out_port, _ac) in pc.iter_ports() { + connections.push((inp_idx, out_port)); + } + } + member_summaries.insert(node, connections); + } + + // Step 4: Compute capability_map via reachability (single reverse-topological pass). + // capability_map[topo_pos][output_port] -> list of group output indices + let capability_map = compute_all_reachability( + &topo_order, children, &internal_edges, &member_summaries, &output_port_to_group_output, + ); + + // Step 5: Compute composed summary for the group. + // For each (group_input_i, group_output_j): if there's a reachability path, set identity summary. + let num_inputs = input_map.len(); + let num_outputs = output_map.len(); + + let mut composed_summary: Connectivity = Vec::with_capacity(num_inputs); + for &(topo_pos, port) in input_map.iter() { + let node = topo_order[topo_pos]; + let mut pc = PortConnectivity::default(); + + // Find which group outputs are reachable from this input. 
+ // Use the node's summary to find output ports reachable from this input port, + // then use capability_map for those output ports. + if let Some(connections) = member_summaries.get(&node) { + for &(inp, outp) in connections.iter() { + if inp == port { + for &group_out in &capability_map[topo_pos][outp] { + pc.insert(group_out, Default::default()); + } + } + } + } + composed_summary.push(pc); + } + + // Step 6: Extract members in topological order. + let mut members = Vec::with_capacity(topo_order.len()); + let mut group_name_parts = Vec::new(); + let mut representative_path = Vec::new(); - for (i, &idx) in chain.iter().enumerate() { - let child = &mut children[idx]; - let operator = child.operator.take().expect("chain member must have an operator"); + for &node in topo_order.iter() { + let child = &mut children[node]; + let operator = child.operator.take().expect("group member must have an operator"); let shared_progress = Rc::clone(&child.shared_progress); - if i == 0 { - head_path = operator.path().to_vec(); + if node == representative { + representative_path = operator.path().to_vec(); } - chain_name_parts.push(child.name.clone()); + group_name_parts.push(child.name.clone()); - members.push(ChainMember { + members.push(GroupMember { operator, shared_progress, }); } - let chain_name = format!("Chain[{}]", chain_name_parts.join(" -> ")); + let group_name = format!("Group[{}]", group_name_parts.join(", ")); - // Create the chain's SharedProgress: 1 input, 1 output (matching head's input, tail's output). - let chain_progress = Rc::new(RefCell::new(SharedProgress::new(1, 1))); + // Step 7: Create the group's SharedProgress. + let group_progress = Rc::new(RefCell::new(SharedProgress::new(num_inputs, num_outputs))); - // Transfer initial internal capabilities from ALL members to chain_progress. - // - // Each member declared +peers capabilities at T::minimum() during initialize(). 
- // All N members' capabilities must be visible to the reachability tracker at the - // chain's single output port, because each member independently drops its - // capability during execution. If we only reported one member's initial caps, - // the tracker would go negative after N members drop their capabilities. + // Transfer initial internal capabilities from ALL members to group_progress, + // mapped through capability_map. { - let mut chain_sp = chain_progress.borrow_mut(); - for member in members.iter() { + let mut group_sp = group_progress.borrow_mut(); + for (topo_pos, member) in members.iter().enumerate() { let mut member_sp = member.shared_progress.borrow_mut(); for (port, internal) in member_sp.internals.iter_mut().enumerate() { for (time, diff) in internal.iter() { - chain_sp.internals[port].update(time.clone(), *diff); + for &group_out in &capability_map[topo_pos][port] { + group_sp.internals[group_out].update(time.clone(), *diff); + } } } } @@ -1169,23 +1369,31 @@ fn fuse_chain( for batch in sp.internals.iter_mut() { batch.clear(); } } - let chain_scheduler: Box = Box::new(ChainScheduler { - name: chain_name.clone(), - path: head_path, - chain_progress: Rc::clone(&chain_progress), + let group_scheduler: Box = Box::new(GroupScheduler { + name: group_name.clone(), + path: representative_path, + group_progress: Rc::clone(&group_progress), members, + input_map: input_map.clone(), + output_map: output_map.clone(), + capability_map, }); - // Install the fused operator at the head slot. - let head = &mut children[head_idx]; - head.name = chain_name; - head.operator = Some(chain_scheduler); - head.shared_progress = chain_progress; + // Step 8: Install the fused operator at the representative slot. + // Edges are left empty here; the build method populates them from edge_stash. 
+ let head = &mut children[representative]; + head.name = group_name; + head.operator = Some(group_scheduler); + head.shared_progress = group_progress; head.internal_summary = composed_summary; - - // Tombstone all other chain members. - for &idx in &chain[1..] { - let child = &mut children[idx]; + head.inputs = num_inputs; + head.outputs = num_outputs; + head.edges = vec![Vec::new(); num_outputs]; + + // Step 9: Tombstone all other group members, forwarding activations to representative. + for &node in group.iter() { + if node == representative { continue; } + let child = &mut children[node]; child.name = format!("Tombstone({})", child.name); child.operator = None; child.shared_progress = Rc::new(RefCell::new(SharedProgress::new(0, 0))); @@ -1193,22 +1401,48 @@ fn fuse_chain( child.inputs = 0; child.outputs = 0; child.internal_summary = Vec::new(); + child.forward_to = Some(representative); } - // Rewrite edge_stash: - // 1. Remove internal chain edges (edges between consecutive chain members). - // 2. Rewrite edges from the tail's output to use the head's index as source. - let chain_set: std::collections::HashSet = chain.iter().cloned().collect(); - - edge_stash.retain(|(source, target)| { - // Remove edges internal to the chain. - !(chain_set.contains(&source.node) && chain_set.contains(&target.node)) - }); + // Step 10: Rewrite edge_stash. + // Remove edges where both endpoints are in the group. + // Rewrite edges incoming to group members: target.node = representative, target.port = group_input_index. + // Rewrite edges outgoing from group members: source.node = representative, source.port = group_output_index. + let mut new_edge_stash: Vec<(Source, Target)> = Vec::new(); - // Rewrite edges from tail's output to come from head's output instead. 
- for (source, _target) in edge_stash.iter_mut() { - if source.node == tail_idx { - source.node = head_idx; + for (source, target) in edge_stash.iter() { + let src_in = group_set.contains(&source.node); + let tgt_in = group_set.contains(&target.node); + + if src_in && tgt_in { + // Internal edge: remove. + continue; + } else if !src_in && tgt_in { + // Incoming edge: rewrite target. + if let Some(&group_input) = input_port_to_group_input.get(&(target.node, target.port)) { + new_edge_stash.push(( + *source, + Target::new(representative, group_input), + )); + } + } else if src_in && !tgt_in { + // Outgoing edge: rewrite source. + let topo_pos = node_to_topo[&source.node]; + if let Some(group_outs) = output_port_to_group_output.get(&(source.node, source.port)) { + for &group_out in group_outs { + if output_map[group_out] == (topo_pos, source.port) { + new_edge_stash.push(( + Source::new(representative, group_out), + *target, + )); + } + } + } + } else { + // Neither endpoint in group: keep as-is. + new_edge_stash.push((*source, *target)); } } + + *edge_stash = new_edge_stash; } diff --git a/timely/tests/chain_fusion.rs b/timely/tests/chain_fusion.rs index d04eb0629..a3ca309af 100644 --- a/timely/tests/chain_fusion.rs +++ b/timely/tests/chain_fusion.rs @@ -1,8 +1,8 @@ -//! Tests for pipeline chain fusion. +//! Tests for pipeline group fusion. use std::sync::{Arc, Mutex}; -use timely::dataflow::operators::{ToStream, Inspect, Probe}; -use timely::dataflow::operators::vec::Map; +use timely::dataflow::operators::{ToStream, Concat, Inspect, Probe, Feedback, ConnectLoop}; +use timely::dataflow::operators::vec::{Map, Filter, Partition}; /// Verifies that a chain of map operators produces correct output. #[test] @@ -234,3 +234,195 @@ fn chain_fusion_flat_map() { expected.sort(); assert_eq!(got, expected); } + +/// Diamond pattern: stream -> map (left) + map (right) -> concat -> inspect. +/// All operators are fusible (!notify, pipeline, identity summary). 
+#[test]
+fn group_fusion_diamond() {
+    let result = Arc::new(Mutex::new(Vec::new()));
+    let result2 = Arc::clone(&result);
+
+    timely::execute_from_args(std::env::args(), move |worker| {
+        let result3 = Arc::clone(&result2);
+        worker.dataflow::<u64, _, _>(|scope| {
+            let stream = (0..10u64).to_stream(scope);
+            let left = stream.clone().map(|x| x + 1);
+            let right = stream.map(|x| x + 100);
+            left.concat(right)
+                .map(|x| x * 2)
+                .inspect(move |x| {
+                    result3.lock().unwrap().push(*x);
+                });
+        });
+    }).unwrap();
+
+    let mut got = result.lock().unwrap().clone();
+    got.sort();
+    let mut expected: Vec<u64> = (0..10u64)
+        .flat_map(|x| vec![(x + 1) * 2, (x + 100) * 2])
+        .collect();
+    expected.sort();
+    assert_eq!(got, expected);
+}
+
+/// Diamond with probe: verifies dataflow completion with DAG fusion.
+#[test]
+fn group_fusion_diamond_with_probe() {
+    let result = Arc::new(Mutex::new(Vec::new()));
+    let result2 = Arc::clone(&result);
+
+    timely::execute_from_args(std::env::args(), move |worker| {
+        let result3 = Arc::clone(&result2);
+        let (mut input, probe) = worker.dataflow::<usize, _, _>(|scope| {
+            use timely::dataflow::operators::Input;
+            let (input, stream) = scope.new_input();
+            let left = stream.clone().map(|x: u64| x + 1);
+            let right = stream.map(|x: u64| x + 100);
+            let probe = left.concat(right)
+                .map(|x| x * 2)
+                .inspect(move |x| {
+                    result3.lock().unwrap().push(*x);
+                })
+                .probe()
+                .0;
+            (input, probe)
+        });
+
+        for round in 0..5usize {
+            input.send(round as u64);
+            input.advance_to(round + 1);
+            worker.step_while(|| probe.less_than(&(round + 1)));
+        }
+    }).unwrap();
+
+    let mut got = result.lock().unwrap().clone();
+    got.sort();
+    let mut expected: Vec<u64> = (0..5u64)
+        .flat_map(|x| vec![(x + 1) * 2, (x + 100) * 2])
+        .collect();
+    expected.sort();
+    assert_eq!(got, expected);
+}
+
+/// Multi-input merge: two independent input streams -> concat -> map.
+#[test]
+fn group_fusion_multi_input_merge() {
+    let result = Arc::new(Mutex::new(Vec::new()));
+    let result2 = Arc::clone(&result);
+
+    timely::execute_from_args(std::env::args(), move |worker| {
+        let result3 = Arc::clone(&result2);
+        worker.dataflow::<u64, _, _>(|scope| {
+            let s1 = (0..5u64).to_stream(scope).map(|x| x + 1);
+            let s2 = (10..15u64).to_stream(scope).map(|x| x + 1);
+            s1.concat(s2)
+                .map(|x| x * 3)
+                .inspect(move |x| {
+                    result3.lock().unwrap().push(*x);
+                });
+        });
+    }).unwrap();
+
+    let mut got = result.lock().unwrap().clone();
+    got.sort();
+    let mut expected: Vec<u64> = (0..5u64).map(|x| (x + 1) * 3)
+        .chain((10..15u64).map(|x| (x + 1) * 3))
+        .collect();
+    expected.sort();
+    assert_eq!(got, expected);
+}
+
+/// Branch without merge: map -> (map + map) with two separate outputs consumed by inspect.
+/// The two branches are not merged back, testing fan-out group outputs.
+#[test]
+fn group_fusion_branch() {
+    let left_result = Arc::new(Mutex::new(Vec::new()));
+    let right_result = Arc::new(Mutex::new(Vec::new()));
+    let left_result2 = Arc::clone(&left_result);
+    let right_result2 = Arc::clone(&right_result);
+
+    timely::execute_from_args(std::env::args(), move |worker| {
+        let left_result3 = Arc::clone(&left_result2);
+        let right_result3 = Arc::clone(&right_result2);
+        worker.dataflow::<u64, _, _>(|scope| {
+            let stream = (0..5u64).to_stream(scope).map(|x| x + 1);
+            // Two branches from the same map output.
+            stream.clone().map(|x| x * 2)
+                .inspect(move |x| {
+                    left_result3.lock().unwrap().push(*x);
+                });
+            stream.map(|x| x * 3)
+                .inspect(move |x| {
+                    right_result3.lock().unwrap().push(*x);
+                });
+        });
+    }).unwrap();
+
+    let mut got_left = left_result.lock().unwrap().clone();
+    got_left.sort();
+    let expected_left: Vec<u64> = (0..5).map(|x| (x + 1) * 2).collect();
+    assert_eq!(got_left, expected_left);
+
+    let mut got_right = right_result.lock().unwrap().clone();
+    got_right.sort();
+    let expected_right: Vec<u64> = (0..5).map(|x| (x + 1) * 3).collect();
+    assert_eq!(got_right, expected_right);
+}
+
+/// Collatz mutual recursion with feedback loops.
+/// This exercises DAG fusion with external feedback edges.
+#[test]
+fn group_fusion_collatz_mutual_recursion() {
+    let config = timely::Config {
+        communication: timely::CommunicationConfig::Thread,
+        worker: timely::WorkerConfig::default(),
+    };
+
+    timely::execute(config, |worker| {
+        worker.dataflow::<u64, _, _>(|scope| {
+            let (handle0, stream0) = scope.feedback(1);
+            let (handle1, stream1) = scope.feedback(1);
+
+            let results0 = stream0.map(|x: u64| x / 2).filter(|x| *x != 1);
+            let results1 = stream1.map(|x: u64| 3 * x + 1);
+
+            let mut parts =
+                (1u64..10)
+                    .to_stream(scope)
+                    .concat(results0)
+                    .concat(results1)
+                    .inspect(|_x| {})
+                    .partition(2, |x| (x % 2, x));
+
+            parts.pop().unwrap().connect_loop(handle1);
+            parts.pop().unwrap().connect_loop(handle0);
+        });
+    }).unwrap();
+}
+
+/// Repeated diamond chain with probe: tests larger fused groups.
+#[test]
+fn group_fusion_repeated_diamonds_with_probe() {
+    use timely::dataflow::operators::Input;
+
+    timely::execute_from_args(std::env::args(), move |worker| {
+        let (mut input, probe) = worker.dataflow(|scope| {
+            let (input, mut stream) = scope.new_input();
+            for _diamond in 0..15 {
+                let left = stream.clone().map(|x: u64| x);
+                let right = stream.map(|x: u64| x);
+                stream = left.concat(right).container::<Vec<u64>>();
+            }
+            let (probe, _stream) = stream.probe();
+            (input, probe)
+        });
+
+        for round in 0..5usize {
+            input.send(0u64);
+            input.advance_to(round);
+            while probe.less_than(&round) {
+                worker.step();
+            }
+        }
+    }).unwrap();
+}

From fb034af7ba5f47c98a64c682b211b1cb4e49be72 Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Fri, 13 Mar 2026 10:56:14 +0100
Subject: [PATCH 8/8] Document operator fusion in mdbook internals chapter

Add chapter 5.4 explaining how operator fusion works: fusibility
constraints, group detection, scheduling, capability mapping,
correctness argument for progress tracking, and configuration.

Co-Authored-By: Claude Opus 4.6
---
 mdbook/src/SUMMARY.md               |   1 +
 mdbook/src/chapter_5/chapter_5_4.md | 123 ++++++++++++++++++++++++++++
 2 files changed, 124 insertions(+)
 create mode 100644 mdbook/src/chapter_5/chapter_5_4.md

diff --git a/mdbook/src/SUMMARY.md b/mdbook/src/SUMMARY.md
index 45ad4fb99..5b48e0181 100644
--- a/mdbook/src/SUMMARY.md
+++ b/mdbook/src/SUMMARY.md
@@ -36,3 +36,4 @@
 - [Internals](./chapter_5/chapter_5.md)
     - [Communication](./chapter_5/chapter_5_1.md)
     - [Progress Tracking](./chapter_5/chapter_5_2.md)
+    - [Operator Fusion](./chapter_5/chapter_5_4.md)
diff --git a/mdbook/src/chapter_5/chapter_5_4.md b/mdbook/src/chapter_5/chapter_5_4.md
new file mode 100644
index 000000000..3d225e6ee
--- /dev/null
+++ b/mdbook/src/chapter_5/chapter_5_4.md
@@ -0,0 +1,123 @@
+# Operator fusion
+
+When building dataflows, users often compose many small operators: a `map` followed by a `filter`, a `flat_map`, another `map`, and finally a `probe`.
+Each operator is a separate node in the progress tracking graph, with its own `SharedProgress` handle, pointstamp accounting, and scheduling overhead. +For long pipelines, this overhead dominates actual computation. + +Operator fusion detects groups of operators that can be scheduled as a single unit, hiding intermediate nodes from the reachability tracker. +This section explains how fusion works and why it preserves correctness. + +## Which operators fuse + +Fusion applies to operators connected by pipeline (thread-local) channels where the group's internal progress tracking can be collapsed without losing information. +An operator is *fusible* if: + +* It does not observe frontiers (`notify == false`). + Frontier-observing operators buffer data until they receive a notification that a timestamp is complete. + Fusing them would require propagating frontiers within the group, which the scheduler does not do. +* All (input, output) pairs in its internal summary are the identity. + Non-identity summaries (like the feedback operator's `Product(0, 1)`) require per-member timestamp transformation that the group's aggregate reporting does not support. +* It has an operator implementation (not already tombstoned). + +An edge between two fusible operators is *fusible* if the target uses pipeline pact on the corresponding input port. +Exchange or broadcast pacts route data through inter-worker channels that the group scheduler cannot intercept. + +Operators connected by fusible edges are grouped using union-find. +Groups with fewer members than a configurable threshold (`fuse_chain_length`, default 2) are left alone. +There is no restriction on fan-in or fan-out: diamonds, concatenations, and branches all fuse. + +## How a fused group presents to the subgraph + +A fused group replaces its member operators with a single `GroupScheduler` installed at the representative slot (the lowest-indexed member). +All other members become tombstones. 
+ +The group exposes: + +* **Group inputs**: member input ports that receive edges from outside the group. +* **Group outputs**: member output ports that send edges outside the group, or that have no outgoing edges (their capabilities still need tracking). + +The subgraph's `edge_stash` is rewritten: internal edges are removed, incoming edges are retargeted to the representative's group input ports, and outgoing edges are sourced from the representative's group output ports. + +## Scheduling + +Members are executed in topological order, computed by Kahn's algorithm over internal edges. +This guarantees that data pushed by a producer through a pipeline channel is available to its consumer when the consumer runs. + +The physical pipeline channels between members are established during operator construction and are unaffected by fusion. +Only the progress tracking layer changes. + +### Activation forwarding + +Pipeline channels activate the original target operator when data arrives. +After fusion, the target may be a tombstone. +Each tombstone records a `forward_to` field pointing to the group representative. +The subgraph's scheduling loop checks this field and redirects the activation. + +## Why the fused group reports correct progress + +The key insight is that because all members have identity summaries, a capability at any member's output port at timestamp `t` implies the same timestamp `t` at every reachable group output. +The timestamp does not change along any internal path. + +### Consumeds and produceds + +The group reports consumeds only for group input ports and produceds only for group output ports. +Intermediate consumeds and produceds (data passing between members through internal pipeline channels) would cancel in the reachability tracker: a member producing `(t, +d)` and the next member consuming `(t, -d)` net to zero. +Since the internal edges are removed from the tracker, these intermediate changes are simply not reported. 
+
+### Internal capabilities
+
+Each member reports internal capability changes through its `SharedProgress.internals`.
+In the unfused case, the reachability tracker sees each member's capabilities at their respective source locations and computes implications through the graph.
+
+The group scheduler aggregates each member's internal changes to the group outputs via a *capability map*.
+This map is computed by a single reverse-topological pass over the group's internal DAG:
+
+1. Seed: member output ports that are group outputs map directly to themselves.
+2. Reverse pass: for each member from last to first in topological order, for each output port, follow internal edges forward to downstream members.
+   Use the downstream member's summary to find which of its output ports are reachable from the targeted input port.
+   Union the reachability sets.
+
+This produces `capability_map[member][output_port] -> Vec<usize>`, the list of group output indices reachable from that port.
+
+When the group scheduler runs, it reads each member's `SharedProgress.internals` and reports them at every group output reached via the capability map.
+Because all summaries are identity, this is equivalent to what the reachability tracker would compute by composing identity summaries along internal paths.
+
+### Initial capability accounting
+
+During `initialize()`, each member reports `+peers` capabilities at `T::minimum()` on its output ports.
+The group transfers ALL members' initial capabilities to the group's `SharedProgress`, mapped through the capability map.
+Members' initial internals are then cleared to prevent double-counting.
+
+This is necessary because each member independently drops its initial capability during execution, producing `(-peers)` changes that flow through the capability map.
+If only one member's `+peers` were reported, the tracker would go negative.
+
+## Composed summary
+
+The group's `internal_summary` describes which group outputs are reachable from which group inputs.
+For each group input, the scheduler finds which member output ports are reachable (via the member's own summary), then follows the capability map to group outputs. +If a path exists, the summary entry is the identity; otherwise no entry exists. + +This composed summary is used by the reachability tracker to determine implications from the group's sources to downstream operators. + +## What does not fuse + +Several classes of operators are excluded: + +* **Frontier-observing operators** (`notify == true`): `inspect`, `unary_frontier`, and any operator that requests notifications. + These need intra-group frontier propagation, which the group scheduler does not implement. +* **Operators with non-identity summaries**: the `Feedback` operator increments a loop counter coordinate. + Fusing it would require the group to transform timestamps along internal paths. +* **Exchange-pact operators**: data moves between workers through channels outside the group scheduler's control. +* **Operators in iteration scopes**: the nested timestamp structure typically involves non-identity summaries at scope boundaries. + +In practice, the operators that fuse are the "glue" operators: `map`, `flat_map`, `filter`, `Enter`, `Leave`, `Concatenate`, and similar single-purpose transformations. +In differential dataflow's BFS, fusion merges groups like `[Enter, Concatenate, Negate, AsCollection, Concatenate, ResultsIn]` into single scheduling units. + +## Configuration + +Fusion is controlled by `WorkerConfig::fuse_chain_length(n)`: + +* `n >= 2` (default): fuse groups of at least `n` members. +* `n == 0` or `n == 1`: disable fusion entirely. + +From the command line, pass `--fuse-chain-length N` to any timely program that uses `execute_from_args`.