Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 87 additions & 48 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::cmp::Reverse;

use columnar::{Vecs, Index as ColumnarIndex};

use crate::progress::Timestamp;
use crate::progress::{Source, Target};
use crate::progress::ChangeBatch;
Expand All @@ -85,6 +87,19 @@
use crate::progress::frontier::MutableAntichain;
use crate::progress::timestamp::PathSummary;

/// Flatten three levels of nested iterators into a columnar `Vecs<Vecs<Vec<S>>>`.
///
/// The outermost iterator enumerates nodes, each node enumerates its ports, and each
/// port enumerates the items recorded for it. Every port is appended as one inner
/// list; after a node's ports are appended, the running count of inner lists is
/// recorded as that node's upper bound in the outer container.
fn build_nested_vecs<S>(nodes: impl Iterator<Item = impl Iterator<Item = impl Iterator<Item = S>>>) -> Vecs<Vecs<Vec<S>>> {
    let mut nested: Vecs<Vecs<Vec<S>>> = Default::default();
    nodes.for_each(|ports| {
        // One inner list per port of the current node.
        ports.for_each(|items| {
            nested.values.push_iter(items);
        });
        // Seal the node: its bound is the number of port lists pushed so far.
        let ports_so_far = nested.values.bounds.len() as u64;
        nested.bounds.push(ports_so_far);
    });
    nested
}

/// A topology builder, which can summarize reachability along paths.
///
Expand Down Expand Up @@ -289,7 +304,7 @@
in_degree.entry(target).or_insert(0);
for (output, summaries) in outputs.iter_ports() {
let source = Location::new_source(index, output);
for summary in summaries.elements().iter() {

Check warning on line 307 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`summary` shadows a previous, unrelated binding
if summary == &Default::default() {
*in_degree.entry(source).or_insert(0) += 1;
}
Expand Down Expand Up @@ -358,25 +373,19 @@
/// alter the potential pointstamps that could arrive at downstream input ports.
pub struct Tracker<T:Timestamp> {

/// Internal connections within hosted operators.
///
/// Indexed by operator index, then input port, then output port. This is the
/// same format returned by `initialize`, as if we simply appended
/// all of the summaries for the hosted nodes.
nodes: Vec<Connectivity<T::Summary>>,
/// Direct connections from sources to targets.
///
/// Edges do not affect timestamps, so we only need to know the connectivity.
/// Indexed by operator index then output port.
edges: Vec<Vec<Vec<Target>>>,
/// Internal operator connectivity, columnar form of `Vec<Vec<PortConnectivity<T::Summary>>>`.
/// Indexed by `(node, input_port)` to yield `(output_port, summary)` pairs.
nodes: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
/// Edge connectivity, columnar form of `Vec<Vec<Vec<Target>>>`.
/// Indexed by `(node, output_port)` to yield target slices.
edges: Vecs<Vecs<Vec<Target>>>,
Comment on lines +376 to +381
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're flattening this, what about flattening it further? Instead of nested vecs, have the usual index vec + flat vector, one for each level. I can imagine it would get unwieldy for no benefit, but it may be worth exploring.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vecs is what you are thinking of! :D

Copy link
Member Author

@frankmcsherry frankmcsherry Mar 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could maybe be clearer as `<Vec<Vec<PortConnectivity<T::Summary>>> as Columnar>::Container`, but you can't quite get there yet (without `T::Summary: Columnar`).


// TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
// It seems we should be able to flatten most of these so that there are a few allocations
// independent of the numbers of nodes and ports and such.
//
// TODO: We could also change the internal representation to be a graph of targets, using usize
// identifiers for each, so that internally we needn't use multiple levels of indirection.
// This may make more sense once we commit to topologically ordering the targets.
/// Summaries from each target (operator input) to scope outputs.
/// Indexed by `(node, target_port)` to yield `(scope_output, summary)` pairs.
target_summaries: Vecs<Vecs<Vec<(usize, T::Summary)>>>,
/// Summaries from each source (operator output) to scope outputs.
/// Indexed by `(node, source_port)` to yield `(scope_output, summary)` pairs.
source_summaries: Vecs<Vecs<Vec<(usize, T::Summary)>>>,

/// Each source and target has a mutable antichain to ensure that we track their discrete frontiers,
/// rather than their multiplicities. We separately track the frontiers resulting from propagated
Expand Down Expand Up @@ -437,8 +446,6 @@
pub pointstamps: MutableAntichain<T>,
/// Current implications of active pointstamps across the dataflow.
pub implications: MutableAntichain<T>,
/// Path summaries to each of the scope outputs.
pub output_summaries: PortConnectivity<T::Summary>,
}

impl<T: Timestamp> PortInformation<T> {
Expand All @@ -447,7 +454,6 @@
PortInformation {
pointstamps: MutableAntichain::new(),
implications: MutableAntichain::new(),
output_summaries: PortConnectivity::default(),
}
}

Expand Down Expand Up @@ -511,7 +517,7 @@
pub fn allocate_from(builder: Builder<T>, logger: Option<logging::TrackerLogger<T>>) -> (Self, Connectivity<T::Summary>) {

// Allocate buffer space for each input and input port.
let mut per_operator =
let per_operator =
builder
.shape
.iter()
Expand All @@ -522,7 +528,17 @@
let mut builder_summary = vec![PortConnectivity::default(); builder.shape[0].1];

// Compile summaries from each location to each scope output.
// Collect into per-node, per-port buckets for flattening.
let output_summaries = summarize_outputs::<T>(&builder.nodes, &builder.edges);

// Temporary storage: target_sum[node][port] and source_sum[node][port].
let mut target_sum: Vec<Vec<PortConnectivity<T::Summary>>> = builder.shape.iter()
.map(|&(inputs, _)| vec![PortConnectivity::default(); inputs])
.collect();
let mut source_sum: Vec<Vec<PortConnectivity<T::Summary>>> = builder.shape.iter()
.map(|&(_, outputs)| vec![PortConnectivity::default(); outputs])
.collect();

for (location, summaries) in output_summaries.into_iter() {
// Summaries from scope inputs are useful in summarizing the scope.
if location.node == 0 {
Expand All @@ -537,22 +553,54 @@
else {
match location.port {
Port::Target(port) => {
per_operator[location.node].targets[port].output_summaries = summaries;
target_sum[location.node][port] = summaries;
},
Port::Source(port) => {
per_operator[location.node].sources[port].output_summaries = summaries;
source_sum[location.node][port] = summaries;
},
}
}
}

// Build columnar nodes: Vecs<Vecs<Vec<(usize, T::Summary)>>>.
let nodes = build_nested_vecs(builder.nodes.iter().map(|connectivity| {
connectivity.iter().map(|port_conn| {
port_conn.iter_ports().flat_map(|(port, antichain)| {
antichain.elements().iter().map(move |s| (port, s.clone()))
})
})
}));

// Build columnar edges: Vecs<Vecs<Vec<Target>>>.
let edges = build_nested_vecs(builder.edges.iter().map(|node_edges| {
node_edges.iter().map(|port_edges| port_edges.iter().cloned())
}));

// Build columnar target and source summaries.
let target_summaries = build_nested_vecs(target_sum.iter().map(|ports| {
ports.iter().map(|port_conn| {
port_conn.iter_ports().flat_map(|(port, antichain)| {
antichain.elements().iter().map(move |s| (port, s.clone()))
})
})
}));
let source_summaries = build_nested_vecs(source_sum.iter().map(|ports| {
ports.iter().map(|port_conn| {
port_conn.iter_ports().flat_map(|(port, antichain)| {
antichain.elements().iter().map(move |s| (port, s.clone()))
})
})
}));

let scope_outputs = builder.shape[0].0;
let output_changes = vec![ChangeBatch::new(); scope_outputs];

let tracker =
Tracker {
nodes: builder.nodes,
edges: builder.edges,
nodes,
edges,
target_summaries,
source_summaries,
per_operator,
target_changes: ChangeBatch::new(),
source_changes: ChangeBatch::new(),
Expand Down Expand Up @@ -607,13 +655,10 @@

for (time, diff) in changes {
self.total_counts += diff;
for (output, summaries) in operator.output_summaries.iter_ports() {
let output_changes = &mut self.output_changes[output];
summaries
.elements()
.iter()
.flat_map(|summary| summary.results_in(&time))
.for_each(|out_time| output_changes.update(out_time, diff));
for &(output, ref summary) in (&self.target_summaries).get(target.node).get(target.port).into_index_iter() {
if let Some(out_time) = summary.results_in(&time) {
self.output_changes[output].update(out_time, diff);
}
}
self.worklist.push(Reverse((time, Location::from(target), diff)));
}
Expand All @@ -631,13 +676,10 @@
for (time, diff) in changes {
self.total_counts += diff;
operator.cap_counts += diff;
for (output, summaries) in op_source.output_summaries.iter_ports() {
let output_changes = &mut self.output_changes[output];
summaries
.elements()
.iter()
.flat_map(|summary| summary.results_in(&time))
.for_each(|out_time| output_changes.update(out_time, diff));
for &(output, ref summary) in (&self.source_summaries).get(source.node).get(source.port).into_index_iter() {
if let Some(out_time) = summary.results_in(&time) {
self.output_changes[output].update(out_time, diff);
}
}
self.worklist.push(Reverse((time, Location::from(source), diff)));
}
Expand Down Expand Up @@ -670,14 +712,11 @@
.implications
.update_iter(Some((time, diff)));

for (time, diff) in changes {

Check warning on line 715 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`diff` shadows a previous, unrelated binding

Check warning on line 715 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`time` shadows a previous, unrelated binding
let nodes = &self.nodes[location.node][port_index];
for (output_port, summaries) in nodes.iter_ports() {
let source = Location { node: location.node, port: Port::Source(output_port) };
for summary in summaries.elements().iter() {
if let Some(new_time) = summary.results_in(&time) {
self.worklist.push(Reverse((new_time, source, diff)));
}
for &(output_port, ref summary) in (&self.nodes).get(location.node).get(port_index).into_index_iter() {
if let Some(new_time) = summary.results_in(&time) {
let source = Location { node: location.node, port: Port::Source(output_port) };
self.worklist.push(Reverse((new_time, source, diff)));
}
}
self.pushed_changes.update((location, time), diff);
Expand All @@ -693,8 +732,8 @@
.implications
.update_iter(Some((time, diff)));

for (time, diff) in changes {

Check warning on line 735 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`diff` shadows a previous, unrelated binding

Check warning on line 735 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`time` shadows a previous, unrelated binding
for new_target in self.edges[location.node][port_index].iter() {
for new_target in (&self.edges).get(location.node).get(port_index).into_index_iter() {
self.worklist.push(Reverse((
time.clone(),
Location::from(*new_target),
Expand Down
Loading