From 500dabc1c23cd58383da9d03ee23e4e88048ccae Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 7 Mar 2026 16:46:08 -0500 Subject: [PATCH 1/4] Demonstrate lazy scheduling --- timely/src/progress/subgraph.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 7a0a662c6..db7fdb185 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -198,6 +198,8 @@ where activations.borrow_mut().activate(&self.path[..]); + let notify_me: bool = self.children.iter().any(|c| c.notify); + Subgraph { name: self.name, path: self.path, @@ -221,6 +223,7 @@ where scope_summary, progress_mode: worker.config().progress_mode, + notify_me, } } } @@ -273,6 +276,8 @@ where scope_summary: Connectivity, progress_mode: ProgressMode, + + notify_me: bool, } impl Schedule for Subgraph @@ -524,7 +529,9 @@ where .iter() .any(|((location, time), diff)| // Must publish scope-wide visible subtractions. - tracker.is_global(*location, time) && *diff < 0 + tracker.is_global(*location, time) && *diff < 0 || + // Must confirm the receipt of inbound messages. + location.node == 0 ) }; @@ -586,6 +593,8 @@ where // Return summaries and shared progress information. (internal_summary, Rc::clone(&self.shared_progress), self) } + + fn notify_me(&self) -> bool { self.notify_me } } struct PerOperatorState { @@ -619,7 +628,7 @@ impl PerOperatorState { index: 0, id: usize::MAX, local: false, - notify: true, + notify: false, inputs, outputs, From df334ab6c86ede1b0b73d4f51ee3e4f968d26d1c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 9 Mar 2026 06:19:20 -0400 Subject: [PATCH 2/4] Add FrontierInterest with IfCapability variant --- timely/examples/event_driven.rs | 17 ++++++---- timely/src/dataflow/operators/core/input.rs | 3 +- .../operators/core/unordered_input.rs | 3 +- .../dataflow/operators/generic/builder_raw.rs | 10 +++--- .../dataflow/operators/generic/builder_rc.rs | 8 +++-- timely/src/progress/operate.rs | 13 +++++++- timely/src/progress/reachability.rs | 17 +++++++--- timely/src/progress/subgraph.rs | 32 +++++++++++-------- 8 files changed, 68 insertions(+), 35 deletions(-) diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs index 2cfb39e1f..7a392d54e 100644 --- a/timely/examples/event_driven.rs +++ b/timely/examples/event_driven.rs @@ -1,4 +1,5 @@ -use timely::dataflow::operators::{Input, Probe}; +use timely::dataflow::Scope; +use timely::dataflow::operators::{Input, Probe, Enter, Leave}; use timely::dataflow::operators::vec::Map; fn main() { @@ -20,10 +21,14 @@ fn main() { // create a new input, exchange data, and inspect its output for _dataflow in 0 .. dataflows { worker.dataflow(|scope| { - let (input, mut stream) = scope.new_input(); - for _step in 0 .. length { - stream = stream.map(|x: ()| x); - } + let (input, stream) = scope.new_input(); + let stream = scope.region(|inner| { + let mut stream = stream.enter(inner); + for _step in 0 .. length { + stream = stream.map(|x: ()| x); + } + stream.leave() + }); let (probe, _stream) = stream.probe(); inputs.push(input); probes.push(probe); @@ -43,7 +48,7 @@ fn main() { worker.step(); steps += 1; } - println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); + if round % 1000 == 0 { println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); } } }).unwrap(); diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index fe530c49a..853ab5583 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -199,6 +199,7 @@ impl Schedule for Operator { } } +use crate::progress::operate::FrontierInterest; impl Operate for Operator { fn inputs(&self) -> usize { 0 } @@ -209,7 +210,7 @@ impl Operate for Operator { (Vec::new(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> bool { false } + fn notify_me(&self) -> FrontierInterest { FrontierInterest::Never } } diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 0ed84cef2..b30fbf09f 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -129,6 +129,7 @@ impl Schedule for UnorderedOperator { } } +use crate::progress::operate::FrontierInterest; impl Operate for UnorderedOperator { fn inputs(&self) -> usize { 0 } fn outputs(&self) -> usize { 1 } @@ -140,7 +141,7 @@ impl Operate for UnorderedOperator { (Vec::new(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> bool { false } + fn notify_me(&self) -> FrontierInterest { FrontierInterest::Never } } /// A handle to an input [Stream], used to introduce data to a timely dataflow computation. diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 2c735065c..038acf6c3 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations}; use crate::progress::{Source, Target}; use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; -use crate::progress::operate::{Connectivity, PortConnectivity}; +use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity}; use crate::Container; use crate::dataflow::{Stream, Scope}; use crate::dataflow::channels::pushers::Tee; @@ -23,7 +23,7 @@ use crate::dataflow::operators::generic::operator_info::OperatorInfo; #[derive(Debug)] pub struct OperatorShape { name: String, // A meaningful name for the operator. - notify: bool, // Does the operator require progress notifications. + notify: FrontierInterest, // Does the operator require progress notifications. peers: usize, // The total number of workers in the computation. Needed to initialize pointstamp counts with the correct magnitude. inputs: usize, // The number of input ports. outputs: usize, // The number of output ports. @@ -34,7 +34,7 @@ impl OperatorShape { fn new(name: String, peers: usize) -> Self { OperatorShape { name, - notify: true, + notify: FrontierInterest::Always, peers, inputs: 0, outputs: 0, @@ -89,7 +89,7 @@ impl OperatorBuilder { pub fn shape(&self) -> &OperatorShape { &self.shape } /// Indicates whether the operator requires frontier information. - pub fn set_notify(&mut self, notify: bool) { self.shape.notify = notify; } + pub fn set_notify(&mut self, notify: FrontierInterest) { self.shape.notify = notify; } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: Stream, pact: P) -> P::Puller @@ -220,5 +220,5 @@ where (self.summary.clone(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> bool { self.shape.notify } + fn notify_me(&self) -> FrontierInterest { self.shape.notify } } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 522819349..f499d481d 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -18,7 +18,7 @@ use crate::dataflow::operators::capability::Capability; use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle}; use crate::dataflow::operators::generic::operator_info::OperatorInfo; use crate::dataflow::operators::generic::builder_raw::OperatorShape; -use crate::progress::operate::PortConnectivity; +use crate::progress::operate::{FrontierInterest, PortConnectivity}; use super::builder_raw::OperatorBuilder as OperatorBuilderRaw; @@ -49,7 +49,11 @@ impl OperatorBuilder { } /// Indicates whether the operator requires frontier information. - pub fn set_notify(&mut self, notify: bool) { self.builder.set_notify(notify); } + pub fn set_notify(&mut self, notify: bool) { + let interest = if notify { FrontierInterest::Always } else { FrontierInterest::Never }; + self.builder.set_notify(interest); + + } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: Stream, pact: P) -> InputHandleCore diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 40ee2d9d4..68ad3b813 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -60,7 +60,18 @@ pub trait Operate { /// This value is conservatively set to `true`, but operators that know they are oblivious to /// frontier information can indicate this with `false`, and they will not be scheduled on the /// basis of their input frontiers changing. - fn notify_me(&self) -> bool { true } + fn notify_me(&self) -> FrontierInterest { FrontierInterest::Always } +} + +/// The ways in which an operator can express interest in activation when an input frontier changes. +#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug)] +pub enum FrontierInterest { + /// Never interested in frontier changes, as for example the `map()` and `filter()` operators. + Never, + /// Interested when the operator holds capabilities. + IfCapability, + /// Always interested in frontier changes, as for example the `probe()` and `capture()` operators. + Always, } /// Operator internal connectivity, from inputs to outputs. diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 2576d67f1..44d906094 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -38,6 +38,7 @@ //! let mut results = //! tracker //! .pushed() +//! .0 //! .drain() //! .filter(|((location, time), delta)| location.is_target()) //! .collect::>(); @@ -60,6 +61,7 @@ //! let mut results = //! tracker //! .pushed() +//! .0 //! .drain() //! .filter(|((location, time), delta)| location.is_target()) //! .collect::>(); @@ -413,6 +415,8 @@ pub struct PerOperator { pub targets: Vec>, /// Port information for each source. pub sources: Vec>, + /// Sum across outputs of capabilities. + pub cap_counts: i64, } impl PerOperator { @@ -421,6 +425,7 @@ impl PerOperator { PerOperator { targets: vec![PortInformation::new(); inputs], sources: vec![PortInformation::new(); outputs], + cap_counts: 0, } } } @@ -618,13 +623,15 @@ impl Tracker { while let Some(((source, _), _)) = source_changes.peek() { let source = *source; - let operator = &mut self.per_operator[source.node].sources[source.port]; + let operator = &mut self.per_operator[source.node]; + let op_source = &mut operator.sources[source.port]; let source_updates = source_changes.peeking_take_while(|((s, _),_)| s == &source).map(|((_,time),diff)| (time,diff)); - let changes = operator.pointstamps.update_iter(source_updates); + let changes = op_source.pointstamps.update_iter(source_updates); for (time, diff) in changes { self.total_counts += diff; - for (output, summaries) in operator.output_summaries.iter_ports() { + operator.cap_counts += diff; + for (output, summaries) in op_source.output_summaries.iter_ports() { let output_changes = &mut self.output_changes[output]; summaries .elements() @@ -708,8 +715,8 @@ impl Tracker { } /// A mutable reference to the pushed results of changes. - pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> { - &mut self.pushed_changes + pub fn pushed(&mut self) -> (&mut ChangeBatch<(Location, T)>, &[PerOperator]) { + (&mut self.pushed_changes, &self.per_operator) } /// Reveals per-operator frontier state. diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index db7fdb185..8c820bec8 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -19,7 +19,7 @@ use crate::scheduling::activate::Activations; use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter}; use crate::progress::{Timestamp, Operate, operate::SharedProgress}; use crate::progress::{Location, Port, Source, Target}; -use crate::progress::operate::{Connectivity, PortConnectivity}; +use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity}; use crate::progress::ChangeBatch; use crate::progress::broadcast::Progcaster; use crate::progress::reachability; @@ -198,7 +198,7 @@ where activations.borrow_mut().activate(&self.path[..]); - let notify_me: bool = self.children.iter().any(|c| c.notify); + let notify_me: FrontierInterest = self.children.iter().map(|c| c.notify).max().unwrap(); Subgraph { name: self.name, @@ -277,7 +277,7 @@ where progress_mode: ProgressMode, - notify_me: bool, + notify_me: FrontierInterest, } impl Schedule for Subgraph @@ -474,16 +474,20 @@ where self.pointstamp_tracker.propagate_all(); // Drain propagated information into shared progress structure. - for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() { + let (pushed, operators) = self.pointstamp_tracker.pushed(); + for ((location, time), diff) in pushed.drain() { self.maybe_shutdown.push(location.node); // Targets are actionable, sources are not. if let crate::progress::Port::Target(port) = location.port { - if self.children[location.node].notify { - self.temp_active.push(Reverse(location.node)); - } - // TODO: This logic could also be guarded by `.notify`, but - // we want to be a bit careful to make sure all related logic - // agrees with this (e.g. initialization, operator logic, etc.) + // Activate based on expressed frontier interest. + let activate = match self.children[location.node].notify { + FrontierInterest::Always => true, + FrontierInterest::IfCapability => { operators[location.node].cap_counts > 0 } + FrontierInterest::Never => false, + }; + if activate { self.temp_active.push(Reverse(location.node)); } + + // Keep this current independent of the interest. self.children[location.node] .shared_progress .borrow_mut() @@ -498,7 +502,7 @@ where for child_index in self.maybe_shutdown.drain(..) { let child_state = self.pointstamp_tracker.node_state(child_index); let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty()); - let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty()); + let no_capabilities = child_state.cap_counts == 0; if frontiers_empty && no_capabilities { self.temp_active.push(Reverse(child_index)); } @@ -594,7 +598,7 @@ where (internal_summary, Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> bool { self.notify_me } + fn notify_me(&self) -> FrontierInterest { self.notify_me } } struct PerOperatorState { @@ -604,7 +608,7 @@ struct PerOperatorState { id: usize, // worker-unique identifier local: bool, // indicates whether the operator will exchange data or not - notify: bool, + notify: FrontierInterest, inputs: usize, // number of inputs to the operator outputs: usize, // number of outputs from the operator @@ -628,7 +632,7 @@ impl PerOperatorState { index: 0, id: usize::MAX, local: false, - notify: false, + notify: FrontierInterest::IfCapability, inputs, outputs, From 2f2e1b6d6d567bb73a40b57075eaf8695d1eb24a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 13 Mar 2026 04:40:15 -0400 Subject: [PATCH 3/4] Move FrontierInterest to be per-input --- timely/src/dataflow/operators/core/input.rs | 2 +- .../operators/core/unordered_input.rs | 2 +- .../dataflow/operators/generic/builder_raw.rs | 20 ++++++++++++++----- .../dataflow/operators/generic/builder_rc.rs | 6 +++++- timely/src/progress/operate.rs | 10 +++++----- timely/src/progress/subgraph.rs | 19 +++++++++++------- 6 files changed, 39 insertions(+), 20 deletions(-) diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index 853ab5583..da5adcfd8 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -210,7 +210,7 @@ impl Operate for Operator { (Vec::new(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> FrontierInterest { FrontierInterest::Never } + fn notify_me(&self) -> Vec { Vec::new() } } diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index b30fbf09f..cfb3b58d2 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -141,7 +141,7 @@ impl Operate for UnorderedOperator { (Vec::new(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> FrontierInterest { FrontierInterest::Never } + fn notify_me(&self) -> Vec { Vec::new() } } /// A handle to an input [Stream], used to introduce data to a timely dataflow computation. diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 038acf6c3..5baa0edc7 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -23,7 +23,7 @@ use crate::dataflow::operators::generic::operator_info::OperatorInfo; #[derive(Debug)] pub struct OperatorShape { name: String, // A meaningful name for the operator. - notify: FrontierInterest, // Does the operator require progress notifications. + notify: Vec, // Per-input frontier interest. peers: usize, // The total number of workers in the computation. Needed to initialize pointstamp counts with the correct magnitude. inputs: usize, // The number of input ports. outputs: usize, // The number of output ports. @@ -34,7 +34,7 @@ impl OperatorShape { fn new(name: String, peers: usize) -> Self { OperatorShape { name, - notify: FrontierInterest::Always, + notify: Vec::new(), peers, inputs: 0, outputs: 0, @@ -88,8 +88,17 @@ impl OperatorBuilder { /// Return a reference to the operator's shape pub fn shape(&self) -> &OperatorShape { &self.shape } - /// Indicates whether the operator requires frontier information. - pub fn set_notify(&mut self, notify: FrontierInterest) { self.shape.notify = notify; } + /// Sets frontier interest for all inputs. + pub fn set_notify(&mut self, notify: FrontierInterest) { + for n in self.shape.notify.iter_mut() { + *n = notify; + } + } + + /// Sets frontier interest for a specific input. + pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) { + self.shape.notify[input] = notify; + } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: Stream, pact: P) -> P::Puller @@ -113,6 +122,7 @@ impl OperatorBuilder { stream.connect_to(target, sender, channel_id); self.shape.inputs += 1; + self.shape.notify.push(FrontierInterest::Always); let connectivity: PortConnectivity<_> = connection.into_iter().collect(); assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs)); self.summary.push(connectivity); @@ -220,5 +230,5 @@ where (self.summary.clone(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> FrontierInterest { self.shape.notify } + fn notify_me(&self) -> Vec { self.shape.notify.clone() } } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index f499d481d..523ad3232 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -48,11 +48,15 @@ impl OperatorBuilder { } } - /// Indicates whether the operator requires frontier information. + /// Sets frontier interest for all inputs. pub fn set_notify(&mut self, notify: bool) { let interest = if notify { FrontierInterest::Always } else { FrontierInterest::Never }; self.builder.set_notify(interest); + } + /// Sets frontier interest for a specific input. + pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) { + self.builder.set_notify_for(input, notify); } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 68ad3b813..8f95ad4be 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -55,12 +55,12 @@ pub trait Operate { /// safely "create" capabilities without basing them on other, prior capabilities. fn initialize(self: Box) -> (Connectivity, Rc>>, Box); - /// Indicates if the operator should be invoked on the basis of input frontier transitions. + /// Indicates for each input whether the operator should be invoked when that input's frontier changes. /// - /// This value is conservatively set to `true`, but operators that know they are oblivious to - /// frontier information can indicate this with `false`, and they will not be scheduled on the - /// basis of their input frontiers changing. - fn notify_me(&self) -> FrontierInterest { FrontierInterest::Always } + /// Returns a `Vec` with one entry per input. Each entry describes whether + /// frontier changes on that input should cause the operator to be scheduled. The conservative + /// default is `Always` for each input. + fn notify_me(&self) -> Vec { vec![FrontierInterest::Always; self.inputs()] } } /// The ways in which an operator can express interest in activation when an input frontier changes. diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 8c820bec8..1ff5cc5d3 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -198,7 +198,12 @@ where activations.borrow_mut().activate(&self.path[..]); - let notify_me: FrontierInterest = self.children.iter().map(|c| c.notify).max().unwrap(); + // The subgraph's per-input interest is conservatively the max across all children's inputs. + let max_interest = self.children.iter() + .flat_map(|c| c.notify.iter().copied()) + .max() + .unwrap_or(FrontierInterest::Never); + let notify_me: Vec = vec![max_interest; inputs]; Subgraph { name: self.name, @@ -277,7 +282,7 @@ where progress_mode: ProgressMode, - notify_me: FrontierInterest, + notify_me: Vec, } impl Schedule for Subgraph @@ -479,8 +484,8 @@ where self.maybe_shutdown.push(location.node); // Targets are actionable, sources are not. if let crate::progress::Port::Target(port) = location.port { - // Activate based on expressed frontier interest. - let activate = match self.children[location.node].notify { + // Activate based on expressed frontier interest for this input. + let activate = match self.children[location.node].notify[port] { FrontierInterest::Always => true, FrontierInterest::IfCapability => { operators[location.node].cap_counts > 0 } FrontierInterest::Never => false, @@ -598,7 +603,7 @@ where (internal_summary, Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> FrontierInterest { self.notify_me } + fn notify_me(&self) -> Vec { self.notify_me.clone() } } struct PerOperatorState { @@ -608,7 +613,7 @@ struct PerOperatorState { id: usize, // worker-unique identifier local: bool, // indicates whether the operator will exchange data or not - notify: FrontierInterest, + notify: Vec, inputs: usize, // number of inputs to the operator outputs: usize, // number of outputs from the operator @@ -632,7 +637,7 @@ impl PerOperatorState { index: 0, id: usize::MAX, local: false, - notify: FrontierInterest::IfCapability, + notify: vec![FrontierInterest::IfCapability; inputs], inputs, outputs, From 88a0116c0118fc6de8b065a52fb0296fb438ffc2 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 13 Mar 2026 09:13:49 -0400 Subject: [PATCH 4/4] Remove default notify_me impl; streamline setting the value --- timely/src/dataflow/operators/core/concat.rs | 2 +- timely/src/dataflow/operators/core/feedback.rs | 2 +- timely/src/dataflow/operators/core/input.rs | 2 +- timely/src/dataflow/operators/core/ok_err.rs | 2 +- timely/src/dataflow/operators/core/partition.rs | 2 +- timely/src/dataflow/operators/core/unordered_input.rs | 2 +- timely/src/dataflow/operators/generic/builder_raw.rs | 9 +-------- timely/src/dataflow/operators/generic/builder_rc.rs | 6 ------ timely/src/dataflow/operators/generic/operator.rs | 6 +++--- timely/src/dataflow/operators/vec/branch.rs | 4 ++-- timely/src/progress/operate.rs | 2 +- timely/src/progress/subgraph.rs | 4 ++-- 12 files changed, 15 insertions(+), 28 deletions(-) diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index a9813de81..e626897d6 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -63,10 +63,10 @@ impl Concatenate for G { // create an operator builder. use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone()); - builder.set_notify(false); // create new input handles for each input stream. let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::>(); + for i in 0 .. handles.len() { builder.set_notify_for(i, crate::progress::operate::FrontierInterest::Never); } // create one output handle for the concatenated results. let (mut output, result) = builder.new_output(); diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index e13a22426..290b7fb39 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -73,7 +73,6 @@ impl Feedback for G { fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); - builder.set_notify(false); let (output, stream) = builder.new_output(); (Handle { builder, summary, output }, stream) @@ -118,6 +117,7 @@ impl ConnectLoop for Stream { let mut output = handle.output; let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); builder.build(move |_capability| move |_frontier| { let mut output = output.activate(); diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index da5adcfd8..c7f1aa064 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -210,7 +210,7 @@ impl Operate for Operator { (Vec::new(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> Vec { Vec::new() } + fn notify_me(&self) -> &[FrontierInterest] { &[] } } diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index 7e7d983a9..c4e239756 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -52,9 +52,9 @@ impl OkErr for Stream { L: FnMut(C::Item<'_>) -> Result+'static { let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope()); - builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); let (output1, stream1) = builder.new_output(); let (output2, stream2) = builder.new_output(); diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index b2ac992e7..c9acdb6c1 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ -41,9 +41,9 @@ impl Partition for Stream { F: FnMut(C::Item<'_>) -> (u64, D2) + 'static, { let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope()); - builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); let mut outputs = Vec::with_capacity(parts as usize); let mut streams = Vec::with_capacity(parts as usize); diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index cfb3b58d2..6bfee175b 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -141,7 +141,7 @@ impl Operate for UnorderedOperator { (Vec::new(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> Vec { Vec::new() } + fn notify_me(&self) -> &[FrontierInterest] { &[] } } /// A handle to an input [Stream], used to introduce data to a timely dataflow computation. diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 5baa0edc7..7839241bc 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -88,13 +88,6 @@ impl OperatorBuilder { /// Return a reference to the operator's shape pub fn shape(&self) -> &OperatorShape { &self.shape } - /// Sets frontier interest for all inputs. - pub fn set_notify(&mut self, notify: FrontierInterest) { - for n in self.shape.notify.iter_mut() { - *n = notify; - } - } - /// Sets frontier interest for a specific input. pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) { self.shape.notify[input] = notify; @@ -230,5 +223,5 @@ where (self.summary.clone(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> Vec { self.shape.notify.clone() } + fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify } } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 523ad3232..2cfc39894 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -48,12 +48,6 @@ impl OperatorBuilder { } } - /// Sets frontier interest for all inputs. - pub fn set_notify(&mut self, notify: bool) { - let interest = if notify { FrontierInterest::Always } else { FrontierInterest::Never }; - self.builder.set_notify(interest); - } - /// Sets frontier interest for a specific input. pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) { self.builder.set_notify_for(input, notify); diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 9a4319437..c7fa28b15 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -373,9 +373,9 @@ impl Operator for Stream { let operator_info = builder.operator_info(); let mut input = builder.new_input(self, pact); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); let (output, stream) = builder.new_output(); let mut output = OutputBuilder::from(output); - builder.set_notify(false); builder.build(move |mut capabilities| { // `capabilities` should be a single-element vector. @@ -461,9 +461,10 @@ impl Operator for Stream { let mut input1 = builder.new_input(self, pact1); let mut input2 = builder.new_input(other, pact2); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); + builder.set_notify_for(1, crate::progress::operate::FrontierInterest::Never); let (output, stream) = builder.new_output(); let mut output = OutputBuilder::from(output); - builder.set_notify(false); builder.build(move |mut capabilities| { // `capabilities` should be a single-element vector. @@ -547,7 +548,6 @@ where let (output, stream) = builder.new_output(); let mut output = OutputBuilder::from(output); - builder.set_notify(false); builder.build(move |mut capabilities| { // `capabilities` should be a single-element vector. diff --git a/timely/src/dataflow/operators/vec/branch.rs b/timely/src/dataflow/operators/vec/branch.rs index 62bb427b3..ce6ff785f 100644 --- a/timely/src/dataflow/operators/vec/branch.rs +++ b/timely/src/dataflow/operators/vec/branch.rs @@ -41,9 +41,9 @@ impl Branch for StreamVec { condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, ) -> (StreamVec, StreamVec) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); - builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); let (output1, stream1) = builder.new_output(); let (output2, stream2) = builder.new_output(); @@ -102,9 +102,9 @@ pub trait BranchWhen: Sized { impl BranchWhen for Stream { fn branch_when(self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); - builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); let (output1, stream1) = builder.new_output(); let (output2, stream2) = builder.new_output(); diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 8f95ad4be..6c3f12955 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -60,7 +60,7 @@ pub trait Operate { /// Returns a `Vec` with one entry per input. Each entry describes whether /// frontier changes on that input should cause the operator to be scheduled. The conservative /// default is `Always` for each input. - fn notify_me(&self) -> Vec { vec![FrontierInterest::Always; self.inputs()] } + fn notify_me(&self) -> &[FrontierInterest];// { &vec![FrontierInterest::Always; self.inputs()] } } /// The ways in which an operator can express interest in activation when an input frontier changes. diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 1ff5cc5d3..e11011368 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -603,7 +603,7 @@ where (internal_summary, Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> Vec { self.notify_me.clone() } + fn notify_me(&self) -> &[FrontierInterest] { &self.notify_me } } struct PerOperatorState { @@ -661,7 +661,7 @@ impl PerOperatorState { let local = scope.local(); let inputs = scope.inputs(); let outputs = scope.outputs(); - let notify = scope.notify_me(); + let notify = scope.notify_me().to_vec(); let (internal_summary, shared_progress, operator) = scope.initialize();