2 changes: 2 additions & 0 deletions mdbook/src/SUMMARY.md
@@ -36,3 +36,5 @@
- [Internals](./chapter_5/chapter_5.md)
- [Communication](./chapter_5/chapter_5_1.md)
- [Progress Tracking](./chapter_5/chapter_5_2.md)
- [Containers](./chapter_5/chapter_5_3.md)
- [Logging](./chapter_5/chapter_5_4.md)
240 changes: 240 additions & 0 deletions mdbook/src/chapter_5/chapter_5_4.md
@@ -0,0 +1,240 @@
# Logging

Timely dataflow provides a logging infrastructure that records structural events as a dataflow is constructed and runtime events as it executes.
These events allow you to reconstruct the dataflow graph, understand how data flows across scope boundaries, and profile operator execution.

All events are logged to named log streams, and each event carries a `Duration` timestamp (elapsed time since the worker started).
The primary log stream is `"timely"`, which carries `TimelyEvent` variants.
Additional typed log streams exist for progress and summary information (e.g. `"timely/progress/<type>"` and `"timely/summary/<type>"`).

## Structural Events

These events describe the shape of the dataflow graph. They are logged once during construction.

### OperatesEvent

Logged when an operator is created within a scope.

| Field | Type | Description |
|--------|--------------|-------------|
| `id` | `usize` | Worker-unique identifier for the operator, allocated by the worker. |
| `addr` | `Vec<usize>` | Hierarchical address: the path from the root scope to this operator. |
| `name` | `String` | Human-readable name (e.g. `"Map"`, `"Feedback"`, `"Subgraph"`). |

The `addr` field encodes the nesting structure.
For example, an address of `[0, 2, 1]` means: child 0 of the root, then child 2 within that scope, then child 1 within that.
Within any scope, child indices start at 1 for actual operators; index 0 is reserved (see [Scope Boundary Conventions](#scope-boundary-conventions) below).

The `id` field is a flat, worker-unique integer.
It is the key used by all other events (`ScheduleEvent`, `ShutdownEvent`, `MessagesEvent` via channels, etc.) to refer to this operator.
Two different workers will generally assign different `id` values to corresponding operators, but the `addr` will be the same.
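
The nesting encoded by `addr` can be unpacked mechanically. A minimal sketch (the helper name is mine, not part of timely's API):

```rust
/// Split an operator address into its parent scope's address and its
/// scope-local child index (helper name is illustrative, not timely API).
fn split_addr(addr: &[usize]) -> Option<(&[usize], usize)> {
    let (&local, parent) = addr.split_last()?;
    Some((parent, local))
}

fn main() {
    // [0, 2, 1]: child 1 of the scope at [0, 2], which is child 2 of the root.
    let (parent, local) = split_addr(&[0, 2, 1]).unwrap();
    assert_eq!(parent, &[0, 2][..]);
    assert_eq!(local, 1);
}
```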

### ChannelsEvent

Logged when a data channel is created between two operators (or between an operator and a scope boundary).

| Field | Type | Description |
|--------------|------------------|-------------|
| `id` | `usize` | Worker-unique channel identifier. |
| `scope_addr` | `Vec<usize>` | Address of the scope that *contains* this channel. |
| `source` | `(usize, usize)` | `(operator_index, output_port)` of the source within the containing scope. |
| `target` | `(usize, usize)` | `(operator_index, input_port)` of the target within the containing scope. |
| `typ` | `String` | The container type transported on this channel, as a string. |

The `source` and `target` tuples use **scope-local** operator indices (not the worker-unique `id` from `OperatesEvent`).
To resolve them, find the `OperatesEvent` whose `addr` equals `scope_addr` with the operator index appended.
For example, if `scope_addr` is `[0, 2]` and `source` is `(3, 0)`, the source operator has address `[0, 2, 3]` and you want output port 0.

When either the source or target operator index is 0, the channel crosses a scope boundary. See [Scope Boundary Conventions](#scope-boundary-conventions).
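
The resolution step described above is a one-liner; here is a sketch (the function name is illustrative, not timely's API):

```rust
/// Resolve a channel endpoint `(operator_index, port)` to the full address
/// of the operator within the containing scope (illustrative helper).
fn endpoint_addr(scope_addr: &[usize], endpoint: (usize, usize)) -> Vec<usize> {
    let mut addr = scope_addr.to_vec();
    addr.push(endpoint.0); // append the scope-local operator index
    addr
}

fn main() {
    // scope_addr [0, 2], source (3, 0): operator [0, 2, 3], output port 0.
    assert_eq!(endpoint_addr(&[0, 2], (3, 0)), vec![0, 2, 3]);
}
```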

### CommChannelsEvent

Logged when a communication channel (for inter-worker exchange) is established.

| Field | Type | Description |
|--------------|-------------------|-------------|
| `identifier` | `usize` | Communication channel identifier. |
| `kind` | `CommChannelKind` | Either `Progress` or `Data`. |

## Runtime Events

These events describe what happens as the dataflow executes.

### ScheduleEvent

Logged when an operator begins or finishes a scheduling invocation.

| Field | Type | Description |
|--------------|-------------|-------------|
| `id` | `usize` | Worker-unique operator identifier (same as `OperatesEvent::id`). |
| `start_stop` | `StartStop` | `Start` when the operator begins executing, `Stop` when it returns. |

A matched pair of `Start` and `Stop` events brackets one invocation of the operator's `schedule()` method.
These pairs let you measure per-operator execution time.
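
One way to turn those pairs into per-operator totals, sketched with a simplified event shape (the types below mirror the table fields and are not the library's own):

```rust
use std::collections::HashMap;
use std::time::Duration;

// Simplified mirror of ScheduleEvent plus its log timestamp; not timely's types.
enum StartStop { Start, Stop }
struct Sched { elapsed: Duration, id: usize, start_stop: StartStop }

/// Sum execution time per operator from matched Start/Stop pairs.
fn per_operator_time(events: &[Sched]) -> HashMap<usize, Duration> {
    let mut open = HashMap::new();   // operator id -> pending Start timestamp
    let mut total = HashMap::new();  // operator id -> accumulated run time
    for ev in events {
        match ev.start_stop {
            StartStop::Start => { open.insert(ev.id, ev.elapsed); }
            StartStop::Stop => {
                if let Some(start) = open.remove(&ev.id) {
                    *total.entry(ev.id).or_insert(Duration::ZERO) += ev.elapsed - start;
                }
            }
        }
    }
    total
}

fn main() {
    let events = [
        Sched { elapsed: Duration::from_micros(10), id: 7, start_stop: StartStop::Start },
        Sched { elapsed: Duration::from_micros(25), id: 7, start_stop: StartStop::Stop },
        Sched { elapsed: Duration::from_micros(30), id: 7, start_stop: StartStop::Start },
        Sched { elapsed: Duration::from_micros(40), id: 7, start_stop: StartStop::Stop },
    ];
    // Two invocations: 15us + 10us.
    assert_eq!(per_operator_time(&events)[&7], Duration::from_micros(25));
}
```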

### MessagesEvent

Logged when a batch of data is sent or received on a channel.

| Field | Type | Description |
|----------------|---------|-------------|
| `is_send` | `bool` | `true` for a send, `false` for a receive. |
| `channel` | `usize` | Channel identifier (same as `ChannelsEvent::id`). |
| `source` | `usize` | Source worker index. |
| `target` | `usize` | Target worker index. |
| `seq_no` | `usize` | Sequence number for this (source, target) pair on this channel. |
| `record_count` | `i64` | Number of records in the batch. |

For channels that stay within a single worker, `source` and `target` will be the same worker index.
For exchange (inter-worker) channels, they may differ.
The `record_count` comes from the container's `Accountable` trait implementation (e.g. `Vec::len()` cast to `i64`).

### ShutdownEvent

Logged when an operator is permanently shut down.

| Field | Type | Description |
|-------|---------|-------------|
| `id` | `usize` | Worker-unique operator identifier. |

### PushProgressEvent

Logged when frontier changes are pushed to an operator.

| Field | Type | Description |
|---------|---------|-------------|
| `op_id` | `usize` | Worker-unique operator identifier. |

### InputEvent

Logged around input ingestion logic.

| Field | Type | Description |
|--------------|-------------|-------------|
| `start_stop` | `StartStop` | `Start` or `Stop`. |

### ParkEvent

Logged when a worker parks (goes idle waiting for external events) or wakes up.

| Variant | Description |
|--------------------|-------------|
| `Park(Option<Duration>)` | Worker parks, with an optional maximum sleep duration. |
| `Unpark` | Worker wakes from a parked state. |

### ApplicationEvent

User-defined start/stop events for custom instrumentation.

| Field | Type | Description |
|------------|---------|-------------|
| `id` | `usize` | User-chosen event type identifier. |
| `is_start` | `bool` | `true` when activity begins, `false` when it stops. |

## Scope Boundary Conventions

Understanding scope boundaries is essential for interpreting `ChannelsEvent` data and reconstructing the full dataflow graph across nested scopes.

### Child Zero

By convention, **child index 0** within any scope is a pseudo-operator representing the scope's own boundary — its interface with its parent.
It is not a real operator; you will not see an `OperatesEvent` with a trailing 0 in its address.
Instead, child zero is the mechanism by which channels inside a scope connect to channels outside.

Child zero's ports are **inverted** relative to the scope's external interface:

- **Child zero's outputs** are the scope's **inputs** (data arriving from the parent).
- **Child zero's inputs** are the scope's **outputs** (data leaving to the parent).

This inversion makes the internal wiring uniform: every channel inside a scope connects an operator output to an operator input, even when one end is the scope boundary.

### Connecting Parent and Child

When a scope (say, an iterative scope) appears as operator `K` in its parent, and you look inside that scope, the relationship is:

| Parent perspective | Child perspective |
|--------------------|-------------------|
| Operator `K`, input port `i` | Child zero, output port `i` |
| Operator `K`, output port `j` | Child zero, input port `j` |

So if you see a `ChannelsEvent` in the parent scope with `target: (K, i)`, the data enters the child scope and appears as if it came from child zero's output port `i`.
Inside the child scope, a `ChannelsEvent` with `source: (0, i)` connects that incoming data to whatever internal operator consumes it.

Similarly, data produced inside the child scope that should leave the scope is connected via a `ChannelsEvent` with `target: (0, j)` inside the child scope, and emerges as output port `j` of operator `K` in the parent scope.

### Worked Example

Consider a dataflow with an iterative scope:

```
worker.dataflow(|scope| {              // root scope, addr [0]
    let stream = scope
        .input_from(&mut input);       // operator at [0, 1]
    scope.iterative(|inner| {          // iterative scope at [0, 2]
        stream
            .enter(inner)              // operators inside at [0, 2, 1], [0, 2, 2], etc.
            .map(...)                  //   e.g. [0, 2, 1]
            .filter(...)               //   e.g. [0, 2, 2]
            .leave()
    })
    .inspect(...)                      // operator at [0, 3]
});
```

You would see structural events like:

1. `OperatesEvent { id: _, addr: [0], name: "Dataflow" }` — the root scope itself.
2. `OperatesEvent { id: _, addr: [0, 1], name: "Input" }` — the input operator.
3. `OperatesEvent { id: _, addr: [0, 2], name: "Iterative" }` — the iterative scope (appears as an operator in the root).
4. `OperatesEvent { id: _, addr: [0, 2, 1], name: "Map" }` — the map, inside the iterative scope.
5. `OperatesEvent { id: _, addr: [0, 2, 2], name: "Filter" }` — the filter, inside the iterative scope.
6. `OperatesEvent { id: _, addr: [0, 3], name: "Inspect" }` — the inspect, in the root scope.

Channel events in the root scope (`scope_addr: [0]`) connecting `Input` to the iterative scope:
- `ChannelsEvent { scope_addr: [0], source: (1, 0), target: (2, 0), ... }` — from Input's output 0 to the iterative scope's input 0.

Channel events inside the iterative scope (`scope_addr: [0, 2]`):
- `ChannelsEvent { scope_addr: [0, 2], source: (0, 0), target: (1, 0), ... }` — from child zero's output 0 (= scope input 0) to Map's input 0.
- `ChannelsEvent { scope_addr: [0, 2], source: (1, 0), target: (2, 0), ... }` — from Map's output 0 to Filter's input 0.
- `ChannelsEvent { scope_addr: [0, 2], source: (2, 0), target: (0, 0), ... }` — from Filter's output 0 to child zero's input 0 (= scope output 0).

And back in the root scope:
- `ChannelsEvent { scope_addr: [0], source: (2, 0), target: (3, 0), ... }` — from the iterative scope's output 0 to Inspect's input 0.

This chain shows data flowing: Input → [into scope via child zero] → Map → Filter → [out of scope via child zero] → Inspect.

### Reconstructing the Full Graph

To reconstruct the dataflow graph from logged events:

1. **Build the operator tree** from `OperatesEvent` entries, using `addr` to establish parent-child relationships. Any operator whose `addr` has length `n` is a child of the operator (scope) whose `addr` is the first `n-1` elements.

2. **Build per-scope channel graphs** from `ChannelsEvent` entries. Group channels by `scope_addr`. Within each scope, the `source` and `target` pairs give you directed edges between scope-local operator indices.

3. **Stitch across scope boundaries** using child zero. When a channel in scope `S` has source or target operator index 0, it connects to the scope's external interface. Find the operator in `S`'s parent whose `addr` equals `S`, and link the corresponding port.

4. **Correlate runtime events** using the worker-unique `id` from `OperatesEvent` to join `ScheduleEvent`, `ShutdownEvent`, and other events. Use `ChannelsEvent::id` to join `MessagesEvent` records to their channel.
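
Steps 1–3 can be sketched in a few lines; the structs below mirror only the fields used and are not timely's own types:

```rust
use std::collections::HashMap;

// Minimal mirrors of the structural events (only the fields used here).
struct Operates { addr: Vec<usize>, name: String }
struct Channel { scope_addr: Vec<usize>, source: (usize, usize), target: (usize, usize) }

/// Turn each channel into a directed edge between full operator addresses.
/// An endpoint index of 0 stands for the containing scope's own boundary.
fn edges(ops: &[Operates], chans: &[Channel]) -> Vec<(Vec<usize>, Vec<usize>)> {
    let by_addr: HashMap<&[usize], &str> =
        ops.iter().map(|o| (o.addr.as_slice(), o.name.as_str())).collect();
    chans.iter().map(|ch| {
        let full = |idx: usize| {
            let mut a = ch.scope_addr.clone();
            a.push(idx);
            a
        };
        let (src, dst) = (full(ch.source.0), full(ch.target.0));
        // Non-boundary endpoints should correspond to a logged operator.
        debug_assert!(ch.source.0 == 0 || by_addr.contains_key(src.as_slice()));
        debug_assert!(ch.target.0 == 0 || by_addr.contains_key(dst.as_slice()));
        (src, dst)
    }).collect()
}

fn main() {
    // The channels inside the iterative scope from the worked example above.
    let ops = vec![
        Operates { addr: vec![0, 2, 1], name: "Map".into() },
        Operates { addr: vec![0, 2, 2], name: "Filter".into() },
    ];
    let chans = vec![
        Channel { scope_addr: vec![0, 2], source: (0, 0), target: (1, 0) },
        Channel { scope_addr: vec![0, 2], source: (1, 0), target: (2, 0) },
        Channel { scope_addr: vec![0, 2], source: (2, 0), target: (0, 0) },
    ];
    let e = edges(&ops, &chans);
    assert_eq!(e[1], (vec![0, 2, 1], vec![0, 2, 2])); // Map -> Filter
}
```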

## Additional Log Streams

Beyond the main `"timely"` stream, there are typed log streams for deeper introspection:

- **`"timely/progress/<T>"`** — `TimelyProgressEvent<T>`: detailed progress messages (sends and receives of timestamp capability changes between workers). These include per-operator frontier updates with `(node, port, timestamp, delta)` tuples.

- **`"timely/summary/<TS>"`** — `OperatesSummaryEvent<TS>`: the internal connectivity summary of each operator, describing for each (input, output) pair the set of timestamp transformations (path summaries) that the operator may apply. These are logged once during construction.

- **`"timely/reachability/<T>"`** — reachability tracker events, recording how pointstamps propagate through the dataflow graph.

## Registering a Logger

To consume logging events, register a callback with the worker's log registry before building the dataflow:

```rust
worker.log_register()
    .insert::<TimelyEventBuilder, _>("timely", |time, data| {
        if let Some(data) = data {
            for (elapsed, event) in data.iter() {
                println!("{elapsed:?}\t{event:?}");
            }
        }
    });
```

You can also use `BatchLogger` to forward events into a timely capture stream for downstream processing.
2 changes: 2 additions & 0 deletions timely/Cargo.toml
@@ -21,6 +21,7 @@ workspace = true
[features]
default = ["getopts"]
getopts = ["dep:getopts", "timely_communication/getopts"]
visualizer = ["dep:tungstenite"]

[dependencies]
columnar = { workspace = true }
@@ -35,3 +36,4 @@ timely_logging = { path = "../logging", version = "0.27" }
timely_communication = { path = "../communication", version = "0.27", default-features = false }
timely_container = { path = "../container", version = "0.27" }
smallvec = { version = "1.15.1", features = ["serde", "const_generics"] }
tungstenite = { version = "0.26", optional = true }
77 changes: 77 additions & 0 deletions timely/examples/logging-live.rs
@@ -0,0 +1,77 @@
//! Streams timely events over WebSocket for live visualization.
//!
//! Usage:
//! cargo run --example logging-live --features visualizer [-- timely args]
//!
//! Then open `visualizer/index.html` in a browser and connect to `ws://localhost:51371`.

use timely::dataflow::operators::{Input, Exchange, Enter, Leave, Inspect, Probe, Feedback, ConnectLoop, Concat};
use timely::dataflow::operators::vec::{Map, Filter};
use timely::dataflow::{InputHandle, Scope};
use timely::visualizer::Server;

Check failure on line 11 in timely/examples/logging-live.rs (GitHub Actions: Cargo clippy; cargo test on ubuntu, rust 1.86; cargo test on ubuntu, rust stable): unresolved import `timely::visualizer`

fn main() {
    let port = std::env::var("TIMELY_VIS_PORT")
        .ok()
        .and_then(|s| s.parse().ok())
        .unwrap_or(51371);

    let server = Server::start(port);

    timely::execute_from_args(std::env::args(), move |worker| {

        server.register(worker);

        // A richer dataflow to exercise the visualizer:
        //
        // Input -> Map -> Exchange -> Filter ----> Region[ Inspect ] -> Probe
        //                                   \                ^
        //                                    +-> Feedback ---+
        //
        // This gives us: multiple pipeline stages, an exchange, branching,
        // a nested scope (region), a feedback loop with a back-edge, and
        // real data flowing through channels.
        let mut input = InputHandle::new();
        let mut probe = timely::dataflow::ProbeHandle::new();
        worker.dataflow(|scope| {
            let (handle, cycle) = scope.feedback::<Vec<u64>>(1);

            let mapped = scope
                .input_from(&mut input)
                .container::<Vec<_>>()
                .map(|x: u64| x.wrapping_mul(17).wrapping_add(3))
                .exchange(|&x: &u64| x);

            let filtered = mapped.filter(|&x: &u64| x % 2 == 0);

            let looped = filtered.clone().concat(cycle);

            scope.region(|inner| {
                looped
                    .enter(inner)
                    .inspect(|_x| { })
                    .leave()
            })
            .probe_with(&mut probe);

            // Feed back values that haven't reached zero yet.
            filtered
                .map(|x: u64| x / 4)
                .filter(|&x: &u64| x > 0)
                .connect_loop(handle);
        });

        // Continuously feed data so events keep flowing.
        let mut round = 0u64;
        loop {
            for i in 0..100u64 {
                input.send(round * 100 + i);
            }
            round += 1;
            input.advance_to(round);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}
3 changes: 3 additions & 0 deletions timely/src/lib.rs
@@ -96,6 +96,9 @@ pub mod order;
pub mod logging;
// pub mod log_events;

#[cfg(feature = "visualizer")]
pub mod visualizer;

pub mod scheduling;

/// A composite trait for types usable as containers in timely dataflow.