diff --git a/js/src/sandbox.ts b/js/src/sandbox.ts
index a02b9d9e..070cd33c 100644
--- a/js/src/sandbox.ts
+++ b/js/src/sandbox.ts
@@ -436,4 +436,47 @@ export class Sandbox extends BaseSandbox {
       throw formatRequestTimeoutError(error)
     }
   }
+
+  /**
+   * Interrupt a running execution in a context.
+   *
+   * This sends an interrupt signal to the Jupyter kernel, which stops the
+   * currently running code without restarting the kernel. All previously
+   * defined variables, imports, and state are preserved.
+   *
+   * This is useful for stopping long-running code after a timeout without
+   * losing kernel state.
+   *
+   * @param context context to interrupt.
+   *
+   * @returns void.
+   */
+  async interruptCodeContext(context: Context | string): Promise<void> {
+    try {
+      const id = typeof context === 'string' ? context : context.id
+      const headers: Record<string, string> = {
+        'Content-Type': 'application/json',
+      }
+
+      if (this.trafficAccessToken) {
+        headers['E2B-Traffic-Access-Token'] = this.trafficAccessToken
+      }
+
+      const res = await fetch(`${this.jupyterUrl}/contexts/${id}/interrupt`, {
+        method: 'POST',
+        headers,
+        keepalive: true,
+        signal: this.connectionConfig.getSignal(
+          this.connectionConfig.requestTimeoutMs
+        ),
+      })
+
+      const error = await extractError(res)
+      if (error) {
+        throw error
+      }
+    } catch (error) {
+      throw formatRequestTimeoutError(error)
+    }
+  }
 }
diff --git a/python/e2b_code_interpreter/code_interpreter_async.py b/python/e2b_code_interpreter/code_interpreter_async.py
index de938240..e310bd5b 100644
--- a/python/e2b_code_interpreter/code_interpreter_async.py
+++ b/python/e2b_code_interpreter/code_interpreter_async.py
@@ -369,3 +369,41 @@ async def restart_code_context(
             raise err
         except httpx.TimeoutException:
             raise format_request_timeout_error()
+
+    async def interrupt_code_context(
+        self,
+        context: Union[Context, str],
+    ) -> None:
+        """
+        Interrupt a running execution in a context.
+
+        This sends an interrupt signal to the Jupyter kernel, which stops the
+        currently running code without restarting the kernel. All previously
+        defined variables, imports, and state are preserved.
+
+        This is useful for stopping long-running code after a timeout without
+        losing kernel state.
+
+        :param context: Context to interrupt. Can be a Context object or a context ID string.
+
+        :return: None
+        """
+        context_id = context.id if isinstance(context, Context) else context
+        try:
+            headers = {
+                "Content-Type": "application/json",
+            }
+            if self.traffic_access_token:  # NOTE(review): sync variant also sends X-Access-Token (envd) — confirm omission here is intentional
+                headers["E2B-Traffic-Access-Token"] = self.traffic_access_token
+
+            response = await self._client.post(
+                f"{self._jupyter_url}/contexts/{context_id}/interrupt",
+                headers=headers,
+                timeout=self.connection_config.request_timeout,
+            )
+
+            err = await aextract_exception(response)
+            if err:
+                raise err
+        except httpx.TimeoutException:
+            raise format_request_timeout_error()
diff --git a/python/e2b_code_interpreter/code_interpreter_sync.py b/python/e2b_code_interpreter/code_interpreter_sync.py
index 3adb2804..af58b7ff 100644
--- a/python/e2b_code_interpreter/code_interpreter_sync.py
+++ b/python/e2b_code_interpreter/code_interpreter_sync.py
@@ -366,3 +366,42 @@ def restart_code_context(
             raise err
         except httpx.TimeoutException:
             raise format_request_timeout_error()
+
+    def interrupt_code_context(
+        self,
+        context: Union[Context, str],
+    ) -> None:
+        """
+        Interrupt a running execution in a context.
+
+        This sends an interrupt signal to the Jupyter kernel, which stops the
+        currently running code without restarting the kernel. All previously
+        defined variables, imports, and state are preserved.
+
+        This is useful for stopping long-running code after a timeout without
+        losing kernel state.
+
+        :param context: Context to interrupt. Can be a Context object or a context ID string.
+
+        :return: None
+        """
+        context_id = context.id if isinstance(context, Context) else context
+
+        try:
+            headers: Dict[str, str] = {"Content-Type": "application/json"}
+            if self._envd_access_token:
+                headers["X-Access-Token"] = self._envd_access_token
+            if self.traffic_access_token:
+                headers["E2B-Traffic-Access-Token"] = self.traffic_access_token
+
+            response = self._client.post(
+                f"{self._jupyter_url}/contexts/{context_id}/interrupt",
+                headers=headers,
+                timeout=self.connection_config.request_timeout,
+            )
+
+            err = extract_exception(response)
+            if err:
+                raise err
+        except httpx.TimeoutException:
+            raise format_request_timeout_error()
diff --git a/template/server/main.py b/template/server/main.py
index 1f296926..cbbbee81 100644
--- a/template/server/main.py
+++ b/template/server/main.py
@@ -183,6 +183,27 @@ async def restart_context(context_id: str) -> None:
     websockets[context_id] = ws
 
 
+@app.post("/contexts/{context_id}/interrupt")
+async def interrupt_context(context_id: str) -> None:
+    logger.info(f"Interrupting context {context_id}")
+
+    ws = websockets.get(context_id, None)
+    if not ws:
+        return PlainTextResponse(
+            f"Context {context_id} not found",
+            status_code=404,
+        )
+
+    response = await client.post(
+        f"{JUPYTER_BASE_URL}/api/kernels/{ws.context_id}/interrupt"  # NOTE(review): assumes ws.context_id is the Jupyter kernel id — confirm
+    )
+    if not response.is_success:
+        return PlainTextResponse(
+            f"Failed to interrupt context {context_id}",
+            status_code=500,
+        )
+
+
 @app.delete("/contexts/{context_id}")
 async def remove_context(context_id: str) -> None:
     logger.info(f"Removing context {context_id}")
diff --git a/template/server/messaging.py b/template/server/messaging.py
index 0b151f8e..3cf11a1a 100644
--- a/template/server/messaging.py
+++ b/template/server/messaging.py
@@ -294,6 +294,11 @@ async def execute(
         if self._ws is None:
             raise Exception("WebSocket not connected")
 
+        # Phase A (under lock): prepare env vars, send request to kernel,
+        # and schedule env var cleanup. The lock serializes sends to the
+        # Jupyter kernel WebSocket and protects shared state like
+        # _global_env_vars and _cleanup_task.
+        message_id = str(uuid.uuid4())
         async with self._lock:
             # Wait for any pending cleanup task to complete
             if self._cleanup_task and not self._cleanup_task.done():
@@ -327,7 +332,6 @@ async def execute(
                 )
                 complete_code = f"{indented_env_code}\n{complete_code}"
 
-            message_id = str(uuid.uuid4())
 
             execution = Execution()
             self._executions[message_id] = execution
@@ -362,18 +366,28 @@ async def execute(
                 )
                 await execution.queue.put(UnexpectedEndOfExecution())
 
-            # Stream the results
-            async for item in self._wait_for_result(message_id):
-                yield item
-
-            del self._executions[message_id]
-
-            # Clean up env vars in a separate request after the main code has run
+            # Schedule env var cleanup while still holding the lock.
+            # The cleanup sends a background execute_request to the kernel,
+            # which will be processed after the main code finishes (kernel
+            # serializes execution). The next execute() call will await
+            # this task in Phase A before proceeding.
             if env_vars:
                 self._cleanup_task = asyncio.create_task(
                     self._cleanup_env_vars(env_vars)
                 )
+
+        # Phase B (no lock held): stream results back to client.
+        # Results are routed by unique message_id in _process_message(),
+        # so no shared state is accessed during streaming. If the client
+        # disconnects (SDK timeout), the generator is abandoned but the
+        # lock is already released — no orphaned lock.
+        try:
+            async for item in self._wait_for_result(message_id):
+                yield item
+        finally:
+            # Clean up execution entry even if generator is abandoned
+            self._executions.pop(message_id, None)
 
     async def _receive_message(self):
         if not self._ws:
             logger.error("No WebSocket connection")