-
Notifications
You must be signed in to change notification settings - Fork 0
chore: removal of nullable fields #96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. 📝 WalkthroughWalkthroughReplaces lifecycle-based service initialization with explicit async context managers and run() methods across multiple services. Removes the LifecycleEnabled base class and refactors dependency injection wiring in providers. Introduces new SSE routing and connection registry components while removing shutdown manager and bridge patterns. Converts worker entry points to FastStream-based architecture. Updates various services including EventBus, NotificationService, UserSettingsService, DLQManager, and more to use new initialization patterns. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes This PR involves substantial architectural restructuring across the codebase: removal of a foundational base class (LifecycleEnabled), refactoring of multiple service initialization patterns, complete removal of several modules (lifecycle, coordinator services), introduction of new SSE components, and conversion of worker entry points to a new framework. The changes are heterogeneous in nature—each service follows similar patterns but requires independent reasoning. The high complexity comes from the pervasive nature of the lifecycle pattern changes, the need to understand multiple service rewiring approaches, and the interdependencies between DI providers, services, and worker scripts. Possibly related PRs
Suggested labels
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No issues found across 2 files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/notification_service.py (1)
136-156: Add_subscribe_to_events()to background tasks so execution event notifications fire.
_subscribe_to_events()is never invoked, so the Kafka consumer for execution-event notifications (EXECUTION_COMPLETED, EXECUTION_FAILED, EXECUTION_TIMEOUT) never starts. Add it to the background task queue:🔧 Proposed fix
def _start_background_tasks(self) -> None: """Start background processing tasks.""" tasks = [ + asyncio.create_task(self._subscribe_to_events()), asyncio.create_task(self._process_pending_notifications()), asyncio.create_task(self._cleanup_old_notifications()), ]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/notification_service.py (1)
118-155: _subscribe_to_events() is never invoked—execution-event notifications won't be consumed.The async method
_subscribe_to_events()is defined (line 207) but never called anywhere in the codebase. This means the service will not set up Kafka consumer subscriptions for execution events, breaking event-driven notifications entirely.Additionally,
_start_background_tasks()callsasyncio.create_task()in__init__without explicitly verifying a running loop. While lazy DI instantiation during async route handling may avoid an immediateRuntimeError, this pattern is fragile and doesn't fail fast.Move event subscription setup into an async initialization point (e.g., a
start()method called from the lifespan handler, or schedule_subscribe_to_events()as a background task once the loop is guaranteed to be running).
sse shutdown manager: moved to DI
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/core/providers.py (1)
515-531: Remove task creation fromNotificationService.__init__or make the provider async-aware.
NotificationService.__init__callsasyncio.create_task()on lines 199-200 to start background tasks. Sinceget_notification_serviceis a synchronous provider, this will raiseRuntimeError: no running event loopif resolved in any context before an event loop is active. Either defer task startup to an explicit async method (e.g.,async def start()) called after service resolution in an async context, or convert the provider to resolve the service in an async scope.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/services/k8s_worker/worker.py (1)
169-175: Clearidempotent_consumeron shutdown to avoid stale “running” status.
get_statusnow reports"running"based onidempotent_consumer is not None, but__aexit__doesn’t null it after stopping. This can report running even after shutdown.🐛 Proposed fix
@@ if self.idempotent_consumer: await self.idempotent_consumer.stop() + self.idempotent_consumer = NoneAlso applies to: 430-434
backend/app/services/sse/kafka_redis_bridge.py (1)
16-60: Make aenter idempotent and exception‑safe.Tests call aenter twice; current code overwrites
self.consumerswithout stopping the already-started set, which can leak running consumers. Also, a partial start failure leaves running consumers behind. Guard re-entry and clean up on failure.🔧 Suggested fix
async def __aenter__(self) -> "SSEKafkaRedisBridge": """Start the SSE Kafka→Redis bridge.""" self.logger.info(f"Starting SSE Kafka→Redis bridge with {self.num_consumers} consumers") + if self.consumers: + self.logger.debug("SSE Kafka→Redis bridge already started") + return self + # Phase 1: Build all consumers and track them immediately (no I/O) self.consumers = [self._build_consumer(i) for i in range(self.num_consumers)] # Phase 2: Start all in parallel - already tracked in self.consumers for cleanup topics = list(CONSUMER_GROUP_SUBSCRIPTIONS[GroupId.WEBSOCKET_GATEWAY]) - await asyncio.gather(*[c.start(topics) for c in self.consumers]) + try: + await asyncio.gather(*[c.start(topics) for c in self.consumers]) + except Exception: + await asyncio.gather(*[c.stop() for c in self.consumers], return_exceptions=True) + self.consumers.clear() + raise
🤖 Fix all issues with AI agents
In `@backend/app/services/coordinator/coordinator.py`:
- Around line 112-117: The log message in ExecutionCoordinator.__aenter__ is
misleading because queue_manager is constructed in __init__; update __aenter__
to not claim initialization: remove or replace the "Queue manager initialized"
logger call in __aenter__ (referencing ExecutionCoordinator.__aenter__ and the
queue_manager created in ExecutionCoordinator.__init__) and, if you need to
indicate readiness here, change it to a truthful message such as
"ExecutionCoordinator service started" or "Queue manager ready" so the log
reflects actual work performed in __aenter__.
In `@backend/app/services/pod_monitor/monitor.py`:
- Around line 289-297: The current _backoff() raises RuntimeError when reconnect
attempts exceed config.max_reconnect_attempts which propagates out of
_watch_loop() and kills the _watch_task; instead, modify _backoff() to not raise
but mark the monitor as paused (e.g., set self._paused = True and log the
failure with details) and/or schedule a retry via a new helper (e.g.,
_ensure_watch_running) that runs in background to attempt restart, and update
_watch_loop() to check self._paused and exit gracefully without killing the
background manager; ensure references to _backoff, _watch_loop, _watch_task, and
config.max_reconnect_attempts are used so the new paused flag or background
restart task can detect and restart the watcher.
🧹 Nitpick comments (6)
backend/app/services/coordinator/coordinator.py (1)
196-197: Redundanthasattrcheck.
idempotency_manageris always assigned in__init__(line 97), sohasattris unnecessary. The None check alone is sufficient.Suggested simplification
- if hasattr(self, "idempotency_manager") and self.idempotency_manager: + if self.idempotency_manager: await self.idempotency_manager.close()backend/workers/run_k8s_worker.py (1)
42-46: Consider exiting if the worker stops unexpectedly.The loop now waits only on
shutdown_event. If the worker/consumer dies, the process may keep running and just log status. If you want automatic recovery or a clean exit, consider breaking/setting the shutdown event when status indicates not running.♻️ Possible adjustment
while not shutdown_event.is_set(): await asyncio.sleep(60) status = await worker.get_status() logger.info(f"Kubernetes worker status: {status}") + if not status.get("running", True): + logger.error("KubernetesWorker not running; initiating shutdown.") + shutdown_event.set()backend/app/services/pod_monitor/monitor.py (1)
122-138: Consider logging exceptions from the watch task during shutdown.If
_watch_taskterminates due to an exception other thanCancelledError(e.g.,RuntimeErrorfrom_backoff), it will be silently ignored during__aexit__. Consider retrieving and logging any exception.🔧 Suggested improvement
if self._watch_task: self._watch_task.cancel() try: await self._watch_task except asyncio.CancelledError: pass + except Exception as e: + self.logger.warning(f"Watch task terminated with error: {e}")backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)
118-125: Assertion may not validate event handling outcome.The assertion
len(orch._sagas) > 0verifies thatregister_saga(_Saga)populated_sagas, not that_handle_eventprocessed the event. Based on the relevant code snippet showing_handle_eventiterates overself._sagas, this assertion passes due to theregister_sagacall on line 121, not because of the event handling on line 123.Consider asserting on observable side effects of event handling, such as checking the fake repository for saved sagas.
🔧 Suggested improvement
`@pytest.mark.asyncio` async def test_min_success_flow(event_metrics: EventMetrics) -> None: - orch = _orch(event_metrics) - orch.register_saga(_Saga) + fake_repo = _FakeRepo() + orch = SagaOrchestrator( + config=SagaConfig(name="t", enable_compensation=True, store_events=True, publish_commands=False), + saga_repository=fake_repo, + producer=_FakeProd(), + schema_registry_manager=MagicMock(spec=SchemaRegistryManager), + settings=MagicMock(spec=Settings), + event_store=_FakeStore(), + idempotency_manager=_FakeIdem(), + resource_allocation_repository=_FakeAlloc(), + logger=_test_logger, + event_metrics=event_metrics, + ) + orch.register_saga(_Saga) # Handle the event await orch._handle_event(make_execution_requested_event(execution_id="e")) - # basic sanity; deep behavior covered by integration - assert len(orch._sagas) > 0 + # Verify saga was created and saved + assert len(fake_repo.saved) > 0backend/workers/run_saga_orchestrator.py (1)
31-43: Consider liveness handling after dropping theis_runninggate.
With onlyshutdown_event, the worker will keep sleeping even if the orchestrator stops unexpectedly; please confirm that’s intended or wire a failure hook to trigger shutdown.backend/app/services/saga/saga_orchestrator.py (1)
361-384: Consider re-raisingCancelledErrorfor cleaner task semantics.While suppressing
CancelledErrorworks here sinceasyncio.gather(..., return_exceptions=True)is used in__aexit__, re-raising is the recommended pattern for asyncio tasks. This ensures the task is properly marked as cancelled rather than completing normally.♻️ Optional: Re-raise CancelledError
except asyncio.CancelledError: self.logger.info("Timeout checker cancelled") + raise
| async def __aenter__(self) -> "ExecutionCoordinator": | ||
| """Start the coordinator service.""" | ||
| self.logger.info("Starting ExecutionCoordinator service...") | ||
|
|
||
| await self.queue_manager.start() | ||
| self.logger.info("Queue manager initialized") | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Misleading log message.
The log at line 116 states "Queue manager initialized" but queue_manager is constructed in __init__ (lines 74-80), not here. Consider removing this log or moving it to reflect actual initialization work being done at this point.
Suggested fix
async def __aenter__(self) -> "ExecutionCoordinator":
"""Start the coordinator service."""
self.logger.info("Starting ExecutionCoordinator service...")
- self.logger.info("Queue manager initialized")
-
await self.idempotency_manager.initialize()🤖 Prompt for AI Agents
In `@backend/app/services/coordinator/coordinator.py` around lines 112 - 117, The
log message in ExecutionCoordinator.__aenter__ is misleading because
queue_manager is constructed in __init__; update __aenter__ to not claim
initialization: remove or replace the "Queue manager initialized" logger call in
__aenter__ (referencing ExecutionCoordinator.__aenter__ and the queue_manager
created in ExecutionCoordinator.__init__) and, if you need to indicate readiness
here, change it to a truthful message such as "ExecutionCoordinator service
started" or "Queue manager ready" so the log reflects actual work performed in
__aenter__.
| async def _backoff(self) -> None: | ||
| """Handle watch errors with exponential backoff.""" | ||
| self._reconnect_attempts += 1 | ||
|
|
||
| if self._reconnect_attempts > self.config.max_reconnect_attempts: | ||
| self.logger.error( | ||
| f"Max reconnect attempts ({self.config.max_reconnect_attempts}) exceeded, stopping pod monitor" | ||
| f"Max reconnect attempts ({self.config.max_reconnect_attempts}) exceeded" | ||
| ) | ||
| self._state = MonitorState.STOPPING | ||
| return | ||
| raise RuntimeError("Max reconnect attempts exceeded") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check _backoff usage and context
rg -n -A 10 '_backoff' backend/app/services/pod_monitor/monitor.py | head -80Repository: HardMax71/Integr8sCode
Length of output: 1335
🏁 Script executed:
#!/bin/bash
# Check the _watch_loop implementation to see error handling
rg -n -B 5 -A 20 '_watch_loop' backend/app/services/pod_monitor/monitor.pyRepository: HardMax71/Integr8sCode
Length of output: 2198
🏁 Script executed:
#!/bin/bash
# Look for exception handling that might catch RuntimeError
rg -n -B 3 -A 8 'except' backend/app/services/pod_monitor/monitor.pyRepository: HardMax71/Integr8sCode
Length of output: 4657
🏁 Script executed:
#!/bin/bash
# Check if there's any monitoring/restart logic for the watch_task failure
rg -n -B 2 -A 10 '_watch_task' backend/app/services/pod_monitor/monitor.pyRepository: HardMax71/Integr8sCode
Length of output: 1841
🏁 Script executed:
#!/bin/bash
# Check the full _backoff implementation to see what happens after the exception
sed -n '289,320p' backend/app/services/pod_monitor/monitor.pyRepository: HardMax71/Integr8sCode
Length of output: 1348
The watch task fails silently when max reconnect attempts are exceeded, leaving the monitor unable to watch for pod events.
When _backoff() raises RuntimeError upon exceeding max attempts, the exception propagates out of _watch_loop() and terminates the _watch_task. Since there is no error monitoring or automatic restart mechanism, the monitor stops functioning without any recovery path. Consider either: (1) gracefully degrading by pausing watches and allowing manual recovery, or (2) adding a background monitor to detect and restart failed tasks.
🤖 Prompt for AI Agents
In `@backend/app/services/pod_monitor/monitor.py` around lines 289 - 297, The
current _backoff() raises RuntimeError when reconnect attempts exceed
config.max_reconnect_attempts which propagates out of _watch_loop() and kills
the _watch_task; instead, modify _backoff() to not raise but mark the monitor as
paused (e.g., set self._paused = True and log the failure with details) and/or
schedule a retry via a new helper (e.g., _ensure_watch_running) that runs in
background to attempt restart, and update _watch_loop() to check self._paused
and exit gracefully without killing the background manager; ensure references to
_backoff, _watch_loop, _watch_task, and config.max_reconnect_attempts are used
so the new paused flag or background restart task can detect and restart the
watcher.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
8 issues found across 32 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/workers/run_pod_monitor.py">
<violation number="1" location="backend/workers/run_pod_monitor.py:45">
P2: The loop now ignores the PodMonitor state, so if the monitor stops internally the worker will keep polling forever instead of exiting. Consider reintroducing a state check (or equivalent stop condition) to avoid hanging on a stopped monitor.</violation>
</file>
<file name="backend/app/services/saga/saga_orchestrator.py">
<violation number="1" location="backend/app/services/saga/saga_orchestrator.py:364">
P2: The timeout checker now stops permanently on any unexpected exception because the loop no longer catches and logs `Exception`. A transient repository error will kill the task and timeout checks will never resume. Wrap the body of the loop in a try/except to log and continue.</violation>
</file>
<file name="backend/app/services/k8s_worker/worker.py">
<violation number="1" location="backend/app/services/k8s_worker/worker.py:433">
P2: `running` is now derived from `idempotent_consumer` existence, but the shutdown path stops the consumer without clearing the reference. After `__aexit__`, `get_status()` will still return running=True even though the worker has stopped. Track an explicit running flag or clear `self.idempotent_consumer` on shutdown so status reflects the actual lifecycle state.</violation>
</file>
<file name="backend/tests/unit/services/saga/test_saga_orchestrator_unit.py">
<violation number="1" location="backend/tests/unit/services/saga/test_saga_orchestrator_unit.py:125">
P3: This assertion no longer validates that `_handle_event` did anything; `_sagas` is populated by `register_saga` before the handler runs, so the test will pass even if event handling fails. Assert on a side effect of `_handle_event` (e.g., running instances or saved saga) instead.</violation>
</file>
<file name="backend/workers/run_k8s_worker.py">
<violation number="1" location="backend/workers/run_k8s_worker.py:43">
P2: The run loop no longer exits when the KubernetesWorker stops. If the worker shuts down unexpectedly, this loop keeps running and prevents graceful cleanup until a signal arrives. Consider restoring the `worker.is_running` guard (or breaking when status indicates stopped) so the process terminates cleanly when the worker stops.</violation>
</file>
<file name="backend/app/events/event_store_consumer.py">
<violation number="1" location="backend/app/events/event_store_consumer.py:131">
P2: Unhandled exceptions in the batch processor will terminate the background task and stop future flushes; add error handling inside the loop to log and continue.</violation>
</file>
<file name="backend/workers/run_coordinator.py">
<violation number="1" location="backend/workers/run_coordinator.py:43">
P2: The loop no longer exits when the coordinator stops, so the worker can keep polling indefinitely after a coordinator failure instead of shutting down and cleaning up.</violation>
</file>
<file name="backend/app/services/pod_monitor/monitor.py">
<violation number="1" location="backend/app/services/pod_monitor/monitor.py:297">
P2: Raising RuntimeError in the watch backoff path will terminate the background watch task without a clean shutdown; since the task is created with `create_task` and `__aexit__` only suppresses `CancelledError`, this exception will bubble up later on shutdown or leave the monitor silently stopped. Consider exiting the watch loop via cancellation instead of throwing a RuntimeError.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| while True: | ||
| # Check every 30 seconds | ||
| await asyncio.sleep(30) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: The timeout checker now stops permanently on any unexpected exception because the loop no longer catches and logs Exception. A transient repository error will kill the task and timeout checks will never resume. Wrap the body of the loop in a try/except to log and continue.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/saga/saga_orchestrator.py, line 364:
<comment>The timeout checker now stops permanently on any unexpected exception because the loop no longer catches and logs `Exception`. A transient repository error will kill the task and timeout checks will never resume. Wrap the body of the loop in a try/except to log and continue.</comment>
<file context>
@@ -363,8 +360,8 @@ async def _fail_saga(self, instance: Saga, error_message: str) -> None:
- while self.is_running:
- try:
+ try:
+ while True:
# Check every 30 seconds
await asyncio.sleep(30)
</file context>
| ) | ||
| self._state = MonitorState.STOPPING | ||
| return | ||
| raise RuntimeError("Max reconnect attempts exceeded") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: Raising RuntimeError in the watch backoff path will terminate the background watch task without a clean shutdown; since the task is created with create_task and __aexit__ only suppresses CancelledError, this exception will bubble up later on shutdown or leave the monitor silently stopped. Consider exiting the watch loop via cancellation instead of throwing a RuntimeError.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/pod_monitor/monitor.py, line 297:
<comment>Raising RuntimeError in the watch backoff path will terminate the background watch task without a clean shutdown; since the task is created with `create_task` and `__aexit__` only suppresses `CancelledError`, this exception will bubble up later on shutdown or leave the monitor silently stopped. Consider exiting the watch loop via cancellation instead of throwing a RuntimeError.</comment>
<file context>
@@ -342,16 +286,15 @@ async def _publish_event(self, event: DomainEvent, pod: k8s_client.V1Pod) -> Non
)
- self._state = MonitorState.STOPPING
- return
+ raise RuntimeError("Max reconnect attempts exceeded")
# Calculate exponential backoff
</file context>
| await orch._handle_event(make_execution_requested_event(execution_id="e")) | ||
| # basic sanity; deep behavior covered by integration | ||
| assert orch.is_running is True | ||
| assert len(orch._sagas) > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P3: This assertion no longer validates that _handle_event did anything; _sagas is populated by register_saga before the handler runs, so the test will pass even if event handling fails. Assert on a side effect of _handle_event (e.g., running instances or saved saga) instead.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/tests/unit/services/saga/test_saga_orchestrator_unit.py, line 125:
<comment>This assertion no longer validates that `_handle_event` did anything; `_sagas` is populated by `register_saga` before the handler runs, so the test will pass even if event handling fails. Assert on a side effect of `_handle_event` (e.g., running instances or saved saga) instead.</comment>
<file context>
@@ -119,11 +119,10 @@ def _orch(event_metrics: EventMetrics) -> SagaOrchestrator:
await orch._handle_event(make_execution_requested_event(execution_id="e"))
# basic sanity; deep behavior covered by integration
- assert orch.is_running is True
+ assert len(orch._sagas) > 0
</file context>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@backend/tests/integration/conftest.py`:
- Around line 74-83: The SSERedisBus constructor is being called with arguments
in the wrong order in the sse_redis_bus fixture: SSERedisBus expects
(redis_client, logger, exec_prefix, notif_prefix) but the fixture passes
exec_prefix and notif_prefix positionally and logger last; fix by passing logger
as the second argument (either positionally after redis_client or as
logger=<logger>) and ensure exec_prefix and notif_prefix are provided afterwards
(or as keywords) so the call to SSERedisBus(redis_client, logger,
exec_prefix=..., notif_prefix=..., logger=...) matches the constructor
signature.
🧹 Nitpick comments (1)
backend/tests/integration/idempotency/test_consumer_idempotent.py (1)
9-10: Redundant import ofEventDispatcher.
EventDispatcheris imported twice—once fromapp.events.core(line 9) and again aliased asDispfromapp.events.core.dispatcher(line 10). Sinceapp.events.corelikely re-exports from the dispatcher module, these are the same class.Suggested fix
-from app.events.core import ConsumerConfig, EventDispatcher, UnifiedConsumer, UnifiedProducer -from app.events.core.dispatcher import EventDispatcher as Disp +from app.events.core import ConsumerConfig, EventDispatcher, UnifiedConsumer, UnifiedProducerThen update line 47 to use
EventDispatcherdirectly:- disp: Disp = EventDispatcher(logger=_test_logger) + disp = EventDispatcher(logger=_test_logger)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
13 issues found across 47 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/workers/run_k8s_worker.py">
<violation number="1" location="backend/workers/run_k8s_worker.py:53">
P1: Exceptions from completed tasks in `done` set are silently swallowed. If `consumer.run()` or `logic.ensure_daemonset_task()` raises an exception, the worker will silently proceed to shutdown without logging or propagating the error. Add exception handling for tasks in `done`.</violation>
</file>
<file name="backend/workers/run_result_processor.py">
<violation number="1" location="backend/workers/run_result_processor.py:42">
P2: Exceptions from completed tasks in `done` set are never retrieved. If `consumer.run()` crashes with an exception before the shutdown signal, the error is silently ignored. Check for exceptions in completed tasks to properly log failures.</violation>
</file>
<file name="backend/app/core/dishka_lifespan.py">
<violation number="1" location="backend/app/core/dishka_lifespan.py:116">
P3: Inconsistent logging style: use structured logging with `extra` parameter instead of f-string interpolation to match the rest of this file's logging pattern. This improves log aggregation and follows the established convention.</violation>
</file>
<file name="backend/app/services/coordinator/coordinator_logic.py">
<violation number="1" location="backend/app/services/coordinator/coordinator_logic.py:280">
P1: `event.priority` is an `int`, so calling `.name` on it will raise `AttributeError`. The priority value must be converted to `QueuePriority` enum first.</violation>
</file>
<file name="backend/app/services/result_processor/processor_logic.py">
<violation number="1" location="backend/app/services/result_processor/processor_logic.py:33">
P3: Misleading docstring: The class is no longer stateful after this refactoring. The removed `_state`, `_consumer`, and `_dispatcher` attributes were what made it stateful. Consider updating the docstring to reflect that this class is now a stateless service that processes events.</violation>
</file>
<file name="backend/app/services/sse/sse_connection_registry.py">
<violation number="1" location="backend/app/services/sse/sse_connection_registry.py:49">
P1: Metrics are decremented even when the connection wasn't actually registered, which can lead to incorrect (potentially negative) connection counts. The code should only decrement metrics when a connection was actually removed from the registry.</violation>
</file>
<file name="backend/app/events/core/consumer.py">
<violation number="1" location="backend/app/events/core/consumer.py:109">
P1: `event` may be undefined when the error callback is invoked. If `self._schema_registry.deserialize_event()` throws an exception, `event` was never assigned, causing a `NameError`. Initialize `event` before the try block or guard the callback invocation.</violation>
</file>
<file name="backend/workers/run_coordinator.py">
<violation number="1" location="backend/workers/run_coordinator.py:53">
P1: Exceptions from completed tasks are silently lost. If `run_task` finishes with an exception (e.g., consumer or scheduling loop crashes), the error is never retrieved or logged. You should check tasks in `done` for exceptions to avoid silent failures.</violation>
</file>
<file name="backend/app/services/sse/event_router.py">
<violation number="1" location="backend/app/services/sse/event_router.py:44">
P2: Inefficient use of `model_dump()` just to extract a single field. Since all SSE-relevant events have `execution_id` as a direct attribute, use `getattr(event, 'execution_id', None)` to avoid serializing the entire model.</violation>
</file>
<file name="backend/workers/run_pod_monitor.py">
<violation number="1" location="backend/workers/run_pod_monitor.py:45">
P1: Exceptions from completed tasks are silently swallowed. When using `asyncio.wait()`, you must check completed tasks for exceptions, otherwise failures in `monitor.run()` will go unnoticed. Add exception handling for tasks in `done`.</violation>
</file>
<file name="backend/workers/run_saga_orchestrator.py">
<violation number="1" location="backend/workers/run_saga_orchestrator.py:65">
P1: Exceptions from completed tasks are silently swallowed. If `run_task` fails with an exception, it ends up in `done` but the exception is never checked or re-raised. Add exception handling for completed tasks to avoid silent failures.</violation>
</file>
<file name="backend/tests/integration/result_processor/test_result_processor.py">
<violation number="1" location="backend/tests/integration/result_processor/test_result_processor.py:130">
P2: Unused parameter `consumer_config` after refactoring. The function now creates its own `processor_consumer_config` and `stored_consumer_config` locally instead of using the injected fixture. Remove the unused parameter from the function signature.</violation>
</file>
<file name="backend/app/services/pod_monitor/monitor.py">
<violation number="1" location="backend/app/services/pod_monitor/monitor.py:123">
P2: `asyncio.CancelledError` is caught but not re-raised, breaking Python's async cancellation contract. The task will appear to complete normally instead of being cancelled, which can cause issues with task lifecycle management (e.g., `task.cancelled()` returns False, callers won't see the exception).</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| del self._active_connections[execution_id] | ||
|
|
||
| self.logger.debug("Unregistered SSE connection", extra={"connection_id": connection_id}) | ||
| self.metrics.decrement_sse_connections("executions") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P1: Metrics are decremented even when the connection wasn't actually registered, which can lead to incorrect (potentially negative) connection counts. The code should only decrement metrics when a connection was actually removed from the registry.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/sse/sse_connection_registry.py, line 49:
<comment>Metrics are decremented even when the connection wasn't actually registered, which can lead to incorrect (potentially negative) connection counts. The code should only decrement metrics when a connection was actually removed from the registry.</comment>
<file context>
@@ -0,0 +1,57 @@
+ del self._active_connections[execution_id]
+
+ self.logger.debug("Unregistered SSE connection", extra={"connection_id": connection_id})
+ self.metrics.decrement_sse_connections("executions")
+
+ def get_connection_count(self) -> int:
</file context>
| self._event_metrics.record_kafka_consumption_error( | ||
| msg.topic, self._config.group_id, type(e).__name__ | ||
| ) | ||
| if self._error_callback: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P1: event may be undefined when the error callback is invoked. If self._schema_registry.deserialize_event() throws an exception, event was never assigned, causing a NameError. Initialize event before the try block or guard the callback invocation.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/events/core/consumer.py, line 109:
<comment>`event` may be undefined when the error callback is invoked. If `self._schema_registry.deserialize_event()` throws an exception, `event` was never assigned, causing a `NameError`. Initialize `event` before the try block or guard the callback invocation.</comment>
<file context>
@@ -63,189 +69,58 @@ async def start(self, topics: list[KafkaTopic]) -> None:
+ self._event_metrics.record_kafka_consumption_error(
+ msg.topic, self._config.group_id, type(e).__name__
+ )
+ if self._error_callback:
+ await self._error_callback(e, event)
</file context>
|
|
||
| async def route_event(self, event: DomainEvent) -> None: | ||
| """Route an event to Redis for SSE delivery.""" | ||
| data = event.model_dump() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: Inefficient use of model_dump() just to extract a single field. Since all SSE-relevant events have execution_id as a direct attribute, use getattr(event, 'execution_id', None) to avoid serializing the entire model.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/sse/event_router.py, line 44:
<comment>Inefficient use of `model_dump()` just to extract a single field. Since all SSE-relevant events have `execution_id` as a direct attribute, use `getattr(event, 'execution_id', None)` to avoid serializing the entire model.</comment>
<file context>
@@ -0,0 +1,63 @@
+
+ async def route_event(self, event: DomainEvent) -> None:
+ """Route an event to Redis for SSE delivery."""
+ data = event.model_dump()
+ execution_id = data.get("execution_id")
+
</file context>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/services/coordinator/coordinator_logic.py (1)
231-281: Cast priority toQueuePrioritybefore using.name.If
event.priorityis an int (as it appears elsewhere),priority.namewill raise and the schedule path will fail.🐛 Proposed fix: normalize to QueuePriority
- priority = getattr(event, "priority", QueuePriority.NORMAL) - self.metrics.record_coordinator_queue_time(queue_time, priority.name) + priority = QueuePriority(getattr(event, "priority", QueuePriority.NORMAL)) + self.metrics.record_coordinator_queue_time(queue_time, priority.name)backend/app/services/result_processor/processor_logic.py (1)
88-93: Guard memory limit parsing to avoid handler failure.Line 90 uses
rstrip("Mi"), which will mis-handleGior other formats and can raiseValueError, aborting the handler. Consider a minimal parse guard to keep result processing resilient.🛠️ Suggested fix
- settings_limit = self._settings.K8S_POD_MEMORY_LIMIT - memory_limit_mib = int(settings_limit.rstrip("Mi")) # TODO: Less brittle acquisition of limit - memory_percent = (memory_mib / memory_limit_mib) * 100 - self._metrics.memory_utilization_percent.record( - memory_percent, attributes={"lang_and_version": lang_and_version} - ) + settings_limit = self._settings.K8S_POD_MEMORY_LIMIT + try: + if settings_limit.endswith("Gi"): + memory_limit_mib = int(settings_limit[:-2]) * 1024 + elif settings_limit.endswith("Mi"): + memory_limit_mib = int(settings_limit[:-2]) + else: + raise ValueError(f"Unsupported unit: {settings_limit}") + memory_percent = (memory_mib / memory_limit_mib) * 100 + self._metrics.memory_utilization_percent.record( + memory_percent, attributes={"lang_and_version": lang_and_version} + ) + except ValueError: + self.logger.warning( + "Skipping memory utilization metric due to unparseable limit", + extra={"limit": settings_limit}, + )
🤖 Fix all issues with AI agents
In `@backend/app/core/dishka_lifespan.py`:
- Around line 102-127: Background tasks created in the tasks list
(run_sse_consumers, event_store_consumer.run, notification_service.run) are
fire-and-forget and only awaited at shutdown, so runtime exceptions can be lost;
update the creation of each asyncio.create_task to attach an add_done_callback
that checks task.exception(), logs the error via logger, and on first non-None
exception cancels all other tasks in the tasks collection (safely skipping
already done/cancelled tasks) to implement immediate supervision and fail-fast
behavior, ensuring you still await asyncio.gather(*tasks,
return_exceptions=True) in the finally block to finish cleanup.
In `@backend/app/core/providers.py`:
- Around line 463-472: The provider get_sse_connection_registry currently
creates an SSEConnectionRegistry per request (provide(scope=Scope.REQUEST)),
which resets _active_connections and causes get_connection_count() to always
return 0; change the provider to use application scope
(provide(scope=Scope.APP)) so SSEConnectionRegistry is a singleton for the app,
preserving the asyncio.Lock() and shared _active_connections across requests and
allowing registrations from SSE streams to be visible to the health endpoint.
In `@backend/app/events/core/consumer.py`:
- Around line 104-110: The variable event can be unbound if deserialize_event()
raises, causing a NameError when awaiting self._error_callback(e, event); fix by
initializing event = None before the try/except where deserialize_event() is
called (so the callback always receives a defined value), and keep the existing
error logging and metrics calls (e.g., the except block that calls
self.logger.error, self._event_metrics.record_kafka_consumption_error and awaits
self._error_callback).
In `@backend/app/services/idempotency/middleware.py`:
- Around line 123-144: register_idempotent_handler wraps a handler in
IdempotentEventHandler then passes it to EventDispatcher.register which expects
handler.__name__; set the wrapper's __name__ (or use
functools.update_wrapper/functools.wraps) so the wrapped instance exposes the
original handler's __name__ to avoid AttributeError. In practice, after creating
wrapped in register_idempotent_handler, assign wrapped.__name__ =
getattr(handler, "__name__", type(handler).__name__) (or call
update_wrapper(wrapped, handler)) before calling
self._dispatcher.register(event_type)(wrapped).
In `@backend/app/services/sse/sse_connection_registry.py`:
- Around line 30-49: The register_connection and unregister_connection currently
call metrics.increment_sse_connections / decrement_sse_connections
unconditionally, causing drift on duplicate adds/removes; modify
register_connection to only call metrics.increment_sse_connections("executions")
(and log) if connection_id was not already present in
self._active_connections[execution_id], and modify unregister_connection to only
call metrics.decrement_sse_connections("executions") (and log) if connection_id
was actually removed (i.e., it existed before discard and the set changed or
length decreased); keep the async with self._lock around these checks and
mutations and use the same symbols (_active_connections, register_connection,
unregister_connection, metrics.increment_sse_connections,
metrics.decrement_sse_connections) to locate the changes.
In `@backend/tests/integration/events/test_consumer_lifecycle.py`:
- Around line 40-52: The test signals readiness too early: move the
consumer_started.set() so it runs after the internal consumer is initialized
inside run() (or after consumer.run() completes its startup logic) to avoid
calling consumer.seek_to_beginning()/consumer.seek_to_end() before
consumer._consumer is assigned; locate the run_with_signal coroutine and ensure
the readiness event is set only once consumer.run() has created/started the
AIOKafkaConsumer (or add an explicit check/wait for consumer._consumer to be
non-None before performing seeks).
In `@backend/workers/run_coordinator.py`:
- Around line 42-64: The code awaits asyncio.wait([run_task, shutdown_task]) but
only inspects pending tasks, so exceptions from completed tasks (e.g., run_task
from run_coordinator_tasks()) are never re-raised; update the logic after
asyncio.wait to iterate over the done set and call task.result() (or check
task.exception()) for each done task and re-raise or surface any exceptions (at
least for run_task) before cancelling pending tasks; locate
run_coordinator_tasks, run_task, shutdown_task, and the asyncio.wait call to
implement this change so failures in run_coordinator_tasks propagate correctly.
In `@backend/workers/run_k8s_worker.py`:
- Around line 49-64: The code creates run_task =
asyncio.create_task(run_worker_tasks()) and shutdown_task =
asyncio.create_task(shutdown_event.wait()) then awaits asyncio.wait but never
retrieves results from completed tasks, so exceptions from run_worker_tasks()
are swallowed; after the asyncio.wait call inspect each task in the done set
(e.g., check task.done() and call task.result() or task.exception()) and
re-raise or propagate any exception found (for example by calling task.result()
which will raise) before cancelling pending tasks so that worker failures
surface instead of being masked.
In `@backend/workers/run_result_processor.py`:
- Around line 35-53: The current shutdown sequence swallows exceptions from
consumer.run() because run_task's exception is never retrieved; after the
asyncio.wait completes, inspect the completed tasks (e.g., check if run_task is
in the done set), call run_task.result() (or handle/run_task.exception()) to
surface any exception, log it with logger.error including context, and re-raise
or set an exit error before cancelling pending tasks (shutdown_task) so failures
in consumer.run() are not treated as normal shutdown; update the logic around
run_task/run_task.result(), shutdown_task, and shutdown_event accordingly.
In `@backend/workers/run_saga_orchestrator.py`:
- Around line 54-70: The current shutdown logic cancels pending tasks but can
silently swallow exceptions from completed tasks like run_task; after
asyncio.wait returns, inspect the done set (e.g., run_task in done) and
propagate any errors by awaiting the completed tasks or checking
task.exception() and re-raising it before returning/finishing; keep existing
cancellation/await handling for pending tasks (shutdown_task) but ensure
exceptions from run_orchestrator_tasks are not ignored so the orchestrator
failure surfaces to the caller.
♻️ Duplicate comments (4)
backend/app/events/event_store_consumer.py (1)
102-104: Add error handling for batch storage and commit failures.If
_store_batch()orconsumer.commit()throws an exception (e.g., transient MongoDB or Kafka broker issue), the consumer will terminate entirely. Consider wrapping these operations in a try-except to log and continue processing subsequent batches.🐛 Proposed fix
if events: - await self._store_batch(events) - await consumer.commit() + try: + await self._store_batch(events) + await consumer.commit() + except Exception as e: + self.logger.error(f"Failed to store/commit batch: {e}", exc_info=True) + # Continue processing - Kafka will redeliver uncommitted messagesbackend/app/services/pod_monitor/monitor.py (1)
277-285: Max reconnect RuntimeError terminates monitor ungracefully.This issue was previously flagged. When
_backoff()raisesRuntimeErrorafter max attempts, it propagates through_watch_loop()torun(), which only catchesCancelledError. The monitor stops without cleanup or recovery path. Consider either catching this inrun()for graceful degradation or using cancellation-based exit.backend/app/services/saga/saga_logic.py (1)
299-321: Keep the timeout loop resilient to transient errors.As written, any unexpected exception will terminate the loop permanently. This matches the earlier concern and still applies here.
🐛 Proposed fix: log and continue on unexpected errors
async def check_timeouts_loop(self) -> None: """Check for saga timeouts (runs until cancelled).""" try: while True: - # Check every 30 seconds - await asyncio.sleep(30) - - cutoff_time = datetime.now(UTC) - timedelta(seconds=self.config.timeout_seconds) - - timed_out = await self._repo.find_timed_out_sagas(cutoff_time) - - for instance in timed_out: - self.logger.warning(f"Saga {instance.saga_id} timed out") - - instance.state = SagaState.TIMEOUT - instance.error_message = f"Saga timed out after {self.config.timeout_seconds} seconds" - instance.completed_at = datetime.now(UTC) - - await self._save_saga(instance) - self._running_instances.pop(instance.saga_id, None) + try: + # Check every 30 seconds + await asyncio.sleep(30) + + cutoff_time = datetime.now(UTC) - timedelta(seconds=self.config.timeout_seconds) + + timed_out = await self._repo.find_timed_out_sagas(cutoff_time) + + for instance in timed_out: + self.logger.warning(f"Saga {instance.saga_id} timed out") + + instance.state = SagaState.TIMEOUT + instance.error_message = f"Saga timed out after {self.config.timeout_seconds} seconds" + instance.completed_at = datetime.now(UTC) + + await self._save_saga(instance) + self._running_instances.pop(instance.saga_id, None) + except Exception as e: + self.logger.error(f"Error in timeout checker: {e}", exc_info=True) + await asyncio.sleep(5) except asyncio.CancelledError: self.logger.info("Timeout checker cancelled")backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)
93-99: Assertion doesn’t validate handle_event side effects.
register_sagaalready populates_sagas, so the current assertion can pass even ifhandle_eventdoes nothing. Consider asserting on a side effect (e.g., repo save or saga state change) after the handler runs.
🧹 Nitpick comments (8)
backend/app/services/k8s_worker/worker_logic.py (4)
56-56: Redundant fallback for non-optional parameter.The
configparameter is typed asK8sWorkerConfig(non-optional), yet line 56 has a fallbackconfig or K8sWorkerConfig(). Either make the parameterOptional[K8sWorkerConfig]or remove the fallback to match the signature.- self.config = config or K8sWorkerConfig() + self.config = config
154-154: Fire-and-forget task lacks exception propagation.The task created here is not stored, so any unhandled exception will be silently lost (logged only). While the current implementation handles exceptions internally in
_create_pod_for_execution, consider storing task references for better observability and graceful shutdown handling.
305-308: Defensive null checks on required dependency.
self.produceris assigned from a required constructor parameter (non-optionalUnifiedProducer). These null checks suggest the producer might be None, but that would indicate a constructor bug. Either remove the checks or type the field asOptional[UnifiedProducer]if nullability is intentional.Also applies to: 319-322, 336-339
356-356: Undocumented hardcoded delay.The 5-second sleep before DaemonSet operations appears arbitrary. Consider documenting the reason (e.g., waiting for API server readiness) or making it configurable via
K8sWorkerConfig.backend/tests/e2e/test_k8s_worker_create_pod.py (1)
83-91: Resource cleanup may be skipped on test failure.If the assertions on lines 85-87 fail, the cleanup on lines 90-91 won't execute, potentially leaving orphaned Kubernetes resources. Consider wrapping the verification and cleanup in a try/finally block.
🧹 Suggested fix for reliable cleanup
# Verify resources exist - got_cm = logic.v1.read_namespaced_config_map(name=f"script-{exec_id}", namespace=ns) - assert got_cm is not None - got_pod = logic.v1.read_namespaced_pod(name=f"executor-{exec_id}", namespace=ns) - assert got_pod is not None - - # Cleanup - logic.v1.delete_namespaced_pod(name=f"executor-{exec_id}", namespace=ns) - logic.v1.delete_namespaced_config_map(name=f"script-{exec_id}", namespace=ns) + try: + got_cm = logic.v1.read_namespaced_config_map(name=f"script-{exec_id}", namespace=ns) + assert got_cm is not None + got_pod = logic.v1.read_namespaced_pod(name=f"executor-{exec_id}", namespace=ns) + assert got_pod is not None + finally: + # Cleanup regardless of assertion outcome + logic.v1.delete_namespaced_pod(name=f"executor-{exec_id}", namespace=ns) + logic.v1.delete_namespaced_config_map(name=f"script-{exec_id}", namespace=ns)backend/app/services/sse/event_router.py (1)
42-46: Avoid fullmodel_dump()just to readexecution_id.
model_dump()allocates a full dict (potentially large payloads). A direct attribute lookup avoids the extra work.Suggested refactor
- data = event.model_dump() - execution_id = data.get("execution_id") + execution_id = getattr(event, "execution_id", None)backend/tests/integration/services/sse/test_partitioned_event_router.py (1)
39-41: Consider closing the subscription after the test.The test opens a Redis subscription but doesn't explicitly close it. While the fixture cleanup may handle the Redis client, explicitly closing the subscription would be cleaner.
♻️ Suggested cleanup
# Await the subscription - verify event arrived in Redis msg = await asyncio.wait_for(subscription.get(RedisSSEMessage), timeout=2.0) assert msg is not None assert str(msg.event_type) == str(ev.event_type) + + await subscription.close()backend/app/services/saga/saga_service.py (1)
162-178: Consider renaming method to reflect new implementation.The method
get_saga_status_from_orchestratorreferences "orchestrator" which no longer exists. Consider renaming to something likeget_saga_status_liveorget_live_saga_statusto better reflect it fetches fromSagaLogic's in-memory state first.
| # Phase 3: Start run() methods as background tasks | ||
| # Note: Services are already initialized by their DI providers (which handle __aenter__/__aexit__) | ||
|
|
||
| async def run_sse_consumers() -> None: | ||
| """Run SSE consumers using TaskGroup.""" | ||
| async with asyncio.TaskGroup() as tg: | ||
| for consumer in sse_consumers: | ||
| tg.create_task(consumer.run()) | ||
|
|
||
| tasks = [ | ||
| asyncio.create_task(run_sse_consumers(), name="sse_consumers"), | ||
| asyncio.create_task(event_store_consumer.run(), name="event_store_consumer"), | ||
| asyncio.create_task(notification_service.run(), name="notification_service"), | ||
| ] | ||
| logger.info(f"Background services started ({len(sse_consumers)} SSE consumers)") | ||
|
|
||
| try: | ||
| yield | ||
| finally: | ||
| # Cancel all background tasks on shutdown | ||
| logger.info("Shutting down background services...") | ||
| for task in tasks: | ||
| task.cancel() | ||
|
|
||
| # Wait for tasks to finish cancellation | ||
| await asyncio.gather(*tasks, return_exceptions=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
cat -n backend/app/core/dishka_lifespan.py | head -150Repository: HardMax71/Integr8sCode
Length of output: 188
🌐 Web query:
asyncio.create_task() exception handling unwaited tasks Python
💡 Result:
Short answer: an exception in a Task created with asyncio.create_task() is saved on the Task and will be logged as "Task exception was never retrieved" if nobody ever awaits the Task or calls result()/exception(). Fixes: await the task, use a TaskGroup (Python 3.11+) or keep a strong reference and handle its result in a callback, or install a loop exception handler. Examples and links:
-
Await or gather tasks (propagates/raises exceptions):
- await task
- await asyncio.gather(...)
-
Use asyncio.TaskGroup (Python 3.11+) to automatically wait and raise grouped exceptions:
async with asyncio.TaskGroup() as tg:
tg.create_task(coro()) # all tasks awaited when context exits. [1] -
Keep references + inspect exceptions (for "fire-and-forget"):
background = set()
task = asyncio.create_task(coro())
background.add(task)
task.add_done_callback(lambda t: background.discard(t) or log_exception(t))
def log_exception(t):
if t.cancelled(): return
exc = t.exception()
if exc is not None:
logger.error("task failed", exc_info=exc) # or handle it. [2][3] -
Global handling: loop.set_exception_handler(...) can be used for loop-level logging/control.
Sources: asyncio docs on Task/TaskGroup and "detect never-retrieved exceptions". [1][2][3]
References
[1] Python docs — Coroutines and Tasks / TaskGroup (asyncio.TaskGroup). (docs.python.org)
[2] Python docs — Detect never-retrieved exceptions in asyncio (explains "Task exception was never retrieved"). (docs.python.org)
[3] Practical guidance: keeping strong references and add_done_callback examples. (runebook.dev)
If you want, tell me your Python version and I’ll give the exact snippet tailored to it.
Citations:
- 1: https://docs.python.org/3.11/library/asyncio-task.html?utm_source=openai
- 2: https://docs.python.org/3.9/library/asyncio-dev.html?utm_source=openai
- 3: https://runebook.dev/en/docs/python/library/asyncio-task/asyncio.Task?utm_source=openai
🌐 Web query:
Python asyncio TaskGroup vs create_task exception handling
💡 Result:
Short answer
- asyncio.TaskGroup.create_task(...) creates tasks that belong to the TaskGroup; if any member task raises (other than CancelledError) the group cancels the remaining member tasks, waits for them, and then raises an ExceptionGroup (or re-raises KeyboardInterrupt/SystemExit). [1]
- asyncio.create_task(...) (or loop.create_task) creates a standalone Task not owned by a TaskGroup; its exceptions do not affect a surrounding TaskGroup and it is not cancelled when the group is torn down. Use standalone tasks if you want a task to outlive or be isolated from the group. [1][2]
Key differences and practical notes
- Exception aggregation: TaskGroup collects non-cancellation exceptions from members and raises them as an ExceptionGroup when the group exits; plain create_task does not provide that aggregation. [1]
- Cancellation semantics: When a TaskGroup member fails, the group cancels the remaining member tasks (and may cancel the task running the async-with body). Standalone tasks created with asyncio.create_task are not cancelled by the TaskGroup. [1][2]
- Edge cases/bugs: There have been reported corner cases (e.g., tasks added after abort) and fixes/discussions in CPython issue tracker — be careful if you dynamically add tasks after a failure. [3]
If you want, I can show a minimal code example demonstrating both behaviors.
🏁 Script executed:
# Try alternative repository access
git clone https://github.com/HardMax71/Integr8sCode.git /tmp/repo 2>&1 | head -20Repository: HardMax71/Integr8sCode
Length of output: 155
🏁 Script executed:
cd /tmp/repo && cat -n backend/app/core/dishka_lifespan.py | sed -n '90,140p'Repository: HardMax71/Integr8sCode
Length of output: 1066
🏁 Script executed:
cd /tmp/repo && cat -n backend/app/core/dishka_lifespan.py | sed -n '1,150p'Repository: HardMax71/Integr8sCode
Length of output: 4966
🏁 Script executed:
cd /tmp/repo && cat -n backend/app/core/dishka_lifespan.pyRepository: HardMax71/Integr8sCode
Length of output: 4966
🏁 Script executed:
cd /tmp/repo && git log --oneline backend/app/core/dishka_lifespan.py | head -10Repository: HardMax71/Integr8sCode
Length of output: 608
🏁 Script executed:
cd /tmp/repo && git show HEAD:backend/app/core/dishka_lifespan.py | wc -lRepository: HardMax71/Integr8sCode
Length of output: 69
🏁 Script executed:
cd /tmp/repo && git show 577a184:backend/app/core/dishka_lifespan.py 2>/dev/null | tail -60Repository: HardMax71/Integr8sCode
Length of output: 2666
🏁 Script executed:
cd /tmp/repo && git log --all --oneline --grep="background\|task\|supervision" | head -20Repository: HardMax71/Integr8sCode
Length of output: 163
🏁 Script executed:
cd /tmp/repo && git diff 577a184^..577a184 -- backend/app/core/dishka_lifespan.py | head -150Repository: HardMax71/Integr8sCode
Length of output: 2243
🏁 Script executed:
cd /tmp/repo && git show 0a65186 -- backend/app/core/dishka_lifespan.py | head -200Repository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
cd /tmp/repo && git log --oneline | head -20Repository: HardMax71/Integr8sCode
Length of output: 921
🏁 Script executed:
cd /tmp/repo && git log --oneline --all | grep -i "asyncio\|task"Repository: HardMax71/Integr8sCode
Length of output: 163
🏁 Script executed:
cd /tmp/repo && git show d6fe5ea -- backend/app/core/dishka_lifespan.pyRepository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
cd /tmp/repo && git log --all --oneline | grep -i "task fix"Repository: HardMax71/Integr8sCode
Length of output: 163
🏁 Script executed:
cd /tmp/repo && git show d6fe5ea:backend/app/core/dishka_lifespan.py 2>&1 | catRepository: HardMax71/Integr8sCode
Length of output: 152
🏁 Script executed:
cd /tmp/repo && git branch -aRepository: HardMax71/Integr8sCode
Length of output: 2486
🏁 Script executed:
cd /tmp/repo && git log --all --oneline --decorate -- backend/app/core/dishka_lifespan.py | head -20Repository: HardMax71/Integr8sCode
Length of output: 1088
🏁 Script executed:
cd /tmp/repo && git show ff45a52:backend/app/core/dishka_lifespan.py 2>&1 | catRepository: HardMax71/Integr8sCode
Length of output: 4984
Add runtime supervision for background tasks.
Tasks are created fire-and-forget and only awaited on shutdown with return_exceptions=True; a runtime exception in any consumer/service will go silent until exit. Use a done callback to log failures and cancel remaining tasks immediately.
Suggested supervision pattern
tasks = [
asyncio.create_task(run_sse_consumers(), name="sse_consumers"),
asyncio.create_task(event_store_consumer.run(), name="event_store_consumer"),
asyncio.create_task(notification_service.run(), name="notification_service"),
]
logger.info(f"Background services started ({len(sse_consumers)} SSE consumers)")
+
+ def _handle_task_done(task: asyncio.Task) -> None:
+ if task.cancelled():
+ return
+ exc = task.exception()
+ if exc:
+ logger.error("Background task failed", exc_info=exc)
+ for t in tasks:
+ if t is not task:
+ t.cancel()
+
+ for task in tasks:
+ task.add_done_callback(_handle_task_done)🤖 Prompt for AI Agents
In `@backend/app/core/dishka_lifespan.py` around lines 102 - 127, Background tasks
created in the tasks list (run_sse_consumers, event_store_consumer.run,
notification_service.run) are fire-and-forget and only awaited at shutdown, so
runtime exceptions can be lost; update the creation of each asyncio.create_task
to attach an add_done_callback that checks task.exception(), logs the error via
logger, and on first non-None exception cancels all other tasks in the tasks
collection (safely skipping already done/cancelled tasks) to implement immediate
supervision and fail-fast behavior, ensuring you still await
asyncio.gather(*tasks, return_exceptions=True) in the finally block to finish
cleanup.
| @provide(scope=Scope.REQUEST) | ||
| def get_sse_shutdown_manager( | ||
| self, logger: logging.Logger, connection_metrics: ConnectionMetrics | ||
| ) -> SSEShutdownManager: | ||
| return create_sse_shutdown_manager(logger=logger, connection_metrics=connection_metrics) | ||
| def get_sse_connection_registry( | ||
| self, | ||
| logger: logging.Logger, | ||
| connection_metrics: ConnectionMetrics, | ||
| ) -> SSEConnectionRegistry: | ||
| return SSEConnectionRegistry( | ||
| logger=logger, | ||
| connection_metrics=connection_metrics, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# Find SSEConnectionRegistry class definition
fd -t f "\.py$" | xargs grep -l "class SSEConnectionRegistry"Repository: HardMax71/Integr8sCode
Length of output: 188
🏁 Script executed:
# Search for get_connection_count usage
rg "get_connection_count" --type py -A 3 -B 3Repository: HardMax71/Integr8sCode
Length of output: 6270
🏁 Script executed:
# Search for SSEConnectionRegistry usage
rg "SSEConnectionRegistry" --type py -A 2 -B 2Repository: HardMax71/Integr8sCode
Length of output: 6034
🏁 Script executed:
# Check how SSEService is provided
rg "@provide.*SSEService" --type py -A 5 -B 2Repository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
# Check for Scope definitions and their meanings
rg "class Scope" --type py -A 10Repository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
# Look at sse_connection_registry.py to understand persistence expectations
cat -n backend/app/services/sse/sse_connection_registry.py | head -50Repository: HardMax71/Integr8sCode
Length of output: 2296
🏁 Script executed:
# Search for SSEService provider definition more broadly
rg "get_sse_service|SSEService" backend/app/core/providers.py -A 5 -B 5Repository: HardMax71/Integr8sCode
Length of output: 1278
🏁 Script executed:
# Look at the full providers.py around the SSEService definition
rg "def get_" backend/app/core/providers.py | grep -i sseRepository: HardMax71/Integr8sCode
Length of output: 355
🏁 Script executed:
# Check SSEService class to understand if it needs app-wide state
cat -n backend/app/services/sse/sse_service.py | head -80Repository: HardMax71/Integr8sCode
Length of output: 3831
🏁 Script executed:
# Check if there's any caching or special handling for REQUEST scope singletons
rg "Scope\." backend/app/core/providers.py -B 2 -A 1 | head -40Repository: HardMax71/Integr8sCode
Length of output: 706
🏁 Script executed:
# Look for health status endpoint that uses get_connection_count
rg "get_health_status" --type py -A 10 -B 5Repository: HardMax71/Integr8sCode
Length of output: 3730
🏁 Script executed:
# Search for any test that shows REQUEST vs APP scope behavior
rg "Scope\.(REQUEST|APP)" backend/tests --type py | head -20Repository: HardMax71/Integr8sCode
Length of output: 48
SSEConnectionRegistry scope must be Scope.APP, not Scope.REQUEST.
With Scope.REQUEST, a new registry instance is created per HTTP request, resulting in an empty _active_connections dictionary on every health status check. This causes get_connection_count() to always return 0. The registry is designed to track active connections globally across the application—registrations from one request's SSE stream must be visible when the health endpoint is called from a different request. The internal asyncio.Lock() and the persistent state expectations confirm the registry is intended as application-wide infrastructure.
🤖 Prompt for AI Agents
In `@backend/app/core/providers.py` around lines 463 - 472, The provider
get_sse_connection_registry currently creates an SSEConnectionRegistry per
request (provide(scope=Scope.REQUEST)), which resets _active_connections and
causes get_connection_count() to always return 0; change the provider to use
application scope (provide(scope=Scope.APP)) so SSEConnectionRegistry is a
singleton for the app, preserving the asyncio.Lock() and shared
_active_connections across requests and allowing registrations from SSE streams
to be visible to the health endpoint.
| except Exception as e: | ||
| self.logger.error(f"Error processing message: {e}", exc_info=True) | ||
| self._event_metrics.record_kafka_consumption_error( | ||
| msg.topic, self._config.group_id, type(e).__name__ | ||
| ) | ||
| if self._error_callback: | ||
| await self._error_callback(e, event) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix: event may be unbound when error callback is invoked.
If deserialize_event() throws an exception (line 80), the event variable is never assigned. The error callback at line 110 will then raise a NameError.
🐛 Proposed fix
+ event: DomainEvent | None = None
try:
event = await self._schema_registry.deserialize_event(msg.value, msg.topic)
headers = {k: v.decode() if isinstance(v, bytes) else v for k, v in (msg.headers or [])}
ctx = extract_trace_context(headers)
with tracer.start_as_current_span(
"kafka.consume",
context=ctx,
kind=SpanKind.CONSUMER,
attributes={
EventAttributes.KAFKA_TOPIC: msg.topic,
EventAttributes.KAFKA_PARTITION: msg.partition,
EventAttributes.KAFKA_OFFSET: msg.offset,
EventAttributes.EVENT_TYPE: event.event_type,
EventAttributes.EVENT_ID: event.event_id,
},
):
await self._dispatcher.dispatch(event)
if not self._config.enable_auto_commit:
await self._consumer.commit()
self._event_metrics.record_kafka_message_consumed(msg.topic, self._config.group_id)
except Exception as e:
self.logger.error(f"Error processing message: {e}", exc_info=True)
self._event_metrics.record_kafka_consumption_error(
msg.topic, self._config.group_id, type(e).__name__
)
- if self._error_callback:
+ if self._error_callback and event is not None:
await self._error_callback(e, event)🤖 Prompt for AI Agents
In `@backend/app/events/core/consumer.py` around lines 104 - 110, The variable
event can be unbound if deserialize_event() raises, causing a NameError when
awaiting self._error_callback(e, event); fix by initializing event = None before
the try/except where deserialize_event() is called (so the callback always
receives a defined value), and keep the existing error logging and metrics calls
(e.g., the except block that calls self.logger.error,
self._event_metrics.record_kafka_consumption_error and awaits
self._error_callback).
| def register_idempotent_handler( | ||
| self, | ||
| event_type: str, | ||
| event_type: EventType, | ||
| handler: Callable[[DomainEvent], Awaitable[None]], | ||
| key_strategy: str | None = None, | ||
| custom_key_func: Callable[[DomainEvent], str] | None = None, | ||
| fields: Set[str] | None = None, | ||
| fields: set[str] | None = None, | ||
| ttl_seconds: int | None = None, | ||
| cache_result: bool = True, | ||
| on_duplicate: Callable[[DomainEvent, Any], Any] | None = None, | ||
| ) -> None: | ||
| """Subscribe an idempotent handler for specific event type""" | ||
| # Create the idempotent handler wrapper | ||
| idempotent_wrapper = IdempotentEventHandler( | ||
| """Register an idempotent handler for an event type.""" | ||
| wrapped = IdempotentEventHandler( | ||
| handler=handler, | ||
| idempotency_manager=self.idempotency_manager, | ||
| logger=self.logger, | ||
| key_strategy=key_strategy or self.default_key_strategy, | ||
| idempotency_manager=self._idempotency_manager, | ||
| logger=self._logger, | ||
| key_strategy=key_strategy or self._default_key_strategy, | ||
| custom_key_func=custom_key_func, | ||
| fields=fields, | ||
| ttl_seconds=ttl_seconds or self.default_ttl_seconds, | ||
| cache_result=cache_result, | ||
| ttl_seconds=ttl_seconds or self._default_ttl_seconds, | ||
| on_duplicate=on_duplicate, | ||
| ) | ||
|
|
||
| # Create an async handler that processes the message | ||
| async def async_handler(message: Any) -> Any: | ||
| self.logger.info(f"IDEMPOTENT HANDLER CALLED for {event_type}") | ||
|
|
||
| # Extract event from confluent-kafka Message | ||
| if not hasattr(message, "value"): | ||
| self.logger.error(f"Received non-Message object for {event_type}: {type(message)}") | ||
| return None | ||
|
|
||
| # Debug log to check message details | ||
| self.logger.info( | ||
| f"Handler for {event_type} - Message type: {type(message)}, " | ||
| f"has key: {hasattr(message, 'key')}, " | ||
| f"has topic: {hasattr(message, 'topic')}" | ||
| ) | ||
|
|
||
| raw_value = message.value | ||
|
|
||
| # Debug the raw value | ||
| self.logger.info(f"Raw value extracted: {raw_value[:100] if raw_value else 'None or empty'}") | ||
|
|
||
| # Handle tombstone messages (null value for log compaction) | ||
| if raw_value is None: | ||
| self.logger.warning(f"Received empty message for {event_type} - tombstone or consumed value") | ||
| return None | ||
|
|
||
| # Handle empty messages | ||
| if not raw_value: | ||
| self.logger.warning(f"Received empty message for {event_type} - empty bytes") | ||
| return None | ||
|
|
||
| try: | ||
| # Deserialize using schema registry if available | ||
| event = await self.consumer._schema_registry.deserialize_event(raw_value, message.topic) | ||
| if not event: | ||
| self.logger.error(f"Failed to deserialize event for {event_type}") | ||
| return None | ||
|
|
||
| # Call the idempotent wrapper directly in async context | ||
| await idempotent_wrapper(event) | ||
|
|
||
| self.logger.debug(f"Successfully processed {event_type} event: {event.event_id}") | ||
| return None | ||
| except Exception as e: | ||
| self.logger.error(f"Failed to process message for {event_type}: {e}", exc_info=True) | ||
| raise | ||
|
|
||
| # Register with the dispatcher if available | ||
| if self.dispatcher: | ||
| # Create wrapper for EventDispatcher | ||
| async def dispatch_handler(event: DomainEvent) -> None: | ||
| await idempotent_wrapper(event) | ||
|
|
||
| self.dispatcher.register(EventType(event_type))(dispatch_handler) | ||
| else: | ||
| # Fallback to direct consumer registration if no dispatcher | ||
| self.logger.error(f"No EventDispatcher available for registering idempotent handler for {event_type}") | ||
|
|
||
| async def start(self, topics: list[KafkaTopic]) -> None: | ||
| """Start the consumer with idempotency""" | ||
| self.logger.info(f"IdempotentConsumerWrapper.start called with topics: {topics}") | ||
| # Make handlers idempotent before starting | ||
| self.make_handlers_idempotent() | ||
|
|
||
| # Start the consumer with required topics parameter | ||
| await self.consumer.start(topics) | ||
| self.logger.info("IdempotentConsumerWrapper started successfully") | ||
|
|
||
| async def stop(self) -> None: | ||
| """Stop the consumer""" | ||
| await self.consumer.stop() | ||
|
|
||
| # Delegate other methods to the wrapped consumer | ||
| def __getattr__(self, name: str) -> Any: | ||
| return getattr(self.consumer, name) | ||
| self._dispatcher.register(event_type)(wrapped) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix handler registration to avoid AttributeError on __name__.
EventDispatcher.register logs handler.__name__, but IdempotentEventHandler instances don’t define it, so register_idempotent_handler can crash at registration time.
🐛 Proposed fix: set a safe __name__ before registration
wrapped = IdempotentEventHandler(
handler=handler,
idempotency_manager=self._idempotency_manager,
logger=self._logger,
key_strategy=key_strategy or self._default_key_strategy,
custom_key_func=custom_key_func,
fields=fields,
ttl_seconds=ttl_seconds or self._default_ttl_seconds,
on_duplicate=on_duplicate,
)
+ wrapped.__name__ = getattr(handler, "__name__", wrapped.__class__.__name__)
self._dispatcher.register(event_type)(wrapped)🤖 Prompt for AI Agents
In `@backend/app/services/idempotency/middleware.py` around lines 123 - 144,
register_idempotent_handler wraps a handler in IdempotentEventHandler then
passes it to EventDispatcher.register which expects handler.__name__; set the
wrapper's __name__ (or use functools.update_wrapper/functools.wraps) so the
wrapped instance exposes the original handler's __name__ to avoid
AttributeError. In practice, after creating wrapped in
register_idempotent_handler, assign wrapped.__name__ = getattr(handler,
"__name__", type(handler).__name__) (or call update_wrapper(wrapped, handler))
before calling self._dispatcher.register(event_type)(wrapped).
| async def register_connection(self, execution_id: str, connection_id: str) -> None: | ||
| """Register a new SSE connection.""" | ||
| async with self._lock: | ||
| if execution_id not in self._active_connections: | ||
| self._active_connections[execution_id] = set() | ||
|
|
||
| self._active_connections[execution_id].add(connection_id) | ||
| self.logger.debug("Registered SSE connection", extra={"connection_id": connection_id}) | ||
| self.metrics.increment_sse_connections("executions") | ||
|
|
||
| async def unregister_connection(self, execution_id: str, connection_id: str) -> None: | ||
| """Unregister an SSE connection.""" | ||
| async with self._lock: | ||
| if execution_id in self._active_connections: | ||
| self._active_connections[execution_id].discard(connection_id) | ||
| if not self._active_connections[execution_id]: | ||
| del self._active_connections[execution_id] | ||
|
|
||
| self.logger.debug("Unregistered SSE connection", extra={"connection_id": connection_id}) | ||
| self.metrics.decrement_sse_connections("executions") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prevent metrics drift on duplicate register/unregister.
Line 36 and Line 49 update metrics even when the connection was already present/absent, which can over/under-count (and potentially go negative). Consider only updating metrics when the set actually changes.
🛠️ Suggested fix
async def register_connection(self, execution_id: str, connection_id: str) -> None:
"""Register a new SSE connection."""
async with self._lock:
- if execution_id not in self._active_connections:
- self._active_connections[execution_id] = set()
-
- self._active_connections[execution_id].add(connection_id)
- self.logger.debug("Registered SSE connection", extra={"connection_id": connection_id})
- self.metrics.increment_sse_connections("executions")
+ conns = self._active_connections.setdefault(execution_id, set())
+ if connection_id not in conns:
+ conns.add(connection_id)
+ self.metrics.increment_sse_connections("executions")
+ self.logger.debug("Registered SSE connection", extra={"connection_id": connection_id})
async def unregister_connection(self, execution_id: str, connection_id: str) -> None:
"""Unregister an SSE connection."""
async with self._lock:
- if execution_id in self._active_connections:
- self._active_connections[execution_id].discard(connection_id)
- if not self._active_connections[execution_id]:
- del self._active_connections[execution_id]
-
- self.logger.debug("Unregistered SSE connection", extra={"connection_id": connection_id})
- self.metrics.decrement_sse_connections("executions")
+ conns = self._active_connections.get(execution_id)
+ if conns and connection_id in conns:
+ conns.remove(connection_id)
+ if not conns:
+ del self._active_connections[execution_id]
+ self.metrics.decrement_sse_connections("executions")
+ self.logger.debug("Unregistered SSE connection", extra={"connection_id": connection_id})🤖 Prompt for AI Agents
In `@backend/app/services/sse/sse_connection_registry.py` around lines 30 - 49,
The register_connection and unregister_connection currently call
metrics.increment_sse_connections / decrement_sse_connections unconditionally,
causing drift on duplicate adds/removes; modify register_connection to only call
metrics.increment_sse_connections("executions") (and log) if connection_id was
not already present in self._active_connections[execution_id], and modify
unregister_connection to only call
metrics.decrement_sse_connections("executions") (and log) if connection_id was
actually removed (i.e., it existed before discard and the set changed or length
decreased); keep the async with self._lock around these checks and mutations and
use the same symbols (_active_connections, register_connection,
unregister_connection, metrics.increment_sse_connections,
metrics.decrement_sse_connections) to locate the changes.
| async def run_with_signal() -> None: | ||
| consumer_started.set() | ||
| await consumer.run() | ||
|
|
||
| task = asyncio.create_task(run_with_signal()) | ||
|
|
||
| try: | ||
| st = c.get_status() | ||
| assert st.state == "running" and st.is_running is True | ||
| # Exercise seek functions; don't force specific partition offsets | ||
| await c.seek_to_beginning() | ||
| await c.seek_to_end() | ||
| # No need to sleep; just ensure we can call seek APIs while running | ||
| # Wait for consumer to start | ||
| await asyncio.wait_for(consumer_started.wait(), timeout=5.0) | ||
|
|
||
| # Exercise seek functions while consumer is running | ||
| await consumer.seek_to_beginning() | ||
| await consumer.seek_to_end() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test may have a race: consumer_started signals before consumer is fully running.
consumer_started.set() fires before consumer.run() is called, so the seek operations at lines 51-52 may execute before the internal AIOKafkaConsumer is started. The seek_to_beginning() and seek_to_end() methods guard with if self._consumer, but self._consumer is only assigned inside run() after construction.
Consider moving the event signal after consumer initialization:
💡 Suggested improvement
async def run_with_signal() -> None:
- consumer_started.set()
- await consumer.run()
+ # Small delay to ensure consumer.run() has started and set self._consumer
+ await asyncio.sleep(0.1)
+ consumer_started.set()
+ await consumer.run()Alternatively, add a short delay before exercising seek methods, or check that consumer._consumer is not None before proceeding.
🤖 Prompt for AI Agents
In `@backend/tests/integration/events/test_consumer_lifecycle.py` around lines 40
- 52, The test signals readiness too early: move the consumer_started.set() so
it runs after the internal consumer is initialized inside run() (or after
consumer.run() completes its startup logic) to avoid calling
consumer.seek_to_beginning()/consumer.seek_to_end() before consumer._consumer is
assigned; locate the run_with_signal coroutine and ensure the readiness event is
set only once consumer.run() has created/started the AIOKafkaConsumer (or add an
explicit check/wait for consumer._consumer to be non-None before performing
seeks).
backend/workers/run_coordinator.py
Outdated
| async def run_coordinator_tasks() -> None: | ||
| """Run consumer and scheduling loop using TaskGroup.""" | ||
| async with asyncio.TaskGroup() as tg: | ||
| tg.create_task(consumer.run()) | ||
| tg.create_task(logic.scheduling_loop()) | ||
|
|
||
| try: | ||
| # Wait for shutdown signal or service to stop | ||
| while coordinator.is_running and not shutdown_event.is_set(): | ||
| await asyncio.sleep(60) | ||
| status = await coordinator.get_status() | ||
| logger.info(f"Coordinator status: {status}") | ||
| # Run coordinator until shutdown signal | ||
| run_task = asyncio.create_task(run_coordinator_tasks()) | ||
| shutdown_task = asyncio.create_task(shutdown_event.wait()) | ||
|
|
||
| done, pending = await asyncio.wait( | ||
| [run_task, shutdown_task], | ||
| return_when=asyncio.FIRST_COMPLETED, | ||
| ) | ||
|
|
||
| # Cancel remaining tasks | ||
| for task in pending: | ||
| task.cancel() | ||
| try: | ||
| await task | ||
| except asyncio.CancelledError: | ||
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
cat -n backend/workers/run_coordinator.py | head -80Repository: HardMax71/Integr8sCode
Length of output: 188
🌐 Web query:
asyncio.wait exception handling task failures not propagated
💡 Result:
Short answer
asyncio.wait does not re-raise exceptions from tasks. It just returns (done, pending); exceptions raised inside tasks are stored on the Task object and only propagated if you await the task or call task.result()/task.exception(). Use await on the tasks (or gather), or check task.exception() / use return_when=FIRST_EXCEPTION to detect failures. [1][2][3]
Why this happens (concise)
- create_task schedules a Task that runs independently; an unhandled exception becomes the Task’s exception state — it is not automatically re-raised in the creator. The loop will warn if the exception is never retrieved. [1][2]
- asyncio.wait only waits for completion (or first exception when you use FIRST_EXCEPTION) and returns sets; it does not itself raise the exception. You must inspect/await the returned Task(s) to get the exception. [1][2]
What to do (options)
- Await the tasks (propagates exception): results = await asyncio.gather(*tasks) (gather will re-raise the first exception by default). [1][4]
- Use await individually or call task.result()/task.exception() on tasks returned by asyncio.wait(done, ...). Example:
- done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
- for t in done:
exc = t.exception()
if exc is not None: handle/raise(exc)
- Run a monitor loop that uses return_when=FIRST_EXCEPTION and reacts immediately (reschedule/cancel/raise). [5]
References
[1] Python docs — asyncio.wait / tasks & futures.
[2] StackOverflow — explanation: exceptions stay in Task until retrieved.
[3] Python issue tracker — unhandled-task exceptions are delivered to exception handler; tasks don’t implicitly propagate to parent.
[4] asyncio.gather behavior (docs / tutorials).
[5] StackOverflow example of an exception-aware scheduler using asyncio.wait(FIRST_EXCEPTION).
If you want, I can paste a minimal code example showing each approach.
Surface task failures after asyncio.wait.
If run_coordinator_tasks() fails, the exception is stored in the task but never retrieved because the code only processes pending tasks. The done set containing the failed task is ignored, so the worker exits cleanly despite a crash.
🐛 Proposed fix: propagate completed task exceptions
done, pending = await asyncio.wait(
[run_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED,
)
# Cancel remaining tasks
for task in pending:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
+
+ # Surface failures from completed tasks
+ for task in done:
+ if exc := task.exception():
+ logger.error("Coordinator task failed: %s", exc)
+ raise exc🤖 Prompt for AI Agents
In `@backend/workers/run_coordinator.py` around lines 42 - 64, The code awaits
asyncio.wait([run_task, shutdown_task]) but only inspects pending tasks, so
exceptions from completed tasks (e.g., run_task from run_coordinator_tasks())
are never re-raised; update the logic after asyncio.wait to iterate over the
done set and call task.result() (or check task.exception()) for each done task
and re-raise or surface any exceptions (at least for run_task) before cancelling
pending tasks; locate run_coordinator_tasks, run_task, shutdown_task, and the
asyncio.wait call to implement this change so failures in run_coordinator_tasks
propagate correctly.
| logger.info("ResultProcessor consumer initialized, starting run...") | ||
|
|
||
| logger.info("ResultProcessor started and running") | ||
| try: | ||
| # Run consumer until shutdown signal | ||
| run_task = asyncio.create_task(consumer.run()) | ||
| shutdown_task = asyncio.create_task(shutdown_event.wait()) | ||
|
|
||
| done, pending = await asyncio.wait( | ||
| [run_task, shutdown_task], | ||
| return_when=asyncio.FIRST_COMPLETED, | ||
| ) | ||
|
|
||
| # Wait for shutdown signal or service to stop | ||
| while processor._state == ProcessingState.PROCESSING and not shutdown_event.is_set(): | ||
| await asyncio.sleep(60) | ||
| status = await processor.get_status() | ||
| logger.info(f"ResultProcessor status: {status}") | ||
| # Cancel remaining tasks | ||
| for task in pending: | ||
| task.cancel() | ||
| try: | ||
| await task | ||
| except asyncio.CancelledError: | ||
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
cat -n backend/workers/run_result_processor.py | head -80Repository: HardMax71/Integr8sCode
Length of output: 3310
🏁 Script executed:
cat -n backend/workers/run_result_processor.py | tail -30Repository: HardMax71/Integr8sCode
Length of output: 1203
🏁 Script executed:
python3 << 'EOF'
import asyncio
# Test 1: What happens if a task in done raises an exception?
async def test_exception_handling():
async def failing_task():
raise ValueError("Task failed")
async def wait_task():
await asyncio.sleep(0.1)
task1 = asyncio.create_task(failing_task())
task2 = asyncio.create_task(wait_task())
done, pending = await asyncio.wait(
[task1, task2],
return_when=asyncio.FIRST_COMPLETED,
)
print("Done tasks:", done)
print("Pending tasks:", pending)
print("task1 in done:", task1 in done)
print("task1 cancelled:", task1.cancelled())
try:
exc = task1.exception()
print("Exception from task1:", exc)
except asyncio.CancelledError:
print("Task1 was cancelled")
except Exception as e:
print("Error getting exception:", e)
# Now cancel task2
for task in pending:
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Pending task was cancelled")
asyncio.run(test_exception_handling())
EOFRepository: HardMax71/Integr8sCode
Length of output: 152
Surface consumer.run() failures instead of swallowing them.
If run_task completes with an exception, it will be in the done set, but the exception is never retrieved. The worker exits normally without signaling failure. Check if run_task completed with an exception and either log it or re-raise before shutdown.
Suggested fix
done, pending = await asyncio.wait(
[run_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED,
)
+ if run_task in done and not run_task.cancelled():
+ exc = run_task.exception()
+ if exc:
+ logger.error("ResultProcessor consumer crashed", exc_info=exc)
+ raise exc
+
# Cancel remaining tasks
for task in pending:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| logger.info("ResultProcessor consumer initialized, starting run...") | |
| logger.info("ResultProcessor started and running") | |
| try: | |
| # Run consumer until shutdown signal | |
| run_task = asyncio.create_task(consumer.run()) | |
| shutdown_task = asyncio.create_task(shutdown_event.wait()) | |
| done, pending = await asyncio.wait( | |
| [run_task, shutdown_task], | |
| return_when=asyncio.FIRST_COMPLETED, | |
| ) | |
| # Wait for shutdown signal or service to stop | |
| while processor._state == ProcessingState.PROCESSING and not shutdown_event.is_set(): | |
| await asyncio.sleep(60) | |
| status = await processor.get_status() | |
| logger.info(f"ResultProcessor status: {status}") | |
| # Cancel remaining tasks | |
| for task in pending: | |
| task.cancel() | |
| try: | |
| await task | |
| except asyncio.CancelledError: | |
| pass | |
| logger.info("ResultProcessor consumer initialized, starting run...") | |
| try: | |
| # Run consumer until shutdown signal | |
| run_task = asyncio.create_task(consumer.run()) | |
| shutdown_task = asyncio.create_task(shutdown_event.wait()) | |
| done, pending = await asyncio.wait( | |
| [run_task, shutdown_task], | |
| return_when=asyncio.FIRST_COMPLETED, | |
| ) | |
| if run_task in done and not run_task.cancelled(): | |
| exc = run_task.exception() | |
| if exc: | |
| logger.error("ResultProcessor consumer crashed", exc_info=exc) | |
| raise exc | |
| # Cancel remaining tasks | |
| for task in pending: | |
| task.cancel() | |
| try: | |
| await task | |
| except asyncio.CancelledError: | |
| pass |
🤖 Prompt for AI Agents
In `@backend/workers/run_result_processor.py` around lines 35 - 53, The current
shutdown sequence swallows exceptions from consumer.run() because run_task's
exception is never retrieved; after the asyncio.wait completes, inspect the
completed tasks (e.g., check if run_task is in the done set), call
run_task.result() (or handle/run_task.exception()) to surface any exception, log
it with logger.error including context, and re-raise or set an exit error before
cancelling pending tasks (shutdown_task) so failures in consumer.run() are not
treated as normal shutdown; update the logic around run_task/run_task.result(),
shutdown_task, and shutdown_event accordingly.
| try: | ||
| # Wait for shutdown signal or service to stop | ||
| while orchestrator.is_running and not shutdown_event.is_set(): | ||
| await asyncio.sleep(1) | ||
| # Run orchestrator until shutdown signal | ||
| run_task = asyncio.create_task(run_orchestrator_tasks()) | ||
| shutdown_task = asyncio.create_task(shutdown_event.wait()) | ||
|
|
||
| done, pending = await asyncio.wait( | ||
| [run_task, shutdown_task], | ||
| return_when=asyncio.FIRST_COMPLETED, | ||
| ) | ||
|
|
||
| # Cancel remaining tasks | ||
| for task in pending: | ||
| task.cancel() | ||
| try: | ||
| await task | ||
| except asyncio.CancelledError: | ||
| pass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exceptions from run_task may be silently swallowed.
If run_orchestrator_tasks() raises an exception (propagated from the TaskGroup), the current flow catches pending tasks but doesn't re-raise the exception from done tasks. This could hide failures.
🔧 Proposed fix to propagate exceptions
done, pending = await asyncio.wait(
[run_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED,
)
# Cancel remaining tasks
for task in pending:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
+ # Re-raise any exception from completed tasks (except shutdown)
+ for task in done:
+ if task is not shutdown_task:
+ task.result() # Raises if task had an exception📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| try: | |
| # Wait for shutdown signal or service to stop | |
| while orchestrator.is_running and not shutdown_event.is_set(): | |
| await asyncio.sleep(1) | |
| # Run orchestrator until shutdown signal | |
| run_task = asyncio.create_task(run_orchestrator_tasks()) | |
| shutdown_task = asyncio.create_task(shutdown_event.wait()) | |
| done, pending = await asyncio.wait( | |
| [run_task, shutdown_task], | |
| return_when=asyncio.FIRST_COMPLETED, | |
| ) | |
| # Cancel remaining tasks | |
| for task in pending: | |
| task.cancel() | |
| try: | |
| await task | |
| except asyncio.CancelledError: | |
| pass | |
| try: | |
| # Run orchestrator until shutdown signal | |
| run_task = asyncio.create_task(run_orchestrator_tasks()) | |
| shutdown_task = asyncio.create_task(shutdown_event.wait()) | |
| done, pending = await asyncio.wait( | |
| [run_task, shutdown_task], | |
| return_when=asyncio.FIRST_COMPLETED, | |
| ) | |
| # Cancel remaining tasks | |
| for task in pending: | |
| task.cancel() | |
| try: | |
| await task | |
| except asyncio.CancelledError: | |
| pass | |
| # Re-raise any exception from completed tasks (except shutdown) | |
| for task in done: | |
| if task is not shutdown_task: | |
| task.result() # Raises if task had an exception |
🤖 Prompt for AI Agents
In `@backend/workers/run_saga_orchestrator.py` around lines 54 - 70, The current
shutdown logic cancels pending tasks but can silently swallow exceptions from
completed tasks like run_task; after asyncio.wait returns, inspect the done set
(e.g., run_task in done) and propagate any errors by awaiting the completed
tasks or checking task.exception() and re-raising it before returning/finishing;
keep existing cancellation/await handling for pending tasks (shutdown_task) but
ensure exceptions from run_orchestrator_tasks are not ignored so the
orchestrator failure surfaces to the caller.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
5 issues found across 38 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/idempotency/faststream_middleware.py">
<violation number="1" location="backend/app/services/idempotency/faststream_middleware.py:80">
P2: If `mark_failed` throws an exception (e.g., network error), the original handler exception will be masked. Wrap `mark_failed` in a try-except to preserve the original exception.</violation>
</file>
<file name="backend/pyproject.toml">
<violation number="1" location="backend/pyproject.toml:92">
P2: Pin the dataclasses-avroschema dependency to a specific version to keep dependency resolution reproducible and consistent with the rest of the project.</violation>
<violation number="2" location="backend/pyproject.toml:127">
P2: Pin faststream to a specific version (or add an upper bound) to avoid unexpected breaking changes during dependency resolution.</violation>
</file>
<file name="backend/workers/run_result_processor.py">
<violation number="1" location="backend/workers/run_result_processor.py:157">
P2: The catch-all handler silently discards unhandled events without logging. This could mask routing issues or missing handlers for new event types. Consider logging a warning when unknown events are received.</violation>
</file>
<file name="backend/workers/run_k8s_worker.py">
<violation number="1" location="backend/workers/run_k8s_worker.py:142">
P2: Silent catch-all handler will discard unmatched events without any logging. This can mask bugs in filter logic or cause silent data loss when new event types are added. Consider logging unmatched events at warning level to aid debugging.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| await idempotency.mark_completed(event=event, key_strategy="event_based") | ||
| return handler_result | ||
| except Exception as e: | ||
| await idempotency.mark_failed(event=event, error=str(e), key_strategy="event_based") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: If mark_failed throws an exception (e.g., network error), the original handler exception will be masked. Wrap mark_failed in a try-except to preserve the original exception.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/idempotency/faststream_middleware.py, line 80:
<comment>If `mark_failed` throws an exception (e.g., network error), the original handler exception will be masked. Wrap `mark_failed` in a try-except to preserve the original exception.</comment>
<file context>
@@ -0,0 +1,81 @@
+ await idempotency.mark_completed(event=event, key_strategy="event_based")
+ return handler_result
+ except Exception as e:
+ await idempotency.mark_failed(event=event, error=str(e), key_strategy="event_based")
+ raise
</file context>
| "yarl==1.20.1", | ||
| "zipp==3.20.2", | ||
| "monggregate==0.22.1", | ||
| "faststream[kafka]>=0.6.0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: Pin faststream to a specific version (or add an upper bound) to avoid unexpected breaking changes during dependency resolution.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/pyproject.toml, line 127:
<comment>Pin faststream to a specific version (or add an upper bound) to avoid unexpected breaking changes during dependency resolution.</comment>
<file context>
@@ -124,6 +124,7 @@ dependencies = [
"yarl==1.20.1",
"zipp==3.20.2",
"monggregate==0.22.1",
+ "faststream[kafka]>=0.6.0",
]
</file context>
| "faststream[kafka]>=0.6.0", | |
| "faststream[kafka]==0.6.0", |
| "pyasn1_modules==0.4.2", | ||
| "pydantic==2.9.2", | ||
| "pydantic-avro==0.9.1", | ||
| "dataclasses-avroschema[pydantic]>=0.65.0", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: Pin the dataclasses-avroschema dependency to a specific version to keep dependency resolution reproducible and consistent with the rest of the project.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/pyproject.toml, line 92:
<comment>Pin the dataclasses-avroschema dependency to a specific version to keep dependency resolution reproducible and consistent with the rest of the project.</comment>
<file context>
@@ -89,7 +89,7 @@ dependencies = [
"pyasn1_modules==0.4.2",
"pydantic==2.9.2",
- "pydantic-avro==0.9.1",
+ "dataclasses-avroschema[pydantic]>=0.65.0",
"pydantic-settings==2.5.2",
"pydantic_core==2.23.4",
</file context>
| "dataclasses-avroschema[pydantic]>=0.65.0", | |
| "dataclasses-avroschema[pydantic]==0.65.0", |
|
|
||
| # Default handler for unmatched events (prevents message loss) | ||
| @subscriber() | ||
| async def handle_other(event: DomainEvent) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: The catch-all handler silently discards unhandled events without logging. This could mask routing issues or missing handlers for new event types. Consider logging a warning when unknown events are received.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/workers/run_result_processor.py, line 157:
<comment>The catch-all handler silently discards unhandled events without logging. This could mask routing issues or missing handlers for new event types. Consider logging a warning when unknown events are received.</comment>
<file context>
@@ -76,9 +75,106 @@ def main() -> None:
+
+ # Default handler for unmatched events (prevents message loss)
+ @subscriber()
+ async def handle_other(event: DomainEvent) -> None:
+ pass
+
</file context>
|
|
||
| # Default handler for unmatched events (prevents message loss) | ||
| @subscriber() | ||
| async def handle_other(event: DomainEvent) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P2: Silent catch-all handler will discard unmatched events without any logging. This can mask bugs in filter logic or cause silent data loss when new event types are added. Consider logging unmatched events at warning level to aid debugging.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/workers/run_k8s_worker.py, line 142:
<comment>Silent catch-all handler will discard unmatched events without any logging. This can mask bugs in filter logic or cause silent data loss when new event types are added. Consider logging unmatched events at warning level to aid debugging.</comment>
<file context>
@@ -88,9 +69,99 @@ def main() -> None:
+
+ # Default handler for unmatched events (prevents message loss)
+ @subscriber()
+ async def handle_other(event: DomainEvent) -> None:
+ pass
+
</file context>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/pod_monitor/event_mapper.py (1)
11-21: GuardResourceUsageAvroconstruction whenresource_usageis null.
If executor output includes"resource_usage": null, the current call raises and drops the event. Use a safe fallback to{}.🐛 Suggested fix
- return PodLogs( + return PodLogs( stdout=data.get("stdout", ""), stderr=data.get("stderr", ""), exit_code=data.get("exit_code", 0), - resource_usage=ResourceUsageAvro(**data.get("resource_usage", {})), + resource_usage=ResourceUsageAvro(**(data.get("resource_usage") or {})), )Also applies to: 40-47, 491-502
🤖 Fix all issues with AI agents
In `@backend/tests/integration/conftest.py`:
- Around line 74-83: The sse_redis_bus fixture calls SSERedisBus with
redis_client as the only positional argument but SSERedisBus.__init__ expects
logger as the second positional parameter; fix by passing the test logger as the
second positional argument (e.g., SSERedisBus(redis_client, _test_logger,
exec_prefix=..., notif_prefix=...)) or convert to all keywords including
logger=_test_logger to ensure the logger is provided correctly.
In `@backend/tests/integration/result_processor/test_result_processor.py`:
- Around line 72-83: Change the consumer setup in the test to avoid replaying
historical Kafka events: when creating the test ConsumerConfig (the instance
assigned to processor_consumer_config) set auto_offset_reset="latest" instead of
"earliest" and avoid extremely small batch sizes (e.g., remove or increase
max_poll_records=1) so the consumer won't process long retention history slowly;
also ensure test code starts the consumer (the consumer created from
processor_consumer_config) before producing the test events so only new messages
are consumed.
♻️ Duplicate comments (4)
backend/app/core/providers.py (1)
461-470: SSEConnectionRegistry must useScope.APP, notScope.REQUEST.With
Scope.REQUEST, each HTTP request creates a new registry instance with an empty_active_connectionsdictionary. This causesget_connection_count()to always return 0 from the health endpoint since registrations from SSE streams are invisible to other requests. The registry is designed as application-wide infrastructure to track connections globally.- `@provide`(scope=Scope.REQUEST) + `@provide`(scope=Scope.APP) def get_sse_connection_registry(backend/workers/run_pod_monitor.py (1)
74-88: Exceptions frommonitor.run()are silently swallowed.When
asyncio.wait()returns, ifmonitor_taskcompleted with an exception (e.g., K8s API failure), that exception is never retrieved from thedoneset. The worker exits normally without logging the actual failure cause.🐛 Proposed fix
done, pending = await asyncio.wait( [monitor_task, shutdown_task], return_when=asyncio.FIRST_COMPLETED, ) + # Check for exceptions in completed tasks + for task in done: + if task is not shutdown_task and not task.cancelled(): + exc = task.exception() + if exc: + logger.error("PodMonitor crashed", exc_info=exc) + raise exc + for task in pending: task.cancel() with suppress(asyncio.CancelledError): await taskbackend/app/core/dishka_lifespan.py (1)
105-119: Background task failures may go unnoticed until shutdown.Tasks created with
asyncio.create_task()that fail during normal operation will have their exceptions silently swallowed until shutdown whengather(*tasks, return_exceptions=True)is called. Ifrun_sse_consumers(),event_store_consumer.run(), ornotification_service.run()raises an exception, the application continues running in a degraded state without any indication.Consider adding done callbacks to log failures and optionally trigger shutdown on critical task failures.
🛠️ Suggested supervision pattern
tasks = [ asyncio.create_task(run_sse_consumers(), name="sse_consumers"), asyncio.create_task(event_store_consumer.run(), name="event_store_consumer"), asyncio.create_task(notification_service.run(), name="notification_service"), ] + + def _handle_task_done(task: asyncio.Task) -> None: + if task.cancelled(): + return + exc = task.exception() + if exc: + logger.error( + "Background task failed", + extra={"task_name": task.get_name()}, + exc_info=exc, + ) + + for task in tasks: + task.add_done_callback(_handle_task_done) + logger.info( "Background services started", extra={"sse_consumer_count": len(sse_consumers)}, )backend/app/services/saga/saga_logic.py (1)
315-325: Timeout loop should not die on transient exceptions.Any unexpected exception inside
check_timeouts_once()will terminate the loop and stop timeout enforcement. Wrap the loop body to log-and-continue on generic exceptions.🛠️ Suggested hardening
async def check_timeouts_loop(self) -> None: """Check for saga timeouts (runs until cancelled).""" try: while True: # Check every 30 seconds await asyncio.sleep(30) - await self.check_timeouts_once() + try: + await self.check_timeouts_once() + except Exception: + self.logger.error("Timeout checker failed; continuing loop", exc_info=True) except asyncio.CancelledError: self.logger.info("Timeout checker cancelled")
🧹 Nitpick comments (8)
backend/app/services/saga/execution_saga.py (1)
62-100: Useevent.execution_idas a fallback for robustness.
Ifexecution_idis ever missing in context (future refactors or alternate entry points), this step will fail before publishing. A simple fallback makes it resilient.♻️ Suggested tweak
- execution_id = context.get("execution_id") + execution_id = context.get("execution_id") or event.execution_idbackend/workers/run_result_processor.py (1)
134-158: Handlers call private methods (_handle_*) onProcessorLogic.The handlers directly invoke methods prefixed with
_(e.g.,_handle_completed,_handle_failed), which conventionally indicates internal/private methods not intended for external use. Consider either making these methods public (remove underscore prefix) or providing a public interface method.backend/app/services/idempotency/faststream_middleware.py (2)
44-50: Fail-open approach silently skips idempotency on DI failures.The broad
except Exceptioncatch ensures message processing continues even when dependencies are unavailable. Consider logging a warning here to aid debugging when idempotency is unexpectedly skipped due to misconfiguration.♻️ Suggested improvement
# Resolve dependencies from request-scoped container try: idempotency = await container.get(IdempotencyManager) schema_registry = await container.get(SchemaRegistryManager) - except Exception: + except Exception as e: # Dependencies not available - skip idempotency + # Log at debug level to help diagnose misconfiguration + import logging + logging.getLogger(__name__).debug( + "Skipping idempotency: failed to resolve dependencies: %s", e + ) return await call_next(msg)
58-62: Hardcoded topic string "idempotency" passed to deserialize_event.The
topicparameter indeserialize_eventappears to be used only for error context. If the actual topic is available frommsg, consider passing it for more accurate error messages during debugging.backend/workers/run_saga_orchestrator.py (1)
160-161: Consider structured logging for consistency.Using f-string interpolation for logging is inconsistent with the structured logging pattern (
extra={}) used elsewhere in the codebase.♻️ Suggested change
- app_logger.info(f"Subscribing to topics: {topics}") + app_logger.info("Subscribing to topics", extra={"topics": topics})backend/app/events/schema/schema_registry.py (1)
84-102: Minor redundancy: AvroSchema created twice.
_ensure_schema_registeredalready creates anAvroSchemaobject (line 61), butserialize_eventcreates it again (line 92). Consider caching theAvroSchemaobject alongside the schema ID to avoid redundant creation.backend/app/services/result_processor/processor_logic.py (2)
54-66: Consider explicit type check instead of assert for production safety.Using
assert isinstance(event, ...)for type narrowing works, but assertions are stripped when Python runs with-O(optimized mode). If this could happen in production, the type check would be silently skipped.♻️ Safer alternative
async def _handle_completed_wrapper(self, event: DomainEvent) -> None: - assert isinstance(event, ExecutionCompletedEvent) + if not isinstance(event, ExecutionCompletedEvent): + raise TypeError(f"Expected ExecutionCompletedEvent, got {type(event).__name__}") await self._handle_completed(event)Apply similarly to
_handle_failed_wrapperand_handle_timeout_wrapper.
177-193: Hardcodedservice_version="1.0.0"in event metadata.Consider reading the version from settings or a shared constant to avoid manual updates when the service version changes.
| @pytest_asyncio.fixture | ||
| async def sse_redis_bus(redis_client: redis.Redis) -> SSERedisBus: | ||
| """Provide SSERedisBus with unique prefixes for test isolation.""" | ||
| suffix = uuid.uuid4().hex[:6] | ||
| return SSERedisBus( | ||
| redis_client, | ||
| exec_prefix=f"sse:exec:{suffix}:", | ||
| notif_prefix=f"sse:notif:{suffix}:", | ||
| logger=_test_logger, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find SSERedisBus class definition
fd -t f -e py . | xargs rg -l "class SSERedisBus"Repository: HardMax71/Integr8sCode
Length of output: 103
🏁 Script executed:
#!/bin/bash
# Read the SSERedisBus class definition to check __init__ signature
cat -n backend/app/services/sse/redis_bus.py | head -80Repository: HardMax71/Integr8sCode
Length of output: 3507
Fix argument order in SSERedisBus initialization.
The SSERedisBus.__init__ constructor requires logger as the second positional parameter (not optional). The fixture passes redis_client positionally but omits logger from the positional arguments, causing a runtime TypeError. Either pass logger as the second positional argument or use all keyword arguments:
SSERedisBus(
redis_client,
_test_logger, # Pass as 2nd positional arg
exec_prefix=f"sse:exec:{suffix}:",
notif_prefix=f"sse:notif:{suffix}:",
)🤖 Prompt for AI Agents
In `@backend/tests/integration/conftest.py` around lines 74 - 83, The
sse_redis_bus fixture calls SSERedisBus with redis_client as the only positional
argument but SSERedisBus.__init__ expects logger as the second positional
parameter; fix by passing the test logger as the second positional argument
(e.g., SSERedisBus(redis_client, _test_logger, exec_prefix=...,
notif_prefix=...)) or convert to all keywords including logger=_test_logger to
ensure the logger is provided correctly.
| # Create consumer config with unique group id | ||
| processor_consumer_config = ConsumerConfig( | ||
| bootstrap_servers=test_settings.KAFKA_BOOTSTRAP_SERVERS, | ||
| group_id=f"{GroupId.RESULT_PROCESSOR}.test.{uuid.uuid4().hex[:8]}", | ||
| max_poll_records=1, | ||
| enable_auto_commit=True, | ||
| auto_offset_reset="earliest", | ||
| session_timeout_ms=test_settings.KAFKA_SESSION_TIMEOUT_MS, | ||
| heartbeat_interval_ms=test_settings.KAFKA_HEARTBEAT_INTERVAL_MS, | ||
| max_poll_interval_ms=test_settings.KAFKA_MAX_POLL_INTERVAL_MS, | ||
| request_timeout_ms=test_settings.KAFKA_REQUEST_TIMEOUT_MS, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, find and examine the test file
fd "test_result_processor.py" | head -5Repository: HardMax71/Integr8sCode
Length of output: 133
🏁 Script executed:
# Get the line count and read the relevant section
wc -l backend/tests/integration/result_processor/test_result_processor.pyRepository: HardMax71/Integr8sCode
Length of output: 137
🏁 Script executed:
# Read the full test function to understand context
sed -n '50,160p' backend/tests/integration/result_processor/test_result_processor.pyRepository: HardMax71/Integr8sCode
Length of output: 4255
🏁 Script executed:
# Find KafkaTopic definition
fd "\.py$" | xargs grep -l "class KafkaTopic" | head -3Repository: HardMax71/Integr8sCode
Length of output: 99
🏁 Script executed:
# Check for any test-specific topic setup or fixtures
grep -r "KafkaTopic\|EXECUTION_COMPLETED\|test.*topic" backend/tests/integration/result_processor/conftest.py 2>/dev/null | head -20Repository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
# Look for KafkaTopic definition
rg "KafkaTopic" -A 5 backend/tests/integration/result_processor/test_result_processor.py | head -20Repository: HardMax71/Integr8sCode
Length of output: 1015
🏁 Script executed:
# Find where KafkaTopic is defined
fd "\.py$" -type f | xargs rg "class KafkaTopic|KafkaTopic\s*=" | head -10Repository: HardMax71/Integr8sCode
Length of output: 406
🏁 Script executed:
# Read the KafkaTopic enum definition
cat -n backend/app/domain/enums/kafka.pyRepository: HardMax71/Integr8sCode
Length of output: 5026
🏁 Script executed:
# Check if there are any test-specific topic prefixes or setup
grep -r "test.*topic\|topic.*test" backend/tests/integration/ | grep -v ".pyc" | head -10Repository: HardMax71/Integr8sCode
Length of output: 1278
🏁 Script executed:
# Look at the full conftest.py to understand test setup
cat -n backend/tests/integration/result_processor/conftest.pyRepository: HardMax71/Integr8sCode
Length of output: 152
🏁 Script executed:
# Find conftest.py files related to the test
find backend/tests/integration -name "conftest.py" -type fRepository: HardMax71/Integr8sCode
Length of output: 103
🏁 Script executed:
# Check the parent conftest file
cat -n backend/tests/integration/conftest.py | head -100Repository: HardMax71/Integr8sCode
Length of output: 4212
🏁 Script executed:
# Check if there's a settings file that shows topic prefixes
rg "KAFKA_TOPIC_PREFIX" backend/tests --type py | head -10Repository: HardMax71/Integr8sCode
Length of output: 497
🏁 Script executed:
# Look for test environment setup documentation
rg "auto_offset_reset|earliest|latest" backend --type py | grep -i test | head -15Repository: HardMax71/Integr8sCode
Length of output: 1290
🏁 Script executed:
# Check if this test has any history of issues or skips
grep -B 5 -A 5 "test_result_processor_integration\|def test_" backend/tests/integration/result_processor/test_result_processor.py | head -30Repository: HardMax71/Integr8sCode
Length of output: 353
🏁 Script executed:
# Look at how other integration tests handle Kafka produce/consume
rg "auto_offset_reset" backend/tests/integration --type py -B 3 -A 3 | head -50Repository: HardMax71/Integr8sCode
Length of output: 4110
🏁 Script executed:
# Check what the actual test name is and if it passes
sed -n '1,80p' backend/tests/integration/result_processor/test_result_processor.pyRepository: HardMax71/Integr8sCode
Length of output: 3131
🏁 Script executed:
# Check if topics are created per test or persist across tests
grep -r "topic\|Topic" backend/tests/conftest.py | head -20Repository: HardMax71/Integr8sCode
Length of output: 324
🏁 Script executed:
# Look for any Kafka setup/teardown in CI or scripts
find backend/scripts -name "*.sh" -o -name "*.py" | head -10Repository: HardMax71/Integr8sCode
Length of output: 156
🏁 Script executed:
# Check if UnifiedConsumer or the test has any special offset management
rg "UnifiedConsumer|seek\|offset" backend/tests/integration/result_processor/test_result_processor.py -A 2 -B 2Repository: HardMax71/Integr8sCode
Length of output: 789
🏁 Script executed:
# Check the create_topics script to see how topics are managed
cat -n backend/scripts/create_topics.pyRepository: HardMax71/Integr8sCode
Length of output: 4843
🏁 Script executed:
# Check if there's any cleanup or retention policy for test topics
grep -r "retention\|cleanup\|compact" backend/scripts backend/tests --type py | head -15Repository: HardMax71/Integr8sCode
Length of output: 183
🏁 Script executed:
# Verify the exact behavior: are unique consumer groups actually created fresh?
# Look at how consumer group management works
rg "group_id" backend/tests/integration/result_processor/test_result_processor.py -B 1 -A 1Repository: HardMax71/Integr8sCode
Length of output: 393
🏁 Script executed:
# Check if events are filtered by execution_id to understand if stale events matter
sed -n '100,165p' backend/tests/integration/result_processor/test_result_processor.pyRepository: HardMax71/Integr8sCode
Length of output: 2445
🏁 Script executed:
# See if there are any other tests in the same file that might produce events to same topics
grep -n "def test_\|await producer.produce" backend/tests/integration/result_processor/test_result_processor.pyRepository: HardMax71/Integr8sCode
Length of output: 174
🏁 Script executed:
# Check how other shared-topic tests handle this pattern
find backend/tests/integration -name "*.py" -exec grep -l "auto_offset_reset.*earliest" {} \;Repository: HardMax71/Integr8sCode
Length of output: 221
🏁 Script executed:
# Check if there are any offset seeking utilities being used
rg "seek\|Offset" backend/app/events --type py -B 2 -A 2 | head -30Repository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
# Look at what UnifiedConsumer does with offset_reset
find backend/app -name "*.py" -exec grep -l "UnifiedConsumer" {} \;Repository: HardMax71/Integr8sCode
Length of output: 203
🏁 Script executed:
# Check the UnifiedConsumer implementation
rg "class UnifiedConsumer" backend/app -A 30Repository: HardMax71/Integr8sCode
Length of output: 2277
🏁 Script executed:
# Check if there are any issues or timeouts reported with this test
git log --oneline backend/tests/integration/result_processor/test_result_processor.py | head -10Repository: HardMax71/Integr8sCode
Length of output: 111
🏁 Script executed:
# Verify the behavior: does auto_offset_reset="earliest" on a fresh group read only new events or all historical?
# Check ConsumerConfig and how it's used
cat -n backend/app/events/core/consumer.py | sed -n '1,100p'Repository: HardMax71/Integr8sCode
Length of output: 4718
🏁 Script executed:
# Check if there's any documentation on how offset_reset works in the test setup
grep -A 10 "auto_offset_reset" backend/app/events/core/consumer.pyRepository: HardMax71/Integr8sCode
Length of output: 625
🏁 Script executed:
# Let's verify the Kafka version and aiokafka behavior documentation
rg "aiokafka|AIOKafkaConsumer" backend/pyproject.toml backend/requirements.txt 2>/dev/nullRepository: HardMax71/Integr8sCode
Length of output: 118
🏁 Script executed:
# Check if there are any flakiness or timeout issues reported in other tests
grep -r "flaky\|timeout\|slow" backend/tests/integration --include="*.py" | grep -i kafka | head -10Repository: HardMax71/Integr8sCode
Length of output: 335
🏁 Script executed:
# Verify execution_id filtering is actually preventing cross-test interference
sed -n '100,115p' backend/tests/integration/result_processor/test_result_processor.pyRepository: HardMax71/Integr8sCode
Length of output: 638
Prefer starting consumers before producing events to avoid replaying historical Kafka events.
With a fresh consumer group and auto_offset_reset="earliest" on shared production topics (7-day retention), the consumers will replay all historical events before processing the new one. Although the execution_id filter prevents test failure, processing large event histories sequentially (especially with max_poll_records=1) risks timeout failures. Start consumers first and use auto_offset_reset="latest" to ensure only new events are processed.
🔧 Suggested adjustment
processor_consumer_config = ConsumerConfig(
bootstrap_servers=test_settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=f"{GroupId.RESULT_PROCESSOR}.test.{uuid.uuid4().hex[:8]}",
max_poll_records=1,
enable_auto_commit=True,
- auto_offset_reset="earliest",
+ auto_offset_reset="latest",
session_timeout_ms=test_settings.KAFKA_SESSION_TIMEOUT_MS,
heartbeat_interval_ms=test_settings.KAFKA_HEARTBEAT_INTERVAL_MS,
max_poll_interval_ms=test_settings.KAFKA_MAX_POLL_INTERVAL_MS,
request_timeout_ms=test_settings.KAFKA_REQUEST_TIMEOUT_MS,
)
# Create processor consumer (idempotency is now handled by FastStream middleware in production)
processor_consumer = UnifiedConsumer(
processor_consumer_config,
dispatcher=processor_dispatcher,
schema_registry=schema_registry,
settings=test_settings,
logger=_test_logger,
event_metrics=event_metrics,
topics=[KafkaTopic.EXECUTION_COMPLETED, KafkaTopic.EXECUTION_FAILED, KafkaTopic.EXECUTION_TIMEOUT],
)
# Setup a separate consumer to capture ResultStoredEvent
stored_dispatcher = EventDispatcher(logger=_test_logger)
stored_received = asyncio.Event()
`@stored_dispatcher.register`(EventType.RESULT_STORED)
async def _stored(event: ResultStoredEvent) -> None:
if event.execution_id == execution_id:
stored_received.set()
stored_consumer_config = ConsumerConfig(
bootstrap_servers=test_settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=f"test.result_stored.{uuid.uuid4().hex[:8]}",
max_poll_records=1,
enable_auto_commit=True,
- auto_offset_reset="earliest",
+ auto_offset_reset="latest",
)
stored_consumer = UnifiedConsumer(
stored_consumer_config,
stored_dispatcher,
schema_registry=schema_registry,
settings=test_settings,
logger=_test_logger,
event_metrics=event_metrics,
topics=[KafkaTopic.EXECUTION_RESULTS],
)
+
+ # Start consumers as background tasks
+ processor_task = asyncio.create_task(processor_consumer.run())
+ stored_task = asyncio.create_task(stored_consumer.run())
- # Produce the event BEFORE starting consumers (auto_offset_reset="earliest" will read it)
+ # Produce the event AFTER starting consumers
usage = ResourceUsageAvro(
execution_time_wall_seconds=0.5,
cpu_time_jiffies=100,
clk_tck_hertz=100,
peak_memory_kb=1024,
)
evt = ExecutionCompletedEvent(
execution_id=execution_id,
exit_code=0,
stdout="hello",
stderr="",
resource_usage=usage,
metadata=EventMetadata(service_name="tests", service_version="1.0.0"),
)
await producer.produce(evt, key=execution_id)
-
- # Start consumers as background tasks
- processor_task = asyncio.create_task(processor_consumer.run())
- stored_task = asyncio.create_task(stored_consumer.run())Also applies to: 123-142
🤖 Prompt for AI Agents
In `@backend/tests/integration/result_processor/test_result_processor.py` around
lines 72 - 83, Change the consumer setup in the test to avoid replaying
historical Kafka events: when creating the test ConsumerConfig (the instance
assigned to processor_consumer_config) set auto_offset_reset="latest" instead of
"earliest" and avoid extremely small batch sizes (e.g., remove or increase
max_poll_records=1) so the consumer won't process long retention history slowly;
also ensure test code starts the consumer (the consumer created from
processor_consumer_config) before producing the test events so only new messages
are consumed.




Summary
Removing nullable fields where possible - source of errors.
Type of Change
Test Plan
uv run pytest)npx playwright test)Manual test steps:
Checklist
pre-commit run --all-filesand all checks passSummary by cubic
Removed LifecycleEnabled and factory wrappers across services. DI now manages lifecycles; EventBusManager is removed, NotificationService self-starts, and SSE uses a stateless event router with a connection registry.
Refactors
Migration
Written for commit 66456d0. Summary will update on new commits.
Summary by CodeRabbit
Release Notes
Architecture Improvements
Event Processing & Messaging
Worker Services
Configuration
✏️ Tip: You can customize this high-level summary in your review settings.