feat(status): add StatusObserver [ENG-288] #92
Conversation
Pull request overview
This PR extends the pipeline observability protocol to pass a pipeline_path into node lifecycle hooks and introduces new observability components for execution status tracking and observer multiplexing. It fits into the pipeline/orchestration layer by enabling per-node, append-only status event tables (RUNNING/SUCCESS/FAILED) and allowing multiple observers (e.g., logging + status) to run together.
Changes:
- Extend `ExecutionObserverProtocol.on_node_start`/`on_node_end` to accept `pipeline_path` (backward-compatible default).
- Add `StatusObserver` to persist per-packet status transitions into per-node, pipeline-path-mirrored tables.
- Add `CompositeObserver` to fan out hooks to multiple observers and delegate `create_packet_logger` to the first non-no-op observer; export both via `orcapod.pipeline`.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `src/orcapod/protocols/observability_protocols.py` | Adds `pipeline_path` to node lifecycle hook signatures. |
| `src/orcapod/core/nodes/function_node.py` | Passes `pipeline_path` into observer node start/end hooks (sync + async). |
| `src/orcapod/pipeline/observer.py` | Updates no-op observer hook signatures to accept `pipeline_path`. |
| `src/orcapod/pipeline/logging_observer.py` | Updates hook signatures/wrappers to accept and forward `pipeline_path`. |
| `src/orcapod/pipeline/status_observer.py` | New observer writing RUNNING/SUCCESS/FAILED status events to DB. |
| `src/orcapod/pipeline/composite_observer.py` | New observer multiplexer delegating hooks to multiple observers. |
| `src/orcapod/pipeline/__init__.py` | Exports `StatusObserver` and `CompositeObserver`. |
| `tests/test_pipeline/test_node_protocols.py` | Updates mock observers to accept new hook kwargs. |
| `tests/test_pipeline/test_orchestrator.py` | Updates mock observers to accept new hook kwargs. |
| `tests/test_pipeline/test_sync_orchestrator.py` | Updates mock observers to accept new hook kwargs. |
| `tests/test_pipeline/test_status_observer_integration.py` | New integration tests covering status writing and retrieval behavior. |
| `tests/test_pipeline/test_composite_observer.py` | New tests covering observer multiplexing and logger delegation. |
        self._current_run_id = run_id

    def on_run_end(self, run_id: str) -> None:
        pass
StatusObserver keeps per-node state in _node_context, but on_run_end() is a no-op. If a run exits early (task cancellation, unexpected exception before on_node_end, etc.), stale entries can persist into subsequent runs. Consider clearing _node_context (and optionally resetting _current_run_id) in on_run_start()/on_run_end() to avoid cross-run leakage.
Suggested change:

    -        self._current_run_id = run_id
    -    def on_run_end(self, run_id: str) -> None:
    -        pass
    +        # Initialize per-run state for a fresh execution.
    +        self._current_run_id = run_id
    +        self._node_context.clear()
    +    def on_run_end(self, run_id: str) -> None:
    +        # Ensure no per-node context leaks across runs, even if some
    +        # nodes did not receive an on_node_end callback due to early exit.
    +        self._node_context.clear()
    +        self._current_run_id = ""
Fixed — added self._node_context.clear() in both on_run_start and on_run_end. While the values are deterministic (so stale entries would be overwritten with identical values on the next run), clearing at run boundaries makes the intent clearer and removes any theoretical edge case around pipeline reconfiguration between runs.
        # Dynamic tag columns — each tag key becomes its own column (unprefixed)
        tag_data = dict(tag)
        for key, value in tag_data.items():
            columns[key] = pa.array([str(value)], type=pa.large_utf8())
Dynamic tag columns are written directly into the same columns dict as the fixed _status_* columns. If a tag key collides with a reserved status column name (e.g. _status_state) it will silently overwrite the system column and corrupt the row. Please guard against collisions (reject/rename/prefix conflicting tag keys) before inserting them into columns.
Suggested change:

    -    # Dynamic tag columns — each tag key becomes its own column (unprefixed)
    -    tag_data = dict(tag)
    -    for key, value in tag_data.items():
    -        columns[key] = pa.array([str(value)], type=pa.large_utf8())
    +    # Dynamic tag columns — each tag key becomes its own column.
    +    # Guard against collisions with reserved _status_* columns by
    +    # renaming conflicting tag keys with a "tag_" prefix (and numeric suffix if needed).
    +    tag_data = dict(tag)
    +    reserved_keys = set(columns.keys())
    +    for key, value in tag_data.items():
    +        # Normalize key to string in case non-string keys are used.
    +        original_key_str = str(key)
    +        col_name = original_key_str
    +        if col_name in reserved_keys:
    +            logger.warning(
    +                "StatusObserver: tag key '%s' conflicts with reserved status column; "
    +                "renaming to avoid collision",
    +                original_key_str,
    +            )
    +            col_name = f"tag_{original_key_str}"
    +            suffix = 1
    +            # Ensure we don't collide with any existing or previously-added tag columns.
    +            while col_name in reserved_keys:
    +                col_name = f"tag_{original_key_str}_{suffix}"
    +                suffix += 1
    +        columns[col_name] = pa.array([str(value)], type=pa.large_utf8())
    +        reserved_keys.add(col_name)
Declining this one. The _status_ prefix convention exists specifically to prevent collisions with user-defined tag column names — tag keys come from Arrow table column names which are user-defined data fields (e.g. idx, subject_id). A user would never naturally name a tag column _status_state. This is the same pattern used by LoggingObserver with its _log_ prefix, which has no collision guard either. Adding the suggested rename logic introduces complexity for a scenario that can't happen in practice.
    # Inspect run status
    status = obs.get_status()  # pyarrow.Table
    status.to_pandas()  # pandas DataFrame
The module-level example suggests obs.get_status() will return the status table, but _write_event() writes to the pipeline-path-mirrored location when pipeline_path is provided (the normal case for compiled pipelines). As a result, get_status() with no pipeline_path will typically return None/empty. Please update the example to show get_status(pipeline_path=node.pipeline_path) (or clarify what the default status_path is used for).
Suggested change:

    -# Inspect run status
    -status = obs.get_status()  # pyarrow.Table
    -status.to_pandas()  # pandas DataFrame
    +# Inspect run status for a specific node
    +# e.g., node = pipeline.graph["my_node_label"]
    +status = obs.get_status(pipeline_path=node.pipeline_path)  # pyarrow.Table
    +status.to_pandas()  # pandas DataFrame
Fixed — updated the module docstring example to show get_status(pipeline_path=node.pipeline_path) since that's the normal usage pattern. The no-argument get_status() reads from the default fallback path which is only used when no pipeline_path is available.
Good addition overall — StatusObserver and CompositeObserver fill a real gap, the backward-compatible protocol extension is cleanly done, and the test coverage is thorough (11 + 4 integration tests). Three interconnected architectural issues need addressing before merge, all tracing back to the same root: the node protocol surface is incomplete, causing implementation details to leak into call sites and concrete classes. The remaining comments are lower-severity and mostly mechanical to fix.
        obs = observer if observer is not None else NoOpObserver()

        obs.on_node_start(node_label, node_hash)
        pp = self.pipeline_path if self._pipeline_database is not None else ()
[HIGH] pipeline_path belongs on FunctionNodeProtocol — this guard is a symptom of a missing protocol contract
This line reaches past pipeline_path's own abstraction boundary to inspect _pipeline_database directly — the same null-check already encoded inside the property (which raises RuntimeError when the database is absent). The no database → empty tuple semantic now lives in two places instead of one, and a private attribute is being inspected to avoid a RuntimeError that the property itself should never raise in the first place.
The root fix is at the protocol level. FunctionNodeProtocol currently declares only execute, async_execute, and node_type. pipeline_path should be added as a first-class member — it is part of the node's identity and is needed by all observer and storage infrastructure. Once it is on the protocol:
- `FunctionNode.pipeline_path` returns `()` when no database is attached instead of raising — the property owns the fallback, not the call site.
- This guard (and the identical ones in `async_execute` and `_process_packet_internal_async`) simplifies to `pp = self.pipeline_path`.
- Any external code working through `FunctionNodeProtocol` gets a guaranteed `tuple[str, ...]` without needing to know about `_pipeline_database`.
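A minimal sketch of the proposed fix, using simplified stand-ins for `FunctionNodeProtocol` and `FunctionNode` (the real classes carry many more members; `"my_pipeline"` is an assumed path segment):

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class FunctionNodeProtocol(Protocol):
    """Simplified protocol surface: pipeline_path as a first-class member."""

    @property
    def pipeline_path(self) -> tuple[str, ...]: ...


class FunctionNode:
    def __init__(self, label: str, pipeline_database=None):
        self._label = label
        self._pipeline_database = pipeline_database

    @property
    def pipeline_path(self) -> tuple[str, ...]:
        # The property owns the fallback: empty tuple when no database is
        # attached, instead of raising RuntimeError at the call site.
        if self._pipeline_database is None:
            return ()
        return ("my_pipeline", self._label)


node = FunctionNode("step_a")  # no database attached
pp = node.pipeline_path        # guard-free call site: always a tuple
```

With this shape, the `pp = self.pipeline_path if self._pipeline_database is not None else ()` guards collapse to a plain attribute access.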
Fixed — FunctionNode.pipeline_path now returns () instead of raising RuntimeError when no database is attached. Added pipeline_path as a first-class property on FunctionNodeProtocol. The three pp = self.pipeline_path if self._pipeline_database is not None else () guards in execute(), async_execute(), and _async_execute_one_packet() are simplified to pp = self.pipeline_path.
        }

        # Dynamic tag columns — each tag key becomes its own column (unprefixed)
        tag_data = dict(tag)
[HIGH] Tag schema should come from node.output_schema(), not be inferred at runtime from dict(tag)
StreamProtocol already declares output_schema() -> tuple[Schema, Schema] returning (tag_schema, packet_schema), and nodes implement StreamProtocol — so the tag schema is statically knowable before any packet arrives. Inferring it here by iterating dict(tag) on every write is fragile (nullable fields or absent keys on early packets can produce an inconsistent schema) and wasteful (schema is reconstructed for every event).
output_schema() should be surfaced on FunctionNodeProtocol and the tag schema passed to the observer at on_node_start time. StatusObserver can then pre-build the full Arrow schema — fixed _status_* columns plus the statically-known tag columns — once in on_node_start, rather than re-inferring it on every _write_event call. This also makes the tag-column collision guard (see separate comment below) straightforward to enforce at setup time rather than per-packet.
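The proposed flow could look like the following sketch: tag keys are declared once at node start, so the event schema is fixed up-front rather than re-inferred per packet. Names here (`on_node_start`, `_node_context`, `_write_event`) mirror the PR but the class is a simplified stand-in using plain dicts instead of Arrow tables:

```python
class SchemaAwareStatusObserver:
    def __init__(self):
        # node_label -> statically-known tag column names
        self._node_context: dict[str, tuple[str, ...]] = {}
        self.events: list[dict] = []

    def on_node_start(self, node_label: str, tag_keys: tuple[str, ...]) -> None:
        # The schema is knowable here, before any packet arrives.
        self._node_context[node_label] = tag_keys

    def _write_event(self, node_label: str, tag: dict, state: str) -> None:
        tag_keys = self._node_context[node_label]
        row = {"_status_state": state}
        for key in tag_keys:
            # Look up by the pre-declared key; a missing value becomes None
            # rather than silently changing the schema between packets.
            row[key] = tag.get(key)
        self.events.append(row)


obs = SchemaAwareStatusObserver()
obs.on_node_start("step_a", tag_keys=("idx", "subject_id"))
obs._write_event("step_a", {"idx": 0}, state="RUNNING")
```

The same `tag_keys` tuple is also the natural place to validate reserved-column collisions once, at setup time.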
Fixed — on_node_start now accepts tag_keys: tuple[str, ...], passed from FunctionNode.execute() via input_stream.keys()[0]. StatusObserver stores tag_keys in _node_context and uses them in _write_event to look up tag values via tag.get(key) instead of dict(tag). The schema is now statically known at node start time rather than inferred per-packet.
        output_packet: Any,
        cached: bool,
    ) -> None:
        self._write_event(node_label, tag, state="SUCCESS")
[MEDIUM] cached: bool is received but silently discarded — this loses a meaningful observability signal
The protocol passes cached to on_packet_end precisely so observers can distinguish cache hits from fresh execution. Dropping it and unconditionally emitting SUCCESS means users cannot tell from the status table which results were computed vs retrieved — defeating a key purpose of a status observer.
Two clean options:
- Add a `CACHED` terminal state — three-way terminal: `SUCCESS | FAILED | CACHED`.
- Add a `_status_cached` boolean column and keep `SUCCESS` as the state.
Either way, the schema docstring (currently enumerating only RUNNING, SUCCESS, FAILED) needs updating to match.
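Option 1 can be sketched as follows; the class and event list are illustrative stand-ins for the real observer and its Arrow-backed table:

```python
class CachedAwareStatusObserver:
    def __init__(self):
        self.events: list[str] = []

    def on_packet_end(self, node_label: str, tag, output_packet, cached: bool) -> None:
        # Cache hits get their own terminal state instead of a blanket SUCCESS,
        # so the status table distinguishes computed vs retrieved results.
        self.events.append("CACHED" if cached else "SUCCESS")

    def on_packet_crash(self, node_label: str, tag, error) -> None:
        self.events.append("FAILED")


obs = CachedAwareStatusObserver()
obs.on_packet_end("step_a", tag={}, output_packet=None, cached=False)
obs.on_packet_end("step_a", tag={}, output_packet=None, cached=True)
```

Option 2 would instead keep `state="SUCCESS"` and carry `cached` as a separate boolean column, which preserves the three-state enum for downstream consumers.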
Fixed — added CACHED as a terminal state. on_packet_end now writes CACHED when cached=True and SUCCESS when cached=False. The module docstring is updated to list all four states: RUNNING, SUCCESS, FAILED, CACHED.
    ) -> None:
        self._node_context.pop(node_label, None)

    def on_packet_start(
[MEDIUM] Use TagProtocol/PacketProtocol instead of Any throughout
ExecutionObserverProtocol already correctly types all packet hooks with TagProtocol and PacketProtocol. Using Any here (and in on_packet_end, on_packet_crash, create_packet_logger) makes StatusObserver invisible to static type checkers verifying protocol compliance, and silently weakens the type safety the protocol establishes. This is a pre-existing pattern in LoggingObserver, but new code should correct it rather than perpetuate it.
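A minimal stand-in shows the intent; the real `TagProtocol`/`PacketProtocol` live in orcapod's protocol modules and are richer than this sketch:

```python
from typing import Any, Iterator, Protocol


class TagProtocol(Protocol):
    """Illustrative stand-in: mapping-like access to tag fields."""

    def get(self, key: str, default: Any = None) -> Any: ...
    def __iter__(self) -> Iterator[str]: ...


class TypedStatusObserver:
    def on_packet_start(self, node_label: str, tag: TagProtocol) -> None:
        # With a concrete protocol type, mypy/pyright can verify compliance;
        # `Any` would silently accept any mismatched implementation here.
        self.last_keys = tuple(tag)


obs = TypedStatusObserver()
obs.on_packet_start("step_a", {"idx": 0})  # a plain dict satisfies TagProtocol
```

The runtime behavior is unchanged; the payoff is entirely in static checking of protocol compliance.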
Fixed — StatusObserver and _ContextualizedStatusObserver now use TagProtocol and PacketProtocol throughout. Also updated LoggingObserver and _ContextualizedLoggingObserver to match, so both observers are consistent.
        obs.on_node_end(node_label, node_hash, pipeline_path=pipeline_path)

    def on_packet_start(
        self, node_label: str, tag: Any, packet: Any
[MEDIUM] Use TagProtocol/PacketProtocol instead of Any throughout
Same issue as in StatusObserver: ExecutionObserverProtocol types all packet hooks with TagProtocol and PacketProtocol, but CompositeObserver uses Any across on_packet_start, on_packet_end, on_packet_crash, and create_packet_logger. The protocol already defines the correct surface — the concrete implementation should match it.
Fixed — CompositeObserver now uses TagProtocol and PacketProtocol on all packet hooks and create_packet_logger.
    ) -> None:
        self._parent = parent
        self._node_hash = node_hash
        self._node_label = node_label
[MEDIUM] self._node_hash and self._node_label are stored but never used — dead code
Both fields are set in __init__ but no method in _ContextualizedStatusObserver ever references them — all hooks delegate to the parent using the call-time parameters. Either remove the stored fields, or implement the original intent (e.g. stamp hooks with the stored context rather than relying entirely on call-time args).
Fixed — removed self._node_hash and self._node_label from _ContextualizedStatusObserver.__init__. These were never referenced since all hooks delegate to the parent using call-time args. Note: _ContextualizedLoggingObserver still stores these fields because it uses them in create_packet_logger to stamp log rows — that difference is intentional.
    ) -> None:
        self._node_context[node_label] = (node_hash, pipeline_path)

    def on_node_end(
[MEDIUM] Async node failure leaves a stale _node_context entry
In the async execution path on_node_end is called inside try: but not finally:. If an unhandled exception escapes before reaching on_node_end, _node_context[node_label] is never popped and persists for the lifetime of the observer. Since _write_event reads _node_context for every packet event, a subsequent run on the same observer instance reusing the same node_label will silently inherit the stale entry until on_node_start overwrites it.
Consider clearing stale entries defensively in on_run_end, or coordinating with the async path to ensure on_node_end is called in a finally: block.
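The `finally:` option can be sketched like this; `ContextObserver` and `execute_node` are simplified stand-ins for the observer and the async execution path:

```python
class ContextObserver:
    def __init__(self):
        self._node_context: dict[str, tuple] = {}

    def on_node_start(self, node_label: str, node_hash: str) -> None:
        self._node_context[node_label] = (node_hash, ())

    def on_node_end(self, node_label: str, node_hash: str) -> None:
        self._node_context.pop(node_label, None)


def execute_node(obs: ContextObserver, node_label: str, work) -> None:
    obs.on_node_start(node_label, node_hash="abc123")
    try:
        work()
    finally:
        # Runs on both success and crash, so no stale entry outlives the node.
        obs.on_node_end(node_label, node_hash="abc123")


def crashing_work():
    raise RuntimeError("boom")


obs = ContextObserver()
try:
    execute_node(obs, "step_a", crashing_work)
except RuntimeError:
    pass
```

Clearing `_node_context` at run boundaries covers the same leak defensively, but the `finally:` form also keeps the context correct within a single run.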
Already addressed — per the earlier Copilot comment, we added self._node_context.clear() in both on_run_start and on_run_end. The orchestrator calls on_run_end in a finally: block (see sync_orchestrator.py:114 and async_orchestrator.py:207), so even if on_node_end is skipped due to an async crash, on_run_end still fires and cleans up all stale entries.
        }

        # Dynamic tag columns — each tag key becomes its own column (unprefixed)
        tag_data = dict(tag)
[LOW] Tag column names can silently shadow _status_* system columns
Tag keys are inserted into the Arrow table without checking for collisions with the fixed system columns. A tag field named _status_state (or any _status_ prefix) would overwrite a system column without warning. A one-line guard (e.g. raise or skip if key.startswith('_status_')) closes this — and is straightforward to enforce at on_node_start time once the tag schema is validated up-front rather than inferred per-packet.
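The one-line guard suggested here could look like the following sketch, run once at node start when the tag schema is known (the function name is illustrative, not orcapod's):

```python
def validate_tag_keys(tag_keys: tuple[str, ...]) -> None:
    # Reject tag columns that would shadow the fixed _status_* system columns.
    for key in tag_keys:
        if key.startswith("_status_"):
            raise ValueError(f"tag key {key!r} would shadow a reserved status column")


validate_tag_keys(("idx", "subject_id"))  # ordinary user data fields pass

try:
    validate_tag_keys(("_status_state",))
    shadowed = False
except ValueError:
    shadowed = True
```

Enforced at setup time, the guard costs nothing per packet and fails loudly instead of silently corrupting rows.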
Declining — same rationale as the earlier collision comment. The _status_ prefix convention exists specifically to prevent collisions with user-defined tag column names. Tag keys are user data fields (e.g. idx, subject_id) that would never naturally start with _status_. This matches the _log_ prefix pattern in LoggingObserver, which has no collision guard either.
        row = pa.table(columns)
        try:
            self._db.add_record(status_path, status_id, row, flush=True)
        except Exception:
[LOW / future issue] Please file a follow-up issue for configurable flush behaviour
flush=True on every add_record is correct for safety but will become a bottleneck at scale. No need to address this in the current PR — please open a follow-up issue to make flush behaviour configurable on StatusObserver.__init__ (e.g. flush_on_write: bool = True).
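The follow-up could be sketched like this; `FakeDB` stands in for the real pipeline database API and the parameter name `flush_on_write` is the one proposed above:

```python
class FakeDB:
    def __init__(self):
        self.calls: list[bool] = []

    def add_record(self, path, record_id, row, flush: bool) -> None:
        # Record only the flush flag; the real DB persists the row.
        self.calls.append(flush)


class ConfigurableStatusObserver:
    def __init__(self, db, flush_on_write: bool = True):
        self._db = db
        self._flush_on_write = flush_on_write

    def _write_event(self, status_path, status_id, row) -> None:
        # High-throughput runs can pass flush_on_write=False and flush at
        # run boundaries instead of once per event.
        self._db.add_record(status_path, status_id, row, flush=self._flush_on_write)


db = FakeDB()
obs = ConfigurableStatusObserver(db, flush_on_write=False)
obs._write_event(("pipeline", "status", "node"), "evt-1", row={})
```

Defaulting to `True` keeps today's safety behavior for existing callers.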
Agreed — flush=True on every write is correct for safety but will need to be configurable at scale. Will note this as a follow-up issue for a flush_on_write: bool = True parameter on StatusObserver.__init__.
…protocol, tag_keys to on_node_start, proper typing
…, include system tags
        obs = observer if observer is not None else NoOpObserver()

        obs.on_node_start(node_label, node_hash)
        pp = self.pipeline_path if self._pipeline_database is not None else ()
pipeline path is guaranteed to return something -- you shouldn't have to inspect the content of the pipeline_database to decide whether to pass the value into the observer.
        obs = observer if observer is not None else NoOpObserver()

        pp = self.pipeline_path if self._pipeline_database is not None else ()
same as earlier -- don't perform check on pipeline path -- it's guaranteed to return some defining path
    @@ -1,15 +1,19 @@
    from .async_orchestrator import AsyncPipelineOrchestrator
    from .composite_observer import CompositeObserver
We are starting to accumulate multiple types of observers. Let's add an issue to aggregate them into their own subpackage in the future.
Added functionality

Each function node gets its own status table at `pipeline_name/status/function_node_name/`, and each row is an append-only event with `_status_state` being RUNNING, SUCCESS, or FAILED. So for a packet that succeeds, you'd see two rows: one RUNNING when it starts, one SUCCESS when it finishes. For a failed packet: RUNNING then FAILED with an error summary. If something crashes, packets will have status RUNNING but no subsequent SUCCESS/FAILED. The tag columns (like `idx`) are included so you can tell which packet each event belongs to.

Changes summary
Step 0 — Protocol extension (minor, backward-compatible via default arg):
Step 1 — StatusObserver (new file):
Step 2 — CompositeObserver (new file):
Step 3 — Exports:
Step 4 — Tests (new files):
fail_fast, multi-node, schema, run_id
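The append-only lifecycle described under "Added functionality" can be sketched with plain rows (column names mirror the PR; the real table is a pyarrow Table):

```python
# One succeeding packet (idx=0) and one that crashed mid-run (idx=1):
events = [
    {"idx": 0, "_status_state": "RUNNING"},
    {"idx": 0, "_status_state": "SUCCESS"},
    {"idx": 1, "_status_state": "RUNNING"},  # no terminal row: crashed
]

# Derive each packet's current state: the last event per tag wins.
latest: dict[int, str] = {}
for row in events:
    latest[row["idx"]] = row["_status_state"]
```

A packet still RUNNING with no subsequent SUCCESS/FAILED row is the crash signature the description calls out.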