Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions timely/examples/event_driven.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use timely::dataflow::operators::{Input, Probe};
use timely::dataflow::Scope;
use timely::dataflow::operators::{Input, Probe, Enter, Leave};
use timely::dataflow::operators::vec::Map;

fn main() {
Expand All @@ -20,10 +21,14 @@ fn main() {
// create a new input, exchange data, and inspect its output
for _dataflow in 0 .. dataflows {
worker.dataflow(|scope| {
let (input, mut stream) = scope.new_input();
for _step in 0 .. length {
stream = stream.map(|x: ()| x);
}
let (input, stream) = scope.new_input();
let stream = scope.region(|inner| {
let mut stream = stream.enter(inner);
for _step in 0 .. length {
stream = stream.map(|x: ()| x);
}
stream.leave()
});
let (probe, _stream) = stream.probe();
inputs.push(input);
probes.push(probe);
Expand All @@ -43,7 +48,7 @@ fn main() {
worker.step();
steps += 1;
}
println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
if round % 1000 == 0 { println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); }
}

}).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/concat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ impl<G: Scope, C: Container> Concatenate<G, C> for G {
// create an operator builder.
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
builder.set_notify(false);

// create new input handles for each input stream.
let mut handles = sources.into_iter().map(|s| builder.new_input(s, Pipeline)).collect::<Vec<_>>();
for i in 0 .. handles.len() { builder.set_notify_for(i, crate::progress::operate::FrontierInterest::Never); }

// create one output handle for the concatenated results.
let (mut output, result) = builder.new_output();
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/feedback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ impl<G: Scope> Feedback<G> for G {
fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, Stream<G, C>) {

let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
builder.set_notify(false);
let (output, stream) = builder.new_output();

(Handle { builder, summary, output }, stream)
Expand Down Expand Up @@ -118,6 +117,7 @@ impl<G: Scope, C: Container> ConnectLoop<G, C> for Stream<G, C> {
let mut output = handle.output;

let mut input = builder.new_input_connection(self, Pipeline, [(0, Antichain::from_elem(summary.clone()))]);
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);

builder.build(move |_capability| move |_frontier| {
let mut output = output.activate();
Expand Down
3 changes: 2 additions & 1 deletion timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ impl<T:Timestamp> Schedule for Operator<T> {
}
}

use crate::progress::operate::FrontierInterest;
impl<T:Timestamp> Operate<T> for Operator<T> {

fn inputs(&self) -> usize { 0 }
Expand All @@ -209,7 +210,7 @@ impl<T:Timestamp> Operate<T> for Operator<T> {
(Vec::new(), Rc::clone(&self.shared_progress), self)
}

fn notify_me(&self) -> bool { false }
fn notify_me(&self) -> &[FrontierInterest] { &[] }
}


Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/ok_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ impl<S: Scope, C: Container + DrainContainer> OkErr<S, C> for Stream<S, C> {
L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
{
let mut builder = OperatorBuilder::new("OkErr".to_owned(), self.scope());
builder.set_notify(false);

let mut input = builder.new_input(self, Pipeline);
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
let (output1, stream1) = builder.new_output();
let (output2, stream2) = builder.new_output();

Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
{
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
builder.set_notify(false);

let mut input = builder.new_input(self, Pipeline);
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
let mut outputs = Vec::with_capacity(parts as usize);
let mut streams = Vec::with_capacity(parts as usize);

Expand All @@ -66,7 +66,7 @@
targets.entry(part).or_default().push(datum);
}
// Form each intended output into a container and ship.
while let Some((part, data)) = targets.pop_first() {

Check warning on line 69 in timely/src/dataflow/operators/core/partition.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`data` shadows a previous, unrelated binding
for datum in data.into_iter() {
c_build.push_into(datum);
while let Some(container) = c_build.extract() {
Expand Down
3 changes: 2 additions & 1 deletion timely/src/dataflow/operators/core/unordered_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ impl<T:Timestamp> Schedule for UnorderedOperator<T> {
}
}

use crate::progress::operate::FrontierInterest;
impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
fn inputs(&self) -> usize { 0 }
fn outputs(&self) -> usize { 1 }
Expand All @@ -140,7 +141,7 @@ impl<T:Timestamp> Operate<T> for UnorderedOperator<T> {
(Vec::new(), Rc::clone(&self.shared_progress), self)
}

fn notify_me(&self) -> bool { false }
fn notify_me(&self) -> &[FrontierInterest] { &[] }
}

/// A handle to an input [Stream], used to introduce data to a timely dataflow computation.
Expand Down
15 changes: 9 additions & 6 deletions timely/src/dataflow/operators/generic/builder_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::scheduling::{Schedule, Activations};

use crate::progress::{Source, Target};
use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain};
use crate::progress::operate::{Connectivity, PortConnectivity};
use crate::progress::operate::{FrontierInterest, Connectivity, PortConnectivity};
use crate::Container;
use crate::dataflow::{Stream, Scope};
use crate::dataflow::channels::pushers::Tee;
Expand All @@ -23,7 +23,7 @@ use crate::dataflow::operators::generic::operator_info::OperatorInfo;
#[derive(Debug)]
pub struct OperatorShape {
name: String, // A meaningful name for the operator.
notify: bool, // Does the operator require progress notifications.
notify: Vec<FrontierInterest>, // Per-input frontier interest.
peers: usize, // The total number of workers in the computation. Needed to initialize pointstamp counts with the correct magnitude.
inputs: usize, // The number of input ports.
outputs: usize, // The number of output ports.
Expand All @@ -34,7 +34,7 @@ impl OperatorShape {
fn new(name: String, peers: usize) -> Self {
OperatorShape {
name,
notify: true,
notify: Vec::new(),
peers,
inputs: 0,
outputs: 0,
Expand Down Expand Up @@ -88,8 +88,10 @@ impl<G: Scope> OperatorBuilder<G> {
/// Return a reference to the operator's shape
pub fn shape(&self) -> &OperatorShape { &self.shape }

/// Indicates whether the operator requires frontier information.
pub fn set_notify(&mut self, notify: bool) { self.shape.notify = notify; }
/// Sets frontier interest for a specific input.
pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
self.shape.notify[input] = notify;
}

/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
pub fn new_input<C: Container, P>(&mut self, stream: Stream<G, C>, pact: P) -> P::Puller
Expand All @@ -113,6 +115,7 @@ impl<G: Scope> OperatorBuilder<G> {
stream.connect_to(target, sender, channel_id);

self.shape.inputs += 1;
self.shape.notify.push(FrontierInterest::Always);
let connectivity: PortConnectivity<_> = connection.into_iter().collect();
assert!(connectivity.iter_ports().all(|(o,_)| o < self.shape.outputs));
self.summary.push(connectivity);
Expand Down Expand Up @@ -220,5 +223,5 @@ where
(self.summary.clone(), Rc::clone(&self.shared_progress), self)
}

fn notify_me(&self) -> bool { self.shape.notify }
fn notify_me(&self) -> &[FrontierInterest] { &self.shape.notify }
}
8 changes: 5 additions & 3 deletions timely/src/dataflow/operators/generic/builder_rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::dataflow::operators::capability::Capability;
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle};
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
use crate::progress::operate::PortConnectivity;
use crate::progress::operate::{FrontierInterest, PortConnectivity};

use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;

Expand Down Expand Up @@ -48,8 +48,10 @@ impl<G: Scope> OperatorBuilder<G> {
}
}

/// Indicates whether the operator requires frontier information.
pub fn set_notify(&mut self, notify: bool) { self.builder.set_notify(notify); }
/// Sets frontier interest for a specific input.
pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
self.builder.set_notify_for(input, notify);
}

/// Adds a new input to a generic operator builder, returning the `Pull` implementor to use.
pub fn new_input<C: Container, P>(&mut self, stream: Stream<G, C>, pact: P) -> InputHandleCore<G::Timestamp, C, P::Puller>
Expand Down
6 changes: 3 additions & 3 deletions timely/src/dataflow/operators/generic/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ impl<G: Scope, C1: Container> Operator<G, C1> for Stream<G, C1> {
let operator_info = builder.operator_info();

let mut input = builder.new_input(self, pact);
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
let (output, stream) = builder.new_output();
let mut output = OutputBuilder::from(output);
builder.set_notify(false);

builder.build(move |mut capabilities| {
// `capabilities` should be a single-element vector.
Expand Down Expand Up @@ -461,9 +461,10 @@ impl<G: Scope, C1: Container> Operator<G, C1> for Stream<G, C1> {

let mut input1 = builder.new_input(self, pact1);
let mut input2 = builder.new_input(other, pact2);
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
builder.set_notify_for(1, crate::progress::operate::FrontierInterest::Never);
let (output, stream) = builder.new_output();
let mut output = OutputBuilder::from(output);
builder.set_notify(false);

builder.build(move |mut capabilities| {
// `capabilities` should be a single-element vector.
Expand Down Expand Up @@ -547,7 +548,6 @@ where

let (output, stream) = builder.new_output();
let mut output = OutputBuilder::from(output);
builder.set_notify(false);

builder.build(move |mut capabilities| {
// `capabilities` should be a single-element vector.
Expand Down
4 changes: 2 additions & 2 deletions timely/src/dataflow/operators/vec/branch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ impl<S: Scope, D: 'static> Branch<S, D> for StreamVec<S, D> {
condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
) -> (StreamVec<S, D>, StreamVec<S, D>) {
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
builder.set_notify(false);

let mut input = builder.new_input(self, Pipeline);
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
let (output1, stream1) = builder.new_output();
let (output2, stream2) = builder.new_output();

Expand Down Expand Up @@ -102,9 +102,9 @@ pub trait BranchWhen<T>: Sized {
impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for Stream<S, C> {
fn branch_when(self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
builder.set_notify(false);

let mut input = builder.new_input(self, Pipeline);
builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
let (output1, stream1) = builder.new_output();
let (output2, stream2) = builder.new_output();

Expand Down
21 changes: 16 additions & 5 deletions timely/src/progress/operate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,23 @@ pub trait Operate<T: Timestamp> {
/// safely "create" capabilities without basing them on other, prior capabilities.
fn initialize(self: Box<Self>) -> (Connectivity<T::Summary>, Rc<RefCell<SharedProgress<T>>>, Box<dyn Schedule>);

/// Indicates if the operator should be invoked on the basis of input frontier transitions.
/// Indicates for each input whether the operator should be invoked when that input's frontier changes.
///
/// This value is conservatively set to `true`, but operators that know they are oblivious to
/// frontier information can indicate this with `false`, and they will not be scheduled on the
/// basis of their input frontiers changing.
fn notify_me(&self) -> bool { true }
/// Returns a `Vec<FrontierInterest>` with one entry per input. Each entry describes whether
/// frontier changes on that input should cause the operator to be scheduled. The conservative
/// default is `Always` for each input.
fn notify_me(&self) -> &[FrontierInterest];// { &vec![FrontierInterest::Always; self.inputs()] }
}

/// The ways in which an operator can express interest in activation when an input frontier changes.
#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Debug)]
pub enum FrontierInterest {
/// Never interested in frontier changes, as for example the `map()` and `filter()` operators.
Never,
/// Interested when the operator holds capabilities.
IfCapability,
/// Always interested in frontier changes, as for example the `probe()` and `capture()` operators.
Always,
}

/// Operator internal connectivity, from inputs to outputs.
Expand Down
17 changes: 12 additions & 5 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
//! let mut results =
//! tracker
//! .pushed()
//! .0
//! .drain()
//! .filter(|((location, time), delta)| location.is_target())
//! .collect::<Vec<_>>();
Expand All @@ -60,6 +61,7 @@
//! let mut results =
//! tracker
//! .pushed()
//! .0
//! .drain()
//! .filter(|((location, time), delta)| location.is_target())
//! .collect::<Vec<_>>();
Expand Down Expand Up @@ -287,7 +289,7 @@
in_degree.entry(target).or_insert(0);
for (output, summaries) in outputs.iter_ports() {
let source = Location::new_source(index, output);
for summary in summaries.elements().iter() {

Check warning on line 292 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`summary` shadows a previous, unrelated binding
if summary == &Default::default() {
*in_degree.entry(source).or_insert(0) += 1;
}
Expand Down Expand Up @@ -413,6 +415,8 @@
pub targets: Vec<PortInformation<T>>,
/// Port information for each source.
pub sources: Vec<PortInformation<T>>,
/// Sum across outputs of capabilities.
pub cap_counts: i64,
}

impl<T: Timestamp> PerOperator<T> {
Expand All @@ -421,6 +425,7 @@
PerOperator {
targets: vec![PortInformation::new(); inputs],
sources: vec![PortInformation::new(); outputs],
cap_counts: 0,
}
}
}
Expand Down Expand Up @@ -618,13 +623,15 @@
while let Some(((source, _), _)) = source_changes.peek() {

let source = *source;
let operator = &mut self.per_operator[source.node].sources[source.port];
let operator = &mut self.per_operator[source.node];
let op_source = &mut operator.sources[source.port];
let source_updates = source_changes.peeking_take_while(|((s, _),_)| s == &source).map(|((_,time),diff)| (time,diff));
let changes = operator.pointstamps.update_iter(source_updates);
let changes = op_source.pointstamps.update_iter(source_updates);

for (time, diff) in changes {
self.total_counts += diff;
for (output, summaries) in operator.output_summaries.iter_ports() {
operator.cap_counts += diff;
for (output, summaries) in op_source.output_summaries.iter_ports() {
let output_changes = &mut self.output_changes[output];
summaries
.elements()
Expand Down Expand Up @@ -663,7 +670,7 @@
.implications
.update_iter(Some((time, diff)));

for (time, diff) in changes {

Check warning on line 673 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`diff` shadows a previous, unrelated binding

Check warning on line 673 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`time` shadows a previous, unrelated binding
let nodes = &self.nodes[location.node][port_index];
for (output_port, summaries) in nodes.iter_ports() {
let source = Location { node: location.node, port: Port::Source(output_port) };
Expand All @@ -686,7 +693,7 @@
.implications
.update_iter(Some((time, diff)));

for (time, diff) in changes {

Check warning on line 696 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`diff` shadows a previous, unrelated binding

Check warning on line 696 in timely/src/progress/reachability.rs

View workflow job for this annotation

GitHub Actions / Cargo clippy

`time` shadows a previous, unrelated binding
for new_target in self.edges[location.node][port_index].iter() {
self.worklist.push(Reverse((
time.clone(),
Expand All @@ -708,8 +715,8 @@
}

/// A mutable reference to the pushed results of changes.
pub fn pushed(&mut self) -> &mut ChangeBatch<(Location, T)> {
&mut self.pushed_changes
pub fn pushed(&mut self) -> (&mut ChangeBatch<(Location, T)>, &[PerOperator<T>]) {
(&mut self.pushed_changes, &self.per_operator)
}

/// Reveals per-operator frontier state.
Expand Down
Loading
Loading