Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion differential-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ mimalloc = "0.1.48"

[dependencies]
columnar = { workspace = true }
columnation = "0.1.1"
fnv="1.0.2"
paste = "1.0"
serde = { version = "1.0", features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn main() {
// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G>(edges: VecCollection<G, Edge>, roots: VecCollection<G, Node>) -> VecCollection<G, (Node, u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
G: Scope<Timestamp: Lattice+Ord+columnar::Columnar>,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn main() {
// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G>(edges: VecCollection<G, Edge>, roots: VecCollection<G, Node>) -> VecCollection<G, (Node, u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
G: Scope<Timestamp: Lattice+Ord+columnar::Columnar>,
{
use timely::order::Product;
use iterate::Variable;
Expand Down
8 changes: 4 additions & 4 deletions differential-dataflow/examples/graspan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ type Arrange<G,K,V,R> = Arranged<G, TraceValHandle<K, V, <G as ScopeParent>::Tim
///
/// An edge variable provides arranged representations of its contents, even before they are
/// completely defined, in support of recursively defined productions.
pub struct EdgeVariable<G: Scope<Timestamp: Lattice>> {
pub struct EdgeVariable<G: Scope<Timestamp: Lattice+columnar::Columnar>> {
variable: VecVariable<G, Edge, Diff>,
collection: VecCollection<G, Edge, Diff>,
current: VecCollection<G, Edge, Diff>,
forward: Option<Arrange<G, Node, Node, Diff>>,
reverse: Option<Arrange<G, Node, Node, Diff>>,
}

impl<G: Scope<Timestamp: Lattice>> EdgeVariable<G> {
impl<G: Scope<Timestamp: Lattice+columnar::Columnar>> EdgeVariable<G> {
/// Creates a new variable initialized with `source`.
pub fn from(source: VecCollection<G, Edge>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (variable, collection) = VecVariable::new(&mut source.scope(), step);
Expand Down Expand Up @@ -129,7 +129,7 @@ impl<G: Scope<Timestamp: Lattice>> EdgeVariable<G> {
}

/// Handles to inputs and outputs of a computation.
pub struct RelationHandles<T: Timestamp+Lattice> {
pub struct RelationHandles<T: Timestamp+Lattice+columnar::Columnar> {
/// An input handle supporting arbitrary changes.
pub input: InputSession<T, Edge, Diff>,
/// An output trace handle which can be used in other computations.
Expand All @@ -153,7 +153,7 @@ impl Query {
/// Creates a dataflow implementing the query, and returns input and trace handles.
pub fn render_in<G>(&self, scope: &mut G) -> BTreeMap<String, RelationHandles<G::Timestamp>>
where
G: Scope<Timestamp: Lattice+::timely::order::TotalOrder>,
G: Scope<Timestamp: Lattice+columnar::Columnar+::timely::order::TotalOrder>,
{
// Create new input (handle, stream) pairs
let mut input_map = BTreeMap::new();
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/interpreted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn main() {

fn interpret<G>(edges: VecCollection<G, Edge>, relations: &[(usize, usize)]) -> VecCollection<G, Vec<Node>>
where
G: Scope<Timestamp: Lattice+Hash+Ord>,
G: Scope<Timestamp: Lattice+Hash+Ord+columnar::Columnar>,
{

// arrange the edge relation three ways.
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use differential_dataflow::lattice::Lattice;
type Node = u32;
type Edge = (Node, Node);

#[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
#[derive(Copy, Ord, PartialOrd, Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash, columnar::Columnar)]
pub struct MinSum {
value: u32,
}
Expand Down Expand Up @@ -124,7 +124,7 @@ fn main() {
// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G>(edges: VecCollection<G, Edge, MinSum>, roots: VecCollection<G, Node, MinSum>) -> VecCollection<G, Node, MinSum>
where
G: Scope<Timestamp: Lattice+Ord>,
G: Scope<Timestamp: Lattice+Ord+columnar::Columnar>,
{
// repeatedly update minimal distances each node can be reached from each root
roots.scope().iterative::<u32,_,_>(|scope| {
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/examples/multitemporal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ fn main() {
mod pair {

/// A pair of timestamps, partially ordered by the product order.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, columnar::Columnar)]
pub struct Pair<S, T> {
pub first: S,
pub second: T,
Expand Down Expand Up @@ -223,7 +223,7 @@ mod vector {
use serde::{Deserialize, Serialize};

/// A pair of timestamps, partially ordered by the product order.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, columnar::Columnar)]
pub struct Vector<T> {
pub vector: Vec<T>,
}
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/pagerank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn main() {
// to its PageRank in the input graph `edges`.
fn pagerank<G>(iters: Iter, edges: VecCollection<G, Edge, Diff>) -> VecCollection<G, Node, Diff>
where
G: Scope<Timestamp: Lattice>,
G: Scope<Timestamp: Lattice+columnar::Columnar>,
{
// initialize many surfers at each node.
let nodes =
Expand Down
63 changes: 36 additions & 27 deletions differential-dataflow/examples/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ use differential_dataflow::lattice::Lattice;
use timely::progress::{Timestamp, Source, Target, Location};
use timely::progress::timestamp::PathSummary;

// Columnar-compatible representations of Source and Target as (node, port) tuples.
type Src = (usize, usize);
type Tgt = (usize, usize);

fn src(s: Source) -> Src { (s.node, s.port) }
fn tgt(t: Target) -> Tgt { (t.node, t.port) }
fn to_source(s: &Src) -> Source { Source::new(s.0, s.1) }
fn to_target(t: &Tgt) -> Target { Target::new(t.0, t.1) }

fn main() {

timely::execute_from_args(std::env::args(), move |worker| {
Expand Down Expand Up @@ -46,15 +55,15 @@ fn main() {

// A PageRank-like graph, as represented here:
// https://github.com/TimelyDataflow/diagnostics/blob/master/examples/pagerank.png
nodes.insert((Target::new(2, 0), Source::new(2, 0), 1));
nodes.insert((Target::new(3, 0), Source::new(3, 0), 0));
nodes.insert((Target::new(3, 1), Source::new(3, 0), 0));
nodes.insert((Target::new(4, 0), Source::new(4, 0), 0));
nodes.insert((tgt(Target::new(2, 0)), src(Source::new(2, 0)), 1));
nodes.insert((tgt(Target::new(3, 0)), src(Source::new(3, 0)), 0));
nodes.insert((tgt(Target::new(3, 1)), src(Source::new(3, 0)), 0));
nodes.insert((tgt(Target::new(4, 0)), src(Source::new(4, 0)), 0));

edges.insert((Source::new(1, 0), Target::new(3, 0)));
edges.insert((Source::new(3, 0), Target::new(4, 0)));
edges.insert((Source::new(4, 0), Target::new(2, 0)));
edges.insert((Source::new(2, 0), Target::new(3, 1)));
edges.insert((src(Source::new(1, 0)), tgt(Target::new(3, 0))));
edges.insert((src(Source::new(3, 0)), tgt(Target::new(4, 0))));
edges.insert((src(Source::new(4, 0)), tgt(Target::new(2, 0))));
edges.insert((src(Source::new(2, 0)), tgt(Target::new(3, 1))));

// Initially no capabilities.
nodes.advance_to(1); nodes.flush();
Expand Down Expand Up @@ -115,17 +124,17 @@ fn main() {
/// The computation to determine this, and to maintain it as times change, is an iterative
/// computation that propagates times and maintains the minimal elements at each location.
fn frontier<G, T>(
nodes: VecCollection<G, (Target, Source, T::Summary)>,
edges: VecCollection<G, (Source, Target)>,
nodes: VecCollection<G, (Tgt, Src, T::Summary)>,
edges: VecCollection<G, (Src, Tgt)>,
times: VecCollection<G, (Location, T)>,
) -> VecCollection<G, (Location, T)>
where
G: Scope<Timestamp: Lattice+Ord>,
T: Timestamp<Summary: differential_dataflow::ExchangeData>+std::hash::Hash,
G: Scope<Timestamp: Lattice+Ord+columnar::Columnar>,
T: Timestamp<Summary: differential_dataflow::ExchangeData>+std::hash::Hash+columnar::Columnar,
{
// Translate node and edge transitions into a common Location to Location edge with an associated Summary.
let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary)));
let edges = edges.map(|(source, target)| (Location::from(source), (Location::from(target), Default::default())));
let nodes = nodes.map(|(target, source, summary)| (Location::from(to_target(&target)), (Location::from(to_source(&source)), summary)));
let edges = edges.map(|(source, target)| (Location::from(to_source(&source)), (Location::from(to_target(&target)), Default::default())));
let transitions: VecCollection<G, (Location, (Location, T::Summary))> = nodes.concat(edges);

times
Expand All @@ -150,24 +159,24 @@ where

/// Summary paths from locations to operator zero inputs.
fn summarize<G, T>(
nodes: VecCollection<G, (Target, Source, T::Summary)>,
edges: VecCollection<G, (Source, Target)>,
nodes: VecCollection<G, (Tgt, Src, T::Summary)>,
edges: VecCollection<G, (Src, Tgt)>,
) -> VecCollection<G, (Location, (Location, T::Summary))>
where
G: Scope<Timestamp: Lattice+Ord>,
T: Timestamp<Summary: differential_dataflow::ExchangeData+std::hash::Hash>,
G: Scope<Timestamp: Lattice+Ord+columnar::Columnar>,
T: Timestamp<Summary: differential_dataflow::ExchangeData+std::hash::Hash>+columnar::Columnar,
{
// Start from trivial reachability from each input to itself.
let zero_inputs =
edges
.clone()
.map(|(_source, target)| Location::from(target))
.map(|(_source, target)| Location::from(to_target(&target)))
.filter(|location| location.node == 0)
.map(|location| (location, (location, Default::default())));

// Retain node connections along "default" timestamp summaries.
let nodes = nodes.map(|(target, source, summary)| (Location::from(source), (Location::from(target), summary)));
let edges = edges.map(|(source, target)| (Location::from(target), (Location::from(source), Default::default())));
let nodes = nodes.map(|(target, source, summary)| (Location::from(to_source(&source)), (Location::from(to_target(&target)), summary)));
let edges = edges.map(|(source, target)| (Location::from(to_target(&target)), (Location::from(to_source(&source)), Default::default())));
let transitions: VecCollection<G, (Location, (Location, T::Summary))> = nodes.concat(edges);

zero_inputs
Expand Down Expand Up @@ -195,23 +204,23 @@ where

/// Identifies cycles along paths that do not increment timestamps.
fn find_cycles<G: Scope, T: Timestamp>(
nodes: VecCollection<G, (Target, Source, T::Summary)>,
edges: VecCollection<G, (Source, Target)>,
nodes: VecCollection<G, (Tgt, Src, T::Summary)>,
edges: VecCollection<G, (Src, Tgt)>,
) -> VecCollection<G, (Location, Location)>
where
G: Scope<Timestamp: Lattice+Ord>,
T: Timestamp<Summary: differential_dataflow::ExchangeData>,
G: Scope<Timestamp: Lattice+Ord+columnar::Columnar>,
T: Timestamp<Summary: differential_dataflow::ExchangeData>+columnar::Columnar,
{
// Retain node connections along "default" timestamp summaries.
let nodes = nodes.flat_map(|(target, source, summary)| {
if summary == Default::default() {
Some((Location::from(target), Location::from(source)))
Some((Location::from(to_target(&target)), Location::from(to_source(&source))))
}
else {
None
}
});
let edges = edges.map(|(source, target)| (Location::from(source), Location::from(target)));
let edges = edges.map(|(source, target)| (Location::from(to_source(&source)), Location::from(to_target(&target))));
let transitions: VecCollection<G, (Location, Location)> = nodes.concat(edges);

// Repeatedly restrict to locations with an incoming path.
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ fn main() {

match mode.as_str() {
"new" => {
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine};
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine};
let data = data.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
keys.join_core(data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/stackoverflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ fn main() {
// returns pairs (n, s) indicating node n can be reached from a root in s steps.
fn bfs<G>(edges: VecCollection<G, Edge>, roots: VecCollection<G, Node>) -> VecCollection<G, (Node, u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
G: Scope<Timestamp: Lattice+Ord+columnar::Columnar>,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/src/algorithms/graphs/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::lattice::Lattice;
/// Returns pairs (node, dist) indicating distance of each node from a root.
pub fn bfs<G, N>(edges: VecCollection<G, (N,N)>, roots: VecCollection<G, N>) -> VecCollection<G, (N,u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
G: Scope<Timestamp: Lattice+Ord+columnar::Columnar>,
N: ExchangeData+Hash,
{
let edges = edges.arrange_by_key();
Expand All @@ -26,7 +26,7 @@ pub fn bfs_arranged<G, N, Tr>(edges: Arranged<G, Tr>, roots: VecCollection<G, N>
where
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
Tr: TraceReader<Key=N, Val=N, Diff=isize>+Clone+'static,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/src/algorithms/graphs/bijkstra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::operators::iterate::Variable;
/// could be good insurance here.
pub fn bidijkstra<G, N>(edges: VecCollection<G, (N,N)>, goals: VecCollection<G, (N,N)>) -> VecCollection<G, ((N,N), u32)>
where
G: Scope<Timestamp: Lattice+Ord>,
G: Scope<Timestamp: Lattice+Ord+columnar::Columnar>,
N: ExchangeData+Hash,
{
let forward = edges.clone().arrange_by_key();
Expand All @@ -41,7 +41,7 @@ pub fn bidijkstra_arranged<G, N, Tr>(
where
G: Scope<Timestamp=Tr::Time>,
N: ExchangeData+Hash,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=isize>+Clone+'static,
Tr: TraceReader<Key=N, Val=N, Diff=isize>+Clone+'static,
{
forward
.stream
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::difference::{Abelian, Multiply};
/// method to limit the introduction of labels.
pub fn propagate<G, N, L, R>(edges: VecCollection<G, (N,N), R>, nodes: VecCollection<G,(N,L),R>) -> VecCollection<G,(N,L),R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
G: Scope<Timestamp: Lattice+Ord+Hash+columnar::Columnar>,
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
Expand All @@ -32,7 +32,7 @@ where
/// method to limit the introduction of labels.
pub fn propagate_at<G, N, L, F, R>(edges: VecCollection<G, (N,N), R>, nodes: VecCollection<G,(N,L),R>, logic: F) -> VecCollection<G,(N,L),R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
G: Scope<Timestamp: Lattice+Ord+Hash+columnar::Columnar>,
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
Expand All @@ -59,7 +59,7 @@ where
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Time:Hash, Diff=R>+Clone+'static,
Tr: TraceReader<Key=N, Val=N, Time:Hash, Diff=R>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
// Morally the code performs the following iterative computation. However, in the interest of a simplified
Expand Down
6 changes: 3 additions & 3 deletions differential-dataflow/src/algorithms/graphs/scc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::propagate::propagate;
/// Iteratively removes nodes with no in-edges.
pub fn trim<G, N, R>(graph: VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
where
G: Scope<Timestamp: Lattice+Ord>,
G: Scope<Timestamp: Lattice+Ord+columnar::Columnar>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output=R>,
Expand All @@ -35,7 +35,7 @@ where
/// Returns the subset of edges in the same strongly connected component.
pub fn strongly_connected<G, N, R>(graph: VecCollection<G, (N,N), R>) -> VecCollection<G, (N,N), R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
G: Scope<Timestamp: Lattice+Ord+Hash+columnar::Columnar>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output=R>,
Expand All @@ -51,7 +51,7 @@ where
fn trim_edges<G, N, R>(cycle: VecCollection<G, (N,N), R>, edges: VecCollection<G, (N,N), R>)
-> VecCollection<G, (N,N), R>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
G: Scope<Timestamp: Lattice+Ord+Hash+columnar::Columnar>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output=R>,
Expand Down
4 changes: 2 additions & 2 deletions differential-dataflow/src/algorithms/graphs/sequential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::hashable::Hashable;

fn _color<G, N>(edges: VecCollection<G, (N,N)>) -> VecCollection<G,(N,Option<u32>)>
where
G: Scope<Timestamp: Lattice+Ord+Hash>,
G: Scope<Timestamp: Lattice+Ord+Hash+columnar::Columnar>,
N: ExchangeData+Hash,
{
// need some bogus initial values.
Expand Down Expand Up @@ -45,7 +45,7 @@ pub fn sequence<G, N, V, F>(
edges: VecCollection<G, (N,N)>,
logic: F) -> VecCollection<G, (N,Option<V>)>
where
G: Scope<Timestamp: Lattice+Hash+Ord>,
G: Scope<Timestamp: Lattice+Hash+Ord+columnar::Columnar>,
N: ExchangeData+Hashable,
V: ExchangeData,
F: Fn(&N, &[(&V, isize)])->V+'static
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/src/algorithms/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub trait Identifiers<G: Scope, D: ExchangeData, R: ExchangeData+Abelian> {

impl<G, D, R> Identifiers<G, D, R> for VecCollection<G, D, R>
where
G: Scope<Timestamp: Lattice>,
G: Scope<Timestamp: Lattice+columnar::Columnar>,
D: ExchangeData + ::std::hash::Hash,
R: ExchangeData + Abelian,
{
Expand Down
Loading
Loading