diff --git a/template/server/messaging.py b/template/server/messaging.py index 0b151f8e..244338fc 100644 --- a/template/server/messaging.py +++ b/template/server/messaging.py @@ -294,6 +294,14 @@ async def execute( if self._ws is None: raise Exception("WebSocket not connected") + message_id = str(uuid.uuid4()) + + # Phase A: prepare and send to kernel (under lock) + # The lock serializes the prepare+send phase to protect shared state + # (_global_env_vars init, _cleanup_task lifecycle, WebSocket sends). + # It is released before streaming results so that a client disconnect + # during streaming does not orphan the lock and block subsequent + # executions. See https://github.com/e2b-dev/code-interpreter/issues/213 async with self._lock: # Wait for any pending cleanup task to complete if self._cleanup_task and not self._cleanup_task.done(): @@ -327,7 +335,6 @@ async def execute( ) complete_code = f"{indented_env_code}\n{complete_code}" - message_id = str(uuid.uuid4()) execution = Execution() self._executions[message_id] = execution @@ -362,11 +369,15 @@ async def execute( ) await execution.queue.put(UnexpectedEndOfExecution()) - # Stream the results + # Phase B: stream results (no lock held) + # Results are routed by unique message_id in _process_message(), + # so no shared state is accessed during streaming. + try: async for item in self._wait_for_result(message_id): yield item - - del self._executions[message_id] + finally: + # Clean up execution entry even if generator is abandoned + self._executions.pop(message_id, None) # Clean up env vars in a separate request after the main code has run if env_vars: