Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 50 additions & 102 deletions differential-dataflow/src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
//! specific behavior you require.

use std::cmp::Ordering;
use std::collections::VecDeque;
use timely::container::{ContainerBuilder, DrainContainer, PushInto};
use columnation::Columnation;
use timely::container::{ContainerBuilder, PushInto};
use crate::Data;
use crate::difference::{IsZero, Semigroup};
use crate::difference::Semigroup;

/// Sorts and consolidates `vec`.
///
Expand Down Expand Up @@ -232,115 +232,63 @@ where
}
}

/// Layout of containers and their read items to be consolidated.
/// A container that can sort and consolidate its contents internally.
///
/// This trait specifies behavior to extract keys and diffs from container's read
/// items. Consolidation accumulates the diffs per key.
/// The container knows its own layout — how to sort its elements, how to
/// compare adjacent entries, and how to merge diffs. The caller provides
/// a `target` container to receive the consolidated output, allowing
/// reuse of allocations across calls.
///
/// The trait requires `Container` to have access to its `Item` GAT.
pub trait ConsolidateLayout: DrainContainer {
/// Key portion of data, essentially everything minus the diff
type Key<'a>: Eq where Self: 'a;

/// GAT diff type.
type Diff<'a>;

/// Owned diff type.
type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;

/// Converts a reference diff into an owned diff.
fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned;

/// Deconstruct an item into key and diff. Must be cheap.
fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);

/// Push an element to a compatible container.
///
/// This function is odd to have, so let's explain why it exists. Ideally, the container
/// would accept a `(key, diff)` pair and we wouldn't need this function. However, we
/// might never be in a position where this is true: Vectors can push any `T`, which would
/// collide with a specific implementation for pushing tuples of mixed GATs and owned types.
///
/// For this reason, we expose a function here that takes a GAT key and an owned diff, and
/// leave it to the implementation to "patch" a suitable item that can be pushed into `self`.
fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);

/// Compare two items by key to sort containers.
fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;

/// Returns the number of items in the container.
/// After the call, `target` contains the sorted, consolidated data and
/// `self` may be empty or in an unspecified state (implementations should
/// document this).
pub trait Consolidate {
/// The number of elements in the container.
fn len(&self) -> usize;

/// Clear the container. Afterwards, `len()` should return 0.
/// Clear the container.
fn clear(&mut self);
/// Sort and consolidate `self` into `target`.
fn consolidate_into(&mut self, target: &mut Self);
}

/// Consolidate the supplied container.
impl<D: Ord, T: Ord, R: Semigroup> Consolidate for Vec<(D, T, R)> {
fn len(&self) -> usize { Vec::len(self) }
fn clear(&mut self) { Vec::clear(self) }
fn consolidate_into(&mut self, target: &mut Self) {
// Sort input data
let mut permutation = Vec::with_capacity(self.len());
permutation.extend(self.drain());
permutation.sort_by(|a, b| Self::cmp(a, b));

// Iterate over the data, accumulating diffs for like keys.
let mut iter = permutation.drain(..);
if let Some(item) = iter.next() {

let (k, d) = Self::into_parts(item);
let mut prev_key = k;
let mut prev_diff = Self::owned_diff(d);

for item in iter {
let (next_key, next_diff) = Self::into_parts(item);
if next_key == prev_key {
prev_diff.plus_equals(&next_diff);
}
else {
if !prev_diff.is_zero() {
target.push_with_diff(prev_key, prev_diff);
}
prev_key = next_key;
prev_diff = Self::owned_diff(next_diff);
}
}

if !prev_diff.is_zero() {
target.push_with_diff(prev_key, prev_diff);
}
}
consolidate_updates(self);
std::mem::swap(self, target);
}
}

impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
where
D: Ord + Clone + 'static,
T: Ord + Clone + 'static,
R: Semigroup + Clone + 'static,
{
type Key<'a> = (D, T) where Self: 'a;
type Diff<'a> = R where Self: 'a;
type DiffOwned = R;

fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned { diff }

fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
((data, time), diff)
}

fn cmp<'a>(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
(&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
}

fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
self.push((data, time, diff));
}

#[inline] fn len(&self) -> usize { Vec::len(self) }
#[inline] fn clear(&mut self) { Vec::clear(self) }

/// Consolidate the supplied container.
impl<D: Ord + Columnation, T: Ord + Columnation, R: Semigroup + Columnation> Consolidate for crate::containers::TimelyStack<(D, T, R)> {
fn len(&self) -> usize { self[..].len() }
fn clear(&mut self) { crate::containers::TimelyStack::clear(self) }
fn consolidate_into(&mut self, target: &mut Self) {
consolidate_updates(self);
std::mem::swap(self, target);
let len = self[..].len();
let mut indices: Vec<usize> = (0..len).collect();
indices.sort_unstable_by(|&i, &j| {
let (d1, t1, _) = &self[i];
let (d2, t2, _) = &self[j];
(d1, t1).cmp(&(d2, t2))
});
target.clear();
let mut idx = 0;
while idx < indices.len() {
let (d, t, r) = &self[indices[idx]];
let mut r_owned = r.clone();
idx += 1;
while idx < indices.len() {
let (d2, t2, r2) = &self[indices[idx]];
if d == d2 && t == t2 {
r_owned.plus_equals(r2);
idx += 1;
} else { break; }
}
if !r_owned.is_zero() {
target.copy_destructured(d, t, &r_owned);
}
}
self.clear();
}
}

Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{Data, VecCollection, AsCollection};
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
use crate::trace::implementations::merge_batcher::container::MergerChunk;
use crate::trace::implementations::merge_batcher::container::InternalMerge;

use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
Expand Down Expand Up @@ -259,7 +259,7 @@ where
Time=T1::Time,
Diff: Abelian,
>+'static,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
{
self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
Expand All @@ -281,7 +281,7 @@ where
ValOwn: Data,
Time=T1::Time,
>+'static,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::operators::arrange::{Arranged, TraceAgent};
use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
use crate::trace::cursor::CursorList;
use crate::trace::implementations::containers::BatchContainer;
use crate::trace::implementations::merge_batcher::container::MergerChunk;
use crate::trace::implementations::merge_batcher::container::InternalMerge;
use crate::trace::TraceReader;

/// A key-wise reduction of values in an input trace.
Expand All @@ -31,7 +31,7 @@ where
G: Scope<Timestamp=T1::Time>,
T1: TraceReader<KeyOwn: Ord> + Clone + 'static,
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static,
Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: InternalMerge + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
{
let mut result_trace = None;
Expand Down
137 changes: 9 additions & 128 deletions differential-dataflow/src/trace/implementations/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,124 +7,15 @@ use timely::Container;
use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};

use crate::containers::TimelyStack;
use crate::consolidation::{consolidate_updates, ConsolidateLayout};
use crate::consolidation::{consolidate_updates, Consolidate};
use crate::difference::Semigroup;

/// Regroups a stream of update vectors into a queue of vector chunks.
pub struct VecChunker<T> {
    /// Updates received so far that have not yet been cut into a chunk.
    pending: Vec<T>,
    /// Completed chunks, oldest at the front, waiting to be extracted.
    ready: VecDeque<Vec<T>>,
    /// Parking slot for the chunk most recently handed out, so that a
    /// mutable reference to it can be returned to the caller.
    empty: Option<Vec<T>>,
}

impl<T> Default for VecChunker<T> {
    /// Creates a chunker with no pending updates and no ready chunks.
    ///
    /// Implemented by hand (rather than derived) so that no `T: Default`
    /// bound is required.
    fn default() -> Self {
        VecChunker {
            pending: Vec::new(),
            ready: VecDeque::new(),
            empty: None,
        }
    }
}

impl<K, V, T, R> VecChunker<((K, V), T, R)>
where
    K: Ord,
    V: Ord,
    T: Ord,
    R: Semigroup,
{
    /// Target size of a single chunk, in bytes (8 KiB).
    const BUFFER_SIZE_BYTES: usize = 8 << 10;

    /// Number of updates that make up one chunk.
    ///
    /// This is `BUFFER_SIZE_BYTES` divided by the size of one update, with two
    /// special cases: zero-sized updates use `BUFFER_SIZE_BYTES` as the count,
    /// and updates larger than the buffer get one update per chunk.
    fn chunk_capacity() -> usize {
        match ::std::mem::size_of::<((K, V), T, R)>() {
            0 => Self::BUFFER_SIZE_BYTES,
            size if size <= Self::BUFFER_SIZE_BYTES => Self::BUFFER_SIZE_BYTES / size,
            _ => 1,
        }
    }

    /// Consolidates `pending` and moves full chunks into `ready`.
    ///
    /// `form_chunk` does the following:
    /// * Consolidate `pending` (unconditionally).
    /// * While more than `chunk_capacity()` updates remain, peel off a chunk of
    ///   exactly `chunk_capacity()` updates and queue it in `ready`.
    ///
    /// On return `pending` holds at most `chunk_capacity()` updates; any
    /// partial chunk stays behind in `pending`.
    fn form_chunk(&mut self) {
        consolidate_updates(&mut self.pending);
        let capacity = Self::chunk_capacity();
        while self.pending.len() > capacity {
            let mut chunk = Vec::with_capacity(capacity);
            // Drain by the requested chunk size, not by `chunk.capacity()`:
            // `Vec::with_capacity` may provide more than was asked for (and
            // reports `usize::MAX` for zero-sized types), which would make the
            // drain range exceed `pending.len()` and panic.
            chunk.extend(self.pending.drain(..capacity));
            self.ready.push_back(chunk);
        }
    }
}

impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
where
    K: Ord + Clone,
    V: Ord + Clone,
    T: Ord + Clone,
    R: Semigroup + Clone,
{
    /// Drains `container` into the chunker, forming chunks whenever the
    /// pending buffer fills up. `container` is left empty.
    fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
        // Make sure `pending` can hold two chunks' worth of updates. We never
        // write more than `capacity` elements into the buffer, so it does not
        // grow beyond what is reserved here.
        // Important: consolidation needs `pending` to have twice the chunk
        // capacity to amortize its cost; otherwise it risks doing quadratic work.
        let wanted = Self::chunk_capacity() * 2;
        if self.pending.capacity() < wanted {
            let additional = wanted - self.pending.len();
            self.pending.reserve(additional);
        }

        let mut incoming = container.drain(..).peekable();
        while incoming.peek().is_some() {
            // Top up `pending` to its capacity, then chunk if it is full.
            let room = self.pending.capacity() - self.pending.len();
            self.pending.extend(incoming.by_ref().take(room));
            if self.pending.len() == self.pending.capacity() {
                self.form_chunk();
            }
        }
    }
}

impl<K, V, T, R> ContainerBuilder for VecChunker<((K, V), T, R)>
where
    K: Ord + Clone + 'static,
    V: Ord + Clone + 'static,
    T: Ord + Clone + 'static,
    R: Semigroup + Clone + 'static,
{
    type Container = Vec<((K, V), T, R)>;

    /// Hands out the oldest ready chunk, if there is one.
    ///
    /// The chunk is parked in `self.empty` so that a mutable reference to it
    /// can be returned. When no chunk is ready, `self.empty` is left untouched
    /// and `None` is returned.
    fn extract(&mut self) -> Option<&mut Self::Container> {
        let chunk = self.ready.pop_front()?;
        Some(self.empty.insert(chunk))
    }

    /// Flushes all pending updates into chunks and hands out the oldest one.
    ///
    /// Any pending updates are consolidated and cut into chunks, including a
    /// final partial chunk. Returns `None` once no chunk remains.
    fn finish(&mut self) -> Option<&mut Self::Container> {
        if !self.pending.is_empty() {
            consolidate_updates(&mut self.pending);
            while !self.pending.is_empty() {
                let mut chunk = Vec::with_capacity(Self::chunk_capacity());
                // The last chunk may be partial, so never drain past the end.
                let count = self.pending.len().min(chunk.capacity());
                chunk.extend(self.pending.drain(..count));
                self.ready.push_back(chunk);
            }
        }
        self.empty = self.ready.pop_front();
        self.empty.as_mut()
    }
}

/// Chunk a stream of vectors into chains of vectors.
/// Chunk a stream of vectors into chains of columnation stacks.
///
/// This chunker accumulates into a `Vec` (not a `TimelyStack`) for efficient
/// in-place sorting and consolidation, then copies the consolidated results
/// into `TimelyStack` chunks. This avoids the cost of sorting through
/// columnation indirection.
pub struct ColumnationChunker<T: Columnation> {
pending: Vec<T>,
ready: VecDeque<TimelyStack<T>>,
Expand Down Expand Up @@ -159,14 +50,6 @@ where
}
}

/// Form chunks out of pending data, if needed. This function is meant to be applied to
/// potentially full buffers, and ensures that if the buffer was full when called it is at most
/// half full when the function returns.
///
/// `form_chunk` does the following:
/// * If pending is full, consolidate.
/// * If after consolidation it's more than half full, peel off chunks,
/// leaving behind any partial chunk in pending.
fn form_chunk(&mut self) {
consolidate_updates(&mut self.pending);
if self.pending.len() >= Self::chunk_capacity() {
Expand All @@ -188,8 +71,6 @@ where
R: Columnation + Semigroup + Clone,
{
fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
// because we don't write more than capacity elements into the buffer.
if self.pending.capacity() < Self::chunk_capacity() * 2 {
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
}
Expand Down Expand Up @@ -257,7 +138,7 @@ where
Input: DrainContainer,
Output: Default
+ SizableContainer
+ ConsolidateLayout
+ Consolidate
+ PushInto<Input::Item<'a>>,
{
fn push_into(&mut self, container: &'a mut Input) {
Expand All @@ -283,7 +164,7 @@ where

impl<Output> ContainerBuilder for ContainerChunker<Output>
where
Output: SizableContainer + ConsolidateLayout + Container,
Output: SizableContainer + Consolidate + Container,
{
type Container = Output;

Expand Down
Loading
Loading