From 0eef8a2673a2a835bce4838b6a1acf377452d204 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 20 Mar 2026 04:53:47 -0400 Subject: [PATCH 1/3] External -> internal consolidation --- differential-dataflow/src/consolidation.rs | 121 +++--------------- .../src/trace/implementations/chunker.rs | 6 +- .../trace/implementations/merge_batcher.rs | 1 - 3 files changed, 21 insertions(+), 107 deletions(-) diff --git a/differential-dataflow/src/consolidation.rs b/differential-dataflow/src/consolidation.rs index 0d5271b9e..a56a42530 100644 --- a/differential-dataflow/src/consolidation.rs +++ b/differential-dataflow/src/consolidation.rs @@ -10,11 +10,10 @@ //! you need specific behavior, it may be best to defensively copy, paste, and maintain the //! specific behavior you require. -use std::cmp::Ordering; use std::collections::VecDeque; -use timely::container::{ContainerBuilder, DrainContainer, PushInto}; +use timely::container::{ContainerBuilder, PushInto}; use crate::Data; -use crate::difference::{IsZero, Semigroup}; +use crate::difference::Semigroup; /// Sorts and consolidates `vec`. /// @@ -232,112 +231,28 @@ where } } -/// Layout of containers and their read items to be consolidated. +/// A container that can sort and consolidate its contents internally. /// -/// This trait specifies behavior to extract keys and diffs from container's read -/// items. Consolidation accumulates the diffs per key. +/// The container knows its own layout — how to sort its elements, how to +/// compare adjacent entries, and how to merge diffs. The caller provides +/// a `target` container to receive the consolidated output, allowing +/// reuse of allocations across calls. /// -/// The trait requires `Container` to have access to its `Item` GAT. -pub trait ConsolidateLayout: DrainContainer { - /// Key portion of data, essentially everything minus the diff - type Key<'a>: Eq where Self: 'a; - - /// GAT diff type. - type Diff<'a>; - - /// Owned diff type. - type DiffOwned: for<'a> Semigroup>; - - /// Converts a reference diff into an owned diff. - fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned; - - /// Deconstruct an item into key and diff. Must be cheap. - fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>); - - /// Push an element to a compatible container. - /// - /// This function is odd to have, so let's explain why it exists. Ideally, the container - /// would accept a `(key, diff)` pair and we wouldn't need this function. However, we - /// might never be in a position where this is true: Vectors can push any `T`, which would - /// collide with a specific implementation for pushing tuples of mixes GATs and owned types. - /// - /// For this reason, we expose a function here that takes a GAT key and an owned diff, and - /// leave it to the implementation to "patch" a suitable item that can be pushed into `self`. - fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned); - - /// Compare two items by key to sort containers. - fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering; - - /// Returns the number of items in the container. +/// After the call, `target` contains the sorted, consolidated data and +/// `self` may be empty or in an unspecified state (implementations should +/// document this). +pub trait Consolidate { + /// The number of elements in the container. fn len(&self) -> usize; - - /// Clear the container. Afterwards, `len()` should return 0. + /// Clear the container. fn clear(&mut self); - - /// Consolidate the supplied container. - fn consolidate_into(&mut self, target: &mut Self) { - // Sort input data - let mut permutation = Vec::with_capacity(self.len()); - permutation.extend(self.drain()); - permutation.sort_by(|a, b| Self::cmp(a, b)); - - // Iterate over the data, accumulating diffs for like keys. - let mut iter = permutation.drain(..); - if let Some(item) = iter.next() { - - let (k, d) = Self::into_parts(item); - let mut prev_key = k; - let mut prev_diff = Self::owned_diff(d); - - for item in iter { - let (next_key, next_diff) = Self::into_parts(item); - if next_key == prev_key { - prev_diff.plus_equals(&next_diff); - } - else { - if !prev_diff.is_zero() { - target.push_with_diff(prev_key, prev_diff); - } - prev_key = next_key; - prev_diff = Self::owned_diff(next_diff); - } - } - - if !prev_diff.is_zero() { - target.push_with_diff(prev_key, prev_diff); - } - } - } + /// Sort and consolidate `self` into `target`. + fn consolidate_into(&mut self, target: &mut Self); } -impl ConsolidateLayout for Vec<(D, T, R)> -where - D: Ord + Clone + 'static, - T: Ord + Clone + 'static, - R: Semigroup + Clone + 'static, -{ - type Key<'a> = (D, T) where Self: 'a; - type Diff<'a> = R where Self: 'a; - type DiffOwned = R; - - fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned { diff } - - fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { - ((data, time), diff) - } - - fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering { - (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1)) - } - - fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) { - self.push((data, time, diff)); - } - - #[inline] fn len(&self) -> usize { Vec::len(self) } - #[inline] fn clear(&mut self) { Vec::clear(self) } - - /// Consolidate the supplied container. +impl Consolidate for Vec<(D, T, R)> { + fn len(&self) -> usize { Vec::len(self) } + fn clear(&mut self) { Vec::clear(self) } fn consolidate_into(&mut self, target: &mut Self) { consolidate_updates(self); std::mem::swap(self, target); diff --git a/differential-dataflow/src/trace/implementations/chunker.rs b/differential-dataflow/src/trace/implementations/chunker.rs index 71d7aeb41..ac2048e14 100644 --- a/differential-dataflow/src/trace/implementations/chunker.rs +++ b/differential-dataflow/src/trace/implementations/chunker.rs @@ -7,7 +7,7 @@ use timely::Container; use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer}; use crate::containers::TimelyStack; -use crate::consolidation::{consolidate_updates, ConsolidateLayout}; +use crate::consolidation::{consolidate_updates, Consolidate}; use crate::difference::Semigroup; /// Chunk a stream of vectors into chains of vectors. @@ -257,7 +257,7 @@ where Input: DrainContainer, Output: Default + SizableContainer - + ConsolidateLayout + + Consolidate + PushInto>, { fn push_into(&mut self, container: &'a mut Input) { @@ -283,7 +283,7 @@ where impl ContainerBuilder for ContainerChunker where - Output: SizableContainer + ConsolidateLayout + Container, + Output: SizableContainer + Consolidate + Container, { type Container = Output; diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 5c932af9f..54e1ac8c4 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -331,7 +331,6 @@ pub mod container { type Time = MC::TimeOwned; type Chunk = MC; - // TODO: Consider integrating with `ConsolidateLayout`. fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); From e65d33edb16216f572803c6d6bf0ffc9175d947a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 20 Mar 2026 05:35:22 -0400 Subject: [PATCH 2/3] External -> internal merging --- .../src/operators/arrange/arrangement.rs | 6 +- differential-dataflow/src/operators/reduce.rs | 4 +- .../trace/implementations/merge_batcher.rs | 534 +++++++++--------- .../src/trace/implementations/ord_neu.rs | 11 +- .../src/trace/implementations/rhh.rs | 7 +- 5 files changed, 289 insertions(+), 273 deletions(-) diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 5aaf8bb33..a894d5486 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -30,7 +30,7 @@ use crate::{Data, VecCollection, AsCollection}; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; -use crate::trace::implementations::merge_batcher::container::MergerChunk; +use crate::trace::implementations::merge_batcher::container::InternalMerge; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -259,7 +259,7 @@ where Time=T1::Time, Diff: Abelian, >+'static, - Bu: Builder>, + Bu: Builder>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, { self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| { @@ -281,7 +281,7 @@ where ValOwn: Data, Time=T1::Time, >+'static, - Bu: Builder>, + Bu: Builder>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index f0302989f..fe3622d74 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -20,7 +20,7 @@ use crate::operators::arrange::{Arranged, TraceAgent}; use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; use crate::trace::cursor::CursorList; use crate::trace::implementations::containers::BatchContainer; -use crate::trace::implementations::merge_batcher::container::MergerChunk; +use crate::trace::implementations::merge_batcher::container::InternalMerge; use crate::trace::TraceReader; /// A key-wise reduction of values in an input trace. @@ -31,7 +31,7 @@ where G: Scope, T1: TraceReader + Clone + 'static, T2: for<'a> Trace=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static, - Bu: Builder>, + Bu: Builder>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static, { let mut result_trace = None; diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 54e1ac8c4..a1b818c6c 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -219,93 +219,80 @@ pub trait Merger: Default { fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize); } -pub use container::{VecMerger, ColMerger}; +pub use container::InternalMerger; pub mod container { - //! A general purpose `Merger` implementation for arbitrary containers. + //! Merger implementations for the merge batcher. //! - //! The implementation requires implementations of two traits, `ContainerQueue` and `MergerChunk`. - //! The `ContainerQueue` trait is meant to wrap a container and provide iterable access to it, as - //! well as the ability to return the container when iteration is complete. - //! The `MergerChunk` trait is meant to be implemented by containers, and it explains how container - //! items should be interpreted with respect to times, and with respect to differences. - //! These two traits exist instead of a stack of constraints on the structure of the associated items - //! of the containers, allowing them to perform their functions without destructuring their guts. - //! - //! Standard implementations exist in the `vec`, `columnation`, and `flat_container` modules. + //! The `InternalMerge` trait allows containers to merge sorted, consolidated + //! data using internal iteration. The `InternalMerger` type implements the + //! `Merger` trait using `InternalMerge`, and is the standard merger for all + //! container types. - use std::cmp::Ordering; use std::marker::PhantomData; - use timely::container::{PushInto, SizableContainer}; + use timely::container::SizableContainer; use timely::progress::frontier::{Antichain, AntichainRef}; use timely::{Accountable, PartialOrder}; - use timely::container::DrainContainer; use crate::trace::implementations::merge_batcher::Merger; - /// An abstraction for a container that can be iterated over, and conclude by returning itself. - pub trait ContainerQueue { - /// Returns either the next item in the container, or the container itself. - fn next_or_alloc(&mut self) -> Result, C>; - /// Indicates whether `next_or_alloc` will return `Ok`, and whether `peek` will return `Some`. - fn is_empty(&self) -> bool; - /// Compare the heads of two queues, where empty queues come last. - fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering; - /// Create a new queue from an existing container. - fn from(container: C) -> Self; - } - - /// Behavior to dissect items of chunks in the merge batcher - pub trait MergerChunk : Accountable + DrainContainer + SizableContainer + Default { - /// An owned time type. - /// - /// This type is provided so that users can maintain antichains of something, in order to track - /// the forward movement of time and extract intervals from chains of updates. + /// A container that supports the operations needed by the merge batcher: + /// merging sorted chains and extracting updates by time. + pub trait InternalMerge: Accountable + SizableContainer + Default { + /// The owned time type, for maintaining antichains. type TimeOwned; - /// The owned diff type. - /// - /// This type is provided so that users can provide an owned instance to the `push_and_add` method, - /// to act as a scratch space when the type is substantial and could otherwise require allocations. - type DiffOwned: Default; - /// Relates a borrowed time to antichains of owned times. - /// - /// If `upper` is less or equal to `time`, the method returns `true` and ensures that `frontier` reflects `time`. - fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool; + /// The number of items in this container. + fn len(&self) -> usize; - /// Push an entry that adds together two diffs. - /// - /// This is only called when two items are deemed mergeable by the container queue. - /// If the two diffs added together is zero do not push anything. - fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned); + /// Clear the container for reuse. + fn clear(&mut self); /// Account the allocations behind the chunk. - // TODO: Find a more universal home for this: `Container`? fn account(&self) -> (usize, usize, usize, usize) { let (size, capacity, allocations) = (0, 0, 0); (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations) } - /// Clear the chunk, to be reused. - fn clear(&mut self); + /// Merge items from sorted inputs into `self`, advancing positions. + /// Merges until `self` is at capacity or all inputs are exhausted. + /// + /// Dispatches based on the number of inputs: + /// - **0**: no-op + /// - **1**: bulk copy (may swap the input into `self`) + /// - **2**: merge two sorted streams + fn merge_from( + &mut self, + others: &mut [Self], + positions: &mut [usize], + ); + + /// Extract updates from `self` into `ship` (times not beyond `upper`) + /// and `keep` (times beyond `upper`), updating `frontier` with kept times. + fn extract( + &mut self, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ); } - /// A merger for arbitrary containers. - /// - /// `MC` is a `Container` that implements [`MergerChunk`]. - /// `CQ` is a [`ContainerQueue`] supporting `MC`. - pub struct ContainerMerger { - _marker: PhantomData<(MC, CQ)>, + /// A `Merger` using internal iteration for `Vec` containers. + pub type VecInternalMerger = InternalMerger>; + /// A `Merger` using internal iteration for `TimelyStack` containers. + pub type ColInternalMerger = InternalMerger>; + + /// A merger that uses internal iteration via [`InternalMerge`]. + pub struct InternalMerger { + _marker: PhantomData, } - impl Default for ContainerMerger { - fn default() -> Self { - Self { _marker: PhantomData, } - } + impl Default for InternalMerger { + fn default() -> Self { Self { _marker: PhantomData } } } - impl ContainerMerger { - /// Helper to get pre-sized vector from the stash. + impl InternalMerger where MC: InternalMerge { #[inline] fn empty(&self, stash: &mut Vec) -> MC { stash.pop().unwrap_or_else(|| { @@ -314,96 +301,87 @@ pub mod container { container }) } - /// Helper to return a chunk to the stash. #[inline] fn recycle(&self, mut chunk: MC, stash: &mut Vec) { - // TODO: Should we only retain correctly sized containers? chunk.clear(); stash.push(chunk); } + /// Drain remaining items from one side into `result`/`output`. + fn drain_side( + &self, + head: &mut MC, + pos: &mut usize, + list: &mut std::vec::IntoIter, + result: &mut MC, + output: &mut Vec, + stash: &mut Vec, + ) { + while *pos < head.len() { + result.merge_from( + std::slice::from_mut(head), + std::slice::from_mut(pos), + ); + if *pos >= head.len() { + let old = std::mem::replace(head, list.next().unwrap_or_default()); + self.recycle(old, stash); + *pos = 0; + } + if result.at_capacity() { + output.push(std::mem::take(result)); + *result = self.empty(stash); + } + } + } } - impl Merger for ContainerMerger + impl Merger for InternalMerger where - for<'a> MC: MergerChunk + Clone + PushInto<::Item<'a>> + 'static, - CQ: ContainerQueue, + MC: InternalMerge + 'static, { type Time = MC::TimeOwned; type Chunk = MC; - fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); - let mut head1 = CQ::from(list1.next().unwrap_or_default()); - let mut head2 = CQ::from(list2.next().unwrap_or_default()); + let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()]; + let mut positions = [0usize, 0usize]; let mut result = self.empty(stash); - let mut diff_owned = Default::default(); - - // while we have valid data in each input, merge. - while !head1.is_empty() && !head2.is_empty() { - while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() { - let cmp = head1.cmp_heads(&head2); - // TODO: The following less/greater branches could plausibly be a good moment for - // `copy_range`, on account of runs of records that might benefit more from a - // `memcpy`. - match cmp { - Ordering::Less => { - result.push_into(head1.next_or_alloc().ok().unwrap()); - } - Ordering::Greater => { - result.push_into(head2.next_or_alloc().ok().unwrap()); - } - Ordering::Equal => { - let item1 = head1.next_or_alloc().ok().unwrap(); - let item2 = head2.next_or_alloc().ok().unwrap(); - result.push_and_add(item1, item2, &mut diff_owned); - } - } - } - - if result.at_capacity() { - output.push_into(result); - result = self.empty(stash); - } + // Main merge loop: both sides have data. + while positions[0] < heads[0].len() && positions[1] < heads[1].len() { + result.merge_from(&mut heads, &mut positions); - if head1.is_empty() { - self.recycle(head1.next_or_alloc().err().unwrap(), stash); - head1 = CQ::from(list1.next().unwrap_or_default()); + if positions[0] >= heads[0].len() { + let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default()); + self.recycle(old, stash); + positions[0] = 0; } - if head2.is_empty() { - self.recycle(head2.next_or_alloc().err().unwrap(), stash); - head2 = CQ::from(list2.next().unwrap_or_default()); + if positions[1] >= heads[1].len() { + let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default()); + self.recycle(old, stash); + positions[1] = 0; } - } - - // TODO: recycle `head1` rather than discarding. - while let Ok(next) = head1.next_or_alloc() { - result.push_into(next); if result.at_capacity() { - output.push_into(result); + output.push(std::mem::take(&mut result)); result = self.empty(stash); } } + + // Drain remaining from side 0. + self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash); if !result.is_empty() { - output.push_into(result); + output.push(std::mem::take(&mut result)); result = self.empty(stash); } output.extend(list1); - // TODO: recycle `head2` rather than discarding. - while let Ok(next) = head2.next_or_alloc() { - result.push_into(next); - if result.at_capacity() { - output.push(result); - result = self.empty(stash); - } - } + // Drain remaining from side 1. + self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash); if !result.is_empty() { - output.push_into(result); - // result = self.empty(stash); + output.push(std::mem::take(&mut result)); } output.extend(list2); } @@ -413,7 +391,7 @@ pub mod container { merged: Vec, upper: AntichainRef, frontier: &mut Antichain, - readied: &mut Vec, + ship: &mut Vec, kept: &mut Vec, stash: &mut Vec, ) { @@ -421,174 +399,137 @@ pub mod container { let mut ready = self.empty(stash); for mut buffer in merged { - for item in buffer.drain() { - if MC::time_kept(&item, &upper, frontier) { - if keep.at_capacity() && !keep.is_empty() { - kept.push(keep); - keep = self.empty(stash); - } - keep.push_into(item); - } else { - if ready.at_capacity() && !ready.is_empty() { - readied.push(ready); - ready = self.empty(stash); - } - ready.push_into(item); - } - } - // Recycling buffer. + buffer.extract(upper, frontier, &mut keep, &mut ready); self.recycle(buffer, stash); + if keep.at_capacity() { + kept.push(std::mem::take(&mut keep)); + keep = self.empty(stash); + } + if ready.at_capacity() { + ship.push(std::mem::take(&mut ready)); + ready = self.empty(stash); + } } - // Finish the kept data. if !keep.is_empty() { kept.push(keep); } if !ready.is_empty() { - readied.push(ready); + ship.push(ready); } } - /// Account the allocations behind the chunk. fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { chunk.account() } } - pub use vec::VecMerger; - /// Implementations of `ContainerQueue` and `MergerChunk` for `Vec` containers. - pub mod vec { - - use std::collections::VecDeque; - use timely::progress::{Antichain, frontier::AntichainRef}; + /// Implementation of `InternalMerge` for `Vec<(D, T, R)>`. + pub mod vec_internal { + use std::cmp::Ordering; + use timely::PartialOrder; + use timely::container::SizableContainer; + use timely::progress::frontier::{Antichain, AntichainRef}; use crate::difference::Semigroup; - use super::{ContainerQueue, MergerChunk}; - - /// A `Merger` implementation backed by vector containers. - pub type VecMerger = super::ContainerMerger, std::collections::VecDeque<(D, T, R)>>; - - impl ContainerQueue> for VecDeque<(D, T, R)> { - fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> { - if self.is_empty() { - Err(Vec::from(std::mem::take(self))) - } - else { - Ok(self.pop_front().unwrap()) - } - } - fn is_empty(&self) -> bool { - self.is_empty() - } - fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { - let (data1, time1, _) = self.front().unwrap(); - let (data2, time2, _) = other.front().unwrap(); - (data1, time1).cmp(&(data2, time2)) - } - fn from(list: Vec<(D, T, R)>) -> Self { - >::from(list) - } - } + use super::InternalMerge; - impl MergerChunk for Vec<(D, T, R)> { + impl InternalMerge for Vec<(D, T, R)> { type TimeOwned = T; - type DiffOwned = (); - fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { - if upper.less_equal(time) { - frontier.insert_with(&time, |time| time.clone()); - true + fn len(&self) -> usize { Vec::len(self) } + fn clear(&mut self) { Vec::clear(self) } + + fn merge_from( + &mut self, + others: &mut [Self], + positions: &mut [usize], + ) { + match others.len() { + 0 => {}, + 1 => { + let other = &mut others[0]; + let pos = &mut positions[0]; + if self.is_empty() && *pos == 0 { + std::mem::swap(self, other); + return; + } + self.extend_from_slice(&other[*pos ..]); + *pos = other.len(); + }, + 2 => { + let (left, right) = others.split_at_mut(1); + let other1 = &mut left[0]; + let other2 = &mut right[0]; + + while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() { + let (d1, t1, _) = &other1[positions[0]]; + let (d2, t2, _) = &other2[positions[1]]; + match (d1, t1).cmp(&(d2, t2)) { + Ordering::Less => { + self.push(other1[positions[0]].clone()); + positions[0] += 1; + } + Ordering::Greater => { + self.push(other2[positions[1]].clone()); + positions[1] += 1; + } + Ordering::Equal => { + let (d, t, mut r1) = other1[positions[0]].clone(); + let (_, _, ref r2) = other2[positions[1]]; + r1.plus_equals(r2); + if !r1.is_zero() { + self.push((d, t, r1)); + } + positions[0] += 1; + positions[1] += 1; + } + } + } + }, + n => unimplemented!("{n}-way merge not yet supported"), } - else { false } } - fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) { - let (data, time, mut diff1) = item1; - let (_data, _time, diff2) = item2; - diff1.plus_equals(&diff2); - if !diff1.is_zero() { - self.push((data, time, diff1)); + + fn extract( + &mut self, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ) { + for (data, time, diff) in self.drain(..) { + if upper.less_equal(&time) { + frontier.insert_with(&time, |time| time.clone()); + keep.push((data, time, diff)); + } else { + ship.push((data, time, diff)); + } } } - fn account(&self) -> (usize, usize, usize, usize) { - let (size, capacity, allocations) = (0, 0, 0); - (self.len(), size, capacity, allocations) - } - #[inline] fn clear(&mut self) { Vec::clear(self) } } } - pub use columnation::ColMerger; - /// Implementations of `ContainerQueue` and `MergerChunk` for `TimelyStack` containers (columnation). - pub mod columnation { - - use timely::progress::{Antichain, frontier::AntichainRef}; + /// Implementation of `InternalMerge` for `TimelyStack<(D, T, R)>`. + pub mod columnation_internal { + use std::cmp::Ordering; use columnation::Columnation; - + use timely::PartialOrder; + use timely::container::SizableContainer; + use timely::progress::frontier::{Antichain, AntichainRef}; use crate::containers::TimelyStack; use crate::difference::Semigroup; - - use super::{ContainerQueue, MergerChunk}; - - /// A `Merger` implementation backed by `TimelyStack` containers (columnation). - pub type ColMerger = super::ContainerMerger,TimelyStackQueue<(D, T, R)>>; - - /// TODO - pub struct TimelyStackQueue { - list: TimelyStack, - head: usize, - } - - impl ContainerQueue> for TimelyStackQueue<(D, T, R)> { - fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> { - if self.is_empty() { - Err(std::mem::take(&mut self.list)) - } - else { - Ok(self.pop()) - } - } - fn is_empty(&self) -> bool { - self.head == self.list[..].len() - } - fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { - let (data1, time1, _) = self.peek(); - let (data2, time2, _) = other.peek(); - (data1, time1).cmp(&(data2, time2)) - } - fn from(list: TimelyStack<(D, T, R)>) -> Self { - TimelyStackQueue { list, head: 0 } - } - } - - impl TimelyStackQueue { - fn pop(&mut self) -> &T { - self.head += 1; - &self.list[self.head - 1] - } - - fn peek(&self) -> &T { - &self.list[self.head] - } - } - - impl MergerChunk for TimelyStack<(D, T, R)> { + use super::InternalMerge; + + impl InternalMerge for TimelyStack<(D, T, R)> + where + D: Ord + Columnation + Clone + 'static, + T: Ord + Columnation + Clone + PartialOrder + 'static, + R: Default + Semigroup + Columnation + Clone + 'static, + { type TimeOwned = T; - type DiffOwned = R; - fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { - if upper.less_equal(time) { - frontier.insert_with(&time, |time| time.clone()); - true - } - else { false } - } - fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) { - let (data, time, diff1) = item1; - let (_data, _time, diff2) = item2; - stash.clone_from(diff1); - stash.plus_equals(&diff2); - if !stash.is_zero() { - self.copy_destructured(data, time, stash); - } - } + fn len(&self) -> usize { self[..].len() } + fn clear(&mut self) { TimelyStack::clear(self) } + fn account(&self) -> (usize, usize, usize, usize) { let (mut size, mut capacity, mut allocations) = (0, 0, 0); let cb = |siz, cap| { @@ -599,7 +540,80 @@ pub mod container { self.heap_size(cb); (self.len(), size, capacity, allocations) } - #[inline] fn clear(&mut self) { TimelyStack::clear(self) } + + fn merge_from( + &mut self, + others: &mut [Self], + positions: &mut [usize], + ) { + match others.len() { + 0 => {}, + 1 => { + let other = &mut others[0]; + let pos = &mut positions[0]; + if self[..].is_empty() && *pos == 0 { + std::mem::swap(self, other); + return; + } + for i in *pos .. other[..].len() { + self.copy(&other[i]); + } + *pos = other[..].len(); + }, + 2 => { + let (left, right) = others.split_at_mut(1); + let other1 = &left[0]; + let other2 = &right[0]; + + let mut stash = R::default(); + + while positions[0] < other1[..].len() && positions[1] < other2[..].len() && !self.at_capacity() { + let (d1, t1, _) = &other1[positions[0]]; + let (d2, t2, _) = &other2[positions[1]]; + match (d1, t1).cmp(&(d2, t2)) { + Ordering::Less => { + self.copy(&other1[positions[0]]); + positions[0] += 1; + } + Ordering::Greater => { + self.copy(&other2[positions[1]]); + positions[1] += 1; + } + Ordering::Equal => { + let (_, _, r1) = &other1[positions[0]]; + let (_, _, r2) = &other2[positions[1]]; + stash.clone_from(r1); + stash.plus_equals(r2); + if !stash.is_zero() { + let (d, t, _) = &other1[positions[0]]; + self.copy_destructured(d, t, &stash); + } + positions[0] += 1; + positions[1] += 1; + } + } + } + }, + n => unimplemented!("{n}-way merge not yet supported"), + } + } + + fn extract( + &mut self, + upper: AntichainRef, + frontier: &mut Antichain, + keep: &mut Self, + ship: &mut Self, + ) { + for (data, time, diff) in self.iter() { + if upper.less_equal(time) { + frontier.insert_with(time, |time| time.clone()); + keep.copy_destructured(data, time, diff); + } else { + ship.copy_destructured(data, time, diff); + } + } + } } } } diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index b6f9a198e..ac33dd5d5 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -13,7 +13,8 @@ use std::rc::Rc; use crate::containers::TimelyStack; use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; use crate::trace::implementations::spine_fueled::Spine; -use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; +use crate::trace::implementations::merge_batcher::MergeBatcher; +use crate::trace::implementations::merge_batcher::container::{VecInternalMerger, ColInternalMerger}; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Layout, Vector, TStack}; @@ -24,7 +25,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine>>>; /// A batcher using ordered lists. -pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; +pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecInternalMerger<(K, V), T, R>>; /// A builder using ordered lists. pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -34,14 +35,14 @@ pub type RcOrdValBuilder = RcBuilder = Spine>>>; /// A batcher for columnar storage. -pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; +pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColInternalMerger<(K,V),T,R>>; /// A builder for columnar storage. pub type ColValBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine>>>; /// A batcher for ordered lists. -pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>; +pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecInternalMerger<(K, ()), T, R>>; /// A builder for ordered lists. pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; @@ -51,7 +52,7 @@ pub type RcOrdKeyBuilder = RcBuilder /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine>>>; /// A batcher for columnar storage -pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>; +pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColInternalMerger<(K,()),T,R>>; /// A builder for columnar storage pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index 71d20b69d..ca935afc6 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -13,7 +13,8 @@ use serde::{Deserialize, Serialize}; use crate::Hashable; use crate::containers::TimelyStack; use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; -use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; +use crate::trace::implementations::merge_batcher::MergeBatcher; +use crate::trace::implementations::merge_batcher::container::{VecInternalMerger, ColInternalMerger}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::rc_blanket_impls::RcBuilder; @@ -24,7 +25,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. pub type VecSpine = Spine>>>; /// A batcher for ordered lists. -pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; +pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecInternalMerger<(K, V), T, R>>; /// A builder for ordered lists. pub type VecBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -34,7 +35,7 @@ pub type VecBuilder = RcBuilder, Vec< /// A trace implementation backed by columnar storage. pub type ColSpine = Spine>>>; /// A batcher for columnar storage. -pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; +pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColInternalMerger<(K,V),T,R>>; /// A builder for columnar storage. pub type ColBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; From fff3af95d34af9868509c7992c2cdaea5e8abfa5 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Fri, 20 Mar 2026 06:31:41 -0400 Subject: [PATCH 3/3] Replace VecChunker with ContainerChunker --- differential-dataflow/src/consolidation.rs | 33 +++++ .../src/trace/implementations/chunker.rs | 131 +----------------- .../src/trace/implementations/ord_neu.rs | 6 +- .../src/trace/implementations/rhh.rs | 4 +- 4 files changed, 44 insertions(+), 130 deletions(-) diff --git a/differential-dataflow/src/consolidation.rs b/differential-dataflow/src/consolidation.rs index a56a42530..43d403d38 100644 --- a/differential-dataflow/src/consolidation.rs +++ b/differential-dataflow/src/consolidation.rs @@ -11,6 +11,7 @@ //! specific behavior you require. use std::collections::VecDeque; +use columnation::Columnation; use timely::container::{ContainerBuilder, PushInto}; use crate::Data; use crate::difference::Semigroup; @@ -259,6 +260,38 @@ impl Consolidate for Vec<(D, T, R)> { } } +impl Consolidate for crate::containers::TimelyStack<(D, T, R)> { + fn len(&self) -> usize { self[..].len() } + fn clear(&mut self) { crate::containers::TimelyStack::clear(self) } + fn consolidate_into(&mut self, target: &mut Self) { + let len = self[..].len(); + let mut indices: Vec = (0..len).collect(); + indices.sort_unstable_by(|&i, &j| { + let (d1, t1, _) = &self[i]; + let (d2, t2, _) = &self[j]; + (d1, t1).cmp(&(d2, t2)) + }); + target.clear(); + let mut idx = 0; + while idx < indices.len() { + let (d, t, r) = &self[indices[idx]]; + let mut r_owned = r.clone(); + idx += 1; + while idx < indices.len() { + let (d2, t2, r2) = &self[indices[idx]]; + if d == d2 && t == t2 { + r_owned.plus_equals(r2); + idx += 1; + } else { break; } + } + if !r_owned.is_zero() { + target.copy_destructured(d, t, &r_owned); + } + } + self.clear(); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/differential-dataflow/src/trace/implementations/chunker.rs b/differential-dataflow/src/trace/implementations/chunker.rs index ac2048e14..93a9e516f 100644 --- a/differential-dataflow/src/trace/implementations/chunker.rs +++ b/differential-dataflow/src/trace/implementations/chunker.rs @@ -10,121 +10,12 @@ use crate::containers::TimelyStack; use crate::consolidation::{consolidate_updates, Consolidate}; use crate::difference::Semigroup; -/// Chunk a stream of vectors into chains of vectors. -pub struct VecChunker { - pending: Vec, - ready: VecDeque>, - empty: Option>, -} - -impl Default for VecChunker { - fn default() -> Self { - Self { - pending: Vec::default(), - ready: VecDeque::default(), - empty: None, - } - } -} - -impl VecChunker<((K, V), T, R)> -where - K: Ord, - V: Ord, - T: Ord, - R: Semigroup, -{ - const BUFFER_SIZE_BYTES: usize = 8 << 10; - fn chunk_capacity() -> usize { - let size = ::std::mem::size_of::<((K, V), T, R)>(); - if size == 0 { - Self::BUFFER_SIZE_BYTES - } else if size <= Self::BUFFER_SIZE_BYTES { - Self::BUFFER_SIZE_BYTES / size - } else { - 1 - } - } - - /// Form chunks out of pending data, if needed. This function is meant to be applied to - /// potentially full buffers, and ensures that if the buffer was full when called it is at most - /// half full when the function returns. - /// - /// `form_chunk` does the following: - /// * If pending is full, consolidate. - /// * If after consolidation it's more than half full, peel off chunks, - /// leaving behind any partial chunk in pending. - fn form_chunk(&mut self) { - consolidate_updates(&mut self.pending); - if self.pending.len() >= Self::chunk_capacity() { - while self.pending.len() > Self::chunk_capacity() { - let mut chunk = Vec::with_capacity(Self::chunk_capacity()); - chunk.extend(self.pending.drain(..chunk.capacity())); - self.ready.push_back(chunk); - } - } - } -} - -impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)> -where - K: Ord + Clone, - V: Ord + Clone, - T: Ord + Clone, - R: Semigroup + Clone, -{ - fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) { - // Ensure `self.pending` has the desired capacity. We should never have a larger capacity - // because we don't write more than capacity elements into the buffer. - // Important: Consolidation requires `pending` to have twice the chunk capacity to - // amortize its cost. Otherwise, it risks to do quadratic work. - if self.pending.capacity() < Self::chunk_capacity() * 2 { - self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len()); - } - - let mut drain = container.drain(..).peekable(); - while drain.peek().is_some() { - self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len())); - if self.pending.len() == self.pending.capacity() { - self.form_chunk(); - } - } - } -} - -impl ContainerBuilder for VecChunker<((K, V), T, R)> -where - K: Ord + Clone + 'static, - V: Ord + Clone + 'static, - T: Ord + Clone + 'static, - R: Semigroup + Clone + 'static, -{ - type Container = Vec<((K, V), T, R)>; - - fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(ready) = self.ready.pop_front() { - self.empty = Some(ready); - self.empty.as_mut() - } else { - None - } - } - - fn finish(&mut self) -> Option<&mut Self::Container> { - if !self.pending.is_empty() { - consolidate_updates(&mut self.pending); - while !self.pending.is_empty() { - let mut chunk = Vec::with_capacity(Self::chunk_capacity()); - chunk.extend(self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity()))); - self.ready.push_back(chunk); - } - } - self.empty = self.ready.pop_front(); - self.empty.as_mut() - } -} - -/// Chunk a stream of vectors into chains of vectors. +/// Chunk a stream of vectors into chains of columnation stacks. +/// +/// This chunker accumulates into a `Vec` (not a `TimelyStack`) for efficient +/// in-place sorting and consolidation, then copies the consolidated results +/// into `TimelyStack` chunks. This avoids the cost of sorting through +/// columnation indirection. pub struct ColumnationChunker { pending: Vec, ready: VecDeque>, @@ -159,14 +50,6 @@ where } } - /// Form chunks out of pending data, if needed. This function is meant to be applied to - /// potentially full buffers, and ensures that if the buffer was full when called it is at most - /// half full when the function returns. - /// - /// `form_chunk` does the following: - /// * If pending is full, consolidate. - /// * If after consolidation it's more than half full, peel off chunks, - /// leaving behind any partial chunk in pending. fn form_chunk(&mut self) { consolidate_updates(&mut self.pending); if self.pending.len() >= Self::chunk_capacity() { @@ -188,8 +71,6 @@ where R: Columnation + Semigroup + Clone, { fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) { - // Ensure `self.pending` has the desired capacity. We should never have a larger capacity - // because we don't write more than capacity elements into the buffer. if self.pending.capacity() < Self::chunk_capacity() * 2 { self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len()); } diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index ac33dd5d5..14bc8a2db 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -11,7 +11,7 @@ use std::rc::Rc; use crate::containers::TimelyStack; -use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; +use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::MergeBatcher; use crate::trace::implementations::merge_batcher::container::{VecInternalMerger, ColInternalMerger}; @@ -25,7 +25,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine>>>; /// A batcher using ordered lists. -pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecInternalMerger<(K, V), T, R>>; +pub type OrdValBatcher = MergeBatcher, ContainerChunker>, VecInternalMerger<(K, V), T, R>>; /// A builder using ordered lists. pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -42,7 +42,7 @@ pub type ColValBuilder = RcBuilder /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine>>>; /// A batcher for ordered lists. -pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecInternalMerger<(K, ()), T, R>>; +pub type OrdKeyBatcher = MergeBatcher, ContainerChunker>, VecInternalMerger<(K, ()), T, R>>; /// A builder for ordered lists. pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index ca935afc6..bf2f252a9 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use crate::Hashable; use crate::containers::TimelyStack; -use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; +use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker}; use crate::trace::implementations::merge_batcher::MergeBatcher; use crate::trace::implementations::merge_batcher::container::{VecInternalMerger, ColInternalMerger}; use crate::trace::implementations::spine_fueled::Spine; @@ -25,7 +25,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. pub type VecSpine = Spine>>>; /// A batcher for ordered lists. -pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecInternalMerger<(K, V), T, R>>; +pub type VecBatcher = MergeBatcher, ContainerChunker>, VecInternalMerger<(K, V), T, R>>; /// A builder for ordered lists. pub type VecBuilder = RcBuilder, Vec<((K,V),T,R)>>>;