diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs index 2cfb39e1f..7a392d54e 100644 --- a/timely/examples/event_driven.rs +++ b/timely/examples/event_driven.rs @@ -1,4 +1,5 @@ -use timely::dataflow::operators::{Input, Probe}; +use timely::dataflow::Scope; +use timely::dataflow::operators::{Input, Probe, Enter, Leave}; use timely::dataflow::operators::vec::Map; fn main() { @@ -20,10 +21,14 @@ fn main() { // create a new input, exchange data, and inspect its output for _dataflow in 0 .. dataflows { worker.dataflow(|scope| { - let (input, mut stream) = scope.new_input(); - for _step in 0 .. length { - stream = stream.map(|x: ()| x); - } + let (input, stream) = scope.new_input(); + let stream = scope.region(|inner| { + let mut stream = stream.enter(inner); + for _step in 0 .. length { + stream = stream.map(|x: ()| x); + } + stream.leave() + }); let (probe, _stream) = stream.probe(); inputs.push(input); probes.push(probe); @@ -43,7 +48,7 @@ fn main() { worker.step(); steps += 1; } - println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); + if round % 1000 == 0 { println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); } } }).unwrap(); diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs index a9813de81..e626897d6 100644 --- a/timely/src/dataflow/operators/core/concat.rs +++ b/timely/src/dataflow/operators/core/concat.rs @@ -63,10 +63,10 @@ impl Concatenate for G { // create an operator builder. use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone()); - builder.set_notify(false); // create new input handles for each input stream. let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::>(); + for i in 0 .. handles.len() { builder.set_notify_for(i, crate::progress::operate::FrontierInterest::Never); } // create one output handle for the concatenated results. let (mut output, result) = builder.new_output(); diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs index e13a22426..290b7fb39 100644 --- a/timely/src/dataflow/operators/core/feedback.rs +++ b/timely/src/dataflow/operators/core/feedback.rs @@ -73,7 +73,6 @@ impl Feedback for G { fn feedback(&mut self, summary: ::Summary) -> (Handle, Stream) { let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone()); - builder.set_notify(false); let (output, stream) = builder.new_output(); (Handle { builder, summary, output }, stream) @@ -118,6 +117,7 @@ impl ConnectLoop for Stream { let mut output = handle.output; let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); builder.build(move |_capability| move |_frontier| { let mut output = output.activate(); diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index fe530c49a..c7f1aa064 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -199,6 +199,7 @@ impl Schedule for Operator { } } +use crate::progress::operate::FrontierInterest; impl Operate for Operator { fn inputs(&self) -> usize { 0 } @@ -209,7 +210,7 @@ impl Operate for Operator { (Vec::new(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> bool { false } + fn notify_me(&self) -> &[FrontierInterest] { &[] } } diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index 7e7d983a9..c4e239756 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -52,9 +52,9 @@ impl OkErr for Stream { L: FnMut(C::Item<'_>) -> Result+'static { let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope()); - builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); let (output1, stream1) = builder.new_output(); let (output2, stream2) = builder.new_output(); diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index b2ac992e7..c9acdb6c1 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ -41,9 +41,9 @@ impl Partition for Stream { F: FnMut(C::Item<'_>) -> (u64, D2) + 'static, { let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope()); - builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); let mut outputs = Vec::with_capacity(parts as usize); let mut streams = Vec::with_capacity(parts as usize); diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index 0ed84cef2..6bfee175b 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -129,6 +129,7 @@ impl Schedule for UnorderedOperator { } } +use crate::progress::operate::FrontierInterest; impl Operate for UnorderedOperator { fn inputs(&self) -> usize { 0 } fn outputs(&self) -> usize { 1 } @@ -140,7 +141,7 @@ impl Operate for UnorderedOperator { (Vec::new(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> bool { false } + fn notify_me(&self) -> &[FrontierInterest] { &[] } } /// A handle to an input [Stream], used to introduce data to a timely dataflow computation. diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 2c735065c..7839241bc 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations}; use crate::progress::{Source, Target}; use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; -use crate::progress::operate::{Connectivity, PortConnectivity}; +use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity}; use crate::Container; use crate::dataflow::{Stream, Scope}; use crate::dataflow::channels::pushers::Tee; @@ -23,7 +23,7 @@ use crate::dataflow::operators::generic::operator_info::OperatorInfo; #[derive(Debug)] pub struct OperatorShape { name: String, // A meaningful name for the operator. - notify: bool, // Does the operator require progress notifications. + notify: Vec, // Per-input frontier interest. peers: usize, // The total number of workers in the computation. Needed to initialize pointstamp counts with the correct magnitude. inputs: usize, // The number of input ports. outputs: usize, // The number of output ports. @@ -34,7 +34,7 @@ impl OperatorShape { fn new(name: String, peers: usize) -> Self { OperatorShape { name, - notify: true, + notify: Vec::new(), peers, inputs: 0, outputs: 0, @@ -88,8 +88,10 @@ impl OperatorBuilder { /// Return a reference to the operator's shape pub fn shape(&self) -> &OperatorShape { &self.shape } - /// Indicates whether the operator requires frontier information. - pub fn set_notify(&mut self, notify: bool) { self.shape.notify = notify; } + /// Sets frontier interest for a specific input. + pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) { + self.shape.notify[input] = notify; + } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: Stream, pact: P) -> P::Puller @@ -113,6 +115,7 @@ impl OperatorBuilder { stream.connect_to(target, sender, channel_id); self.shape.inputs += 1; + self.shape.notify.push(FrontierInterest::Always); let connectivity: PortConnectivity<_> = connection.into_iter().collect(); assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs)); self.summary.push(connectivity); @@ -220,5 +223,5 @@ where (self.summary.clone(), Rc::clone(&self.shared_progress), self) } - fn notify_me(&self) -> bool { self.shape.notify } + fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify } } diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index 522819349..2cfc39894 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -18,7 +18,7 @@ use crate::dataflow::operators::capability::Capability; use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle}; use crate::dataflow::operators::generic::operator_info::OperatorInfo; use crate::dataflow::operators::generic::builder_raw::OperatorShape; -use crate::progress::operate::PortConnectivity; +use crate::progress::operate::{FrontierInterest, PortConnectivity}; use super::builder_raw::OperatorBuilder as OperatorBuilderRaw; @@ -48,8 +48,10 @@ impl OperatorBuilder { } } - /// Indicates whether the operator requires frontier information. - pub fn set_notify(&mut self, notify: bool) { self.builder.set_notify(notify); } + /// Sets frontier interest for a specific input. + pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) { + self.builder.set_notify_for(input, notify); + } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. pub fn new_input(&mut self, stream: Stream, pact: P) -> InputHandleCore diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 9a4319437..c7fa28b15 100644 --- a/timely/src/dataflow/operators/generic/operator.rs +++ b/timely/src/dataflow/operators/generic/operator.rs @@ -373,9 +373,9 @@ impl Operator for Stream { let operator_info = builder.operator_info(); let mut input = builder.new_input(self, pact); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); let (output, stream) = builder.new_output(); let mut output = OutputBuilder::from(output); - builder.set_notify(false); builder.build(move |mut capabilities| { // `capabilities` should be a single-element vector. @@ -461,9 +461,10 @@ impl Operator for Stream { let mut input1 = builder.new_input(self, pact1); let mut input2 = builder.new_input(other, pact2); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); + builder.set_notify_for(1, crate::progress::operate::FrontierInterest::Never); let (output, stream) = builder.new_output(); let mut output = OutputBuilder::from(output); - builder.set_notify(false); builder.build(move |mut capabilities| { // `capabilities` should be a single-element vector. @@ -547,7 +548,6 @@ where let (output, stream) = builder.new_output(); let mut output = OutputBuilder::from(output); - builder.set_notify(false); builder.build(move |mut capabilities| { // `capabilities` should be a single-element vector. diff --git a/timely/src/dataflow/operators/vec/branch.rs b/timely/src/dataflow/operators/vec/branch.rs index 62bb427b3..ce6ff785f 100644 --- a/timely/src/dataflow/operators/vec/branch.rs +++ b/timely/src/dataflow/operators/vec/branch.rs @@ -41,9 +41,9 @@ impl Branch for StreamVec { condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, ) -> (StreamVec, StreamVec) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); - builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); let (output1, stream1) = builder.new_output(); let (output2, stream2) = builder.new_output(); @@ -102,9 +102,9 @@ pub trait BranchWhen: Sized { impl BranchWhen for Stream { fn branch_when(self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); - builder.set_notify(false); let mut input = builder.new_input(self, Pipeline); + builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never); let (output1, stream1) = builder.new_output(); let (output2, stream2) = builder.new_output(); diff --git a/timely/src/progress/operate.rs b/timely/src/progress/operate.rs index 40ee2d9d4..6c3f12955 100644 --- a/timely/src/progress/operate.rs +++ b/timely/src/progress/operate.rs @@ -55,12 +55,23 @@ pub trait Operate { /// safely "create" capabilities without basing them on other, prior capabilities. fn initialize(self: Box) -> (Connectivity, Rc>>, Box); - /// Indicates if the operator should be invoked on the basis of input frontier transitions. + /// Indicates for each input whether the operator should be invoked when that input's frontier changes. /// - /// This value is conservatively set to `true`, but operators that know they are oblivious to - /// frontier information can indicate this with `false`, and they will not be scheduled on the - /// basis of their input frontiers changing. - fn notify_me(&self) -> bool { true } + /// Returns a `Vec` with one entry per input. Each entry describes whether + /// frontier changes on that input should cause the operator to be scheduled. The conservative + /// default is `Always` for each input. + fn notify_me(&self) -> &[FrontierInterest];// { &vec![FrontierInterest::Always; self.inputs()] } +} + +/// The ways in which an operator can express interest in activation when an input frontier changes. +#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug)] +pub enum FrontierInterest { + /// Never interested in frontier changes, as for example the `map()` and `filter()` operators. + Never, + /// Interested when the operator holds capabilities. + IfCapability, + /// Always interested in frontier changes, as for example the `probe()` and `capture()` operators. + Always, } /// Operator internal connectivity, from inputs to outputs. diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 2576d67f1..44d906094 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -38,6 +38,7 @@ //! let mut results = //! tracker //! .pushed() +//! .0 //! .drain() //! .filter(|((location, time), delta)| location.is_target()) //! .collect::>(); @@ -60,6 +61,7 @@ //! let mut results = //! tracker //! .pushed() +//! .0 //! .drain() //! .filter(|((location, time), delta)| location.is_target()) //! .collect::>(); @@ -413,6 +415,8 @@ pub struct PerOperator { pub targets: Vec>, /// Port information for each source. pub sources: Vec>, + /// Sum across outputs of capabilities. + pub cap_counts: i64, } impl PerOperator { @@ -421,6 +425,7 @@ impl PerOperator { PerOperator { targets: vec![PortInformation::new(); inputs], sources: vec![PortInformation::new(); outputs], + cap_counts: 0, } } } @@ -618,13 +623,15 @@ impl Tracker { while let Some(((source, _), _)) = source_changes.peek() { let source = *source; - let operator = &mut self.per_operator[source.node].sources[source.port]; + let operator = &mut self.per_operator[source.node]; + let op_source = &mut operator.sources[source.port]; let source_updates = source_changes.peeking_take_while(|((s, _),_)| s == &source).map(|((_,time),diff)| (time,diff)); - let changes = operator.pointstamps.update_iter(source_updates); + let changes = op_source.pointstamps.update_iter(source_updates); for (time, diff) in changes { self.total_counts += diff; - for (output, summaries) in operator.output_summaries.iter_ports() { + operator.cap_counts += diff; + for (output, summaries) in op_source.output_summaries.iter_ports() { let output_changes = &mut self.output_changes[output]; summaries .elements() @@ -708,8 +715,8 @@ impl Tracker { } /// A mutable reference to the pushed results of changes. - pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> { - &mut self.pushed_changes + pub fn pushed(&mut self) -> (&mut ChangeBatch<(Location, T)>, &[PerOperator]) { + (&mut self.pushed_changes, &self.per_operator) } /// Reveals per-operator frontier state. diff --git a/timely/src/progress/subgraph.rs b/timely/src/progress/subgraph.rs index 7a0a662c6..e11011368 100644 --- a/timely/src/progress/subgraph.rs +++ b/timely/src/progress/subgraph.rs @@ -19,7 +19,7 @@ use crate::scheduling::activate::Activations; use crate::progress::frontier::{MutableAntichain, MutableAntichainFilter}; use crate::progress::{Timestamp, Operate, operate::SharedProgress}; use crate::progress::{Location, Port, Source, Target}; -use crate::progress::operate::{Connectivity, PortConnectivity}; +use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity}; use crate::progress::ChangeBatch; use crate::progress::broadcast::Progcaster; use crate::progress::reachability; @@ -198,6 +198,13 @@ where activations.borrow_mut().activate(&self.path[..]); + // The subgraph's per-input interest is conservatively the max across all children's inputs. + let max_interest = self.children.iter() + .flat_map(|c| c.notify.iter().copied()) + .max() + .unwrap_or(FrontierInterest::Never); + let notify_me: Vec = vec![max_interest; inputs]; + Subgraph { name: self.name, path: self.path, @@ -221,6 +228,7 @@ where scope_summary, progress_mode: worker.config().progress_mode, + notify_me, } } } @@ -273,6 +281,8 @@ where scope_summary: Connectivity, progress_mode: ProgressMode, + + notify_me: Vec, } impl Schedule for Subgraph @@ -469,16 +479,20 @@ where self.pointstamp_tracker.propagate_all(); // Drain propagated information into shared progress structure. - for ((location, time), diff) in self.pointstamp_tracker.pushed().drain() { + let (pushed, operators) = self.pointstamp_tracker.pushed(); + for ((location, time), diff) in pushed.drain() { self.maybe_shutdown.push(location.node); // Targets are actionable, sources are not. if let crate::progress::Port::Target(port) = location.port { - if self.children[location.node].notify { - self.temp_active.push(Reverse(location.node)); - } - // TODO: This logic could also be guarded by `.notify`, but - // we want to be a bit careful to make sure all related logic - // agrees with this (e.g. initialization, operator logic, etc.) + // Activate based on expressed frontier interest for this input. + let activate = match self.children[location.node].notify[port] { + FrontierInterest::Always => true, + FrontierInterest::IfCapability => { operators[location.node].cap_counts > 0 } + FrontierInterest::Never => false, + }; + if activate { self.temp_active.push(Reverse(location.node)); } + + // Keep this current independent of the interest. self.children[location.node] .shared_progress .borrow_mut() @@ -493,7 +507,7 @@ where for child_index in self.maybe_shutdown.drain(..) { let child_state = self.pointstamp_tracker.node_state(child_index); let frontiers_empty = child_state.targets.iter().all(|x| x.implications.is_empty()); - let no_capabilities = child_state.sources.iter().all(|x| x.pointstamps.is_empty()); + let no_capabilities = child_state.cap_counts == 0; if frontiers_empty && no_capabilities { self.temp_active.push(Reverse(child_index)); } @@ -524,7 +538,9 @@ where .iter() .any(|((location, time), diff)| // Must publish scope-wide visible subtractions. - tracker.is_global(*location, time) && *diff < 0 + tracker.is_global(*location, time) && *diff < 0 || + // Must confirm the receipt of inbound messages. + location.node == 0 ) }; @@ -586,6 +602,8 @@ where // Return summaries and shared progress information. (internal_summary, Rc::clone(&self.shared_progress), self) } + + fn notify_me(&self) -> &[FrontierInterest] { &self.notify_me } } struct PerOperatorState { @@ -595,7 +613,7 @@ struct PerOperatorState { id: usize, // worker-unique identifier local: bool, // indicates whether the operator will exchange data or not - notify: bool, + notify: Vec, inputs: usize, // number of inputs to the operator outputs: usize, // number of outputs from the operator @@ -619,7 +637,7 @@ impl PerOperatorState { index: 0, id: usize::MAX, local: false, - notify: true, + notify: vec![FrontierInterest::IfCapability; inputs], inputs, outputs, @@ -643,7 +661,7 @@ impl PerOperatorState { let local = scope.local(); let inputs = scope.inputs(); let outputs = scope.outputs(); - let notify = scope.notify_me(); + let notify = scope.notify_me().to_vec(); let (internal_summary, shared_progress, operator) = scope.initialize();