
feat(status): add StatusObserver [ENG-288] #92

Merged
eywalker merged 4 commits into nauticalab:dev from brian-arnold:status-observer on Mar 21, 2026

Conversation

Collaborator

@brian-arnold brian-arnold commented Mar 21, 2026

Added functionality

Each function node gets its own status table at pipeline_name/status/function_node_name/, and each row is an append-only event whose _status_state is RUNNING, SUCCESS, or FAILED. For a packet that succeeds you'd see two rows: one RUNNING when it starts and one SUCCESS when it finishes. For a failed packet: RUNNING, then FAILED with an error summary. If something crashes, packets are left with a RUNNING event and no subsequent SUCCESS/FAILED row. The tag columns (like idx) are included so you can tell which packet each event belongs to.
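The event lifecycle described above can be sketched with a minimal in-memory model. All names here (record_packet, the outcome strings) are illustrative, not the actual StatusObserver API:

```python
# Minimal in-memory sketch of the append-only status log: each packet gets a
# RUNNING row first, then a terminal SUCCESS/FAILED row; a crash leaves only
# the RUNNING row behind.

def record_packet(events, idx, outcome):
    """Append status rows for one packet identified by its 'idx' tag."""
    events.append({"idx": idx, "_status_state": "RUNNING"})
    if outcome == "success":
        events.append({"idx": idx, "_status_state": "SUCCESS"})
    elif outcome == "failure":
        events.append({"idx": idx, "_status_state": "FAILED",
                       "_status_error": "error summary"})
    # outcome == "crash": no terminal row is ever written

events = []
record_packet(events, idx=0, outcome="success")
record_packet(events, idx=1, outcome="failure")
record_packet(events, idx=2, outcome="crash")

# Packets stuck in RUNNING with no terminal row indicate a crash.
terminal = {e["idx"] for e in events if e["_status_state"] in ("SUCCESS", "FAILED")}
crashed = {e["idx"] for e in events if e["_status_state"] == "RUNNING"} - terminal
```

This is also how a reader of the real table would detect crashed packets: query for RUNNING events with no matching terminal event.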

Changes summary

Step 0 — Protocol extension (minor, backward-compatible via default arg):

  • protocols/observability_protocols.py — added pipeline_path: tuple[str, ...] = () to on_node_start/on_node_end
  • core/nodes/function_node.py — passes pipeline_path=pp in sync and async execute paths
  • pipeline/logging_observer.py, pipeline/observer.py — updated signatures to accept new param
  • tests/test_pipeline/test_node_protocols.py, test_orchestrator.py, test_sync_orchestrator.py — updated mock observers with **kwargs

Step 1 — StatusObserver (new file):

  • pipeline/status_observer.py — writes RUNNING/SUCCESS/FAILED events from observer hooks, per-node Delta Lake tables at mirrored paths, append-only

Step 2 — CompositeObserver (new file):

  • pipeline/composite_observer.py — multiplexes hooks to N child observers, delegates create_packet_logger to the first non-no-op child

Step 3 — Exports:

  • pipeline/__init__.py — exports StatusObserver and CompositeObserver

Step 4 — Tests (new files):

  • test_status_observer_integration.py — 11 tests covering success/failure/mixed status, path mirroring, tag columns, async,
    fail_fast, multi-node, schema, run_id
  • test_composite_observer.py — 4 tests covering combined logging+status, logger delegation, contextualization, failure tracking
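The CompositeObserver fan-out described in Step 2 boils down to forwarding every hook call to each child observer in order. A minimal sketch (class names here are illustrative, not the real implementation):

```python
# Illustrative fan-out pattern: every hook call is forwarded to each child
# observer in registration order.

class RecordingObserver:
    def __init__(self):
        self.calls = []

    def on_node_start(self, node_label):
        self.calls.append(("on_node_start", node_label))

class FanOutObserver:
    def __init__(self, children):
        self.children = list(children)

    def on_node_start(self, node_label):
        # Forward the hook to every child; each child sees the same arguments.
        for child in self.children:
            child.on_node_start(node_label)

a, b = RecordingObserver(), RecordingObserver()
FanOutObserver([a, b]).on_node_start("my_node")
```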

@codecov

codecov bot commented Mar 21, 2026

Codecov Report

❌ Patch coverage is 87.41722% with 19 lines in your changes missing coverage. Please review.

Files with missing lines                      Patch %    Missing
src/orcapod/pipeline/status_observer.py       81.70%     15 ⚠️
src/orcapod/pipeline/logging_observer.py      77.77%      2 ⚠️
src/orcapod/pipeline/composite_observer.py    97.36%      1 ⚠️
src/orcapod/protocols/node_protocols.py       66.66%      1 ⚠️


Contributor

Copilot AI left a comment


Pull request overview

This PR extends the pipeline observability protocol to pass a pipeline_path into node lifecycle hooks and introduces new observability components for execution status tracking and observer multiplexing. It fits into the pipeline/orchestration layer by enabling per-node, append-only status event tables (RUNNING/SUCCESS/FAILED) and allowing multiple observers (e.g., logging + status) to run together.

Changes:

  • Extend ExecutionObserverProtocol.on_node_start/on_node_end to accept pipeline_path (backward-compatible default).
  • Add StatusObserver to persist per-packet status transitions into per-node, pipeline-path-mirrored tables.
  • Add CompositeObserver to fan out hooks to multiple observers and delegate create_packet_logger to the first non-no-op observer; export both via orcapod.pipeline.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.

File — Description
src/orcapod/protocols/observability_protocols.py — Adds pipeline_path to node lifecycle hook signatures.
src/orcapod/core/nodes/function_node.py — Passes pipeline_path into observer node start/end hooks (sync + async).
src/orcapod/pipeline/observer.py — Updates no-op observer hook signatures to accept pipeline_path.
src/orcapod/pipeline/logging_observer.py — Updates hook signatures/wrappers to accept and forward pipeline_path.
src/orcapod/pipeline/status_observer.py — New observer writing RUNNING/SUCCESS/FAILED status events to DB.
src/orcapod/pipeline/composite_observer.py — New observer multiplexer delegating hooks to multiple observers.
src/orcapod/pipeline/__init__.py — Exports StatusObserver and CompositeObserver.
tests/test_pipeline/test_node_protocols.py — Updates mock observers to accept new hook kwargs.
tests/test_pipeline/test_orchestrator.py — Updates mock observers to accept new hook kwargs.
tests/test_pipeline/test_sync_orchestrator.py — Updates mock observers to accept new hook kwargs.
tests/test_pipeline/test_status_observer_integration.py — New integration tests covering status writing and retrieval behavior.
tests/test_pipeline/test_composite_observer.py — New tests covering observer multiplexing and logger delegation.


Comment on lines +117 to +120
        self._current_run_id = run_id

    def on_run_end(self, run_id: str) -> None:
        pass

Copilot AI Mar 21, 2026


StatusObserver keeps per-node state in _node_context, but on_run_end() is a no-op. If a run exits early (task cancellation, unexpected exception before on_node_end, etc.), stale entries can persist into subsequent runs. Consider clearing _node_context (and optionally resetting _current_run_id) in on_run_start()/on_run_end() to avoid cross-run leakage.

Suggested change
-        self._current_run_id = run_id
-
-    def on_run_end(self, run_id: str) -> None:
-        pass
+        # Initialize per-run state for a fresh execution.
+        self._current_run_id = run_id
+        self._node_context.clear()
+
+    def on_run_end(self, run_id: str) -> None:
+        # Ensure no per-node context leaks across runs, even if some
+        # nodes did not receive an on_node_end callback due to early exit.
+        self._node_context.clear()
+        self._current_run_id = ""

Collaborator Author


Fixed — added self._node_context.clear() in both on_run_start and on_run_end. While the values are deterministic (so stale entries would be overwritten with identical values on the next run), clearing at run boundaries makes the intent clearer and removes any theoretical edge case around pipeline reconfiguration between runs.

Comment on lines +232 to +235
        # Dynamic tag columns — each tag key becomes its own column (unprefixed)
        tag_data = dict(tag)
        for key, value in tag_data.items():
            columns[key] = pa.array([str(value)], type=pa.large_utf8())

Copilot AI Mar 21, 2026


Dynamic tag columns are written directly into the same columns dict as the fixed _status_* columns. If a tag key collides with a reserved status column name (e.g. _status_state) it will silently overwrite the system column and corrupt the row. Please guard against collisions (reject/rename/prefix conflicting tag keys) before inserting them into columns.

Suggested change
-        # Dynamic tag columns — each tag key becomes its own column (unprefixed)
-        tag_data = dict(tag)
-        for key, value in tag_data.items():
-            columns[key] = pa.array([str(value)], type=pa.large_utf8())
+        # Dynamic tag columns — each tag key becomes its own column.
+        # Guard against collisions with reserved _status_* columns by
+        # renaming conflicting tag keys with a "tag_" prefix (and numeric suffix if needed).
+        tag_data = dict(tag)
+        reserved_keys = set(columns.keys())
+        for key, value in tag_data.items():
+            # Normalize key to string in case non-string keys are used.
+            original_key_str = str(key)
+            col_name = original_key_str
+            if col_name in reserved_keys:
+                logger.warning(
+                    "StatusObserver: tag key '%s' conflicts with reserved status column; "
+                    "renaming to avoid collision",
+                    original_key_str,
+                )
+                col_name = f"tag_{original_key_str}"
+                suffix = 1
+                # Ensure we don't collide with any existing or previously-added tag columns.
+                while col_name in reserved_keys:
+                    col_name = f"tag_{original_key_str}_{suffix}"
+                    suffix += 1
+            columns[col_name] = pa.array([str(value)], type=pa.large_utf8())
+            reserved_keys.add(col_name)

Collaborator Author


Declining this one. The _status_ prefix convention exists specifically to prevent collisions with user-defined tag column names — tag keys come from Arrow table column names which are user-defined data fields (e.g. idx, subject_id). A user would never naturally name a tag column _status_state. This is the same pattern used by LoggingObserver with its _log_ prefix, which has no collision guard either. Adding the suggested rename logic introduces complexity for a scenario that can't happen in practice.

Comment on lines +16 to +18
# Inspect run status
status = obs.get_status() # pyarrow.Table
status.to_pandas() # pandas DataFrame

Copilot AI Mar 21, 2026


The module-level example suggests obs.get_status() will return the status table, but _write_event() writes to the pipeline-path-mirrored location when pipeline_path is provided (the normal case for compiled pipelines). As a result, get_status() with no pipeline_path will typically return None/empty. Please update the example to show get_status(pipeline_path=node.pipeline_path) (or clarify what the default status_path is used for).

Suggested change
-# Inspect run status
-status = obs.get_status()  # pyarrow.Table
-status.to_pandas()  # pandas DataFrame
+# Inspect run status for a specific node
+# e.g., node = pipeline.graph["my_node_label"]
+status = obs.get_status(pipeline_path=node.pipeline_path)  # pyarrow.Table
+status.to_pandas()  # pandas DataFrame

Collaborator Author


Fixed — updated the module docstring example to show get_status(pipeline_path=node.pipeline_path) since that's the normal usage pattern. The no-argument get_status() reads from the default fallback path which is only used when no pipeline_path is available.

@brian-arnold brian-arnold changed the title from feat(status): add StatusObserver to feat(status): add StatusObserver [ENG-288] on Mar 21, 2026

@agent-kurouto agent-kurouto bot left a comment


Good addition overall — StatusObserver and CompositeObserver fill a real gap, the backward-compatible protocol extension is cleanly done, and the test coverage is thorough (11 + 4 integration tests). Three interconnected architectural issues need addressing before merge, all tracing back to the same root: the node protocol surface is incomplete, causing implementation details to leak into call sites and concrete classes. The remaining comments are lower-severity and mostly mechanical to fix.

        obs = observer if observer is not None else NoOpObserver()

        obs.on_node_start(node_label, node_hash)
        pp = self.pipeline_path if self._pipeline_database is not None else ()

[HIGH] pipeline_path belongs on FunctionNodeProtocol — this guard is a symptom of a missing protocol contract

This line reaches past pipeline_path's own abstraction boundary to inspect _pipeline_database directly — the same null-check already encoded inside the property (which raises RuntimeError when the database is absent). The no database → empty tuple semantic now lives in two places instead of one, and a private attribute is being inspected to avoid a RuntimeError that the property itself should never raise in the first place.

The root fix is at the protocol level. FunctionNodeProtocol currently declares only execute, async_execute, and node_type. pipeline_path should be added as a first-class member — it is part of the node's identity and is needed by all observer and storage infrastructure. Once it is on the protocol:

  1. FunctionNode.pipeline_path returns () when no database is attached instead of raising — the property owns the fallback, not the call site.
  2. This guard (and the identical ones in async_execute and _process_packet_internal_async) simplifies to pp = self.pipeline_path.
  3. Any external code working through FunctionNodeProtocol gets a guaranteed tuple[str, ...] without needing to know about _pipeline_database.
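The proposed contract can be sketched as follows; FunctionNodeSketch and its attributes are illustrative stand-ins, not the actual FunctionNode:

```python
# Sketch of the protocol-level fix: the pipeline_path property owns the
# no-database fallback, so call sites never inspect _pipeline_database.

class FunctionNodeSketch:
    def __init__(self, pipeline_database=None, path=("pipeline", "node")):
        self._pipeline_database = pipeline_database
        self._path = path

    @property
    def pipeline_path(self) -> tuple:
        # Return () instead of raising when no database is attached.
        if self._pipeline_database is None:
            return ()
        return self._path

# The call-site guard collapses to a single expression:
node = FunctionNodeSketch()
pp = node.pipeline_path  # always a tuple, no RuntimeError possible
```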

Collaborator Author


Fixed — FunctionNode.pipeline_path now returns () instead of raising RuntimeError when no database is attached. Added pipeline_path as a first-class property on FunctionNodeProtocol. The three pp = self.pipeline_path if self._pipeline_database is not None else () guards in execute(), async_execute(), and _async_execute_one_packet() are simplified to pp = self.pipeline_path.

        }

        # Dynamic tag columns — each tag key becomes its own column (unprefixed)
        tag_data = dict(tag)

[HIGH] Tag schema should come from node.output_schema(), not be inferred at runtime from dict(tag)

StreamProtocol already declares output_schema() -> tuple[Schema, Schema] returning (tag_schema, packet_schema), and nodes implement StreamProtocol — so the tag schema is statically knowable before any packet arrives. Inferring it here by iterating dict(tag) on every write is fragile (nullable fields or absent keys on early packets can produce an inconsistent schema) and wasteful (schema is reconstructed for every event).

output_schema() should be surfaced on FunctionNodeProtocol and the tag schema passed to the observer at on_node_start time. StatusObserver can then pre-build the full Arrow schema — fixed _status_* columns plus the statically-known tag columns — once in on_node_start, rather than re-inferring it on every _write_event call. This also makes the tag-column collision guard (see separate comment below) straightforward to enforce at setup time rather than per-packet.

Collaborator Author


Fixed — on_node_start now accepts tag_keys: tuple[str, ...], passed from FunctionNode.execute() via input_stream.keys()[0]. StatusObserver stores tag_keys in _node_context and uses them in _write_event to look up tag values via tag.get(key) instead of dict(tag). The schema is now statically known at node start time rather than inferred per-packet.
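A minimal sketch of that flow, with illustrative names (StatusSketch is not the real class):

```python
# Tag column names are captured once at on_node_start and reused for every
# event, instead of re-deriving the schema from each tag dict per packet.

class StatusSketch:
    def __init__(self):
        self._node_context = {}

    def on_node_start(self, node_label, tag_keys):
        # Schema is statically known here, before any packet arrives.
        self._node_context[node_label] = tag_keys

    def build_event(self, node_label, tag, state):
        row = {"_status_state": state}
        for key in self._node_context[node_label]:
            # Missing keys produce a null column value, keeping the
            # schema consistent across all events for this node.
            value = tag.get(key)
            row[key] = None if value is None else str(value)
        return row

obs = StatusSketch()
obs.on_node_start("node", ("idx", "subject_id"))
row = obs.build_event("node", {"idx": 3}, "RUNNING")
```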

        output_packet: Any,
        cached: bool,
    ) -> None:
        self._write_event(node_label, tag, state="SUCCESS")

[MEDIUM] cached: bool is received but silently discarded — this loses a meaningful observability signal

The protocol passes cached to on_packet_end precisely so observers can distinguish cache hits from fresh execution. Dropping it and unconditionally emitting SUCCESS means users cannot tell from the status table which results were computed vs retrieved — defeating a key purpose of a status observer.

Two clean options:

  1. Add a CACHED terminal state — three-way terminal: SUCCESS | FAILED | CACHED.
  2. Add a _status_cached boolean column and keep SUCCESS as the state.

Either way, the schema docstring (currently enumerating only RUNNING, SUCCESS, FAILED) needs updating to match.

Collaborator Author


Fixed — added CACHED as a terminal state. on_packet_end now writes CACHED when cached=True and SUCCESS when cached=False. The module docstring is updated to list all four states: RUNNING, SUCCESS, FAILED, CACHED.
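The resulting state selection reduces to a small pure function; terminal_state is an illustrative helper, not the actual implementation:

```python
# The cached flag now selects the terminal state instead of being discarded.

def terminal_state(cached: bool, failed: bool = False) -> str:
    """Map packet outcome to one of the terminal states: FAILED, CACHED, SUCCESS."""
    if failed:
        return "FAILED"
    return "CACHED" if cached else "SUCCESS"
```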

    ) -> None:
        self._node_context.pop(node_label, None)

    def on_packet_start(

[MEDIUM] Use TagProtocol/PacketProtocol instead of Any throughout

ExecutionObserverProtocol already correctly types all packet hooks with TagProtocol and PacketProtocol. Using Any here (and in on_packet_end, on_packet_crash, create_packet_logger) makes StatusObserver invisible to static type checkers verifying protocol compliance, and silently weakens the type safety the protocol establishes. This is a pre-existing pattern in LoggingObserver, but new code should correct it rather than perpetuate it.
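The compliance benefit can be illustrated with a small structural-typing sketch; TagLike is a hypothetical stand-in for the PR's TagProtocol:

```python
# Hooks typed against a Protocol are verifiable by a static checker,
# while Any silently opts the implementation out of that check.
from typing import Protocol

class TagLike(Protocol):
    def get(self, key, default=None): ...

class TypedObserver:
    def __init__(self):
        self.seen = []

    # A checker can now verify this signature against the observer protocol.
    def on_packet_start(self, node_label: str, tag: TagLike) -> None:
        self.seen.append((node_label, tag.get("idx")))

obs = TypedObserver()
obs.on_packet_start("node", {"idx": 7})  # a plain dict satisfies TagLike structurally
```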

Collaborator Author


Fixed — StatusObserver and _ContextualizedStatusObserver now use TagProtocol and PacketProtocol throughout. Also updated LoggingObserver and _ContextualizedLoggingObserver to match, so both observers are consistent.

            obs.on_node_end(node_label, node_hash, pipeline_path=pipeline_path)

    def on_packet_start(
        self, node_label: str, tag: Any, packet: Any

[MEDIUM] Use TagProtocol/PacketProtocol instead of Any throughout

Same issue as in StatusObserver: ExecutionObserverProtocol types all packet hooks with TagProtocol and PacketProtocol, but CompositeObserver uses Any across on_packet_start, on_packet_end, on_packet_crash, and create_packet_logger. The protocol already defines the correct surface — the concrete implementation should match it.

Collaborator Author


Fixed — CompositeObserver now uses TagProtocol and PacketProtocol on all packet hooks and create_packet_logger.

    ) -> None:
        self._parent = parent
        self._node_hash = node_hash
        self._node_label = node_label

[MEDIUM] self._node_hash and self._node_label are stored but never used — dead code

Both fields are set in __init__ but no method in _ContextualizedStatusObserver ever references them — all hooks delegate to the parent using the call-time parameters. Either remove the stored fields, or implement the original intent (e.g. stamp hooks with the stored context rather than relying entirely on call-time args).

Collaborator Author


Fixed — removed self._node_hash and self._node_label from _ContextualizedStatusObserver.__init__. These were never referenced since all hooks delegate to the parent using call-time args. Note: _ContextualizedLoggingObserver still stores these fields because it uses them in create_packet_logger to stamp log rows — that difference is intentional.

    ) -> None:
        self._node_context[node_label] = (node_hash, pipeline_path)

    def on_node_end(

[MEDIUM] Async node failure leaves a stale _node_context entry

In the async execution path on_node_end is called inside try: but not finally:. If an unhandled exception escapes before reaching on_node_end, _node_context[node_label] is never popped and persists for the lifetime of the observer. Since _write_event reads _node_context for every packet event, a subsequent run on the same observer instance reusing the same node_label will silently inherit the stale entry until on_node_start overwrites it.

Consider clearing stale entries defensively in on_run_end, or coordinating with the async path to ensure on_node_end is called in a finally: block.

Collaborator Author


Already addressed — per the earlier Copilot comment, we added self._node_context.clear() in both on_run_start and on_run_end. The orchestrator calls on_run_end in a finally: block (see sync_orchestrator.py:114 and async_orchestrator.py:207), so even if on_node_end is skipped due to an async crash, on_run_end still fires and cleans up all stale entries.
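The cleanup guarantee can be sketched as follows; all names here are illustrative, not the actual orchestrator code:

```python
# Because on_run_end is invoked from a finally block, per-run state is cleared
# even when a node crashes before its on_node_end fires.

class CleanupObserver:
    def __init__(self):
        self._node_context = {"stale_node": ("hash", ())}  # leftover from a prior run
        self.cleaned = False

    def on_run_start(self, run_id):
        self._node_context.clear()

    def on_run_end(self, run_id):
        self._node_context.clear()
        self.cleaned = True

def run_pipeline(observer, nodes):
    observer.on_run_start("run-1")
    try:
        for node in nodes:
            node()  # may raise before any on_node_end is delivered
    finally:
        observer.on_run_end("run-1")  # still fires on early exit

def crashing_node():
    raise RuntimeError("boom")

obs = CleanupObserver()
try:
    run_pipeline(obs, [crashing_node])
except RuntimeError:
    pass  # the crash propagates, but cleanup already ran
```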

        }

        # Dynamic tag columns — each tag key becomes its own column (unprefixed)
        tag_data = dict(tag)

[LOW] Tag column names can silently shadow _status_* system columns

Tag keys are inserted into the Arrow table without checking for collisions with the fixed system columns. A tag field named _status_state (or any _status_ prefix) would overwrite a system column without warning. A one-line guard (e.g. raise or skip if key.startswith('_status_')) closes this — and is straightforward to enforce at on_node_start time once the tag schema is validated up-front rather than inferred per-packet.

Collaborator Author


Declining — same rationale as the earlier collision comment. The _status_ prefix convention exists specifically to prevent collisions with user-defined tag column names. Tag keys are user data fields (e.g. idx, subject_id) that would never naturally start with _status_. This matches the _log_ prefix pattern in LoggingObserver, which has no collision guard either.

        row = pa.table(columns)
        try:
            self._db.add_record(status_path, status_id, row, flush=True)
        except Exception:

[LOW / future issue] Please file a follow-up issue for configurable flush behaviour

flush=True on every add_record is correct for safety but will become a bottleneck at scale. No need to address this in the current PR — please open a follow-up issue to make flush behaviour configurable on StatusObserver.__init__ (e.g. flush_on_write: bool = True).

Collaborator Author


Agreed — flush=True on every write is correct for safety but will need to be configurable at scale. Will note this as a follow-up issue for a flush_on_write: bool = True parameter on StatusObserver.__init__.
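A sketch of what the proposed flag could look like; BufferedWriterSketch is hypothetical, and only the flush_on_write parameter name comes from the suggestion above:

```python
# Trade-off sketch: flush_on_write=True flushes after every record (safe,
# slower); False batches records until an explicit flush (faster, riskier).

class BufferedWriterSketch:
    def __init__(self, flush_on_write: bool = True):
        self._flush_on_write = flush_on_write
        self._buffer = []
        self.flush_count = 0

    def add_record(self, row):
        self._buffer.append(row)
        if self._flush_on_write:
            self.flush()

    def flush(self):
        self._buffer.clear()
        self.flush_count += 1

safe = BufferedWriterSketch(flush_on_write=True)
fast = BufferedWriterSketch(flush_on_write=False)
for i in range(3):
    safe.add_record(i)
    fast.add_record(i)
```

With the default, behaviour is unchanged from the current PR; callers who can tolerate data loss on crash opt in to batching.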

Brian Arnold added 2 commits March 21, 2026 21:49
        obs = observer if observer is not None else NoOpObserver()

        obs.on_node_start(node_label, node_hash)
        pp = self.pipeline_path if self._pipeline_database is not None else ()
Contributor


pipeline path is guaranteed to return something -- you shouldn't have to inspect the content of the pipeline_database to decide whether to pass the value into the observer.


        obs = observer if observer is not None else NoOpObserver()

        pp = self.pipeline_path if self._pipeline_database is not None else ()
Contributor


same as earlier -- don't perform check on pipeline path -- it's guaranteed to return some defining path

@@ -1,15 +1,19 @@
 from .async_orchestrator import AsyncPipelineOrchestrator
+from .composite_observer import CompositeObserver
Contributor


we are starting to accumulate multiple types of observers. Let's add an issue to aggregate them into their own subpackage in the future

@eywalker eywalker merged commit 5b7dbca into nauticalab:dev Mar 21, 2026
5 checks passed