From 953a2a0f5e9ab83e24e703c699cc475d516f8b4a Mon Sep 17 00:00:00 2001 From: Development User Date: Fri, 27 Feb 2026 20:52:41 +0000 Subject: [PATCH 1/2] Fix start_activity() dropping priority parameter The priority parameter was accepted in the function signature but not forwarded to workflow_start_activity(), unlike execute_activity() and start_activity_class() which correctly pass it. Co-Authored-By: Claude Opus 4.6 (1M context) --- temporalio/workflow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/temporalio/workflow.py b/temporalio/workflow.py index 96c105493..45a8512eb 100644 --- a/temporalio/workflow.py +++ b/temporalio/workflow.py @@ -2087,6 +2087,7 @@ def start_activity( activity_id=activity_id, versioning_intent=versioning_intent, summary=summary, + priority=priority, ) From 3392890ae667afd361adbfa381d1530b9048c5ff Mon Sep 17 00:00:00 2001 From: Development User Date: Fri, 27 Feb 2026 21:08:48 +0000 Subject: [PATCH 2/2] Add E2E interceptor test for start_activity() priority forwarding Uses the established interceptor pattern: a workflow calls start_activity() with priority=Priority(priority_key=3), a WorkflowOutboundInterceptor captures the StartActivityInput, and the test asserts the captured input has the correct priority. Fails without the fix since start_activity() was silently dropping the priority param. Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/worker/test_interceptor.py | 64 ++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/worker/test_interceptor.py b/tests/worker/test_interceptor.py index 1392cd350..9dddcefaf 100644 --- a/tests/worker/test_interceptor.py +++ b/tests/worker/test_interceptor.py @@ -7,6 +7,7 @@ from temporalio import activity, workflow from temporalio.client import Client, WorkflowUpdateFailedError +from temporalio.common import Priority from temporalio.exceptions import ApplicationError from temporalio.testing import WorkflowEnvironment from temporalio.worker import ( @@ -324,3 +325,66 @@ async def test_workflow_instance_access_from_interceptor(client: Client): task_queue=task_queue, ) assert difference == 0 + + +@activity.defn +async def priority_activity() -> str: + return "done" + + +captured_start_activity_inputs: List[StartActivityInput] = [] + + +class PriorityCapturingInterceptor(Interceptor): + def workflow_interceptor_class( + self, input: WorkflowInterceptorClassInput + ) -> Optional[Type[WorkflowInboundInterceptor]]: + return PriorityCapturingInboundInterceptor + + +class PriorityCapturingInboundInterceptor(WorkflowInboundInterceptor): + def init(self, outbound: WorkflowOutboundInterceptor) -> None: + super().init(PriorityCapturingOutboundInterceptor(outbound)) + + async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any: + return await super().execute_workflow(input) + + +class PriorityCapturingOutboundInterceptor(WorkflowOutboundInterceptor): + def start_activity(self, input: StartActivityInput) -> workflow.ActivityHandle: + captured_start_activity_inputs.append(input) + return super().start_activity(input) + + +@workflow.defn +class StartActivityPriorityWorkflow: + @workflow.run + async def run(self) -> str: + # Use start_activity (not execute_activity) to test that path + handle = workflow.start_activity( + priority_activity, + start_to_close_timeout=timedelta(seconds=5), + priority=Priority(priority_key=3), + ) + return await handle + + +async def test_start_activity_forwards_priority(client: Client): + captured_start_activity_inputs.clear() + task_queue = f"task_queue_{uuid.uuid4()}" + async with Worker( + client, + task_queue=task_queue, + workflows=[StartActivityPriorityWorkflow], + activities=[priority_activity], + interceptors=[PriorityCapturingInterceptor()], + ): + result = await client.execute_workflow( + StartActivityPriorityWorkflow.run, + id=f"workflow_{uuid.uuid4()}", + task_queue=task_queue, + ) + assert result == "done" + + assert len(captured_start_activity_inputs) == 1 + assert captured_start_activity_inputs[0].priority == Priority(priority_key=3)