From 15ad5ab02e15aa9d33ac8f4db0b305e912cae366 Mon Sep 17 00:00:00 2001 From: Tomas Beran Date: Tue, 24 Mar 2026 00:57:07 +0100 Subject: [PATCH] fix: release asyncio.Lock before streaming to prevent orphan on client disconnect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Narrow the lock scope in ContextWebSocket.execute() so that the lock is held only during the prepare+send phase, not during result streaming. Previously, the lock was held for the entire generator lifetime, including the _wait_for_result() streaming loop. When a client disconnected (e.g. SDK timeout), Starlette abandoned the generator while it was blocked at `await queue.get()`. The lock then stayed held until the kernel finished the execution on its own, blocking all subsequent executions on the same context and causing cascading timeouts. The fix moves the streaming phase (Phase B) outside the `async with self._lock` block. This is safe because results are routed by unique message_id in _process_message() — during streaming each execution touches only its own entry, not the shared state the lock protects. A try/finally ensures the execution entry is cleaned up even if the generator is abandoned. Fixes #213 --- template/server/messaging.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/template/server/messaging.py b/template/server/messaging.py index 0b151f8e..244338fc 100644 --- a/template/server/messaging.py +++ b/template/server/messaging.py @@ -294,6 +294,14 @@ async def execute( if self._ws is None: raise Exception("WebSocket not connected") + message_id = str(uuid.uuid4()) + + # Phase A: prepare and send to kernel (under lock) + # The lock serializes the prepare+send phase to protect shared state + # (_global_env_vars init, _cleanup_task lifecycle, WebSocket sends). + # It is released before streaming results so that a client disconnect + # during streaming does not orphan the lock and block subsequent + # executions. 
See https://github.com/e2b-dev/code-interpreter/issues/213 async with self._lock: # Wait for any pending cleanup task to complete if self._cleanup_task and not self._cleanup_task.done(): @@ -327,7 +335,6 @@ async def execute( ) complete_code = f"{indented_env_code}\n{complete_code}" - message_id = str(uuid.uuid4()) execution = Execution() self._executions[message_id] = execution @@ -362,11 +369,15 @@ async def execute( ) await execution.queue.put(UnexpectedEndOfExecution()) - # Stream the results + # Phase B: stream results (no lock held) + # Results are routed by unique message_id in _process_message(), + # so no shared state is accessed during streaming. + try: async for item in self._wait_for_result(message_id): yield item - - del self._executions[message_id] + finally: + # Clean up execution entry even if generator is abandoned + self._executions.pop(message_id, None) # Clean up env vars in a separate request after the main code has run if env_vars: