Skip to content

Fuse pipeline-connected operator chains in SubgraphBuilder::build()#761

Draft
antiguru wants to merge 8 commits intoTimelyDataflow:masterfrom
antiguru:pipeline-chain-fusion
Draft

Fuse pipeline-connected operator chains in SubgraphBuilder::build()#761
antiguru wants to merge 8 commits intoTimelyDataflow:masterfrom
antiguru:pipeline-chain-fusion

Conversation

@antiguru
Copy link
Member

@antiguru antiguru commented Mar 12, 2026

This PR detects groups of pipeline-connected operators during SubgraphBuilder::build() and fuses them into single logical operators, hiding intermediate nodes from the reachability tracker.

Group detection

Operators are fusible if they satisfy: !notify, identity internal summaries on all (input, output) pairs, operator.is_some(), not child 0.
Edges are fusible if both endpoints are fusible and the target uses pipeline pact.
Union-find groups operators connected by fusible edges into components of at least fuse_chain_length members (default 2, configurable via WorkerConfig::fuse_chain_length or --fuse-chain-length N).

Unlike the earlier chain-only approach, there is no restriction on fan-in, fan-out, or port count.
Diamonds (branch + merge), concat (multi-input), and mixed DAG topologies all fuse.

Fusion mechanics

GroupScheduler replaces ChainScheduler:

  • Members are executed in topological order (Kahn's algorithm on internal edges).
  • Port maps (input_map, output_map) translate between the group's external ports and member ports.
  • A capability map (capability_map[member][output_port] -> Vec<group_output_index>) computed via a single reverse-topological reachability pass routes internal capability changes to the correct group outputs.
  • Tombstoned members get forward_to = Some(representative) so pipeline channel activations are forwarded to the group representative during scheduling.

Correctness

Data flows through existing pipeline channels (unchanged by fusion).
Scheduling in topological order ensures producers run before consumers.
Consumeds/produceds are taken from the group's boundary members via the port maps.
Internal capabilities from all members are aggregated to group outputs via the capability map — valid because all summaries are identity, so timestamps are unchanged along any internal path.

The scheduling loop uses a HashSet for dedup (replacing the previous counter) to support activation forwarding to lower-indexed representatives.

Performance

Benchmark: event_driven_diamond 10 500 record 1000 (10 dataflows, 500 diamonds with 4 operators each, 1000 rounds with data):

Build Total
master (unfused) N/A timeout (no fusion, 20K operators per dataflow)
this PR 121ms 2.7s

Frontier-only benchmark: event_driven_diamond 10 10000 norecord 1000:

Build Total
this PR 2.7s 2.9s

Profiling shows the remaining build-time cost is memory allocation (~27%), with fuse_group itself at 3.3%.
Runtime is dominated by actual operator scheduling and data movement.

Validated against differential-dataflow's full test suite (54 tests, 0 failures).
In differential BFS, fusion merges groups like [Enter, Concatenate, Negate, AsCollection, Concatenate, ResultsIn] — operators that individually are cheap but collectively create scheduling overhead.

antiguru and others added 8 commits March 13, 2026 21:01
During subgraph construction, detect maximal chains of pipeline-connected
operators (single input/output, local, notify=false, no fan-in/fan-out)
and replace each chain with a single ChainScheduler that schedules
members sequentially. Intermediate pointstamps are hidden from the
reachability tracker, reducing progress tracking overhead.

Configurable via WorkerConfig::min_chain_length (default: 2, 0 disables).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Lift the notify=false restriction from chain fusion so operators like
inspect, probe, and unary_notify can be fused into chains. Frontier
changes are propagated to notify=true members via sparse
notify_frontiers (only allocated for members that need notification),
avoiding O(N²) overhead for chains where few members observe frontiers.

Tombstoned children are now added to the reachability builder with
(0, 0) inputs/outputs to preserve index positions.

The event_driven example gains a rounds parameter for benchmarking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Revert V2 notify=true fusion which had a correctness bug: frontier
propagation within fused chains did not correctly track capabilities
for notify=true operators, causing iterative computations (e.g.,
differential-dataflow's reduce) to fail to converge.

Add identity summary check to exclude operators with non-trivial
internal summaries (e.g., feedback operators with Product(0, 1))
from chains, even though they otherwise meet all fusion criteria.

Validated against differential-dataflow's full test suite.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Report ALL chain members' initial +peers capabilities to the chain's
SharedProgress, not just the last member's. Each member independently
drops its capability during execution, so all N members' initial caps
must be visible to the reachability tracker.

Remove dead notify=true fusion scaffolding (notify_frontiers, frontier
propagation in schedule()) since detect_chains excludes notify=true
operators. See commit message of 214e57f for rationale.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Wire fuse_chain_length into install_options/from_matches so it can be
set from the command line via --fuse-chain-length N.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The `local` field on operators indicates whether progress information
is pre-circulated, not whether data flows through thread-local channels.
All regular operators return `local = true` regardless of their pact,
so the previous check didn't actually verify pipeline connectivity.

Add `is_pipeline()` to `ParallelizationContract` (default: false,
overridden to true for `Pipeline`), track it in `OperatorShape`, and
expose it through `Operate::pipeline()`. Chain detection checks the
target operator's `pipeline` in each link; the head's input pact is
irrelevant since it receives data from outside the chain.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the linear-chain-only fusion (1-input/1-output operators) with
general DAG subgraph fusion that handles arbitrary topologies: diamonds,
fan-in (concat), fan-out (branch), and mixed patterns.

Detection uses union-find over fusible edges instead of forward/reverse
chain walking. Fusibility constraints are unchanged per-operator
(\!notify, identity summaries, has operator) but the 1-input/1-output
restriction is lifted. GroupScheduler replaces ChainScheduler, using
topological sort for member execution order and port maps for the
group's external interface.

Activation forwarding (forward_to on tombstoned members) ensures
pipeline channel activations reach the group representative even when
data arrives at a non-representative member through feedback edges.

The reachability computation uses a single reverse-topological pass
over the group's internal DAG, replacing the previous per-node BFS
that was O(n^2) in group size.

Adds event_driven_diamond benchmark and DAG fusion tests (diamond,
multi-input merge, branch, repeated diamonds, Collatz mutual recursion).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add chapter 5.4 explaining how operator fusion works: fusibility
constraints, group detection, scheduling, capability mapping,
correctness argument for progress tracking, and configuration.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@antiguru antiguru force-pushed the pipeline-chain-fusion branch from 583c570 to fb034af Compare March 13, 2026 20:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant