diff --git a/Cargo.toml b/Cargo.toml index e0b811ad9..8995ab151 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ #"tpchlike", #"doop", "mdbook", + "diagnostics", ] resolver = "2" diff --git a/diagnostics/Cargo.toml b/diagnostics/Cargo.toml new file mode 100644 index 000000000..c4a4f89a3 --- /dev/null +++ b/diagnostics/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "diagnostics" +version = "0.1.0" +edition.workspace = true +rust-version.workspace = true + +[dependencies] +differential-dataflow.workspace = true +timely = { workspace = true, features = ["getopts"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tungstenite = "0.26" diff --git a/diagnostics/examples/scc-bench.rs b/diagnostics/examples/scc-bench.rs new file mode 100644 index 000000000..549562ce8 --- /dev/null +++ b/diagnostics/examples/scc-bench.rs @@ -0,0 +1,126 @@ +//! SCC benchmark with diagnostics. +//! +//! Usage: scc-bench [timely args] [--log-logging] [nodes [edges [batch [rounds]]]] +//! +//! Supports standard timely arguments like `-w4` for 4 workers. With multiple +//! workers, all workers' diagnostics are exchanged to worker 0 for serving. +//! +//! Start this, then serve the diagnostics UI and open it in a browser: +//! +//! cd diagnostics && python3 -m http.server 8000 +//! open http://localhost:8000/index.html +//! +//! Click Connect (defaults to ws://localhost:51371). +//! +//! Pass --log-logging to also see the diagnostics dataflow itself. 
+ +use std::hash::{Hash, Hasher}; +use std::collections::hash_map::DefaultHasher; + +use timely::dataflow::operators::probe::Handle; + +use differential_dataflow::input::Input; +use differential_dataflow::algorithms::graphs::scc::strongly_connected; + +use diagnostics::logging; +use diagnostics::server::Server; + +fn hash_to_u64(value: &T) -> u64 { + let mut hasher = DefaultHasher::new(); + value.hash(&mut hasher); + hasher.finish() +} + +fn edge_for(index: usize, nodes: usize) -> (usize, usize) { + let h1 = hash_to_u64(&index); + let h2 = hash_to_u64(&h1); + ((h1 as usize) % nodes, (h2 as usize) % nodes) +} + +fn main() { + let timer = std::time::Instant::now(); + + // Extract our flags before timely consumes its args. + let log_logging = std::env::args().any(|a| a == "--log-logging"); + + // timely::execute_from_args handles -w, -n, -p, -h and passes the rest through. + timely::execute_from_args(std::env::args(), move |worker| { + // Register diagnostics on each worker. + let state = logging::register(worker, log_logging); + + // Start the WebSocket server on worker 0 only. With multiple + // workers (-w N), all workers' data is exchanged to worker 0 + // via the DD arrangements, so the browser sees everything. + // + // Non-server workers must drop the SinkHandle so its BatchLogger + // sends a capability retraction and the client input Replay can + // advance. Without this, the Replay holds its frontier at time 0 + // forever, blocking the cross-join from producing output. + let _server = if worker.index() == 0 { + Some(Server::start(51371, state.sink)) + } else { + drop(state.sink); + None + }; + + // Parse positional args (skip flags consumed by timely and ourselves). 
+ let positional: Vec = std::env::args() + .skip(1) + .filter(|a| !a.starts_with('-')) + .collect(); + let nodes: usize = positional.get(0).and_then(|s| s.parse().ok()).unwrap_or(100_000); + let edges: usize = positional.get(1).and_then(|s| s.parse().ok()).unwrap_or(200_000); + let batch: usize = positional.get(2).and_then(|s| s.parse().ok()).unwrap_or(1_000); + let rounds: usize = positional.get(3).and_then(|s| s.parse().ok()).unwrap_or(usize::MAX); + + if worker.index() == 0 { + println!("nodes: {nodes}, edges: {edges}, batch: {batch}, rounds: {}, workers: {}", + if rounds == usize::MAX { "∞".to_string() } else { rounds.to_string() }, + worker.peers()); + } + + let mut probe = Handle::new(); + let mut input = worker.dataflow(|scope| { + let (input, graph) = scope.new_collection::<(usize, usize), isize>(); + let _scc = strongly_connected(graph).probe_with(&mut probe); + input + }); + + let index = worker.index(); + let peers = worker.peers(); + + // Load initial edges (partitioned across workers). + let timer_load = std::time::Instant::now(); + for i in (0..edges).filter(|i| i % peers == index) { + input.insert(edge_for(i, nodes)); + } + input.advance_to(1); + input.flush(); + while probe.less_than(input.time()) { + worker.step(); + } + if index == 0 { + println!("{:?}\t{:?}\tloaded {edges} edges", timer.elapsed(), timer_load.elapsed()); + } + + // Apply changes in rounds. 
+ for round in 0..rounds { + let timer_round = std::time::Instant::now(); + for i in (0..batch).filter(|i| i % peers == index) { + input.remove(edge_for(round * batch + i, nodes)); + input.insert(edge_for(edges + round * batch + i, nodes)); + } + input.advance_to(round + 2); + input.flush(); + while probe.less_than(input.time()) { + worker.step(); + } + if index == 0 { + println!("{:?}\t{:?}\tround {round} ({} changes)", + timer.elapsed(), timer_round.elapsed(), batch * 2); + } + } + }).unwrap(); + + println!("{:?}\tshut down", timer.elapsed()); +} diff --git a/diagnostics/examples/smoke.rs b/diagnostics/examples/smoke.rs new file mode 100644 index 000000000..d63937545 --- /dev/null +++ b/diagnostics/examples/smoke.rs @@ -0,0 +1,96 @@ +//! Smoke test: run a small DD computation with diagnostics and a WS server. +//! +//! Start this, then open a browser console and connect: +//! let ws = new WebSocket("ws://localhost:51371"); +//! ws.onmessage = e => console.log(JSON.parse(e.data)); +//! +//! You should see operator, channel, and stat updates flowing. + +use std::hash::{Hash, Hasher}; +use std::collections::hash_map::DefaultHasher; +use std::time::Duration; + +use timely::dataflow::operators::probe::Handle; +use differential_dataflow::input::Input; + +use diagnostics::logging; +use diagnostics::server::Server; + +fn hash_to_u64(value: &T) -> u64 { + let mut hasher = DefaultHasher::new(); + value.hash(&mut hasher); + hasher.finish() +} + +fn edge_for(index: usize, nodes: usize) -> (usize, usize) { + let h1 = hash_to_u64(&index); + let h2 = hash_to_u64(&h1); + ((h1 as usize) % nodes, (h2 as usize) % nodes) +} + +fn main() { + let timer = std::time::Instant::now(); + + timely::execute(timely::Config::thread(), move |worker| { + // Register diagnostics (log_logging = true to see the diagnostics dataflow itself). + let state = logging::register(worker, true); + + // Start the WebSocket server on worker 0 only. 
+ // Non-server workers drop the SinkHandle so the client input + // Replay can advance its frontier. + let _server = if worker.index() == 0 { + Some(Server::start(51371, state.sink)) + } else { + drop(state.sink); + None + }; + + // Build a user dataflow. + let nodes = 1000; + let edges = 2000; + let batch = 100; + + let mut probe = Handle::new(); + let mut input = worker.dataflow(|scope| { + let (input, graph) = scope.new_collection::<(usize, usize), isize>(); + graph + .map(|(src, _dst)| src) + .probe_with(&mut probe); + input + }); + + // Load initial edges. + for i in 0..edges { + input.insert(edge_for(i, nodes)); + } + input.advance_to(1); + input.flush(); + while probe.less_than(input.time()) { + worker.step(); + } + if worker.index() == 0 { + eprintln!("{:?}\tloaded {edges} edges", timer.elapsed()); + } + + // Run rounds of changes, keeping the server alive for browsers to connect. + for round in 0..usize::MAX { + let round_timer = std::time::Instant::now(); + for i in 0..batch { + input.remove(edge_for(round * batch + i, nodes)); + input.insert(edge_for(edges + round * batch + i, nodes)); + } + input.advance_to(round + 2); + input.flush(); + while probe.less_than(input.time()) { + worker.step(); + } + if worker.index() == 0 && round % 100 == 0 { + eprintln!("{:?}\t{:?}\tround {round}", timer.elapsed(), round_timer.elapsed()); + } + + // Slow down so there's time to connect a browser. + std::thread::sleep(Duration::from_millis(10)); + } + }) + .unwrap(); +} diff --git a/diagnostics/index.html b/diagnostics/index.html new file mode 100644 index 000000000..403e39b2d --- /dev/null +++ b/diagnostics/index.html @@ -0,0 +1,1145 @@ + + + + + + Differential Dataflow Diagnostics + + + + + + +
+
+ + +
+
+ + + +
+ + + + diff --git a/diagnostics/src/lib.rs b/diagnostics/src/lib.rs new file mode 100644 index 000000000..1f61ba037 --- /dev/null +++ b/diagnostics/src/lib.rs @@ -0,0 +1,62 @@ +//! Diagnostics and visualization for timely and differential dataflow. +//! +//! This crate provides a live diagnostics console for timely and differential +//! dataflow computations. It captures logging events, maintains them as +//! differential dataflow collections with indexed arrangements, and serves +//! them to browser clients over WebSocket. +//! +//! # Quick start +//! +//! ```ignore +//! use diagnostics::{logging, server::Server}; +//! +//! timely::execute(config, |worker| { +//! // Register diagnostics logging on each worker. +//! let state = logging::register(worker, false); +//! +//! // Start the WebSocket server on worker 0 only. +//! // Other workers drop their sink handles (diagnostics collections +//! // are still maintained, but not served). +//! let _server = if worker.index() == 0 { +//! Some(Server::start(51371, state.sink)) +//! } else { +//! None +//! }; +//! +//! // Build your dataflow as usual... +//! // worker.dataflow(|scope| { ... }); +//! }); +//! ``` +//! +//! To view the diagnostics, serve the included `index.html` over HTTP +//! (browsers restrict WebSocket connections from `file://` URLs): +//! +//! ```text +//! cd diagnostics && python3 -m http.server 8000 +//! ``` +//! +//! Then open `http://localhost:8000/index.html` and click Connect. +//! The browser connects to `ws://localhost:51371` for live data. +//! +//! With multiple workers (`-w N`), all workers' diagnostics are exchanged +//! to worker 0 via the DD arrangements, so the browser sees everything. +//! +//! # Architecture +//! +//! [`logging::register`] builds a dataflow that: +//! 1. Captures timely and differential logging events via [`EventLink`](timely::dataflow::operators::capture::EventLink) pairs. +//! 2. 
Demuxes them into typed DD collections (operators, channels, schedule elapsed, message counts, arrangement stats). +//! 3. Arranges them into indexed traces for persistence. +//! 4. Cross-joins all collections with a client input: when a browser connects, the join naturally produces the full current state as a batch of diffs, followed by incremental updates. +//! 5. Captures the output into an `mpsc` channel for the WebSocket thread. +//! +//! [`server::Server`] runs a WebSocket server that: +//! - Accepts browser connections and assigns client IDs. +//! - Announces connect/disconnect to the dataflow via the client input channel. +//! - Reads diagnostic updates from the capture channel and routes them as JSON to the appropriate browser client. +//! +//! The browser-side `index.html` simply applies the diffs to maintain local +//! state and renders the dataflow graph — no client-side aggregation needed. + +pub mod logging; +pub mod server; diff --git a/diagnostics/src/logging.rs b/diagnostics/src/logging.rs new file mode 100644 index 000000000..b47721e62 --- /dev/null +++ b/diagnostics/src/logging.rs @@ -0,0 +1,662 @@ +//! Logging dataflows for timely and differential dataflow events. +//! +//! Captures timely and differential logging events, replays them as timely +//! streams, and maintains them as differential dataflow collections with +//! indexed arrangements and a client-driven sink. +//! +//! # Architecture +//! +//! The [`register`] function: +//! 1. Creates `EventLink` pairs for timely and DD event capture. +//! 2. Builds a dataflow that replays events into DD collections and arranges them. +//! 3. Creates a cross-join with a client input collection: when a client appears +//! at +1, the join naturally replays the full current state as updates. +//! 4. Captures the joined output via `mpsc` for the WebSocket thread. +//! 5. Registers logging callbacks that push events into the links. +//! +//! 
The WebSocket thread communicates client connect/disconnect via a +//! [`ClientInput`] that pushes through an `mpsc` channel, and reads +//! diagnostic updates from another `mpsc` channel. +//! +//! Timestamps use `Duration` (matching timely's logging infrastructure). +//! Timestamp quantization (rounding to interval boundaries) is done as a DD +//! `map_in_place` operation on the collections, keeping the logging layer simple. + +use std::borrow::Cow; +use std::collections::BTreeMap; +use std::rc::Rc; +use std::sync::mpsc; +use std::time::{Duration, Instant}; + +use differential_dataflow::collection::concatenate; +use differential_dataflow::logging::{DifferentialEvent, DifferentialEventBuilder}; +use differential_dataflow::operators::arrange::TraceAgent; +use differential_dataflow::trace::implementations::{KeySpine, ValSpine}; +use differential_dataflow::{AsCollection, VecCollection}; + +use timely::communication::Allocate; +use timely::container::CapacityContainerBuilder; +use timely::dataflow::channels::pact::Pipeline; +use timely::dataflow::operators::capture::{Event, EventLink, Replay, Capture}; +use timely::dataflow::operators::Exchange; +use timely::dataflow::operators::vec::Map; +use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; +use timely::dataflow::operators::generic::OutputBuilder; +use timely::dataflow::{Scope, Stream}; +use timely::logging::{ + BatchLogger, OperatesEvent, StartStop, TimelyEvent, TimelyEventBuilder, +}; +use timely::worker::Worker; + +use serde::{Serialize, Deserialize}; + +// ============================================================================ +// ClientInput — manages client connect/disconnect events across threads +// ============================================================================ + +/// Container type for client connection events: `(client_id, time, diff)`. +type ClientContainer = Vec<(usize, Duration, i64)>; + +/// Sends client connect/disconnect events to the diagnostics dataflow. 
+/// +/// The WebSocket server thread uses this to announce clients. On drop, +/// sends a capability retraction so the dataflow's client input frontier +/// can advance (important for multi-worker setups where non-server workers +/// must release their frontier). +pub struct ClientInput { + sender: mpsc::Sender>, + time: Duration, +} + +impl ClientInput { + /// Announce a client connection. + pub fn connect(&mut self, client_id: usize, elapsed: Duration) { + let _ = self + .sender + .send(Event::Messages(self.time, vec![(client_id, elapsed, 1)])); + self.advance(elapsed); + } + + /// Announce a client disconnection. + pub fn disconnect(&mut self, client_id: usize, elapsed: Duration) { + let _ = self + .sender + .send(Event::Messages(self.time, vec![(client_id, elapsed, -1)])); + self.advance(elapsed); + } + + /// Advance the capability to `elapsed`. Call periodically so the + /// dataflow's frontier can progress. + pub fn advance(&mut self, elapsed: Duration) { + if self.time < elapsed { + let _ = self + .sender + .send(Event::Progress(vec![(elapsed, 1), (self.time, -1)])); + self.time = elapsed; + } + } +} + +impl Drop for ClientInput { + fn drop(&mut self) { + let _ = self + .sender + .send(Event::Progress(vec![(self.time, -1)])); + } +} + +// ============================================================================ +// MpscEventIterator — bridges mpsc::Receiver to timely's EventIterator +// ============================================================================ + +/// Wraps an `mpsc::Receiver` as an `EventIterator` for use with `Replay`. 
+struct MpscEventIterator { + receiver: mpsc::Receiver>, +} + +impl timely::dataflow::operators::capture::event::EventIterator + for MpscEventIterator +{ + fn next(&mut self) -> Option>> { + self.receiver.try_recv().ok().map(Cow::Owned) + } +} + +// ============================================================================ +// Diagnostic update types +// ============================================================================ + +/// Identifies the kind of a key-only statistic (diff carries the value). +#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub enum StatKind { + Elapsed, + Messages, + ArrangementBatches, + ArrangementRecords, + Sharing, + BatcherRecords, + BatcherSize, + BatcherCapacity, + BatcherAllocations, +} + +/// A tagged diagnostic update sent to the WebSocket thread. +/// +/// Each variant carries enough information for the browser to apply the update. +/// The diff on the containing `(D, time, diff)` triple carries the magnitude. +#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)] +pub enum DiagnosticUpdate { + /// Operator appeared (+1) or disappeared (-1). Diff is ±1. + Operator { id: usize, name: String, addr: Vec }, + /// Channel appeared (+1). Diff is ±1. + Channel { id: usize, scope_addr: Vec, source: (usize, usize), target: (usize, usize) }, + /// A key-only statistic keyed by operator/channel id. Diff carries the value. + Stat { kind: StatKind, id: usize }, +} + +/// The container type for captured diagnostic output. +/// Each element is `((client_id, update), time, diff)`. +pub type DiagnosticContainer = Vec<((usize, DiagnosticUpdate), Duration, i64)>; + +/// The event type received by the WebSocket thread. 
+pub type DiagnosticEvent = Event; + +// ============================================================================ +// Trace handle types +// ============================================================================ + +/// A key-value trace: key K, value V, time Duration, diff i64. +type ValTrace = TraceAgent>; +/// A key-only trace: key K, time Duration, diff i64. +type KeyTrace = TraceAgent>; + +/// Trace handles for timely logging arrangements. +pub struct TimelyTraces { + /// Live operators arranged by id → (name, addr). + pub operators: ValTrace)>, + /// Live channels arranged by id → (scope_addr, source, target). + pub channels: ValTrace, (usize, usize), (usize, usize))>, + /// Schedule elapsed per operator (diff = nanoseconds). + pub elapsed: KeyTrace, + /// Message records sent per channel (diff = record count). + pub messages: KeyTrace, +} + +/// Trace handles for differential dataflow logging arrangements. +pub struct DifferentialTraces { + pub arrangement_batches: KeyTrace, + pub arrangement_records: KeyTrace, + pub sharing: KeyTrace, + pub batcher_records: KeyTrace, + pub batcher_size: KeyTrace, + pub batcher_capacity: KeyTrace, + pub batcher_allocations: KeyTrace, +} + +// ============================================================================ +// SinkHandle — returned to the caller for WebSocket integration +// ============================================================================ + +/// Handle for the WebSocket thread to interact with the diagnostics dataflow. +/// +/// **Important:** The WS thread must call `client_input.advance(elapsed)` +/// periodically (e.g., every 100ms–1s) to advance the client input's frontier. +/// Without this, the cross-join's output frontier won't advance and the capture +/// operator will never emit `Event::Progress` messages. +pub struct SinkHandle { + /// Input for the WS thread to send client connect/disconnect events. 
+ pub client_input: ClientInput, + /// Receiver for diagnostic updates produced by the cross-join. + /// + /// Each `Event::Messages(time, vec)` contains `((client_id, update), time, diff)` + /// triples. The WS thread routes updates to clients by `client_id`. + pub output_receiver: mpsc::Receiver, + /// The reference instant for computing elapsed durations. + /// Use `start.elapsed()` when calling `client_input.advance()`. + pub start: Instant, +} + +/// Everything returned by [`register`]. +pub struct LoggingState { + pub traces: LoggingTraces, + pub sink: SinkHandle, +} + +/// All trace handles. +pub struct LoggingTraces { + pub timely: TimelyTraces, + pub differential: DifferentialTraces, +} + +// ============================================================================ +// Timestamp quantization +// ============================================================================ + +/// Default quantization interval. +const INTERVAL: Duration = Duration::from_secs(1); + +/// Round a Duration up to the next multiple of `interval`. +fn quantize(time: Duration, interval: Duration) -> Duration { + let nanos = time.as_nanos(); + let interval_nanos = interval.as_nanos(); + let rounded = (nanos / interval_nanos + 1) * interval_nanos; + Duration::from_nanos(rounded as u64) +} + +/// Quantize timestamps in a collection's inner stream. +fn quantize_collection( + collection: VecCollection, + interval: Duration, +) -> VecCollection +where + S: Scope, + D: differential_dataflow::Data, +{ + collection + .inner + .map_in_place(move |(_, time, _)| *time = quantize(*time, interval)) + .as_collection() +} + +// ============================================================================ +// Registration +// ============================================================================ + +/// Register diagnostics logging for a worker. +/// +/// Builds a dataflow that: +/// 1. Captures timely and differential logging events into DD collections. +/// 2. 
Arranges them into indexed traces for persistence. +/// 3. Cross-joins all collections with a client input, so new clients +/// automatically receive the full current state as updates. +/// 4. Captures the output for the WebSocket thread via `mpsc`. +/// +/// If `log_logging` is true, the diagnostics dataflow itself will appear in +/// the timely logs. +/// +/// Returns a [`LoggingState`] with trace handles and a [`SinkHandle`] for +/// the WebSocket thread. +pub fn register(worker: &mut Worker, log_logging: bool) -> LoggingState { + let start = Instant::now(); + + // Event links for logging capture (worker-internal, Rc-based). + let t_link: Rc>> = Rc::new(EventLink::new()); + let d_link: Rc>> = + Rc::new(EventLink::new()); + + // Cross-thread channels for client input and diagnostic output. + let (client_tx, client_rx) = mpsc::channel::>(); + let (output_tx, output_rx) = mpsc::channel::(); + + if log_logging { + install_loggers(worker, t_link.clone(), d_link.clone()); + } + + let traces = worker.dataflow::(|scope| { + // Replay logging events into the dataflow. + let timely_stream = Some(t_link.clone()).replay_into(scope); + let diff_stream = Some(d_link.clone()).replay_into(scope); + + // Build collections and arrangements. + let (t_traces, t_collections) = construct_timely(scope, timely_stream); + let (d_traces, d_collections) = construct_differential(scope, diff_stream); + + // Replay client connection events from the WS thread. + let client_iter = MpscEventIterator { receiver: client_rx }; + let clients: VecCollection<_, usize, i64> = + Some(client_iter).replay_into(scope).as_collection(); + + // Cross-join: clients × each data collection. + let clients_keyed = clients.map(|c| ((), c)); + + // Tag all collections and cross-join with clients. 
+ let operators_tagged = t_collections.operators + .map(|(id, name, addr)| ((), DiagnosticUpdate::Operator { id, name, addr })); + let channels_tagged = t_collections.channels + .map(|(id, scope_addr, source, target)| { + ((), DiagnosticUpdate::Channel { id, scope_addr, source, target }) + }); + + // Key-only stats: tag them all and concat. + let stats = concatenate(scope, vec![ + t_collections.elapsed + .map(|id| ((), DiagnosticUpdate::Stat { kind: StatKind::Elapsed, id })), + t_collections.messages + .map(|id| ((), DiagnosticUpdate::Stat { kind: StatKind::Messages, id })), + d_collections.arrangement_batches + .map(|id| ((), DiagnosticUpdate::Stat { kind: StatKind::ArrangementBatches, id })), + d_collections.arrangement_records + .map(|id| ((), DiagnosticUpdate::Stat { kind: StatKind::ArrangementRecords, id })), + d_collections.sharing + .map(|id| ((), DiagnosticUpdate::Stat { kind: StatKind::Sharing, id })), + d_collections.batcher_records + .map(|id| ((), DiagnosticUpdate::Stat { kind: StatKind::BatcherRecords, id })), + d_collections.batcher_size + .map(|id| ((), DiagnosticUpdate::Stat { kind: StatKind::BatcherSize, id })), + d_collections.batcher_capacity + .map(|id| ((), DiagnosticUpdate::Stat { kind: StatKind::BatcherCapacity, id })), + d_collections.batcher_allocations + .map(|id| ((), DiagnosticUpdate::Stat { kind: StatKind::BatcherAllocations, id })), + ]); + + // Concatenate all tagged collections. + let all_data = concatenate(scope, vec![ + operators_tagged, + channels_tagged, + stats, + ]); + + let output = clients_keyed + .join(all_data) + .map(|((), (client_id, update))| (client_id, update)); + + // Route all output to worker 0 before capture, since only worker 0 + // runs the WebSocket server. 
+ output.inner.exchange(|_| 0).capture_into(output_tx); + + LoggingTraces { + timely: t_traces, + differential: d_traces, + } + }); + + if !log_logging { + install_loggers(worker, t_link, d_link); + } + + LoggingState { + traces, + sink: SinkHandle { + client_input: ClientInput { + sender: client_tx, + time: Duration::default(), + }, + output_receiver: output_rx, + start, + }, + } +} + +fn install_loggers( + worker: &mut Worker, + t_link: Rc>>, + d_link: Rc>>, +) { + let mut registry = worker.log_register().expect("Logging not initialized"); + + // Use timely's BatchLogger directly — it handles progress tracking + // with Duration timestamps, matching the logging framework's epoch. + let mut t_batch = BatchLogger::new(t_link); + registry.insert::("timely", move |time, data| { + t_batch.publish_batch(time, data); + }); + + let mut d_batch = BatchLogger::new(d_link); + registry.insert::("differential/arrange", move |time, data| { + d_batch.publish_batch(time, data); + }); +} + +// ============================================================================ +// Timely event demux +// ============================================================================ + +/// Internal: collections before arrangement, used for the cross-join. +struct TimelyCollections { + operators: VecCollection), i64>, + channels: VecCollection, (usize, usize), (usize, usize)), i64>, + elapsed: VecCollection, + messages: VecCollection, +} + +#[derive(Default)] +struct TimelyDemuxState { + operators: BTreeMap, + schedule_starts: BTreeMap, +} + +/// Build timely logging collections and arrangements. 
+fn construct_timely>( + scope: &mut S, + stream: Stream>, +) -> (TimelyTraces, TimelyCollections) { + type OpUpdate = ((usize, String, Vec), Duration, i64); + type ChUpdate = ((usize, Vec, (usize, usize), (usize, usize)), Duration, i64); + type ElUpdate = (usize, Duration, i64); + type MsgUpdate = (usize, Duration, i64); + + let mut demux = OperatorBuilder::new("Timely Demux".to_string(), scope.clone()); + let mut input = demux.new_input(stream, Pipeline); + + let (op_out, operates) = demux.new_output::>(); + let mut op_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(op_out); + let (ch_out, channels) = demux.new_output::>(); + let mut ch_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(ch_out); + let (el_out, elapsed) = demux.new_output::>(); + let mut el_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(el_out); + let (msg_out, messages) = demux.new_output::>(); + let mut msg_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(msg_out); + + demux.build(|_capabilities| { + let mut state = TimelyDemuxState::default(); + move |_frontiers| { + let mut op_act = op_out.activate(); + let mut ch_act = ch_out.activate(); + let mut el_act = el_out.activate(); + let mut msg_act = msg_out.activate(); + + input.for_each(|cap, data: &mut Vec<(Duration, TimelyEvent)>| { + let mut ops = op_act.session(&cap); + let mut chs = ch_act.session(&cap); + let mut els = el_act.session(&cap); + let mut msgs = msg_act.session(&cap); + let ts = *cap.time(); + + for (event_time, event) in data.drain(..) 
{ + match event { + TimelyEvent::Operates(e) => { + ops.give(((e.id, e.name.clone(), e.addr.clone()), ts, 1i64)); + state.operators.insert(e.id, e); + } + TimelyEvent::Shutdown(e) => { + if let Some(op) = state.operators.remove(&e.id) { + ops.give(((op.id, op.name, op.addr), ts, -1i64)); + } + } + TimelyEvent::Channels(e) => { + chs.give(( + (e.id, e.scope_addr.clone(), e.source, e.target), + ts, + 1i64, + )); + } + TimelyEvent::Schedule(e) => match e.start_stop { + StartStop::Start => { + state.schedule_starts.insert(e.id, event_time); + } + StartStop::Stop => { + if let Some(start) = state.schedule_starts.remove(&e.id) { + let elapsed_ns = + event_time.saturating_sub(start).as_nanos() as i64; + if elapsed_ns > 0 { + els.give((e.id, ts, elapsed_ns)); + } + } + } + }, + TimelyEvent::Messages(e) => { + if e.is_send { + msgs.give((e.channel, ts, e.record_count as i64)); + } + } + _ => {} + } + } + }); + } + }); + + // Quantize timestamps to interval boundaries. + let op_collection = quantize_collection(operates.as_collection(), INTERVAL); + let ch_collection = quantize_collection(channels.as_collection(), INTERVAL); + let el_collection = quantize_collection(elapsed.as_collection(), INTERVAL); + let msg_collection = quantize_collection(messages.as_collection(), INTERVAL); + + // Arrange for persistence. 
+ let operators = op_collection.clone() + .map(|(id, name, addr)| (id, (name, addr))) + .arrange_by_key_named("Arrange Operators"); + let channels = ch_collection.clone() + .map(|(id, scope_addr, source, target)| (id, (scope_addr, source, target))) + .arrange_by_key_named("Arrange Channels"); + let elapsed = el_collection.clone() + .arrange_by_self_named("Arrange Elapsed"); + let messages = msg_collection.clone() + .arrange_by_self_named("Arrange Messages"); + + let traces = TimelyTraces { + operators: operators.trace, + channels: channels.trace, + elapsed: elapsed.trace, + messages: messages.trace, + }; + + let collections = TimelyCollections { + operators: op_collection, + channels: ch_collection, + elapsed: el_collection, + messages: msg_collection, + }; + + (traces, collections) +} + +// ============================================================================ +// Differential event demux +// ============================================================================ + +/// Internal: collections before arrangement, used for the cross-join. +struct DifferentialCollections { + arrangement_batches: VecCollection, + arrangement_records: VecCollection, + sharing: VecCollection, + batcher_records: VecCollection, + batcher_size: VecCollection, + batcher_capacity: VecCollection, + batcher_allocations: VecCollection, +} + +/// Build differential logging collections and arrangements. 
+// NOTE(review): generic parameters appear stripped throughout this patch
+// text (`Stream>`, `new_output::>()`, `CapacityContainerBuilder>>`);
+// restore the type arguments from the original source before applying.
+fn construct_differential>(
+    scope: &mut S,
+    stream: Stream>,
+) -> (DifferentialTraces, DifferentialCollections) {
+    // Every statistic is an (operator id, timestamp, signed delta) triple.
+    type Update = (usize, Duration, i64);
+
+    // One demux operator fans the raw DifferentialEvent stream out into
+    // seven typed outputs, one per statistic.
+    let mut demux = OperatorBuilder::new("Differential Demux".to_string(), scope.clone());
+    let mut input = demux.new_input(stream, Pipeline);
+
+    let (bat_out, batches) = demux.new_output::>();
+    let mut bat_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(bat_out);
+    let (rec_out, records) = demux.new_output::>();
+    let mut rec_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(rec_out);
+    let (shr_out, sharing) = demux.new_output::>();
+    let mut shr_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(shr_out);
+    let (br_out, batcher_records) = demux.new_output::>();
+    let mut br_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(br_out);
+    let (bs_out, batcher_size) = demux.new_output::>();
+    let mut bs_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(bs_out);
+    let (bc_out, batcher_capacity) = demux.new_output::>();
+    let mut bc_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(bc_out);
+    let (ba_out, batcher_allocations) = demux.new_output::>();
+    let mut ba_out = OutputBuilder::<_, CapacityContainerBuilder>>::from(ba_out);
+
+    demux.build(|_capabilities| {
+        move |_frontiers| {
+            let mut bat_act = bat_out.activate();
+            let mut rec_act = rec_out.activate();
+            let mut shr_act = shr_out.activate();
+            let mut br_act = br_out.activate();
+            let mut bs_act = bs_out.activate();
+            let mut bc_act = bc_out.activate();
+            let mut ba_act = ba_out.activate();
+
+            input.for_each(|cap, data: &mut Vec<(Duration, DifferentialEvent)>| {
+                let mut bat = bat_act.session(&cap);
+                let mut rec = rec_act.session(&cap);
+                let mut shr = shr_act.session(&cap);
+                let mut b_rec = br_act.session(&cap);
+                let mut b_sz = bs_act.session(&cap);
+                let mut b_cap = bc_act.session(&cap);
+                let mut b_alloc = ba_act.session(&cap);
+                // Stamp every update with the capability's time (not the
+                // per-event time), so a whole input batch lands at once.
+                let ts = *cap.time();
+
+                for (_event_time, event) in data.drain(..) {
+                    match event {
+                        // New batch: one more batch, `length` more records.
+                        DifferentialEvent::Batch(e) => {
+                            bat.give((e.operator, ts, 1i64));
+                            rec.give((e.operator, ts, e.length as i64));
+                        }
+                        // Completed merge: two batches consumed, one
+                        // produced (net -1); records move by the difference
+                        // between the merged output and its two inputs.
+                        DifferentialEvent::Merge(e) => {
+                            if let Some(complete) = e.complete {
+                                bat.give((e.operator, ts, -1i64));
+                                let diff = complete as i64 - (e.length1 + e.length2) as i64;
+                                if diff != 0 {
+                                    rec.give((e.operator, ts, diff));
+                                }
+                            }
+                        }
+                        // Dropped batch: retract it and all of its records.
+                        DifferentialEvent::Drop(e) => {
+                            bat.give((e.operator, ts, -1i64));
+                            let diff = -(e.length as i64);
+                            if diff != 0 {
+                                rec.give((e.operator, ts, diff));
+                            }
+                        }
+                        DifferentialEvent::TraceShare(e) => {
+                            shr.give((e.operator, ts, e.diff as i64));
+                        }
+                        // Batcher events carry signed deltas for four gauges.
+                        DifferentialEvent::Batcher(e) => {
+                            b_rec.give((e.operator, ts, e.records_diff as i64));
+                            b_sz.give((e.operator, ts, e.size_diff as i64));
+                            b_cap.give((e.operator, ts, e.capacity_diff as i64));
+                            b_alloc.give((e.operator, ts, e.allocations_diff as i64));
+                        }
+                        // Other DifferentialEvent variants are not tracked.
+                        _ => {}
+                    }
+                }
+            });
+        }
+    });
+
+    // Quantize timestamps to interval boundaries.
+    let bat_coll = quantize_collection(batches.as_collection(), INTERVAL);
+    let rec_coll = quantize_collection(records.as_collection(), INTERVAL);
+    let shr_coll = quantize_collection(sharing.as_collection(), INTERVAL);
+    let br_coll = quantize_collection(batcher_records.as_collection(), INTERVAL);
+    let bs_coll = quantize_collection(batcher_size.as_collection(), INTERVAL);
+    let bc_coll = quantize_collection(batcher_capacity.as_collection(), INTERVAL);
+    let ba_coll = quantize_collection(batcher_allocations.as_collection(), INTERVAL);
+
+    // Arranged traces for sharing, plus the raw collections for the
+    // cross-join (see DifferentialCollections).
+    let traces = DifferentialTraces {
+        arrangement_batches: bat_coll.clone().arrange_by_self_named("Arrange ArrangementBatches").trace,
+        arrangement_records: rec_coll.clone().arrange_by_self_named("Arrange ArrangementRecords").trace,
+        sharing: shr_coll.clone().arrange_by_self_named("Arrange Sharing").trace,
+        batcher_records: br_coll.clone().arrange_by_self_named("Arrange BatcherRecords").trace,
+        batcher_size: bs_coll.clone().arrange_by_self_named("Arrange BatcherSize").trace,
+        batcher_capacity: bc_coll.clone().arrange_by_self_named("Arrange BatcherCapacity").trace,
+        batcher_allocations: ba_coll.clone().arrange_by_self_named("Arrange BatcherAllocations").trace,
+    };
+
+    let collections = DifferentialCollections {
+        arrangement_batches: bat_coll,
+        arrangement_records: rec_coll,
+        sharing: shr_coll,
+        batcher_records: br_coll,
+        batcher_size: bs_coll,
+        batcher_capacity: bc_coll,
+        batcher_allocations: ba_coll,
+    };
+
+    (traces, collections)
+}
diff --git a/diagnostics/src/server.rs b/diagnostics/src/server.rs
new file mode 100644
index 000000000..a0a40c2b6
--- /dev/null
+++ b/diagnostics/src/server.rs
@@ -0,0 +1,242 @@
+//! WebSocket server that bridges the diagnostics dataflow to browser clients.
+//!
+//! The server runs on a background thread and manages the full client lifecycle:
+//!
+//! 1. Accepts WebSocket connections and assigns each a unique client ID.
+//! 2. Announces connects/disconnects to the diagnostics dataflow via the
+//!    [`SinkHandle`](crate::logging::SinkHandle)'s client input channel.
+//! 3. Reads diagnostic updates from the capture channel (produced by the
+//!    dataflow's cross-join of clients × data) and forwards them as JSON
+//!    to the appropriate browser client.
+//!
+//! This server only handles the WebSocket data protocol. The browser loads
+//! `index.html` (and its JavaScript) from a separate static file server
+//! (e.g., `python3 -m http.server 8000`). A future improvement could embed
+//! static file serving here so only one port is needed.
+
+use std::collections::HashMap;
+use std::net::TcpListener;
+use std::sync::mpsc;
+use std::thread;
+use std::time::Duration;
+
+use serde_json;
+use tungstenite::{Message, accept};
+
+use timely::dataflow::operators::capture::Event;
+
+use crate::logging::{DiagnosticUpdate, SinkHandle, StatKind};
+
+/// A running diagnostics WebSocket server.
+///
+/// Created by [`Server::start`]. 
The server thread runs in the background
+/// detached: dropping `Server` does *not* stop it (dropping a
+/// [`thread::JoinHandle`] merely detaches the thread). The thread exits on
+/// its own when the diagnostics output channel disconnects; see `run_server`.
+pub struct Server {
+    // Held to document ownership of the background thread; never joined.
+    _handle: thread::JoinHandle<()>,
+}
+
+impl Server {
+    /// Start the diagnostics WebSocket server on the given port.
+    ///
+    /// Takes ownership of the [`SinkHandle`] and moves it to a background
+    /// thread. When a browser connects to `ws://localhost:{port}`:
+    ///
+    /// 1. The server announces the new client to the diagnostics dataflow.
+    /// 2. The dataflow's cross-join produces the full current state as diffs.
+    /// 3. The server serializes those diffs as JSON and sends them over the
+    ///    WebSocket, followed by incremental updates as the computation runs.
+    ///
+    /// The browser should load `index.html` over HTTP (not `file://`), since
+    /// browsers restrict WebSocket connections from `file://` origins. Serve
+    /// the diagnostics directory with any static file server, e.g.:
+    ///
+    /// ```text
+    /// cd diagnostics && python3 -m http.server 8000
+    /// ```
+    ///
+    /// Then open `http://localhost:8000/index.html` and click Connect.
+    ///
+    /// # Panics
+    ///
+    /// `start` itself returns immediately; the *background thread* panics
+    /// if the port cannot be bound (binding happens inside `run_server`).
+    pub fn start(port: u16, sink: SinkHandle) -> Self {
+        let handle = thread::spawn(move || run_server(port, sink));
+        // NOTE(review): printed before the spawned thread has bound the
+        // port, so this message can appear even if binding then fails.
+        eprintln!("Diagnostics server on ws://localhost:{port}");
+        Server { _handle: handle }
+    }
+}
+
+/// JSON-serializable update sent to clients. 
+#[derive(serde::Serialize)] +#[serde(tag = "type")] +enum JsonUpdate<'a> { + Operator { + id: usize, + name: &'a str, + addr: &'a [usize], + diff: i64, + }, + Channel { + id: usize, + scope_addr: &'a [usize], + source: (usize, usize), + target: (usize, usize), + diff: i64, + }, + Stat { + kind: &'a str, + id: usize, + diff: i64, + }, +} + +fn stat_kind_str(kind: &StatKind) -> &'static str { + match kind { + StatKind::Elapsed => "Elapsed", + StatKind::Messages => "Messages", + StatKind::ArrangementBatches => "ArrangementBatches", + StatKind::ArrangementRecords => "ArrangementRecords", + StatKind::Sharing => "Sharing", + StatKind::BatcherRecords => "BatcherRecords", + StatKind::BatcherSize => "BatcherSize", + StatKind::BatcherCapacity => "BatcherCapacity", + StatKind::BatcherAllocations => "BatcherAllocations", + } +} + +fn update_to_json(update: &DiagnosticUpdate, diff: i64) -> serde_json::Value { + match update { + DiagnosticUpdate::Operator { id, name, addr } => { + serde_json::to_value(JsonUpdate::Operator { + id: *id, + name, + addr, + diff, + }) + .unwrap() + } + DiagnosticUpdate::Channel { + id, + scope_addr, + source, + target, + } => serde_json::to_value(JsonUpdate::Channel { + id: *id, + scope_addr, + source: *source, + target: *target, + diff, + }) + .unwrap(), + DiagnosticUpdate::Stat { kind, id } => { + serde_json::to_value(JsonUpdate::Stat { + kind: stat_kind_str(kind), + id: *id, + diff, + }) + .unwrap() + } + } +} + +const FLUSH_INTERVAL: Duration = Duration::from_millis(100); + +fn run_server(port: u16, sink: SinkHandle) { + let listener = TcpListener::bind(format!("0.0.0.0:{port}")) + .unwrap_or_else(|e| panic!("Failed to bind to port {port}: {e}")); + listener + .set_nonblocking(true) + .expect("Cannot set non-blocking"); + + let mut client_input = sink.client_input; + let receiver = sink.output_receiver; + let start = sink.start; + + let mut clients: HashMap> = HashMap::new(); + let mut next_client_id: usize = 0; + + loop { + // Accept pending 
connections. + loop { + match listener.accept() { + Ok((stream, addr)) => { + eprintln!("Diagnostics client connected from {addr}"); + stream.set_nonblocking(false).ok(); + match accept(stream) { + Ok(ws) => { + let client_id = next_client_id; + next_client_id += 1; + clients.insert(client_id, ws); + + // Announce to the diagnostics dataflow. + client_input.connect(client_id, start.elapsed()); + eprintln!(" assigned client id {client_id}"); + } + Err(e) => eprintln!("WebSocket handshake failed: {e}"), + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break, + Err(e) => { + eprintln!("Accept error: {e}"); + break; + } + } + } + + // Drain diagnostic updates and group by client. + let mut batches_by_client: HashMap> = HashMap::new(); + loop { + match receiver.try_recv() { + Ok(Event::Messages(_time, data)) => { + for ((client_id, update), _ts, diff) in data { + let json = update_to_json(&update, diff); + batches_by_client.entry(client_id).or_default().push(json); + } + } + Ok(Event::Progress(_)) => {} + Err(mpsc::TryRecvError::Empty) => break, + Err(mpsc::TryRecvError::Disconnected) => { + eprintln!("Diagnostics output channel closed, shutting down server"); + // Close all clients gracefully. + for (_, mut ws) in clients.drain() { + let _ = ws.close(None); + } + return; + } + } + } + + // Send batched updates to each client. + let mut disconnected = Vec::new(); + for (client_id, updates) in &batches_by_client { + if let Some(ws) = clients.get_mut(client_id) { + if !updates.is_empty() { + let payload = serde_json::to_string(updates).unwrap(); + if ws.send(Message::Text(payload.into())).is_err() { + disconnected.push(*client_id); + } + } + } + } + + // Handle disconnects (also check for clients that closed their end). + for (client_id, ws) in clients.iter_mut() { + // Non-blocking read to detect closed connections. + // tungstenite in blocking mode would block here, so we just + // check on send failure above. 
+ let _ = ws; // placeholder — send failure detection above is sufficient + let _ = client_id; + } + for client_id in disconnected { + clients.remove(&client_id); + client_input.disconnect(client_id, start.elapsed()); + eprintln!("Diagnostics client {client_id} disconnected"); + } + + // Advance time periodically even without client events, so the + // dataflow frontier can progress. + client_input.advance(start.elapsed()); + + std::thread::sleep(FLUSH_INTERVAL); + } +}