From 8a408f9db3ad1715a95a8c0a7032692de0bd6cdc Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 18 Mar 2026 22:16:10 -0400 Subject: [PATCH] Live visualizer --- mdbook/src/SUMMARY.md | 2 + mdbook/src/chapter_5/chapter_5_4.md | 240 ++++ timely/Cargo.toml | 2 + timely/examples/logging-live.rs | 77 ++ timely/src/lib.rs | 3 + timely/src/visualizer.rs | 232 ++++ visualizer/index.html | 1643 +++++++++++++++++++++++++++ 7 files changed, 2199 insertions(+) create mode 100644 mdbook/src/chapter_5/chapter_5_4.md create mode 100644 timely/examples/logging-live.rs create mode 100644 timely/src/visualizer.rs create mode 100644 visualizer/index.html diff --git a/mdbook/src/SUMMARY.md b/mdbook/src/SUMMARY.md index 45ad4fb99..fc0ba42c2 100644 --- a/mdbook/src/SUMMARY.md +++ b/mdbook/src/SUMMARY.md @@ -36,3 +36,5 @@ - [Internals](./chapter_5/chapter_5.md) - [Communication](./chapter_5/chapter_5_1.md) - [Progress Tracking](./chapter_5/chapter_5_2.md) + - [Containers](./chapter_5/chapter_5_3.md) + - [Logging](./chapter_5/chapter_5_4.md) diff --git a/mdbook/src/chapter_5/chapter_5_4.md b/mdbook/src/chapter_5/chapter_5_4.md new file mode 100644 index 000000000..57ec47e29 --- /dev/null +++ b/mdbook/src/chapter_5/chapter_5_4.md @@ -0,0 +1,240 @@ +# Logging + +Timely dataflow provides a comprehensive logging infrastructure that records structural and runtime events as the dataflow executes. +These events allow you to reconstruct the dataflow graph, understand how data flows across scope boundaries, and profile operator execution. + +All events are logged to named log streams, and each event carries a `Duration` timestamp (elapsed time since the worker started). +The primary log stream is `"timely"`, which carries `TimelyEvent` variants. +Additional typed log streams exist for progress and summary information (e.g. `"timely/progress/"` and `"timely/summary/"`). + +## Structural Events + +These events describe the shape of the dataflow graph. They are logged once during construction. 
+
+### OperatesEvent
+
+Logged when an operator is created within a scope.
+
+| Field  | Type         | Description |
+|--------|--------------|-------------|
+| `id`   | `usize`      | Worker-unique identifier for the operator, allocated by the worker. |
+| `addr` | `Vec<usize>` | Hierarchical address: the path from the root scope to this operator. |
+| `name` | `String`     | Human-readable name (e.g. `"Map"`, `"Feedback"`, `"Subgraph"`). |
+
+The `addr` field encodes the nesting structure.
+For example, an address of `[0, 2, 1]` means: child 0 of the root, then child 2 within that scope, then child 1 within that.
+Within any scope, child indices start at 1 for actual operators; index 0 is reserved (see [Scope Boundary Conventions](#scope-boundary-conventions) below).
+
+The `id` field is a flat, worker-unique integer.
+It is the key used by all other events (`ScheduleEvent`, `ShutdownEvent`, `MessagesEvent` via channels, etc.) to refer to this operator.
+Two different workers will generally assign different `id` values to corresponding operators, but the `addr` will be the same.
+
+### ChannelsEvent
+
+Logged when a data channel is created between two operators (or between an operator and a scope boundary).
+
+| Field        | Type             | Description |
+|--------------|------------------|-------------|
+| `id`         | `usize`          | Worker-unique channel identifier. |
+| `scope_addr` | `Vec<usize>`     | Address of the scope that *contains* this channel. |
+| `source`     | `(usize, usize)` | `(operator_index, output_port)` of the source within the containing scope. |
+| `target`     | `(usize, usize)` | `(operator_index, input_port)` of the target within the containing scope. |
+| `typ`        | `String`         | The container type transported on this channel, as a string. |
+
+The `source` and `target` tuples use **scope-local** operator indices (not the worker-unique `id` from `OperatesEvent`).
+To resolve them, find the `OperatesEvent` whose `addr` equals `scope_addr` with the operator index appended.
+For example, if `scope_addr` is `[0, 2]` and `source` is `(3, 0)`, the source operator has address `[0, 2, 3]` and you want output port 0. + +When either the source or target operator index is 0, the channel crosses a scope boundary. See [Scope Boundary Conventions](#scope-boundary-conventions). + +### CommChannelsEvent + +Logged when a communication channel (for inter-worker exchange) is established. + +| Field | Type | Description | +|--------------|-------------------|-------------| +| `identifier` | `usize` | Communication channel identifier. | +| `kind` | `CommChannelKind` | Either `Progress` or `Data`. | + +## Runtime Events + +These events describe what happens as the dataflow executes. + +### ScheduleEvent + +Logged when an operator begins or finishes a scheduling invocation. + +| Field | Type | Description | +|--------------|-------------|-------------| +| `id` | `usize` | Worker-unique operator identifier (same as `OperatesEvent::id`). | +| `start_stop` | `StartStop` | `Start` when the operator begins executing, `Stop` when it returns. | + +A matched pair of `Start` and `Stop` events brackets one invocation of the operator's `schedule()` method. +These pairs let you measure per-operator execution time. + +### MessagesEvent + +Logged when a batch of data is sent or received on a channel. + +| Field | Type | Description | +|----------------|---------|-------------| +| `is_send` | `bool` | `true` for a send, `false` for a receive. | +| `channel` | `usize` | Channel identifier (same as `ChannelsEvent::id`). | +| `source` | `usize` | Source worker index. | +| `target` | `usize` | Target worker index. | +| `seq_no` | `usize` | Sequence number for this (source, target) pair on this channel. | +| `record_count` | `i64` | Number of records in the batch. | + +For channels that stay within a single worker, `source` and `target` will be the same worker index. +For exchange (inter-worker) channels, they may differ. 
+The `record_count` comes from the container's `Accountable` trait implementation (e.g. `Vec::len()` cast to `i64`).
+
+### ShutdownEvent
+
+Logged when an operator is permanently shut down.
+
+| Field | Type    | Description |
+|-------|---------|-------------|
+| `id`  | `usize` | Worker-unique operator identifier. |
+
+### PushProgressEvent
+
+Logged when frontier changes are pushed to an operator.
+
+| Field   | Type    | Description |
+|---------|---------|-------------|
+| `op_id` | `usize` | Worker-unique operator identifier. |
+
+### InputEvent
+
+Logged around input ingestion logic.
+
+| Field        | Type        | Description |
+|--------------|-------------|-------------|
+| `start_stop` | `StartStop` | `Start` or `Stop`. |
+
+### ParkEvent
+
+Logged when a worker parks (goes idle waiting for external events) or wakes up.
+
+| Variant                  | Description |
+|--------------------------|-------------|
+| `Park(Option<Duration>)` | Worker parks, with an optional maximum sleep duration. |
+| `Unpark`                 | Worker wakes from a parked state. |
+
+### ApplicationEvent
+
+User-defined start/stop events for custom instrumentation.
+
+| Field      | Type    | Description |
+|------------|---------|-------------|
+| `id`       | `usize` | User-chosen event type identifier. |
+| `is_start` | `bool`  | `true` when activity begins, `false` when it stops. |
+
+## Scope Boundary Conventions
+
+Understanding scope boundaries is essential for interpreting `ChannelsEvent` data and reconstructing the full dataflow graph across nested scopes.
+
+### Child Zero
+
+By convention, **child index 0** within any scope is a pseudo-operator representing the scope's own boundary — its interface with its parent.
+It is not a real operator; you will not see an `OperatesEvent` with a trailing 0 in its address.
+Instead, child zero is the mechanism by which channels inside a scope connect to channels outside.
+ +Child zero's ports are **inverted** relative to the scope's external interface: + +- **Child zero's outputs** are the scope's **inputs** (data arriving from the parent). +- **Child zero's inputs** are the scope's **outputs** (data leaving to the parent). + +This inversion makes the internal wiring uniform: every channel inside a scope connects an operator output to an operator input, even when one end is the scope boundary. + +### Connecting Parent and Child + +When a scope (say, an iterative scope) appears as operator `K` in its parent, and you look inside that scope, the relationship is: + +| Parent perspective | Child perspective | +|--------------------|-------------------| +| Operator `K`, input port `i` | Child zero, output port `i` | +| Operator `K`, output port `j` | Child zero, input port `j` | + +So if you see a `ChannelsEvent` in the parent scope with `target: (K, i)`, the data enters the child scope and appears as if it came from child zero's output port `i`. +Inside the child scope, a `ChannelsEvent` with `source: (0, i)` connects that incoming data to whatever internal operator consumes it. + +Similarly, data produced inside the child scope that should leave the scope is connected via a `ChannelsEvent` with `target: (0, j)` inside the child scope, and emerges as output port `j` of operator `K` in the parent scope. + +### Worked Example + +Consider a dataflow with an iterative scope: + +``` +worker.dataflow(|scope| { // root scope, addr [] + input // operator at [1] + .enter(scope.iterative(|inner| { // iterative scope at [2] + inner // ... operators inside at [2, 1], [2, 2], etc. + .map(...) // e.g. [2, 1] + .filter(...) // e.g. [2, 2] + })) + .inspect(...) // operator at [3] +}); +``` + +You would see structural events like: + +1. `OperatesEvent { id: _, addr: [0], name: "Dataflow" }` — the root scope itself. +2. `OperatesEvent { id: _, addr: [0, 1], name: "Input" }` — the input operator. +3. 
`OperatesEvent { id: _, addr: [0, 2], name: "Iterative" }` — the iterative scope (appears as an operator in the root). +4. `OperatesEvent { id: _, addr: [0, 2, 1], name: "Map" }` — the map, inside the iterative scope. +5. `OperatesEvent { id: _, addr: [0, 2, 2], name: "Filter" }` — the filter, inside the iterative scope. +6. `OperatesEvent { id: _, addr: [0, 3], name: "Inspect" }` — the inspect, in the root scope. + +Channel events in the root scope (`scope_addr: [0]`) connecting `Input` to the iterative scope: +- `ChannelsEvent { scope_addr: [0], source: (1, 0), target: (2, 0), ... }` — from Input's output 0 to the iterative scope's input 0. + +Channel events inside the iterative scope (`scope_addr: [0, 2]`): +- `ChannelsEvent { scope_addr: [0, 2], source: (0, 0), target: (1, 0), ... }` — from child zero's output 0 (= scope input 0) to Map's input 0. +- `ChannelsEvent { scope_addr: [0, 2], source: (1, 0), target: (2, 0), ... }` — from Map's output 0 to Filter's input 0. +- `ChannelsEvent { scope_addr: [0, 2], source: (2, 0), target: (0, 0), ... }` — from Filter's output 0 to child zero's input 0 (= scope output 0). + +And back in the root scope: +- `ChannelsEvent { scope_addr: [0], source: (2, 0), target: (3, 0), ... }` — from the iterative scope's output 0 to Inspect's input 0. + +This chain shows data flowing: Input → [into scope via child zero] → Map → Filter → [out of scope via child zero] → Inspect. + +### Reconstructing the Full Graph + +To reconstruct the dataflow graph from logged events: + +1. **Build the operator tree** from `OperatesEvent` entries, using `addr` to establish parent-child relationships. Any operator whose `addr` has length `n` is a child of the operator (scope) whose `addr` is the first `n-1` elements. + +2. **Build per-scope channel graphs** from `ChannelsEvent` entries. Group channels by `scope_addr`. Within each scope, the `source` and `target` pairs give you directed edges between scope-local operator indices. + +3. 
**Stitch across scope boundaries** using child zero. When a channel in scope `S` has source or target operator index 0, it connects to the scope's external interface. Find the operator in `S`'s parent whose `addr` equals `S`, and link the corresponding port.
+
+4. **Correlate runtime events** using the worker-unique `id` from `OperatesEvent` to join `ScheduleEvent`, `ShutdownEvent`, and other events. Use `ChannelsEvent::id` to join `MessagesEvent` records to their channel.
+
+## Additional Log Streams
+
+Beyond the main `"timely"` stream, there are typed log streams for deeper introspection:
+
+- **`"timely/progress/"`** — `TimelyProgressEvent`: detailed progress messages (sends and receives of timestamp capability changes between workers). These include per-operator frontier updates with `(node, port, timestamp, delta)` tuples.
+
+- **`"timely/summary/"`** — `OperatesSummaryEvent`: the internal connectivity summary of each operator, describing for each (input, output) pair the set of timestamp transformations (path summaries) that the operator may apply. These are logged once during construction.
+
+- **`"timely/reachability/"`** — reachability tracker events, recording how pointstamps propagate through the dataflow graph.
+
+## Registering a Logger
+
+To consume logging events, register a callback with the worker's log registry before building the dataflow:
+
+```rust
+worker.log_register()
+    .insert::<TimelyEventBuilder, _>("timely", |time, data| {
+        if let Some(data) = data {
+            for (elapsed, event) in data.iter() {
+                println!("{elapsed:?}\t{event:?}");
+            }
+        }
+    });
+```
+
+You can also use `BatchLogger` to forward events into a timely capture stream for downstream processing.
diff --git a/timely/Cargo.toml b/timely/Cargo.toml index f45a04b40..e72c05cc2 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -21,6 +21,7 @@ workspace = true [features] default = ["getopts"] getopts = ["dep:getopts", "timely_communication/getopts"] +visualizer = ["dep:tungstenite"] [dependencies] columnar = { workspace = true } @@ -35,3 +36,4 @@ timely_logging = { path = "../logging", version = "0.27" } timely_communication = { path = "../communication", version = "0.27", default-features = false } timely_container = { path = "../container", version = "0.27" } smallvec = { version = "1.15.1", features = ["serde", "const_generics"] } +tungstenite = { version = "0.26", optional = true } diff --git a/timely/examples/logging-live.rs b/timely/examples/logging-live.rs new file mode 100644 index 000000000..3c5a397ed --- /dev/null +++ b/timely/examples/logging-live.rs @@ -0,0 +1,77 @@ +//! Streams timely events over WebSocket for live visualization. +//! +//! Usage: +//! cargo run --example logging-live --features visualizer [-- timely args] +//! +//! Then open `visualizer/index.html` in a browser and connect to `ws://localhost:51371`. 
+
+use timely::dataflow::operators::{Input, Exchange, Enter, Leave, Inspect, Probe, Feedback, ConnectLoop, Concat};
+use timely::dataflow::operators::vec::{Map, Filter};
+use timely::dataflow::{InputHandle, Scope};
+use timely::visualizer::Server;
+
+fn main() {
+    let port = std::env::var("TIMELY_VIS_PORT")
+        .ok()
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(51371);
+
+    let server = Server::start(port);
+
+    timely::execute_from_args(std::env::args(), move |worker| {
+
+        server.register(worker);
+
+        // A richer dataflow to exercise the visualizer:
+        //
+        //   Input -> Map -> Exchange -> Filter ----> Region[ Inspect ] -> Probe
+        //                                 \              ^
+        //                                  +-> Feedback -+
+        //
+        // This gives us: multiple pipeline stages, an exchange, branching,
+        // a nested scope (region), a feedback loop with a back-edge, and
+        // real data flowing through channels.
+        let mut input = InputHandle::new();
+        let mut probe = timely::dataflow::ProbeHandle::new();
+        worker.dataflow(|scope| {
+            let (handle, cycle) = scope.feedback::<Vec<u64>>(1);
+
+            let mapped = scope
+                .input_from(&mut input)
+                .container::<Vec<u64>>()
+                .map(|x: u64| x.wrapping_mul(17).wrapping_add(3))
+                .exchange(|&x: &u64| x);
+
+            let filtered = mapped.filter(|&x: &u64| x % 2 == 0);
+
+            let looped = filtered.clone().concat(cycle);
+
+            scope.region(|inner| {
+                looped
+                    .enter(inner)
+                    .inspect(|_x| { })
+                    .leave()
+            })
+            .probe_with(&mut probe);
+
+            // Feed back values that haven't reached zero yet.
+            filtered
+                .map(|x: u64| x / 4)
+                .filter(|&x: &u64| x > 0)
+                .connect_loop(handle);
+        });
+
+        // Continuously feed data so events keep flowing.
+ let mut round = 0u64; + loop { + for i in 0..100u64 { + input.send(round * 100 + i); + } + round += 1; + input.advance_to(round); + while probe.less_than(input.time()) { + worker.step(); + } + } + }).unwrap(); +} diff --git a/timely/src/lib.rs b/timely/src/lib.rs index 39625f7e3..e4de275c4 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -96,6 +96,9 @@ pub mod order; pub mod logging; // pub mod log_events; +#[cfg(feature = "visualizer")] +pub mod visualizer; + pub mod scheduling; /// A composite trait for types usable as containers in timely dataflow. diff --git a/timely/src/visualizer.rs b/timely/src/visualizer.rs new file mode 100644 index 000000000..1979d740a --- /dev/null +++ b/timely/src/visualizer.rs @@ -0,0 +1,232 @@ +//! Live dataflow visualization over WebSocket. +//! +//! This module provides a [`Server`] that streams timely logging events as JSON +//! over WebSocket to the browser-based visualizer (`visualizer/index.html`). +//! +//! # Usage +//! +//! ```ignore +//! use timely::visualizer::Server; +//! +//! let server = Server::start(51371); +//! +//! timely::execute_from_args(std::env::args(), move |worker| { +//! server.register(worker); +//! // ... build and run dataflows ... +//! }).unwrap(); +//! ``` +//! +//! Then open `visualizer/index.html` in a browser and connect to `ws://localhost:51371`. +//! +//! Requires the `visualizer` feature flag. + +use std::net::{TcpListener, TcpStream}; +use std::sync::{Arc, Mutex, mpsc}; +use std::thread; +use std::time::{Duration, Instant}; + +use tungstenite::{accept, Message}; + +use crate::logging::{TimelyEvent, TimelyEventBuilder, StartStop}; +use crate::worker::Worker; +use crate::communication::Allocate; + +/// A handle to the visualization WebSocket server. +/// +/// The server accepts browser connections and streams timely events as JSON. +/// Structural events (`Operates`, `Channels`) are replayed to late-connecting +/// clients so they can reconstruct the dataflow graph. 
+///
+/// `Server` is cheaply cloneable — all clones share the same underlying channel
+/// and server thread. The server thread runs until all `Server` handles (and
+/// their associated senders) are dropped.
+#[derive(Clone)]
+pub struct Server {
+    tx: Arc<Mutex<mpsc::Sender<String>>>,
+}
+
+impl Server {
+    /// Start the WebSocket visualization server on the given port.
+    ///
+    /// Spawns a background thread that accepts WebSocket connections and
+    /// broadcasts events. The thread exits when all `Server` handles are
+    /// dropped.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the port cannot be bound.
+    pub fn start(port: u16) -> Self {
+        let (tx, rx) = mpsc::channel::<String>();
+
+        thread::spawn(move || run_ws_server(port, rx));
+
+        eprintln!("Visualizer WebSocket server on ws://localhost:{port}");
+        eprintln!("Open visualizer/index.html and connect to the address above.");
+
+        Server {
+            tx: Arc::new(Mutex::new(tx)),
+        }
+    }
+
+    /// Register the timely event logger for this worker.
+    ///
+    /// This installs a logging callback on the `"timely"` log stream that
+    /// serializes events as JSON and sends them to the WebSocket server.
+    pub fn register<A: Allocate>(&self, worker: &mut Worker<A>) {
+        let tx = Arc::clone(&self.tx);
+        let worker_index = worker.index();
+
+        worker.log_register().unwrap().insert::<TimelyEventBuilder, _>(
+            "timely",
+            move |_time, data| {
+                if let Some(data) = data {
+                    let tx = tx.lock().unwrap();
+                    for (elapsed, event) in data.iter() {
+                        let duration_ns =
+                            elapsed.as_secs() as u64 * 1_000_000_000
+                            + elapsed.subsec_nanos() as u64;
+                        if let Some(json) = event_to_json(worker_index, duration_ns, event) {
+                            let _ = tx.send(json);
+                        }
+                    }
+                }
+            },
+        );
+    }
+}
+
+/// Convert a timely event to a JSON string: `[worker, duration_ns, event]`.
+fn event_to_json(worker: usize, duration_ns: u64, event: &TimelyEvent) -> Option<String> {
+    let event_json = match event {
+        TimelyEvent::Operates(e) => {
+            let addr: Vec<String> = e.addr.iter().map(|a| a.to_string()).collect();
+            let name = e.name.replace('\\', "\\\\").replace('"', "\\\"");
+            format!(
+                r#"{{"Operates": {{"id": {}, "addr": [{}], "name": "{}"}}}}"#,
+                e.id, addr.join(", "), name,
+            )
+        }
+        TimelyEvent::Channels(e) => {
+            let scope_addr: Vec<String> = e.scope_addr.iter().map(|a| a.to_string()).collect();
+            let typ = e.typ.replace('\\', "\\\\").replace('"', "\\\"");
+            format!(
+                r#"{{"Channels": {{"id": {}, "scope_addr": [{}], "source": [{}, {}], "target": [{}, {}], "typ": "{}"}}}}"#,
+                e.id, scope_addr.join(", "),
+                e.source.0, e.source.1, e.target.0, e.target.1, typ,
+            )
+        }
+        TimelyEvent::Schedule(e) => {
+            let ss = match e.start_stop {
+                StartStop::Start => "\"Start\"",
+                StartStop::Stop => "\"Stop\"",
+            };
+            format!(
+                r#"{{"Schedule": {{"id": {}, "start_stop": {}}}}}"#,
+                e.id, ss,
+            )
+        }
+        TimelyEvent::Messages(e) => {
+            format!(
+                r#"{{"Messages": {{"is_send": {}, "channel": {}, "source": {}, "target": {}, "seq_no": {}, "record_count": {}}}}}"#,
+                e.is_send, e.channel, e.source, e.target, e.seq_no, e.record_count,
+            )
+        }
+        TimelyEvent::Shutdown(e) => {
+            format!(r#"{{"Shutdown": {{"id": {}}}}}"#, e.id)
+        }
+        _ => return None,
+    };
+    Some(format!("[{worker}, {duration_ns}, {event_json}]"))
+}
+
+const FLUSH_INTERVAL: Duration = Duration::from_millis(250);
+
+/// Run the WebSocket server. Batches events and replays structural events to
+/// late-connecting clients.
+fn run_ws_server(port: u16, rx: mpsc::Receiver<String>) {
+    let listener = TcpListener::bind(format!("0.0.0.0:{port}"))
+        .unwrap_or_else(|e| panic!("Failed to bind to port {port}: {e}"));
+    listener.set_nonblocking(true).expect("Cannot set non-blocking");
+
+    let mut clients: Vec<tungstenite::WebSocket<TcpStream>> = Vec::new();
+    let mut batch: Vec<String> = Vec::new();
+    let mut replay: Vec<String> = Vec::new();
+    let mut done = false;
+
+    loop {
+        let deadline = Instant::now() + FLUSH_INTERVAL;
+
+        // Accept pending connections (non-blocking).
+        loop {
+            match listener.accept() {
+                Ok((stream, addr)) => {
+                    eprintln!("Visualizer client connected from {addr}");
+                    stream.set_nonblocking(false).ok();
+                    match accept(stream) {
+                        Ok(mut ws) => {
+                            if !replay.is_empty() {
+                                let payload = format!("[{}]", replay.join(","));
+                                let msg = Message::Text(payload.into());
+                                if ws.send(msg).is_err() {
+                                    eprintln!("Failed to send replay; dropping client");
+                                    continue;
+                                }
+                            }
+                            clients.push(ws);
+                        }
+                        Err(e) => eprintln!("WebSocket handshake failed: {e}"),
+                    }
+                }
+                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
+                Err(e) => {
+                    eprintln!("Accept error: {e}");
+                    break;
+                }
+            }
+        }
+
+        // Drain the channel until the flush deadline.
+        loop {
+            let remaining = deadline.saturating_duration_since(Instant::now());
+            match rx.recv_timeout(remaining) {
+                Ok(json) => {
+                    if json.contains("\"Operates\"") || json.contains("\"Channels\"") {
+                        replay.push(json.clone());
+                    }
+                    batch.push(json);
+                }
+                Err(mpsc::RecvTimeoutError::Timeout) => break,
+                Err(mpsc::RecvTimeoutError::Disconnected) => { done = true; break; }
+            }
+        }
+
+        // Flush batch to all connected clients.
+ if !batch.is_empty() && !clients.is_empty() { + let payload = format!("[{}]", batch.join(",")); + let msg = Message::Text(payload.into()); + clients.retain_mut(|ws| { + match ws.send(msg.clone()) { + Ok(_) => true, + Err(_) => { + eprintln!("Visualizer client disconnected"); + false + } + } + }); + } + batch.clear(); + + if done { break; } + } + + // Close all clients gracefully. + for ws in clients.iter_mut() { + let _ = ws.close(None); + loop { + match ws.read() { + Ok(_) => continue, + Err(_) => break, + } + } + } +} diff --git a/visualizer/index.html b/visualizer/index.html new file mode 100644 index 000000000..ba9cb7f4c --- /dev/null +++ b/visualizer/index.html @@ -0,0 +1,1643 @@ + + + + + + Timely Dataflow Visualizer + + + + + +
+
+ + + or + + +
+
+ Drop a JSON log file here, or click to browse +
+
+ Live: run cargo run --example logging-live and click Connect.
+ File: a JSON array of [worker, duration_ns, event] triples + (generate with cargo run --example logging-json > events.json). +
+
+ + + +
+ + + +