Fuse pipeline-connected operator chains in SubgraphBuilder::build() #761
Draft
antiguru wants to merge 8 commits into TimelyDataflow:master from
During subgraph construction, detect maximal chains of pipeline-connected operators (single input/output, local, notify=false, no fan-in/fan-out) and replace each chain with a single ChainScheduler that schedules members sequentially. Intermediate pointstamps are hidden from the reachability tracker, reducing progress tracking overhead. Configurable via WorkerConfig::min_chain_length (default: 2, 0 disables). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Lift the notify=false restriction from chain fusion so operators like inspect, probe, and unary_notify can be fused into chains. Frontier changes are propagated to notify=true members via sparse notify_frontiers (only allocated for members that need notification), avoiding O(N²) overhead for chains where few members observe frontiers. Tombstoned children are now added to the reachability builder with (0, 0) inputs/outputs to preserve index positions. The event_driven example gains a rounds parameter for benchmarking. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Revert V2 notify=true fusion which had a correctness bug: frontier propagation within fused chains did not correctly track capabilities for notify=true operators, causing iterative computations (e.g., differential-dataflow's reduce) to fail to converge. Add identity summary check to exclude operators with non-trivial internal summaries (e.g., feedback operators with Product(0, 1)) from chains, even though they otherwise meet all fusion criteria. Validated against differential-dataflow's full test suite. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Report ALL chain members' initial +peers capabilities to the chain's SharedProgress, not just the last member's. Each member independently drops its capability during execution, so all N members' initial caps must be visible to the reachability tracker. Remove dead notify=true fusion scaffolding (notify_frontiers, frontier propagation in schedule()) since detect_chains excludes notify=true operators. See commit message of 214e57f for rationale. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Wire fuse_chain_length into install_options/from_matches so it can be set from the command line via --fuse-chain-length N. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The `local` field on operators indicates whether progress information is pre-circulated, not whether data flows through thread-local channels. All regular operators return `local = true` regardless of their pact, so the previous check didn't actually verify pipeline connectivity. Add `is_pipeline()` to `ParallelizationContract` (default: false, overridden to true for `Pipeline`), track it in `OperatorShape`, and expose it through `Operate::pipeline()`. Chain detection checks the target operator's `pipeline` in each link; the head's input pact is irrelevant since it receives data from outside the chain. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the linear-chain-only fusion (1-input/1-output operators) with general DAG subgraph fusion that handles arbitrary topologies: diamonds, fan-in (concat), fan-out (branch), and mixed patterns. Detection uses union-find over fusible edges instead of forward/reverse chain walking. Fusibility constraints are unchanged per-operator (!notify, identity summaries, has operator) but the 1-input/1-output restriction is lifted. GroupScheduler replaces ChainScheduler, using topological sort for member execution order and port maps for the group's external interface. Activation forwarding (forward_to on tombstoned members) ensures pipeline channel activations reach the group representative even when data arrives at a non-representative member through feedback edges. The reachability computation uses a single reverse-topological pass over the group's internal DAG, replacing the previous per-node BFS that was O(n^2) in group size. Adds event_driven_diamond benchmark and DAG fusion tests (diamond, multi-input merge, branch, repeated diamonds, Collatz mutual recursion). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add chapter 5.4 explaining how operator fusion works: fusibility constraints, group detection, scheduling, capability mapping, correctness argument for progress tracking, and configuration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
583c570 to
fb034af
This PR detects groups of pipeline-connected operators during SubgraphBuilder::build() and fuses them into single logical operators, hiding intermediate nodes from the reachability tracker.
Group detection
Operators are fusible if they satisfy:
!notify, identity internal summaries on all (input, output) pairs, operator.is_some(), not child 0.
Edges are fusible if both endpoints are fusible and the target uses pipeline pact.
Union-find groups operators connected by fusible edges into components of at least fuse_chain_length members (default 2, configurable via WorkerConfig::fuse_chain_length or --fuse-chain-length N).
Unlike the earlier chain-only approach, there is no restriction on fan-in, fan-out, or port count.
Diamonds (branch + merge), concat (multi-input), and mixed DAG topologies all fuse.
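The detection step above can be sketched roughly as follows. This is an illustrative sketch, not the PR's code: `UnionFind`, `Edge`, `target_is_pipeline`, and `detect_groups` are hypothetical names, per-operator fusibility is assumed to be precomputed as a `bool` slice, and treating a minimum length of 0 as "disabled" is an assumption carried over from the first commit's `min_chain_length` description.

```rust
/// Minimal union-find over operator indices (hypothetical helper, not the PR's type).
struct UnionFind {
    parent: Vec<usize>,
}

impl UnionFind {
    fn new(n: usize) -> Self {
        UnionFind { parent: (0..n).collect() }
    }

    /// Finds the root of `x`, shortening the path as it goes (path halving).
    fn find(&mut self, mut x: usize) -> usize {
        while self.parent[x] != x {
            let grandparent = self.parent[self.parent[x]];
            self.parent[x] = grandparent;
            x = grandparent;
        }
        x
    }

    /// Merges the sets containing `a` and `b`; the smaller root index wins,
    /// so the root of each set is deterministic.
    fn union(&mut self, a: usize, b: usize) {
        let (ra, rb) = (self.find(a), self.find(b));
        if ra != rb {
            let (lo, hi) = if ra < rb { (ra, rb) } else { (rb, ra) };
            self.parent[hi] = lo;
        }
    }
}

/// Hypothetical stand-in for an edge between two operators in the subgraph.
struct Edge {
    source: usize,
    target: usize,
    /// Assumed flag: the target's input uses the pipeline pact.
    target_is_pipeline: bool,
}

/// Groups operators connected by fusible edges and keeps groups with at
/// least `min_len` members. `fusible[i]` is assumed to already encode the
/// per-operator constraints (!notify, identity summaries, has an operator,
/// not child 0).
fn detect_groups(fusible: &[bool], edges: &[Edge], min_len: usize) -> Vec<Vec<usize>> {
    let n = fusible.len();
    let mut uf = UnionFind::new(n);
    for e in edges {
        if fusible[e.source] && fusible[e.target] && e.target_is_pipeline {
            uf.union(e.source, e.target);
        }
    }
    // Bucket fusible operators by their root; members end up in index order.
    let mut by_root: Vec<Vec<usize>> = vec![Vec::new(); n];
    for op in 0..n {
        if fusible[op] {
            let root = uf.find(op);
            by_root[root].push(op);
        }
    }
    // Assumption: min_len == 0 disables fusion entirely.
    by_root
        .into_iter()
        .filter(|group| min_len > 0 && group.len() >= min_len)
        .collect()
}
```

With a diamond over operators 1 to 4 and a non-pipeline edge into operator 5, this yields the single group `[1, 2, 3, 4]`. Because union-find ignores edge direction and port counts, fan-in and fan-out need no special handling.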
Fusion mechanics
GroupScheduler replaces ChainScheduler:
Port maps (input_map, output_map) translate between the group's external ports and member ports.
A capability map (capability_map[member][output_port] -> Vec<group_output_index>), computed via a single reverse-topological reachability pass, routes internal capability changes to the correct group outputs.
Tombstoned members get forward_to = Some(representative) so pipeline channel activations are forwarded to the group representative during scheduling.
Correctness
Data flows through existing pipeline channels (unchanged by fusion).
Scheduling in topological order ensures producers run before consumers.
Consumeds/produceds are taken from the group's boundary members via the port maps.
Internal capabilities from all members are aggregated to group outputs via the capability map — valid because all summaries are identity, so timestamps are unchanged along any internal path.
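The topological ordering and the capability-map aggregation described here can be sketched as follows. This is a hedged illustration under simplifying assumptions, not the PR's implementation: `InternalEdge`, `successors`, `topological_order`, and the free function `capability_map` are hypothetical names, group members are numbered `0..n`, and every member is assumed to connect all of its inputs to all of its outputs with identity summaries (the fusibility condition stated above).

```rust
use std::collections::BTreeSet;

/// Hypothetical internal edge: output `src_port` of member `src` feeds member `dst`.
struct InternalEdge {
    src: usize,
    src_port: usize,
    dst: usize,
}

/// Adjacency list: for each member, the (output port, downstream member) pairs.
fn successors(n: usize, edges: &[InternalEdge]) -> Vec<Vec<(usize, usize)>> {
    let mut succ = vec![Vec::new(); n];
    for e in edges {
        succ[e.src].push((e.src_port, e.dst));
    }
    succ
}

/// Kahn's algorithm: returns members with producers before consumers,
/// or `None` if the internal graph contains a cycle.
fn topological_order(succ: &[Vec<(usize, usize)>]) -> Option<Vec<usize>> {
    let n = succ.len();
    let mut indegree = vec![0usize; n];
    for outs in succ {
        for &(_, dst) in outs {
            indegree[dst] += 1;
        }
    }
    let mut ready: Vec<usize> = (0..n).filter(|&m| indegree[m] == 0).collect();
    let mut order = Vec::with_capacity(n);
    while let Some(m) = ready.pop() {
        order.push(m);
        for &(_, dst) in &succ[m] {
            indegree[dst] -= 1;
            if indegree[dst] == 0 {
                ready.push(dst);
            }
        }
    }
    if order.len() == n { Some(order) } else { None }
}

/// For each (member, output port), the sorted group output indices reachable from it.
/// `ports[m]` is the number of output ports of member `m`; `output_map[g]` is the
/// (member, port) pair that backs group output `g`.
fn capability_map(
    ports: &[usize],
    edges: &[InternalEdge],
    output_map: &[(usize, usize)],
) -> Option<Vec<Vec<Vec<usize>>>> {
    let succ = successors(ports.len(), edges);
    let order = topological_order(&succ)?;
    let mut map: Vec<Vec<BTreeSet<usize>>> =
        ports.iter().map(|&p| vec![BTreeSet::new(); p]).collect();
    // Boundary ports reach their own group output directly.
    for (g, &(m, p)) in output_map.iter().enumerate() {
        map[m][p].insert(g);
    }
    // Single reverse-topological pass: consumers are finalized before producers.
    for &m in order.iter().rev() {
        for &(port, dst) in &succ[m] {
            // Identity summaries: anything reachable from any output of `dst`
            // is reachable from this port at the same timestamp.
            let downstream: Vec<usize> = map[dst].iter().flatten().copied().collect();
            map[m][port].extend(downstream);
        }
    }
    Some(
        map.into_iter()
            .map(|per_port| {
                per_port
                    .into_iter()
                    .map(|set| set.into_iter().collect::<Vec<usize>>())
                    .collect::<Vec<_>>()
            })
            .collect(),
    )
}
```

Each member and edge is visited once after the sort, which is what lets a single pass replace a per-node search. The sketch relies on identity summaries throughout: with a non-trivial summary, the set of reachable outputs alone would not say at which timestamp a capability surfaces.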
The scheduling loop uses a HashSet for dedup (replacing the previous counter) to support activation forwarding to lower-indexed representatives.
Performance
Benchmark: event_driven_diamond 10 500 record 1000 (10 dataflows, 500 diamonds with 4 operators each, 1000 rounds with data):
Frontier-only benchmark: event_driven_diamond 10 10000 norecord 1000:
Profiling shows the remaining build-time cost is memory allocation (~27%), with fuse_group itself at 3.3%.
Runtime is dominated by actual operator scheduling and data movement.
Validated against differential-dataflow's full test suite (54 tests, 0 failures).
In differential BFS, fusion merges groups like [Enter, Concatenate, Negate, AsCollection, Concatenate, ResultsIn]: operators that are individually cheap but collectively create scheduling overhead.