Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions js/src/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -436,4 +436,47 @@ export class Sandbox extends BaseSandbox {
throw formatRequestTimeoutError(error)
}
}

/**
 * Interrupt a running execution in a context.
 *
 * Delivers an interrupt signal to the Jupyter kernel backing the context,
 * stopping whatever code is currently running there. The kernel itself is
 * not restarted, so previously defined variables, imports, and other state
 * are preserved.
 *
 * Useful for cutting off long-running code (e.g. after a client-side
 * timeout) without losing kernel state.
 *
 * NOTE(review): unlike the Python sync SDK's interrupt method, no envd
 * access-token header is attached here — confirm whether this endpoint is
 * reachable on access-token-protected sandboxes.
 *
 * @param context context to interrupt.
 *
 * @returns void.
 */
async interruptCodeContext(context: Context | string): Promise<void> {
  try {
    // Accept either a full Context object or a bare context ID.
    const contextId = typeof context === 'string' ? context : context.id

    const requestHeaders: Record<string, string> = {
      'Content-Type': 'application/json',
      // Forward the traffic access token only when one is configured.
      ...(this.trafficAccessToken
        ? { 'E2B-Traffic-Access-Token': this.trafficAccessToken }
        : {}),
    }

    const response = await fetch(
      `${this.jupyterUrl}/contexts/${contextId}/interrupt`,
      {
        method: 'POST',
        headers: requestHeaders,
        // keepalive lets the request complete even if the page/process is
        // tearing down.
        keepalive: true,
        signal: this.connectionConfig.getSignal(
          this.connectionConfig.requestTimeoutMs
        ),
      }
    )

    // Surface non-2xx responses as SDK errors.
    const maybeError = await extractError(response)
    if (maybeError) {
      throw maybeError
    }
  } catch (error) {
    // Matches the convention of the other sandbox methods: timeout-shaped
    // errors are normalized, everything else passes through.
    throw formatRequestTimeoutError(error)
  }
}
}
38 changes: 38 additions & 0 deletions python/e2b_code_interpreter/code_interpreter_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,3 +369,41 @@ async def restart_code_context(
raise err
except httpx.TimeoutException:
raise format_request_timeout_error()

async def interrupt_code_context(
    self,
    context: Union[Context, str],
) -> None:
    """
    Interrupt a running execution in a context.

    This sends an interrupt signal to the Jupyter kernel, which stops the
    currently running code without restarting the kernel. All previously
    defined variables, imports, and state are preserved.

    This is useful for stopping long-running code after a timeout without
    losing kernel state.

    :param context: Context to interrupt. Can be a Context object or a context ID string.

    :return: None
    """
    # Accept either a full Context object or a bare context ID.
    context_id = context.id if isinstance(context, Context) else context
    try:
        headers = {
            "Content-Type": "application/json",
        }
        # Consistency fix: the sync SDK sends the envd access token on this
        # endpoint ("X-Access-Token"); mirror it here so the async client
        # also works against access-token-protected sandboxes. getattr
        # guards the case where the attribute is not defined on this class.
        envd_access_token = getattr(self, "_envd_access_token", None)
        if envd_access_token:
            headers["X-Access-Token"] = envd_access_token
        if self.traffic_access_token:
            headers["E2B-Traffic-Access-Token"] = self.traffic_access_token

        response = await self._client.post(
            f"{self._jupyter_url}/contexts/{context_id}/interrupt",
            headers=headers,
            timeout=self.connection_config.request_timeout,
        )

        # Surface non-2xx responses as SDK exceptions.
        err = await aextract_exception(response)
        if err:
            raise err
    except httpx.TimeoutException:
        # Normalize transport timeouts into the SDK's timeout error.
        raise format_request_timeout_error()
39 changes: 39 additions & 0 deletions python/e2b_code_interpreter/code_interpreter_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,3 +366,42 @@ def restart_code_context(
raise err
except httpx.TimeoutException:
raise format_request_timeout_error()

def interrupt_code_context(
    self,
    context: Union[Context, str],
) -> None:
    """
    Interrupt a running execution in a context.

    Delivers an interrupt signal to the Jupyter kernel behind the given
    context. The code currently running there is stopped, while the kernel
    itself keeps running — variables, imports, and other state survive.

    Handy for cutting off long-running code (e.g. after a timeout) without
    paying the cost of a kernel restart.

    :param context: Context to interrupt. Can be a Context object or a context ID string.

    :return: None
    """
    # Normalize to a bare context ID string.
    if isinstance(context, Context):
        context_id = context.id
    else:
        context_id = context

    try:
        request_headers: Dict[str, str] = {"Content-Type": "application/json"}
        # Attach the optional auth headers only when their tokens are set;
        # insertion order matches the original (envd token before traffic).
        for token, header_name in (
            (self._envd_access_token, "X-Access-Token"),
            (self.traffic_access_token, "E2B-Traffic-Access-Token"),
        ):
            if token:
                request_headers[header_name] = token

        result = self._client.post(
            f"{self._jupyter_url}/contexts/{context_id}/interrupt",
            headers=request_headers,
            timeout=self.connection_config.request_timeout,
        )

        # Non-2xx responses are converted into SDK exceptions.
        failure = extract_exception(result)
        if failure:
            raise failure
    except httpx.TimeoutException:
        # Normalize transport timeouts into the SDK's timeout error.
        raise format_request_timeout_error()
21 changes: 21 additions & 0 deletions template/server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,27 @@ async def restart_context(context_id: str) -> None:
websockets[context_id] = ws


@app.post("/contexts/{context_id}/interrupt")
async def interrupt_context(context_id: str):
    """Interrupt the kernel backing the given context.

    Sends a Jupyter ``interrupt`` request so the code currently running in
    the context stops; the kernel (and its state) is left running.

    Responses: 404 if the context is unknown, 500 if the kernel interrupt
    request fails, implicit 200 (null body) on success.
    """
    # Fix: the original annotated the return type as ``-> None`` even though
    # the error paths return ``PlainTextResponse`` objects; the annotation is
    # dropped rather than narrowed to avoid new imports.
    logger.info(f"Interrupting context {context_id}")

    ws = websockets.get(context_id, None)
    if not ws:
        return PlainTextResponse(
            f"Context {context_id} not found",
            status_code=404,
        )

    # NOTE(review): ws.context_id is used as the Jupyter *kernel* ID here —
    # confirm it actually holds the kernel ID rather than the route-level
    # context ID; the two are easy to conflate.
    response = await client.post(
        f"{JUPYTER_BASE_URL}/api/kernels/{ws.context_id}/interrupt"
    )
    if not response.is_success:
        return PlainTextResponse(
            f"Failed to interrupt context {context_id}",
            status_code=500,
        )


@app.delete("/contexts/{context_id}")
async def remove_context(context_id: str) -> None:
logger.info(f"Removing context {context_id}")
Expand Down
30 changes: 22 additions & 8 deletions template/server/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ async def execute(
if self._ws is None:
raise Exception("WebSocket not connected")

# Phase A (under lock): prepare env vars, send request to kernel,
# and schedule env var cleanup. The lock serializes sends to the
# Jupyter kernel WebSocket and protects shared state like
# _global_env_vars and _cleanup_task.
message_id = str(uuid.uuid4())
async with self._lock:
# Wait for any pending cleanup task to complete
if self._cleanup_task and not self._cleanup_task.done():
Expand Down Expand Up @@ -327,7 +332,6 @@ async def execute(
)
complete_code = f"{indented_env_code}\n{complete_code}"

message_id = str(uuid.uuid4())
execution = Execution()
self._executions[message_id] = execution

Expand Down Expand Up @@ -362,18 +366,28 @@ async def execute(
)
await execution.queue.put(UnexpectedEndOfExecution())

# Stream the results
async for item in self._wait_for_result(message_id):
yield item

del self._executions[message_id]

# Clean up env vars in a separate request after the main code has run
# Schedule env var cleanup while still holding the lock.
# The cleanup sends a background execute_request to the kernel,
# which will be processed after the main code finishes (kernel
# serializes execution). The next execute() call will await
# this task in Phase A before proceeding.
if env_vars:
self._cleanup_task = asyncio.create_task(
self._cleanup_env_vars(env_vars)
)

# Phase B (no lock held): stream results back to client.
# Results are routed by unique message_id in _process_message(),
# so no shared state is accessed during streaming. If the client
# disconnects (SDK timeout), the generator is abandoned but the
# lock is already released — no orphaned lock.
try:
async for item in self._wait_for_result(message_id):
yield item
finally:
# Clean up execution entry even if generator is abandoned
self._executions.pop(message_id, None)

async def _receive_message(self):
if not self._ws:
logger.error("No WebSocket connection")
Expand Down
Loading