Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 0 additions & 116 deletions pkg/model/provider/anthropic/beta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@ func (c *Client) createBetaStream(
slog.Error("Failed to convert messages for Anthropic Beta request", "error", err)
return nil, err
}
if err := validateAnthropicSequencingBeta(converted); err != nil {
slog.Warn("Invalid message sequencing for Anthropic Beta API detected, attempting self-repair", "error", err)
converted = repairAnthropicSequencingBeta(converted)
if err2 := validateAnthropicSequencingBeta(converted); err2 != nil {
slog.Error("Failed to self-repair Anthropic Beta sequencing", "error", err2)
return nil, err
}
}

sys := extractBetaSystemBlocks(messages)

Expand Down Expand Up @@ -146,114 +138,6 @@ func (c *Client) createBetaStream(
return ad, nil
}

// validateAnthropicSequencingBeta performs the same validation as standard API but for Beta payloads
func validateAnthropicSequencingBeta(msgs []anthropic.BetaMessageParam) error {
for i := range msgs {
m, ok := marshalToMapBeta(msgs[i])
if !ok || m["role"] != "assistant" {
continue
}

toolUseIDs := collectToolUseIDs(contentArrayBeta(m))
if len(toolUseIDs) == 0 {
continue
}

if i+1 >= len(msgs) {
slog.Warn("Anthropic (beta) sequencing invalid: assistant tool_use present but no next user tool_result message", "assistant_index", i)
return errors.New("assistant tool_use present but no subsequent user message with tool_result blocks (beta)")
}

next, ok := marshalToMapBeta(msgs[i+1])
if !ok || next["role"] != "user" {
slog.Warn("Anthropic (beta) sequencing invalid: next message after assistant tool_use is not user", "assistant_index", i, "next_role", next["role"])
return errors.New("assistant tool_use must be followed by a user message containing corresponding tool_result blocks (beta)")
}

toolResultIDs := collectToolResultIDs(contentArrayBeta(next))
missing := differenceIDs(toolUseIDs, toolResultIDs)
if len(missing) > 0 {
slog.Warn("Anthropic (beta) sequencing invalid: missing tool_result for tool_use id in next user message", "assistant_index", i, "tool_use_id", missing[0], "missing_count", len(missing))
return fmt.Errorf("missing tool_result for tool_use id %s in the next user message (beta)", missing[0])
}
}
return nil
}

// repairAnthropicSequencingBeta inserts a synthetic user message with tool_result blocks
// for any assistant tool_use blocks that don't have corresponding tool_result blocks
// in the immediate next user message.
func repairAnthropicSequencingBeta(msgs []anthropic.BetaMessageParam) []anthropic.BetaMessageParam {
if len(msgs) == 0 {
return msgs
}
repaired := make([]anthropic.BetaMessageParam, 0, len(msgs)+2)
for i := range msgs {
m, ok := marshalToMapBeta(msgs[i])
if !ok || m["role"] != "assistant" {
repaired = append(repaired, msgs[i])
continue
}

toolUseIDs := collectToolUseIDs(contentArrayBeta(m))
if len(toolUseIDs) == 0 {
repaired = append(repaired, msgs[i])
continue
}

// Check if the next message is a user message with tool_results
needsSyntheticMessage := true
if i+1 < len(msgs) {
if next, ok := marshalToMapBeta(msgs[i+1]); ok && next["role"] == "user" {
toolResultIDs := collectToolResultIDs(contentArrayBeta(next))
// Remove tool_use IDs that have corresponding tool_results
for id := range toolResultIDs {
delete(toolUseIDs, id)
}
// If all tool_use IDs have results, no synthetic message needed
if len(toolUseIDs) == 0 {
needsSyntheticMessage = false
}
}
}

// Append the assistant message first
repaired = append(repaired, msgs[i])

// If there are missing tool_results, insert a synthetic user message immediately after
if needsSyntheticMessage && len(toolUseIDs) > 0 {
slog.Debug("Inserting synthetic user message for missing tool_results",
"assistant_index", i,
"missing_count", len(toolUseIDs))

blocks := make([]anthropic.BetaContentBlockParamUnion, 0, len(toolUseIDs))
for id := range toolUseIDs {
slog.Debug("Creating synthetic tool_result", "tool_use_id", id)
blocks = append(blocks, anthropic.BetaContentBlockParamUnion{
OfToolResult: &anthropic.BetaToolResultBlockParam{
ToolUseID: id,
Content: []anthropic.BetaToolResultBlockParamContentUnion{
{OfText: &anthropic.BetaTextBlockParam{Text: "(tool execution failed)"}},
},
},
})
}
repaired = append(repaired, anthropic.BetaMessageParam{
Role: anthropic.BetaMessageParamRoleUser,
Content: blocks,
})
}
}
return repaired
}

// marshalToMapBeta is an alias for marshalToMap - shared with standard API.
// Kept as separate function for clarity in Beta-specific code paths.
var marshalToMapBeta = marshalToMap

// contentArrayBeta is an alias for contentArray - shared with standard API.
var contentArrayBeta = contentArray

// countAnthropicTokensBeta calls Anthropic's Count Tokens API for the provided Beta API payload
// and returns the number of input tokens.
func countAnthropicTokensBeta(
Expand Down
73 changes: 48 additions & 25 deletions pkg/model/provider/anthropic/beta_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
// blocks from the same assistant message MUST be grouped into a single user message.
func (c *Client) convertBetaMessages(ctx context.Context, messages []chat.Message) ([]anthropic.BetaMessageParam, error) {
var betaMessages []anthropic.BetaMessageParam
var pendingToolUseIDs map[string]struct{}

for i := 0; i < len(messages); i++ {
msg := &messages[i]
Expand Down Expand Up @@ -75,11 +76,15 @@ func (c *Client) convertBetaMessages(ctx context.Context, messages []chat.Messag

// Add tool calls
if len(msg.ToolCalls) > 0 {
pendingToolUseIDs = make(map[string]struct{}, len(msg.ToolCalls))
for _, toolCall := range msg.ToolCalls {
var inpts map[string]any
if err := json.Unmarshal([]byte(toolCall.Function.Arguments), &inpts); err != nil {
inpts = map[string]any{}
}
if toolCall.ID != "" {
pendingToolUseIDs[toolCall.ID] = struct{}{}
}
contentBlocks = append(contentBlocks, anthropic.BetaContentBlockParamUnion{
OfToolUse: &anthropic.BetaToolUseBlockParam{
ID: toolCall.ID,
Expand All @@ -88,6 +93,8 @@ func (c *Client) convertBetaMessages(ctx context.Context, messages []chat.Messag
},
})
}
} else {
pendingToolUseIDs = nil
}

if len(contentBlocks) > 0 {
Expand All @@ -102,38 +109,54 @@ func (c *Client) convertBetaMessages(ctx context.Context, messages []chat.Messag
// Collect consecutive tool messages and merge them into a single user message
// This is required by Anthropic API: all tool_result blocks for tool_use blocks
// from the same assistant message must be in the same user message
toolResultBlocks := []anthropic.BetaContentBlockParamUnion{
{
OfToolResult: &anthropic.BetaToolResultBlockParam{
ToolUseID: msg.ToolCallID,
Content: []anthropic.BetaToolResultBlockParamContentUnion{
{OfText: &anthropic.BetaTextBlockParam{Text: strings.TrimSpace(msg.Content)}},
},
},
},
if pendingToolUseIDs == nil {
// Orphan tool results (no preceding assistant tool_use in this window): drop them.
j := i
for j < len(messages) && messages[j].Role == chat.MessageRoleTool {
j++
}
i = j - 1
continue
}

// Look ahead for consecutive tool messages and merge them
j := i + 1
toolResultBlocks := make([]anthropic.BetaContentBlockParamUnion, 0)
hadToolMessages := false
j := i
for j < len(messages) && messages[j].Role == chat.MessageRoleTool {
toolResultBlocks = append(toolResultBlocks, anthropic.BetaContentBlockParamUnion{
OfToolResult: &anthropic.BetaToolResultBlockParam{
ToolUseID: messages[j].ToolCallID,
Content: []anthropic.BetaToolResultBlockParamContentUnion{
{OfText: &anthropic.BetaTextBlockParam{Text: strings.TrimSpace(messages[j].Content)}},
},
},
})
hadToolMessages = true
id := messages[j].ToolCallID
if id != "" {
if _, ok := pendingToolUseIDs[id]; ok {
toolResultBlocks = append(toolResultBlocks, anthropic.BetaContentBlockParamUnion{
OfToolResult: &anthropic.BetaToolResultBlockParam{
ToolUseID: id,
Content: []anthropic.BetaToolResultBlockParamContentUnion{
{OfText: &anthropic.BetaTextBlockParam{Text: strings.TrimSpace(messages[j].Content)}},
},
},
})
delete(pendingToolUseIDs, id)
}
}
j++
}

// Add the merged user message with all tool results
betaMessages = append(betaMessages, anthropic.BetaMessageParam{
Role: anthropic.BetaMessageParamRoleUser,
Content: toolResultBlocks,
})
if hadToolMessages && len(toolResultBlocks) == 0 {
return nil, fmt.Errorf("tool_result messages present but none match pending tool_use ids (beta converter)")
}
if len(pendingToolUseIDs) > 0 {
for id := range pendingToolUseIDs {
return nil, fmt.Errorf("missing tool_result for tool_use id %s (beta converter)", id)
}
}

// Skip the messages we've already processed
if len(toolResultBlocks) > 0 {
betaMessages = append(betaMessages, anthropic.BetaMessageParam{
Role: anthropic.BetaMessageParamRoleUser,
Content: toolResultBlocks,
})
}
pendingToolUseIDs = nil
i = j - 1
continue
}
Expand Down
61 changes: 41 additions & 20 deletions pkg/model/provider/anthropic/beta_converter_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package anthropic

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -10,6 +11,39 @@ import (
"github.com/docker/cagent/pkg/tools"
)

func marshalToMapBeta(t *testing.T, v any) map[string]any {
t.Helper()
b, err := json.Marshal(v)
require.NoError(t, err)
var m map[string]any
require.NoError(t, json.Unmarshal(b, &m))
return m
}

func contentArrayBeta(m map[string]any) []any {
if a, ok := m["content"].([]any); ok {
return a
}
return nil
}

func collectToolResultIDsBeta(content []any) map[string]struct{} {
ids := make(map[string]struct{})
for _, c := range content {
cb, ok := c.(map[string]any)
if !ok {
continue
}
if cb["type"] != "tool_result" {
continue
}
if id, _ := cb["tool_use_id"].(string); id != "" {
ids[id] = struct{}{}
}
}
return ids
}

func TestConvertBetaMessages_MergesConsecutiveToolMessages(t *testing.T) {
// Simulates the roast battle scenario where:
// - Assistant message has 2 tool_use blocks (transfer_task calls)
Expand Down Expand Up @@ -65,27 +99,22 @@ func TestConvertBetaMessages_MergesConsecutiveToolMessages(t *testing.T) {

require.Len(t, betaMessages, 4, "Should have 4 messages after conversion")

msg0Map, _ := marshalToMapBeta(betaMessages[0])
msg1Map, _ := marshalToMapBeta(betaMessages[1])
msg2Map, _ := marshalToMapBeta(betaMessages[2])
msg3Map, _ := marshalToMapBeta(betaMessages[3])
msg0Map := marshalToMapBeta(t, betaMessages[0])
msg1Map := marshalToMapBeta(t, betaMessages[1])
msg2Map := marshalToMapBeta(t, betaMessages[2])
msg3Map := marshalToMapBeta(t, betaMessages[3])
assert.Equal(t, "user", msg0Map["role"])
assert.Equal(t, "assistant", msg1Map["role"])
assert.Equal(t, "user", msg2Map["role"])
assert.Equal(t, "assistant", msg3Map["role"])

userMsg2Map, ok := marshalToMapBeta(betaMessages[2])
require.True(t, ok)
userMsg2Map := marshalToMapBeta(t, betaMessages[2])
content := contentArrayBeta(userMsg2Map)
require.Len(t, content, 2, "User message should have 2 tool_result blocks")

toolResultIDs := collectToolResultIDs(content)
toolResultIDs := collectToolResultIDsBeta(content)
assert.Contains(t, toolResultIDs, "tool_call_1")
assert.Contains(t, toolResultIDs, "tool_call_2")

// Most importantly: validate that the sequence is valid for Anthropic API
err = validateAnthropicSequencingBeta(betaMessages)
require.NoError(t, err, "Converted messages should pass Anthropic sequencing validation")
}

func TestConvertBetaMessages_SingleToolMessage(t *testing.T) {
Expand Down Expand Up @@ -123,10 +152,6 @@ func TestConvertBetaMessages_SingleToolMessage(t *testing.T) {
betaMessages, err := testClient().convertBetaMessages(t.Context(), messages)
require.NoError(t, err)
require.Len(t, betaMessages, 4)

// Validate sequence
err = validateAnthropicSequencingBeta(betaMessages)
require.NoError(t, err)
}

func TestConvertBetaMessages_NonConsecutiveToolMessages(t *testing.T) {
Expand Down Expand Up @@ -181,10 +206,6 @@ func TestConvertBetaMessages_NonConsecutiveToolMessages(t *testing.T) {
},
}

betaMessages, err := testClient().convertBetaMessages(t.Context(), messages)
_, err := testClient().convertBetaMessages(t.Context(), messages)
require.NoError(t, err)

// Validate the entire sequence
err = validateAnthropicSequencingBeta(betaMessages)
require.NoError(t, err, "Messages with non-consecutive tool calls should still validate")
}
Loading