From abbeed394f753518242189c71df810af0ca1e5d2 Mon Sep 17 00:00:00 2001 From: Cristian Pufu Date: Fri, 20 Feb 2026 15:40:57 +0200 Subject: [PATCH] fix: add Claude integration genome --- CLAUDE.md | 42 + GUIDE.md | 1952 ------------------------------------- INTEGRATION_GENOME.md | 2135 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 2177 insertions(+), 1952 deletions(-) create mode 100644 CLAUDE.md delete mode 100644 GUIDE.md create mode 100644 INTEGRATION_GENOME.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..49ea3f5 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,42 @@ +# UiPath Runtime Python SDK + +This repository contains the runtime abstractions and interfaces for building agent +integrations in the UiPath ecosystem. + +## Building a New Integration + +If you are here to build a new integration for a Python agentic framework, read and +follow **INTEGRATION_GENOME.md** — it is a complete blueprint that guides you through +every step of building a UiPath runtime integration. + +Start by reading the genome's Phase 0 (Framework Discovery) and asking the user the +discovery questions. Then proceed through each phase sequentially, following the quality +gates. + +## Project Structure + +- `src/uipath/runtime/` — Core protocols and contracts +- `src/uipath/runtime/base.py` — `UiPathRuntimeProtocol`, `UiPathExecutionRuntime` +- `src/uipath/runtime/factory.py` — `UiPathRuntimeFactoryProtocol` +- `src/uipath/runtime/registry.py` — `UiPathRuntimeFactoryRegistry` +- `src/uipath/runtime/events/` — Event types (`UiPathRuntimeStateEvent`, `UiPathRuntimeMessageEvent`) +- `src/uipath/runtime/resumable/` — HITL support (`UiPathResumableRuntime`) +- `src/uipath/runtime/debug/` — Debugger support (`UiPathDebugRuntime`, breakpoints) +- `src/uipath/runtime/chat/` — Chat bridge support (`UiPathChatRuntime`) +- `src/uipath/runtime/schema.py` — Schema models (`UiPathRuntimeGraph`, nodes, edges) +- `src/uipath/runtime/errors/` — Error contracts and categories +- `INTEGRATION_GENOME.md` — Complete guide for building new integrations + +## Development + +```bash +uv sync --all-extras +uv run ruff check . +uv run ruff check --fix . +uv run pytest +``` + +## Reference Integrations + +- `uipath-langchain-python` — LangGraph integration (FULL tier) +- `uipath-integrations-python` — OpenAI Agents, LlamaIndex, Google ADK, Agent Framework diff --git a/GUIDE.md b/GUIDE.md deleted file mode 100644 index 22d107b..0000000 --- a/GUIDE.md +++ /dev/null @@ -1,1952 +0,0 @@ -# UiPath Runtime Integration Guide - -This document provides step-by-step instructions to generate a complete UiPath runtime integration for any Python agentic framework (e.g., Google ADK, Pydantic AI, CrewAI, AutoGen, etc.). - -## Prerequisites - -Before starting, you need: -1. The **target framework** name and its PyPI package (e.g., `google-adk`, `pydantic-ai`) -2. The **uipath-runtime** package (`uipath-runtime`) - this contains all protocols/contracts -3. The **uipath** SDK (`uipath`) - core UiPath functionality -4. Familiarity with the target framework's agent/workflow execution model - -## Architecture Overview - -A UiPath runtime integration is a Python package that bridges a third-party agentic framework with the UiPath platform. It implements a standardized protocol so UiPath can: -- **Discover** available agents/workflows from a config file -- **Infer schemas** (input/output JSON schemas) from user code -- **Execute** agents and return structured results -- **Stream** execution events in real-time -- **Suspend/Resume** execution for human-in-the-loop (HITL) scenarios -- **Visualize** agent graphs (nodes, edges, tools, handoffs) - -### Protocol Hierarchy - -``` -UiPathRuntimeProtocol (main contract) -├── UiPathExecutableProtocol → execute(input, options) -> UiPathRuntimeResult -├── UiPathStreamableProtocol → stream(input, options) -> AsyncGenerator[UiPathRuntimeEvent] -├── UiPathSchemaProtocol → get_schema() -> UiPathRuntimeSchema -└── UiPathDisposableProtocol → dispose() -> None - -UiPathRuntimeFactoryProtocol (factory contract) -├── discover_entrypoints() -> list[str] -├── new_runtime(entrypoint, runtime_id) -> UiPathRuntimeProtocol -├── get_storage() -> UiPathRuntimeStorageProtocol | None -├── get_settings() -> UiPathRuntimeFactorySettings | None -└── dispose() -> None -``` - -### Wrapper/Decorator Pattern - -The factory wraps the base runtime with additional capabilities: - -``` -UiPathResumableRuntime ← Adds HITL/resume trigger management - └── YourFrameworkRuntime ← Your base integration (implements UiPathRuntimeProtocol) -``` - -At a higher level, the platform may further wrap with: -``` -UiPathExecutionRuntime ← Adds tracing/telemetry - └── UiPathChatRuntime ← Adds conversational chat bridge - └── UiPathDebugRuntime ← Adds debugger/breakpoint support - └── UiPathResumableRuntime - └── YourFrameworkRuntime -``` - -You only implement `YourFrameworkRuntime` and `YourFrameworkFactory`. The wrappers are provided by `uipath-runtime`. - ---- - -## Package Structure - -Create a new package with this layout (replace `{framework}` with short name, e.g., `google_adk`, `pydantic_ai`): - -``` -uipath-{framework}/ -├── pyproject.toml -├── README.md -├── src/ -│ └── uipath_{framework}/ -│ ├── __init__.py -│ ├── middlewares.py # Optional: UiPath SDK middlewares -│ └── runtime/ -│ ├── __init__.py # Factory registration + exports -│ ├── factory.py # UiPathRuntimeFactoryProtocol impl -│ ├── runtime.py # UiPathRuntimeProtocol impl -│ ├── schema.py # Schema inference (input/output/graph) -│ ├── loader.py # Dynamic agent/workflow loading -│ ├── config.py # Config file parser ({framework}.json) -│ ├── errors.py # Framework-specific error codes -│ └── _storage.py # Optional: SQLite storage for HITL -├── tests/ -│ ├── test_schema_inference.py -│ ├── test_context.py -│ ├── test_graph.py -│ ├── test_integration.py -│ └── test_storage.py # If HITL supported -└── samples/ - └── quickstart-agent/ - ├── pyproject.toml - ├── {framework}.json - └── main.py -``` - ---- - -## Step-by-Step Implementation - -### STEP 1: Project Setup and Configuration - -**Goal:** Create the package skeleton, config parser, and agent loader. - -#### 1.1 Create `pyproject.toml` - -```toml -[project] -name = "uipath-{framework}" -version = "0.0.1" -description = "UiPath {Framework} SDK" -readme = "README.md" -requires-python = ">=3.11" -dependencies = [ - "aiosqlite>=0.20.0", - "{framework-pypi-package}>=X.Y.Z", # The target framework - "uipath>=2.8.0, <2.9.0", - "uipath-runtime>=0.6.0, <0.7.0", -] - -[project.entry-points."uipath.runtime.factories"] -{framework-id} = "uipath_{framework}.runtime:register_runtime_factory" - -[build-system] -requires = ["hatchling"] -build-backend = "hatchling.build" - -[tool.pytest.ini_options] -testpaths = ["tests"] -python_files = "test_*.py" -addopts = "-ra -q" -asyncio_default_fixture_loop_scope = "function" -asyncio_mode = "auto" -``` - -**Key:** The `[project.entry-points."uipath.runtime.factories"]` section registers your factory so it's auto-discovered by the UiPath CLI. - -#### 1.2 Create Configuration File Parser (`config.py`) - -The config file (e.g., `{framework}.json`) maps entrypoint names to Python file paths: - -```json -{ - "agents": { - "my_agent": "main.py:agent", - "my_workflow": "src/workflows/main.py:workflow" - } -} -``` - -The format is always `"file_path:variable_name"`. - -**Implementation pattern:** - -```python -"""Configuration file parser for {Framework} runtime.""" -import json -import os - -CONFIG_FILE = "{framework}.json" # e.g., "google_adk.json", "pydantic_ai.json" - -class FrameworkConfig: - def __init__(self, config_path: str = CONFIG_FILE): - self._config_path = config_path - self._agents: dict[str, str] | None = None - - @property - def exists(self) -> bool: - return os.path.isfile(self._config_path) - - @property - def agents(self) -> dict[str, str]: - if self._agents is None: - self._agents = self._load_agents() - return self._agents - - @property - def entrypoints(self) -> list[str]: - return list(self.agents.keys()) - - def _load_agents(self) -> dict[str, str]: - with open(self._config_path) as f: - data = json.load(f) - if "agents" not in data or not isinstance(data["agents"], dict): - raise ValueError(f"Invalid {self._config_path}: missing 'agents' dict") - return data["agents"] -``` - -#### 1.3 Create Agent/Workflow Loader (`loader.py`) - -Dynamically imports the user's agent from a Python file. - -**Implementation pattern:** - -```python -"""Dynamic agent loader for {Framework}.""" -import importlib.util -import inspect -import os -import sys -from contextlib import asynccontextmanager - -class AgentLoader: - def __init__(self, name: str, file_path: str, variable_name: str): - self.name = name - self.file_path = file_path - self.variable_name = variable_name - self._context_manager = None - - @classmethod - def from_path_string(cls, name: str, path_string: str) -> "AgentLoader": - """Parse 'file.py:variable' format.""" - if ":" not in path_string: - raise ValueError(f"Invalid path format: {path_string}. Expected 'file.py:variable'") - file_path, variable_name = path_string.split(":", 1) - return cls(name, file_path, variable_name) - - async def load(self): - """Load and return the agent/workflow object.""" - # 1. Security: ensure file is within CWD - abs_path = os.path.abspath(os.path.normpath(self.file_path)) - cwd = os.path.abspath(os.getcwd()) - if not abs_path.startswith(cwd): - raise ValueError(f"Agent file must be within current directory: {self.file_path}") - - if not os.path.isfile(abs_path): - raise FileNotFoundError(f"Agent file not found: {abs_path}") - - # 2. Setup Python path - if cwd not in sys.path: - sys.path.insert(0, cwd) - src_dir = os.path.join(cwd, "src") - if os.path.isdir(src_dir) and src_dir not in sys.path: - sys.path.insert(0, src_dir) - - # 3. Import module - module_name = f"_uipath_agent_{self.name}" - spec = importlib.util.spec_from_file_location(module_name, abs_path) - module = importlib.util.module_from_spec(spec) - sys.modules[module_name] = module - spec.loader.exec_module(module) - - # 4. Get variable - agent_obj = getattr(module, self.variable_name, None) - if agent_obj is None: - raise AttributeError(f"Variable '{self.variable_name}' not found in {self.file_path}") - - # 5. Resolve if callable/async/context-manager - return await self._resolve(agent_obj) - - async def _resolve(self, obj): - """Resolve agent from various definition patterns.""" - # Direct instance - return as-is - if not callable(obj) or isinstance(obj, type): - return obj - - # Try calling it - result = obj() - - # Async context manager - if hasattr(result, "__aenter__"): - self._context_manager = result - return await result.__aenter__() - - # Awaitable (async function) - if inspect.isawaitable(result): - return await result - - return result - - async def cleanup(self): - if self._context_manager: - await self._context_manager.__aexit__(None, None, None) - self._context_manager = None -``` - -#### 1.4 Create Error Definitions (`errors.py`) - -```python -"""Framework-specific error codes.""" -from enum import Enum -from uipath.runtime.errors import UiPathBaseRuntimeError, UiPathErrorCategory - -class FrameworkErrorCode(str, Enum): - AGENT_EXECUTION_FAILURE = "AGENT_EXECUTION_FAILURE" - TIMEOUT_ERROR = "TIMEOUT_ERROR" - SERIALIZE_OUTPUT_ERROR = "SERIALIZE_OUTPUT_ERROR" - CONFIG_MISSING = "CONFIG_MISSING" - CONFIG_INVALID = "CONFIG_INVALID" - AGENT_NOT_FOUND = "AGENT_NOT_FOUND" - AGENT_TYPE_ERROR = "AGENT_TYPE_ERROR" - AGENT_LOAD_ERROR = "AGENT_LOAD_ERROR" - AGENT_IMPORT_ERROR = "AGENT_IMPORT_ERROR" - -class FrameworkRuntimeError(UiPathBaseRuntimeError): - def __init__(self, code, title, detail, category=UiPathErrorCategory.UNKNOWN, status=None): - super().__init__( - code=code, title=title, detail=detail, - category=category, status=status, prefix="{Framework}" - ) -``` - -#### Validation for Step 1 - -Write tests to verify: -- Config file parsing (valid JSON, missing fields, non-existent file) -- Agent loader path validation (security check, missing file, missing variable) -- Agent loader resolution (direct instance, sync function, async function) -- Error code mapping - -```python -# tests/test_config.py -import pytest, json, os - -def test_config_loads_valid_file(tmp_path): - config_file = tmp_path / "{framework}.json" - config_file.write_text(json.dumps({"agents": {"agent1": "main.py:agent"}})) - os.chdir(tmp_path) - config = FrameworkConfig(str(config_file)) - assert config.exists - assert config.entrypoints == ["agent1"] - -def test_config_rejects_invalid_format(tmp_path): - config_file = tmp_path / "{framework}.json" - config_file.write_text(json.dumps({"wrong_key": {}})) - with pytest.raises(ValueError): - FrameworkConfig(str(config_file)).agents - -def test_loader_rejects_path_outside_cwd(): - loader = AgentLoader("test", "/etc/passwd", "agent") - with pytest.raises(ValueError, match="within current directory"): - await loader.load() -``` - ---- - -### STEP 2: Schema Inference (`get_schema()`) - -**Goal:** Automatically detect input/output JSON schemas from the user's agent code. - -This is the most framework-specific step. You must understand how the target framework defines: -- **Input**: What data does the agent accept? (messages, structured config, context objects) -- **Output**: What does the agent return? (text, structured data, Pydantic models) -- **Graph**: What is the agent's structure? (nodes, tools, handoffs, subgraphs) - -#### 2.1 Research the Framework's Input/Output Model - -For each framework, identify: - -| Framework | Input Model | Output Model | Graph Model | -|-----------|------------|--------------|-------------| -| **OpenAI Agents** | `messages: str\|list` + `Agent[Context]` generic param | `agent.output_type` (Pydantic model or string) | Agents + handoffs + tools (flat) | -| **LlamaIndex** | `StartEvent` Pydantic model | `StopEvent` or `workflow.output_cls` | Workflow steps + event flow | -| **LangGraph** | `graph.get_input_jsonschema()` | `graph.get_output_jsonschema()` | `graph.get_graph(xray=N)` | -| **Google ADK** | `LlmAgent.input_schema` (Pydantic `BaseModel`) — only on `LlmAgent`, not `BaseAgent` | `LlmAgent.output_schema` (Pydantic `BaseModel`) or `LlmAgent.output_key` (str) | `BaseAgent.sub_agents` tree + `LlmAgent.tools` list (`BaseTool`, `Callable`, `AgentTool`) | -| **Pydantic AI** | `agent.run()` params / `deps_type` | `agent.result_type` | Agent + tools | -| **CrewAI** | Crew's input variables | Crew's output type | Agents + tasks graph | - -#### 2.2 Schema Inference Principles - -**The user's code dictates the schemas.** If the framework agent has strongly-typed input/output (e.g., Pydantic models), those types ARE the schemas. Messages is only the fallback for conversational agents without typed I/O. - -**Key rules:** -1. **Use `isinstance` checks, not `getattr` guessing.** Verify the agent's actual type before accessing framework-specific attributes. Different agent types (e.g., LlmAgent vs composite agents) expose different attributes. -2. **Import types from their actual module paths**, not from `__init__.py` re-exports that may not be stable. E.g., `from framework.tools.base_tool import BaseTool` instead of `from framework.tools import BaseTool`. -3. **Use `uipath.runtime.schema` helpers** (`transform_references`, `transform_nullable_types`) instead of reimplementing ref resolution or nullable handling. -4. **Typed input replaces messages.** If the agent defines `input_schema` (Pydantic model), use that as the full input schema. Don't merge it with messages. -5. **Typed output replaces the generic result.** If the agent defines `output_schema` or equivalent, use that as the full output schema. - -#### 2.3 Implement Schema Extraction (`schema.py`) - -```python -"""Schema inference for {Framework} agents.""" -from typing import Any -from pydantic import BaseModel, TypeAdapter -from uipath.runtime.schema import ( - UiPathRuntimeSchema, UiPathRuntimeGraph, UiPathRuntimeNode, UiPathRuntimeEdge, - transform_references, transform_nullable_types, -) - -# Import the actual agent types for isinstance checks -from framework import BaseAgent, LlmAgent # Use real framework types - - -def get_entrypoints_schema(agent: BaseAgent) -> dict[str, Any]: - """Extract input/output JSON schemas from a framework agent. - - The user's code defines the agent type and its I/O schemas. - Use isinstance checks to determine what attributes are available. - """ - schema: dict[str, Any] = { - "input": _default_input_schema(), - "output": _default_output_schema(), - } - - # Only access typed I/O attributes on agent types that have them - if not isinstance(agent, LlmAgent): - return schema - - # Input: if agent has typed input_schema, use it as the full input - input_type = agent.input_schema # Framework-specific attribute - if input_type is not None and _is_pydantic_model(input_type): - try: - adapter = TypeAdapter(input_type) - json_schema = adapter.json_schema() - resolved, _ = transform_references(json_schema) - schema["input"] = { - "type": "object", - "properties": transform_nullable_types( - resolved.get("properties", {}) - ), - "required": resolved.get("required", []), - } - except Exception: - pass - - # Output: if agent has typed output, use it as the full output - output_type = agent.output_schema # Framework-specific attribute - if output_type is not None and _is_pydantic_model(output_type): - try: - adapter = TypeAdapter(output_type) - json_schema = adapter.json_schema() - resolved, _ = transform_references(json_schema) - schema["output"] = { - "type": "object", - "properties": transform_nullable_types( - resolved.get("properties", {}) - ), - "required": resolved.get("required", []), - } - except Exception: - pass - - return schema - - -def _default_input_schema() -> dict[str, Any]: - """Fallback input schema for conversational agents without typed input.""" - return { - "type": "object", - "properties": { - "messages": { - "anyOf": [ - {"type": "string"}, - {"type": "array", "items": {"type": "object"}} - ], - "title": "Messages", - "description": "User messages to send to the agent", - } - }, - "required": ["messages"], - } - - -def _default_output_schema() -> dict[str, Any]: - """Fallback output schema when no typed output is defined.""" - return { - "type": "object", - "properties": { - "result": { - "title": "Result", - "description": "The agent's response", - "anyOf": [ - {"type": "string"}, - {"type": "object"}, - {"type": "array", "items": {"type": "object"}}, - ], - } - }, - "required": ["result"], - } - - -def _is_pydantic_model(type_hint: Any) -> bool: - """Check if a type hint is a Pydantic BaseModel class.""" - import inspect - try: - return inspect.isclass(type_hint) and issubclass(type_hint, BaseModel) - except TypeError: - return False -``` - -#### 2.3 Implement Graph Building - -```python -def get_agent_graph(agent) -> UiPathRuntimeGraph: - """Build a visualization graph from the agent's structure. - - Node types: - - "__start__": Entry point - - "__end__": Exit point - - "node": Regular processing step - - "model": LLM call step - - "tool": Tool execution step - - "external": Human interaction point (HITL) - - Edge rules: - - __start__ -> first node (label: "input") - - last node -> __end__ (label: "output") - - Bidirectional edges for tool calls (agent <-> tools) - - Bidirectional edges for handoffs (agent <-> agent) - """ - nodes = [] - edges = [] - visited = set() # Prevent circular references - - # Always add start/end - nodes.append(UiPathRuntimeNode(id="__start__", name="__start__", type="__start__")) - nodes.append(UiPathRuntimeNode(id="__end__", name="__end__", type="__end__")) - - # Framework-specific: traverse agent structure - _add_agent_nodes(agent, nodes, edges, visited) - - # Connect start -> main agent - if len(nodes) > 2: - main_node = nodes[2] # First non-control node - edges.append(UiPathRuntimeEdge(source="__start__", target=main_node.id, label="input")) - edges.append(UiPathRuntimeEdge(source=main_node.id, target="__end__", label="output")) - - return UiPathRuntimeGraph(nodes=nodes, edges=edges) - - -def _add_agent_nodes(agent, nodes, edges, visited): - """Recursively add agent nodes. FRAMEWORK-SPECIFIC. - - For each framework, detect: - 1. Sub-agents / handoffs -> create nodes + bidirectional edges - 2. Tools -> create aggregated tool nodes with metadata - 3. Subgraphs -> create nodes with nested UiPathRuntimeGraph - """ - raise NotImplementedError("Implement for your framework") -``` - -#### Validation for Step 2 - -```python -# tests/test_schema_inference.py -import pytest - -def test_conversational_agent_uses_messages(): - """Agent without typed input falls back to messages.""" - agent = create_basic_agent() # No input_schema - schema = get_entrypoints_schema(agent) - assert "messages" in schema["input"]["properties"] - assert "messages" in schema["input"]["required"] - -def test_typed_input_replaces_messages(): - """Agent with input_schema uses it as the full input — no messages.""" - agent = create_agent_with_input_schema() - schema = get_entrypoints_schema(agent) - assert "my_field" in schema["input"]["properties"] - assert "messages" not in schema["input"]["properties"] - -def test_typed_output_replaces_generic_result(): - """Agent with output_schema uses it as the full output.""" - agent = create_agent_with_output_schema() - schema = get_entrypoints_schema(agent) - assert "my_result" in schema["output"]["properties"] - assert "result" not in schema["output"]["properties"] - -def test_output_schema_fallback(): - """Agent without typed output falls back to generic result.""" - agent = create_basic_agent() - schema = get_entrypoints_schema(agent) - assert "result" in schema["output"]["properties"] - -def test_composite_agent_gets_default_schemas(): - """Composite/orchestrator agents that don't have typed I/O get defaults.""" - agent = create_composite_agent() - schema = get_entrypoints_schema(agent) - assert "messages" in schema["input"]["properties"] - assert "result" in schema["output"]["properties"] - -def test_graph_has_start_end(): - agent = create_test_agent() - graph = get_agent_graph(agent) - node_ids = {n.id for n in graph.nodes} - assert "__start__" in node_ids - assert "__end__" in node_ids - -def test_graph_tools_have_metadata(): - agent = create_agent_with_tools() - graph = get_agent_graph(agent) - tool_nodes = [n for n in graph.nodes if n.type == "tool"] - assert len(tool_nodes) > 0 - assert "tool_names" in tool_nodes[0].metadata -``` - ---- - -### STEP 3: Execute (`execute()`) - -**Goal:** Run the agent synchronously and return a structured result. - -#### 3.1 Implement the Runtime Class (`runtime.py`) - -```python -"""Runtime implementation for {Framework}.""" -import uuid -from typing import Any, AsyncGenerator - -from uipath.runtime import ( - UiPathExecuteOptions, UiPathStreamOptions, - UiPathRuntimeResult, UiPathRuntimeStatus, - UiPathRuntimeEvent, UiPathRuntimeSchema, -) -from uipath.runtime.events import UiPathRuntimeMessageEvent, UiPathRuntimeStateEvent - -from uipath.core.serialization import serialize_defaults -from .schema import get_entrypoints_schema, get_agent_graph -from .errors import FrameworkRuntimeError, FrameworkErrorCode - - -class UiPathFrameworkRuntime: - """UiPath runtime for {Framework} agents. - - Implements UiPathRuntimeProtocol via duck typing (Python Protocol). - """ - - def __init__(self, agent, entrypoint: str = "", runtime_id: str | None = None): - self.agent = agent - self.entrypoint = entrypoint - self.runtime_id = runtime_id or str(uuid.uuid4()) - - # ── execute ────────────────────────────────────────────────────── - - async def execute( - self, - input: dict[str, Any] | None = None, - options: UiPathExecuteOptions | None = None, - ) -> UiPathRuntimeResult: - """Execute the agent and return the final result.""" - try: - # 1. Parse input into framework-specific format - agent_input = self._prepare_input(input or {}) - - # 2. Run the agent using the framework's runner - # FRAMEWORK-SPECIFIC: Replace with actual runner call - # Examples: - # - OpenAI Agents: Runner.run(starting_agent=self.agent, input=agent_input) - # - LlamaIndex: workflow.run(**agent_input) - # - LangGraph: graph.ainvoke(agent_input, config=self._get_config()) - # - Pydantic AI: agent.run(agent_input) - # - Google ADK: runner.run(agent=self.agent, input=agent_input) - result = await self._run_agent(agent_input, options) - - # 3. Serialize output to dict - output = serialize_defaults(result) - if not isinstance(output, dict): - output = {"result": output} - - return UiPathRuntimeResult( - output=output, - status=UiPathRuntimeStatus.SUCCESSFUL, - ) - - except Exception as e: - raise self._create_error(e) from e - - # ── stream (placeholder - implemented in Step 4) ───────────────── - - async def stream( - self, - input: dict[str, Any] | None = None, - options: UiPathStreamOptions | None = None, - ) -> AsyncGenerator[UiPathRuntimeEvent, None]: - """Stream execution events. Implemented in Step 4.""" - # Fallback: just execute and yield result - result = await self.execute(input, options) - yield result - - # ── get_schema ─────────────────────────────────────────────────── - - async def get_schema(self) -> UiPathRuntimeSchema: - """Return input/output schema and graph structure.""" - schemas = get_entrypoints_schema(self.agent) - graph = get_agent_graph(self.agent) - - return UiPathRuntimeSchema( - filePath=self.entrypoint, - uniqueId=str(uuid.uuid4()), - type="agent", - input=schemas["input"], - output=schemas["output"], - graph=graph, - ) - - # ── dispose ────────────────────────────────────────────────────── - - async def dispose(self) -> None: - """Clean up resources.""" - pass - - # ── Internal helpers ───────────────────────────────────────────── - - def _prepare_input(self, input: dict[str, Any]) -> Any: - """Convert UiPath input dict to framework-specific format. - - FRAMEWORK-SPECIFIC. Common patterns: - - Extract "messages" field for conversational input - - Parse remaining fields into context/deps object - - Map UiPath message format to framework's message types - """ - messages = input.get("messages", "") - # Return framework-specific input format - return messages - - async def _run_agent(self, agent_input, options): - """Run the agent using the framework's execution API. - - FRAMEWORK-SPECIFIC. This is the core integration point. - """ - raise NotImplementedError("Implement with framework's runner") - - def _create_error(self, e: Exception) -> FrameworkRuntimeError: - """Map exceptions to structured runtime errors.""" - import json - if isinstance(e, json.JSONDecodeError): - return FrameworkRuntimeError( - code=FrameworkErrorCode.AGENT_EXECUTION_FAILURE, - title="Invalid JSON input", - detail=str(e), - ) - if isinstance(e, TimeoutError): - return FrameworkRuntimeError( - code=FrameworkErrorCode.TIMEOUT_ERROR, - title="Agent execution timed out", - detail=str(e), - ) - return FrameworkRuntimeError( - code=FrameworkErrorCode.AGENT_EXECUTION_FAILURE, - title="Agent execution failed", - detail=str(e), - ) -``` - -#### 3.2 Output Serialization - -Use the `serialize_defaults` function from `uipath.core.serialization` — do NOT create a custom `_serialize.py`. This utility handles all common Python types (Pydantic models, dataclasses, dicts, lists, enums, primitives) and is already tested and maintained by the core SDK. - -```python -from uipath.core.serialization import serialize_defaults - -# In execute(): -output = serialize_defaults(result) -if not isinstance(output, dict): - output = {"result": output} - -# In stream() for event payloads: -payload = serialize_defaults(event_data) -``` - -#### Validation for Step 3 - -```python -# tests/test_execute.py -import pytest - -@pytest.mark.asyncio -async def test_execute_returns_successful_result(): - runtime = create_test_runtime() # With a mock agent - result = await runtime.execute({"messages": "Hello"}) - assert result.status == UiPathRuntimeStatus.SUCCESSFUL - assert isinstance(result.output, dict) - -@pytest.mark.asyncio -async def test_execute_wraps_errors(): - runtime = create_failing_runtime() - with pytest.raises(FrameworkRuntimeError): - await runtime.execute({"messages": "trigger error"}) - -@pytest.mark.asyncio -async def test_get_schema_returns_valid_schema(): - runtime = create_test_runtime() - schema = await runtime.get_schema() - assert schema.type == "agent" - assert "messages" in schema.input["properties"] - assert schema.graph is not None -``` - ---- - -### STEP 4: Streaming (`stream()`) - -**Goal:** Yield real-time execution events during agent execution. - -#### 4.1 Understand Event Types - -Your `stream()` must yield these event types: - -| Event | When to emit | Payload | -|-------|-------------|---------| -| `UiPathRuntimeMessageEvent` | AI generates text, tool calls, reasoning | Framework-specific message object (serialized to dict) | -| `UiPathRuntimeStateEvent` | Agent state changes, node transitions | `{"key": "value"}` state dict, with `node_name` | -| `UiPathRuntimeResult` | **Final event** - execution complete | Output dict + status | - -#### 4.2 Implement Streaming - -Replace the placeholder `stream()` in runtime.py: - -```python -async def stream( - self, - input: dict[str, Any] | None = None, - options: UiPathStreamOptions | None = None, -) -> AsyncGenerator[UiPathRuntimeEvent, None]: - """Stream execution events in real-time.""" - try: - agent_input = self._prepare_input(input or {}) - - # FRAMEWORK-SPECIFIC: Use the framework's streaming API - # Examples: - # OpenAI: Runner.run_streamed() + result.stream_events() - # LlamaIndex: handler.stream_events(expose_internal=True) - # LangGraph: graph.astream() with stream_mode=["messages","updates","tasks"] - # Pydantic AI: agent.run_stream() - # Google ADK: runner.run_stream() - - async for event in self._stream_agent(agent_input, options): - runtime_event = self._convert_event(event) - if runtime_event is not None: - yield runtime_event - - # Final result (MUST be last yield) - result = self._get_final_result() - yield UiPathRuntimeResult( - output=serialize_defaults(result), - status=UiPathRuntimeStatus.SUCCESSFUL, - ) - - except Exception as e: - raise self._create_error(e) from e - - -def _convert_event(self, event) -> UiPathRuntimeEvent | None: - """Convert a framework event to a UiPath runtime event. - - FRAMEWORK-SPECIFIC. Map each framework event type: - - For MESSAGE events (AI text, tool calls, reasoning): - return UiPathRuntimeMessageEvent( - payload=serialize_defaults(event), - metadata={"event_name": "message_created"}, - ) - - For STATE events (node transitions, state updates): - return UiPathRuntimeStateEvent( - payload=serialize_defaults(state_dict), - node_name="node_that_produced_this", - metadata={"event_name": "state_update"}, - ) - - For events to SKIP (raw HTTP responses, internal bookkeeping): - return None - """ - raise NotImplementedError("Implement event conversion for your framework") -``` - -#### 4.3 Event Mapping Reference - -**OpenAI Agents events:** -- `run_item_stream_event` + `message_output_created` → `UiPathRuntimeMessageEvent` -- `run_item_stream_event` + `reasoning_item_created` → `UiPathRuntimeMessageEvent` -- `run_item_stream_event` (other) → `UiPathRuntimeStateEvent` -- `agent_updated_stream_event` → `UiPathRuntimeStateEvent` -- Raw response events → skip (too granular) - -**LlamaIndex events:** -- `AgentInput`, `AgentOutput`, `AgentStream` → `UiPathRuntimeMessageEvent` -- `ToolCall`, `ToolCallResult` → `UiPathRuntimeMessageEvent` -- Other workflow events → `UiPathRuntimeStateEvent` - -**LangGraph stream chunks:** -- `messages` tuple (BaseMessage/AIMessageChunk) → `UiPathRuntimeMessageEvent` -- `updates` tuple (node state dicts) → `UiPathRuntimeStateEvent` -- `tasks` tuple → `UiPathRuntimeStateEvent` - -#### Validation for Step 4 - -```python -# tests/test_streaming.py -import pytest - -@pytest.mark.asyncio -async def test_stream_yields_events(): - runtime = create_test_runtime() - events = [] - async for event in runtime.stream({"messages": "Hello"}): - events.append(event) - assert len(events) >= 1 - assert isinstance(events[-1], UiPathRuntimeResult) # Last event is result - -@pytest.mark.asyncio -async def test_stream_message_events(): - runtime = create_test_runtime() - message_events = [] - async for event in runtime.stream({"messages": "Hello"}): - if isinstance(event, UiPathRuntimeMessageEvent): - message_events.append(event) - assert len(message_events) > 0 - assert all(e.payload is not None for e in message_events) -``` - ---- - -### STEP 5: Human-in-the-Loop / Durable Execution - -**Goal:** Support suspending execution when human input is needed, and resuming later. - -Not all frameworks support this. Check if the target framework has: -- **Interrupt/suspend semantics** (LangGraph `interrupt()`, LlamaIndex `InputRequiredEvent`) -- **State serialization** (checkpointing, context saving) -- **Resume capability** (continuing from a saved state) - -#### 5.1 Determine HITL Capability - -| Framework | HITL Support | Mechanism | -|-----------|-------------|-----------| -| **LangGraph** | Full | `interrupt()` function, `Command(resume=...)`, `AsyncSqliteSaver` checkpointer | -| **LlamaIndex** | Full | `InputRequiredEvent`, `HumanResponseEvent`, `JsonPickleSerializer` context save | -| **OpenAI Agents** | None built-in | No native suspend/resume (would need custom implementation) | -| **Google ADK** | Partial | Depends on orchestration mode | -| **Pydantic AI** | None built-in | No native suspend/resume | -| **CrewAI** | Partial | Human input tool, but no state serialization | - -#### 5.2 Implement Storage (`_storage.py`) - -If the framework supports HITL, implement SQLite-based storage: - -```python -"""SQLite storage for resume triggers and state persistence.""" -import json -import asyncio -import aiosqlite -from typing import Any -from uipath.runtime import UiPathResumeTrigger - -class SqliteResumableStorage: - """Async SQLite storage for HITL resume triggers and KV data.""" - - def __init__(self, db_path: str): - self._db_path = db_path - self._db: aiosqlite.Connection | None = None - self._lock = asyncio.Lock() - - async def initialize(self): - self._db = await aiosqlite.connect(self._db_path) - await self._db.execute("PRAGMA journal_mode=WAL") - await self._db.execute("PRAGMA synchronous=NORMAL") - - # Resume triggers table - await self._db.execute(""" - CREATE TABLE IF NOT EXISTS __uipath_resume_triggers ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - runtime_id TEXT NOT NULL, - interrupt_id TEXT, - data TEXT NOT NULL, - timestamp DATETIME DEFAULT CURRENT_TIMESTAMP - ) - """) - await self._db.execute( - "CREATE INDEX IF NOT EXISTS idx_triggers_runtime ON __uipath_resume_triggers(runtime_id)" - ) - - # Key-value table - await self._db.execute(""" - CREATE TABLE IF NOT EXISTS __uipath_runtime_kv ( - runtime_id TEXT NOT NULL, - namespace TEXT NOT NULL, - key TEXT NOT NULL, - value TEXT, - timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (runtime_id, namespace, key) - ) - """) - await self._db.commit() - - # UiPathResumableStorageProtocol methods - async def save_triggers(self, runtime_id: str, triggers: list[UiPathResumeTrigger]): - async with self._lock: - await self._db.execute( - "DELETE FROM __uipath_resume_triggers WHERE runtime_id = ?", (runtime_id,) - ) - for trigger in triggers: - await self._db.execute( - "INSERT INTO __uipath_resume_triggers (runtime_id, interrupt_id, data) VALUES (?, ?, ?)", - (runtime_id, trigger.interrupt_id, trigger.model_dump_json(by_alias=True)), - ) - await self._db.commit() - - async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None: - async with self._lock: - cursor = await self._db.execute( - "SELECT data FROM __uipath_resume_triggers WHERE runtime_id = ?", (runtime_id,) - ) - rows = await cursor.fetchall() - if not rows: - return None - return [UiPathResumeTrigger.model_validate_json(row[0]) for row in rows] - - async def delete_trigger(self, runtime_id: str, trigger: UiPathResumeTrigger): - async with self._lock: - await self._db.execute( - "DELETE FROM __uipath_resume_triggers WHERE runtime_id = ? AND interrupt_id = ?", - (runtime_id, trigger.interrupt_id), - ) - await self._db.commit() - - # UiPathRuntimeStorageProtocol methods - async def set_value(self, runtime_id: str, namespace: str, key: str, value: Any): - async with self._lock: - if value is None: - serialized = None - elif isinstance(value, str): - serialized = f"s:{value}" - else: - serialized = f"j:{json.dumps(value)}" - await self._db.execute( - "INSERT OR REPLACE INTO __uipath_runtime_kv VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)", - (runtime_id, namespace, key, serialized), - ) - await self._db.commit() - - async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any: - async with self._lock: - cursor = await self._db.execute( - "SELECT value FROM __uipath_runtime_kv WHERE runtime_id=? AND namespace=? AND key=?", - (runtime_id, namespace, key), - ) - row = await cursor.fetchone() - if row is None or row[0] is None: - return None - val = row[0] - if val.startswith("s:"): - return val[2:] - if val.startswith("j:"): - return json.loads(val[2:]) - return val - - async def dispose(self): - if self._db: - await self._db.close() - self._db = None -``` - -#### 5.3 Implement Suspend/Resume in Runtime - -When the framework signals a need for human input: - -```python -# In runtime.py - extend execute() and stream() - -async def _handle_suspension(self, suspend_value, runtime_id): - """Handle agent suspension for HITL. - - Returns UiPathRuntimeResult with SUSPENDED status and resume trigger info. - - FRAMEWORK-SPECIFIC: - - LangGraph: detect state.interrupts, save checkpointed state - - LlamaIndex: detect InputRequiredEvent, save context via JsonPickleSerializer - """ - return UiPathRuntimeResult( - output=serialize_defaults(suspend_value), - status=UiPathRuntimeStatus.SUSPENDED, - # Triggers are managed by UiPathResumableRuntime wrapper - ) - -async def _handle_resume(self, input, options): - """Resume a suspended execution. - - FRAMEWORK-SPECIFIC: - - LangGraph: wrap input in Command(resume=input), graph resumes from checkpoint - - LlamaIndex: load saved context, send HumanResponseEvent - """ - raise NotImplementedError("Implement resume for your framework") -``` - -#### 5.4 Wire HITL into Factory - -```python -# In factory.py - new_runtime() wraps with UiPathResumableRuntime - -from uipath.runtime import UiPathResumableRuntime - -async def new_runtime(self, entrypoint, runtime_id, **kwargs): - agent = await self._resolve_agent(entrypoint) - base_runtime = UiPathFrameworkRuntime(agent, entrypoint, runtime_id) - - # Only wrap if framework supports HITL - if self._supports_hitl(): - storage = await self.get_storage() - trigger_manager = self._create_trigger_manager() - return UiPathResumableRuntime( - delegate=base_runtime, - storage=storage, - trigger_manager=trigger_manager, - runtime_id=runtime_id, - ) - return base_runtime -``` - -#### Validation for Step 5 - -```python -# tests/test_storage.py -import pytest - -@pytest.mark.asyncio -async def test_save_and_retrieve_triggers(tmp_path): - storage = SqliteResumableStorage(str(tmp_path / "state.db")) - await storage.initialize() - - trigger = UiPathResumeTrigger(interrupt_id="int-1", ...) - await storage.save_triggers("runtime-1", [trigger]) - retrieved = await storage.get_triggers("runtime-1") - assert len(retrieved) == 1 - assert retrieved[0].interrupt_id == "int-1" - - await storage.dispose() - -@pytest.mark.asyncio -async def test_kv_store(tmp_path): - storage = SqliteResumableStorage(str(tmp_path / "state.db")) - await storage.initialize() - - await storage.set_value("rt-1", "ns", "key", {"foo": "bar"}) - val = await storage.get_value("rt-1", "ns", "key") - assert val == {"foo": "bar"} - - await storage.dispose() - -# tests/test_hitl.py - if framework supports it -@pytest.mark.asyncio -async def test_suspend_and_resume(): - runtime = create_hitl_runtime() - result = await runtime.execute({"messages": "request approval"}) - assert result.status == UiPathRuntimeStatus.SUSPENDED - - resumed = await runtime.execute( - {"messages": "approved"}, - options=UiPathExecuteOptions(resume=True), - ) - assert resumed.status == UiPathRuntimeStatus.SUCCESSFUL -``` - ---- - -### STEP 6: Chat Message Streaming (UiPathConversation Format) - -**Goal:** Convert framework-specific streaming messages to UiPath's conversation protocol for real-time chat UIs. - -This step is needed when the runtime is used in a **conversational context** (chat UI). The `UiPathChatRuntime` wrapper handles the chat protocol, but your runtime's `UiPathRuntimeMessageEvent` payloads should be structured enough for conversion. - -#### 6.1 UiPath Conversation Event Hierarchy - -``` -UiPathConversationEvent -├── conversation_id -└── exchange (one turn) - ├── exchange_id - └── message - ├── message_id - ├── start (role, timestamp) - ├── content_part - │ ├── start (content_part_id, mime_type) - │ ├── chunk (data: text chunk) - │ └── end - ├── tool_call - │ ├── start (tool_call_id, name, input) - │ └── end (output, timestamp) - └── end -``` - -#### 6.2 Message Mapper (Optional but recommended for chat-heavy frameworks) - -If your framework has rich message types (like LangChain's `BaseMessage`, `AIMessageChunk`), implement a mapper: - -```python -"""Chat message mapper for {Framework} <-> UiPath conversation format.""" -from uipath.core.chat import ( - UiPathConversationMessageEvent, - UiPathConversationMessageStartEvent, - UiPathConversationMessageEndEvent, - UiPathConversationContentPartEvent, - UiPathConversationContentPartStartEvent, - UiPathConversationContentPartChunkEvent, - UiPathConversationContentPartEndEvent, - UiPathConversationToolCallEvent, - UiPathConversationToolCallStartEvent, - UiPathConversationToolCallEndEvent, -) - -class ChatMessageMapper: - """Maps framework messages to UiPath conversation events.""" - - def __init__(self, storage=None): - self._current_message_id: str | None = None - self._storage = storage # For correlating tool calls to messages - - def map_event(self, message) -> list[UiPathConversationMessageEvent]: - """Convert a framework message to UiPath conversation events. - - FRAMEWORK-SPECIFIC. Must handle: - 1. Message start (new AI message) -> MessageStartEvent - 2. Text chunks -> ContentPartChunkEvent - 3. Tool call starts -> ToolCallStartEvent - 4. Tool results -> ToolCallEndEvent - 5. Message end -> MessageEndEvent - """ - raise NotImplementedError("Implement for your framework") - - def map_input_messages(self, messages) -> list: - """Convert UiPath message format to framework's message types. - - FRAMEWORK-SPECIFIC. Handle: - - String messages -> framework's user message type - - UiPathConversationMessage list -> framework message types - - Dict list -> parse to framework message types - """ - raise NotImplementedError("Implement for your framework") -``` - -#### Validation for Step 6 - -```python -# tests/test_messages.py -import pytest - -def test_map_ai_message_to_conversation_events(): - mapper = ChatMessageMapper() - events = mapper.map_event(create_ai_message("Hello!")) - assert any(e.start is not None for e in events) # Has start event - assert any(e.content_part is not None for e in events) # Has content - -def test_map_tool_call_events(): - mapper = ChatMessageMapper() - events = mapper.map_event(create_tool_call_message()) - tool_events = [e for e in events if e.tool_call is not None] - assert len(tool_events) > 0 -``` - ---- - -### STEP 7: Factory Implementation and Registration - -**Goal:** Tie everything together with the factory that creates runtime instances. - -#### 7.1 Implement the Factory (`factory.py`) - -```python -"""Runtime factory for {Framework}.""" -import asyncio -import logging -from typing import Any - -from uipath.runtime import ( - UiPathRuntimeContext, - UiPathRuntimeFactoryProtocol, - UiPathRuntimeFactorySettings, - UiPathRuntimeProtocol, - UiPathRuntimeStorageProtocol, - UiPathResumableRuntime, -) - -from .config import FrameworkConfig, CONFIG_FILE -from .loader import AgentLoader -from .runtime import UiPathFrameworkRuntime -from .errors import FrameworkRuntimeError, FrameworkErrorCode -from ._storage import SqliteResumableStorage - -logger = logging.getLogger(__name__) - - -class UiPathFrameworkRuntimeFactory: - """Factory for creating {Framework} runtime instances. - - Implements UiPathRuntimeFactoryProtocol via duck typing. - """ - - def __init__(self, context: UiPathRuntimeContext | None = None): - self.context = context or UiPathRuntimeContext() - self._agent_cache: dict[str, Any] = {} - self._agent_loaders: dict[str, AgentLoader] = {} - self._agent_lock = asyncio.Lock() - self._storage: SqliteResumableStorage | None = None - - # Optional: instrument framework for tracing - # e.g., OpenAIAgentsInstrumentor().instrument() - - def discover_entrypoints(self) -> list[str]: - """Return list of available agent names from config.""" - config = FrameworkConfig() - if not config.exists: - return [] - return config.entrypoints - - async def new_runtime( - self, entrypoint: str, runtime_id: str, **kwargs - ) -> UiPathRuntimeProtocol: - """Create a new runtime instance for an entrypoint.""" - agent = await self._resolve_agent(entrypoint) - storage = await self.get_storage() - - base_runtime = UiPathFrameworkRuntime( - agent=agent, - entrypoint=entrypoint, - runtime_id=runtime_id, - storage=storage, # Pass storage if runtime needs it (e.g., for state) - ) - - # Wrap with UiPathResumableRuntime for HITL support (if applicable) - # Remove this wrapping if framework doesn't support HITL - return base_runtime - - async def get_storage(self) -> UiPathRuntimeStorageProtocol | None: - """Get or create shared SQLite storage.""" - if self._storage is None: - db_path = self.context.state_file_path or "__uipath/state.db" - self._storage = SqliteResumableStorage(db_path) - await self._storage.initialize() - return self._storage - - async def get_settings(self) -> UiPathRuntimeFactorySettings | None: - return None - - async def dispose(self) -> None: - """Clean up all resources.""" - for loader in self._agent_loaders.values(): - try: - await loader.cleanup() - except Exception as e: - logger.warning(f"Error cleaning up loader: {e}") - self._agent_cache.clear() - self._agent_loaders.clear() - if self._storage: - await self._storage.dispose() - self._storage = None - - async def _resolve_agent(self, entrypoint: str): - """Load and cache agent for an entrypoint.""" - async with self._agent_lock: - if entrypoint in self._agent_cache: - return self._agent_cache[entrypoint] - - config = FrameworkConfig() - if not config.exists: - raise FrameworkRuntimeError( - code=FrameworkErrorCode.CONFIG_MISSING, - title="Config file not found", - detail=f"{CONFIG_FILE} not found in current directory", - ) - - if entrypoint not in config.agents: - raise FrameworkRuntimeError( - code=FrameworkErrorCode.AGENT_NOT_FOUND, - title="Agent not found", - detail=f"Agent '{entrypoint}' not in {CONFIG_FILE}", - ) - - path_string = config.agents[entrypoint] - loader = AgentLoader.from_path_string(entrypoint, path_string) - agent = await loader.load() - - self._agent_cache[entrypoint] = agent - self._agent_loaders[entrypoint] = loader - return agent -``` - -#### 7.2 Register the Factory (`__init__.py`) - -```python -"""UiPath {Framework} Runtime Package.""" -from uipath.runtime import UiPathRuntimeFactoryRegistry, UiPathRuntimeContext - -from .factory import UiPathFrameworkRuntimeFactory -from .config import CONFIG_FILE - - -def register_runtime_factory(): - """Register the {Framework} runtime factory with UiPath.""" - def create_factory(context: UiPathRuntimeContext | None = None): - return UiPathFrameworkRuntimeFactory( - context=context if context else UiPathRuntimeContext() - ) - - UiPathRuntimeFactoryRegistry.register( - "{framework-id}", # e.g., "google-adk", "pydantic-ai" - create_factory, - CONFIG_FILE, # e.g., "google_adk.json" - ) -``` - -#### 7.3 Entry Point in `pyproject.toml` - -Already covered in Step 1, but critical: - -```toml -[project.entry-points."uipath.runtime.factories"] -{framework-id} = "uipath_{framework}.runtime:register_runtime_factory" -``` - -This allows `uipath run` CLI to auto-discover your factory when the config file is present. - -#### Validation for Step 7 - -```python -# tests/test_factory.py -import pytest - -@pytest.mark.asyncio -async def test_factory_discovers_entrypoints(tmp_path): - # Setup config file - write_config(tmp_path, {"agents": {"agent1": "main.py:agent"}}) - os.chdir(tmp_path) - - factory = UiPathFrameworkRuntimeFactory() - entrypoints = factory.discover_entrypoints() - assert entrypoints == ["agent1"] - -@pytest.mark.asyncio -async def test_factory_creates_runtime(tmp_path): - # Setup config + agent file - write_config(tmp_path, {"agents": {"agent1": "main.py:agent"}}) - write_agent_file(tmp_path / "main.py") - os.chdir(tmp_path) - - factory = UiPathFrameworkRuntimeFactory() - runtime = await factory.new_runtime("agent1", "test-runtime-id") - assert runtime is not None - - schema = await runtime.get_schema() - assert schema.type == "agent" - - await factory.dispose() - -@pytest.mark.asyncio -async def test_factory_rejects_unknown_entrypoint(tmp_path): - write_config(tmp_path, {"agents": {"agent1": "main.py:agent"}}) - os.chdir(tmp_path) - - factory = UiPathFrameworkRuntimeFactory() - with pytest.raises(FrameworkRuntimeError): - await factory.new_runtime("nonexistent", "test-id") -``` - ---- - -### STEP 8: Integration Testing with a Sample Agent - -**Goal:** Create an end-to-end sample that validates the full integration works. - -#### 8.1 Create a Sample Agent - -``` -samples/quickstart-agent/ -├── pyproject.toml -├── {framework}.json -└── main.py -``` - -**`{framework}.json`:** -```json -{ - "agents": { - "assistant": "main.py:agent" - } -} -``` - -**`main.py`:** Create a simple agent using the target framework. - -#### 8.2 End-to-End Tests - -```python -# tests/test_e2e.py -import pytest - -@pytest.mark.asyncio -async def test_full_lifecycle(): - """Test the complete runtime lifecycle.""" - factory = UiPathFrameworkRuntimeFactory() - - # 1. Discover - entrypoints = factory.discover_entrypoints() - assert len(entrypoints) > 0 - - # 2. Create runtime - runtime = await factory.new_runtime(entrypoints[0], "test-run") - - # 3. Schema - schema = await runtime.get_schema() - assert schema.input is not None - assert schema.output is not None - - # 4. Execute - result = await runtime.execute({"messages": "What is 2+2?"}) - assert result.status == UiPathRuntimeStatus.SUCCESSFUL - assert result.output is not None - - # 5. Stream - events = [] - async for event in runtime.stream({"messages": "What is 2+2?"}): - events.append(event) - assert len(events) >= 1 - assert isinstance(events[-1], UiPathRuntimeResult) - - # 6. Cleanup - await runtime.dispose() - await factory.dispose() -``` - ---- - -### STEP 9: LLM Provider Classes (Chat Module) - -Most framework integrations need to route LLM calls through UiPath's LLM Gateway (AgentHub). This step shows how to create provider-specific LLM classes that transparently proxy requests through the gateway. - -#### 9.1 Architecture - -The `chat/` module provides drop-in LLM classes that users pass to their framework's agent constructor. Each class wraps a framework's native LLM class (or implements its base protocol) and redirects HTTP traffic through UiPath's gateway: - -``` -Framework Agent - └── UiPathOpenAI / UiPathGemini / UiPathAnthropic (your chat module) - └── UiPath LLM Gateway - └── Actual LLM provider (OpenAI, Google, Anthropic) -``` - -Gateway URL format: `{UIPATH_URL}/agenthub_/llm/raw/vendor/{vendor}/model/{model}/completions` - -#### 9.2 Shared Utilities (`_common.py`) - -Create a `_common.py` with three shared helpers used by all providers: - -```python -# src/uipath_{framework}/chat/_common.py -import os -from uipath.utils import EndpointManager - -def get_uipath_config() -> tuple[str, str]: - """Read UIPATH_URL and UIPATH_ACCESS_TOKEN from environment.""" - uipath_url = os.getenv("UIPATH_URL") - if not uipath_url: - raise ValueError("UIPATH_URL environment variable is required") - access_token = os.getenv("UIPATH_ACCESS_TOKEN") - if not access_token: - raise ValueError("UIPATH_ACCESS_TOKEN environment variable is required") - return uipath_url, access_token - -def get_uipath_headers(token: str) -> dict[str, str]: - """Build auth + tracking headers for gateway requests.""" - headers = {"Authorization": f"Bearer {token}"} - if job_key := os.getenv("UIPATH_JOB_KEY"): - headers["X-UiPath-JobKey"] = job_key - if process_key := os.getenv("UIPATH_PROCESS_KEY"): - headers["X-UiPath-ProcessKey"] = process_key - return headers - -def build_gateway_url(vendor: str, model: str, uipath_url: str | None = None) -> str: - """Build full gateway URL using EndpointManager.""" - if not uipath_url: - uipath_url = os.getenv("UIPATH_URL") - if not uipath_url: - raise ValueError("UIPATH_URL environment variable is required") - vendor_endpoint = EndpointManager.get_vendor_endpoint() - formatted = vendor_endpoint.format(vendor=vendor, model=model) - return f"{uipath_url.rstrip('/')}/{formatted}" -``` - -#### 9.3 Gateway Vendor Routing, API Flavor, and Model Naming - -The gateway URL requires a **vendor** and **model** in the path. The correct vendor depends on the model name prefix: - -| Model prefix | Vendor | API Flavor Header | Example model | -|---|---|---|---| -| `gpt-*`, `o1-*`, `o3-*` | `openai` | `OpenAiChatCompletions` | `gpt-4.1-2025-04-14` | -| `gemini-*` | `vertexai` | `GeminiGenerateContent` | `gemini-2.5-flash` | -| `anthropic.*` (Bedrock) | `awsbedrock` | `AnthropicClaude` | `anthropic.claude-haiku-4-5-20251001-v1:0` | -| `claude-*` (Vertex) | `vertexai` | `AnthropicClaude` | `claude-sonnet-4-20250514` | - -**Required header**: Every request MUST include `X-UiPath-LlmGateway-ApiFlavor` so the gateway knows how to parse the request body. Without it, the gateway returns a 500 error. - -**URL-rewriting is required**: Many provider SDKs append their own API paths to the `base_url` (e.g., the Anthropic SDK appends `/v1/messages`). This causes 404 errors because the gateway expects requests at `.../completions` exactly. Always use a URL-rewriting httpx transport instead of relying on `base_url`. - -Example URL: `{UIPATH_URL}/agenthub_/llm/raw/vendor/awsbedrock/model/anthropic.claude-haiku-4-5-20251001-v1:0/completions` - -#### 9.4 Three Integration Strategies - -There are three strategies depending on what the framework's LLM class exposes: - -**Strategy A: URL-Rewriting Transport with SDK Client** (e.g., Anthropic) - -When the provider's SDK creates its own HTTP client, provide a custom `http_client` with a URL-rewriting transport: - -```python -# src/uipath_{framework}/chat/anthropic.py -import httpx -from functools import cached_property -from google.adk.models.anthropic_llm import AnthropicLlm -from typing_extensions import override -from uipath._utils._ssl_context import get_httpx_client_kwargs -from ._common import build_gateway_url, get_uipath_config, get_uipath_headers - -class _AsyncUrlRewriteTransport(httpx.AsyncBaseTransport): - def __init__(self, gateway_url: str, **kwargs): - self.gateway_url = gateway_url - self._transport = httpx.AsyncHTTPTransport(**kwargs) - - async def handle_async_request(self, request): - if "/v1/messages" in str(request.url): - gateway_url_parsed = httpx.URL(self.gateway_url) - headers = dict(request.headers) - headers["host"] = gateway_url_parsed.host - request = httpx.Request( - method=request.method, url=self.gateway_url, - headers=headers, content=request.content, - extensions=request.extensions, - ) - return await self._transport.handle_async_request(request) - -class UiPathAnthropic(AnthropicLlm): - @cached_property - @override - def _anthropic_client(self): - from anthropic import AsyncAnthropic # type: ignore[import-not-found] - uipath_url, token = get_uipath_config() - # anthropic.claude-* → Bedrock, claude-* → Vertex AI - vendor = "awsbedrock" if self.model.startswith("anthropic.") else "vertexai" - gateway_url = build_gateway_url(vendor, self.model, uipath_url) - auth_headers = get_uipath_headers(token) - auth_headers["X-UiPath-LlmGateway-ApiFlavor"] = "AnthropicClaude" - client_kwargs = get_httpx_client_kwargs() - http_client = httpx.AsyncClient( - transport=_AsyncUrlRewriteTransport(gateway_url), - headers=auth_headers, **client_kwargs, - ) - return AsyncAnthropic(api_key=token, http_client=http_client) -``` - -**Key pattern**: The parent class (`AnthropicLlm`) creates its async client in a `cached_property`. Override that property to inject a custom `http_client` with a URL-rewriting transport. Do NOT use `base_url` — the Anthropic SDK appends `/v1/messages` to it, causing 404 errors. The rest of the class (content conversion, streaming, tool calling) works unchanged. - -**Strategy B: URL-Rewriting httpx Transport** (for SDK clients that build URLs internally — e.g., Gemini) - -When the provider's SDK builds URLs internally, intercept at the HTTP transport layer: - -```python -# src/uipath_{framework}/chat/gemini.py -import httpx -from functools import cached_property -from google.adk.models.google_llm import Gemini -from google.genai import Client, types -from typing_extensions import override -from uipath._utils._ssl_context import get_httpx_client_kwargs -from ._common import build_gateway_url, get_uipath_config, get_uipath_headers - -def _rewrite_request_for_gateway(request: httpx.Request, gateway_url: str) -> httpx.Request: - """Rewrite generateContent URLs to the UiPath gateway.""" - url_str = str(request.url) - if "generateContent" in url_str or "streamGenerateContent" in url_str: - is_streaming = "streamGenerateContent" in url_str - headers = dict(request.headers) - if is_streaming: - headers["X-UiPath-Streaming-Enabled"] = "true" - gateway_url_parsed = httpx.URL(gateway_url) - headers["host"] = gateway_url_parsed.host - return httpx.Request( - method=request.method, url=gateway_url, - headers=headers, content=request.content, - extensions=request.extensions, - ) - return request - -class _AsyncUrlRewriteTransport(httpx.AsyncBaseTransport): - def __init__(self, gateway_url: str, **kwargs): - self.gateway_url = gateway_url - self._transport = httpx.AsyncHTTPTransport(**kwargs) - - async def handle_async_request(self, request): - request = _rewrite_request_for_gateway(request, self.gateway_url) - return await self._transport.handle_async_request(request) - -class UiPathGemini(Gemini): - @cached_property - @override - def api_client(self) -> Client: - uipath_url, token = get_uipath_config() - gateway_url = build_gateway_url("vertexai", self.model, uipath_url) - auth_headers = get_uipath_headers(token) - auth_headers["X-UiPath-LlmGateway-ApiFlavor"] = "GeminiGenerateContent" - client_kwargs = get_httpx_client_kwargs() - http_options = types.HttpOptions( - headers=self._tracking_headers(), - httpx_async_client=httpx.AsyncClient( - transport=_AsyncUrlRewriteTransport(gateway_url), - headers=auth_headers, **client_kwargs, - ), - ) - return Client(api_key="uipath-gateway", http_options=http_options) -``` - -**Key pattern**: The `Gemini` class creates a `google.genai.Client` in a `cached_property` called `api_client`. The `Client` constructor accepts `http_options` with custom httpx clients. Inject a transport that intercepts `generateContent`/`streamGenerateContent` requests and rewrites the URL to the gateway. All native Gemini features (streaming, caching, tool calling) work automatically. - -**Strategy C: Raw HTTP + Full Content Conversion** (when no framework LLM class exists — e.g., OpenAI) - -When the framework doesn't have a built-in class for the provider, implement `BaseLlm` directly: - -```python -# src/uipath_{framework}/chat/openai.py -from google.adk.models.base_llm import BaseLlm -from google.genai import types - -class UiPathOpenAI(BaseLlm): - model: str = "gpt-4.1-2025-04-14" - - async def generate_content_async(self, llm_request, stream=False): - # 1. Convert LlmRequest → OpenAI chat completions format - body = self._build_request_body(llm_request, self.model, stream) - # 2. POST to gateway via httpx - # 3. Parse response (SSE for streaming) → yield LlmResponse - ... -``` - -This requires implementing content conversion between the framework's types and the provider's API format. Key conversions: -- `types.Content` (role + parts) → OpenAI messages (role + content/tool_calls) -- `types.FunctionDeclaration` → OpenAI tools format -- `response_schema` → OpenAI `response_format` with `json_schema` -- SSE `data: {...}` lines → `LlmResponse` with partial content - -> **Tip**: Look at the framework's existing LiteLLM integration (e.g., `google.adk.models.lite_llm`) for content conversion patterns. It speaks OpenAI format internally and has helpers like `_content_to_message_param()` and `_function_declaration_to_tool_param()` that you can use as reference. - -#### 9.5 Lazy Imports for Optional Dependencies - -If a provider SDK is an optional dependency (e.g., `anthropic`), use lazy imports at every level: - -```python -# chat/__init__.py — lazy imports avoid loading optional deps at import time -def __getattr__(name): - if name == "UiPathOpenAI": - from .openai import UiPathOpenAI - return UiPathOpenAI - if name == "UiPathGemini": - from .gemini import UiPathGemini - return UiPathGemini - if name == "UiPathAnthropic": - from .anthropic import UiPathAnthropic - return UiPathAnthropic - raise AttributeError(f"module {__name__!r} has no attribute {name!r}") - -__all__ = ["UiPathOpenAI", "UiPathGemini", "UiPathAnthropic"] -``` - -**Why**: Eager imports cause `ModuleNotFoundError` when optional dependencies aren't installed. The `__getattr__` pattern defers loading until the class is actually used. - -In `pyproject.toml`, declare optional deps: -```toml -[project.optional-dependencies] -anthropic = ["anthropic>=0.43.0"] -``` - -#### 9.6 SSL Context - -Always use `get_httpx_client_kwargs()` from `uipath._utils._ssl_context` when creating httpx clients. This provides the correct SSL context, timeout, and redirect settings: - -```python -from uipath._utils._ssl_context import get_httpx_client_kwargs - -client_kwargs = get_httpx_client_kwargs() -# Returns: {"follow_redirects": True, "timeout": 30.0, "verify": ssl_context} - -async with httpx.AsyncClient(**client_kwargs) as client: - ... -``` - -#### 9.7 Package Layout - -``` -src/uipath_{framework}/ -├── __init__.py # Lazy exports for LLM classes -├── chat/ -│ ├── __init__.py # Lazy imports (__getattr__ pattern) -│ ├── _common.py # Shared: get_uipath_config, headers, gateway URL -│ ├── openai.py # UiPathOpenAI(BaseLlm) — raw HTTP -│ ├── gemini.py # UiPathGemini(Gemini) — URL-rewriting transport -│ └── anthropic.py # UiPathAnthropic(AnthropicLlm) — URL-rewriting transport -└── runtime/ - └── ... # Runtime integration (STEPs 1-8) -``` - -#### 9.8 Usage - -```python -from uipath_google_adk.chat import UiPathOpenAI, UiPathGemini, UiPathAnthropic -from google.adk.agents import Agent - -# OpenAI models via gateway -agent = Agent(name="assistant", model=UiPathOpenAI(model="gpt-4.1-2025-04-14"), ...) - -# Gemini models via gateway -agent = Agent(name="assistant", model=UiPathGemini(model="gemini-2.5-flash"), ...) - -# Anthropic models via gateway -agent = Agent(name="assistant", model=UiPathAnthropic(model="claude-sonnet-4-20250514"), ...) -``` - -#### 9.9 Checklist - -- [ ] Shared `_common.py` with `get_uipath_config()`, `get_uipath_headers()`, `build_gateway_url()` -- [ ] Each provider class routes through UiPath gateway URL -- [ ] Correct **vendor** selected per model prefix (`openai`, `vertexai`, `awsbedrock` — see table in 9.3) -- [ ] `X-UiPath-LlmGateway-ApiFlavor` header set on every request (gateway returns 500 without it) -- [ ] URL-rewriting transport used (do NOT rely on SDK `base_url` — SDKs append paths causing 404s) -- [ ] Auth headers include `Authorization`, optional `X-UiPath-JobKey`, `X-UiPath-ProcessKey` -- [ ] Uses `EndpointManager.get_vendor_endpoint()` for URL construction -- [ ] Uses `get_httpx_client_kwargs()` for SSL/timeout when creating httpx clients -- [ ] Optional dependencies use lazy imports (`__getattr__`) at all package levels -- [ ] Optional deps declared in `pyproject.toml` under `[project.optional-dependencies]` -- [ ] mypy overrides added for optional import modules -- [ ] Streaming sets `X-UiPath-Streaming-Enabled: true` header -- [ ] All type annotations pass mypy (use `Dict[str, Any]` not bare `dict` for generic types) - ---- - -## Quick Reference: Key Imports - -```python -# Serialization (use this instead of writing your own _serialize.py) -from uipath.core.serialization import serialize_defaults - -# Core protocols and types -from uipath.runtime import ( - UiPathRuntimeProtocol, # Main protocol to implement - UiPathRuntimeFactoryProtocol, # Factory protocol to implement - UiPathRuntimeFactoryRegistry, # Register your factory - UiPathRuntimeContext, # Execution context - UiPathExecuteOptions, # Execute options (resume, breakpoints) - UiPathStreamOptions, # Stream options - UiPathRuntimeResult, # Execution result - UiPathRuntimeStatus, # SUCCESSFUL, FAULTED, SUSPENDED - UiPathRuntimeEvent, # Base event class - UiPathRuntimeSchema, # Schema container - UiPathResumableRuntime, # HITL wrapper - UiPathResumeTrigger, # Resume trigger model - UiPathStreamNotSupportedError, # If streaming not supported - UiPathBreakpointResult, # Breakpoint hit result -) - -# Event types -from uipath.runtime.events import ( - UiPathRuntimeMessageEvent, # AI messages, tool calls - UiPathRuntimeStateEvent, # State updates, node transitions -) - -# Schema utilities -from uipath.runtime.schema import ( - UiPathRuntimeGraph, # Graph container - UiPathRuntimeNode, # Graph node - UiPathRuntimeEdge, # Graph edge - transform_references, # Resolve $ref in JSON schema - transform_nullable_types, # Handle nullable types - transform_attachments, # Handle UiPathAttachment refs -) - -# Error handling -from uipath.runtime.errors import ( - UiPathBaseRuntimeError, # Base error class - UiPathErrorCategory, # DEPLOYMENT, SYSTEM, USER, UNKNOWN - UiPathErrorCode, # Standard error codes - UiPathErrorContract, # Structured error info -) - -# Storage (if HITL) -from uipath.runtime.storage import UiPathRuntimeStorageProtocol -from uipath.runtime.resumable.protocols import UiPathResumableStorageProtocol -``` - ---- - -## Framework-Specific Implementation Checklist - -When implementing for a specific framework, check off these items: - -### Schema Inference -- [ ] Uses `isinstance` checks (not `getattr`) to verify agent type before accessing I/O attributes -- [ ] Imports framework types from their actual module paths, not `__init__.py` re-exports -- [ ] Uses `uipath.runtime.schema.transform_references()` for `$ref` resolution (don't reimplement) -- [ ] Uses `uipath.runtime.schema.transform_nullable_types()` for nullable handling (don't reimplement) -- [ ] When agent has typed input (Pydantic model), uses it as the FULL input schema (replaces messages) -- [ ] When agent has typed output, uses it as the FULL output schema (replaces generic result) -- [ ] Falls back to `messages` input schema only for conversational agents without typed input -- [ ] Falls back to generic `{"result": any}` output schema only when no typed output exists -- [ ] Tool type parameters are strongly typed (e.g., `Callable | BaseTool | BaseToolset`) -- [ ] Builds visualization graph with correct node types - -### Execution -- [ ] `execute()` runs agent and returns `UiPathRuntimeResult` -- [ ] Input `messages` field is correctly passed to framework -- [ ] Additional input fields are parsed into context/deps/config -- [ ] Output is serialized to dict via `uipath.core.serialization.serialize_defaults()` (do NOT create a custom `_serialize.py`) -- [ ] Errors are caught and wrapped in `FrameworkRuntimeError` - -### Streaming -- [ ] `stream()` yields `UiPathRuntimeEvent` instances -- [ ] AI text/messages → `UiPathRuntimeMessageEvent` -- [ ] State transitions → `UiPathRuntimeStateEvent` -- [ ] Final event is always `UiPathRuntimeResult` -- [ ] Internal/bookkeeping events are filtered out - -### HITL (if supported) -- [ ] Detects suspension signals from framework -- [ ] Saves state/context before suspending -- [ ] Returns `UiPathRuntimeStatus.SUSPENDED` with trigger info -- [ ] Can resume from saved state when `options.resume=True` -- [ ] SQLite storage for triggers and KV data - -### Factory -- [ ] Reads config file (`{framework}.json`) -- [ ] Discovers entrypoints from config -- [ ] Loads agents dynamically from `file.py:variable` paths -- [ ] Caches loaded agents for reuse -- [ ] Supports multiple agent definition patterns (instance, function, async, context manager) -- [ ] Registers via `UiPathRuntimeFactoryRegistry.register()` -- [ ] Entry point in `pyproject.toml` -- [ ] `dispose()` cleans up all resources - -### Testing -- [ ] Schema inference tests (input, output, graph) -- [ ] Config parsing tests (valid, invalid, missing) -- [ ] Loader tests (security, resolution patterns) -- [ ] Execute tests (success, error handling) -- [ ] Streaming tests (event types, final result) -- [ ] Storage tests (if HITL) -- [ ] E2E lifecycle test - ---- - -## Existing Integrations as Reference - -| Integration | Config File | Runner API | HITL | Package | -|-------------|------------|------------|------|---------| -| **OpenAI Agents** | `openai_agents.json` | `Runner.run()` / `Runner.run_streamed()` | No | `uipath-openai-agents` | -| **LlamaIndex** | `llama_index.json` | `workflow.run()` / `handler.stream_events()` | Yes (`InputRequiredEvent`) | `uipath-llamaindex` | -| **LangGraph** | `langgraph.json` | `graph.ainvoke()` / `graph.astream()` | Yes (`interrupt()`, `Command(resume=)`) | `uipath-langchain` | - -Source code locations (in this monorepo): -- **OpenAI Agents**: `packages/uipath-openai-agents/src/uipath_openai_agents/runtime/` -- **LlamaIndex**: `packages/uipath-llamaindex/src/uipath_llamaindex/runtime/` -- **LangGraph**: See `uipath-langchain-python` repo: `src/uipath_langchain/runtime/` -- **Runtime contracts**: `uipath-runtime-python` repo, or installed at `.venv/Lib/site-packages/uipath/runtime/` - -LLM Gateway chat module locations: -- **Google ADK**: `packages/uipath-google-adk/src/uipath_google_adk/chat/` (OpenAI, Gemini, Anthropic) -- **OpenAI Agents**: `packages/uipath-openai-agents/src/uipath_openai_agents/chat/` (OpenAI) -- **LlamaIndex**: `packages/uipath-llamaindex/src/uipath_llamaindex/llms/` (Vertex/Gemini) diff --git a/INTEGRATION_GENOME.md b/INTEGRATION_GENOME.md new file mode 100644 index 0000000..dc61415 --- /dev/null +++ b/INTEGRATION_GENOME.md @@ -0,0 +1,2135 @@ +# UiPath Runtime Integration Genome + +> A complete blueprint for Claude Code to autonomously build a new UiPath runtime integration +> for any Python agentic framework. + +--- + +## How This Document Works + +This is not documentation for humans to read and implement manually. This is a **genome** — a +structured specification that Claude Code reads and executes step-by-step to produce a working +integration package. Each phase has inputs, instructions, quality gates, and reference code +from real integrations. + +**Execution model:** +1. Phase 0 — Ask questions about the target framework +2. Phase 1 — Analyze the framework and map to UiPath contracts +3. Phase 2 — Build the integration step-by-step (9 steps, with quality gates) +4. Phase 3 — Polish (tests, CLI middleware, samples, README) + +--- + +## Architecture Overview + +### What an Integration Is + +An integration is a Python package (`uipath-`) that bridges a third-party agentic +framework with the UiPath platform. It implements two core protocols: + +- **`UiPathRuntimeProtocol`** — execute agents, stream events, provide schemas +- **`UiPathRuntimeFactoryProtocol`** — discover entrypoints, create runtime instances + +### The Runtime Wrapper Stack + +The CLI automatically wraps your runtime with cross-cutting concerns. **You only implement +the innermost layer.** The CLI handles the rest based on execution context: + +``` +┌─ UiPathExecutionRuntime ← Tracing/telemetry [CLI: eval mode] +│ └─ UiPathChatRuntime ← Chat bridge (CAS) [CLI: job_id + conversation_id] +│ └─ UiPathDebugRuntime ← Debugger/breakpoints [CLI: debug command] +│ └─ UiPathResumableRuntime ← HITL/resume triggers [Factory: if HITL supported] +│ └─ YourFrameworkRuntime ← YOUR CODE [Factory: always] +``` + +**Who applies each wrapper:** + +| Wrapper | Applied By | Condition | +|---------|-----------|-----------| +| `UiPathExecutionRuntime` | CLI (`cli_eval`) | Evaluation runs — adds OpenTelemetry spans, execution logging | +| `UiPathChatRuntime` | CLI (`cli_run`, `cli_debug`) | Cloud execution with `job_id` AND `conversation_id` — bridges chat events to CAS via WebSocket | +| `UiPathDebugRuntime` | CLI (`cli_debug`) | Debug mode — manages breakpoints, state event emission, trigger polling via SignalR or console bridge | +| `UiPathResumableRuntime` | Your Factory | When your framework supports HITL — manages resume triggers and storage | +| `YourFrameworkRuntime` | Your Factory | Always — your core implementation | + +**You never instantiate the outer wrappers.** Your factory creates `YourFrameworkRuntime`, +optionally wraps it with `UiPathResumableRuntime`, and returns it. The CLI does the rest. + +### Graph Visualization and State Events + +Your integration participates in the debugging/visualization system by: + +1. **Providing a graph** via `get_schema()` — nodes and edges that describe the agent's structure +2. **Emitting state events** during `stream()` — `STARTED`, `UPDATED`, `COMPLETED`, `FAULTED` + phases for each node as it executes +3. **Supporting breakpoints** — node IDs from the graph can be used as breakpoint targets + +The debug bridge (console or SignalR) receives state events and breakpoint results to provide +a visual debugging experience. The `UiPathDebugRuntime` wrapper intercepts these events from +your `stream()` output and forwards them to the bridge. + +**State event lifecycle per node:** +``` +STARTED → UPDATED (0..N times) → COMPLETED + → FAULTED (if error) +``` + +**Breakpoint modes:** +- `breakpoints: ["node_a", "node_b"]` — suspend before specific nodes +- `breakpoints: "*"` — step mode, suspend before every node +- `breakpoints: None` — no breakpoints, free-running + +--- + +## Phase 0 — Framework Discovery + +> Goal: Understand the target framework to determine what to build. + +Ask the user these questions. Use the answers to populate `integration_config.yaml`. + +### Questions + +**Q1: Framework Identity** +``` +What framework are you integrating? +- Name (e.g., "CrewAI", "AutoGen", "PydanticAI") +- PyPI package name (e.g., "crewai", "autogen-agentchat", "pydantic-ai") +- Import path (e.g., "crewai", "autogen_agentchat", "pydantic_ai") +``` + +**Q2: Agent Definition Pattern** +``` +How are agents/workflows defined in this framework? +a) Class instantiation — Agent(name="...", tools=[...]) +b) Decorator/function — @agent def my_agent(): ... +c) Configuration file — YAML/JSON that declares agents +d) Graph/workflow builder — StateGraph().add_node(...) +``` + +**Q3: Execution API** +``` +How do you run an agent in this framework? +- Method name for synchronous execution (e.g., agent.run(), crew.kickoff()) +- Method name for async execution (e.g., agent.arun(), await runner.run()) +- What does it return? (string, object, custom result type) +``` + +**Q4: Streaming Support** +``` +Does the framework support streaming execution events? +a) Yes — async generator / async iterator (e.g., agent.astream(), runner.run_streamed()) +b) Yes — callback/event hooks (e.g., on_tool_start, on_llm_end) +c) No — only synchronous execution +``` + +**Q5: HITL / Interrupt Support** +``` +Does the framework support pausing execution for human input? +a) Yes — built-in interrupt/checkpoint system (e.g., LangGraph interrupts, workflow events) +b) Yes — tool-level callbacks that can block (e.g., human_input tool) +c) No — runs to completion without interruption +``` + +**Q6: Message Format** +``` +What message format does the framework use internally? +a) OpenAI-style — {"role": "user", "content": "..."} +b) Framework-specific message objects (e.g., HumanMessage, AIMessage) +c) Simple strings +d) Custom content types (e.g., google.genai.types.Content) +``` + +**Q7: Typed Input/Output** +``` +Does the agent define typed input/output schemas? +a) Yes — Pydantic models or type hints on the agent definition +b) Partial — input types but output is unstructured +c) No — accepts/returns generic dicts or strings +``` + +### Output: integration_config.yaml + +```yaml +framework: + name: "CrewAI" + package: "crewai" + import_path: "crewai" + config_file: "crewai.json" + +agent_definition: "class" # class | function | config | graph +execution_api: + sync: "crew.kickoff" + async: "crew.akickoff" + return_type: "CrewOutput" +streaming: "callback" # async_generator | callback | none +hitl: "none" # builtin | tool_callback | none +message_format: "openai" # openai | framework | string | custom +typed_schemas: "partial" # yes | partial | no + +capability_tier: "CORE" # Derived: FULL | CORE | MINIMAL +``` + +### Capability Tier Derivation + +``` +IF streaming != "none" AND hitl == "builtin": + tier = FULL +ELIF streaming != "none": + tier = CORE +ELSE: + tier = MINIMAL +``` + +| Tier | Files to Generate | +|------|------------------| +| **FULL** | config, loader, schema, runtime (execute + stream), storage, resumable, factory, errors, CLI middleware, tests | +| **CORE** | config, loader, schema, runtime (execute + stream), factory, errors, CLI middleware, tests | +| **MINIMAL** | config, loader, schema, runtime (execute only), factory, errors, CLI middleware, tests | + +--- + +## Phase 1 — Architecture Mapping + +> Goal: Map the target framework's concepts to UiPath runtime contracts. + +### Concept Mapping Table + +Fill this out based on Phase 0 answers: + +``` +Framework Concept → UiPath Contract +───────────────────────────────────────────── +Agent/Crew/Workflow → Entrypoint (string in config JSON) +agent.run() / crew.kickoff → runtime.execute() +agent.astream() → runtime.stream() +Agent class definition → get_schema() input/output +Agent tools list → UiPathRuntimeNode (type="tool") +Agent sub-agents → UiPathRuntimeNode (type="agent") with subgraph +Framework events → UiPathRuntimeStateEvent / UiPathRuntimeMessageEvent +Framework result → UiPathRuntimeResult.output +Framework errors → UiPathBaseRuntimeError with error codes +Framework interrupts → UiPathRuntimeStatus.SUSPENDED + UiPathResumeTrigger +Config file (JSON) → Factory.discover_entrypoints() +``` + +### Package Structure to Generate + +``` +packages/uipath-/ +├── src/uipath_/ +│ ├── __init__.py +│ ├── middlewares.py # CLI middleware registration +│ ├── _cli/ +│ │ ├── __init__.py +│ │ ├── cli_new.py # Scaffolding command +│ │ └── _templates/ +│ │ ├── main.py.template # Starter agent template +│ │ └── config.json.template # Config file template +│ ├── chat/ # [Optional] LLM provider classes +│ │ ├── __init__.py +│ │ ├── openai.py # OpenAI via UiPath Gateway +│ │ ├── anthropic.py # Anthropic via UiPath Gateway +│ │ └── gemini.py # Gemini via UiPath Gateway +│ └── runtime/ +│ ├── __init__.py +│ ├── config.py # Config file parser +│ ├── loader.py # Dynamic agent loader +│ ├── schema.py # Schema + graph extraction +│ ├── runtime.py # UiPathRuntimeProtocol impl +│ ├── factory.py # UiPathRuntimeFactoryProtocol impl +│ ├── errors.py # Error codes and exception classes +│ ├── messages.py # [CORE+] Message format mapper +│ └── storage.py # [FULL only] SQLite storage adapter +├── tests/ +│ ├── __init__.py +│ ├── conftest.py +│ ├── test_schema.py +│ ├── test_runtime.py +│ └── test_factory.py +├── samples/ +│ └── simple_agent/ +│ ├── main.py +│ └── .json +├── pyproject.toml +└── README.md +``` + +--- + +## Phase 2 — Build Execution + +> 9 steps. Each has instructions, reference code, and a quality gate. + +--- + +### Step 1: Project Scaffold + +> Create the package structure, pyproject.toml, and basic module files. + +#### pyproject.toml Template + +```toml +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.backends" + +[project] +name = "uipath-" +version = "0.0.1" +description = "UiPath integration for " +requires-python = ">=3.11" +dependencies = [ + "uipath>=2.8.35, <2.9.0", + "uipath-runtime>=0.9.0, <1.0.0", + ">=X.Y.Z, = "uipath_.runtime:register_runtime_factory" + +[project.entry-points."uipath.middlewares"] +register = "uipath_.middlewares:register_middleware" + +[tool.hatch.build.targets.wheel] +packages = ["src/uipath_"] + +[tool.ruff] +line-length = 88 + +[tool.ruff.format] +quote-style = "double" + +[tool.mypy] +python_version = "3.11" +strict = true +plugins = ["pydantic.mypy"] + +[[tool.mypy.overrides]] +module = ".*" +ignore_missing_imports = true +``` + +#### Entry Points Explained + +The two entry points are critical: + +1. **`uipath.runtime.factories`** — Registers your factory so the CLI can discover it when + your config file (e.g., `crewai.json`) exists in a project directory. The CLI calls + `UiPathRuntimeFactoryRegistry.get()` which iterates over all registered factories and + picks the one whose config file is present. + +2. **`uipath.middlewares`** — Registers your CLI middleware so `uipath new` can offer your + framework as a scaffolding option. When a user runs `uipath new`, the CLI calls all + registered middlewares which can inject their own commands. + +#### __init__.py Template + +```python +"""UiPath Integration.""" + +def __getattr__(name: str): + """Lazy imports for optional heavy dependencies.""" + if name == "UiPathRuntimeFactory": + from .runtime.factory import UiPathRuntimeFactory + return UiPathRuntimeFactory + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") +``` + +#### QUALITY GATE 1 +``` +✓ Package installs with `uv pip install -e .` +✓ Entry points are discoverable: `python -c "from importlib.metadata import entry_points; print(entry_points(group='uipath.runtime.factories'))"` +✓ Directory structure matches the template above +✓ No import errors on `import uipath_` +``` + +--- + +### Step 2: Config & Loader + +> Parse the framework's config file and dynamically load agents. + +#### Config File Format + +Every integration uses a JSON config file that maps entrypoint names to Python module paths: + +```json +{ + "agents": { + "my_agent": "main.py:agent", + "another": "src/agents/advanced.py:workflow" + } +} +``` + +The key is the entrypoint name (used in CLI: `uipath run my_agent`), the value is +`:` pointing to the agent/workflow object. + +**Naming convention:** Use `agents` as the key for agent-based frameworks, `workflows` for +workflow-based frameworks (like LlamaIndex). + +#### config.py — Reference Implementation + +```python +"""Configuration loader for integration.""" + +from __future__ import annotations + +import json +from pathlib import Path + + +class Config: + """Loads and caches .json configuration.""" + + def __init__(self, config_path: str = ".json") -> None: + self._config_path = config_path + self._agents: dict[str, str] | None = None + + @property + def agents(self) -> dict[str, str]: + if self._agents is None: + self._agents = self._load_agents() + return self._agents + + def _load_agents(self) -> dict[str, str]: + config_file = Path(self._config_path) + if not config_file.exists(): + raise FileNotFoundError( + f"Config file not found: {self._config_path}. " + f"Run 'uipath init' to generate it." + ) + with open(config_file) as f: + data = json.load(f) + return data.get("agents", {}) +``` + +#### loader.py — Reference Implementation + +The loader dynamically imports the Python module and extracts the agent variable. +It must handle both sync and async agent constructors, and support context managers. + +```python +"""Dynamic agent/workflow loader for integration.""" + +from __future__ import annotations + +import importlib +import inspect +import sys +from pathlib import Path +from typing import Any, Self + + +class AgentLoader: + """Loads an agent from a module_path:variable_name string.""" + + def __init__(self, module_path: str, variable_name: str) -> None: + self._module_path = module_path + self._variable_name = variable_name + self._agent: Any = None + + @classmethod + def from_entrypoint(cls, entrypoint: str, base_path: str = ".") -> Self: + """Parse 'path/to/file.py:variable' format.""" + if ":" not in entrypoint: + raise ValueError( + f"Invalid entrypoint format: {entrypoint}. " + f"Expected 'path/to/file.py:variable_name'" + ) + module_path, variable_name = entrypoint.rsplit(":", 1) + # Resolve relative to base_path + full_path = str(Path(base_path) / module_path) + return cls(full_path, variable_name) + + async def load(self) -> Any: + """Load and return the agent object.""" + # Add parent directory to sys.path for imports + module_dir = str(Path(self._module_path).parent.resolve()) + if module_dir not in sys.path: + sys.path.insert(0, module_dir) + + # Import the module + module_name = Path(self._module_path).stem + spec = importlib.util.spec_from_file_location( + module_name, self._module_path + ) + if spec is None or spec.loader is None: + raise ImportError(f"Cannot load module: {self._module_path}") + + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + spec.loader.exec_module(module) + + # Get the variable + agent = getattr(module, self._variable_name, None) + if agent is None: + raise AttributeError( + f"Module {self._module_path} has no attribute " + f"'{self._variable_name}'" + ) + + # Handle callables (factory functions) + if callable(agent) and not isinstance(agent, type): + agent = agent() if not inspect.iscoroutinefunction(agent) else await agent() + + # Handle async context managers + if hasattr(agent, "__aenter__"): + agent = await agent.__aenter__() + + self._agent = agent + return agent + + async def cleanup(self) -> None: + """Cleanup resources (async context managers).""" + if self._agent and hasattr(self._agent, "__aexit__"): + await self._agent.__aexit__(None, None, None) +``` + +#### errors.py — Reference Implementation + +```python +"""Error codes and exceptions for integration.""" + +from __future__ import annotations + +from enum import Enum + +from uipath.runtime.errors import UiPathBaseRuntimeError, UiPathErrorCategory + + +class UiPathErrorCode(Enum): + """Error codes for runtime errors.""" + + AGENT_LOAD_FAILURE = ".AGENT_LOAD_FAILURE" + AGENT_EXECUTION_ERROR = ".AGENT_EXECUTION_ERROR" + AGENT_TIMEOUT = ".AGENT_TIMEOUT" + CONFIG_MISSING = ".CONFIG_MISSING" + CONFIG_INVALID = ".CONFIG_INVALID" + SCHEMA_INFERENCE_ERROR = ".SCHEMA_INFERENCE_ERROR" + STREAM_ERROR = ".STREAM_ERROR" + + +class UiPathRuntimeError(UiPathBaseRuntimeError): + """Runtime error for integration.""" + + def __init__( + self, + code: UiPathErrorCode, + title: str, + detail: str, + category: UiPathErrorCategory, + status: int | None = None, + ) -> None: + super().__init__( + code=code.value, + title=title, + detail=detail, + category=category, + status=status, + prefix="", + ) +``` + +#### QUALITY GATE 2 +``` +✓ Config parser loads a sample .json without errors +✓ Loader can import a simple Python module and extract a variable +✓ Error codes are properly defined with unique prefixes +✓ from_entrypoint("main.py:agent") correctly parses the string +``` + +--- + +### Step 3: Schema Inference + +> Extract input/output JSON schemas and build a graph for visualization. + +Schema inference serves two purposes: +1. **Input/Output schemas** — used by the platform to validate data and generate UIs +2. **Graph structure** — used by the debugger for visualization and breakpoint targeting + +#### Key Principles + +1. **Use `isinstance` checks, not `getattr` guessing** — Verify actual agent types before + accessing framework-specific attributes. +2. **Import from actual module paths** — Not from `__init__.py` re-exports which may be unstable. +3. **Use helper utilities** — `transform_references()`, `transform_nullable_types()`, + `transform_attachments()` from `uipath.runtime.schema`. +4. **Typed input replaces messages** — If agent defines `input_schema`, use it as the FULL + input (don't merge with a generic messages field). +5. **Typed output replaces generic result** — If agent defines `output_schema`, use it as + the FULL output. +6. **Fallback to defaults** — Only use `{"messages": string}` for conversational agents + WITHOUT typed input. + +#### Default Schemas (Conversational Agent) + +When the agent has no typed input/output, use the conversational default: + +```python +DEFAULT_INPUT_SCHEMA = { + "type": "object", + "properties": { + "messages": {"type": "string", "description": "User message"} + }, + "required": ["messages"], +} + +DEFAULT_OUTPUT_SCHEMA = { + "type": "object", + "properties": { + "messages": {"type": "string", "description": "Agent response"} + }, + "required": ["messages"], +} +``` + +#### Graph Building + +The graph describes the agent's structure for debugging visualization. Each node in the graph +can be a breakpoint target (using `node.id`). + +```python +from uipath.runtime.schema import ( + UiPathRuntimeGraph, + UiPathRuntimeNode, + UiPathRuntimeEdge, + UiPathRuntimeSchema, + transform_references, + transform_nullable_types, + transform_attachments, +) + + +def build_graph(agent: Any) -> UiPathRuntimeGraph | None: + """Build a visualization graph from the framework agent. + + Node IDs become breakpoint targets. The debug bridge uses them to: + - Display execution progress (which node is running) + - Set breakpoints (pause before/after specific nodes) + - Show the agent's structure in the UI + """ + nodes: list[UiPathRuntimeNode] = [] + edges: list[UiPathRuntimeEdge] = [] + + # Example: Extract tools as nodes + for tool in getattr(agent, "tools", []): + nodes.append(UiPathRuntimeNode( + id=tool.name, # ← This ID is used for breakpoints + name=tool.name, + type="tool", + metadata={"description": getattr(tool, "description", None)}, + )) + + # Example: Extract sub-agents as nodes with subgraphs + for sub_agent in getattr(agent, "agents", []): + sub_graph = build_graph(sub_agent) # Recursive + nodes.append(UiPathRuntimeNode( + id=sub_agent.name, + name=sub_agent.name, + type="agent", + subgraph=sub_graph, # ← Nested graph for composite nodes + )) + + # Example: Add the main agent node + nodes.insert(0, UiPathRuntimeNode( + id="agent", + name=agent.name if hasattr(agent, "name") else "agent", + type="model", + metadata={"model": getattr(agent, "model", None)}, + )) + + # Build edges (agent → tools, agent → sub-agents) + for node in nodes[1:]: + edges.append(UiPathRuntimeEdge( + source="agent", + target=node.id, + label=None, + )) + + if not nodes: + return None + + return UiPathRuntimeGraph(nodes=nodes, edges=edges) +``` + +#### Schema Types Reference + +```python +class UiPathRuntimeNode(BaseModel): + id: str # Unique ID — used as breakpoint target + name: str # Display name + type: str # "tool", "model", "agent", "node" + subgraph: UiPathRuntimeGraph | None = None # Nested graph for hierarchical agents + metadata: dict[str, Any] | None = None # Framework-specific metadata + +class UiPathRuntimeEdge(BaseModel): + source: str # Source node ID + target: str # Target node ID + label: str | None = None # Condition or transition label + +class UiPathRuntimeGraph(BaseModel): + nodes: list[UiPathRuntimeNode] + edges: list[UiPathRuntimeEdge] + +class UiPathRuntimeSchema(BaseModel): + file_path: str # Entrypoint identifier + unique_id: str # UUID for this schema + type: str # "agent", "workflow", etc. + input: dict[str, Any] # Input JSON schema + output: dict[str, Any] # Output JSON schema + graph: UiPathRuntimeGraph | None = None # Visualization graph + metadata: dict[str, Any] | None = None # Custom metadata +``` + +#### Complete get_schema() Implementation Pattern + +```python +import uuid +from uipath.runtime.schema import ( + UiPathRuntimeSchema, + transform_references, + transform_nullable_types, + transform_attachments, +) + + +async def get_schema(self) -> UiPathRuntimeSchema: + """Extract schema from the loaded agent.""" + # 1. Try typed input schema + input_schema = self._get_typed_input_schema() + if input_schema: + input_schema = transform_attachments( + transform_nullable_types( + transform_references(input_schema) + ) + ) + else: + input_schema = DEFAULT_INPUT_SCHEMA + + # 2. Try typed output schema + output_schema = self._get_typed_output_schema() + if output_schema: + output_schema = transform_attachments( + transform_nullable_types( + transform_references(output_schema) + ) + ) + else: + output_schema = DEFAULT_OUTPUT_SCHEMA + + # 3. Build visualization graph + graph = build_graph(self._agent) + + return UiPathRuntimeSchema( + file_path=self._entrypoint, + unique_id=str(uuid.uuid4()), + type="agent", + input=input_schema, + output=output_schema, + graph=graph, + ) +``` + +#### Reference: How Existing Integrations Build Graphs + +**OpenAI Agents** — Extracts agents, handoffs (sub-agents), tools: +```python +# Agents become nodes with type="agent" +# Handoffs become edges between agent nodes +# Tools become nodes with type="tool" +# Nested agents get subgraph with recursive extraction +``` + +**LangGraph** — Extracts from compiled StateGraph: +```python +# Graph nodes → UiPathRuntimeNode (type detected from node content) +# Graph edges → UiPathRuntimeEdge (conditional edges get labels) +# Subgraphs → nested UiPathRuntimeGraph via node.subgraph +``` + +**Google ADK** — Extracts from LlmAgent tree: +```python +# Agent → node with type="model" +# sub_agents → recursive nodes with subgraphs +# tools → nodes with type="tool" +``` + +#### QUALITY GATE 3 +``` +✓ get_schema() returns valid UiPathRuntimeSchema +✓ input/output schemas are valid JSON Schema +✓ Graph nodes have unique IDs (these become breakpoint targets) +✓ Graph edges reference valid source/target node IDs +✓ transform_references() resolves all $ref pointers +✓ Typed schemas override defaults when present +``` + +--- + +### Step 4: Execute Implementation + +> Implement `runtime.execute()` — synchronous agent execution. + +#### Core Contract + +```python +async def execute( + self, + input: dict[str, Any], + options: UiPathExecuteOptions, +) -> UiPathRuntimeResult: +``` + +**Input:** Always a `dict[str, Any]` parsed from JSON. Your runtime must map this to +whatever the framework expects. + +**Output:** Always a `UiPathRuntimeResult` with: +- `output` — JSON-serializable result (use `serialize_json()`) +- `status` — `SUCCESSFUL`, `FAULTED`, or `SUSPENDED` +- `error` — `UiPathErrorContract` if faulted + +#### UiPathExecuteOptions + +```python +class UiPathExecuteOptions(BaseModel): + resume: bool = False # Resume from suspended state + breakpoints: list[str] | Literal["*"] | None = None # Breakpoint node IDs + + model_config = ConfigDict(extra="allow") # Accepts arbitrary extra fields +``` + +The `breakpoints` field is populated by `UiPathDebugRuntime` from the debug bridge. +Your runtime should check this and yield `UiPathBreakpointResult` when a node matches. +If your framework doesn't support breakpoints natively, the debug wrapper handles it +via stream interception — but you should still emit state events so the wrapper knows +which node is executing. + +#### Implementation Pattern + +```python +import traceback +from typing import Any + +from uipath.core.serialization import serialize_json +from uipath.runtime import ( + UiPathExecuteOptions, + UiPathRuntimeResult, + UiPathRuntimeStatus, +) +from uipath.runtime.errors import UiPathErrorCategory + +from .errors import UiPathErrorCode, UiPathRuntimeError + + +class UiPathRuntime: + """UiPath runtime implementation for .""" + + def __init__( + self, + agent: Any, + runtime_id: str, + entrypoint: str, + ) -> None: + self._agent = agent + self._runtime_id = runtime_id + self._entrypoint = entrypoint + + async def execute( + self, + input: dict[str, Any], + options: UiPathExecuteOptions, + ) -> UiPathRuntimeResult: + """Execute the agent and return result.""" + try: + # 1. Map input dict to framework format + framework_input = self._map_input(input) + + # 2. Run the agent + result = await self._run_agent(framework_input, options) + + # 3. Serialize output + output = serialize_json(result) + + return UiPathRuntimeResult( + output=output, + status=UiPathRuntimeStatus.SUCCESSFUL, + ) + except UiPathRuntimeError: + raise # Re-raise our own errors + except Exception as e: + raise UiPathRuntimeError( + code=UiPathErrorCode.AGENT_EXECUTION_ERROR, + title="Agent execution failed", + detail=f"{type(e).__name__}: {e}\n{traceback.format_exc()}", + category=UiPathErrorCategory.SYSTEM, + ) from e + + def _map_input(self, input: dict[str, Any]) -> Any: + """Map UiPath input dict to framework-specific format.""" + # For conversational agents: + if "messages" in input: + return input["messages"] # or convert to framework message type + # For typed agents: + return input + + async def _run_agent(self, input: Any, options: UiPathExecuteOptions) -> Any: + """Run the framework agent. Override per framework.""" + # Example patterns: + # return await self._agent.arun(input) + # return await self._agent.kickoff(inputs=input) + # return await Runner.run(self._agent, input) + raise NotImplementedError +``` + +#### Serialization Rules + +Always use `serialize_json()` from `uipath.core.serialization`. It handles: +- Pydantic models → dict +- Dataclasses → dict +- Enums → value +- datetime → ISO string +- Custom objects → best-effort serialization + +```python +from uipath.core.serialization import serialize_json + +# DO: +output = serialize_json(framework_result) + +# DON'T: +output = framework_result.model_dump() # May miss nested types +output = json.loads(json.dumps(result, default=str)) # Lossy +``` + +#### Resume Support (for HITL) + +When `options.resume` is `True`, the agent should resume from its last checkpoint +rather than starting fresh. How this works depends on the framework: + +```python +async def execute(self, input, options): + if options.resume: + # Resume from checkpoint + result = await self._resume_agent(input, options) + else: + # Fresh execution + result = await self._run_agent(self._map_input(input), options) + ... +``` + +#### QUALITY GATE 4 +``` +✓ execute() returns UiPathRuntimeResult with status SUCCESSFUL on happy path +✓ execute() returns UiPathRuntimeResult with status FAULTED on error (via exception) +✓ Output is serialized via serialize_json() +✓ Errors are wrapped in UiPathRuntimeError with proper category +✓ Input mapping correctly handles both "messages" and typed inputs +✓ Resume flag is respected (if HITL supported) +``` + +--- + +### Step 5: Streaming Events + +> Implement `runtime.stream()` — real-time event emission during execution. + +**Skip this step if capability tier is MINIMAL.** + +#### Core Contract + +```python +async def stream( + self, + input: dict[str, Any], + options: UiPathStreamOptions, +) -> AsyncGenerator[UiPathRuntimeEvent, None]: +``` + +Your `stream()` must yield events as the agent executes, and **always yield a +`UiPathRuntimeResult` as the final event**. + +#### Event Types + +There are two event types you emit during streaming: + +**1. `UiPathRuntimeStateEvent`** — Node lifecycle events for graph visualization + +```python +from uipath.runtime.events import UiPathRuntimeStateEvent, UiPathRuntimeStatePhase + +# When a node starts executing: +yield UiPathRuntimeStateEvent( + payload={"input": node_input}, # State data (what went into the node) + node_name="agent_node", # Simple node name + qualified_node_name="parent:agent_node",# Full path (for subgraph hierarchy) + phase=UiPathRuntimeStatePhase.STARTED, # Node is starting +) + +# When a node's state updates mid-execution: +yield UiPathRuntimeStateEvent( + payload={"partial_output": partial}, + node_name="agent_node", + qualified_node_name="parent:agent_node", + phase=UiPathRuntimeStatePhase.UPDATED, +) + +# When a node completes: +yield UiPathRuntimeStateEvent( + payload={"output": node_output}, + node_name="agent_node", + qualified_node_name="parent:agent_node", + phase=UiPathRuntimeStatePhase.COMPLETED, +) + +# When a node fails: +yield UiPathRuntimeStateEvent( + payload={"error": str(error)}, + node_name="agent_node", + qualified_node_name="parent:agent_node", + phase=UiPathRuntimeStatePhase.FAULTED, +) +``` + +**Important:** The `node_name` values must correspond to `UiPathRuntimeNode.id` values +from your graph (Step 3). This is how the debugger knows which graph node is currently +executing, and how breakpoints match. + +**`qualified_node_name`** includes the subgraph hierarchy, separated by colons. For a +top-level node, it equals `node_name`. For a nested node: `"parent_graph:child_node"`. + +**2. `UiPathRuntimeMessageEvent`** — AI messages, tool calls, text chunks + +```python +from uipath.runtime.events import UiPathRuntimeMessageEvent + +# When the LLM produces output (text chunk, tool call, etc.) +yield UiPathRuntimeMessageEvent( + payload=message_object, # Framework-specific message object +) +``` + +The `payload` of message events is framework-specific. The `UiPathChatRuntime` wrapper +(applied by the CLI) will consume these and forward them to the chat bridge for UI display. + +#### Breakpoint Support in Streaming + +When `options.breakpoints` is set, your stream should check if the current node matches +a breakpoint and yield a `UiPathBreakpointResult`: + +```python +from uipath.runtime import UiPathBreakpointResult, UiPathStreamOptions + + +async def stream(self, input, options: UiPathStreamOptions): + breakpoints = options.breakpoints if options else None + + for node in execution_nodes: + # Check if this node is a breakpoint target + if breakpoints is not None: + if breakpoints == "*" or node.id in breakpoints: + yield UiPathBreakpointResult( + breakpoint_node=node.id, + breakpoint_type="before", + current_state=current_state_snapshot, + next_nodes=[next_node.id for next_node in node.successors], + ) + # After yielding breakpoint, the UiPathDebugRuntime wrapper + # will wait for resume, then call stream() again with + # options.resume=True and updated breakpoints + + # Emit STARTED + yield UiPathRuntimeStateEvent( + payload={}, node_name=node.id, + phase=UiPathRuntimeStatePhase.STARTED, + ) + + # Execute node... + result = await node.execute() + + # Emit COMPLETED + yield UiPathRuntimeStateEvent( + payload={"output": result}, node_name=node.id, + phase=UiPathRuntimeStatePhase.COMPLETED, + ) + + # ALWAYS yield final result + yield UiPathRuntimeResult( + output=serialize_json(final_output), + status=UiPathRuntimeStatus.SUCCESSFUL, + ) +``` + +**Note:** If your framework doesn't expose node-level execution hooks, you can still +support breakpoints by mapping framework events to nodes. The `UiPathDebugRuntime` +wrapper will intercept `UiPathBreakpointResult` events and handle the pause/resume cycle. + +#### Streaming Strategies by Framework Type + +**A) Framework has async generator / async iterator:** +```python +# Direct mapping — cleanest approach +async for event in framework.astream(input): + yield map_to_uipath_event(event) +yield final_result +``` + +**B) Framework has callback/event hooks:** +```python +# Use asyncio.Queue as bridge +queue: asyncio.Queue[UiPathRuntimeEvent] = asyncio.Queue() + +def on_node_start(node_name, data): + queue.put_nowait(UiPathRuntimeStateEvent( + payload=data, node_name=node_name, + phase=UiPathRuntimeStatePhase.STARTED, + )) + +# Register callbacks, run agent, drain queue +task = asyncio.create_task(self._run_with_callbacks(input, queue)) +while not task.done() or not queue.empty(): + event = await asyncio.wait_for(queue.get(), timeout=1.0) + yield event +``` + +**C) Framework is synchronous only (MINIMAL tier):** +```python +# Wrap execute in stream +async def stream(self, input, options): + result = await self.execute(input, UiPathExecuteOptions(**options.model_dump())) + yield result +``` + +#### If Streaming Is Not Supported + +If the framework truly cannot stream, raise the dedicated exception: + +```python +from uipath.runtime import UiPathStreamNotSupportedError + +async def stream(self, input, options): + raise UiPathStreamNotSupportedError() +``` + +#### QUALITY GATE 5 +``` +✓ stream() yields UiPathRuntimeStateEvent with STARTED/COMPLETED phases for each node +✓ node_name values in state events match node IDs in the graph from get_schema() +✓ stream() always yields UiPathRuntimeResult as the final event +✓ Breakpoints are checked when options.breakpoints is set +✓ UiPathBreakpointResult includes breakpoint_node, current_state, next_nodes +✓ Message events are yielded for LLM output (text chunks, tool calls) +``` + +--- + +### Step 6: HITL / Resumability + +> Implement suspend/resume for human-in-the-loop scenarios. + +**Skip this step if capability tier is not FULL.** + +When an agent needs human input (e.g., approval, clarification), execution suspends: +1. Your runtime detects the interrupt signal from the framework +2. You yield a `UiPathRuntimeResult` with `status=SUSPENDED` and a `UiPathResumeTrigger` +3. The `UiPathResumableRuntime` wrapper persists the trigger to storage +4. Later, when human input arrives, the CLI calls `execute(input, resume=True)` +5. Your runtime loads the checkpoint and resumes + +#### Storage Implementation + +```python +"""SQLite storage adapter for HITL state.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import aiosqlite + +from uipath.runtime import UiPathResumableStorageProtocol, UiPathResumeTrigger + + +class SqliteStorage: + """SQLite-backed storage for resume triggers and key-value pairs.""" + + def __init__(self, db_path: str = ".uipath/state.db") -> None: + self._db_path = db_path + self._db: aiosqlite.Connection | None = None + + async def setup(self) -> None: + Path(self._db_path).parent.mkdir(parents=True, exist_ok=True) + self._db = await aiosqlite.connect(self._db_path) + await self._db.execute( + "CREATE TABLE IF NOT EXISTS __uipath_resume_triggers " + "(runtime_id TEXT, data TEXT)" + ) + await self._db.execute( + "CREATE TABLE IF NOT EXISTS __uipath_runtime_kv " + "(runtime_id TEXT, namespace TEXT, key TEXT, value TEXT, " + "PRIMARY KEY (runtime_id, namespace, key))" + ) + await self._db.commit() + + async def save_triggers( + self, runtime_id: str, triggers: list[UiPathResumeTrigger] + ) -> None: + assert self._db + for trigger in triggers: + await self._db.execute( + "INSERT INTO __uipath_resume_triggers VALUES (?, ?)", + (runtime_id, trigger.model_dump_json()), + ) + await self._db.commit() + + async def get_triggers( + self, runtime_id: str + ) -> list[UiPathResumeTrigger] | None: + assert self._db + cursor = await self._db.execute( + "SELECT data FROM __uipath_resume_triggers WHERE runtime_id = ?", + (runtime_id,), + ) + rows = await cursor.fetchall() + if not rows: + return None + return [UiPathResumeTrigger.model_validate_json(row[0]) for row in rows] + + async def delete_trigger( + self, runtime_id: str, trigger: UiPathResumeTrigger + ) -> None: + assert self._db + await self._db.execute( + "DELETE FROM __uipath_resume_triggers " + "WHERE runtime_id = ? AND data = ?", + (runtime_id, trigger.model_dump_json()), + ) + await self._db.commit() + + async def set_value( + self, runtime_id: str, namespace: str, key: str, value: Any + ) -> None: + assert self._db + await self._db.execute( + "INSERT OR REPLACE INTO __uipath_runtime_kv VALUES (?, ?, ?, ?)", + (runtime_id, namespace, key, json.dumps(value)), + ) + await self._db.commit() + + async def get_value( + self, runtime_id: str, namespace: str, key: str + ) -> Any: + assert self._db + cursor = await self._db.execute( + "SELECT value FROM __uipath_runtime_kv " + "WHERE runtime_id = ? AND namespace = ? AND key = ?", + (runtime_id, namespace, key), + ) + row = await cursor.fetchone() + return json.loads(row[0]) if row else None + + async def dispose(self) -> None: + if self._db: + await self._db.close() +``` + +#### Suspension Pattern in stream() + +When the framework signals an interrupt: + +```python +from uipath.runtime import ( + UiPathRuntimeResult, + UiPathRuntimeStatus, + UiPathResumeTrigger, + UiPathResumeTriggerType, + UiPathResumeTriggerName, + UiPathApiTrigger, +) + + +# Inside stream(): +if framework_signals_interrupt(event): + trigger = UiPathResumeTrigger( + interrupt_id=str(event.interrupt_id), + trigger_type=UiPathResumeTriggerType.API, + trigger_name=UiPathResumeTriggerName( + name=event.tool_name or "human_input", + title=event.prompt or "Waiting for input", + ), + api_resume=UiPathApiTrigger( + payload=event.context, # Data needed to resume + ), + ) + + yield UiPathRuntimeResult( + output=None, + status=UiPathRuntimeStatus.SUSPENDED, + trigger=trigger, # Single trigger + # OR triggers=[trigger] # Multiple concurrent triggers + ) + return # Stop streaming — wrapper handles persistence +``` + +#### Wrapping with UiPathResumableRuntime + +In your factory (Step 7), wrap the base runtime: + +```python +from uipath.runtime import UiPathResumableRuntime, UiPathResumeTriggerProtocol + +# Your trigger handler (usually UiPathResumeTriggerHandler from uipath SDK) +from uipath._services._resume_trigger_handler import UiPathResumeTriggerHandler + +runtime = UiPathRuntime(agent, runtime_id, entrypoint) +storage = SqliteStorage() +await storage.setup() + +resumable = UiPathResumableRuntime( + delegate=runtime, + storage=storage, + trigger_manager=UiPathResumeTriggerHandler(), + runtime_id=runtime_id, +) +return resumable # Return this from factory.new_runtime() +``` + +#### QUALITY GATE 6 +``` +✓ Storage correctly persists and retrieves triggers via SQLite +✓ Suspension yields UiPathRuntimeResult with SUSPENDED status +✓ Resume trigger includes interrupt_id, trigger_type, trigger_name +✓ UiPathResumableRuntime wraps the base runtime in factory +✓ Resume with options.resume=True correctly resumes from checkpoint +``` + +--- + +### Step 7: Factory & Registration + +> Implement the factory that creates runtime instances and register it. + +#### Factory Implementation + +```python +"""Runtime factory for integration.""" + +from __future__ import annotations + +import asyncio +from typing import Any + +from uipath.runtime import ( + UiPathRuntimeContext, + UiPathRuntimeFactoryProtocol, + UiPathRuntimeFactoryRegistry, + UiPathRuntimeFactorySettings, + UiPathRuntimeProtocol, + UiPathRuntimeStorageProtocol, + UiPathResumableRuntime, +) + +from .config import Config +from .loader import AgentLoader +from .runtime import UiPathRuntime +from .errors import UiPathErrorCode, UiPathRuntimeError + + +class UiPathRuntimeFactory: + """Factory for creating runtime instances.""" + + def __init__(self, context: UiPathRuntimeContext) -> None: + self._context = context + self._config = Config() + self._agent_cache: dict[str, Any] = {} + self._agent_lock = asyncio.Lock() + self._storage: SqliteStorage | None = None # FULL tier only + + def discover_entrypoints(self) -> list[str]: + """List available agents from config file.""" + return list(self._config.agents.keys()) + + async def new_runtime( + self, + entrypoint: str, + runtime_id: str, + **kwargs: Any, + ) -> UiPathRuntimeProtocol: + """Create a new runtime instance for the given entrypoint.""" + if entrypoint not in self._config.agents: + raise UiPathRuntimeError( + code=UiPathErrorCode.CONFIG_INVALID, + title=f"Unknown entrypoint: {entrypoint}", + detail=f"Available: {list(self._config.agents.keys())}", + category=UiPathErrorCategory.USER, + ) + + agent = await self._resolve_agent(entrypoint) + + runtime = UiPathRuntime( + agent=agent, + runtime_id=runtime_id, + entrypoint=entrypoint, + ) + + # FULL tier: Wrap with UiPathResumableRuntime + # CORE/MINIMAL: Return runtime directly + # + # if self._storage: + # from uipath._services._resume_trigger_handler import ( + # UiPathResumeTriggerHandler, + # ) + # return UiPathResumableRuntime( + # delegate=runtime, + # storage=self._storage, + # trigger_manager=UiPathResumeTriggerHandler(), + # runtime_id=runtime_id, + # ) + + return runtime + + async def _resolve_agent(self, entrypoint: str) -> Any: + """Load and optionally cache the agent for the given entrypoint. + + IMPORTANT: Only cache agents if the framework allows the same instance + to be executed concurrently. Some frameworks (e.g., Agent Framework) + maintain internal state per execution, so each runtime must get a fresh + agent instance. When in doubt, don't cache. + + Cache-safe: OpenAI Agents, Google ADK, LangGraph (compiled graphs are reusable) + NOT safe: Agent Framework (workflow instances hold execution state) + """ + # Option A: With caching (if framework supports concurrent reuse) + async with self._agent_lock: + if entrypoint in self._agent_cache: + return self._agent_cache[entrypoint] + + entrypoint_path = self._config.agents[entrypoint] + loader = AgentLoader.from_entrypoint(entrypoint_path) + agent = await loader.load() + + self._agent_cache[entrypoint] = agent + return agent + + # Option B: Without caching (if framework doesn't allow concurrent reuse) + # entrypoint_path = self._config.agents[entrypoint] + # loader = AgentLoader.from_entrypoint(entrypoint_path) + # return await loader.load() + + async def get_storage(self) -> UiPathRuntimeStorageProtocol | None: + """Return storage for HITL state (FULL tier only).""" + return self._storage + + async def get_settings(self) -> UiPathRuntimeFactorySettings | None: + """Return factory settings.""" + return None + + async def dispose(self) -> None: + """Cleanup resources.""" + if self._storage: + await self._storage.dispose() + self._agent_cache.clear() +``` + +#### Registration Function + +This is what the entry point in `pyproject.toml` calls: + +```python +# At module level in runtime/__init__.py or runtime/factory.py + +def register_runtime_factory() -> None: + """Register the runtime factory.""" + + def factory_callable( + context: UiPathRuntimeContext, + ) -> UiPathRuntimeFactory: + return UiPathRuntimeFactory(context) + + UiPathRuntimeFactoryRegistry.register( + "", # Unique factory ID + factory_callable, # Creates factory from context + ".json", # Config file that triggers this factory + ) +``` + +#### How Discovery Works (CLI Flow) + +When a user runs `uipath init` or `uipath run`: + +1. CLI calls `UiPathRuntimeFactoryRegistry.get(context=ctx)` +2. Registry iterates over all registered factories (via entry points) +3. For each factory, checks if its config file exists in the current directory +4. Returns the factory whose config file is found +5. CLI calls `factory.discover_entrypoints()` to list available agents +6. CLI calls `factory.new_runtime(entrypoint, runtime_id)` to create runtime + +During `uipath init`, the CLI also: +1. Calls `runtime.get_schema()` for each entrypoint +2. Writes `entry-points.json` with all schemas (input, output, graph) +3. Generates mermaid diagrams from the graph +4. Generates AGENTS.md documentation + +#### QUALITY GATE 7 +``` +✓ Factory discovers entrypoints from config file +✓ new_runtime() returns a valid UiPathRuntimeProtocol implementation +✓ Registration function correctly registers with UiPathRuntimeFactoryRegistry +✓ Agent caching with asyncio.Lock IF the framework allows concurrent reuse of the same instance + (e.g., OpenAI Agents, Google ADK, LangGraph — compiled graphs/agents are stateless) +✓ NO caching if the framework maintains internal execution state per instance + (e.g., Agent Framework — workflow instances cannot be reused concurrently) +✓ Unknown entrypoint raises proper error +✓ dispose() cleans up all resources +``` + +--- + +### Step 8: LLM Provider Classes (UiPath LLM Gateway) + +> Route LLM calls through UiPath's LLM Gateway for managed access. + +When agents run on the UiPath platform, LLM calls should route through the UiPath LLM +Gateway (AgentHub). This provides centralized model management, rate limiting, and billing. + +#### Gateway Architecture + +``` +Your Agent → Framework LLM Client → HTTP Transport (intercepted) → UiPath Gateway → LLM Provider + ↑ + URL rewriting + + auth injection +``` + +**Gateway URL format:** +``` +{UIPATH_URL}/agenthub_/llm/raw/vendor/{vendor}/model/{model}/completions +``` + +**Vendor mapping:** + +| Model Prefix | Vendor | API Flavor Header | +|-------------|--------|-------------------| +| `gpt-*`, `o1-*`, `o3-*` | `openai` | `OpenAiChatCompletions` | +| `gemini-*` | `vertexai` | `GeminiGenerateContent` | +| `anthropic.*` (Bedrock names) | `awsbedrock` | `invoke` or `converse` | +| `claude-*` (direct) | `vertexai` | `AnthropicClaude` | + +#### Required Headers + +Every request to the gateway MUST include: + +| Header | Value | Required | +|--------|-------|----------| +| `Authorization` | `Bearer {UIPATH_ACCESS_TOKEN}` | YES | +| `X-UiPath-LlmGateway-ApiFlavor` | See vendor mapping | YES | +| `X-UiPath-Streaming-Enabled` | `"true"` or `"false"` | YES | +| `X-UiPath-JobKey` | `{UIPATH_JOB_KEY}` | Optional | +| `X-UiPath-ProcessKey` | `{UIPATH_PROCESS_KEY}` | Optional | +| `X-UiPath-AgentHub-Config` | Config name | Optional | +| `X-UiPath-LlmGateway-ByoIsConnectionId` | Connection ID | Optional | + +**Without `X-UiPath-LlmGateway-ApiFlavor`, the gateway returns HTTP 500.** + +#### Environment Variables + +```python +import os + +UIPATH_URL = os.environ.get("UIPATH_URL", "") +UIPATH_ACCESS_TOKEN = os.environ.get("UIPATH_ACCESS_TOKEN", "") +UIPATH_JOB_KEY = os.environ.get("UIPATH_JOB_KEY") +UIPATH_PROCESS_KEY = os.environ.get("UIPATH_PROCESS_KEY") +``` + +#### Implementation Strategy: URL-Rewriting Transport + +The cleanest approach is to intercept HTTP at the transport layer, rewriting URLs and +injecting headers. This works with any SDK that uses `httpx` internally. + +```python +"""UiPath LLM Gateway transport for .""" + +from __future__ import annotations + +import os +from urllib.parse import quote + +import httpx +from uipath._utils._ssl_context import get_httpx_client_kwargs + + +def _get_uipath_headers(streaming: bool = False) -> dict[str, str]: + """Build gateway headers.""" + token = os.environ.get("UIPATH_ACCESS_TOKEN", "") + headers = { + "Authorization": f"Bearer {token}", + "X-UiPath-Streaming-Enabled": "true" if streaming else "false", + } + job_key = os.environ.get("UIPATH_JOB_KEY") + if job_key: + headers["X-UiPath-JobKey"] = job_key + process_key = os.environ.get("UIPATH_PROCESS_KEY") + if process_key: + headers["X-UiPath-ProcessKey"] = process_key + return headers + + +def _build_gateway_url(vendor: str, model: str) -> str: + """Build the gateway URL for a given vendor and model.""" + base_url = os.environ.get("UIPATH_URL", "") + encoded_model = quote(model, safe="") + return f"{base_url}/agenthub_/llm/raw/vendor/{vendor}/model/{encoded_model}/completions" + + +class _AsyncUrlRewriteTransport(httpx.AsyncBaseTransport): + """Rewrites outgoing HTTP requests to route through UiPath Gateway.""" + + def __init__( + self, + gateway_url: str, + api_flavor: str, + wrapped: httpx.AsyncBaseTransport | None = None, + ) -> None: + self._gateway_url = gateway_url + self._api_flavor = api_flavor + self._wrapped = wrapped or httpx.AsyncHTTPTransport( + **get_httpx_client_kwargs() + ) + + async def handle_async_request(self, request: httpx.Request) -> httpx.Response: + # Detect if streaming from request body + streaming = self._is_streaming(request) + + # Rewrite URL + request.url = httpx.URL(self._gateway_url) + + # Inject headers + headers = _get_uipath_headers(streaming=streaming) + headers["X-UiPath-LlmGateway-ApiFlavor"] = self._api_flavor + for key, value in headers.items(): + request.headers[key] = value + + return await self._wrapped.handle_async_request(request) + + def _is_streaming(self, request: httpx.Request) -> bool: + """Detect streaming from request body.""" + # Read body and check for "stream": true + try: + import json + body = json.loads(request.content) + return body.get("stream", False) + except Exception: + return False + + +class _SyncUrlRewriteTransport(httpx.BaseTransport): + """Sync version of URL rewrite transport.""" + + def __init__( + self, + gateway_url: str, + api_flavor: str, + wrapped: httpx.BaseTransport | None = None, + ) -> None: + self._gateway_url = gateway_url + self._api_flavor = api_flavor + self._wrapped = wrapped or httpx.HTTPTransport( + **get_httpx_client_kwargs() + ) + + def handle_request(self, request: httpx.Request) -> httpx.Response: + streaming = False + try: + import json + body = json.loads(request.content) + streaming = body.get("stream", False) + except Exception: + pass + + request.url = httpx.URL(self._gateway_url) + headers = _get_uipath_headers(streaming=streaming) + headers["X-UiPath-LlmGateway-ApiFlavor"] = self._api_flavor + for key, value in headers.items(): + request.headers[key] = value + + return self._wrapped.handle_request(request) +``` + +#### Per-Provider Integration + +**OpenAI models** (most common): +```python +"""UiPath OpenAI chat model for .""" + +from __future__ import annotations + +from functools import cached_property + +# Import from your framework's OpenAI integration +from .llms import OpenAIChat # or whatever the class is + + +class UiPathOpenAI(OpenAIChat): + """OpenAI chat model routed through UiPath Gateway.""" + + model: str = "gpt-4.1-2025-04-14" + + @cached_property + def _client(self): + """Override SDK client to inject gateway transport.""" + import openai + + gateway_url = _build_gateway_url("openai", self.model) + transport = _AsyncUrlRewriteTransport( + gateway_url=gateway_url, + api_flavor="OpenAiChatCompletions", + ) + http_client = httpx.AsyncClient(transport=transport) + return openai.AsyncOpenAI( + api_key="gateway-managed", # Not used — gateway handles auth + http_client=http_client, + ) +``` + +**Anthropic models:** +```python +class UiPathAnthropic(FrameworkAnthropicBase): + """Anthropic model routed through UiPath Gateway (via AWS Bedrock).""" + + @cached_property + def _client(self): + import anthropic + + gateway_url = _build_gateway_url("awsbedrock", self.model) + transport = _AsyncUrlRewriteTransport( + gateway_url=gateway_url, + api_flavor="invoke", + ) + http_client = httpx.AsyncClient(transport=transport) + return anthropic.AsyncAnthropic( + api_key="gateway-managed", + http_client=http_client, + ) +``` + +**Gemini models:** +```python +class UiPathGemini(FrameworkGeminiBase): + """Gemini model routed through UiPath Gateway (via Vertex AI).""" + + @cached_property + def api_client(self): + from google import genai + from google.genai import types + + gateway_url = _build_gateway_url("vertexai", self.model) + transport = _AsyncUrlRewriteTransport( + gateway_url=gateway_url, + api_flavor="GeminiGenerateContent", + ) + http_client = httpx.AsyncClient(transport=transport) + return genai.Client( + api_key="gateway-managed", + http_options=types.HttpOptions( + httpx_async_client=http_client, + ), + ) +``` + +#### SSL Context + +Always use UiPath's SSL context helper for proper certificate handling: + +```python +from uipath._utils._ssl_context import get_httpx_client_kwargs + +# Use in transport creation: +transport = httpx.AsyncHTTPTransport(**get_httpx_client_kwargs()) +``` + +#### Lazy Imports for Optional Dependencies + +LLM providers are optional — not every project needs all providers. Use lazy imports: + +```python +# In chat/__init__.py +def __getattr__(name: str): + if name == "UiPathOpenAI": + from .openai import UiPathOpenAI + return UiPathOpenAI + if name == "UiPathAnthropic": + from .anthropic import UiPathAnthropic + return UiPathAnthropic + if name == "UiPathGemini": + from .gemini import UiPathGemini + return UiPathGemini + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") +``` + +#### QUALITY GATE 8 +``` +✓ Gateway URL is correctly constructed with vendor and model +✓ X-UiPath-LlmGateway-ApiFlavor header is set (without it → 500) +✓ Authorization header includes Bearer token +✓ Streaming detection works (X-UiPath-Streaming-Enabled) +✓ SSL context uses get_httpx_client_kwargs() +✓ Provider classes are lazily imported +✓ API key is set to placeholder (gateway handles actual auth) +``` + +--- + +### Step 9: CLI Middleware & Scaffolding + +> Register CLI commands so `uipath new` can scaffold projects for your framework. + +#### middlewares.py + +```python +"""CLI middleware registration for integration.""" + +from __future__ import annotations + +from uipath._cli._middlewares import Middlewares + + +def register_middleware() -> None: + """Register the new-project middleware.""" + Middlewares.register("new", __new_middleware) + + +def __new_middleware(next_handler, **kwargs): + """Middleware for 'uipath new' command — offers as an option.""" + from ._cli.cli_new import _new + return _new(next_handler, **kwargs) +``` + +#### cli_new.py — Scaffolding Command + +```python +"""Scaffolding for new projects.""" + +from __future__ import annotations + +import shutil +from pathlib import Path + + +def _new(next_handler, **kwargs): + """Create a new project from template.""" + template_dir = Path(__file__).parent / "_templates" + + # Copy template files + shutil.copy(template_dir / "main.py.template", "main.py") + shutil.copy(template_dir / "config.json.template", ".json") + + # Generate pyproject.toml if not exists + if not Path("pyproject.toml").exists(): + _generate_pyproject() + + print(f"Created project. Next steps:") + print(f" 1. Edit main.py to define your agent") + print(f" 2. Run: uipath init") + print(f" 3. Run: uipath run ") +``` + +#### Templates + +**_templates/main.py.template:** +```python +""" agent definition.""" + +from import Agent # Adjust import for your framework + + +# Define your agent here +agent = Agent( + name="my_agent", + instructions="You are a helpful assistant.", + # tools=[...], +) +``` + +**_templates/config.json.template:** +```json +{ + "agents": { + "agent": "main.py:agent" + } +} +``` + +#### QUALITY GATE 9 +``` +✓ `uipath new` shows the framework as an option +✓ Template files are created in the current directory +✓ Generated main.py is valid Python (no syntax errors) +✓ Generated config JSON is valid and references main.py:agent +✓ `uipath init` successfully discovers the generated entrypoint +``` + +--- + +## Phase 3 — Integration Polish + +### Tests + +#### conftest.py + +```python +"""Test fixtures for integration.""" + +import pytest +from click.testing import CliRunner + + +@pytest.fixture +def runner() -> CliRunner: + return CliRunner() + + +@pytest.fixture +def sample_config(tmp_path): + """Create a sample config file.""" + config = tmp_path / ".json" + config.write_text('{"agents": {"test_agent": "main.py:agent"}}') + return config +``` + +#### test_schema.py + +```python +"""Tests for schema inference.""" + +import pytest +from uipath_.runtime.schema import build_graph + + +def test_graph_has_unique_node_ids(): + """Node IDs must be unique — they're breakpoint targets.""" + graph = build_graph(mock_agent) + ids = [node.id for node in graph.nodes] + assert len(ids) == len(set(ids)) + + +def test_schema_has_input_output(): + """Schema must have valid input and output JSON schemas.""" + schema = runtime.get_schema() + assert "type" in schema.input + assert "type" in schema.output +``` + +#### test_runtime.py + +```python +"""Tests for runtime execution.""" + +import pytest +from uipath.runtime import UiPathExecuteOptions, UiPathRuntimeStatus + + +@pytest.mark.asyncio +async def test_execute_returns_successful(): + result = await runtime.execute({"messages": "hello"}, UiPathExecuteOptions()) + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert result.output is not None + + +@pytest.mark.asyncio +async def test_stream_yields_state_events(): + from uipath.runtime.events import UiPathRuntimeStateEvent, UiPathRuntimeStatePhase + + events = [] + async for event in runtime.stream({"messages": "hello"}, UiPathStreamOptions()): + events.append(event) + + state_events = [e for e in events if isinstance(e, UiPathRuntimeStateEvent)] + assert any(e.phase == UiPathRuntimeStatePhase.STARTED for e in state_events) + assert any(e.phase == UiPathRuntimeStatePhase.COMPLETED for e in state_events) +``` + +### README.md Template + +```markdown +# UiPath Integration + +UiPath runtime integration for [](framework-url). + +## Installation + +```bash +pip install uipath- +``` + +## Quick Start + +1. Create your agent in `main.py` +2. Create `.json` with entrypoint mapping +3. Run `uipath init` to generate schemas +4. Run `uipath run ` to execute + +## Configuration + +`.json`: +```json +{ + "agents": { + "agent": "main.py:agent" + } +} +``` + +## Features + +| Feature | Supported | +|---------|-----------| +| Execute | Yes | +| Streaming | Yes/No | +| HITL | Yes/No | +| Breakpoints | Yes/No | +| LLM Gateway | Yes/No | +``` + +--- + +## Quick Reference — All Key Imports + +```python +# Core protocols +from uipath.runtime import ( + UiPathRuntimeProtocol, + UiPathRuntimeFactoryProtocol, + UiPathRuntimeFactoryRegistry, + UiPathRuntimeFactorySettings, + UiPathRuntimeContext, +) + +# Execution +from uipath.runtime import ( + UiPathExecuteOptions, + UiPathStreamOptions, + UiPathRuntimeResult, + UiPathRuntimeStatus, # SUCCESSFUL, FAULTED, SUSPENDED + UiPathStreamNotSupportedError, +) + +# Events +from uipath.runtime import UiPathRuntimeEvent +from uipath.runtime.events import ( + UiPathRuntimeEventType, # RUNTIME_MESSAGE, RUNTIME_STATE, RUNTIME_ERROR, RUNTIME_RESULT + UiPathRuntimeStatePhase, # STARTED, UPDATED, COMPLETED, FAULTED + UiPathRuntimeMessageEvent, + UiPathRuntimeStateEvent, +) + +# Schema & Graph +from uipath.runtime import UiPathRuntimeSchema +from uipath.runtime.schema import ( + UiPathRuntimeGraph, + UiPathRuntimeNode, + UiPathRuntimeEdge, + transform_references, + transform_nullable_types, + transform_attachments, +) + +# HITL / Resumable +from uipath.runtime import ( + UiPathResumableRuntime, + UiPathResumableStorageProtocol, + UiPathResumeTriggerProtocol, + UiPathResumeTrigger, + UiPathResumeTriggerType, # API, JOB, TASK, QUEUE_ITEM, ... + UiPathResumeTriggerName, + UiPathApiTrigger, + UiPathRuntimeStorageProtocol, +) + +# Debug (you don't implement these — the CLI applies them) +from uipath.runtime import ( + UiPathDebugProtocol, + UiPathDebugRuntime, + UiPathBreakpointResult, + UiPathDebugQuitError, +) + +# Chat (you don't implement this — the CLI applies it) +from uipath.runtime import ( + UiPathChatProtocol, + UiPathChatRuntime, +) + +# Execution wrapper (you don't implement this — the CLI applies it) +from uipath.runtime import UiPathExecutionRuntime + +# Serialization +from uipath.core.serialization import serialize_json + +# Errors +from uipath.runtime.errors import ( + UiPathBaseRuntimeError, + UiPathErrorContract, + UiPathErrorCategory, # USER, SYSTEM, DEPLOYMENT, UNKNOWN + UiPathErrorCode, +) + +# SSL (for LLM Gateway) +from uipath._utils._ssl_context import get_httpx_client_kwargs +``` + +--- + +## Checklist — Integration Completeness + +### Required (all tiers) +- [ ] `pyproject.toml` with entry points for `uipath.runtime.factories` and `uipath.middlewares` +- [ ] Config file parser (`.json`) +- [ ] Agent loader (dynamic import from `file.py:variable`) +- [ ] Error codes and exception class +- [ ] `get_schema()` returning `UiPathRuntimeSchema` with input/output schemas +- [ ] `execute()` returning `UiPathRuntimeResult` +- [ ] Factory with `discover_entrypoints()` and `new_runtime()` +- [ ] Registration function for `UiPathRuntimeFactoryRegistry` +- [ ] CLI middleware for `uipath new` + +### Required for CORE+ tier +- [ ] `stream()` yielding `UiPathRuntimeEvent` objects +- [ ] `UiPathRuntimeStateEvent` with STARTED/COMPLETED phases per node +- [ ] `node_name` in state events matches `UiPathRuntimeNode.id` in graph +- [ ] `UiPathRuntimeMessageEvent` for LLM output +- [ ] Final event is always `UiPathRuntimeResult` +- [ ] Graph building with nodes and edges for visualization + +### Required for FULL tier +- [ ] SQLite storage for resume triggers +- [ ] Suspension detection (framework interrupts → SUSPENDED status) +- [ ] `UiPathResumeTrigger` creation with proper trigger types +- [ ] `UiPathResumableRuntime` wrapping in factory +- [ ] Resume support (`options.resume=True`) + +### Optional (recommended) +- [ ] Breakpoint support (`UiPathBreakpointResult` for node-level debugging) +- [ ] LLM provider classes routing through UiPath Gateway +- [ ] `qualified_node_name` for subgraph hierarchy in state events +- [ ] Agent caching with `asyncio.Lock` in factory (only if framework allows concurrent reuse) +- [ ] Lazy imports for optional dependencies +- [ ] Sample project in `samples/` +- [ ] Integration tests + +--- + +## Reference: Existing Integrations + +Study these for real-world patterns: + +| Integration | Package | Config File | Streaming | HITL | LLM Gateway | +|-------------|---------|-------------|-----------|------|-------------| +| **LangGraph** | `uipath-langchain-python` | `langgraph.json` | Yes (async gen) | Yes | Yes (OpenAI) | +| **OpenAI Agents** | `uipath-openai-agents` | `openai_agents.json` | Yes (async gen) | No | Yes (OpenAI) | +| **LlamaIndex** | `uipath-llamaindex` | `llama_index.json` | Yes (async gen) | Yes | Yes (Bedrock, Vertex) | +| **Google ADK** | `uipath-google-adk` | `google_adk.json` | Yes (async gen) | Partial | Yes (OpenAI, Anthropic, Gemini) | +| **Agent Framework** | `uipath-agent-framework` | `agent_framework.json` | Yes (async gen) | Yes | No |