From 24203662844f95859b7716398bc8ff59d4adeb13 Mon Sep 17 00:00:00 2001 From: alliscode Date: Fri, 13 Feb 2026 11:41:21 -0800 Subject: [PATCH] Python: Fix HTTP call to MCP losing tracing parent (#3623) Add OpenTelemetry trace context propagation to MCPStreamableHTTPTool's httpx client via an event hook that injects W3C traceparent/tracestate headers into outgoing HTTP requests. This ensures the remote MCP server's spans are correctly parented under the agent framework's Execute Tool span. - Add _inject_otel_context async event hook using opentelemetry.propagate.inject - Add _get_instrumented_httpx_client to ensure all httpx clients have the hook - Cache auto-created httpx clients; clean up in close() - Update and add tests for trace context injection Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/packages/core/agent_framework/_mcp.py | 42 ++++++++++- python/packages/core/tests/core/test_mcp.py | 73 ++++++++++++++++++-- 2 files changed, 106 insertions(+), 9 deletions(-) diff --git a/python/packages/core/agent_framework/_mcp.py b/python/packages/core/agent_framework/_mcp.py index d9a2b5579d..1c6e78e0e7 100644 --- a/python/packages/core/agent_framework/_mcp.py +++ b/python/packages/core/agent_framework/_mcp.py @@ -61,6 +61,19 @@ class MCPSpecificApproval(TypedDict, total=False): logger = logging.getLogger(__name__) + +async def _inject_otel_context(request: httpx.Request) -> None: # noqa: RUF029 + """Inject OpenTelemetry trace context into outgoing HTTP request headers. + + Used as an httpx event hook to propagate the active span context + (W3C ``traceparent`` / ``tracestate``) so that remote MCP servers + can correlate their spans with the calling agent's trace. + """ + from opentelemetry.propagate import inject + + inject(carrier=request.headers) + + # region: Helpers LOG_LEVEL_MAPPING: dict[types.LoggingLevel, int] = { @@ -1244,6 +1257,25 @@ def __init__( self.url = url self.terminate_on_close = terminate_on_close self._httpx_client: httpx.AsyncClient | None = http_client + self._auto_httpx_client: httpx.AsyncClient | None = None + + def _get_instrumented_httpx_client(self) -> httpx.AsyncClient: + """Return an httpx client with OpenTelemetry trace context propagation. + + If a user-provided client exists, the trace injection hook is added to it + (idempotently). Otherwise, an internally managed client is created and cached. + """ + if self._httpx_client is not None: + client = self._httpx_client + if _inject_otel_context not in client.event_hooks.get("request", []): + client.event_hooks.setdefault("request", []).append(_inject_otel_context) + return client + + if self._auto_httpx_client is None: + self._auto_httpx_client = httpx.AsyncClient( + event_hooks={"request": [_inject_otel_context]} + ) + return self._auto_httpx_client def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]: """Get an MCP streamable HTTP client. @@ -1251,13 +1283,19 @@ def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]: Returns: An async context manager for the streamable HTTP client transport. """ - # Pass the http_client (which may be None) to streamable_http_client return streamable_http_client( url=self.url, - http_client=self._httpx_client, + http_client=self._get_instrumented_httpx_client(), terminate_on_close=self.terminate_on_close if self.terminate_on_close is not None else True, ) + async def close(self) -> None: + """Disconnect from the MCP server and clean up resources.""" + await super().close() + if self._auto_httpx_client is not None: + await self._auto_httpx_client.aclose() + self._auto_httpx_client = None + class MCPWebsocketTool(MCPTool): """MCP tool for connecting to WebSocket-based MCP servers. diff --git a/python/packages/core/tests/core/test_mcp.py b/python/packages/core/tests/core/test_mcp.py index 38cb243412..576558f34f 100644 --- a/python/packages/core/tests/core/test_mcp.py +++ b/python/packages/core/tests/core/test_mcp.py @@ -1489,13 +1489,14 @@ def test_mcp_streamable_http_tool_get_mcp_client_all_params(): with patch("agent_framework._mcp.streamable_http_client") as mock_http_client: tool.get_mcp_client() - # Verify streamable_http_client was called with None for http_client - # (since we didn't provide one, the API will create its own) - mock_http_client.assert_called_once_with( - url="http://example.com", - http_client=None, - terminate_on_close=True, - ) + # An auto-created httpx client with OTel trace injection should be passed + mock_http_client.assert_called_once() + call_kwargs = mock_http_client.call_args.kwargs + assert call_kwargs["url"] == "http://example.com" + assert call_kwargs["terminate_on_close"] is True + # The http_client should be an auto-created instrumented client, not None + assert call_kwargs["http_client"] is not None + assert call_kwargs["http_client"] is tool._auto_httpx_client def test_mcp_websocket_tool_get_mcp_client_with_kwargs(): @@ -1696,6 +1697,7 @@ async def test_mcp_streamable_http_tool_httpx_client_cleanup(): # Test 2: Tool with user-provided client user_client = Mock() + user_client.event_hooks = {"request": [], "response": []} tool2 = MCPStreamableHTTPTool( name="test", url="http://localhost:8081/mcp", @@ -1715,6 +1717,63 @@ async def test_mcp_streamable_http_tool_httpx_client_cleanup(): assert call_args.kwargs["http_client"] is user_client, "User's client should be passed through" +async def test_mcp_streamable_http_tool_otel_trace_injection(): + """Test that MCPStreamableHTTPTool injects OpenTelemetry trace context into outgoing requests.""" + from agent_framework._mcp import _inject_otel_context + + # Test 1: Auto-created client gets the OTel hook + tool = MCPStreamableHTTPTool( + name="test", + url="http://localhost:8081/mcp", + ) + with patch("agent_framework._mcp.streamable_http_client") as mock_client: + tool.get_mcp_client() + call_kwargs = mock_client.call_args.kwargs + client = call_kwargs["http_client"] + assert _inject_otel_context in client.event_hooks["request"], ( + "Auto-created client should have OTel trace injection hook" + ) + + # Test 2: User-provided client gets the OTel hook added + import httpx + + user_client = httpx.AsyncClient() + tool2 = MCPStreamableHTTPTool( + name="test", + url="http://localhost:8081/mcp", + http_client=user_client, + ) + with patch("agent_framework._mcp.streamable_http_client") as mock_client: + tool2.get_mcp_client() + assert _inject_otel_context in user_client.event_hooks["request"], ( + "User-provided client should have OTel trace injection hook" + ) + + # Test 3: Hook is not duplicated on repeated calls + with patch("agent_framework._mcp.streamable_http_client") as mock_client: + tool2.get_mcp_client() + tool2.get_mcp_client() + count = user_client.event_hooks["request"].count(_inject_otel_context) + assert count == 1, f"Hook should appear exactly once, found {count}" + + await user_client.aclose() + + # Test 4: Auto-created client is cached and reused + tool3 = MCPStreamableHTTPTool( + name="test", + url="http://localhost:8081/mcp", + ) + with patch("agent_framework._mcp.streamable_http_client") as mock_client: + tool3.get_mcp_client() + first_client = tool3._auto_httpx_client + tool3.get_mcp_client() + assert tool3._auto_httpx_client is first_client, "Auto-created client should be reused" + + # Test 5: close() cleans up auto-created client + await tool3.close() + assert tool3._auto_httpx_client is None, "Auto-created client should be cleaned up after close()" + + async def test_load_tools_with_pagination(): """Test that load_tools handles pagination correctly.""" from unittest.mock import AsyncMock, MagicMock