diff --git a/log.md b/log.md new file mode 100644 index 000000000..e9db36109 --- /dev/null +++ b/log.md @@ -0,0 +1,118 @@ +# Bug audit log + +## `timely/src/progress/change_batch.rs` + +### `compact()` does not remove single zero-valued entries + +`compact()` at line 292 guards the compaction body with `self.updates.len() > 1`. +A batch with a single `(key, 0)` entry will never have that entry removed. +Subsequent `is_empty()` returns `false` for a logically empty batch because the `clean > len/2` fast path fires. + +Reproduction: `ChangeBatch::::new_from(17, 0)` followed by `is_empty()` returns `false`. + +Severity: low — zero-valued updates are uncommon in practice, but the invariant that "compact entries are non-zero" is violated. + +Fix: change the guard to `self.updates.len() > 1` → `self.updates.len() >= 1`, or handle the single-element case separately with a retain check. + +## `timely/src/progress/broadcast.rs` + +### `recv` logging pre-allocates with wrong variable + +In `recv()` at line 120-121, the logging closure allocates: +```rust +let mut messages = Vec::with_capacity(changes.len()); +let mut internal = Vec::with_capacity(changes.len()); +``` +`changes` is the *output accumulator*, not the received message (`recv_changes`). +Compare with `send()` at line 66-67 where `changes` correctly refers to the data being sent. + +Severity: very low — wrong capacity hint in a logging-only path; no correctness impact. + +## `timely/src/dataflow/operators/generic/notificator.rs` + +### `OrderReversed` has inconsistent `PartialEq` and `Ord`, causing incorrect notification counts + +`OrderReversed` derives `PartialEq` (compares both `element: Capability` and `value: u64`) but implements `Ord` comparing only by `element.time()`. +`Capability::PartialEq` additionally checks `Rc::ptr_eq` on the internal change batch. + +In `next_count()` (line 342-348), the loop `while self.available.peek() == Some(&front)` uses `PartialEq` to merge same-time entries from the BinaryHeap. +This fails to merge entries that have: +* Different `value` fields (e.g., one consolidated with count 3, another directly inserted with count 1). +* Different `Rc` pointers in their `Capability` (capabilities from different cloning chains). + +Reproduction scenario: +1. `make_available` consolidates two pending notifications for time T into `(T, count=2)` and pushes to `available`. +2. `notify_at_frontiered` adds `(T, count=1)` directly to `available`. +3. `next_count` pops `(T, 2)`, peeks at `(T, 1)`, comparison returns false (different `value`), returns `(cap, 2)` instead of `(cap, 3)`. + +The next `next_count` call returns the leftover `(cap, 1)`, so the total is eventually correct but split across calls. + +Severity: medium — notification counts are inaccurate. +Users relying on `count` in `for_each(|cap, count, _| ...)` may see split notifications for the same time. +Functional correctness of the dataflow is unaffected (all notifications are delivered), but the count semantic is broken. + +Fix: change the `peek` comparison to compare only by time: +```rust +while self.available.peek().map(|x| x.element.time() == front.element.time()).unwrap_or(false) { +``` + +## `timely/src/worker.rs` + +### `Config::from_matches` uses wrong default for `progress_mode` + +`Config::from_matches` at line 115 uses `ProgressMode::Eager` as the fallback when `--progress-mode` is not specified: +```rust +let progress_mode = matches + .opt_get_default("progress-mode", ProgressMode::Eager)?; +``` + +However, the `Default` impl for `ProgressMode` (line 64) is `Demand`: +```rust +#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)] +pub enum ProgressMode { + Eager, + #[default] + Demand, +} +``` + +This means `Config::thread()` and `Config::process(n)` use `Demand` (via `Config::default()`), but `execute_from_args` without `--progress-mode` uses `Eager`. The documentation explicitly recommends `Demand` as the safer default. + +Reproduction: calling `execute_from_args(std::env::args(), ...)` without `--progress-mode` yields `Eager`, while `execute(Config::process(n), ...)` yields `Demand`. + +Severity: low-medium — the two entry points silently use different progress modes. Users of `execute_from_args` get the less robust `Eager` mode by default, which risks saturating the system with progress messages. + +Fix: change line 115 to use `ProgressMode::Demand` (or `ProgressMode::default()`) as the fallback: +```rust +let progress_mode = matches + .opt_get_default("progress-mode", ProgressMode::default())?; +``` + +## `communication/src/allocator/zero_copy/allocator.rs` + +### `receive()` uses `size_of::()` instead of `header.header_bytes()` + +In `receive()` at line 270, the header is stripped from the payload using: +```rust +let _ = peel.extract_to(::std::mem::size_of::()); +``` + +`MessageHeader` has 6 `usize` fields, so `size_of::()` is platform-dependent (48 on 64-bit, 24 on 32-bit). +The wire format always uses 6 `u64` values, so the correct strip size is `header.header_bytes()` which returns `size_of::() * 6 = 48` unconditionally. + +Compare with `allocator_process.rs` line 197 which correctly uses: +```rust +let _ = peel.extract_to(header.header_bytes()); +``` + +On 64-bit platforms the values coincide (48 = 48), so the bug is latent. +On 32-bit platforms, only 24 bytes would be stripped, leaving 24 bytes of header data mixed into the payload, corrupting every deserialized message received over TCP. + +Reproduction: compile and run a multi-process timely computation on a 32-bit target. All inter-process messages will deserialize incorrectly. + +Severity: low — 32-bit deployments are rare, and the two values coincide on the dominant 64-bit platform. The inconsistency with `allocator_process.rs` indicates the intent was to use `header.header_bytes()`. + +Fix: change line 270 to: +```rust +let _ = peel.extract_to(header.header_bytes()); +``` diff --git a/log_alloc.md b/log_alloc.md new file mode 100644 index 000000000..f0e546ab6 --- /dev/null +++ b/log_alloc.md @@ -0,0 +1,125 @@ +# Allocation audit log + +## Findings by theme + +### 1. Broadcast clones every record `peers` times + +**Severity: medium-high** + +* `timely/src/dataflow/operators/vec/broadcast.rs:31` — The broadcast operator is implemented as `flat_map(|x| (0..peers).map(|i| (i, x.clone()))).exchange(|ix| ix.0).map(|(_i,x)| x)`. + Each record is cloned `peers` times, wrapped in tuples, exchanged, then unwrapped. + This is O(peers * records) in allocations. + The comment acknowledges: "Simplified implementation... Optimize once they have settled down." + +### 2. Per-message event notifications without batching + +**Severity: medium** + +The inter-thread communication path does three operations per message push with no batching: +* `communication/src/allocator/counters.rs:47-49` — `events.push(self.index)` appends to a shared Vec on every push, growing O(messages) between drains. +* `communication/src/allocator/counters.rs:102` — `self.buzzer.buzz()` calls unpark/condvar on every push, even when the target thread is already awake. +* `communication/src/allocator/process.rs:189-194` — `receive()` drains all pending mpsc messages into the events Vec in one shot, no bound or backpressure. + +Commented-out code in `counters.rs:34-44` shows a batching strategy was considered but not completed. + +### 3. Unbounded buffer growth throughout the communication layer + +**Severity: medium** + +Multiple buffers grow to their high-water mark and never shrink: + +* `communication/src/allocator/zero_copy/bytes_slab.rs:106` — `in_progress` Vec grows as buffers are retired, never shrinks. Slow consumers cause monotonic growth. +* `communication/src/allocator/zero_copy/bytes_exchange.rs:31` — `MergeQueue` VecDeque grows without backpressure under producer-consumer imbalance. +* `communication/src/allocator/zero_copy/allocator.rs:277-289` and `allocator_process.rs:204-216` — Per-channel `VecDeque` grows without limit if consumers are slow. +* `communication/src/allocator/zero_copy/tcp.rs:53-56` — `stageds` inner Vecs retain peak capacity. +* `communication/src/allocator/zero_copy/allocator.rs:128` and `allocator_process.rs:118` — `staged` Vec retains high-water-mark capacity. + +### 4. Capability operations are heavier than necessary + +**Severity: medium** + +* `timely/src/dataflow/operators/capability.rs:154-161` — `try_downgrade` creates a new intermediate `Capability` (incrementing ChangeBatch), then drops the old one (decrementing). Two `borrow_mut` + `update` calls when one in-place update would suffice. +* `timely/src/dataflow/operators/generic/notificator.rs:323` — `make_available` clones capabilities from `pending` instead of moving them. A TODO comment acknowledges this. +* `timely/src/dataflow/operators/capability.rs:167-171` — `Capability::drop` clones the time to call `update(time.clone(), -1)` because `update` takes ownership. + +### 5. Repeated string-based logger lookup on every step + +**Severity: medium** + +* `timely/src/worker.rs:391,401` — `self.logging()` is called multiple times per `step_or_park()`. Each call goes through `self.log_register()` → `borrow()` → `HashMap::get("timely")`, performing a string lookup on every worker step. + Should be cached in the `Worker` struct. + +### 6. EventLink allocates one Rc per captured event + +**Severity: medium** + +* `timely/src/dataflow/operators/core/capture/event.rs:75` — Every pushed event creates a new `Rc`. For high-throughput capture, this is one heap allocation per event. + A pre-allocated ring buffer or arena would be more efficient. + +### 7. Reclock operator has O(n^2) stash behavior + +**Severity: medium** + +* `timely/src/dataflow/operators/core/reclock.rs:55-79` — The stash is a `Vec` scanned linearly per notification, then `retain` shifts elements. With many distinct timestamps this becomes O(n^2). + A `BTreeMap>` would give O(log n) lookups and efficient range removal. + +### 8. Logging allocates Vecs in the hot path + +**Severity: medium (when logging enabled)** + +* `timely/src/progress/broadcast.rs:66-67,120-121` — Every `send()`/`recv()` allocates two `Vec`s for logging that are transferred to the logger by ownership. +* `timely/src/progress/reachability.rs:852,867` — `log_source_updates`/`log_target_updates` collect into new Vecs, cloning every timestamp. +* `timely/src/logging.rs:51` — `BatchLogger::publish_batch` allocates a 2-element Vec per progress frontier advance. + +### 9. `BytesRefill` double indirection + +**Severity: medium** + +* `communication/src/initialize.rs:157` — Default refill closure creates `Box::new(vec![0_u8; size])`: the Vec already heap-allocates its buffer, then Box adds another heap allocation and pointer indirection. + `vec![0u8; size].into_boxed_slice()` would eliminate the Vec metadata overhead. + +### 10. Unnecessary clone in TCP receive unicast path + +**Severity: low-medium** + +* `communication/src/allocator/zero_copy/tcp.rs:99-101` — `bytes.clone()` for every target in the range. For unicast messages (the common case where `target_upper - target_lower == 1`), the original `bytes` could be moved instead of cloned, saving one atomic refcount increment/decrement pair per message. + +### 11. `SyncActivator` and delayed activations allocate path Vecs + +**Severity: low-medium** + +* `timely/src/scheduling/activate.rs:280` — `SyncActivator::activate()` clones `self.path` (a `Vec`) on every call. +* `timely/src/scheduling/activate.rs:87` — `activate_after` allocates `path.to_vec()` per delayed activation. + Using `Rc<[usize]>` would avoid per-call allocation. + +### 12. Exchange partition clones time per container extraction + +**Severity: low-medium** + +* `timely/src/dataflow/channels/pushers/exchange.rs:57,67` — `time.clone()` inside the per-container extraction loop. For complex product timestamps, this adds up. + +### 13. Sequencer inefficiencies + +**Severity: low-medium** + +* `timely/src/synchronization/sequence.rs:185` — Sink re-sorts the entire `recvd` vector each invocation, including already-sorted elements. Should sort only new elements and merge. +* `timely/src/synchronization/sequence.rs:153` — Clones each element `peers - 1` times; the last iteration could move. + +### 14. Thread allocator dead code + +**Severity: low** + +* `communication/src/allocator/thread.rs:61` — The shared tuple contains two VecDeques but the recycling code using the second one (lines 97-102) is commented out. The second VecDeque is allocated but never used. + +### 15. Partition operator intermediate buffering + +**Severity: low-medium** + +* `timely/src/dataflow/operators/core/partition.rs:61-67` — Creates a `BTreeMap>` to buffer data per partition before pushing to outputs. Data could be pushed directly to per-output container builders without the intermediate collection. + +### 16. Minor findings + +* `container/src/lib.rs:150` — `CapacityContainerBuilder::pending` VecDeque grows but never shrinks. `relax()` is a no-op. +* `container/src/lib.rs:216-219` — `ensure_capacity` computes `reserve(preferred - capacity())` but should use `reserve(preferred - len())`. +* Various `BinaryHeap` and `Vec` instances across the codebase that drain but never shrink (standard amortized pattern, acceptable in most cases). +* `timely/src/synchronization/barrier.rs:23` — `Worker::clone()` deep-clones `Config` (which contains a `HashMap`), but Config could be `Arc`-wrapped. diff --git a/log_api.md b/log_api.md new file mode 100644 index 000000000..c53b590e4 --- /dev/null +++ b/log_api.md @@ -0,0 +1,119 @@ +# API consistency audit log + +## `core::Input::Handle` — `epoch()` and `time()` return the identical value + +`Handle` at `timely/src/dataflow/operators/core/input.rs:473-480` defines two methods: +```rust +pub fn epoch(&self) -> &T { &self.now_at } +pub fn time(&self) -> &T { &self.now_at } +``` + +Both return a reference to the same field. +Users have no way to know which to call, and the existence of both suggests they might differ. + +Suggestion: deprecate `epoch()` in favor of `time()`, which is the name used everywhere else (e.g. `Capability::time()`, `probe::Handle::less_than(time)`). + +## `Antichain::from_elem` vs `MutableAntichain::new_bottom` — inconsistent singleton constructors + +`Antichain::from_elem(element)` at `frontier.rs:222` and `MutableAntichain::new_bottom(bottom)` at `frontier.rs:447` both create a singleton antichain, but use different naming conventions. + +`from_elem` follows Rust standard library conventions. +`new_bottom` uses domain-specific naming. + +Suggestion: add `MutableAntichain::from_elem` as an alias, or rename `new_bottom` to `from_elem` for consistency. + +## `core` vs `vec` — operators with no `core` equivalent + +Several operators exist only in `vec` with no `core`-level generalization: + +* `vec::Delay` (`delay`, `delay_total`, `delay_batch`) +* `vec::Broadcast` +* `vec::Branch` (data-dependent branching; note: `BranchWhen` works on generic `Stream` but lives in `vec::branch`) +* `vec::count::Accumulate` (`accumulate`, `count`) +* `vec::ResultStream` (`ok`, `err`, `map_ok`, `map_err`, `and_then`, `unwrap_or_else`) +* `vec::flow_controlled::iterator_source` + +These operators are only available for `StreamVec`, not for arbitrary container types. +Users who switch from `Vec` to a custom container must reimplement this functionality. + +The most impactful gaps are `Delay` and `Branch`, which are fundamental dataflow operations. + +## `vec::BranchWhen` — lives in `vec` module but operates on generic `Stream` + +`BranchWhen` at `vec/branch.rs:102` is implemented for `Stream` (generic containers), not `StreamVec`: +```rust +impl BranchWhen for Stream { ... } +``` + +It is defined in the `vec` module but does not depend on `Vec` containers. +It should be in `core` alongside `OkErr`, `Partition`, etc. + +## `vec::Partition` — closure type as trait parameter + +`vec::Partition` at `vec/partition.rs:8` puts the closure type in the trait generics: +```rust +pub trait Partition (u64, D2)> { + fn partition(self, parts: u64, route: F) -> Vec>; +} +``` + +`core::Partition` at `core/partition.rs:11` keeps the closure as a method-level generic: +```rust +pub trait Partition { + fn partition(self, parts: u64, route: F) -> Vec> + where ...; +} +``` + +The `vec` style makes the trait harder to import and use, because the user must specify all type parameters. +Every other operator trait in both `core` and `vec` uses method-level generics for closures. + +## `Antichain` has `with_capacity` but `MutableAntichain` does not + +`Antichain::with_capacity(capacity)` at `frontier.rs:207` pre-allocates space. +`MutableAntichain` has no `with_capacity` constructor, despite wrapping internal collections that support it. + +This is a minor gap, but inconsistent across the two related types. + +## `Antichain::extend` shadows `std::iter::Extend` + +`Antichain::extend` at `frontier.rs:118` has signature: +```rust +pub fn extend>(&mut self, iterator: I) -> bool +``` + +This returns `bool` (whether any element was inserted), which conflicts with the `std::iter::Extend` trait (which returns `()`). +As a result, `Antichain` cannot implement `std::iter::Extend`, though it does implement `FromIterator`. + +Users expecting the standard `Extend` trait to work will be surprised. + +## `core::Map` has `flat_map_builder`, `vec::Map` has `map_in_place` — non-overlapping extensions + +`core::Map` provides `map`, `flat_map`, and `flat_map_builder`. +`vec::Map` provides `map`, `flat_map`, and `map_in_place`. + +`flat_map_builder` (a zero-cost iterator combinator pattern) has no vec counterpart. +`map_in_place` (mutation without allocation) has no core counterpart. + +These are both useful optimizations that are only available in one of the two module hierarchies. + +## `core::OkErr` has no `vec` counterpart + +`core::OkErr` splits a stream by a closure returning `Result`. +The `vec` module has no `OkErr` trait — instead it has `ResultStream` which operates on streams of `Result` values. +These serve different purposes: +* `OkErr` splits any stream into two streams (general routing) +* `ResultStream` processes streams whose data is already `Result` + +A `vec::OkErr` wrapper (delegating to `core::OkErr`) would be consistent with how `vec::Filter`, `vec::Partition`, etc. wrap their core counterparts. + +## `Bytes` and `BytesMut` — asymmetric `try_merge` + +`Bytes` has `try_merge(&mut self, other: Bytes) -> Result<(), Bytes>` at `bytes/src/lib.rs:238`. +`BytesMut` has no `try_merge` method. + +To merge `BytesMut` slices, one must first `freeze()` them into `Bytes`, merge, then work with the result. +The module-level doc example demonstrates this pattern, but it is an API asymmetry. + +The crate docs note this: "The crate is currently minimalist rather than maximalist." +Still, the asymmetry means `BytesMut` users must navigate a more complex workflow. diff --git a/log_hotloop.md b/log_hotloop.md new file mode 100644 index 000000000..4c9e1096c --- /dev/null +++ b/log_hotloop.md @@ -0,0 +1,150 @@ +# Hot loop optimization audit log + +## Findings by theme + +### 1. Per-message overhead on every dataflow edge (Counter + Tee chain) + +**Severity: high** + +Every edge in the dataflow chains: `Counter::push` → `Tee::push` → `{PushOne|PushMany}::push` → downstream. +Per message this costs: 2 `RefCell::borrow_mut()` checks, 1-2 vtable calls, 1 timestamp clone, 1 `ChangeBatch::update`. + +* `timely/src/dataflow/channels/pushers/tee.rs:86` — `RefCell::borrow_mut()` on every message through every Tee. +* `timely/src/dataflow/channels/pushers/tee.rs:87` — Virtual dispatch through `Box` on every message. Could be an enum (`PushOne | PushMany`) to allow inlining. +* `timely/src/dataflow/channels/pushers/counter.rs:21-26` — Two separate `borrow_mut()` calls per push (one for `update`, one for `is_empty` check). Could be a single borrow scope. +* `timely/src/dataflow/channels/pushers/counter.rs:22` — `message.time.clone()` per message. A "last seen time" cache could batch same-time updates. + +Fusing Counter and Tee into a single struct would eliminate one `RefCell` round-trip. Replacing `Box` with an enum would allow inlining the common single-consumer case. + +### 2. Per-element event push and buzzer syscall in inter-thread communication + +**Severity: high** + +* `communication/src/allocator/counters.rs:47-49` — `Pusher::push` does `events.borrow_mut().push(self.index)` per element. Commented-out code shows a batching strategy that would reduce to O(flushes). +* `communication/src/allocator/counters.rs:99,102` — `ArcPusher::push` does `events.send(self.index)` (mpsc send, involves mutex) and `buzzer.buzz()` (thread unpark syscall) per element. Batching to flush boundaries would save ~999 mutex acquisitions and syscalls per 1000-element batch. + +### 3. Exchange pusher: random cache access in hash-routing loop + +**Severity: high** + +* `timely/src/dataflow/channels/pushers/exchange.rs:54-55` — `builders[index]` with hash-derived index causes random cache-line access across the builders vector. With many peers, this thrashes L1 cache. +* The existing `TODO` comment on line 88 already notes the need for software write-combining (SWC): buffer small staging areas per target, flush when full. +* Line 53-58: `extract()` is called per datum inside the inner loop, checking builder fullness every element. Separating the hash-routing loop from container extraction would enable tighter codegen. +* Line 112: `#[inline(never)]` on `Exchange::push` prevents inlining the single-pusher fast path. + +### 4. Virtual dispatch through `Box>` on every message receive + +**Severity: medium-high** + +* `communication/src/allocator/zero_copy/push_pull.rs:122-140` — `PullerInner::pull` calls `self.inner.pull()` through `Box>` on every message. This is the primary data receive path for `TcpAllocator`. Making `PullerInner` generic over the inner puller type would monomorphize the pull path and allow inlining. + +### 5. `ChangeBatch::compact()` re-sorts clean prefix and uses two-pass consolidation + +**Severity: medium-high** + +* `timely/src/progress/change_batch.rs:293` — `compact()` sorts the entire vector including the already-sorted clean prefix. For workloads where a few dirty elements are appended to a large clean batch, a merge-based approach (sort only dirty suffix, then merge) would reduce from O(n log n) to O(n). +* `timely/src/progress/change_batch.rs:294-304` — Three passes (sort, forward-scan-and-zero, retain). A single dedup-accumulate pass would eliminate the `retain` pass and improve cache utilization. + +### 6. Delay operator: per-datum assert + HashMap lookup + timestamp clone + +**Severity: high** + +* `timely/src/dataflow/operators/vec/delay.rs:106` — `assert!(time.time().less_equal(&new_time))` per datum in release mode. Should be `debug_assert!`. +* `timely/src/dataflow/operators/vec/delay.rs:107-109` — Per-datum HashMap lookup with `new_time.clone()` for the entry API. A last-seen-time cache would avoid repeated lookups when most data maps to the same time. +* `delay_total` delegates to `delay` without specializing for total-order timestamps. + +### 7. `PortConnectivity` uses `BTreeMap` for typically 1-2 ports + +**Severity: medium** + +* `timely/src/progress/operate.rs:70-107` — `BTreeMap>` used in `PortConnectivity`, iterated via `iter_ports()` on every frontier change in the propagation loop. A `SmallVec<[(usize, Antichain); 2]>` would give contiguous memory access for the overwhelmingly common 1-2 port case. + +### 8. `BinaryHeap` worklist in `Tracker::propagate_all()` + +**Severity: medium** + +* `timely/src/progress/reachability.rs:645-702` — The worklist uses `BinaryHeap` with O(log n) per push/pop. Many entries are duplicates that cancel immediately. A sort-based approach (accumulate in Vec, sort once, process in order) would have better cache locality and constants. + +### 9. HashMap lookups in the worker step loop + +**Severity: medium** + +* `timely/src/worker.rs:367` — `paths.get(&channel)` HashMap lookup per channel event. Channel IDs are dense integers allocated sequentially — a `Vec` indexed by channel ID would give O(1) access. +* `timely/src/worker.rs:413` — `dataflows.entry(index)` HashMap lookup per active dataflow. Same — dataflow indices are sequential, suitable for `Vec>`. +* `timely/src/worker.rs:368-370` — `self.activations.borrow_mut()` inside the channel event loop. Should be hoisted outside. + +### 10. `RefCell::borrow_mut()` churn on activations during step + +**Severity: medium** + +* `timely/src/worker.rs:368-408` — Four separate `RefCell` borrow/unborrow cycles on `self.activations` per step. A single borrow scope for the channel-event + advance + for_extensions phase would reduce overhead. + +### 11. `Activations::advance()` unconditional compaction + +**Severity: medium** + +* `timely/src/scheduling/activate.rs:122-128` — Every `advance()` copies all active path slices from `self.slices` into `self.buffer`, even when no new activations were added. A dirty flag would make idle steps essentially free. +* `timely/src/scheduling/activate.rs:117-119` — Activation path sorting uses indirect slice comparisons. Packing short paths into fixed-size keys would enable direct value comparisons. + +### 12. Partition operator uses BTreeMap for dense integer indices + +**Severity: medium** + +* `timely/src/dataflow/operators/core/partition.rs:62-67` — Per-datum `BTreeMap` lookup where partition indices are 0..parts. A `Vec>` indexed by partition number converts O(log k) into O(1). + +### 13. Filter/Branch/OkErr per-element give instead of batch operations + +**Severity: medium** + +* `timely/src/dataflow/operators/core/filter.rs:33-34` — Core filter uses per-element `give_iterator` with `filter`. The vec variant already uses `retain` for in-place filtering. +* `timely/src/dataflow/operators/vec/branch.rs:58-66` — Per-datum `give()` to two output sessions. Partitioning into two local Vecs first then `give_container` would batch the output. + +### 14. `Rc>` indirection per serialized push + +**Severity: medium** + +* `communication/src/allocator/zero_copy/push_pull.rs:36-58` — Every outgoing message does `Rc` pointer chase + `RefCell::borrow_mut()` + borrow flag manipulation. Multiple Pushers share a single SendEndpoint; batching the borrow per push batch would reduce overhead. + +### 15. MergeQueue lock held during entire VecDeque drain + +**Severity: medium** + +* `communication/src/allocator/zero_copy/bytes_exchange.rs:94-106` — `drain_into` holds the mutex for the entire drain. Swapping the queue with an empty VecDeque under the lock and draining outside would minimize lock hold time. + +### 16. Spinlock without PAUSE hint + +**Severity: low-medium** + +* `communication/src/allocator/zero_copy/bytes_exchange.rs:57-62,98-103` — `MergeQueue` spin-lock loops without `std::hint::spin_loop()`. Missing PAUSE instruction causes excessive cache-line bouncing under contention. + +### 17. `MutableAntichain::update_iter()` redundant frontier scans + +**Severity: low-medium** + +* `timely/src/progress/frontier.rs:539-550` — Two separate linear scans of `self.frontier` per update (one for `less_than`, one for `less_equal`). Could be fused into a single scan. + +### 18. `Progcaster::recv()` per-element clone and ChangeBatch updates + +**Severity: medium** + +* `timely/src/progress/broadcast.rs:147-149` — Each received progress update is individually cloned and pushed via `update()`, potentially triggering `maintain_bounds`/`compact()` mid-receive. Batching with `extend` would defer compaction. + +### 19. `CapacityContainerBuilder` per-element capacity check prevents bulk vectorization + +**Severity: medium** + +* `container/src/lib.rs:153-167` — `push_into` checks `ensure_capacity` and `at_capacity` per element. A bulk `extend`-style method that copies elements in chunks up to remaining capacity would enable auto-vectorization for `Copy` types. + +### 20. Loop-invariant logger checks in TCP send/recv loops + +**Severity: low** + +* `communication/src/allocator/zero_copy/tcp.rs:87-96` — `logger.as_mut().map(...)` per iteration. Logger presence is loop-invariant. +* `communication/src/allocator/zero_copy/tcp.rs:181-192` — When logging enabled, send loop redundantly re-parses message headers just for logging. +* `timely/src/worker.rs:776-789` — Two `Option` checks on `self.logging` per `Wrapper::step()`. Branch predictor handles this well. + +### 21. `for_each_time` sorts staging on every operator invocation + +**Severity: low-medium** + +* `timely/src/dataflow/operators/generic/handles.rs:56-71` — Sorts entire staging deque by time on every call. Messages from a single source (pipeline pact) are already time-ordered; could skip the sort in that case. diff --git a/log_mdbook.md b/log_mdbook.md new file mode 100644 index 000000000..0304401ca --- /dev/null +++ b/log_mdbook.md @@ -0,0 +1,168 @@ +# mdbook documentation audit log + +## Overview + +All 47 compilable code examples (`rust` blocks without `ignore`) pass. +The problems are concentrated in chapters 4-5, where `rust,ignore` blocks show outdated API signatures. +Chapters 0-3 are largely accurate, with only cosmetic issues. + +## Findings + +### 1. Broken GitHub links to examples directory + +**Severity: low** + +* `chapter_0/chapter_0_1.md:5` — Links to `examples/hello.rs` at the repo root. + The actual path is `timely/examples/hello.rs`. + The link to the `examples/` tree is similarly wrong. + +### 2. Outdated build output with removed dependencies + +**Severity: low** + +* `chapter_0/chapter_0_1.md:62-70` — Shows `timely v0.2.0`, `timely_sort v0.1.6`, `abomonation v0.4.5`, `time v0.1.38`. + Current version is `v0.27.0`. + Dependencies `timely_sort`, `abomonation`, `time`, and `libc` no longer exist. + Build output also exposes a developer's local path (`/Users/mcsherry/Projects/temporary/timely-dataflow`). + +### 3. Outdated Rayon repository URL + +**Severity: low** + +* `chapter_0/chapter_0_3.md:13` — Links to `https://github.com/nikomatsakis/rayon`. + Rayon moved to `https://github.com/rayon-rs/rayon`. + +### 4. Missing dereference in delay example + +**Severity: medium — code won't compile** + +* `chapter_4/chapter_4_3.md:78` — `.delay(|number, time| number / 100)` where `number` is `&u64`. + Cannot divide a reference by an integer. + Should be `*number / 100`. + +### 5. Stale `capture_into` implementation + +**Severity: high — completely wrong API** + +* `chapter_4/chapter_4_4.md:12-42` — Shows a two-closure `OperatorBuilder::build` API: + ```rust + builder.build( + move |frontier| { ... }, // frontier closure + move |input, output| { ... } // data closure + ); + ``` + The actual API takes a single closure with a `&mut SharedProgress` parameter. + The generic parameter `D` is now `C` (container-generic). + +### 6. Stale `replay_into` implementation + +**Severity: high — completely wrong API** + +* `chapter_4/chapter_4_4.md:55-96` — Same two-closure API issue. + Also references `Event::Start` (does not exist) and calls `output.cease()` (does not exist, now `output.done()`). + Return type shown as `StreamVec`, actual is `Stream`. + +### 7. Dead abomonation reference + +**Severity: medium** + +* `chapter_4/chapter_4_4.md:115` — Claims serialization uses abomonation. + The crate now uses `serde`/`bincode` via the `Bytesable` trait. + Abomonation is no longer a dependency. + +### 8. Dead kafkaesque reference + +**Severity: low** + +* `chapter_4/chapter_4_4.md:212` — References an "in-progress Kafka adapter" at `kafkaesque/` in the repo. + This directory does not exist. + +### 9. Chapter 4.5 self-acknowledged as incorrect + +**Severity: high — entire chapter** + +* `chapter_4/chapter_4_5.md:4` — Contains `**THIS TEXT IS LARGELY INCORRECT AND NEEDS IMPROVEMENT**`. + +### 10. Wrong `Data` trait definition + +**Severity: medium** + +* `chapter_4/chapter_4_5.md:11-12` — Describes `Data` as "essentially a synonym for `Clone+'static`". + There is no separate `Data` trait in the timely crate. + Types used within a worker need `'static`; cross-thread exchange uses `ExchangeData` which requires `Send+Any+Serialize+Deserialize`. + +### 11. Wrong `ExchangeData` trait definition + +**Severity: medium** + +* `chapter_4/chapter_4_5.md:22-24` — Describes `ExchangeData` with a `Sync` bound. + The actual trait does not require `Sync`. + +### 12. Missing import for `Map` operator + +**Severity: medium — code won't compile** + +* `chapter_4/chapter_4_5.md:48-53,110-117` — Uses `use timely::dataflow::operators::*` which does not bring `Map` into scope. + The `map` method requires `use timely::dataflow::operators::vec::Map`. + +### 13. Wrong `Config` enum name and variants + +**Severity: medium** + +* `chapter_5/chapter_5_1.md:84-94` — Shows `Configuration` enum with variants `Thread`, `Process(usize)`, `Cluster(usize, usize, Vec, bool)`. + Actual name is `Config`. + Missing `ProcessBinary(usize)` variant. + `Cluster` is now a struct variant (`Cluster { threads, process, addresses, report, log_fn }`), not a tuple variant. + +### 14. Wrong `Allocate::allocate` signature + +**Severity: medium** + +* `chapter_5/chapter_5_1.md:104-109` — Shows `fn allocate(&mut self) -> (Vec>>, Box>)`. + Actual: `fn allocate(&mut self, identifier: usize) -> (Vec>>, Box>)`. + Wrong trait bound (`Data` vs `Exchangeable`), missing `identifier` parameter, missing `dyn` keywords. + +### 15. Wrong `Bytesable` trait signature + +**Severity: high — completely different API** + +* `chapter_5/chapter_5_1.md:129-134` — Shows `fn into_bytes(&mut self, &mut Vec)` and `fn from_bytes(&mut Vec) -> Self`. + Actual: `fn from_bytes(bytes: Bytes) -> Self`, `fn length_in_bytes(&self) -> usize`, `fn into_bytes(&self, writer: &mut W)`. + Every method signature is different; `length_in_bytes` is missing entirely. + +### 16. Wrong `Source`/`Target` field name + +**Severity: medium** + +* `chapter_5/chapter_5_2.md:23-37` — Shows field name `pub index: usize` in both `Source` and `Target`. + Actual field name is `pub node: usize`. + +### 17. Incomplete `Timestamp` trait definition + +**Severity: medium** + +* `chapter_5/chapter_5_2.md:73-77` — Shows `pub trait Timestamp: PartialOrder { type Summary: PathSummary; }`. + Actual: `pub trait Timestamp: Clone+Eq+PartialOrder+Ord+Debug+Any+ExchangeData { type Summary: PathSummary + 'static; fn minimum() -> Self; }`. + Missing supertraits, `'static` bound, and `minimum()` method. + +### 18. Incomplete `PathSummary` trait definition + +**Severity: medium** + +* `chapter_5/chapter_5_2.md:81-85` — Shows `pub trait PathSummary: PartialOrder { ... }`. + Actual: `pub trait PathSummary: Clone+Eq+PartialOrder+Debug+Default { ... }`. + Missing `Clone+Eq+Debug+Default` supertraits. + +### 19. Stale `Operate` trait description + +**Severity: low-medium** + +* `chapter_5/chapter_5_2.md:115` — Describes `Operate` as a trait operators implement for scheduling. + Per commit `39ba5a74`, `Operate` is now a builder trait that consumes itself via `initialize()`. + The description is not entirely wrong but omits the builder pattern. + +### 20. Placeholder TODO in containers chapter + +**Severity: low** + +* `chapter_5/chapter_5_3.md:27` — Contains a TODO placeholder ("Explain why it's hard to build container-generic operators...") instead of actual content. diff --git a/log_panic.md b/log_panic.md new file mode 100644 index 000000000..49848dbcf --- /dev/null +++ b/log_panic.md @@ -0,0 +1,164 @@ +# Panic audit log + +## Findings by theme + +### 1. Network I/O errors panic instead of returning errors + +**Severity: high** + +The entire distributed communication layer converts all I/O errors into panics. +Any network disruption, remote process crash, or ungraceful shutdown crashes the local process. +Two unchecked network-supplied array indices can cause out-of-bounds panics from corrupted or malicious messages. + +**`communication/src/allocator/zero_copy/tcp.rs`** — The `tcp_panic` function (line 21) is the central panic point. +All read errors (line 73), write errors (line 192), flush errors (line 173), and shutdown errors (line 210) route through it. +EOF on the socket also panics (line 76). + +* Line 100: `stageds[target - worker_offset]` — array index computed from network-supplied `MessageHeader` fields. + If `target < worker_offset`, the subtraction wraps to `usize::MAX` (panic in debug, OOB in release). + If `target - worker_offset >= stageds.len()`, it's out of bounds. + A single corrupted message header crashes the receiver. + +**`communication/src/networking.rs`** — Startup handshake panics on I/O errors. + +* Line 127: `expect("failed to encode/send worker index")` — network failure after connect panics. +* Line 157: `expect("failed to decode worker index")` — remote crash or malformed connection panics. +* Line 158: `results[identifier - my_index - 1]` — `identifier` comes from the network with no validation. + If `identifier <= my_index`, subtraction wraps. If `identifier` is too large, it's OOB. + +**`timely/src/lib.rs`** — Deserialization of network messages. + +* Line 147: `bincode::deserialize(&bytes[..]).expect(...)` — corrupt message data panics. +* Line 149: `assert_eq!(bytes.len(), (typed_size + 7) & !7)` — size mismatch panics. + +**`timely/src/dataflow/channels/mod.rs`** — `Message::from_bytes` deserialization. + +* Lines 71-74: `read_u64().unwrap()`, `bincode::deserialize_from().expect(...)` — malformed network bytes panic. + +### 2. Drop implementations panic, causing abort during unwinding + +**Severity: high** + +Several `Drop` implementations call `expect()` on `JoinHandle::join()`. +If a worker or communication thread panicked (e.g., from network I/O errors above), the `Drop` impl panics too. +If the dropping thread is already unwinding from another panic, this causes a process abort. + +* `communication/src/initialize.rs:417` — `WorkerGuards::drop()` calls `guard.join().expect("Worker panic")`. + The public `join()` method (line 378) returns `Result`, but dropping without joining panics. + +* `communication/src/allocator/zero_copy/initialize.rs:25,29` — `CommsGuard::drop()` calls `.expect("Send thread panic")` and `.expect("Recv thread panic")`. + Network errors in tcp.rs cause send/recv threads to panic, then `CommsGuard::drop()` panics too. + +* `communication/src/allocator/zero_copy/bytes_exchange.rs:119` — `MergeQueue::drop()` panics if the poison flag is set (by another thread's panic). + If the dropping thread is already unwinding, this aborts. + +### 3. Capture/replay I/O errors panic + +**Severity: high** + +The capture/replay mechanism has no error propagation path. + +* `timely/src/dataflow/operators/core/capture/event.rs:163-165` — `EventWriter` calls `write_all().expect(...)` and `serialize_into().expect(...)`. + A broken TCP connection or full disk during capture crashes the process. + A TODO comment acknowledges: "push has no mechanism to report errors, so we unwrap." + +* `timely/src/dataflow/operators/core/capture/event.rs:200` — `EventReader` calls `panic!("read failed: {e}")` on any I/O error other than `WouldBlock`. + +* `timely/src/dataflow/operators/core/capture/event.rs:211` — `bincode::deserialize().expect(...)` on corrupt capture data panics. + +### 4. User-facing APIs panic without safe alternatives + +**Severity: high** + +Some user-facing APIs panic on invalid input with no `try_*` or `Result`-returning alternative. + +* `timely/src/dataflow/operators/core/input.rs:455` — `Handle::advance_to()` asserts `self.now_at.less_equal(&next)`. + Calling with a non-monotonic time panics. No `try_advance_to` exists. + +* `timely/src/dataflow/operators/vec/delay.rs:106,133` — `Delay::delay` and `delay_batch` assert that the user-supplied closure returns a time `>=` the input time. + A buggy closure panics inside the dataflow runtime. + +### 5. Capability API panics (safe alternatives exist) + +**Severity: medium** + +The capability API panics on misuse, but `try_*` alternatives exist for most methods. + +* `timely/src/dataflow/operators/capability.rs:105` — `Capability::delayed` panics if `new_time` is not `>=` current time. Use `try_delayed` instead. +* `timely/src/dataflow/operators/capability.rs:140` — `Capability::downgrade` panics similarly. Use `try_downgrade`. +* `timely/src/dataflow/operators/capability.rs:449` — `CapabilitySet::delayed` panics. Use `try_delayed`. +* `timely/src/dataflow/operators/capability.rs:486` — `CapabilitySet::downgrade` panics. Use `try_downgrade`. +* `timely/src/dataflow/operators/capability.rs:287` — `InputCapability::delayed` panics on invalid time or disconnected output. No `try_*` alternative exists for this one. + +### 6. `try_regenerate` panics despite `try_` naming convention + +**Severity: medium** + +* `bytes/src/lib.rs:132` — `BytesMut::try_regenerate::()` calls `downcast_mut::().expect("Downcast failed")`. + If the type parameter `B` doesn't match the original allocation type, this panics instead of returning `false`. + The `try_` prefix strongly implies graceful failure. + The only internal call site (`bytes_slab.rs:83`) uses the correct type, but the method is public API. + +### 7. Sequencer panics if used before first worker step + +**Severity: medium** + +* `timely/src/synchronization/sequence.rs:212` — `Sequencer::push()` calls `unwrap()` on the activator, which is lazily initialized on the first `worker.step()`. Calling `push()` before stepping panics. + +* `timely/src/synchronization/sequence.rs:229` — `Sequencer::drop()` calls `expect("Sequencer.activator unavailable")`. Dropping a `Sequencer` before the first step panics. + +### 8. Initialization failures cascade via channel send/recv expects + +**Severity: medium** + +During multi-worker initialization, if any thread dies before setup completes, the surviving threads panic when their channel operations fail. + +* `communication/src/allocator/process.rs:39,43` — `expect("Failed to send/recv buzzer")` +* `communication/src/allocator/zero_copy/allocator.rs:91,98` — `expect("Failed to send/receive MergeQueue")` +* `communication/src/allocator/zero_copy/allocator_process.rs:70,77` — same pattern +* `communication/src/allocator/zero_copy/tcp.rs:48,151` — same pattern + +### 9. Mutex poisoning propagation + +**Severity: medium** + +* `communication/src/allocator/process.rs:123` — `self.channels.lock().expect("mutex error?")`. + If any thread panics while holding this lock, all subsequent `allocate()` calls on other threads panic. + +* `communication/src/allocator/zero_copy/bytes_exchange.rs:47,55,62,96,103` — `MergeQueue` checks a poison flag and panics if set. Additionally, `lock().expect(...)` on the internal mutex propagates poisoning. + +### 10. Operator implementation validation panics + +**Severity: medium** + +Users implementing the `Operate` trait or using low-level builder APIs can trigger panics from invalid configurations. + +* `timely/src/progress/subgraph.rs:657-666` — `assert_eq!` and `assert!` on connectivity summary dimensions. + A custom `Operate` impl returning wrong-sized connectivity panics. + +* `timely/src/progress/subgraph.rs:161` — `assert!` that children have contiguous indices. + +* `timely/src/progress/operate.rs:93,96` — `PortConnectivity::add_port` panics on duplicate port entries. + +* `timely/src/dataflow/operators/generic/builder_raw.rs:117` — `assert!` that connectivity references valid output ports. + +### 11. Reachability tracker unchecked indexing + +**Severity: low-medium** + +The `Tracker` and `Builder` in `timely/src/progress/reachability.rs` use unchecked array indexing on node/port indices throughout. +These are internal APIs not typically called by users directly, but a custom `Operate` implementation could supply out-of-bounds indices. + +* Lines 188, 599, 621, 661, 684, 717, 728 — `self.per_operator[node]`, `self.edges[node][port]`, etc. + +### 12. Minor findings + +* `container/src/lib.rs:195` — `i64::try_from(Vec::len(self)).unwrap()` in `record_count()`. Only panics for `Vec<()>` with >2^63 elements (practically unreachable). + +* `bytes/src/lib.rs:92,202` — `assert!(index <= self.len)` in `extract_to`. Standard bounds check, but the error message is unhelpful. + +* `timely/src/worker.rs:245,252,259` — `panic!("Unacceptable address: Length zero")` in `allocate`/`pipeline`/`broadcast`. Addresses are framework-generated, so this is effectively an internal invariant. + +* `logging/src/lib.rs:168,263,303` — `RefCell::borrow_mut()` panics on re-entrant logging (calling `log()` from within a log action callback). + +* `timely/src/progress/subgraph.rs:720` — `panic!()` when a shut-down operator receives frontier changes. Possible timing issue, but the framework normally prevents this. diff --git a/log_silent.md b/log_silent.md new file mode 100644 index 000000000..c83d8ef5f --- /dev/null +++ b/log_silent.md @@ -0,0 +1,105 @@ +# Silent error swallowing audit log + +## Findings by theme + +### 1. Infinite spin-loop on failed header parse + +**Severity: high (bug)** + +Both `allocator.rs` and `allocator_process.rs` have a `while !bytes.is_empty()` loop that parses message headers. +When `MessageHeader::try_read` returns `None` (partial/corrupted header), the `else` branch prints to stdout but never advances `bytes` or breaks. +This creates an infinite busy-loop consuming 100% CPU. + +* `communication/src/allocator/zero_copy/allocator.rs:291-293` +* `communication/src/allocator/zero_copy/allocator_process.rs:218-220` + +The comment "We expect that `bytes` contains an integral number of messages" indicates this was considered unreachable, but the `else` branch exists specifically for the case where it isn't. +A `break` or `panic!` would be more appropriate than an infinite spin with `println!`. + +### 2. Dropped event notification on channel send failure + +**Severity: medium** + +* `communication/src/allocator/counters.rs:99` — `let _ = self.events.send(self.index)`. + The event-notification channel tells the receiving worker that a message arrived on a specific channel. + If the send fails (receiver dropped), the message is delivered but the consumer is never notified. + During normal operation this could cause messages to sit unprocessed until the next poll cycle. + The code has a TODO comment acknowledging this: "Perhaps this shouldn't be a fatal error (e.g. in shutdown)." + +### 3. Cycle detection only prints to stdout + +**Severity: medium** + +* `timely/src/progress/reachability.rs:200-203` — `Builder::build()` detects cycles without timestamp increment but only calls `println!` and continues. + A cycle without timestamp increment is a liveness issue that will cause the computation to deadlock. + The function signature returns `(Tracker, Connectivity)` with no `Result`, so it cannot propagate the error. + Printing to stdout (not stderr) means the warning is easily missed. + +### 4. Message send failures silently discarded during push + +**Severity: low** + +* `communication/src/allocator/process.rs:215` — `let _ = self.target.send(element)`. + The comment explains: "The remote endpoint could be shut down, and so it is not fundamentally an error to fail to send." + Messages are silently lost when the receiver is dropped. + Intentional for graceful shutdown, but no logging or metric tracks dropped messages. + +* `timely/src/dataflow/operators/core/capture/event.rs:41` — `let _ = self.send(event)`. + Comment: "An Err(x) result just means 'data not accepted' most likely because the receiver is gone." + Same pattern — intentional but unobservable. + +### 5. `try_recv().ok()` conflates empty and disconnected + +**Severity: low** + +* `communication/src/allocator/process.rs:229` — `self.source.try_recv().ok()`. + `TryRecvError::Empty` (no message available) and `TryRecvError::Disconnected` (sender dropped) are both mapped to `None`. + The caller cannot distinguish "temporarily empty" from "permanently dead." + In practice, peer departure is detected through other mechanisms. + +### 6. Silent data drop for past-bound channels + +**Severity: low** + +* `communication/src/allocator/zero_copy/allocator.rs:278-284` +* `communication/src/allocator/zero_copy/allocator_process.rs:205-211` + +When data arrives for a channel whose ID is at or below `channel_id_bound` (already allocated and dropped), the data is silently discarded. +Intentional for shutdown, but could mask messages routed to wrong channels. + +### 7. Buffered channel data dropped on removal + +**Severity: low** + +* `communication/src/allocator/zero_copy/allocator.rs:240-241` +* `communication/src/allocator/zero_copy/allocator_process.rs:169-172` + +When a channel is removed from `to_local`, any remaining buffered data in its `VecDeque` is dropped. +A commented-out `assert!(dropped.borrow().is_empty())` shows this was once checked. +Intentional when dataflows are forcibly dropped. + +### 8. Unknown channel activations silently ignored + +**Severity: low** + +* `timely/src/worker.rs:367-371` — `if let Some(path) = paths.get(&channel)` ignores channels with no registered path. + Has a TODO: "This is a sloppy way to deal with channels that may not be alloc'd." + Could mask channel routing bugs, though in practice it handles transient state during startup/shutdown. + +### 9. `activate_after` ignores delay when timer is None + +**Severity: low** + +* `timely/src/scheduling/activate.rs:80-92` — When `self.timer` is `None`, `activate_after` ignores the delay and activates immediately. + Changes call semantics in a way the caller may not expect, but this is consistent with the `Worker` construction model. + +### Non-findings + +The following patterns were found but are intentional and safe: + +* `logger.as_mut().map(|l| l.log(...))` — Conditional logging throughout the codebase. The discarded `Option<()>` carries no error information. +* `.unwrap_or_default()` on `stash.take()` in `container/src/lib.rs:213` — Correct fallback for optional recycled buffer. +* `.unwrap_or(false)` on `peek()` comparisons in `reachability.rs:648` — Standard loop termination. +* `Rc::try_unwrap` failure in `capture/event.rs:104` — Iterative drop pattern for linked lists. +* `Weak::upgrade()` returning `None` in `sequence.rs:140,199` — Standard cleanup when owner is dropped. +* `update_iter` return values discarded in frontier constructors — Changes during construction are not useful. diff --git a/log_unsafe.md b/log_unsafe.md new file mode 100644 index 000000000..6b524060b --- /dev/null +++ b/log_unsafe.md @@ -0,0 +1,82 @@ +# Unsafe code audit log + +## Overview + +The codebase has a very small unsafe surface: 7 total `unsafe` usages across 3 files, all concentrated in low-level byte handling. +No `unsafe` exists in the `container`, `logging`, or `communication` library crates, nor in most of `timely`. + +## Findings + +### 1. `BytesMut` raw pointer aliasing relies on `Arc` exclusivity invariant + +**Severity: medium — soundness depends on informal invariant** + +* `bytes/src/lib.rs:47-58` — `BytesMut` stores a raw `*mut u8` pointer alongside an `Arc` that owns the backing allocation. + The safety argument is that the `Arc` prevents the allocation from being freed while any `BytesMut`/`Bytes` slice exists. +* `bytes/src/lib.rs:157` — `Deref` for `BytesMut` does `from_raw_parts(self.ptr, self.len)`. +* `bytes/src/lib.rs:164` — `DerefMut` for `BytesMut` does `from_raw_parts_mut(self.ptr, self.len)`. +* The struct comment at line 55-56 acknowledges uncertainty: "I'm not sure I understand Rust's rules enough to make a stronger statement about this." + +**Soundness analysis:** +The key invariant is that no two `BytesMut` instances share overlapping byte ranges. +`extract_to` (line 90) splits ranges using `assert!(index <= self.len)` and pointer arithmetic, which maintains non-overlap. +However, `BytesMut::from` (line 63) takes `self` by value, so there's no path to create overlapping `BytesMut` slices through the public API. +`Bytes` (the frozen variant) only provides `&[u8]`, so multiple `Bytes` over the same range are fine. +**Verdict: sound**, assuming the backing allocation (`B: DerefMut`) keeps its buffer stable after being moved into an `Arc`. +This is guaranteed for `Vec` and `Box<[u8]>`, which are the only types used in practice. + +### 2. `unsafe impl Send for Bytes` — missing `Sync` + +**Severity: low — correct but incomplete** + +* `bytes/src/lib.rs:190` — `Bytes` manually implements `Send` but not `Sync`. + `Bytes` contains `*const u8` (not `Send`/`Sync` by default) and `Arc` (which is `Send + Sync`). +* Since `Bytes` only provides `&[u8]` access (no interior mutability), it would also be safe to implement `Sync`. + The omission means `Bytes` cannot be shared by reference across threads, only moved — which is a minor API limitation, not a soundness issue. + +**Verdict: sound.** `Send` is correct because moving a `Bytes` to another thread is safe (the `Arc` keeps the allocation alive, and the `*const u8` points into it). +`Sync` could be added but its absence is conservative, not unsound. + +### 3. `BytesMut` is not `Send` — intentional but undocumented + +**Severity: low — API limitation** + +* `BytesMut` contains `*mut u8`, which opts out of auto-`Send`. + No manual `unsafe impl Send` is provided for `BytesMut`. +* The module doc at line 5 notes: "`BytesMut` should be able to implement `Send`" — this is correct, the same safety argument as `Bytes` applies. + The `*mut u8` does not create aliasing across threads because `BytesMut` enforces exclusive access. + +**Verdict: sound** (conservative — not unsound, just a missing capability). + +### 4. `ProductRegion::copy` delegates to inner regions + +**Severity: none — trait obligation** + +* `timely/src/order.rs:174` — `unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item` implements the `columnation::Region` trait. + The implementation simply delegates to `self.outer_region.copy()` and `self.inner_region.copy()`. +* The unsafety contract is defined by the `columnation` crate (external dependency, version 0.1). + The `Product` implementation adds no new unsafe behavior beyond what the inner regions do. + +**Verdict: sound** (assuming the `columnation` crate's `Region` contract is sound). + +### 5. lgalloc example uses raw pointers from allocator + +**Severity: none — example code** + +* `communication/examples/lgalloc.rs:47,54` — `Deref`/`DerefMut` for `LgallocHandle` use `from_raw_parts`/`from_raw_parts_mut` with a `NonNull` pointer and `capacity` from `lgalloc::allocate`. +* Soundness depends on `lgalloc::allocate` returning a valid pointer and capacity, and `lgalloc::deallocate` being called exactly once (ensured by `Drop`). + +**Verdict: sound** for an example (not library code, not reusable). + +## Summary + +| # | Location | Type | Verdict | +|---|----------|------|---------| +| 1 | `bytes/src/lib.rs:157,164` | `from_raw_parts[_mut]` | Sound (invariant maintained by API) | +| 2 | `bytes/src/lib.rs:190` | `unsafe impl Send` | Sound (conservative, could also be `Sync`) | +| 3 | `bytes/src/lib.rs:253` | `from_raw_parts` | Sound (same as #1, read-only variant) | +| 4 | `timely/src/order.rs:174` | `unsafe fn copy` | Sound (delegates to inner regions) | +| 5 | `communication/examples/lgalloc.rs:47,54` | `from_raw_parts[_mut]` | Sound (example code) | + +No unsoundness found. +The most notable observation is the `bytes` crate's comment expressing uncertainty about its own safety argument (line 55-56), though analysis confirms the invariants hold. diff --git a/prompt.md b/prompt.md new file mode 100644 index 000000000..06e4a34ca --- /dev/null +++ b/prompt.md @@ -0,0 +1,120 @@ +# Bug audit tracker + +## Crates and areas + +### `bytes/` (1 file) — DONE +* [x] `bytes/src/lib.rs` + +### `container/` (1 file) — DONE +* [x] `container/src/lib.rs` + +### `logging/` (1 file) — DONE +* [x] `logging/src/lib.rs` + +### `communication/` (20 files) — DONE +* [x] `communication/src/lib.rs` +* [x] `communication/src/initialize.rs` +* [x] `communication/src/buzzer.rs` +* [x] `communication/src/logging.rs` +* [x] `communication/src/networking.rs` +* [x] `communication/src/allocator/mod.rs` +* [x] `communication/src/allocator/process.rs` +* [x] `communication/src/allocator/counters.rs` +* [x] `communication/src/allocator/generic.rs` +* [x] `communication/src/allocator/canary.rs` +* [x] `communication/src/allocator/thread.rs` +* [x] `communication/src/allocator/zero_copy/mod.rs` +* [x] `communication/src/allocator/zero_copy/bytes_slab.rs` +* [x] `communication/src/allocator/zero_copy/bytes_exchange.rs` +* [x] `communication/src/allocator/zero_copy/push_pull.rs` +* [x] `communication/src/allocator/zero_copy/allocator.rs` — 1 bug found +* [x] `communication/src/allocator/zero_copy/allocator_process.rs` +* [x] `communication/src/allocator/zero_copy/tcp.rs` +* [x] `communication/src/allocator/zero_copy/stream.rs` +* [x] `communication/src/allocator/zero_copy/initialize.rs` + +### `timely/` — progress (8 files) — DONE +* [x] `timely/src/progress/mod.rs` +* [x] `timely/src/progress/broadcast.rs` — 1 minor bug found +* [x] `timely/src/progress/change_batch.rs` — 1 bug found +* [x] `timely/src/progress/frontier.rs` +* [x] `timely/src/progress/operate.rs` +* [x] `timely/src/progress/reachability.rs` +* [x] `timely/src/progress/subgraph.rs` +* [x] `timely/src/progress/timestamp.rs` + +### `timely/` — dataflow/channels (8 files) — DONE +* [x] `timely/src/dataflow/channels/mod.rs` +* [x] `timely/src/dataflow/channels/pact.rs` +* [x] `timely/src/dataflow/channels/pullers/` +* [x] `timely/src/dataflow/channels/pushers/` + +### `timely/` — dataflow/operators/generic (7 files) — DONE +* [x] `timely/src/dataflow/operators/generic/builder_raw.rs` +* [x] `timely/src/dataflow/operators/generic/builder_rc.rs` +* [x] `timely/src/dataflow/operators/generic/builder_ref.rs` (empty/stub) +* [x] `timely/src/dataflow/operators/generic/handles.rs` +* [x] `timely/src/dataflow/operators/generic/notificator.rs` — 1 bug found +* [x] `timely/src/dataflow/operators/generic/operator_info.rs` (trivial) +* [x] `timely/src/dataflow/operators/generic/operator.rs` + +### `timely/` — dataflow/operators/capability — DONE +* [x] `timely/src/dataflow/operators/capability.rs` + +### `timely/` — dataflow/operators/core (20 files) — DONE +* [x] `timely/src/dataflow/operators/core/mod.rs` +* [x] `timely/src/dataflow/operators/core/input.rs` +* [x] `timely/src/dataflow/operators/core/unordered_input.rs` +* [x] `timely/src/dataflow/operators/core/exchange.rs` +* [x] `timely/src/dataflow/operators/core/feedback.rs` +* [x] `timely/src/dataflow/operators/core/enterleave.rs` +* [x] `timely/src/dataflow/operators/core/concat.rs` +* [x] `timely/src/dataflow/operators/core/filter.rs` +* [x] `timely/src/dataflow/operators/core/inspect.rs` +* [x] `timely/src/dataflow/operators/core/map.rs` +* [x] `timely/src/dataflow/operators/core/ok_err.rs` +* [x] `timely/src/dataflow/operators/core/partition.rs` +* [x] `timely/src/dataflow/operators/core/probe.rs` +* [x] `timely/src/dataflow/operators/core/rc.rs` +* [x] `timely/src/dataflow/operators/core/reclock.rs` +* [x] `timely/src/dataflow/operators/core/to_stream.rs` +* [x] `timely/src/dataflow/operators/core/capture/mod.rs` +* [x] `timely/src/dataflow/operators/core/capture/event.rs` +* [x] `timely/src/dataflow/operators/core/capture/capture.rs` +* [x] `timely/src/dataflow/operators/core/capture/replay.rs` +* [x] `timely/src/dataflow/operators/core/capture/extract.rs` + +### `timely/` — dataflow/operators/vec (17 files) — DONE +* [x] `timely/src/dataflow/operators/vec/mod.rs` +* [x] `timely/src/dataflow/operators/vec/input.rs` +* [x] `timely/src/dataflow/operators/vec/unordered_input.rs` +* [x] `timely/src/dataflow/operators/vec/broadcast.rs` +* [x] `timely/src/dataflow/operators/vec/count.rs` +* [x] `timely/src/dataflow/operators/vec/branch.rs` +* [x] `timely/src/dataflow/operators/vec/delay.rs` +* [x] `timely/src/dataflow/operators/vec/filter.rs` +* [x] `timely/src/dataflow/operators/vec/flow_controlled.rs` +* [x] `timely/src/dataflow/operators/vec/map.rs` +* [x] `timely/src/dataflow/operators/vec/partition.rs` +* [x] `timely/src/dataflow/operators/vec/queue.rs` (dead code, not in mod.rs) +* [x] `timely/src/dataflow/operators/vec/result.rs` +* [x] `timely/src/dataflow/operators/vec/to_stream.rs` +* [x] `timely/src/dataflow/operators/vec/aggregation/mod.rs` +* [x] `timely/src/dataflow/operators/vec/aggregation/aggregate.rs` +* [x] `timely/src/dataflow/operators/vec/aggregation/state_machine.rs` + +### `timely/` — dataflow/scopes (2 files) — DONE +* [x] `timely/src/dataflow/scopes/mod.rs` +* [x] `timely/src/dataflow/scopes/child.rs` + +### `timely/` — top-level (11 files) — DONE +* [x] `timely/src/lib.rs` +* [x] `timely/src/worker.rs` — 1 bug found +* [x] `timely/src/execute.rs` +* [x] `timely/src/order.rs` +* [x] `timely/src/logging.rs` +* [x] `timely/src/dataflow/stream.rs` +* [x] `timely/src/scheduling/activate.rs` +* [x] `timely/src/synchronization/mod.rs` +* [x] `timely/src/synchronization/barrier.rs` +* [x] `timely/src/synchronization/sequence.rs` diff --git a/prompt_alloc.md b/prompt_alloc.md new file mode 100644 index 000000000..c94534ea8 --- /dev/null +++ b/prompt_alloc.md @@ -0,0 +1,60 @@ +# Allocation audit tracker + +Look for: memory leaks, unbounded growth, unnecessary allocations in hot paths, +missing capacity hints, redundant clones, Vec/String allocations that could be avoided. + +## Crates and areas + +### `bytes/` (1 file) — DONE +* [x] `bytes/src/lib.rs` — clean (Arc-based sharing is efficient) + +### `container/` (1 file) — DONE +* [x] `container/src/lib.rs` — 3 findings (VecDeque never shrinks, ensure_capacity math, clone-by-reference) + +### `logging/` (1 file) — DONE +* [x] `logging/src/lib.rs` — 2 findings (Rc cycle hazard in Logger API, Instant::elapsed per log call) + +### `communication/` (20 files) — DONE +* [x] `communication/src/lib.rs` — clean +* [x] `communication/src/initialize.rs` — 2 findings (BytesRefill double indirection, missing capacity hints) +* [x] `communication/src/buzzer.rs` — clean +* [x] `communication/src/logging.rs` — clean +* [x] `communication/src/networking.rs` — 1 finding (unconditional println in polling loop) +* [x] `communication/src/allocator/mod.rs` — clean +* [x] `communication/src/allocator/process.rs` — 3 findings (quadratic sender clones, pusher clones, unbounded receive drain) +* [x] `communication/src/allocator/counters.rs` — 2 findings (per-message event push, unconditional buzzing) +* [x] `communication/src/allocator/generic.rs` — clean +* [x] `communication/src/allocator/canary.rs` — clean +* [x] `communication/src/allocator/thread.rs` — 2 findings (VecDeque never shrinks, dead recycling code) +* [x] `communication/src/allocator/zero_copy/mod.rs` — clean +* [x] `communication/src/allocator/zero_copy/bytes_slab.rs` — 2 findings (in_progress unbounded, stash limit timing) +* [x] `communication/src/allocator/zero_copy/bytes_exchange.rs` — 2 findings (MergeQueue unbounded, Drop allocates) +* [x] `communication/src/allocator/zero_copy/push_pull.rs` — clean +* [x] `communication/src/allocator/zero_copy/allocator.rs` — 3 findings (per-channel queue unbounded, staged never shrinks, missing capacity hint) +* [x] `communication/src/allocator/zero_copy/allocator_process.rs` — 3 findings (same patterns as allocator.rs) +* [x] `communication/src/allocator/zero_copy/tcp.rs` — 3 findings (unnecessary clone in unicast, stageds never shrink, stash never shrinks) +* [x] `communication/src/allocator/zero_copy/stream.rs` — clean +* [x] `communication/src/allocator/zero_copy/initialize.rs` — clean + +### `timely/` — progress (8 files) — DONE +* [x] `timely/src/progress/mod.rs` — clean +* [x] `timely/src/progress/broadcast.rs` — 2 findings (logging allocates Vecs per send/recv, clone in recv) +* [x] `timely/src/progress/change_batch.rs` — clean (amortized compaction is by design) +* [x] `timely/src/progress/frontier.rs` — 2 findings (hash allocates temp Vec, rebuild clones) +* [x] `timely/src/progress/operate.rs` — clean +* [x] `timely/src/progress/reachability.rs` — 3 findings (worklist never shrinks, logging allocates Vecs, time clone per edge) +* [x] `timely/src/progress/subgraph.rs` — 3 findings (temp_active never shrinks, maybe_shutdown never shrinks, timestamp clones per target) +* [x] `timely/src/progress/timestamp.rs` — clean + +### `timely/` — dataflow (all files) — DONE +* [x] `timely/src/dataflow/` — 15 findings (broadcast O(peers*records), capability double-update, notificator clone instead of move, EventLink per-event Rc, reclock O(n^2), exchange time clones, partition intermediate buffering, delay per-element Vec, handles staging never shrinks, counter double borrow_mut, etc.) + +### `timely/` — top-level — DONE +* [x] `timely/src/lib.rs` — clean +* [x] `timely/src/worker.rs` — 3 findings (repeated logger lookup, dataflow format allocations, collect instead of take) +* [x] `timely/src/execute.rs` — clean +* [x] `timely/src/order.rs` — clean +* [x] `timely/src/logging.rs` — 1 finding (BatchLogger allocates Vec per progress event) +* [x] `timely/src/scheduling/activate.rs` — 4 findings (SyncActivator path clone, activate_after path alloc, BinaryHeap never shrinks, slices/bounds never shrink) +* [x] `timely/src/synchronization/barrier.rs` — 1 finding (Worker clone deep-clones Config) +* [x] `timely/src/synchronization/sequence.rs` — 2 findings (re-sort entire recvd, clone per peer) diff --git a/prompt_api.md b/prompt_api.md new file mode 100644 index 000000000..540023b6e --- /dev/null +++ b/prompt_api.md @@ -0,0 +1,35 @@ +# API consistency audit tracker + +## Crates and areas + +### `container/` traits — DONE +* [x] `Accountable`, `DrainContainer`, `SizableContainer`, `PushInto`, `ContainerBuilder` + +### `bytes/` types — DONE +* [x] `Bytes` vs `BytesMut` API symmetry — 1 inconsistency found + +### `communication/` traits and types — DONE +* [x] `Push`/`Pull` trait and implementations +* [x] `Allocate` trait and implementations +* [x] `Bytesable` trait + +### `timely/` — progress types — DONE +* [x] `ChangeBatch` vs `MutableAntichain` vs `Antichain` — 3 inconsistencies found + +### `timely/` — dataflow/operators: `core` vs `vec` — DONE +* [x] Input / UnorderedInput +* [x] Exchange, Filter, Map, Partition, ToStream +* [x] Remaining operator pairs (Branch, Delay, Broadcast, etc.) +* 6 inconsistencies found + +### `timely/` — dataflow/operators/generic — DONE +* [x] Builder variants (raw, rc, ref) +* [x] Operator construction APIs + +### `timely/` — dataflow/channels — DONE +* [x] Pushers and pullers across channel types + +### `timely/` — top-level — DONE +* [x] Worker, Config, execution entry points +* [x] Stream type methods +* [x] Input Handle — 1 inconsistency found diff --git a/prompt_hotloop.md b/prompt_hotloop.md new file mode 100644 index 000000000..05dc65412 --- /dev/null +++ b/prompt_hotloop.md @@ -0,0 +1,43 @@ +# Hot loop optimization audit tracker + +Look for: +* Loop-invariant branches that could be hoisted +* Virtual dispatch (dyn trait calls) inside tight loops +* Iterator chains that prevent auto-vectorization +* Data layouts that prevent SIMD (AoS vs SoA, non-contiguous memory) +* Indirect indexing patterns that block vectorization +* Branchy per-element logic that could be branchless or batched + +## Crates and areas + +### `bytes/` (1 file) — DONE +* [x] `bytes/src/lib.rs` — 1 finding (Arc overhead, low impact) + +### `container/` (1 file) — DONE +* [x] `container/src/lib.rs` — 2 findings (per-element capacity check prevents vectorization, ensure_capacity per push) + +### `logging/` (1 file) — DONE +* [x] `logging/src/lib.rs` — 3 findings (dyn FnMut action dispatch, RefCell per log, time.elapsed per log) + +### `communication/` (20 files) — DONE +* [x] `communication/src/allocator/` — 3 findings (per-element event push+buzzer, RefCell per push in counters) +* [x] `communication/src/allocator/zero_copy/` — 6 findings (dyn Pull vtable, Rc> per push, HashMap per message in receive, spinlock without PAUSE, MergeQueue lock hold, logger loop-invariant) +* [x] `communication/src/` (top-level) — covered by zero_copy findings + +### `timely/` — progress (8 files) — DONE +* [x] `timely/src/progress/change_batch.rs` — 2 findings (re-sorts clean prefix, two-pass consolidation) +* [x] `timely/src/progress/frontier.rs` — 2 findings (redundant frontier scans, O(n*m) rebuild) +* [x] `timely/src/progress/reachability.rs` — 1 finding (BinaryHeap worklist overhead) +* [x] `timely/src/progress/subgraph.rs` — 2 findings (virtual dispatch through dyn Schedule, RefCell borrow churn) +* [x] `timely/src/progress/operate.rs` — 1 finding (BTreeMap for 1-2 ports) +* [x] `timely/src/progress/broadcast.rs` — 1 finding (per-element clone in recv) +* [x] `timely/src/progress/` (remaining) — clean + +### `timely/` — dataflow (all files) — DONE +* [x] `timely/src/dataflow/channels/` — 5 findings (Counter+Tee per-message overhead, exchange random cache access, exchange inline(never), dyn PushSet dispatch) +* [x] `timely/src/dataflow/operators/` — 7 findings (delay per-datum assert+HashMap, filter per-element vs retain, partition BTreeMap, branch per-datum give, map via flat_map, handles staging sort, for_each_time grouping) + +### `timely/` — top-level — DONE +* [x] `timely/src/worker.rs` — 4 findings (HashMap per channel event, borrow_mut inside loop, RefCell churn on activations, logger checks) +* [x] `timely/src/scheduling/activate.rs` — 3 findings (unconditional compaction, indirect slice sort, BinaryHeap) +* [x] `timely/src/` (remaining) — clean diff --git a/prompt_mdbook.md b/prompt_mdbook.md new file mode 100644 index 000000000..14d33f317 --- /dev/null +++ b/prompt_mdbook.md @@ -0,0 +1,53 @@ +# mdbook documentation audit tracker + +Compare documentation against actual implementation. +Look for: broken code examples, incorrect API signatures, outdated behavior descriptions, dead links, stale references. + +## Chapters + +### Chapter 0 — Motivation — DONE +* [x] `chapter_0.md` — clean +* [x] `chapter_0_0.md` — clean +* [x] `chapter_0_1.md` — 3 findings (broken example links, outdated build output, version mismatch) +* [x] `chapter_0_2.md` — clean +* [x] `chapter_0_3.md` — 1 finding (outdated Rayon URL) + +### Chapter 1 — Core concepts — DONE +* [x] `chapter_1.md` — clean +* [x] `chapter_1_1.md` — clean +* [x] `chapter_1_2.md` — clean +* [x] `chapter_1_3.md` — clean + +### Chapter 2 — Building dataflows — DONE +* [x] `chapter_2.md` — clean +* [x] `chapter_2_1.md` — clean +* [x] `chapter_2_2.md` — clean +* [x] `chapter_2_3.md` — clean +* [x] `chapter_2_4.md` — clean +* [x] `chapter_2_5.md` — clean + +### Chapter 3 — Running dataflows — DONE +* [x] `chapter_3.md` — clean +* [x] `chapter_3_1.md` — clean +* [x] `chapter_3_2.md` — clean +* [x] `chapter_3_3.md` — clean +* [x] `chapter_3_4.md` — clean + +### Chapter 4 — Advanced — DONE +* [x] `chapter_4.md` — clean +* [x] `chapter_4_1.md` — clean +* [x] `chapter_4_2.md` — clean +* [x] `chapter_4_3.md` — 1 finding (deref bug in delay example) +* [x] `chapter_4_4.md` — 4 findings (stale capture/replay implementations, abomonation reference, dead kafkaesque link) +* [x] `chapter_4_5.md` — 3 findings (self-acknowledged "LARGELY INCORRECT", wrong Data/ExchangeData trait definitions, missing import) + +### Chapter 5 — Internals — DONE +* [x] `chapter_5.md` — clean +* [x] `chapter_5_1.md` — 3 findings (wrong Config enum, wrong Allocate signature, wrong Bytesable signature) +* [x] `chapter_5_2.md` — 4 findings (Source/Target field name, Timestamp/PathSummary trait signatures, stale Operate description) +* [x] `chapter_5_3.md` — 1 finding (TODO placeholder instead of content) + +### Build infrastructure — DONE +* [x] `build.rs` — works correctly (generates doc tests from markdown) +* [x] `lib.rs` — clean +* [x] All 47 compilable code examples pass diff --git a/prompt_panic.md b/prompt_panic.md new file mode 100644 index 000000000..a75d52cbe --- /dev/null +++ b/prompt_panic.md @@ -0,0 +1,57 @@ +# Panic audit tracker + +## Crates and areas + +### `bytes/` (1 file) — DONE +* [x] `bytes/src/lib.rs` — 3 findings (try_regenerate naming, extract_to asserts) + +### `container/` (1 file) — DONE +* [x] `container/src/lib.rs` — 1 finding (record_count overflow for ZSTs) + +### `logging/` (1 file) — DONE +* [x] `logging/src/lib.rs` — 1 finding (RefCell re-entrancy) + +### `communication/` (20 files) — DONE +* [x] `communication/src/lib.rs` — clean +* [x] `communication/src/initialize.rs` — 2 findings (WorkerGuards::drop, assert_eq) +* [x] `communication/src/buzzer.rs` — clean +* [x] `communication/src/logging.rs` — clean +* [x] `communication/src/networking.rs` — 5 findings (I/O expects, unchecked network index) +* [x] `communication/src/allocator/mod.rs` — clean +* [x] `communication/src/allocator/process.rs` — 5 findings (mutex poison, channel expects, downcast) +* [x] `communication/src/allocator/counters.rs` — clean +* [x] `communication/src/allocator/generic.rs` — clean +* [x] `communication/src/allocator/canary.rs` — clean +* [x] `communication/src/allocator/thread.rs` — clean +* [x] `communication/src/allocator/zero_copy/mod.rs` — clean +* [x] `communication/src/allocator/zero_copy/bytes_slab.rs` — clean +* [x] `communication/src/allocator/zero_copy/bytes_exchange.rs` — 7 findings (MergeQueue poison, mutex, Drop panic) +* [x] `communication/src/allocator/zero_copy/push_pull.rs` — 3 findings (asserts, expect on header write) +* [x] `communication/src/allocator/zero_copy/allocator.rs` — 5 findings (channel expects, identifier asserts) +* [x] `communication/src/allocator/zero_copy/allocator_process.rs` — 4 findings (channel expects, identifier assert) +* [x] `communication/src/allocator/zero_copy/tcp.rs` — 11 findings (tcp_panic on all I/O, unchecked array index, MergeQueue expect) +* [x] `communication/src/allocator/zero_copy/stream.rs` — clean +* [x] `communication/src/allocator/zero_copy/initialize.rs` — 4 findings (CommsGuard::drop expects, set_nonblocking expect) + +### `timely/` — progress (8 files) — DONE +* [x] `timely/src/progress/mod.rs` — clean +* [x] `timely/src/progress/broadcast.rs` — clean +* [x] `timely/src/progress/change_batch.rs` — clean (indexing is guarded) +* [x] `timely/src/progress/frontier.rs` — clean (debug_assert only) +* [x] `timely/src/progress/operate.rs` — 1 finding (add_port duplicate panic) +* [x] `timely/src/progress/reachability.rs` — 6 findings (unchecked indexing throughout, unwraps in is_acyclic) +* [x] `timely/src/progress/subgraph.rs` — 5 findings (children index assert, child 0 shape assert, operator validation, premature shutdown panic, validate_progress panic) +* [x] `timely/src/progress/timestamp.rs` — clean + +### `timely/` — dataflow (all files) — DONE +* [x] `timely/src/dataflow/` (channels, operators, scopes, stream) — 16 findings (deserialization panics, capability panics, advance_to assert, delay asserts, capture I/O panics, tee add_pusher) + +### `timely/` — top-level — DONE +* [x] `timely/src/lib.rs` — 4 findings (deserialize expect, size assert, serialize expects) +* [x] `timely/src/worker.rs` — 3 findings (empty address panics) +* [x] `timely/src/execute.rs` — clean +* [x] `timely/src/order.rs` — clean +* [x] `timely/src/logging.rs` — clean +* [x] `timely/src/scheduling/activate.rs` — clean (all guarded) +* [x] `timely/src/synchronization/barrier.rs` — clean (overflow infeasible) +* [x] `timely/src/synchronization/sequence.rs` — 3 findings (activator unwrap before step, Drop expect before step) diff --git a/prompt_silent.md b/prompt_silent.md new file mode 100644 index 000000000..b4d9c0ddd --- /dev/null +++ b/prompt_silent.md @@ -0,0 +1,57 @@ +# Silent error swallowing audit tracker + +## Crates and areas + +### `bytes/` (1 file) — DONE +* [x] `bytes/src/lib.rs` — clean + +### `container/` (1 file) — DONE +* [x] `container/src/lib.rs` — clean (unwrap_or_default is intentional) + +### `logging/` (1 file) — DONE +* [x] `logging/src/lib.rs` — clean + +### `communication/` (20 files) — DONE +* [x] `communication/src/lib.rs` — clean +* [x] `communication/src/initialize.rs` — clean +* [x] `communication/src/buzzer.rs` — clean +* [x] `communication/src/logging.rs` — clean +* [x] `communication/src/networking.rs` — 1 finding (.ok() on cursor read, intentional) +* [x] `communication/src/allocator/mod.rs` — clean +* [x] `communication/src/allocator/process.rs` — 2 findings (silent send, try_recv conflation) +* [x] `communication/src/allocator/counters.rs` — 1 finding (dropped event notification) +* [x] `communication/src/allocator/generic.rs` — clean +* [x] `communication/src/allocator/canary.rs` — clean +* [x] `communication/src/allocator/thread.rs` — clean +* [x] `communication/src/allocator/zero_copy/mod.rs` — clean +* [x] `communication/src/allocator/zero_copy/bytes_slab.rs` — clean +* [x] `communication/src/allocator/zero_copy/bytes_exchange.rs` — clean +* [x] `communication/src/allocator/zero_copy/push_pull.rs` — clean +* [x] `communication/src/allocator/zero_copy/allocator.rs` — 3 findings (infinite spin-loop, silent data drop, buffered data drop) +* [x] `communication/src/allocator/zero_copy/allocator_process.rs` — 3 findings (same patterns) +* [x] `communication/src/allocator/zero_copy/tcp.rs` — clean (all errors panic via tcp_panic) +* [x] `communication/src/allocator/zero_copy/stream.rs` — clean +* [x] `communication/src/allocator/zero_copy/initialize.rs` — clean + +### `timely/` — progress (8 files) — DONE +* [x] `timely/src/progress/mod.rs` — clean +* [x] `timely/src/progress/broadcast.rs` — clean (conditional logging only) +* [x] `timely/src/progress/change_batch.rs` — clean +* [x] `timely/src/progress/frontier.rs` — clean (discarded update_iter in constructors) +* [x] `timely/src/progress/operate.rs` — clean +* [x] `timely/src/progress/reachability.rs` — 1 finding (cycle detection prints to stdout) +* [x] `timely/src/progress/subgraph.rs` — clean +* [x] `timely/src/progress/timestamp.rs` — clean + +### `timely/` — dataflow (all files) — DONE +* [x] `timely/src/dataflow/` (channels, operators, scopes, stream) — 1 finding (capture send silently dropped) + +### `timely/` — top-level — DONE +* [x] `timely/src/lib.rs` — clean +* [x] `timely/src/worker.rs` — 2 findings (unknown channel ignored with TODO, optional logging) +* [x] `timely/src/execute.rs` — clean +* [x] `timely/src/order.rs` — clean +* [x] `timely/src/logging.rs` — clean +* [x] `timely/src/scheduling/activate.rs` — 1 finding (delay ignored when timer is None) +* [x] `timely/src/synchronization/barrier.rs` — clean +* [x] `timely/src/synchronization/sequence.rs` — clean (Weak cleanup is intentional) diff --git a/prompt_unsafe.md b/prompt_unsafe.md new file mode 100644 index 000000000..a5d36272d --- /dev/null +++ b/prompt_unsafe.md @@ -0,0 +1,23 @@ +# Unsafe code audit tracker + +Look for: all `unsafe` blocks, `unsafe impl`, and `unsafe fn` across the codebase. +Validate soundness, check for UB, aliasing violations, incorrect Send/Sync impls, and unsound abstractions. + +## Crates and areas + +### `bytes/` (1 file) — DONE +* [x] `bytes/src/lib.rs` — 4 unsafe usages (3 `from_raw_parts`/`from_raw_parts_mut`, 1 `unsafe impl Send`) + +### `container/` (1 file) — DONE +* [x] `container/src/lib.rs` — clean (no unsafe) + +### `logging/` (1 file) — DONE +* [x] `logging/src/lib.rs` — clean (no unsafe) + +### `communication/` (20 files) — DONE +* [x] All files — clean (no unsafe in library code) +* [x] `communication/examples/lgalloc.rs` — 2 usages (example code, `from_raw_parts`/`from_raw_parts_mut` from lgalloc pointer) + +### `timely/` — DONE +* [x] `timely/src/order.rs` — 1 usage (`unsafe fn copy` implementing `columnation::Region` trait) +* [x] All other files — clean (no unsafe)