-
Notifications
You must be signed in to change notification settings - Fork 605
interrupts - graph - agent based #1533
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -622,6 +622,13 @@ def _activate_interrupt(self, node: GraphNode, interrupts: list[Interrupt]) -> M | |
|
|
||
| self._interrupt_state.interrupts.update({interrupt.id: interrupt for interrupt in interrupts}) | ||
| self._interrupt_state.activate() | ||
| if isinstance(node.executor, Agent): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be AgentBase now?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not yet as
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gotcha, is anything blocking that @mkmeral ? |
||
| self._interrupt_state.context[node.node_id] = { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain how interrupts work in graph at a higher level? I understand how this works for an interrupt that is raised in one Agent node by that Agent node. But what happens if two nodes are executing and one of them raises an interrupt?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If two nodes are executing in parallel and one interrupts, the other node will be allowed finished. Once done, the call stack returns back to Upon resuming, we unpack the interrupted node and the one already completed from interrupt state (src). Within For the interrupted node, we pass the user interrupt responses into From here, things proceed as normal.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For additional context, we explicitly test parallel node interrupts in the integ tests presented further down.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok that makes sense, but I do think this will make #1530 more important.I'm sure in a scenario like If C interrupts people will be similarly surprised that D still executed
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. D would not execute in this case. So A would be the entry point and execute first. On the next cycle, B and C would be picked up as the ready nodes to execute. They would do so concurrently. If B completes and C interrupts then we pause here and wait for the user to respond. Once the user responds, we finish executing C (and only C). Afterwards, we find the next batch of ready nodes. It is at this time that D executes.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Anything that prevents us from switching this to snapshots later? |
||
| "activated": node.executor._interrupt_state.activated, | ||
| "interrupt_state": node.executor._interrupt_state.to_dict(), | ||
| "state": node.executor.state.get(), | ||
| "messages": node.executor.messages, | ||
| } | ||
|
|
||
| return MultiAgentNodeInterruptEvent(node.node_id, interrupts) | ||
|
|
||
|
|
@@ -920,16 +927,6 @@ async def _execute_node(self, node: GraphNode, invocation_state: dict[str, Any]) | |
| if agent_response is None: | ||
| raise ValueError(f"Node '{node.node_id}' did not produce a result event") | ||
|
|
||
| if agent_response.stop_reason == "interrupt": | ||
| node.executor.messages.pop() # remove interrupted tool use message | ||
| node.executor._interrupt_state.deactivate() | ||
|
|
||
| raise NotImplementedError( | ||
| f"node_id=<{node.node_id}>, " | ||
| "issue=<https://github.com/strands-agents/sdk-python/issues/204> " | ||
| "| user raised interrupt from an agent node" | ||
| ) | ||
|
|
||
| # Extract metrics with defaults | ||
| response_metrics = getattr(agent_response, "metrics", None) | ||
| usage = getattr( | ||
|
|
@@ -940,18 +937,24 @@ async def _execute_node(self, node: GraphNode, invocation_state: dict[str, Any]) | |
| node_result = NodeResult( | ||
| result=agent_response, | ||
| execution_time=round((time.time() - start_time) * 1000), | ||
| status=Status.COMPLETED, | ||
| status=Status.INTERRUPTED if agent_response.stop_reason == "interrupt" else Status.COMPLETED, | ||
| accumulated_usage=usage, | ||
| accumulated_metrics=metrics, | ||
| execution_count=1, | ||
| interrupts=agent_response.interrupts or [], | ||
| ) | ||
| else: | ||
| raise ValueError(f"Node '{node.node_id}' of type '{type(node.executor)}' is not supported") | ||
|
|
||
| # Mark as completed | ||
| node.execution_status = Status.COMPLETED | ||
| node.result = node_result | ||
| node.execution_time = node_result.execution_time | ||
|
|
||
| if node_result.status == Status.INTERRUPTED: | ||
| yield self._activate_interrupt(node, node_result.interrupts) | ||
| return | ||
|
|
||
| # Mark as completed | ||
| node.execution_status = Status.COMPLETED | ||
| self.state.completed_nodes.add(node) | ||
| self.state.results[node.node_id] = node_result | ||
| self.state.execution_order.append(node) | ||
|
|
@@ -1018,6 +1021,8 @@ def _accumulate_metrics(self, node_result: NodeResult) -> None: | |
| def _build_node_input(self, node: GraphNode) -> list[ContentBlock]: | ||
| """Build input text for a node based on dependency outputs. | ||
|
|
||
| If resuming from an interrupt, return user responses. | ||
|
|
||
| Example formatted output: | ||
| ``` | ||
| Original Task: Analyze the quarterly sales data and create a summary report | ||
|
|
@@ -1032,6 +1037,21 @@ def _build_node_input(self, node: GraphNode) -> list[ContentBlock]: | |
| - Agent: Data validation complete. All records verified, no anomalies detected. | ||
| ``` | ||
| """ | ||
| if self._interrupt_state.activated: | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is where we restore the agent node state upon resuming. We extract it from the graph interrupt state. |
||
| context = self._interrupt_state.context | ||
| if node.node_id in context and context[node.node_id]["activated"]: | ||
| agent_context = context[node.node_id] | ||
| agent = cast(Agent, node.executor) | ||
| agent.messages = agent_context["messages"] | ||
| agent.state = AgentState(agent_context["state"]) | ||
| agent._interrupt_state = _InterruptState.from_dict(agent_context["interrupt_state"]) | ||
|
|
||
| responses = context["responses"] | ||
| interrupts = agent._interrupt_state.interrupts | ||
| return [ | ||
| response for response in responses if response["interruptResponse"]["interruptId"] in interrupts | ||
| ] | ||
|
|
||
| # Get satisfied dependencies | ||
| dependency_results = {} | ||
| for edge in self.edges: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agents are not allowed to use session managers in a graph execution. Consequently, we need to store some agent state in the graph interrupt state to help persist the interrupt workflow between shutdowns. Note, this is the same behavior we have in Swarm (src).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We check if executor is an Agent instance right now because MultiAgentBase executors may not have the same context. We will figure out how to handle that case in a separate (and final) PR for graph interrupt support.