From fe3dd26564b3ab8fd6508b1921b364eba3cf5eeb Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 11 Mar 2026 16:41:11 -0400 Subject: [PATCH 1/5] Set default progress to Demand --- timely/src/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/timely/src/worker.rs b/timely/src/worker.rs index da92aa032..70051214a 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -112,7 +112,7 @@ impl Config { #[cfg(feature = "getopts")] pub fn from_matches(matches: &getopts::Matches) -> Result { let progress_mode = matches - .opt_get_default("progress-mode", ProgressMode::Eager)?; + .opt_get_default("progress-mode", ProgressMode::Demand)?; Ok(Config::default().progress_mode(progress_mode)) } From fdac7255b4996a9a0ea969542aae59c8739fa822 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 11 Mar 2026 16:43:29 -0400 Subject: [PATCH 2/5] Use robust header size method --- communication/src/allocator/zero_copy/allocator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index 35cac7f4a..41cb4d6aa 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -267,7 +267,7 @@ impl Allocate for TcpAllocator { // Get the header and payload, ditch the header. let mut peel = bytes.extract_to(header.required_bytes()); - let _ = peel.extract_to(::std::mem::size_of::()); + let _ = peel.extract_to(header.header_bytes()); // Increment message count for channel. // Safe to do this even if the channel has been dropped. From fc4a7e2ba3f435e76f37e137d2e3f8f4232f25be Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 11 Mar 2026 16:43:46 -0400 Subject: [PATCH 3/5] Pre-allocate with intended length --- timely/src/progress/broadcast.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 3c5a7f00f..e043e8143 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -117,8 +117,8 @@ impl Progcaster { // options for improving it if performance limits users who want other logging. self.progress_logging.as_ref().map(|l| { - let mut messages = Vec::with_capacity(changes.len()); - let mut internal = Vec::with_capacity(changes.len()); + let mut messages = Vec::with_capacity(recv_changes.len()); + let mut internal = Vec::with_capacity(recv_changes.len()); for ((location, time), diff) in recv_changes.iter() { From f1624779455597b5ad9bb362901b4eec9089972e Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 11 Mar 2026 16:47:46 -0400 Subject: [PATCH 4/5] Remove fn epoch() --- timely/examples/hashjoin.rs | 2 +- timely/examples/unionfind.rs | 2 +- timely/src/dataflow/operators/core/input.rs | 5 ----- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/timely/examples/hashjoin.rs b/timely/examples/hashjoin.rs index 372d938ba..fec6925ec 100644 --- a/timely/examples/hashjoin.rs +++ b/timely/examples/hashjoin.rs @@ -92,7 +92,7 @@ fn main() { sent += to_send; // Advance input, iterate until data cleared. - let next = input1.epoch() + 1; + let next = input1.time() + 1; input1.advance_to(next); input2.advance_to(next); while probe.less_than(input1.time()) { diff --git a/timely/examples/unionfind.rs b/timely/examples/unionfind.rs index 9eb41cebb..1e62e619c 100644 --- a/timely/examples/unionfind.rs +++ b/timely/examples/unionfind.rs @@ -38,7 +38,7 @@ fn main() { for (edge, arc) in insert.take(edges / peers).enumerate() { input.send(arc); if edge % batch == (batch - 1) { - let next = input.epoch() + 1; + let next = input.time() + 1; input.advance_to(next); while probe.less_than(input.time()) { worker.step(); diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs index fe530c49a..02622c5d1 100644 --- a/timely/src/dataflow/operators/core/input.rs +++ b/timely/src/dataflow/operators/core/input.rs @@ -469,11 +469,6 @@ impl> Handle { /// and to begin to shut down operators, as this input can no longer produce data. pub fn close(self) { } - /// Reports the current epoch. - pub fn epoch(&self) -> &T { - &self.now_at - } - /// Reports the current timestamp. pub fn time(&self) -> &T { &self.now_at From b76b7ce6d3c051a13e19218c2c1ad9e26a8d3dcf Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 11 Mar 2026 17:05:49 -0400 Subject: [PATCH 5/5] Rename MutableAntichain::new_bottom to ::from_elem --- .../dataflow/operators/generic/notificator.rs | 2 +- timely/src/progress/frontier.rs | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index a50d342c4..1364a9241 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -109,7 +109,7 @@ fn notificator_delivers_notifications_in_topo_order() { use crate::order::Product; use crate::dataflow::operators::capability::Capability; - let mut frontier = MutableAntichain::new_bottom(Product::new(0, 0)); + let mut frontier = MutableAntichain::from_elem(Product::new(0, 0)); let root_capability = Capability::new(Product::new(0,0), Rc::new(RefCell::new(ChangeBatch::new()))); diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index 7105c665f..25c82ed56 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -219,10 +219,10 @@ impl Antichain { /// /// let mut frontier = Antichain::from_elem(2); ///``` - pub fn from_elem(element: T) -> Antichain { + pub fn from_elem(element: T) -> Antichain { let mut elements = SmallVec::with_capacity(1); elements.push(element); - Antichain { elements } + Antichain { elements } } /// Clears the contents of the antichain. @@ -440,11 +440,11 @@ impl MutableAntichain { ///``` /// use timely::progress::frontier::{AntichainRef, MutableAntichain}; /// - /// let mut frontier = MutableAntichain::new_bottom(0u64); + /// let mut frontier = MutableAntichain::from_elem(0u64); /// assert!(frontier.frontier() == AntichainRef::new(&[0u64])); ///``` #[inline] - pub fn new_bottom(bottom: T) -> MutableAntichain + pub fn from_elem(bottom: T) -> MutableAntichain where T: Ord+Clone, { @@ -477,7 +477,7 @@ impl MutableAntichain { ///``` /// use timely::progress::frontier::MutableAntichain; /// - /// let mut frontier = MutableAntichain::new_bottom(1u64); + /// let mut frontier = MutableAntichain::from_elem(1u64); /// assert!(!frontier.less_than(&0)); /// assert!(!frontier.less_than(&1)); /// assert!(frontier.less_than(&2)); @@ -497,7 +497,7 @@ impl MutableAntichain { ///``` /// use timely::progress::frontier::MutableAntichain; /// - /// let mut frontier = MutableAntichain::new_bottom(1u64); + /// let mut frontier = MutableAntichain::from_elem(1u64); /// assert!(!frontier.less_equal(&0)); /// assert!(frontier.less_equal(&1)); /// assert!(frontier.less_equal(&2)); @@ -517,7 +517,7 @@ impl MutableAntichain { ///``` /// use timely::progress::frontier::{AntichainRef, MutableAntichain}; /// - /// let mut frontier = MutableAntichain::new_bottom(1u64); + /// let mut frontier = MutableAntichain::from_elem(1u64); /// let changes = /// frontier /// .update_iter(vec![(1, -1), (2, 7)]) @@ -621,7 +621,7 @@ pub trait MutableAntichainFilter { /// ``` /// use timely::progress::frontier::{MutableAntichain, MutableAntichainFilter}; /// - /// let mut frontier = MutableAntichain::new_bottom(1u64); + /// let mut frontier = MutableAntichain::from_elem(1u64); /// let changes = /// vec![(1, -1), (2, 7)] /// .filter_through(&mut frontier)