From b2f95018b69636308dba6c15d7d04f75d84bbcfc Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 13 Mar 2026 05:40:24 -0400 Subject: [PATCH 1/2] Convert Tracker structs to columnar --- timely/src/progress/reachability.rs | 144 +++++++++++++++++----------- 1 file changed, 90 insertions(+), 54 deletions(-) diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 44d906094..8aafca9d1 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -77,6 +77,8 @@ use std::collections::{BinaryHeap, HashMap, VecDeque}; use std::cmp::Reverse; +use columnar::{Vecs, Index as ColumnarIndex}; + use crate::progress::Timestamp; use crate::progress::{Source, Target}; use crate::progress::ChangeBatch; @@ -85,6 +87,19 @@ use crate::progress::operate::{Connectivity, PortConnectivity}; use crate::progress::frontier::MutableAntichain; use crate::progress::timestamp::PathSummary; +/// Build a `Vecs>>` from nested iterators. +/// +/// The outer iterator yields nodes, each node yields ports, each port yields data items. +fn build_nested_vecs(nodes: impl Iterator>>) -> Vecs>> { + let mut result: Vecs>> = Default::default(); + for node in nodes { + for port in node { + result.values.push_iter(port); + } + result.bounds.push(result.values.bounds.len() as u64); + } + result +} /// A topology builder, which can summarize reachability along paths. /// @@ -358,25 +373,21 @@ impl Default for Builder { /// alter the potential pointstamps that could arrive at downstream input ports. pub struct Tracker { - /// Internal connections within hosted operators. - /// - /// Indexed by operator index, then input port, then output port. This is the - /// same format returned by `initialize`, as if we simply appended - /// all of the summaries for the hosted nodes. - nodes: Vec>, - /// Direct connections from sources to targets. - /// - /// Edges do not affect timestamps, so we only need to know the connectivity. - /// Indexed by operator index then output port. - edges: Vec>>, - - // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`). - // It seems we should be able to flatten most of these so that there are a few allocations - // independent of the numbers of nodes and ports and such. - // - // TODO: We could also change the internal representation to be a graph of targets, using usize - // identifiers for each, so that internally we needn't use multiple levels of indirection. - // This may make more sense once we commit to topologically ordering the targets. + /// Internal connections within hosted operators, flattened from `Vec>>`. + /// Indexed by `(node, input_port)` to yield `(output_port, summary)` pairs. + /// Internal operator connectivity, columnar form of `Vec>>`. + /// Indexed by `(node, input_port)` to yield `(output_port, summary)` pairs. + nodes: Vecs>>, + /// Edge connectivity, columnar form of `Vec>>`. + /// Indexed by `(node, output_port)` to yield target slices. + edges: Vecs>>, + + /// Summaries from each target (operator input) to scope outputs. + /// Indexed by `(node, target_port)` to yield `(scope_output, summary)` pairs. + target_summaries: Vecs>>, + /// Summaries from each source (operator output) to scope outputs. + /// Indexed by `(node, source_port)` to yield `(scope_output, summary)` pairs. + source_summaries: Vecs>>, /// Each source and target has a mutable antichain to ensure that we track their discrete frontiers, /// rather than their multiplicities. We separately track the frontiers resulting from propagated @@ -437,8 +448,6 @@ pub struct PortInformation { pub pointstamps: MutableAntichain, /// Current implications of active pointstamps across the dataflow. pub implications: MutableAntichain, - /// Path summaries to each of the scope outputs. - pub output_summaries: PortConnectivity, } impl PortInformation { @@ -447,7 +456,6 @@ impl PortInformation { PortInformation { pointstamps: MutableAntichain::new(), implications: MutableAntichain::new(), - output_summaries: PortConnectivity::default(), } } @@ -511,7 +519,7 @@ impl Tracker { pub fn allocate_from(builder: Builder, logger: Option>) -> (Self, Connectivity) { // Allocate buffer space for each input and input port. - let mut per_operator = + let per_operator = builder .shape .iter() @@ -522,37 +530,74 @@ impl Tracker { let mut builder_summary = vec![PortConnectivity::default(); builder.shape[0].1]; // Compile summaries from each location to each scope output. + // Collect into per-node, per-port buckets for flattening. let output_summaries = summarize_outputs::(&builder.nodes, &builder.edges); + + // Temporary storage: target_sum[node][port] and source_sum[node][port]. + let mut target_sum: Vec>> = builder.shape.iter() + .map(|&(inputs, _)| vec![PortConnectivity::default(); inputs]) + .collect(); + let mut source_sum: Vec>> = builder.shape.iter() + .map(|&(_, outputs)| vec![PortConnectivity::default(); outputs]) + .collect(); + for (location, summaries) in output_summaries.into_iter() { - // Summaries from scope inputs are useful in summarizing the scope. if location.node == 0 { if let Port::Source(port) = location.port { builder_summary[port] = summaries; } - else { - // Ignore (ideally trivial) output to output summaries. - } } - // Summaries from internal nodes are important for projecting capabilities. else { match location.port { Port::Target(port) => { - per_operator[location.node].targets[port].output_summaries = summaries; + target_sum[location.node][port] = summaries; }, Port::Source(port) => { - per_operator[location.node].sources[port].output_summaries = summaries; + source_sum[location.node][port] = summaries; }, } } } + // Build columnar nodes: Vecs>>. + let nodes = build_nested_vecs(builder.nodes.iter().map(|connectivity| { + connectivity.iter().map(|port_conn| { + port_conn.iter_ports().flat_map(|(port, antichain)| { + antichain.elements().iter().map(move |s| (port, s.clone())) + }) + }) + })); + + // Build columnar edges: Vecs>>. + let edges = build_nested_vecs(builder.edges.iter().map(|node_edges| { + node_edges.iter().map(|port_edges| port_edges.iter().cloned()) + })); + + // Build columnar target and source summaries. + let target_summaries = build_nested_vecs(target_sum.iter().map(|ports| { + ports.iter().map(|port_conn| { + port_conn.iter_ports().flat_map(|(port, antichain)| { + antichain.elements().iter().map(move |s| (port, s.clone())) + }) + }) + })); + let source_summaries = build_nested_vecs(source_sum.iter().map(|ports| { + ports.iter().map(|port_conn| { + port_conn.iter_ports().flat_map(|(port, antichain)| { + antichain.elements().iter().map(move |s| (port, s.clone())) + }) + }) + })); + let scope_outputs = builder.shape[0].0; let output_changes = vec![ChangeBatch::new(); scope_outputs]; let tracker = Tracker { - nodes: builder.nodes, - edges: builder.edges, + nodes, + edges, + target_summaries, + source_summaries, per_operator, target_changes: ChangeBatch::new(), source_changes: ChangeBatch::new(), @@ -607,13 +652,10 @@ impl Tracker { for (time, diff) in changes { self.total_counts += diff; - for (output, summaries) in operator.output_summaries.iter_ports() { - let output_changes = &mut self.output_changes[output]; - summaries - .elements() - .iter() - .flat_map(|summary| summary.results_in(&time)) - .for_each(|out_time| output_changes.update(out_time, diff)); + for &(output, ref summary) in (&self.target_summaries).get(target.node).get(target.port).into_index_iter() { + if let Some(out_time) = summary.results_in(&time) { + self.output_changes[output].update(out_time, diff); + } } self.worklist.push(Reverse((time, Location::from(target), diff))); } @@ -631,13 +673,10 @@ impl Tracker { for (time, diff) in changes { self.total_counts += diff; operator.cap_counts += diff; - for (output, summaries) in op_source.output_summaries.iter_ports() { - let output_changes = &mut self.output_changes[output]; - summaries - .elements() - .iter() - .flat_map(|summary| summary.results_in(&time)) - .for_each(|out_time| output_changes.update(out_time, diff)); + for &(output, ref summary) in (&self.source_summaries).get(source.node).get(source.port).into_index_iter() { + if let Some(out_time) = summary.results_in(&time) { + self.output_changes[output].update(out_time, diff); + } } self.worklist.push(Reverse((time, Location::from(source), diff))); } @@ -671,13 +710,10 @@ impl Tracker { .update_iter(Some((time, diff))); for (time, diff) in changes { - let nodes = &self.nodes[location.node][port_index]; - for (output_port, summaries) in nodes.iter_ports() { - let source = Location { node: location.node, port: Port::Source(output_port) }; - for summary in summaries.elements().iter() { - if let Some(new_time) = summary.results_in(&time) { - self.worklist.push(Reverse((new_time, source, diff))); - } + for &(output_port, ref summary) in (&self.nodes).get(location.node).get(port_index).into_index_iter() { + if let Some(new_time) = summary.results_in(&time) { + let source = Location { node: location.node, port: Port::Source(output_port) }; + self.worklist.push(Reverse((new_time, source, diff))); } } self.pushed_changes.update((location, time), diff); @@ -694,7 +730,7 @@ impl Tracker { .update_iter(Some((time, diff))); for (time, diff) in changes { - for new_target in self.edges[location.node][port_index].iter() { + for new_target in (&self.edges).get(location.node).get(port_index).into_index_iter() { self.worklist.push(Reverse(( time.clone(), Location::from(*new_target), From 1311dbb62fd0f87be44d51e13c6f8c00f5f330ab Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 13 Mar 2026 05:46:55 -0400 Subject: [PATCH 2/2] Restore comments --- timely/src/progress/reachability.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 8aafca9d1..93e37668b 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -373,8 +373,6 @@ impl Default for Builder { /// alter the potential pointstamps that could arrive at downstream input ports. pub struct Tracker { - /// Internal connections within hosted operators, flattened from `Vec>>`. - /// Indexed by `(node, input_port)` to yield `(output_port, summary)` pairs. /// Internal operator connectivity, columnar form of `Vec>>`. /// Indexed by `(node, input_port)` to yield `(output_port, summary)` pairs. nodes: Vecs>>, @@ -542,11 +540,16 @@ impl Tracker { .collect(); for (location, summaries) in output_summaries.into_iter() { + // Summaries from scope inputs are useful in summarizing the scope. if location.node == 0 { if let Port::Source(port) = location.port { builder_summary[port] = summaries; } + else { + // Ignore (ideally trivial) output to output summaries. + } } + // Summaries from internal nodes are important for projecting capabilities. else { match location.port { Port::Target(port) => {