-
Notifications
You must be signed in to change notification settings - Fork 251
fix(#1738): prevent sub-agent streaming messages from being persisted to parent session #1739
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
aheritier
wants to merge
1
commit into
docker:main
Choose a base branch
from
aheritier:fix/1738-subsession-streaming-persistence
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+163
−0
Draft
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,142 @@ | ||
| package runtime | ||
|
|
||
| import ( | ||
| "context" | ||
| "sync" | ||
| "testing" | ||
|
|
||
| "github.com/stretchr/testify/assert" | ||
| "github.com/stretchr/testify/require" | ||
|
|
||
| "github.com/docker/cagent/pkg/agent" | ||
| "github.com/docker/cagent/pkg/chat" | ||
| "github.com/docker/cagent/pkg/model/provider/base" | ||
| "github.com/docker/cagent/pkg/session" | ||
| "github.com/docker/cagent/pkg/team" | ||
| "github.com/docker/cagent/pkg/tools" | ||
| "github.com/docker/cagent/pkg/tools/builtin" | ||
| ) | ||
|
|
||
| // multiStreamProvider returns different streams on consecutive calls. | ||
| type multiStreamProvider struct { | ||
| id string | ||
| mu sync.Mutex | ||
| streams []chat.MessageStream | ||
| idx int | ||
| } | ||
|
|
||
| func (m *multiStreamProvider) ID() string { return m.id } | ||
|
|
||
| func (m *multiStreamProvider) CreateChatCompletionStream(_ context.Context, _ []chat.Message, _ []tools.Tool) (chat.MessageStream, error) { | ||
| m.mu.Lock() | ||
| defer m.mu.Unlock() | ||
| if m.idx >= len(m.streams) { | ||
| return m.streams[len(m.streams)-1], nil | ||
| } | ||
| s := m.streams[m.idx] | ||
| m.idx++ | ||
| return s, nil | ||
| } | ||
|
|
||
| func (m *multiStreamProvider) BaseConfig() base.Config { return base.Config{} } | ||
|
|
||
| func (m *multiStreamProvider) MaxTokens() int { return 0 } | ||
|
|
||
| func TestPersistentRuntime_SubAgentMessagesNotPersistedToParent(t *testing.T) { | ||
| // Stream 1 (root): produces a transfer_task tool call to "worker" | ||
| rootStream := newStreamBuilder(). | ||
| AddToolCallName("call_transfer", "transfer_task"). | ||
| AddToolCallArguments("call_transfer", `{"agent":"worker","task":"do work","expected_output":"result"}`). | ||
| AddStopWithUsage(10, 5). | ||
| Build() | ||
|
|
||
| // Stream 2 (worker sub-agent): produces streaming content simulating work | ||
| workerStream := newStreamBuilder(). | ||
| AddContent("I am doing "). | ||
| AddContent("the work now."). | ||
| AddStopWithUsage(5, 10). | ||
| Build() | ||
|
|
||
| prov := &multiStreamProvider{ | ||
| id: "test/mock-model", | ||
| streams: []chat.MessageStream{rootStream, workerStream}, | ||
| } | ||
|
|
||
| worker := agent.New("worker", "Worker agent", agent.WithModel(prov)) | ||
| root := agent.New("root", "Root coordinator", | ||
| agent.WithModel(prov), | ||
| agent.WithToolSets(builtin.NewTransferTaskTool()), | ||
| ) | ||
| agent.WithSubAgents(worker)(root) | ||
|
|
||
| tm := team.New(team.WithAgents(root, worker)) | ||
|
|
||
| store := session.NewInMemorySessionStore() | ||
|
|
||
| rt, err := New(tm, | ||
| WithSessionCompaction(false), | ||
| WithModelStore(mockModelStore{}), | ||
| WithSessionStore(store), | ||
| ) | ||
| require.NoError(t, err) | ||
|
|
||
| sess := session.New( | ||
| session.WithUserMessage("Please delegate work to the worker"), | ||
| session.WithToolsApproved(true), | ||
| ) | ||
| sess.Title = "Test Transfer Persistence" | ||
|
|
||
| err = store.AddSession(t.Context(), sess) | ||
| require.NoError(t, err) | ||
|
|
||
| evCh := rt.RunStream(t.Context(), sess) | ||
| for range evCh { | ||
| } | ||
|
|
||
| parentSess, err := store.GetSession(t.Context(), sess.ID) | ||
| require.NoError(t, err) | ||
|
|
||
| // Verify no sub-agent messages leaked into the parent session | ||
| for _, item := range parentSess.Messages { | ||
| if !item.IsMessage() { | ||
| continue | ||
| } | ||
| assert.NotEqual(t, "worker", item.Message.AgentName, | ||
| "Sub-agent 'worker' messages should not be in the parent session. "+ | ||
| "Found message with role=%s content=%q", | ||
| item.Message.Message.Role, item.Message.Message.Content) | ||
| } | ||
|
|
||
| // Verify the sub-session was persisted and contains the worker's messages | ||
| var subSess *session.Session | ||
| for _, item := range parentSess.Messages { | ||
| if item.IsSubSession() { | ||
| subSess = item.SubSession | ||
| break | ||
| } | ||
| } | ||
| require.NotNil(t, subSess, | ||
| "Sub-session should be persisted in the parent session") | ||
|
|
||
| var workerMsgCount int | ||
| for _, item := range subSess.Messages { | ||
| if item.IsMessage() && item.Message.AgentName == "worker" { | ||
| workerMsgCount++ | ||
| } | ||
| } | ||
| assert.Positive(t, workerMsgCount, | ||
| "Worker messages should be in the sub-session") | ||
|
|
||
| // Verify the root agent's assistant message (with transfer_task tool call) | ||
| // and the tool result are both persisted in the parent | ||
| var roles []chat.MessageRole | ||
| for _, item := range parentSess.Messages { | ||
| if item.IsMessage() { | ||
| roles = append(roles, item.Message.Message.Role) | ||
| } | ||
| } | ||
| assert.Contains(t, roles, chat.MessageRoleAssistant, | ||
| "Parent session should contain root's assistant message with the transfer_task tool call") | ||
| assert.Contains(t, roles, chat.MessageRoleTool, | ||
| "Parent session should contain the tool result for transfer_task") | ||
| } |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The depth counter could theoretically become negative if AgentSwitchingEvent(false) is received without a matching AgentSwitchingEvent(true), although this shouldn't happen in normal operation. Consider adding a defensive check to prevent negative values:
This would make the code more resilient to unexpected event sequences.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partially agreed. A negative depth can't cause functional harm (all guards check
> 0), but silently clamping would mask a real bug in the event system — an unmatchedAgentSwitching(false)would meanhandleTaskTransfer's event pairing is broken, which we'd want to know about.Applied the guard but with a
slog.Warninstead of silently ignoring:This gives us the defensive behavior while surfacing the anomaly if it ever occurs.