Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {

// Get the header and payload, ditch the header.
let mut peel = bytes.extract_to(header.required_bytes());
let _ = peel.extract_to(::std::mem::size_of::<MessageHeader>());
let _ = peel.extract_to(header.header_bytes());

// Increment message count for channel.
// Safe to do this even if the channel has been dropped.
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/hashjoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ fn main() {
sent += to_send;

// Advance input, iterate until data cleared.
let next = input1.epoch() + 1;
let next = input1.time() + 1;
input1.advance_to(next);
input2.advance_to(next);
while probe.less_than(input1.time()) {
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/unionfind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn main() {
for (edge, arc) in insert.take(edges / peers).enumerate() {
input.send(arc);
if edge % batch == (batch - 1) {
let next = input.epoch() + 1;
let next = input.time() + 1;
input.advance_to(next);
while probe.less_than(input.time()) {
worker.step();
Expand Down
5 changes: 0 additions & 5 deletions timely/src/dataflow/operators/core/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,6 @@ impl<T: Timestamp, CB: ContainerBuilder<Container: Clone>> Handle<T, CB> {
/// and to begin to shut down operators, as this input can no longer produce data.
pub fn close(self) { }

/// Reports the current epoch.
pub fn epoch(&self) -> &T {
&self.now_at
}

/// Reports the current timestamp.
pub fn time(&self) -> &T {
&self.now_at
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/generic/notificator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ fn notificator_delivers_notifications_in_topo_order() {
use crate::order::Product;
use crate::dataflow::operators::capability::Capability;

let mut frontier = MutableAntichain::new_bottom(Product::new(0, 0));
let mut frontier = MutableAntichain::from_elem(Product::new(0, 0));

let root_capability = Capability::new(Product::new(0,0), Rc::new(RefCell::new(ChangeBatch::new())));

Expand Down
4 changes: 2 additions & 2 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ impl<T:Timestamp+Send> Progcaster<T> {
// options for improving it if performance limits users who want other logging.
self.progress_logging.as_ref().map(|l| {

let mut messages = Vec::with_capacity(changes.len());
let mut internal = Vec::with_capacity(changes.len());
let mut messages = Vec::with_capacity(recv_changes.len());
let mut internal = Vec::with_capacity(recv_changes.len());

for ((location, time), diff) in recv_changes.iter() {

Expand Down
16 changes: 8 additions & 8 deletions timely/src/progress/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,10 @@ impl<T> Antichain<T> {
///
/// let mut frontier = Antichain::from_elem(2);
///```
pub fn from_elem(element: T) -> Antichain<T> {
pub fn from_elem(element: T) -> Antichain<T> {
let mut elements = SmallVec::with_capacity(1);
elements.push(element);
Antichain { elements }
Antichain { elements }
}

/// Clears the contents of the antichain.
Expand Down Expand Up @@ -440,11 +440,11 @@ impl<T> MutableAntichain<T> {
///```
/// use timely::progress::frontier::{AntichainRef, MutableAntichain};
///
/// let mut frontier = MutableAntichain::new_bottom(0u64);
/// let mut frontier = MutableAntichain::from_elem(0u64);
/// assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
///```
#[inline]
pub fn new_bottom(bottom: T) -> MutableAntichain<T>
pub fn from_elem(bottom: T) -> MutableAntichain<T>
where
T: Ord+Clone,
{
Expand Down Expand Up @@ -477,7 +477,7 @@ impl<T> MutableAntichain<T> {
///```
/// use timely::progress::frontier::MutableAntichain;
///
/// let mut frontier = MutableAntichain::new_bottom(1u64);
/// let mut frontier = MutableAntichain::from_elem(1u64);
/// assert!(!frontier.less_than(&0));
/// assert!(!frontier.less_than(&1));
/// assert!(frontier.less_than(&2));
Expand All @@ -497,7 +497,7 @@ impl<T> MutableAntichain<T> {
///```
/// use timely::progress::frontier::MutableAntichain;
///
/// let mut frontier = MutableAntichain::new_bottom(1u64);
/// let mut frontier = MutableAntichain::from_elem(1u64);
/// assert!(!frontier.less_equal(&0));
/// assert!(frontier.less_equal(&1));
/// assert!(frontier.less_equal(&2));
Expand All @@ -517,7 +517,7 @@ impl<T> MutableAntichain<T> {
///```
/// use timely::progress::frontier::{AntichainRef, MutableAntichain};
///
/// let mut frontier = MutableAntichain::new_bottom(1u64);
/// let mut frontier = MutableAntichain::from_elem(1u64);
/// let changes =
/// frontier
/// .update_iter(vec![(1, -1), (2, 7)])
Expand Down Expand Up @@ -621,7 +621,7 @@ pub trait MutableAntichainFilter<T: PartialOrder+Ord+Clone> {
/// ```
/// use timely::progress::frontier::{MutableAntichain, MutableAntichainFilter};
///
/// let mut frontier = MutableAntichain::new_bottom(1u64);
/// let mut frontier = MutableAntichain::from_elem(1u64);
/// let changes =
/// vec![(1, -1), (2, 7)]
/// .filter_through(&mut frontier)
Expand Down
2 changes: 1 addition & 1 deletion timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Config {
#[cfg(feature = "getopts")]
pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
let progress_mode = matches
.opt_get_default("progress-mode", ProgressMode::Eager)?;
.opt_get_default("progress-mode", ProgressMode::Demand)?;
Ok(Config::default().progress_mode(progress_mode))
}

Expand Down
Loading