Introduce graph IR layer to eliminate tombstoned operators#764
Draft
antiguru wants to merge 9 commits intoTimelyDataflow:masterfrom
Draft
Introduce graph IR layer to eliminate tombstoned operators#764antiguru wants to merge 9 commits intoTimelyDataflow:masterfrom
antiguru wants to merge 9 commits intoTimelyDataflow:masterfrom
Conversation
During subgraph construction, detect maximal chains of pipeline-connected operators (single input/output, local, notify=false, no fan-in/fan-out) and replace each chain with a single ChainScheduler that schedules members sequentially. Intermediate pointstamps are hidden from the reachability tracker, reducing progress tracking overhead. Configurable via WorkerConfig::min_chain_length (default: 2, 0 disables). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Lift the notify=false restriction from chain fusion so operators like inspect, probe, and unary_notify can be fused into chains. Frontier changes are propagated to notify=true members via sparse notify_frontiers (only allocated for members that need notification), avoiding O(N²) overhead for chains where few members observe frontiers. Tombstoned children are now added to the reachability builder with (0, 0) inputs/outputs to preserve index positions. The event_driven example gains a rounds parameter for benchmarking. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Revert V2 notify=true fusion which had a correctness bug: frontier propagation within fused chains did not correctly track capabilities for notify=true operators, causing iterative computations (e.g., differential-dataflow's reduce) to fail to converge. Add identity summary check to exclude operators with non-trivial internal summaries (e.g., feedback operators with Product(0, 1)) from chains, even though they otherwise meet all fusion criteria. Validated against differential-dataflow's full test suite. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Report ALL chain members' initial +peers capabilities to the chain's SharedProgress, not just the last member's. Each member independently drops its capability during execution, so all N members' initial caps must be visible to the reachability tracker. Remove dead notify=true fusion scaffolding (notify_frontiers, frontier propagation in schedule()) since detect_chains excludes notify=true operators. See commit message of 214e57f for rationale. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Wire fuse_chain_length into install_options/from_matches so it can be set from the command line via --fuse-chain-length N. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The `local` field on operators indicates whether progress information is pre-circulated, not whether data flows through thread-local channels. All regular operators return `local = true` regardless of their pact, so the previous check didn't actually verify pipeline connectivity. Add `is_pipeline()` to `ParallelizationContract` (default: false, overridden to true for `Pipeline`), track it in `OperatorShape`, and expose it through `Operate::pipeline()`. Chain detection checks the target operator's `pipeline` in each link; the head's input pact is irrelevant since it receives data from outside the chain. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the linear-chain-only fusion (1-input/1-output operators) with general DAG subgraph fusion that handles arbitrary topologies: diamonds, fan-in (concat), fan-out (branch), and mixed patterns. Detection uses union-find over fusible edges instead of forward/reverse chain walking. Fusibility constraints are unchanged per-operator (\!notify, identity summaries, has operator) but the 1-input/1-output restriction is lifted. GroupScheduler replaces ChainScheduler, using topological sort for member execution order and port maps for the group's external interface. Activation forwarding (forward_to on tombstoned members) ensures pipeline channel activations reach the group representative even when data arrives at a non-representative member through feedback edges. The reachability computation uses a single reverse-topological pass over the group's internal DAG, replacing the previous per-node BFS that was O(n^2) in group size. Adds event_driven_diamond benchmark and DAG fusion tests (diamond, multi-input merge, branch, repeated diamonds, Collatz mutual recursion). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add chapter 5.4 explaining how operator fusion works: fusibility constraints, group detection, scheduling, capability mapping, correctness argument for progress tracking, and configuration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract fusion logic from SubgraphBuilder::build() into a new GraphIR intermediate representation in graph_ir.rs. The IR uses a BTreeMap for nodes, allowing fused-away operators to be removed entirely rather than tombstoned. Lowering produces a dense PerOperatorState array with fresh sequential indices and an old-to-new mapping for activation forwarding. This removes the forward_to field from PerOperatorState, the HashSet-based dedup in the scheduling loop (replaced by simple monotonic-index dedup), and all tombstone special-casing in incomplete tracking. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
d5bf6c7 to
10e6a97
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Introduces a
GraphIRintermediate representation between graph construction and the final operator array, eliminating tombstoned operators entirely.progress/graph_ir.rswithGraphIR,NodeIR, fusion pass, andlower()to produce dense indicesHashSet, noforward_to)PerOperatorStateor progress tracking at allSubgraphgainsactivation_index_mapto translate old operator indices to new dense indices during loweringAlternative to the graph pass API approach in #TODO.
🤖 Generated with Claude Code