From f4f3b84258269c23c9f4b7d09a8182c1ab14fbbd Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 19 Feb 2026 10:58:08 -0800 Subject: [PATCH 01/12] Nexus error ser. --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 7720ae21d..5faeb6761 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -230,3 +230,6 @@ exclude = ["temporalio/bridge/target/**/*"] [tool.uv] # Prevent uv commands from building the package by default package = false + +[tool.uv.sources] +nexus-rpc = { git = "https://github.com/nexus-rpc/sdk-python.git", branch = "amazzeo/add-failure" } From d8cafbf5ebc7a3f75380c6a6a2351ce034dfcbe5 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 25 Feb 2026 14:08:16 -0800 Subject: [PATCH 02/12] Update to nexus-rpc 1.4.0 --- pyproject.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5faeb6761..7720ae21d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -230,6 +230,3 @@ exclude = ["temporalio/bridge/target/**/*"] [tool.uv] # Prevent uv commands from building the package by default package = false - -[tool.uv.sources] -nexus-rpc = { git = "https://github.com/nexus-rpc/sdk-python.git", branch = "amazzeo/add-failure" } From 062f5d4380cbb9da4ee0166a0cb77c80b6137cb1 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 15 Jan 2026 12:12:41 -0800 Subject: [PATCH 03/12] Populate Nexus request deadline into operation contexts when present on the task received from Core. --- pyproject.toml | 3 + temporalio/bridge/proto/nexus/nexus_pb2.py | 22 ++++---- temporalio/worker/_nexus.py | 38 ++++++++++--- tests/nexus/test_workflow_run_operation.py | 65 +++++++++++++++++++++- uv.lock | 10 +--- 5 files changed, 109 insertions(+), 29 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 7720ae21d..2e270bde4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -230,3 +230,6 @@ exclude = ["temporalio/bridge/target/**/*"] [tool.uv] # Prevent uv commands from building the package by default package = false + +[tool.uv.sources] +nexus-rpc = { git = "https://github.com/nexus-rpc/sdk-python" } diff --git a/temporalio/bridge/proto/nexus/nexus_pb2.py b/temporalio/bridge/proto/nexus/nexus_pb2.py index 2a1d8b786..cba925157 100644 --- a/temporalio/bridge/proto/nexus/nexus_pb2.py +++ b/temporalio/bridge/proto/nexus/nexus_pb2.py @@ -34,7 +34,7 @@ ) DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n#temporal/sdk/core/nexus/nexus.proto\x12\rcoresdk.nexus\x1a\x1fgoogle/protobuf/timestamp.proto\x1a$temporal/api/common/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a#temporal/api/nexus/v1/message.proto\x1a\x36temporal/api/workflowservice/v1/request_response.proto\x1a%temporal/sdk/core/common/common.proto"\xf8\x01\n\x14NexusOperationResult\x12\x34\n\tcompleted\x18\x01 \x01(\x0b\x32\x1f.temporal.api.common.v1.PayloadH\x00\x12\x32\n\x06\x66\x61iled\x18\x02 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\tcancelled\x18\x03 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\ttimed_out\x18\x04 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xee\x01\n\x13NexusTaskCompletion\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\tcompleted\x18\x02 \x01(\x0b\x32\x1f.temporal.api.nexus.v1.ResponseH\x00\x12\x38\n\x05\x65rror\x18\x03 \x01(\x0b\x32#.temporal.api.nexus.v1.HandlerErrorB\x02\x18\x01H\x00\x12\x14\n\nack_cancel\x18\x04 \x01(\x08H\x00\x12\x33\n\x07\x66\x61ilure\x18\x05 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xd0\x01\n\tNexusTask\x12K\n\x04task\x18\x01 \x01(\x0b\x32;.temporal.api.workflowservice.v1.PollNexusTaskQueueResponseH\x00\x12\x35\n\x0b\x63\x61ncel_task\x18\x02 \x01(\x0b\x32\x1e.coresdk.nexus.CancelNexusTaskH\x00\x12\x34\n\x10request_deadline\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.TimestampB\t\n\x07variant"[\n\x0f\x43\x61ncelNexusTask\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\x06reason\x18\x02 \x01(\x0e\x32$.coresdk.nexus.NexusTaskCancelReason*;\n\x15NexusTaskCancelReason\x12\r\n\tTIMED_OUT\x10\x00\x12\x13\n\x0fWORKER_SHUTDOWN\x10\x01*\x7f\n\x1eNexusOperationCancellationType\x12\x1f\n\x1bWAIT_CANCELLATION_COMPLETED\x10\x00\x12\x0b\n\x07\x41\x42\x41NDON\x10\x01\x12\x0e\n\nTRY_CANCEL\x10\x02\x12\x1f\n\x1bWAIT_CANCELLATION_REQUESTED\x10\x03\x42+\xea\x02(Temporalio::Internal::Bridge::Api::Nexusb\x06proto3' + b'\n#temporal/sdk/core/nexus/nexus.proto\x12\rcoresdk.nexus\x1a\x1fgoogle/protobuf/timestamp.proto\x1a$temporal/api/common/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a#temporal/api/nexus/v1/message.proto\x1a\x36temporal/api/workflowservice/v1/request_response.proto\x1a%temporal/sdk/core/common/common.proto"\xf8\x01\n\x14NexusOperationResult\x12\x34\n\tcompleted\x18\x01 \x01(\x0b\x32\x1f.temporal.api.common.v1.PayloadH\x00\x12\x32\n\x06\x66\x61iled\x18\x02 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\tcancelled\x18\x03 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\ttimed_out\x18\x04 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xb5\x01\n\x13NexusTaskCompletion\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\tcompleted\x18\x02 \x01(\x0b\x32\x1f.temporal.api.nexus.v1.ResponseH\x00\x12\x34\n\x05\x65rror\x18\x03 \x01(\x0b\x32#.temporal.api.nexus.v1.HandlerErrorH\x00\x12\x14\n\nack_cancel\x18\x04 \x01(\x08H\x00\x42\x08\n\x06status"\xd0\x01\n\tNexusTask\x12K\n\x04task\x18\x01 \x01(\x0b\x32;.temporal.api.workflowservice.v1.PollNexusTaskQueueResponseH\x00\x12\x35\n\x0b\x63\x61ncel_task\x18\x02 \x01(\x0b\x32\x1e.coresdk.nexus.CancelNexusTaskH\x00\x12\x34\n\x10request_deadline\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.TimestampB\t\n\x07variant"[\n\x0f\x43\x61ncelNexusTask\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\x06reason\x18\x02 \x01(\x0e\x32$.coresdk.nexus.NexusTaskCancelReason*;\n\x15NexusTaskCancelReason\x12\r\n\tTIMED_OUT\x10\x00\x12\x13\n\x0fWORKER_SHUTDOWN\x10\x01*\x7f\n\x1eNexusOperationCancellationType\x12\x1f\n\x1bWAIT_CANCELLATION_COMPLETED\x10\x00\x12\x0b\n\x07\x41\x42\x41NDON\x10\x01\x12\x0e\n\nTRY_CANCEL\x10\x02\x12\x1f\n\x1bWAIT_CANCELLATION_REQUESTED\x10\x03\x42+\xea\x02(Temporalio::Internal::Bridge::Api::Nexusb\x06proto3' ) _NEXUSTASKCANCELREASON = DESCRIPTOR.enum_types_by_name["NexusTaskCancelReason"] @@ -106,18 +106,16 @@ DESCRIPTOR._serialized_options = ( b"\352\002(Temporalio::Internal::Bridge::Api::Nexus" ) - _NEXUSTASKCOMPLETION.fields_by_name["error"]._options = None - _NEXUSTASKCOMPLETION.fields_by_name["error"]._serialized_options = b"\030\001" - _NEXUSTASKCANCELREASON._serialized_start = 1092 - _NEXUSTASKCANCELREASON._serialized_end = 1151 - _NEXUSOPERATIONCANCELLATIONTYPE._serialized_start = 1153 - _NEXUSOPERATIONCANCELLATIONTYPE._serialized_end = 1280 + _NEXUSTASKCANCELREASON._serialized_start = 1035 + _NEXUSTASKCANCELREASON._serialized_end = 1094 + _NEXUSOPERATIONCANCELLATIONTYPE._serialized_start = 1096 + _NEXUSOPERATIONCANCELLATIONTYPE._serialized_end = 1223 _NEXUSOPERATIONRESULT._serialized_start = 297 _NEXUSOPERATIONRESULT._serialized_end = 545 _NEXUSTASKCOMPLETION._serialized_start = 548 - _NEXUSTASKCOMPLETION._serialized_end = 786 - _NEXUSTASK._serialized_start = 789 - _NEXUSTASK._serialized_end = 997 - _CANCELNEXUSTASK._serialized_start = 999 - _CANCELNEXUSTASK._serialized_end = 1090 + _NEXUSTASKCOMPLETION._serialized_end = 729 + _NEXUSTASK._serialized_start = 732 + _NEXUSTASK._serialized_end = 940 + _CANCELNEXUSTASK._serialized_start = 942 + _CANCELNEXUSTASK._serialized_end = 1033 # @@protoc_insertion_point(module_scope) diff --git a/temporalio/worker/_nexus.py b/temporalio/worker/_nexus.py index eb54dde30..8804bc584 100644 --- a/temporalio/worker/_nexus.py +++ b/temporalio/worker/_nexus.py @@ -8,6 +8,7 @@ import threading from collections.abc import Callable, Mapping, Sequence from dataclasses import dataclass +from datetime import datetime, timezone from functools import reduce from typing import ( Any, @@ -123,10 +124,17 @@ async def raise_from_exception_queue() -> NoReturn: task_cancellation = _NexusTaskCancellation() start_op_task = asyncio.create_task( self._handle_start_operation_task( - task.task_token, - task.request.start_operation, - dict(task.request.header), - task_cancellation, + task_token=task.task_token, + start_request=task.request.start_operation, + headers=dict(task.request.header), + task_cancellation=task_cancellation, + request_deadline=( + nexus_task.request_deadline.ToDatetime().replace( + tzinfo=timezone.utc + ) + if nexus_task.HasField("request_deadline") + else None + ), ) ) self._running_tasks[task.task_token] = _RunningNexusTask( @@ -136,10 +144,17 @@ async def raise_from_exception_queue() -> NoReturn: task_cancellation = _NexusTaskCancellation() cancel_op_task = asyncio.create_task( self._handle_cancel_operation_task( - task.task_token, - task.request.cancel_operation, - dict(task.request.header), - task_cancellation, + task_token=task.task_token, + request=task.request.cancel_operation, + headers=dict(task.request.header), + task_cancellation=task_cancellation, + request_deadline=( + nexus_task.request_deadline.ToDatetime().replace( + tzinfo=timezone.utc + ) + if nexus_task.HasField("request_deadline") + else None + ), ) ) self._running_tasks[task.task_token] = _RunningNexusTask( @@ -209,6 +224,7 @@ async def _handle_cancel_operation_task( request: temporalio.api.nexus.v1.CancelOperationRequest, headers: Mapping[str, str], task_cancellation: nexusrpc.handler.OperationTaskCancellation, + request_deadline: datetime | None, ) -> None: """Handle a cancel operation task. @@ -226,6 +242,7 @@ async def _handle_cancel_operation_task( operation=request.operation, headers=headers, task_cancellation=task_cancellation, + request_deadline=request_deadline, ) temporalio.nexus._operation_context._TemporalCancelOperationContext( info=lambda: Info(task_queue=self._task_queue), @@ -276,6 +293,7 @@ async def _handle_start_operation_task( start_request: temporalio.api.nexus.v1.StartOperationRequest, headers: Mapping[str, str], task_cancellation: nexusrpc.handler.OperationTaskCancellation, + request_deadline: datetime | None, ) -> None: """Handle a start operation task. @@ -285,7 +303,7 @@ async def _handle_start_operation_task( try: try: start_response = await self._start_operation( - start_request, headers, task_cancellation + start_request, headers, task_cancellation, request_deadline ) except asyncio.CancelledError: completion = temporalio.bridge.proto.nexus.NexusTaskCompletion( @@ -328,6 +346,7 @@ async def _start_operation( start_request: temporalio.api.nexus.v1.StartOperationRequest, headers: Mapping[str, str], cancellation: nexusrpc.handler.OperationTaskCancellation, + request_deadline: datetime | None, ) -> temporalio.api.nexus.v1.StartOperationResponse: """Invoke the Nexus handler's start_operation method and construct the StartOperationResponse. @@ -352,6 +371,7 @@ async def _start_operation( ], callback_headers=dict(start_request.callback_header), task_cancellation=cancellation, + request_deadline=request_deadline, ) temporalio.nexus._operation_context._TemporalStartOperationContext( nexus_context=ctx, diff --git a/tests/nexus/test_workflow_run_operation.py b/tests/nexus/test_workflow_run_operation.py index 97517ea1c..57c7b309b 100644 --- a/tests/nexus/test_workflow_run_operation.py +++ b/tests/nexus/test_workflow_run_operation.py @@ -59,6 +59,34 @@ def op(self) -> OperationHandler[Input, str]: return MyOperation() +class RequestDeadlineOperation(WorkflowRunOperationHandler): + """Operation that asserts request_deadline is accessible.""" + + def __init__(self): # type: ignore[reportMissingSuperCall] + pass + + async def start( + self, ctx: StartOperationContext, input: Input + ) -> StartOperationResultAsync: + assert ( + ctx.request_deadline is not None + ), "request_deadline should be set in workflow_run_operation" + tctx = WorkflowRunOperationContext._from_start_operation_context(ctx) + handle = await tctx.start_workflow( + EchoWorkflow.run, + input.value, + id=str(uuid.uuid4()), + ) + return StartOperationResultAsync(handle.to_token()) + + +@service_handler +class RequestDeadlineHandler: + @operation_handler + def op(self) -> OperationHandler[Input, str]: + return RequestDeadlineOperation() + + @service class Service: op: Operation[Input, str] @@ -115,4 +143,39 @@ async def test_workflow_run_operation( id=str(uuid.uuid4()), task_queue=task_queue, ) - assert result == "test" + if hasattr(service_handler_cls, "__expected__error__"): + status_code, message = service_handler_cls.__expected__error__ + assert resp.status_code == status_code + failure = Failure(**resp.json()) + assert re.search(message, failure.message) + else: + assert resp.status_code == 201 + + +async def test_request_deadline_is_accessible_in_workflow_run_operation( + env: WorkflowEnvironment, +): + """Test that request_deadline is accessible in WorkflowRunOperationContext.""" + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + endpoint = (await create_nexus_endpoint(task_queue, env.client)).endpoint.id + assert (service_defn := nexusrpc.get_service_definition(RequestDeadlineHandler)) + service_client = ServiceClient( + server_address=ServiceClient.default_server_address(env), + endpoint=endpoint, + service=service_defn.name, + ) + async with Worker( + env.client, + task_queue=task_queue, + nexus_service_handlers=[RequestDeadlineHandler()], + ): + resp = await service_client.start_operation( + "op", + dataclass_as_dict(Input(value="test")), + {"Request-Timeout": "30s"}, + ) + # The assertion in the handler verified request_deadline was accessible + assert resp.status_code == 201 diff --git a/uv.lock b/uv.lock index 02b882306..619483251 100644 --- a/uv.lock +++ b/uv.lock @@ -1820,15 +1820,11 @@ wheels = [ [[package]] name = "nexus-rpc" -version = "1.4.0" -source = { registry = "https://pypi.org/simple" } +version = "1.3.0" +source = { git = "https://github.com/nexus-rpc/sdk-python#5613448d2bf578619c34d0a99a4779ed9f76de9b" } dependencies = [ { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/35/d5/cd1ffb202b76ebc1b33c1332a3416e55a39929006982adc2b1eb069aaa9b/nexus_rpc-1.4.0.tar.gz", hash = "sha256:3b8b373d4865671789cc43623e3dc0bcbf192562e40e13727e17f1c149050fba", size = 82367, upload-time = "2026-02-25T22:01:34.053Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/11/52/6327a5f4fda01207205038a106a99848a41c83e933cd23ea2cab3d2ebc6c/nexus_rpc-1.4.0-py3-none-any.whl", hash = "sha256:14c953d3519113f8ccec533a9efdb6b10c28afef75d11cdd6d422640c40b3a49", size = 29645, upload-time = "2026-02-25T22:01:33.122Z" }, -] [[package]] name = "nh3" @@ -3124,7 +3120,7 @@ dev = [ requires-dist = [ { name = "grpcio", marker = "extra == 'grpc'", specifier = ">=1.48.2,<2" }, { name = "mcp", marker = "extra == 'openai-agents'", specifier = ">=1.9.4,<2" }, - { name = "nexus-rpc", specifier = "==1.4.0" }, + { name = "nexus-rpc", git = "https://github.com/nexus-rpc/sdk-python" }, { name = "openai-agents", marker = "extra == 'openai-agents'", specifier = ">=0.3,<0.7" }, { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-sdk", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, From 678d39d57020c2e89a98447f4a51493d4f5f44bf Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 15 Jan 2026 13:09:55 -0800 Subject: [PATCH 04/12] Add test to confirm request deadline is present in cancel operation contexts --- tests/helpers/nexus.py | 111 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) diff --git a/tests/helpers/nexus.py b/tests/helpers/nexus.py index d3142f74a..73f33f215 100644 --- a/tests/helpers/nexus.py +++ b/tests/helpers/nexus.py @@ -1,3 +1,114 @@ def make_nexus_endpoint_name(task_queue: str) -> str: # Create endpoints for different task queues without name collisions. return f"nexus-endpoint-{task_queue}" + + +# TODO(nexus-preview): How do we recommend that users create endpoints in their own tests? +# See https://github.com/temporalio/sdk-typescript/pull/1708/files?show-viewed-files=true&file-filters%5B%5D=&w=0#r2082549085 +async def create_nexus_endpoint( + task_queue: str, client: Client +) -> temporalio.api.operatorservice.v1.CreateNexusEndpointResponse: + name = make_nexus_endpoint_name(task_queue) + return await client.operator_service.create_nexus_endpoint( + temporalio.api.operatorservice.v1.CreateNexusEndpointRequest( + spec=temporalio.api.nexus.v1.EndpointSpec( + name=name, + target=temporalio.api.nexus.v1.EndpointTarget( + worker=temporalio.api.nexus.v1.EndpointTarget.Worker( + namespace=client.namespace, + task_queue=task_queue, + ) + ), + ) + ) + ) + + +@dataclass +class ServiceClient: + server_address: str # E.g. http://127.0.0.1:7243 + endpoint: str + service: str + + async def start_operation( + self, + operation: str, + body: dict[str, Any] | None = None, + headers: Mapping[str, str] = {}, + ) -> httpx.Response: + """ + Start a Nexus operation. + """ + # TODO(nexus-preview): Support callback URL as query param + async with httpx.AsyncClient() as http_client: + return await http_client.post( + f"http://{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}", + json=body, + headers=headers, + ) + + async def cancel_operation( + self, + operation: str, + token: str, + headers: Mapping[str, str] = {}, + ) -> httpx.Response: + async with httpx.AsyncClient() as http_client: + return await http_client.post( + f"http://{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}/cancel", + # Token can also be sent as "Nexus-Operation-Token" header + params={"token": token}, + headers=headers, + ) + + @staticmethod + def default_server_address(env: WorkflowEnvironment) -> str: + # TODO(nexus-preview): nexus tests are making http requests directly but this is + # not officially supported. + parsed = urlparse(env.client.service_client.config.target_host) + host = parsed.hostname or "127.0.0.1" + http_port = getattr(env, "_http_port", 7243) + return f"{host}:{http_port}" + + +def dataclass_as_dict(dataclass: Any) -> dict[str, Any]: + """ + Return a shallow dict of the dataclass's fields. + + dataclasses.as_dict goes too far (attempts to pickle values) + """ + return { + field.name: getattr(dataclass, field.name) + for field in dataclasses.fields(dataclass) + } + + +@dataclass +class Failure: + """A Nexus Failure object, with details parsed into an exception. + + https://github.com/nexus-rpc/api/blob/main/SPEC.md#failure + """ + + message: str = "" + metadata: dict[str, str] | None = None + details: dict[str, Any] | None = None + + exception_from_details: BaseException | None = dataclasses.field( + init=False, default=None + ) + + def __post_init__(self) -> None: + if self.metadata and (error_type := self.metadata.get("type")): + self.exception_from_details = self._instantiate_exception( + error_type, self.details + ) + + def _instantiate_exception( + self, error_type: str, _details: dict[str, Any] | None + ) -> BaseException: + proto = { + "temporal.api.failure.v1.Failure": temporalio.api.failure.v1.Failure, + }[error_type]() + json_format.ParseDict(self.details, proto, ignore_unknown_fields=True) + return FailureConverter.default.from_failure(proto, PayloadConverter.default) From cd11c0e21bc7d72c0bf88ff79100a08337cbb446 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 15 Jan 2026 13:18:04 -0800 Subject: [PATCH 05/12] Update request deadline tests for workflow_run_operation to reflect how users will invoke rather than using a private api --- tests/nexus/test_workflow_run_operation.py | 31 +++++++--------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/tests/nexus/test_workflow_run_operation.py b/tests/nexus/test_workflow_run_operation.py index 57c7b309b..f7b2dfe9c 100644 --- a/tests/nexus/test_workflow_run_operation.py +++ b/tests/nexus/test_workflow_run_operation.py @@ -13,9 +13,8 @@ ) from nexusrpc.handler._decorators import operation_handler -from temporalio import workflow -from temporalio.client import Client -from temporalio.nexus import WorkflowRunOperationContext +from temporalio import nexus, workflow +from temporalio.nexus import WorkflowRunOperationContext, workflow_run_operation from temporalio.nexus._operation_handlers import WorkflowRunOperationHandler from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker @@ -59,32 +58,20 @@ def op(self) -> OperationHandler[Input, str]: return MyOperation() -class RequestDeadlineOperation(WorkflowRunOperationHandler): - """Operation that asserts request_deadline is accessible.""" - - def __init__(self): # type: ignore[reportMissingSuperCall] - pass - - async def start( - self, ctx: StartOperationContext, input: Input - ) -> StartOperationResultAsync: +@service_handler +class RequestDeadlineHandler: + @workflow_run_operation + async def op( + self, ctx: WorkflowRunOperationContext, input: Input + ) -> nexus.WorkflowHandle[str]: assert ( ctx.request_deadline is not None ), "request_deadline should be set in workflow_run_operation" - tctx = WorkflowRunOperationContext._from_start_operation_context(ctx) - handle = await tctx.start_workflow( + return await ctx.start_workflow( EchoWorkflow.run, input.value, id=str(uuid.uuid4()), ) - return StartOperationResultAsync(handle.to_token()) - - -@service_handler -class RequestDeadlineHandler: - @operation_handler - def op(self) -> OperationHandler[Input, str]: - return RequestDeadlineOperation() @service From b0f2ac238680448fbeb2d7ef82fb7bf07cb6f838 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 15 Jan 2026 13:24:11 -0800 Subject: [PATCH 06/12] refactor request deadline out of if branches in nexus worker --- temporalio/worker/_nexus.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/temporalio/worker/_nexus.py b/temporalio/worker/_nexus.py index 8804bc584..35339cc66 100644 --- a/temporalio/worker/_nexus.py +++ b/temporalio/worker/_nexus.py @@ -120,6 +120,13 @@ async def raise_from_exception_queue() -> NoReturn: if nexus_task.HasField("task"): task = nexus_task.task + request_deadline = ( + nexus_task.request_deadline.ToDatetime().replace( + tzinfo=timezone.utc + ) + if nexus_task.HasField("request_deadline") + else None + ) if task.request.HasField("start_operation"): task_cancellation = _NexusTaskCancellation() start_op_task = asyncio.create_task( @@ -128,13 +135,7 @@ async def raise_from_exception_queue() -> NoReturn: start_request=task.request.start_operation, headers=dict(task.request.header), task_cancellation=task_cancellation, - request_deadline=( - nexus_task.request_deadline.ToDatetime().replace( - tzinfo=timezone.utc - ) - if nexus_task.HasField("request_deadline") - else None - ), + request_deadline=request_deadline, ) ) self._running_tasks[task.task_token] = _RunningNexusTask( @@ -148,13 +149,7 @@ async def raise_from_exception_queue() -> NoReturn: request=task.request.cancel_operation, headers=dict(task.request.header), task_cancellation=task_cancellation, - request_deadline=( - nexus_task.request_deadline.ToDatetime().replace( - tzinfo=timezone.utc - ) - if nexus_task.HasField("request_deadline") - else None - ), + request_deadline=request_deadline, ) ) self._running_tasks[task.task_token] = _RunningNexusTask( From 3cbcc94c72ea5378d2bf6922969f14608e784072 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 25 Feb 2026 15:18:51 -0800 Subject: [PATCH 07/12] Fix bad rebase in pyproject.toml --- pyproject.toml | 3 --- uv.lock | 10 +++++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2e270bde4..7720ae21d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -230,6 +230,3 @@ exclude = ["temporalio/bridge/target/**/*"] [tool.uv] # Prevent uv commands from building the package by default package = false - -[tool.uv.sources] -nexus-rpc = { git = "https://github.com/nexus-rpc/sdk-python" } diff --git a/uv.lock b/uv.lock index 619483251..02b882306 100644 --- a/uv.lock +++ b/uv.lock @@ -1820,11 +1820,15 @@ wheels = [ [[package]] name = "nexus-rpc" -version = "1.3.0" -source = { git = "https://github.com/nexus-rpc/sdk-python#5613448d2bf578619c34d0a99a4779ed9f76de9b" } +version = "1.4.0" +source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "typing-extensions" }, ] +sdist = { url = "https://files.pythonhosted.org/packages/35/d5/cd1ffb202b76ebc1b33c1332a3416e55a39929006982adc2b1eb069aaa9b/nexus_rpc-1.4.0.tar.gz", hash = "sha256:3b8b373d4865671789cc43623e3dc0bcbf192562e40e13727e17f1c149050fba", size = 82367, upload-time = "2026-02-25T22:01:34.053Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/11/52/6327a5f4fda01207205038a106a99848a41c83e933cd23ea2cab3d2ebc6c/nexus_rpc-1.4.0-py3-none-any.whl", hash = "sha256:14c953d3519113f8ccec533a9efdb6b10c28afef75d11cdd6d422640c40b3a49", size = 29645, upload-time = "2026-02-25T22:01:33.122Z" }, +] [[package]] name = "nh3" @@ -3120,7 +3124,7 @@ dev = [ requires-dist = [ { name = "grpcio", marker = "extra == 'grpc'", specifier = ">=1.48.2,<2" }, { name = "mcp", marker = "extra == 'openai-agents'", specifier = ">=1.9.4,<2" }, - { name = "nexus-rpc", git = "https://github.com/nexus-rpc/sdk-python" }, + { name = "nexus-rpc", specifier = "==1.4.0" }, { name = "openai-agents", marker = "extra == 'openai-agents'", specifier = ">=0.3,<0.7" }, { name = "opentelemetry-api", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, { name = "opentelemetry-sdk", marker = "extra == 'opentelemetry'", specifier = ">=1.11.1,<2" }, From 915a9fb0cb6c0d69e8810672fdece28c66b4e6ad Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 25 Feb 2026 15:25:43 -0800 Subject: [PATCH 08/12] gen protos --- temporalio/bridge/proto/nexus/nexus_pb2.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/temporalio/bridge/proto/nexus/nexus_pb2.py b/temporalio/bridge/proto/nexus/nexus_pb2.py index cba925157..2a1d8b786 100644 --- a/temporalio/bridge/proto/nexus/nexus_pb2.py +++ b/temporalio/bridge/proto/nexus/nexus_pb2.py @@ -34,7 +34,7 @@ ) DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n#temporal/sdk/core/nexus/nexus.proto\x12\rcoresdk.nexus\x1a\x1fgoogle/protobuf/timestamp.proto\x1a$temporal/api/common/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a#temporal/api/nexus/v1/message.proto\x1a\x36temporal/api/workflowservice/v1/request_response.proto\x1a%temporal/sdk/core/common/common.proto"\xf8\x01\n\x14NexusOperationResult\x12\x34\n\tcompleted\x18\x01 \x01(\x0b\x32\x1f.temporal.api.common.v1.PayloadH\x00\x12\x32\n\x06\x66\x61iled\x18\x02 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\tcancelled\x18\x03 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\ttimed_out\x18\x04 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xb5\x01\n\x13NexusTaskCompletion\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\tcompleted\x18\x02 \x01(\x0b\x32\x1f.temporal.api.nexus.v1.ResponseH\x00\x12\x34\n\x05\x65rror\x18\x03 \x01(\x0b\x32#.temporal.api.nexus.v1.HandlerErrorH\x00\x12\x14\n\nack_cancel\x18\x04 \x01(\x08H\x00\x42\x08\n\x06status"\xd0\x01\n\tNexusTask\x12K\n\x04task\x18\x01 \x01(\x0b\x32;.temporal.api.workflowservice.v1.PollNexusTaskQueueResponseH\x00\x12\x35\n\x0b\x63\x61ncel_task\x18\x02 \x01(\x0b\x32\x1e.coresdk.nexus.CancelNexusTaskH\x00\x12\x34\n\x10request_deadline\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.TimestampB\t\n\x07variant"[\n\x0f\x43\x61ncelNexusTask\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\x06reason\x18\x02 \x01(\x0e\x32$.coresdk.nexus.NexusTaskCancelReason*;\n\x15NexusTaskCancelReason\x12\r\n\tTIMED_OUT\x10\x00\x12\x13\n\x0fWORKER_SHUTDOWN\x10\x01*\x7f\n\x1eNexusOperationCancellationType\x12\x1f\n\x1bWAIT_CANCELLATION_COMPLETED\x10\x00\x12\x0b\n\x07\x41\x42\x41NDON\x10\x01\x12\x0e\n\nTRY_CANCEL\x10\x02\x12\x1f\n\x1bWAIT_CANCELLATION_REQUESTED\x10\x03\x42+\xea\x02(Temporalio::Internal::Bridge::Api::Nexusb\x06proto3' + b'\n#temporal/sdk/core/nexus/nexus.proto\x12\rcoresdk.nexus\x1a\x1fgoogle/protobuf/timestamp.proto\x1a$temporal/api/common/v1/message.proto\x1a%temporal/api/failure/v1/message.proto\x1a#temporal/api/nexus/v1/message.proto\x1a\x36temporal/api/workflowservice/v1/request_response.proto\x1a%temporal/sdk/core/common/common.proto"\xf8\x01\n\x14NexusOperationResult\x12\x34\n\tcompleted\x18\x01 \x01(\x0b\x32\x1f.temporal.api.common.v1.PayloadH\x00\x12\x32\n\x06\x66\x61iled\x18\x02 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\tcancelled\x18\x03 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x12\x35\n\ttimed_out\x18\x04 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xee\x01\n\x13NexusTaskCompletion\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\tcompleted\x18\x02 \x01(\x0b\x32\x1f.temporal.api.nexus.v1.ResponseH\x00\x12\x38\n\x05\x65rror\x18\x03 \x01(\x0b\x32#.temporal.api.nexus.v1.HandlerErrorB\x02\x18\x01H\x00\x12\x14\n\nack_cancel\x18\x04 \x01(\x08H\x00\x12\x33\n\x07\x66\x61ilure\x18\x05 \x01(\x0b\x32 .temporal.api.failure.v1.FailureH\x00\x42\x08\n\x06status"\xd0\x01\n\tNexusTask\x12K\n\x04task\x18\x01 \x01(\x0b\x32;.temporal.api.workflowservice.v1.PollNexusTaskQueueResponseH\x00\x12\x35\n\x0b\x63\x61ncel_task\x18\x02 \x01(\x0b\x32\x1e.coresdk.nexus.CancelNexusTaskH\x00\x12\x34\n\x10request_deadline\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.TimestampB\t\n\x07variant"[\n\x0f\x43\x61ncelNexusTask\x12\x12\n\ntask_token\x18\x01 \x01(\x0c\x12\x34\n\x06reason\x18\x02 \x01(\x0e\x32$.coresdk.nexus.NexusTaskCancelReason*;\n\x15NexusTaskCancelReason\x12\r\n\tTIMED_OUT\x10\x00\x12\x13\n\x0fWORKER_SHUTDOWN\x10\x01*\x7f\n\x1eNexusOperationCancellationType\x12\x1f\n\x1bWAIT_CANCELLATION_COMPLETED\x10\x00\x12\x0b\n\x07\x41\x42\x41NDON\x10\x01\x12\x0e\n\nTRY_CANCEL\x10\x02\x12\x1f\n\x1bWAIT_CANCELLATION_REQUESTED\x10\x03\x42+\xea\x02(Temporalio::Internal::Bridge::Api::Nexusb\x06proto3' ) _NEXUSTASKCANCELREASON = DESCRIPTOR.enum_types_by_name["NexusTaskCancelReason"] @@ -106,16 +106,18 @@ DESCRIPTOR._serialized_options = ( b"\352\002(Temporalio::Internal::Bridge::Api::Nexus" ) - _NEXUSTASKCANCELREASON._serialized_start = 1035 - _NEXUSTASKCANCELREASON._serialized_end = 1094 - _NEXUSOPERATIONCANCELLATIONTYPE._serialized_start = 1096 - _NEXUSOPERATIONCANCELLATIONTYPE._serialized_end = 1223 + _NEXUSTASKCOMPLETION.fields_by_name["error"]._options = None + _NEXUSTASKCOMPLETION.fields_by_name["error"]._serialized_options = b"\030\001" + _NEXUSTASKCANCELREASON._serialized_start = 1092 + _NEXUSTASKCANCELREASON._serialized_end = 1151 + _NEXUSOPERATIONCANCELLATIONTYPE._serialized_start = 1153 + _NEXUSOPERATIONCANCELLATIONTYPE._serialized_end = 1280 _NEXUSOPERATIONRESULT._serialized_start = 297 _NEXUSOPERATIONRESULT._serialized_end = 545 _NEXUSTASKCOMPLETION._serialized_start = 548 - _NEXUSTASKCOMPLETION._serialized_end = 729 - _NEXUSTASK._serialized_start = 732 - _NEXUSTASK._serialized_end = 940 - _CANCELNEXUSTASK._serialized_start = 942 - _CANCELNEXUSTASK._serialized_end = 1033 + _NEXUSTASKCOMPLETION._serialized_end = 786 + _NEXUSTASK._serialized_start = 789 + _NEXUSTASK._serialized_end = 997 + _CANCELNEXUSTASK._serialized_start = 999 + _CANCELNEXUSTASK._serialized_end = 1090 # @@protoc_insertion_point(module_scope) From 25ab55b7da5e46a92aad4cd35b51dd944a1928b0 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Wed, 25 Feb 2026 15:48:42 -0800 Subject: [PATCH 09/12] fix some rebase mistakes. Update test to not use the removed http client --- tests/helpers/nexus.py | 111 --------------------- tests/nexus/test_workflow_run_operation.py | 57 ++++++----- 2 files changed, 34 insertions(+), 134 deletions(-) diff --git a/tests/helpers/nexus.py b/tests/helpers/nexus.py index 73f33f215..d3142f74a 100644 --- a/tests/helpers/nexus.py +++ b/tests/helpers/nexus.py @@ -1,114 +1,3 @@ def make_nexus_endpoint_name(task_queue: str) -> str: # Create endpoints for different task queues without name collisions. return f"nexus-endpoint-{task_queue}" - - -# TODO(nexus-preview): How do we recommend that users create endpoints in their own tests? -# See https://github.com/temporalio/sdk-typescript/pull/1708/files?show-viewed-files=true&file-filters%5B%5D=&w=0#r2082549085 -async def create_nexus_endpoint( - task_queue: str, client: Client -) -> temporalio.api.operatorservice.v1.CreateNexusEndpointResponse: - name = make_nexus_endpoint_name(task_queue) - return await client.operator_service.create_nexus_endpoint( - temporalio.api.operatorservice.v1.CreateNexusEndpointRequest( - spec=temporalio.api.nexus.v1.EndpointSpec( - name=name, - target=temporalio.api.nexus.v1.EndpointTarget( - worker=temporalio.api.nexus.v1.EndpointTarget.Worker( - namespace=client.namespace, - task_queue=task_queue, - ) - ), - ) - ) - ) - - -@dataclass -class ServiceClient: - server_address: str # E.g. http://127.0.0.1:7243 - endpoint: str - service: str - - async def start_operation( - self, - operation: str, - body: dict[str, Any] | None = None, - headers: Mapping[str, str] = {}, - ) -> httpx.Response: - """ - Start a Nexus operation. - """ - # TODO(nexus-preview): Support callback URL as query param - async with httpx.AsyncClient() as http_client: - return await http_client.post( - f"http://{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}", - json=body, - headers=headers, - ) - - async def cancel_operation( - self, - operation: str, - token: str, - headers: Mapping[str, str] = {}, - ) -> httpx.Response: - async with httpx.AsyncClient() as http_client: - return await http_client.post( - f"http://{self.server_address}/nexus/endpoints/{self.endpoint}/services/{self.service}/{operation}/cancel", - # Token can also be sent as "Nexus-Operation-Token" header - params={"token": token}, - headers=headers, - ) - - @staticmethod - def default_server_address(env: WorkflowEnvironment) -> str: - # TODO(nexus-preview): nexus tests are making http requests directly but this is - # not officially supported. - parsed = urlparse(env.client.service_client.config.target_host) - host = parsed.hostname or "127.0.0.1" - http_port = getattr(env, "_http_port", 7243) - return f"{host}:{http_port}" - - -def dataclass_as_dict(dataclass: Any) -> dict[str, Any]: - """ - Return a shallow dict of the dataclass's fields. - - dataclasses.as_dict goes too far (attempts to pickle values) - """ - return { - field.name: getattr(dataclass, field.name) - for field in dataclasses.fields(dataclass) - } - - -@dataclass -class Failure: - """A Nexus Failure object, with details parsed into an exception. - - https://github.com/nexus-rpc/api/blob/main/SPEC.md#failure - """ - - message: str = "" - metadata: dict[str, str] | None = None - details: dict[str, Any] | None = None - - exception_from_details: BaseException | None = dataclasses.field( - init=False, default=None - ) - - def __post_init__(self) -> None: - if self.metadata and (error_type := self.metadata.get("type")): - self.exception_from_details = self._instantiate_exception( - error_type, self.details - ) - - def _instantiate_exception( - self, error_type: str, _details: dict[str, Any] | None - ) -> BaseException: - proto = { - "temporal.api.failure.v1.Failure": temporalio.api.failure.v1.Failure, - }[error_type]() - json_format.ParseDict(self.details, proto, ignore_unknown_fields=True) - return FailureConverter.default.from_failure(proto, PayloadConverter.default) diff --git a/tests/nexus/test_workflow_run_operation.py b/tests/nexus/test_workflow_run_operation.py index f7b2dfe9c..c2570f937 100644 --- a/tests/nexus/test_workflow_run_operation.py +++ b/tests/nexus/test_workflow_run_operation.py @@ -14,6 +14,7 @@ from nexusrpc.handler._decorators import operation_handler from temporalio import nexus, workflow +from temporalio.client import Client from temporalio.nexus import WorkflowRunOperationContext, workflow_run_operation from temporalio.nexus._operation_handlers import WorkflowRunOperationHandler from temporalio.testing import WorkflowEnvironment @@ -58,22 +59,41 @@ def op(self) -> OperationHandler[Input, str]: return MyOperation() -@service_handler +@service +class RequestDeadlineService: + op: Operation[Input, str] + + +@service_handler(service=RequestDeadlineService) class RequestDeadlineHandler: @workflow_run_operation async def op( - self, ctx: WorkflowRunOperationContext, input: Input + self, ctx: WorkflowRunOperationContext, _input: Input ) -> nexus.WorkflowHandle[str]: assert ( ctx.request_deadline is not None ), "request_deadline should be set in workflow_run_operation" return await ctx.start_workflow( EchoWorkflow.run, - input.value, + str(ctx.request_deadline), id=str(uuid.uuid4()), ) +@workflow.defn +class RequestDeadlineWorkflow: + @workflow.run + async def run(self, input: Input, task_queue: str) -> str: + client = workflow.create_nexus_client( + service=RequestDeadlineService, + endpoint=make_nexus_endpoint_name(task_queue), + ) + return await client.execute_operation( + RequestDeadlineService.op, + input, + ) + + @service class Service: op: Operation[Input, str] @@ -130,16 +150,11 @@ async def test_workflow_run_operation( id=str(uuid.uuid4()), task_queue=task_queue, ) - if hasattr(service_handler_cls, "__expected__error__"): - status_code, message = service_handler_cls.__expected__error__ - assert resp.status_code == status_code - failure = Failure(**resp.json()) - assert re.search(message, failure.message) - else: - assert resp.status_code == 201 + assert result == "test" async def test_request_deadline_is_accessible_in_workflow_run_operation( + client: Client, env: WorkflowEnvironment, ): """Test that request_deadline is accessible in WorkflowRunOperationContext.""" @@ -147,22 +162,18 @@ async def test_request_deadline_is_accessible_in_workflow_run_operation( pytest.skip("Nexus tests don't work with time-skipping server") task_queue = str(uuid.uuid4()) - endpoint = (await create_nexus_endpoint(task_queue, env.client)).endpoint.id - assert (service_defn := nexusrpc.get_service_definition(RequestDeadlineHandler)) - service_client = ServiceClient( - server_address=ServiceClient.default_server_address(env), - endpoint=endpoint, - service=service_defn.name, - ) + endpoint_name = make_nexus_endpoint_name(task_queue) + await env.create_nexus_endpoint(endpoint_name, task_queue) async with Worker( env.client, task_queue=task_queue, nexus_service_handlers=[RequestDeadlineHandler()], + workflows=[RequestDeadlineWorkflow, EchoWorkflow], ): - resp = await service_client.start_operation( - "op", - dataclass_as_dict(Input(value="test")), - {"Request-Timeout": "30s"}, + result = await client.execute_workflow( + RequestDeadlineWorkflow.run, + args=[Input(value="test"), task_queue], + task_queue=task_queue, + id=str(uuid.uuid4()), ) - # The assertion in the handler verified request_deadline was accessible - assert resp.status_code == 201 + assert result From 9a72f1d28320bbb96a2596c60867530f638eb705 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 26 Feb 2026 12:12:00 -0800 Subject: [PATCH 10/12] Add request deadline tests for StartOperationContext and CancelOperationContext Adds tests verifying request_deadline is accessible and timezone-aware (UTC) in both StartOperationContext and CancelOperationContext. Also strengthens the existing WorkflowRunOperationContext test to validate the deadline is a proper UTC datetime rather than just truthy. Co-Authored-By: Claude Opus 4.6 --- tests/nexus/test_workflow_caller.py | 130 +++++++++++++++++++++ tests/nexus/test_workflow_run_operation.py | 4 +- 2 files changed, 133 insertions(+), 1 deletion(-) diff --git a/tests/nexus/test_workflow_caller.py b/tests/nexus/test_workflow_caller.py index 70417625d..bbbf503f9 100644 --- a/tests/nexus/test_workflow_caller.py +++ b/tests/nexus/test_workflow_caller.py @@ -7,6 +7,7 @@ import uuid from collections.abc import Awaitable, Callable from dataclasses import dataclass +from datetime import datetime, timezone from enum import IntEnum from typing import Any from urllib.request import urlopen @@ -173,6 +174,11 @@ class HeaderTestService: cancellable_operation: nexusrpc.Operation[None, str] +@nexusrpc.service +class RequestDeadlineService: + cancellable_op: nexusrpc.Operation[None, str] + + # ----------------------------------------------------------------------------- # Service implementation # @@ -335,6 +341,40 @@ def cancellable_operation(self) -> OperationHandler[None, str]: return CancellableOperationHandler(self.cancel_headers_received) +class CancellableDeadlineOperationHandler(OperationHandler[None, str]): + """Operation handler that captures request_deadline from start and cancel contexts.""" + + def __init__( + self, + start_deadlines_received: list[datetime | None], + cancel_deadlines_received: list[datetime | None], + ) -> None: + self._start_deadlines_received = start_deadlines_received + self._cancel_deadlines_received = cancel_deadlines_received + + async def start( + self, ctx: StartOperationContext, input: None + ) -> StartOperationResultAsync: + self._start_deadlines_received.append(ctx.request_deadline) + return StartOperationResultAsync("test-token") + + async def cancel(self, ctx: CancelOperationContext, token: str) -> None: + self._cancel_deadlines_received.append(ctx.request_deadline) + + +@service_handler(service=RequestDeadlineService) +class RequestDeadlineServiceImpl: + def __init__(self) -> None: + self.start_deadlines_received: list[datetime | None] = [] + self.cancel_deadlines_received: list[datetime | None] = [] + + @operation_handler + def cancellable_op(self) -> OperationHandler[None, str]: + return CancellableDeadlineOperationHandler( + self.start_deadlines_received, self.cancel_deadlines_received + ) + + # ----------------------------------------------------------------------------- # Caller workflow # @@ -570,6 +610,26 @@ async def run(self, input: CancelHeaderTestCallerWfInput) -> None: await asyncio.sleep(0.1) +@workflow.defn +class CancelDeadlineCallerWorkflow: + """Workflow that starts a cancellable operation and then cancels it, for deadline testing.""" + + @workflow.run + async def run(self, task_queue: str) -> None: + nexus_client = workflow.create_nexus_client( + service=RequestDeadlineService, + endpoint=make_nexus_endpoint_name(task_queue), + ) + op_handle = await nexus_client.start_operation( + RequestDeadlineService.cancellable_op, + None, + ) + # Request cancellation - this sends a cancel operation to the handler + op_handle.cancel() + # Wait briefly to allow cancel request to be processed + await asyncio.sleep(0.1) + + @workflow.defn class WorkflowRunHeaderTestCallerWorkflow: """Workflow that calls a workflow_run_operation and verifies headers.""" @@ -2172,3 +2232,73 @@ async def test_task_executor_operation_cancel_method( # Verify the workflow completed successfully result = await caller_wf_handle.result() assert result == "cancelled_successfully" + + +async def test_request_deadline_is_accessible_in_start_operation( + client: Client, + env: WorkflowEnvironment, +): + """Test that request_deadline is accessible in StartOperationContext.""" + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + service_handler = RequestDeadlineServiceImpl() + + async with Worker( + client, + nexus_service_handlers=[service_handler], + workflows=[CancelDeadlineCallerWorkflow], + task_queue=task_queue, + ): + endpoint_name = make_nexus_endpoint_name(task_queue) + await env.create_nexus_endpoint(endpoint_name, task_queue) + + await client.execute_workflow( + CancelDeadlineCallerWorkflow.run, + task_queue, + id=str(uuid.uuid4()), + task_queue=task_queue, + ) + + assert len(service_handler.start_deadlines_received) == 1 + deadline = service_handler.start_deadlines_received[0] + assert ( + deadline is not None + ), "request_deadline should be set in StartOperationContext" + assert deadline.tzinfo is timezone.utc, "request_deadline should be in utc" + + +async def test_request_deadline_is_accessible_in_cancel_operation( + client: Client, + env: WorkflowEnvironment, +): + """Test that request_deadline is accessible in CancelOperationContext.""" + if env.supports_time_skipping: + pytest.skip("Nexus tests don't work with time-skipping server") + + task_queue = str(uuid.uuid4()) + service_handler = RequestDeadlineServiceImpl() + + async with Worker( + client, + nexus_service_handlers=[service_handler], + workflows=[CancelDeadlineCallerWorkflow], + task_queue=task_queue, + ): + endpoint_name = make_nexus_endpoint_name(task_queue) + await env.create_nexus_endpoint(endpoint_name, task_queue) + + await client.execute_workflow( + CancelDeadlineCallerWorkflow.run, + task_queue, + id=str(uuid.uuid4()), + task_queue=task_queue, + ) + + assert len(service_handler.cancel_deadlines_received) == 1 + deadline = service_handler.cancel_deadlines_received[0] + assert ( + deadline is not None + ), "request_deadline should be set in CancelOperationContext" + assert deadline.tzinfo is timezone.utc, "request_deadline should be in utc" diff --git a/tests/nexus/test_workflow_run_operation.py b/tests/nexus/test_workflow_run_operation.py index c2570f937..a00b6ed40 100644 --- a/tests/nexus/test_workflow_run_operation.py +++ b/tests/nexus/test_workflow_run_operation.py @@ -1,5 +1,6 @@ import uuid from dataclasses import dataclass +from datetime import datetime, timezone from typing import Any import nexusrpc @@ -176,4 +177,5 @@ async def test_request_deadline_is_accessible_in_workflow_run_operation( task_queue=task_queue, id=str(uuid.uuid4()), ) - assert result + parsed = datetime.fromisoformat(result) + assert parsed.tzinfo is timezone.utc, "request_deadline should be in utc" From 885e7938d589c13b951585095a442a5a161225d1 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 26 Feb 2026 12:27:09 -0800 Subject: [PATCH 11/12] Move request_deadline assertion from handler to test body in workflow_run_operation test Captures the deadline into a list on the handler instance and asserts in the test body, consistent with the start/cancel operation deadline tests. This gives clear pytest failure messages instead of opaque handler errors. Co-Authored-By: Claude Opus 4.6 --- tests/nexus/test_workflow_run_operation.py | 25 ++++++++++++++-------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/tests/nexus/test_workflow_run_operation.py b/tests/nexus/test_workflow_run_operation.py index a00b6ed40..489353165 100644 --- a/tests/nexus/test_workflow_run_operation.py +++ b/tests/nexus/test_workflow_run_operation.py @@ -67,16 +67,17 @@ class RequestDeadlineService: @service_handler(service=RequestDeadlineService) class RequestDeadlineHandler: + def __init__(self) -> None: + self.start_deadlines_received: list[datetime | None] = [] + @workflow_run_operation async def op( - self, ctx: WorkflowRunOperationContext, _input: Input + self, ctx: WorkflowRunOperationContext, input: Input ) -> nexus.WorkflowHandle[str]: - assert ( - ctx.request_deadline is not None - ), "request_deadline should be set in workflow_run_operation" + self.start_deadlines_received.append(ctx.request_deadline) return await ctx.start_workflow( EchoWorkflow.run, - str(ctx.request_deadline), + input.value, id=str(uuid.uuid4()), ) @@ -165,17 +166,23 @@ async def test_request_deadline_is_accessible_in_workflow_run_operation( task_queue = str(uuid.uuid4()) endpoint_name = make_nexus_endpoint_name(task_queue) await env.create_nexus_endpoint(endpoint_name, task_queue) + service_handler = RequestDeadlineHandler() async with Worker( env.client, task_queue=task_queue, - nexus_service_handlers=[RequestDeadlineHandler()], + nexus_service_handlers=[service_handler], workflows=[RequestDeadlineWorkflow, EchoWorkflow], ): - result = await client.execute_workflow( + await client.execute_workflow( RequestDeadlineWorkflow.run, args=[Input(value="test"), task_queue], task_queue=task_queue, id=str(uuid.uuid4()), ) - parsed = datetime.fromisoformat(result) - assert parsed.tzinfo is timezone.utc, "request_deadline should be in utc" + + assert len(service_handler.start_deadlines_received) == 1 + deadline = service_handler.start_deadlines_received[0] + assert ( + deadline is not None + ), "request_deadline should be set in WorkflowRunOperationContext" + assert deadline.tzinfo is timezone.utc, "request_deadline should be in utc" From 1214268d9f57148bd60ca41065fc9cd034f2a7e8 Mon Sep 17 00:00:00 2001 From: Alex Mazzeo Date: Thu, 26 Feb 2026 13:46:58 -0800 Subject: [PATCH 12/12] Fold start and cancel tests into a single test. Replace sleep with an asyncio.Event to avoid potential flakes --- tests/nexus/test_workflow_caller.py | 46 ++++++++++------------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/tests/nexus/test_workflow_caller.py b/tests/nexus/test_workflow_caller.py index bbbf503f9..681659c79 100644 --- a/tests/nexus/test_workflow_caller.py +++ b/tests/nexus/test_workflow_caller.py @@ -348,9 +348,11 @@ def __init__( self, start_deadlines_received: list[datetime | None], cancel_deadlines_received: list[datetime | None], + cancel_received: asyncio.Event, ) -> None: self._start_deadlines_received = start_deadlines_received self._cancel_deadlines_received = cancel_deadlines_received + self._cancel_received = cancel_received async def start( self, ctx: StartOperationContext, input: None @@ -360,6 +362,7 @@ async def start( async def cancel(self, ctx: CancelOperationContext, token: str) -> None: self._cancel_deadlines_received.append(ctx.request_deadline) + self._cancel_received.set() @service_handler(service=RequestDeadlineService) @@ -367,11 +370,14 @@ class RequestDeadlineServiceImpl: def __init__(self) -> None: self.start_deadlines_received: list[datetime | None] = [] self.cancel_deadlines_received: list[datetime | None] = [] + self.cancel_received = asyncio.Event() @operation_handler def cancellable_op(self) -> OperationHandler[None, str]: return CancellableDeadlineOperationHandler( - self.start_deadlines_received, self.cancel_deadlines_received + self.start_deadlines_received, + self.cancel_deadlines_received, + self.cancel_received, ) @@ -623,11 +629,15 @@ async def run(self, task_queue: str) -> None: op_handle = await nexus_client.start_operation( RequestDeadlineService.cancellable_op, None, + cancellation_type=workflow.NexusOperationCancellationType.WAIT_REQUESTED, ) # Request cancellation - this sends a cancel operation to the handler op_handle.cancel() - # Wait briefly to allow cancel request to be processed - await asyncio.sleep(0.1) + + try: + await op_handle + except NexusOperationError: + pass @workflow.defn @@ -2234,7 +2244,7 @@ async def test_task_executor_operation_cancel_method( assert result == "cancelled_successfully" -async def test_request_deadline_is_accessible_in_start_operation( +async def test_request_deadline_is_accessible_in_operation( client: Client, env: WorkflowEnvironment, ): @@ -2268,33 +2278,7 @@ async def test_request_deadline_is_accessible_in_start_operation( ), "request_deadline should be set in StartOperationContext" assert deadline.tzinfo is timezone.utc, "request_deadline should be in utc" - -async def test_request_deadline_is_accessible_in_cancel_operation( - client: Client, - env: WorkflowEnvironment, -): - """Test that request_deadline is accessible in CancelOperationContext.""" - if env.supports_time_skipping: - pytest.skip("Nexus tests don't work with time-skipping server") - - task_queue = str(uuid.uuid4()) - service_handler = RequestDeadlineServiceImpl() - - async with Worker( - client, - nexus_service_handlers=[service_handler], - workflows=[CancelDeadlineCallerWorkflow], - task_queue=task_queue, - ): - endpoint_name = make_nexus_endpoint_name(task_queue) - await env.create_nexus_endpoint(endpoint_name, task_queue) - - await client.execute_workflow( - CancelDeadlineCallerWorkflow.run, - task_queue, - id=str(uuid.uuid4()), - task_queue=task_queue, - ) + await asyncio.wait_for(service_handler.cancel_received.wait(), 1) assert len(service_handler.cancel_deadlines_received) == 1 deadline = service_handler.cancel_deadlines_received[0]