diff --git a/elixir/README.md b/elixir/README.md index 7ad17628f..bed639da0 100644 --- a/elixir/README.md +++ b/elixir/README.md @@ -132,11 +132,11 @@ Title: {{ issue.title }} Body: {{ issue.description }} Notes: - If a value is missing, defaults are used. -- Safer Codex defaults are used when policy fields are omitted: - - `codex.approval_policy` defaults to `{"reject":{"sandbox_approval":true,"rules":true,"mcp_elicitations":true}}` - - `codex.thread_sandbox` defaults to `workspace-write` +- Default Codex policy fields are tuned for unattended orchestration: + - `codex.approval_policy` defaults to `never` + - `codex.thread_sandbox` defaults to `danger-full-access` - `codex.turn_sandbox_policy` defaults to a `workspaceWrite` policy rooted at the current issue workspace -- Supported `codex.approval_policy` values depend on the targeted Codex app-server version. In the current local Codex schema, string values include `untrusted`, `on-failure`, `on-request`, and `never`, and object-form `reject` is also supported. +- Supported `codex.approval_policy` values depend on the targeted Codex app-server version. In the current local Codex schema, string values include `untrusted`, `on-failure`, `on-request`, `granular`, and `never`. - Supported `codex.thread_sandbox` values: `read-only`, `workspace-write`, `danger-full-access`. - When `codex.turn_sandbox_policy` is set explicitly, Symphony passes the map through to Codex unchanged. Compatibility then depends on the targeted Codex app-server version rather than local diff --git a/elixir/lib/symphony_elixir/codex/app_server.ex b/elixir/lib/symphony_elixir/codex/app_server.ex index 7da87ce99..148209c82 100644 --- a/elixir/lib/symphony_elixir/codex/app_server.ex +++ b/elixir/lib/symphony_elixir/codex/app_server.ex @@ -83,14 +83,23 @@ defmodule SymphonyElixir.Codex.AppServer do ) do on_message = Keyword.get(opts, :on_message, &default_on_message/1) - tool_executor = - Keyword.get(opts, :tool_executor, fn tool, arguments -> - DynamicTool.execute(tool, arguments) - end) - case start_turn(port, thread_id, prompt, issue, workspace, approval_policy, turn_sandbox_policy) do {:ok, turn_id} -> session_id = "#{thread_id}-#{turn_id}" + + tool_executor = + Keyword.get(opts, :tool_executor, fn tool, arguments -> + DynamicTool.execute( + tool, + arguments, + issue: issue, + workspace: workspace, + thread_id: thread_id, + turn_id: turn_id, + session_id: session_id + ) + end) + Logger.info("Codex session started for #{issue_context(issue)} session_id=#{session_id}") emit_message( diff --git a/elixir/lib/symphony_elixir/codex/dynamic_tool.ex b/elixir/lib/symphony_elixir/codex/dynamic_tool.ex index 446c7fd2c..6fe0748ff 100644 --- a/elixir/lib/symphony_elixir/codex/dynamic_tool.ex +++ b/elixir/lib/symphony_elixir/codex/dynamic_tool.ex @@ -3,12 +3,16 @@ defmodule SymphonyElixir.Codex.DynamicTool do Executes client-side tool calls requested by Codex app-server turns. """ - alias SymphonyElixir.Linear.Client + alias SymphonyElixir.{CodexMonitor.Store, Config, Linear.Client} @linear_graphql_tool "linear_graphql" + @codex_monitor_task_tool "codex_monitor_task" @linear_graphql_description """ Execute a raw GraphQL query or mutation against Linear using Symphony's configured auth. """ + @codex_monitor_task_description """ + Read and update the current CodexMonitor task, including board state, worklog entries, and run telemetry. + """ @linear_graphql_input_schema %{ "type" => "object", "additionalProperties" => false, @@ -25,6 +29,40 @@ defmodule SymphonyElixir.Codex.DynamicTool do } } } + @codex_monitor_task_input_schema %{ + "type" => "object", + "additionalProperties" => false, + "required" => ["action"], + "properties" => %{ + "action" => %{ + "type" => "string", + "enum" => ["get_task", "append_worklog", "update_state", "update_run"], + "description" => "Operation to perform against the current CodexMonitor task." + }, + "taskId" => %{ + "type" => ["string", "null"], + "description" => "Optional task id override. Defaults to the current issue/task id." + }, + "message" => %{ + "type" => ["string", "null"], + "description" => "Worklog entry to append for `append_worklog`." + }, + "state" => %{ + "type" => ["string", "null"], + "description" => "Target state for `update_state`." + }, + "threadId" => %{"type" => ["string", "null"]}, + "worktreeWorkspaceId" => %{"type" => ["string", "null"]}, + "branchName" => %{"type" => ["string", "null"]}, + "pullRequestUrl" => %{"type" => ["string", "null"]}, + "sessionId" => %{"type" => ["string", "null"]}, + "lastEvent" => %{"type" => ["string", "null"]}, + "lastMessage" => %{"type" => ["string", "null"]}, + "lastError" => %{"type" => ["string", "null"]}, + "retryCount" => %{"type" => ["integer", "null"]}, + "tokenTotal" => %{"type" => ["integer", "null"]} + } + } @spec execute(String.t() | nil, term(), keyword()) :: map() def execute(tool, arguments, opts \\ []) do @@ -32,6 +70,9 @@ defmodule SymphonyElixir.Codex.DynamicTool do @linear_graphql_tool -> execute_linear_graphql(arguments, opts) + @codex_monitor_task_tool -> + execute_codex_monitor_task(arguments, opts) + other -> failure_response(%{ "error" => %{ @@ -44,13 +85,25 @@ defmodule SymphonyElixir.Codex.DynamicTool do @spec tool_specs() :: [map()] def tool_specs do - [ - %{ - "name" => @linear_graphql_tool, - "description" => @linear_graphql_description, - "inputSchema" => @linear_graphql_input_schema - } - ] + case Config.settings!().tracker.kind do + "codex_monitor" -> + [ + %{ + "name" => @codex_monitor_task_tool, + "description" => @codex_monitor_task_description, + "inputSchema" => @codex_monitor_task_input_schema + } + ] + + _ -> + [ + %{ + "name" => @linear_graphql_tool, + "description" => @linear_graphql_description, + "inputSchema" => @linear_graphql_input_schema + } + ] + end end defp execute_linear_graphql(arguments, opts) do @@ -65,6 +118,144 @@ defmodule SymphonyElixir.Codex.DynamicTool do end end + defp execute_codex_monitor_task(arguments, opts) do + store = Keyword.get(opts, :codex_monitor_store, Store) + + with {:ok, normalized} <- normalize_codex_monitor_task_arguments(arguments, opts), + {:ok, response} <- execute_codex_monitor_action(store, normalized) do + graphql_response(response) + else + {:error, reason} -> + failure_response(tool_error_payload(reason)) + end + end + + defp normalize_codex_monitor_task_arguments(arguments, opts) when is_map(arguments) do + with {:ok, action} <- required_string(arguments, "action"), + {:ok, task_id} <- resolve_task_id(arguments, opts, action) do + {:ok, + %{ + action: action, + task_id: task_id, + message: optional_string(arguments, "message"), + state: optional_string(arguments, "state"), + run_updates: %{ + thread_id: optional_string(arguments, "threadId") || Keyword.get(opts, :thread_id), + worktree_workspace_id: optional_string(arguments, "worktreeWorkspaceId"), + branch_name: optional_string(arguments, "branchName"), + pull_request_url: optional_string(arguments, "pullRequestUrl"), + session_id: optional_string(arguments, "sessionId") || Keyword.get(opts, :session_id), + last_event: optional_string(arguments, "lastEvent"), + last_message: optional_string(arguments, "lastMessage"), + last_error: optional_string(arguments, "lastError"), + retry_count: optional_integer(arguments, "retryCount"), + token_total: optional_integer(arguments, "tokenTotal") + } + }} + end + end + + defp normalize_codex_monitor_task_arguments(_arguments, _opts), + do: {:error, :invalid_codex_monitor_task_arguments} + + defp execute_codex_monitor_action(store, %{action: "get_task", task_id: task_id}) do + store.get_task_context(task_id) + end + + defp execute_codex_monitor_action(store, %{action: "append_worklog", task_id: task_id, message: message}) do + with {:ok, message} <- require_value(message, :missing_codex_monitor_message), + :ok <- store.append_worklog(task_id, message), + {:ok, task_context} <- store.get_task_context(task_id) do + {:ok, task_context} + end + end + + defp execute_codex_monitor_action(store, %{action: "update_state", task_id: task_id, state: state, message: message}) do + with {:ok, state} <- require_value(state, :missing_codex_monitor_state), + :ok <- store.update_issue_state(task_id, state), + :ok <- maybe_append_worklog(store, task_id, message), + {:ok, task_context} <- store.get_task_context(task_id) do + {:ok, task_context} + end + end + + defp execute_codex_monitor_action(store, %{action: "update_run", task_id: task_id, run_updates: run_updates}) do + with :ok <- store.update_task_run(task_id, prune_nil_map(run_updates)), + {:ok, task_context} <- store.get_task_context(task_id) do + {:ok, task_context} + end + end + + defp execute_codex_monitor_action(_store, %{action: action}) do + {:error, {:unsupported_codex_monitor_task_action, action}} + end + + defp required_string(arguments, key) do + case optional_string(arguments, key) do + nil -> {:error, {:missing_required_argument, key}} + value -> {:ok, value} + end + end + + defp resolve_task_id(arguments, opts, action) do + case optional_string(arguments, "taskId") || issue_id_from_opts(opts) do + nil when action in ["get_task", "append_worklog", "update_state", "update_run"] -> + {:error, :missing_codex_monitor_task_id} + + task_id -> + {:ok, task_id} + end + end + + defp issue_id_from_opts(opts) do + case Keyword.get(opts, :issue) do + %{id: issue_id} when is_binary(issue_id) -> issue_id + _ -> nil + end + end + + defp optional_string(arguments, key) do + case Map.get(arguments, key) || Map.get(arguments, String.to_atom(key)) do + value when is_binary(value) -> + case String.trim(value) do + "" -> nil + trimmed -> trimmed + end + + _ -> + nil + end + end + + defp optional_integer(arguments, key) do + case Map.get(arguments, key) || Map.get(arguments, String.to_atom(key)) do + value when is_integer(value) -> + value + + value when is_binary(value) -> + case Integer.parse(value) do + {parsed, ""} -> parsed + _ -> nil + end + + _ -> + nil + end + end + + defp require_value(nil, reason), do: {:error, reason} + defp require_value(value, _reason), do: {:ok, value} + + defp maybe_append_worklog(_store, _task_id, nil), do: :ok + defp maybe_append_worklog(store, task_id, message), do: store.append_worklog(task_id, message) + + defp prune_nil_map(map) when is_map(map) do + Enum.reduce(map, %{}, fn + {_key, nil}, acc -> acc + {key, value}, acc -> Map.put(acc, key, value) + end) + end + defp normalize_linear_graphql_arguments(arguments) when is_binary(arguments) do case String.trim(arguments) do "" -> {:error, :missing_query} @@ -176,6 +367,104 @@ defmodule SymphonyElixir.Codex.DynamicTool do } end + defp tool_error_payload(:missing_codex_monitor_database_path) do + %{ + "error" => %{ + "message" => "Symphony is missing `tracker.database_path` for the CodexMonitor adapter." + } + } + end + + defp tool_error_payload(:missing_codex_monitor_task_id) do + %{ + "error" => %{ + "message" => "`codex_monitor_task` requires a task id or current issue context." + } + } + end + + defp tool_error_payload(:missing_codex_monitor_message) do + %{ + "error" => %{ + "message" => "`codex_monitor_task` requires `message` for `append_worklog`." + } + } + end + + defp tool_error_payload(:missing_codex_monitor_state) do + %{ + "error" => %{ + "message" => "`codex_monitor_task` requires `state` for `update_state`." + } + } + end + + defp tool_error_payload(:invalid_codex_monitor_task_arguments) do + %{ + "error" => %{ + "message" => "`codex_monitor_task` expects a JSON object with at least an `action` field." + } + } + end + + defp tool_error_payload({:missing_required_argument, key}) do + %{ + "error" => %{ + "message" => "Missing required argument `#{key}`." + } + } + end + + defp tool_error_payload({:unsupported_codex_monitor_task_action, action}) do + %{ + "error" => %{ + "message" => "Unsupported `codex_monitor_task` action #{inspect(action)}." + } + } + end + + defp tool_error_payload({:unknown_codex_monitor_state, state_name}) do + %{ + "error" => %{ + "message" => "Unknown CodexMonitor task state #{inspect(state_name)}." + } + } + end + + defp tool_error_payload(:task_not_found) do + %{ + "error" => %{ + "message" => "The requested CodexMonitor task was not found." + } + } + end + + defp tool_error_payload(:sqlite3_not_found) do + %{ + "error" => %{ + "message" => "The local `sqlite3` binary is required for the CodexMonitor Symphony adapter." + } + } + end + + defp tool_error_payload({:sqlite_command_failed, status, detail}) do + %{ + "error" => %{ + "message" => "CodexMonitor SQLite command failed with status #{status}.", + "detail" => to_string(detail) + } + } + end + + defp tool_error_payload({:sqlite_json_decode_failed, reason}) do + %{ + "error" => %{ + "message" => "Failed to decode CodexMonitor SQLite JSON output.", + "reason" => inspect(reason) + } + } + end + defp tool_error_payload({:linear_api_status, status}) do %{ "error" => %{ diff --git a/elixir/lib/symphony_elixir/codex_monitor/store.ex b/elixir/lib/symphony_elixir/codex_monitor/store.ex new file mode 100644 index 000000000..724ffd0f5 --- /dev/null +++ b/elixir/lib/symphony_elixir/codex_monitor/store.ex @@ -0,0 +1,556 @@ +defmodule SymphonyElixir.CodexMonitor.Store do + @moduledoc """ + Small SQLite-backed bridge for CodexMonitor workspace task boards. + + This intentionally uses the local `sqlite3` CLI for development/debugging so + the Symphony prototype can run against CodexMonitor without pulling in a new + database dependency. + """ + + alias SymphonyElixir.{Config, Linear.Issue} + + @sqlite_binary "sqlite3" + + @type task_run_updates :: %{ + optional(:thread_id) => String.t() | nil, + optional(:worktree_workspace_id) => String.t() | nil, + optional(:branch_name) => String.t() | nil, + optional(:pull_request_url) => String.t() | nil, + optional(:session_id) => String.t() | nil, + optional(:last_event) => String.t() | nil, + optional(:last_message) => String.t() | nil, + optional(:last_error) => String.t() | nil, + optional(:retry_count) => integer() | nil, + optional(:token_total) => integer() | nil + } + + @spec fetch_candidate_issues() :: {:ok, [Issue.t()]} | {:error, term()} + def fetch_candidate_issues do + fetch_issues_by_states(Config.settings!().tracker.active_states) + end + + @spec fetch_issues_by_states([String.t()]) :: {:ok, [Issue.t()]} | {:error, term()} + def fetch_issues_by_states(state_names) when is_list(state_names) do + with {:ok, db_path} <- database_path(), + statuses <- Enum.map(state_names, &normalize_status/1), + statuses <- Enum.reject(statuses, &is_nil/1), + false <- statuses == [], + {:ok, rows} <- query_json(db_path, tasks_by_states_sql(statuses)) do + {:ok, Enum.map(rows, &issue_from_row/1)} + else + true -> {:ok, []} + error -> error + end + end + + @spec fetch_issue_states_by_ids([String.t()]) :: {:ok, [Issue.t()]} | {:error, term()} + def fetch_issue_states_by_ids(issue_ids) when is_list(issue_ids) do + with {:ok, db_path} <- database_path(), + ids <- Enum.filter(issue_ids, &is_binary/1), + {:ok, rows} <- query_json(db_path, tasks_by_ids_sql(ids)) do + {:ok, Enum.map(rows, &issue_from_row/1)} + end + end + + @spec create_comment(String.t(), String.t()) :: :ok | {:error, term()} + def create_comment(issue_id, body) when is_binary(issue_id) and is_binary(body) do + append_worklog(issue_id, body) + end + + @spec update_issue_state(String.t(), String.t()) :: :ok | {:error, term()} + def update_issue_state(issue_id, state_name) + when is_binary(issue_id) and is_binary(state_name) do + with {:ok, db_path} <- database_path(), + {:ok, status} <- normalize_status_required(state_name), + {:ok, next_order_index} <- next_order_index(db_path, status), + :ok <- + exec( + db_path, + update_task_state_sql(issue_id, status, next_order_index, humanize_status(status)) + ) do + :ok + end + end + + @spec append_worklog(String.t(), String.t()) :: :ok | {:error, term()} + def append_worklog(issue_id, body) when is_binary(issue_id) and is_binary(body) do + with {:ok, db_path} <- database_path(), + trimmed when trimmed != "" <- String.trim(body), + :ok <- exec(db_path, insert_task_event_sql(issue_id, trimmed)) do + :ok + else + "" -> {:error, :missing_message} + error -> error + end + end + + @spec update_task_run(String.t(), task_run_updates()) :: :ok | {:error, term()} + def update_task_run(issue_id, attrs) when is_binary(issue_id) and is_map(attrs) do + with {:ok, db_path} <- database_path(), + {:ok, task} <- fetch_task(db_path, issue_id), + :ok <- upsert_task_run(db_path, task, attrs) do + :ok + end + end + + @spec get_task_context(String.t()) :: {:ok, map()} | {:error, term()} + def get_task_context(issue_id) when is_binary(issue_id) do + with {:ok, db_path} <- database_path(), + {:ok, [task]} <- fetch_issue_states_by_ids([issue_id]), + {:ok, events} <- query_json(db_path, recent_events_sql(issue_id)) do + {:ok, + %{ + "task" => task_to_map(task), + "recentEvents" => events, + "databasePath" => db_path + }} + else + {:ok, []} -> {:error, :task_not_found} + error -> error + end + end + + def get_task_context(_issue_id), do: {:error, :task_not_found} + + @spec database_path() :: {:ok, String.t()} | {:error, term()} + def database_path do + case Config.settings!().tracker.database_path do + path when is_binary(path) and path != "" -> {:ok, path} + _ -> {:error, :missing_codex_monitor_database_path} + end + end + + defp fetch_task(db_path, issue_id) do + case query_json(db_path, tasks_by_ids_sql([issue_id])) do + {:ok, [row | _]} -> {:ok, row} + {:ok, []} -> {:error, :task_not_found} + {:error, reason} -> {:error, reason} + end + end + + defp upsert_task_run(db_path, task, attrs) do + with {:ok, rows} <- query_json(db_path, latest_run_sql(task["id"])), + existing_run = List.first(rows), + target_run = if(fresh_task_run?(existing_run, attrs), do: nil, else: existing_run), + merged_attrs = merge_task_run_attrs(existing_run, attrs), + :ok <- exec(db_path, upsert_task_run_sql(task, target_run, merged_attrs)) do + :ok + end + end + + defp task_to_map(%Issue{} = task) do + %{ + "id" => task.id, + "identifier" => task.identifier, + "title" => task.title, + "description" => task.description, + "state" => task.state, + "branchName" => task.branch_name, + "url" => task.url, + "createdAt" => maybe_iso8601(task.created_at), + "updatedAt" => maybe_iso8601(task.updated_at) + } + end + + defp issue_from_row(row) do + %Issue{ + id: row["id"], + identifier: row["identifier"] || row["id"], + title: row["title"], + description: row["description"], + state: humanize_status(row["status"]), + branch_name: row["branch_name"], + url: nil, + assignee_id: nil, + blocked_by: [], + labels: [], + assigned_to_worker: true, + created_at: parse_datetime(row["created_at_ms"]), + updated_at: parse_datetime(row["updated_at_ms"]) + } + end + + defp maybe_iso8601(%DateTime{} = value), do: DateTime.to_iso8601(value) + defp maybe_iso8601(_value), do: nil + + defp parse_datetime(value) when is_integer(value) do + DateTime.from_unix!(value, :millisecond) + rescue + _ -> nil + end + + defp parse_datetime(value) when is_binary(value) do + case Integer.parse(value) do + {ms, ""} -> parse_datetime(ms) + _ -> nil + end + end + + defp parse_datetime(_value), do: nil + + defp tasks_by_states_sql(statuses) do + state_list = + statuses + |> Enum.uniq() + |> Enum.map_join(", ", &sql_string/1) + + """ + SELECT #{task_select_fields()} + FROM tasks t + LEFT JOIN task_runs r ON r.id = ( + SELECT id + FROM task_runs + WHERE task_id = t.id + ORDER BY started_at_ms DESC + LIMIT 1 + ) + WHERE #{workspace_filter_sql("t.workspace_id")} + AND t.status IN (#{state_list}) + ORDER BY t.updated_at_ms ASC, t.order_index ASC + """ + end + + defp tasks_by_ids_sql(issue_ids) do + id_list = + issue_ids + |> Enum.uniq() + |> Enum.map_join(", ", &sql_string/1) + + """ + SELECT #{task_select_fields()} + FROM tasks t + LEFT JOIN task_runs r ON r.id = ( + SELECT id + FROM task_runs + WHERE task_id = t.id + ORDER BY started_at_ms DESC + LIMIT 1 + ) + WHERE #{workspace_filter_sql("t.workspace_id")} + AND t.id IN (#{id_list}) + ORDER BY t.updated_at_ms ASC, t.order_index ASC + """ + end + + defp task_select_fields do + [ + "t.id", + "t.workspace_id", + "t.title", + "COALESCE(t.description, '') AS description", + "t.status", + "t.created_at_ms", + "t.updated_at_ms", + "COALESCE(r.branch_name, '') AS branch_name", + "t.id AS identifier" + ] + |> Enum.join(", ") + end + + defp latest_run_sql(issue_id) do + """ + SELECT + id, + thread_id, + worktree_workspace_id, + branch_name, + pull_request_url, + session_id, + last_event, + last_message, + last_error, + retry_count, + token_total + FROM task_runs + WHERE task_id = #{sql_string(issue_id)} + ORDER BY started_at_ms DESC + LIMIT 1 + """ + end + + defp recent_events_sql(issue_id) do + """ + SELECT id, task_id, workspace_id, message, created_at_ms + FROM task_events + WHERE task_id = #{sql_string(issue_id)} + AND #{workspace_filter_sql("workspace_id")} + ORDER BY created_at_ms DESC + LIMIT 20 + """ + end + + defp insert_task_event_sql(issue_id, message) do + now = now_ms() + + """ + INSERT INTO task_events (id, task_id, workspace_id, message, created_at_ms) + VALUES ( + #{sql_string(generate_id())}, + #{sql_string(issue_id)}, + (SELECT workspace_id FROM tasks WHERE id = #{sql_string(issue_id)} LIMIT 1), + #{sql_string(message)}, + #{now} + ); + UPDATE tasks + SET updated_at_ms = #{now} + WHERE id = #{sql_string(issue_id)} + AND #{workspace_filter_sql("workspace_id")}; + """ + end + + defp update_task_state_sql(issue_id, status, next_order_index, human_state) do + now = now_ms() + + """ + UPDATE tasks + SET status = #{sql_string(status)}, + order_index = #{next_order_index}, + updated_at_ms = #{now} + WHERE id = #{sql_string(issue_id)} + AND #{workspace_filter_sql("workspace_id")}; + INSERT INTO task_events (id, task_id, workspace_id, message, created_at_ms) + VALUES ( + #{sql_string(generate_id())}, + #{sql_string(issue_id)}, + (SELECT workspace_id FROM tasks WHERE id = #{sql_string(issue_id)} LIMIT 1), + #{sql_string("Symphony moved the task to #{human_state}.")}, + #{now} + ); + """ + end + + defp upsert_task_run_sql(task, existing_run, attrs) do + now = now_ms() + run_id = (existing_run && existing_run["id"]) || generate_id() + workspace_id = task["workspace_id"] + task_id = task["id"] + session_id = map_value(attrs, :session_id) + + if is_map(existing_run) do + """ + UPDATE task_runs + SET thread_id = #{sql_nullable_string(map_value(attrs, :thread_id))}, + worktree_workspace_id = #{sql_nullable_string(map_value(attrs, :worktree_workspace_id))}, + branch_name = #{sql_nullable_string(map_value(attrs, :branch_name))}, + pull_request_url = #{sql_nullable_string(map_value(attrs, :pull_request_url))}, + session_id = #{sql_nullable_string(session_id)}, + last_event = #{sql_nullable_string(map_value(attrs, :last_event))}, + last_message = #{sql_nullable_string(map_value(attrs, :last_message))}, + last_error = #{sql_nullable_string(map_value(attrs, :last_error))}, + retry_count = #{sql_nullable_integer(map_value(attrs, :retry_count), 0)}, + token_total = #{sql_nullable_integer(map_value(attrs, :token_total), 0)}, + updated_at_ms = #{now} + WHERE id = #{sql_string(run_id)}; + """ + else + """ + INSERT INTO task_runs ( + id, task_id, workspace_id, thread_id, worktree_workspace_id, branch_name, + pull_request_url, session_id, last_event, last_message, last_error, + retry_count, token_total, started_at_ms, updated_at_ms + ) VALUES ( + #{sql_string(run_id)}, + #{sql_string(task_id)}, + #{sql_string(workspace_id)}, + #{sql_nullable_string(map_value(attrs, :thread_id))}, + #{sql_nullable_string(map_value(attrs, :worktree_workspace_id))}, + #{sql_nullable_string(map_value(attrs, :branch_name))}, + #{sql_nullable_string(map_value(attrs, :pull_request_url))}, + #{sql_nullable_string(session_id)}, + #{sql_nullable_string(map_value(attrs, :last_event))}, + #{sql_nullable_string(map_value(attrs, :last_message))}, + #{sql_nullable_string(map_value(attrs, :last_error))}, + #{sql_nullable_integer(map_value(attrs, :retry_count), 0)}, + #{sql_nullable_integer(map_value(attrs, :token_total), 0)}, + #{now}, + #{now} + ); + """ + end + end + + defp fresh_task_run?(existing_run, attrs) when is_map(existing_run) and is_map(attrs) do + task_run_identity_keys() + |> Enum.any?(fn {attr_key, row_key} -> + attr_value = normalized_identity_value(map_value(attrs, attr_key)) + existing_value = normalized_identity_value(Map.get(existing_run, row_key)) + + attr_value != nil and existing_value != nil and attr_value != existing_value + end) + end + + defp fresh_task_run?(_existing_run, _attrs), do: false + + defp merge_task_run_attrs(nil, attrs), do: attrs + + defp merge_task_run_attrs(existing_run, attrs) when is_map(existing_run) and is_map(attrs) do + Enum.reduce(task_run_field_mappings(), %{}, fn {attr_key, row_key}, acc -> + value = + if has_map_value?(attrs, attr_key) do + map_value(attrs, attr_key) + else + Map.get(existing_run, row_key) + end + + Map.put(acc, attr_key, value) + end) + end + + defp task_run_field_mappings do + [ + {:thread_id, "thread_id"}, + {:worktree_workspace_id, "worktree_workspace_id"}, + {:branch_name, "branch_name"}, + {:pull_request_url, "pull_request_url"}, + {:session_id, "session_id"}, + {:last_event, "last_event"}, + {:last_message, "last_message"}, + {:last_error, "last_error"}, + {:retry_count, "retry_count"}, + {:token_total, "token_total"} + ] + end + + defp task_run_identity_keys do + [ + {:session_id, "session_id"}, + {:thread_id, "thread_id"} + ] + end + + defp normalized_identity_value(value) when is_binary(value) do + case String.trim(value) do + "" -> nil + trimmed -> trimmed + end + end + + defp normalized_identity_value(value), do: value + + defp has_map_value?(map, key) when is_map(map) do + Map.has_key?(map, key) || Map.has_key?(map, Atom.to_string(key)) + end + + defp next_order_index(db_path, status) do + sql = """ + SELECT COALESCE(MAX(order_index), -1) + 1 AS next_order_index + FROM tasks + WHERE #{workspace_filter_sql("workspace_id")} + AND status = #{sql_string(status)} + """ + + with {:ok, [row | _]} <- query_json(db_path, sql), + value when is_integer(value) <- row["next_order_index"] do + {:ok, value} + else + {:ok, [row | _]} -> + {:ok, row["next_order_index"] |> to_string() |> String.to_integer()} + + {:ok, []} -> + {:ok, 0} + + {:error, reason} -> + {:error, reason} + end + end + + defp workspace_filter_sql(column_name) do + case Config.settings!().tracker.workspace_id do + workspace_id when is_binary(workspace_id) and workspace_id != "" -> + "#{column_name} = #{sql_string(workspace_id)}" + + _ -> + "1 = 1" + end + end + + defp normalize_status_required(state_name) do + case normalize_status(state_name) do + nil -> {:error, {:unknown_codex_monitor_state, state_name}} + status -> {:ok, status} + end + end + + defp normalize_status(state_name) when is_binary(state_name) do + case state_name |> String.trim() |> String.downcase() do + "backlog" -> "backlog" + "todo" -> "todo" + "in progress" -> "in_progress" + "in_progress" -> "in_progress" + "human review" -> "human_review" + "human_review" -> "human_review" + "rework" -> "rework" + "merging" -> "merging" + "done" -> "done" + _ -> nil + end + end + + defp normalize_status(_state_name), do: nil + + defp humanize_status("backlog"), do: "Backlog" + defp humanize_status("todo"), do: "Todo" + defp humanize_status("in_progress"), do: "In Progress" + defp humanize_status("human_review"), do: "Human Review" + defp humanize_status("rework"), do: "Rework" + defp humanize_status("merging"), do: "Merging" + defp humanize_status("done"), do: "Done" + defp humanize_status(value) when is_binary(value), do: value + defp humanize_status(_value), do: "Unknown" + + defp map_value(map, key) do + Map.get(map, key) || Map.get(map, Atom.to_string(key)) + end + + defp query_json(db_path, sql) do + with :ok <- ensure_sqlite!() do + case System.cmd(@sqlite_binary, ["-json", db_path, sql], stderr_to_stdout: true) do + {output, 0} -> + case decode_sqlite_json(output) do + {:ok, payload} -> {:ok, payload} + {:error, reason} -> {:error, {:sqlite_json_decode_failed, reason}} + end + + {output, status} -> + {:error, {:sqlite_command_failed, status, output}} + end + end + end + + defp decode_sqlite_json(output) when is_binary(output) do + case String.trim(output) do + "" -> {:ok, []} + trimmed -> Jason.decode(trimmed) + end + end + + defp exec(db_path, sql) do + with :ok <- ensure_sqlite!() do + case System.cmd(@sqlite_binary, [db_path, sql], stderr_to_stdout: true) do + {_output, 0} -> :ok + {output, status} -> {:error, {:sqlite_command_failed, status, output}} + end + end + end + + defp ensure_sqlite! do + if System.find_executable(@sqlite_binary) do + :ok + else + {:error, :sqlite3_not_found} + end + end + + defp sql_string(value) when is_binary(value) do + "'" <> String.replace(value, "'", "''") <> "'" + end + + defp sql_nullable_string(nil), do: "NULL" + defp sql_nullable_string(value) when is_binary(value), do: sql_string(value) + + defp sql_nullable_integer(nil, default), do: Integer.to_string(default) + defp sql_nullable_integer(value, _default) when is_integer(value), do: Integer.to_string(value) + + defp now_ms, do: DateTime.utc_now() |> DateTime.to_unix(:millisecond) + defp generate_id, do: Ecto.UUID.generate() +end diff --git a/elixir/lib/symphony_elixir/config.ex b/elixir/lib/symphony_elixir/config.ex index 00e7f9b7e..c5b5db954 100644 --- a/elixir/lib/symphony_elixir/config.ex +++ b/elixir/lib/symphony_elixir/config.ex @@ -7,7 +7,7 @@ defmodule SymphonyElixir.Config do alias SymphonyElixir.Workflow @default_prompt_template """ - You are working on a Linear issue. + You are working on a tracked task. Identifier: {{ issue.identifier }} Title: {{ issue.title }} @@ -119,7 +119,7 @@ defmodule SymphonyElixir.Config do is_nil(settings.tracker.kind) -> {:error, :missing_tracker_kind} - settings.tracker.kind not in ["linear", "memory"] -> + settings.tracker.kind not in ["linear", "memory", "codex_monitor"] -> {:error, {:unsupported_tracker_kind, settings.tracker.kind}} settings.tracker.kind == "linear" and not is_binary(settings.tracker.api_key) -> @@ -128,6 +128,9 @@ defmodule SymphonyElixir.Config do settings.tracker.kind == "linear" and not is_binary(settings.tracker.project_slug) -> {:error, :missing_linear_project_slug} + settings.tracker.kind == "codex_monitor" and not is_binary(settings.tracker.database_path) -> + {:error, :missing_codex_monitor_database_path} + true -> :ok end diff --git a/elixir/lib/symphony_elixir/config/schema.ex b/elixir/lib/symphony_elixir/config/schema.ex index 17ead4ad6..87c5e4a90 100644 --- a/elixir/lib/symphony_elixir/config/schema.ex +++ b/elixir/lib/symphony_elixir/config/schema.ex @@ -50,6 +50,8 @@ defmodule SymphonyElixir.Config.Schema do field(:api_key, :string) field(:project_slug, :string) field(:assignee, :string) + field(:database_path, :string) + field(:workspace_id, :string) field(:active_states, {:array, :string}, default: ["Todo", "In Progress"]) field(:terminal_states, {:array, :string}, default: ["Closed", "Cancelled", "Canceled", "Duplicate", "Done"]) end @@ -59,7 +61,17 @@ defmodule SymphonyElixir.Config.Schema do schema |> cast( attrs, - [:kind, :endpoint, :api_key, :project_slug, :assignee, :active_states, :terminal_states], + [ + :kind, + :endpoint, + :api_key, + :project_slug, + :assignee, + :database_path, + :workspace_id, + :active_states, + :terminal_states + ], empty_values: [] ) end @@ -158,18 +170,8 @@ defmodule SymphonyElixir.Config.Schema do @primary_key false embedded_schema do field(:command, :string, default: "codex app-server") - - field(:approval_policy, StringOrMap, - default: %{ - "reject" => %{ - "sandbox_approval" => true, - "rules" => true, - "mcp_elicitations" => true - } - } - ) - - field(:thread_sandbox, :string, default: "workspace-write") + field(:approval_policy, StringOrMap, default: "never") + field(:thread_sandbox, :string, default: "danger-full-access") field(:turn_sandbox_policy, :map) field(:turn_timeout_ms, :integer, default: 3_600_000) field(:read_timeout_ms, :integer, default: 5_000) @@ -369,7 +371,8 @@ defmodule SymphonyElixir.Config.Schema do tracker = %{ settings.tracker | api_key: resolve_secret_setting(settings.tracker.api_key, System.get_env("LINEAR_API_KEY")), - assignee: resolve_secret_setting(settings.tracker.assignee, System.get_env("LINEAR_ASSIGNEE")) + assignee: resolve_secret_setting(settings.tracker.assignee, System.get_env("LINEAR_ASSIGNEE")), + database_path: resolve_path_value(settings.tracker.database_path, nil) } workspace = %{ @@ -435,6 +438,8 @@ defmodule SymphonyElixir.Config.Schema do end end + defp resolve_path_value(nil, default), do: default + defp resolve_env_value(value, fallback) when is_binary(value) do case env_reference_name(value) do {:ok, env_name} -> diff --git a/elixir/lib/symphony_elixir/orchestrator.ex b/elixir/lib/symphony_elixir/orchestrator.ex index 3cd814829..88a1007a9 100644 --- a/elixir/lib/symphony_elixir/orchestrator.ex +++ b/elixir/lib/symphony_elixir/orchestrator.ex @@ -237,6 +237,10 @@ defmodule SymphonyElixir.Orchestrator do Logger.error("Linear project slug missing in WORKFLOW.md") state + {:error, :missing_codex_monitor_database_path} -> + Logger.error("CodexMonitor tracker database_path missing in WORKFLOW.md") + state + {:error, :missing_tracker_kind} -> Logger.error("Tracker kind missing in WORKFLOW.md") @@ -333,6 +337,23 @@ defmodule SymphonyElixir.Orchestrator do select_worker_host(state, preferred_worker_host) end + @doc false + @spec dispatch_claim_state_name_for_test() :: String.t() + def dispatch_claim_state_name_for_test do + dispatch_claim_state_name() + end + + @doc false + @spec claim_issue_for_dispatch_for_test( + Issue.t(), + (String.t(), String.t() -> :ok | {:error, term()}), + ([String.t()] -> {:ok, [Issue.t()]} | {:error, term()}) + ) :: {:ok, Issue.t()} | {:error, term()} + def claim_issue_for_dispatch_for_test(%Issue{} = issue, update_issue_state_fun, issue_fetcher) + when is_function(update_issue_state_fun, 2) and is_function(issue_fetcher, 1) do + claim_issue_for_dispatch(issue, update_issue_state_fun, issue_fetcher) + end + defp reconcile_running_issue_states([], state, _active_states, _terminal_states), do: state defp reconcile_running_issue_states([issue | rest], state, active_states, terminal_states) do @@ -691,44 +712,67 @@ defmodule SymphonyElixir.Orchestrator do end defp spawn_issue_on_worker_host(%State{} = state, issue, attempt, recipient, worker_host) do + dispatch_gate_ref = make_ref() + case Task.Supervisor.start_child(SymphonyElixir.TaskSupervisor, fn -> - AgentRunner.run(issue, recipient, attempt: attempt, worker_host: worker_host) + receive do + {:dispatch_start, ^dispatch_gate_ref, claimed_issue} -> + AgentRunner.run(claimed_issue, recipient, attempt: attempt, worker_host: worker_host) + after + 5_000 -> + exit(:dispatch_start_timeout) + end end) do {:ok, pid} -> - ref = Process.monitor(pid) - - Logger.info("Dispatching issue to agent: #{issue_context(issue)} pid=#{inspect(pid)} attempt=#{inspect(attempt)} worker_host=#{worker_host || "local"}") - - running = - Map.put(state.running, issue.id, %{ - pid: pid, - ref: ref, - identifier: issue.identifier, - issue: issue, - worker_host: worker_host, - workspace_path: nil, - session_id: nil, - last_codex_message: nil, - last_codex_timestamp: nil, - last_codex_event: nil, - codex_app_server_pid: nil, - codex_input_tokens: 0, - codex_output_tokens: 0, - codex_total_tokens: 0, - codex_last_reported_input_tokens: 0, - codex_last_reported_output_tokens: 0, - codex_last_reported_total_tokens: 0, - turn_count: 0, - retry_attempt: normalize_retry_attempt(attempt), - started_at: DateTime.utc_now() - }) + case claim_issue_for_dispatch(issue, &Tracker.update_issue_state/2, &Tracker.fetch_issue_states_by_ids/1) do + {:ok, claimed_issue} -> + send(pid, {:dispatch_start, dispatch_gate_ref, claimed_issue}) + ref = Process.monitor(pid) + + Logger.info("Dispatching issue to agent: #{issue_context(claimed_issue)} pid=#{inspect(pid)} attempt=#{inspect(attempt)} worker_host=#{worker_host || "local"}") + + running = + Map.put(state.running, issue.id, %{ + pid: pid, + ref: ref, + identifier: claimed_issue.identifier, + issue: claimed_issue, + worker_host: worker_host, + workspace_path: nil, + session_id: nil, + last_codex_message: nil, + last_codex_timestamp: nil, + last_codex_event: nil, + codex_app_server_pid: nil, + codex_input_tokens: 0, + codex_output_tokens: 0, + codex_total_tokens: 0, + codex_last_reported_input_tokens: 0, + codex_last_reported_output_tokens: 0, + codex_last_reported_total_tokens: 0, + turn_count: 0, + retry_attempt: normalize_retry_attempt(attempt), + started_at: DateTime.utc_now() + }) - %{ - state - | running: running, - claimed: MapSet.put(state.claimed, issue.id), - retry_attempts: Map.delete(state.retry_attempts, issue.id) - } + %{ + state + | running: running, + claimed: MapSet.put(state.claimed, issue.id), + retry_attempts: Map.delete(state.retry_attempts, issue.id) + } + + {:error, reason} -> + Logger.warning("Failed to claim issue for dispatch: #{issue_context(issue)} reason=#{inspect(reason)}") + Process.exit(pid, :kill) + next_attempt = if is_integer(attempt), do: attempt + 1, else: nil + + schedule_issue_retry(state, issue.id, next_attempt, %{ + identifier: issue.identifier, + error: "failed to claim issue for dispatch: #{inspect(reason)}", + worker_host: worker_host + }) + end {:error, reason} -> Logger.error("Unable to spawn agent for #{issue_context(issue)}: #{inspect(reason)}") @@ -742,6 +786,43 @@ defmodule SymphonyElixir.Orchestrator do end end + defp claim_issue_for_dispatch(%Issue{state: state_name} = issue, update_issue_state_fun, issue_fetcher) + when is_binary(state_name) do + if normalize_issue_state(state_name) == "todo" do + claim_todo_issue_for_dispatch(issue, update_issue_state_fun, issue_fetcher) + else + {:ok, issue} + end + end + + defp claim_issue_for_dispatch(issue, _update_issue_state_fun, _issue_fetcher), do: {:ok, issue} + + defp claim_todo_issue_for_dispatch(%Issue{id: issue_id} = issue, update_issue_state_fun, issue_fetcher) + when is_binary(issue_id) and is_function(update_issue_state_fun, 2) and + is_function(issue_fetcher, 1) do + dispatch_state = dispatch_claim_state_name() + + with :ok <- update_issue_state_fun.(issue_id, dispatch_state) do + case issue_fetcher.([issue_id]) do + {:ok, [%Issue{} = refreshed_issue | _]} -> + {:ok, refreshed_issue} + + {:ok, _} -> + {:ok, %{issue | state: dispatch_state}} + + {:error, reason} -> + {:error, reason} + end + end + end + + defp dispatch_claim_state_name do + Config.settings!().tracker.active_states + |> Enum.find("In Progress", fn state_name -> + normalize_issue_state(state_name) != "todo" + end) + end + defp revalidate_issue_for_dispatch(%Issue{id: issue_id}, issue_fetcher, terminal_states) when is_binary(issue_id) and is_function(issue_fetcher, 1) do case issue_fetcher.([issue_id]) do diff --git a/elixir/lib/symphony_elixir/tracker.ex b/elixir/lib/symphony_elixir/tracker.ex index 000b6edf8..919c89dc2 100644 --- a/elixir/lib/symphony_elixir/tracker.ex +++ b/elixir/lib/symphony_elixir/tracker.ex @@ -40,6 +40,7 @@ defmodule SymphonyElixir.Tracker do def adapter do case Config.settings!().tracker.kind do "memory" -> SymphonyElixir.Tracker.Memory + "codex_monitor" -> SymphonyElixir.Tracker.CodexMonitor _ -> SymphonyElixir.Linear.Adapter end end diff --git a/elixir/lib/symphony_elixir/tracker/codex_monitor.ex b/elixir/lib/symphony_elixir/tracker/codex_monitor.ex new file mode 100644 index 000000000..687d446b5 --- /dev/null +++ b/elixir/lib/symphony_elixir/tracker/codex_monitor.ex @@ -0,0 +1,28 @@ +defmodule SymphonyElixir.Tracker.CodexMonitor do + @moduledoc """ + CodexMonitor-backed tracker adapter. + """ + + @behaviour SymphonyElixir.Tracker + + alias SymphonyElixir.CodexMonitor.Store + + @spec fetch_candidate_issues() :: {:ok, [term()]} | {:error, term()} + def fetch_candidate_issues, do: store_module().fetch_candidate_issues() + + @spec fetch_issues_by_states([String.t()]) :: {:ok, [term()]} | {:error, term()} + def fetch_issues_by_states(states), do: store_module().fetch_issues_by_states(states) + + @spec fetch_issue_states_by_ids([String.t()]) :: {:ok, [term()]} | {:error, term()} + def fetch_issue_states_by_ids(issue_ids), do: store_module().fetch_issue_states_by_ids(issue_ids) + + @spec create_comment(String.t(), String.t()) :: :ok | {:error, term()} + def create_comment(issue_id, body), do: store_module().create_comment(issue_id, body) + + @spec update_issue_state(String.t(), String.t()) :: :ok | {:error, term()} + def update_issue_state(issue_id, state_name), do: store_module().update_issue_state(issue_id, state_name) + + defp store_module do + Application.get_env(:symphony_elixir, :codex_monitor_store_module, Store) + end +end diff --git a/elixir/test/support/test_support.exs b/elixir/test/support/test_support.exs index 484c1cae7..9269dfc80 100644 --- a/elixir/test/support/test_support.exs +++ b/elixir/test/support/test_support.exs @@ -97,6 +97,8 @@ defmodule SymphonyElixir.TestSupport do tracker_api_token: "token", tracker_project_slug: "project", tracker_assignee: nil, + tracker_database_path: nil, + tracker_workspace_id: nil, tracker_active_states: ["Todo", "In Progress"], tracker_terminal_states: ["Closed", "Cancelled", "Canceled", "Duplicate", "Done"], poll_interval_ms: 30_000, @@ -108,8 +110,8 @@ defmodule SymphonyElixir.TestSupport do max_retry_backoff_ms: 300_000, max_concurrent_agents_by_state: %{}, codex_command: "codex app-server", - codex_approval_policy: %{reject: %{sandbox_approval: true, rules: true, mcp_elicitations: true}}, - codex_thread_sandbox: "workspace-write", + codex_approval_policy: "never", + codex_thread_sandbox: "danger-full-access", codex_turn_sandbox_policy: nil, codex_turn_timeout_ms: 3_600_000, codex_read_timeout_ms: 5_000, @@ -134,6 +136,8 @@ defmodule SymphonyElixir.TestSupport do tracker_api_token = Keyword.get(config, :tracker_api_token) tracker_project_slug = Keyword.get(config, :tracker_project_slug) tracker_assignee = Keyword.get(config, :tracker_assignee) + tracker_database_path = Keyword.get(config, :tracker_database_path) + tracker_workspace_id = Keyword.get(config, :tracker_workspace_id) tracker_active_states = Keyword.get(config, :tracker_active_states) tracker_terminal_states = Keyword.get(config, :tracker_terminal_states) poll_interval_ms = Keyword.get(config, :poll_interval_ms) @@ -172,6 +176,8 @@ defmodule SymphonyElixir.TestSupport do " api_key: #{yaml_value(tracker_api_token)}", " project_slug: #{yaml_value(tracker_project_slug)}", " assignee: #{yaml_value(tracker_assignee)}", + " database_path: #{yaml_value(tracker_database_path)}", + " workspace_id: #{yaml_value(tracker_workspace_id)}", " active_states: #{yaml_value(tracker_active_states)}", " terminal_states: #{yaml_value(tracker_terminal_states)}", "polling:", diff --git a/elixir/test/symphony_elixir/core_test.exs b/elixir/test/symphony_elixir/core_test.exs index 2e3323938..dbf694536 100644 --- a/elixir/test/symphony_elixir/core_test.exs +++ b/elixir/test/symphony_elixir/core_test.exs @@ -86,6 +86,27 @@ defmodule SymphonyElixir.CoreTest do write_workflow_file!(Workflow.workflow_file_path(), tracker_kind: "123") assert {:error, {:unsupported_tracker_kind, "123"}} = Config.validate!() + + write_workflow_file!(Workflow.workflow_file_path(), + tracker_kind: "codex_monitor", + tracker_api_token: nil, + tracker_project_slug: nil, + tracker_database_path: nil + ) + + assert {:error, :missing_codex_monitor_database_path} = Config.validate!() + + write_workflow_file!(Workflow.workflow_file_path(), + tracker_kind: "codex_monitor", + tracker_api_token: nil, + tracker_project_slug: nil, + tracker_database_path: "/tmp/codex-monitor/tasks.db", + tracker_active_states: ["Todo", "In Progress", "Rework", "Merging"], + tracker_terminal_states: ["Done"] + ) + + assert :ok = Config.validate!() + assert Config.settings!().tracker.database_path == "/tmp/codex-monitor/tasks.db" end test "current WORKFLOW.md file is valid and complete" do @@ -750,6 +771,40 @@ defmodule SymphonyElixir.CoreTest do assert Orchestrator.select_worker_host_for_test(state, "worker-a") == "worker-a" end + test "dispatch claim state uses the first configured active state after todo" do + write_workflow_file!(Workflow.workflow_file_path(), + tracker_active_states: ["Todo", "Doing", "Human Review"], + tracker_terminal_states: ["Done"] + ) + + assert Orchestrator.dispatch_claim_state_name_for_test() == "Doing" + end + + test "claim_issue_for_dispatch_for_test uses the configured working state" do + write_workflow_file!(Workflow.workflow_file_path(), + tracker_active_states: ["Todo", "Doing", "Human Review"], + tracker_terminal_states: ["Done"] + ) + + issue = %Issue{id: "issue-claim-1", identifier: "MT-CLAIM-1", state: "Todo"} + test_pid = self() + + assert {:ok, %Issue{state: "Doing"} = claimed_issue} = + Orchestrator.claim_issue_for_dispatch_for_test( + issue, + fn issue_id, state_name -> + send(test_pid, {:update_issue_state_called, issue_id, state_name}) + :ok + end, + fn [issue_id] -> + {:ok, [%Issue{id: issue_id, identifier: "MT-CLAIM-1", state: "Doing"}]} + end + ) + + assert claimed_issue.id == issue.id + assert_received {:update_issue_state_called, "issue-claim-1", "Doing"} + end + defp assert_due_in_range(due_at_ms, min_remaining_ms, max_remaining_ms) do remaining_ms = due_at_ms - System.monotonic_time(:millisecond) @@ -883,7 +938,7 @@ defmodule SymphonyElixir.CoreTest do prompt = PromptBuilder.build_prompt(issue) - assert prompt =~ "You are working on a Linear issue." + assert prompt =~ "You are working on a tracked task." assert prompt =~ "Identifier: MT-777" assert prompt =~ "Title: Make fallback prompt useful" assert prompt =~ "Body:" @@ -1483,17 +1538,9 @@ defmodule SymphonyElixir.CoreTest do |> String.trim_leading("JSON:") |> Jason.decode!() |> then(fn payload -> - expected_approval_policy = %{ - "reject" => %{ - "sandbox_approval" => true, - "rules" => true, - "mcp_elicitations" => true - } - } - payload["method"] == "thread/start" && - get_in(payload, ["params", "approvalPolicy"]) == expected_approval_policy && - get_in(payload, ["params", "sandbox"]) == "workspace-write" && + get_in(payload, ["params", "approvalPolicy"]) == "never" && + get_in(payload, ["params", "sandbox"]) == "danger-full-access" && get_in(payload, ["params", "cwd"]) == canonical_workspace end) else @@ -1516,17 +1563,9 @@ defmodule SymphonyElixir.CoreTest do |> String.trim_leading("JSON:") |> Jason.decode!() |> then(fn payload -> - expected_approval_policy = %{ - "reject" => %{ - "sandbox_approval" => true, - "rules" => true, - "mcp_elicitations" => true - } - } - payload["method"] == "turn/start" && get_in(payload, ["params", "cwd"]) == canonical_workspace && - get_in(payload, ["params", "approvalPolicy"]) == expected_approval_policy && + get_in(payload, ["params", "approvalPolicy"]) == "never" && get_in(payload, ["params", "sandboxPolicy"]) == expected_turn_sandbox_policy end) else diff --git a/elixir/test/symphony_elixir/dynamic_tool_test.exs b/elixir/test/symphony_elixir/dynamic_tool_test.exs index 294471ed9..e9fc1526b 100644 --- a/elixir/test/symphony_elixir/dynamic_tool_test.exs +++ b/elixir/test/symphony_elixir/dynamic_tool_test.exs @@ -1,7 +1,29 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do use SymphonyElixir.TestSupport - alias SymphonyElixir.Codex.DynamicTool + alias SymphonyElixir.{Codex.DynamicTool, CodexMonitor.Store} + + defmodule FakeCodexMonitorStore do + def get_task_context(task_id) do + send(self(), {:get_task_context_called, task_id}) + {:ok, %{"task" => %{"id" => task_id, "state" => "In Progress"}}} + end + + def append_worklog(task_id, message) do + send(self(), {:append_worklog_called, task_id, message}) + :ok + end + + def update_issue_state(task_id, state_name) do + send(self(), {:update_issue_state_called, task_id, state_name}) + :ok + end + + def update_task_run(task_id, attrs) do + send(self(), {:update_task_run_called, task_id, attrs}) + :ok + end + end test "tool_specs advertises the linear_graphql input contract" do assert [ @@ -65,6 +87,230 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert response["contentItems"] == [%{"type" => "inputText", "text" => response["output"]}] end + test "tool_specs advertises codex_monitor_task when the CodexMonitor tracker is active" do + write_workflow_file!(Workflow.workflow_file_path(), + tracker_kind: "codex_monitor", + tracker_api_token: nil, + tracker_project_slug: nil, + tracker_database_path: "/tmp/codex-monitor/tasks.db", + tracker_active_states: ["Todo", "In Progress", "Rework", "Merging"], + tracker_terminal_states: ["Done"] + ) + + assert [ + %{ + "description" => description, + "inputSchema" => %{"properties" => %{"action" => _}, "required" => ["action"]}, + "name" => "codex_monitor_task" + } + ] = DynamicTool.tool_specs() + + assert description =~ "CodexMonitor" + end + + test "codex_monitor_task defaults to the current issue id and updates run metadata" do + write_workflow_file!(Workflow.workflow_file_path(), + tracker_kind: "codex_monitor", + tracker_api_token: nil, + tracker_project_slug: nil, + tracker_database_path: "/tmp/codex-monitor/tasks.db", + tracker_active_states: ["Todo", "In Progress", "Rework", "Merging"], + tracker_terminal_states: ["Done"] + ) + + response = + DynamicTool.execute( + "codex_monitor_task", + %{ + "action" => "update_run", + "branchName" => "feature/task-1", + "lastEvent" => "implementation_complete", + "tokenTotal" => 456 + }, + codex_monitor_store: FakeCodexMonitorStore, + issue: %{id: "task-1"}, + thread_id: "thread-1", + session_id: "thread-1-turn-1" + ) + + assert_received {:update_task_run_called, "task-1", attrs} + assert attrs[:branch_name] == "feature/task-1" + assert attrs[:last_event] == "implementation_complete" + assert attrs[:token_total] == 456 + assert attrs[:thread_id] == "thread-1" + assert attrs[:session_id] == "thread-1-turn-1" + assert_received {:get_task_context_called, "task-1"} + assert response["success"] == true + end + + test "codex_monitor_task updates state and appends worklog entries" do + write_workflow_file!(Workflow.workflow_file_path(), + tracker_kind: "codex_monitor", + tracker_api_token: nil, + tracker_project_slug: nil, + tracker_database_path: "/tmp/codex-monitor/tasks.db", + tracker_active_states: ["Todo", "In Progress", "Rework", "Merging"], + tracker_terminal_states: ["Done"] + ) + + response = + DynamicTool.execute( + "codex_monitor_task", + %{ + "action" => "update_state", + "taskId" => "task-2", + "state" => "Human Review", + "message" => "Ready for operator review." + }, + codex_monitor_store: FakeCodexMonitorStore + ) + + assert_received {:update_issue_state_called, "task-2", "Human Review"} + assert_received {:append_worklog_called, "task-2", "Ready for operator review."} + assert_received {:get_task_context_called, "task-2"} + assert response["success"] == true + end + + test "codex monitor store creates the first task run from an empty task_runs table" do + db_path = create_codex_monitor_task_db!("task-1") + + write_workflow_file!(Workflow.workflow_file_path(), + tracker_kind: "codex_monitor", + tracker_api_token: nil, + tracker_project_slug: nil, + tracker_database_path: db_path, + tracker_workspace_id: "ws-1", + tracker_active_states: ["Todo", "In Progress", "Rework", "Merging"], + tracker_terminal_states: ["Done"] + ) + + assert :ok = + Store.update_task_run("task-1", %{ + branch_name: "feature/task-1", + thread_id: "thread-1", + session_id: "thread-1-turn-1" + }) + + assert [ + %{ + "branch_name" => "feature/task-1", + "last_event" => nil, + "session_id" => "thread-1-turn-1", + "thread_id" => "thread-1" + } + ] = + sqlite_json!( + db_path, + """ + SELECT thread_id, branch_name, session_id, last_event + FROM task_runs + WHERE task_id = 'task-1' + ORDER BY started_at_ms DESC + LIMIT 1 + """ + ) + end + + test "codex monitor store preserves existing run fields across sparse updates" do + db_path = create_codex_monitor_task_db!("task-1") + + write_workflow_file!(Workflow.workflow_file_path(), + tracker_kind: "codex_monitor", + tracker_api_token: nil, + tracker_project_slug: nil, + tracker_database_path: db_path, + tracker_workspace_id: "ws-1", + tracker_active_states: ["Todo", "In Progress", "Rework", "Merging"], + tracker_terminal_states: ["Done"] + ) + + assert :ok = + Store.update_task_run("task-1", %{ + branch_name: "feature/task-1", + thread_id: "thread-1", + session_id: "thread-1-turn-1", + retry_count: 2, + token_total: 456 + }) + + assert :ok = Store.update_task_run("task-1", %{last_event: "implementation_complete"}) + + assert [ + %{ + "branch_name" => "feature/task-1", + "last_event" => "implementation_complete", + "retry_count" => 2, + "session_id" => "thread-1-turn-1", + "thread_id" => "thread-1", + "token_total" => 456 + } + ] = + sqlite_json!( + db_path, + """ + SELECT thread_id, branch_name, session_id, last_event, retry_count, token_total + FROM task_runs + WHERE task_id = 'task-1' + ORDER BY started_at_ms DESC + LIMIT 1 + """ + ) + end + + test "codex monitor store creates a fresh task run when a new session starts" do + db_path = create_codex_monitor_task_db!("task-1") + + write_workflow_file!(Workflow.workflow_file_path(), + tracker_kind: "codex_monitor", + tracker_api_token: nil, + tracker_project_slug: nil, + tracker_database_path: db_path, + tracker_workspace_id: "ws-1", + tracker_active_states: ["Todo", "In Progress", "Rework", "Merging"], + tracker_terminal_states: ["Done"] + ) + + assert :ok = + Store.update_task_run("task-1", %{ + branch_name: "feature/task-1", + thread_id: "thread-1", + session_id: "thread-1-turn-1", + token_total: 123 + }) + + Process.sleep(5) + + assert :ok = + Store.update_task_run("task-1", %{ + thread_id: "thread-2", + session_id: "thread-2-turn-1" + }) + + assert [ + %{ + "branch_name" => "feature/task-1", + "session_id" => "thread-1-turn-1", + "thread_id" => "thread-1", + "token_total" => 123 + }, + %{ + "branch_name" => "feature/task-1", + "session_id" => "thread-2-turn-1", + "thread_id" => "thread-2", + "token_total" => 123 + } + ] = + sqlite_json!( + db_path, + """ + SELECT thread_id, branch_name, session_id, token_total + FROM task_runs + WHERE task_id = 'task-1' + ORDER BY started_at_ms ASC + """ + ) + end + test "linear_graphql accepts a raw GraphQL query string" do test_pid = self() @@ -307,4 +553,59 @@ defmodule SymphonyElixir.Codex.DynamicToolTest do assert response["success"] == true assert response["output"] == ":ok" end + + defp create_codex_monitor_task_db!(task_id, workspace_id \\ "ws-1") do + sqlite3_binary = System.find_executable("sqlite3") || raise "sqlite3 is required for this test" + root = Path.join(System.tmp_dir!(), "symphony-codex-monitor-#{System.unique_integer([:positive])}") + db_path = Path.join(root, "tasks.db") + File.mkdir_p!(root) + + on_exit(fn -> File.rm_rf(root) end) + + sql = """ + PRAGMA journal_mode = WAL; + CREATE TABLE tasks ( + id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, + title TEXT NOT NULL, + description TEXT, + status TEXT NOT NULL, + order_index INTEGER NOT NULL, + created_at_ms INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL + ); + CREATE TABLE task_runs ( + id TEXT PRIMARY KEY, + task_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, + thread_id TEXT, + worktree_workspace_id TEXT, + branch_name TEXT, + pull_request_url TEXT, + session_id TEXT, + last_event TEXT, + last_message TEXT, + last_error TEXT, + retry_count INTEGER NOT NULL DEFAULT 0, + token_total INTEGER NOT NULL DEFAULT 0, + started_at_ms INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL + ); + INSERT INTO tasks (id, workspace_id, title, description, status, order_index, created_at_ms, updated_at_ms) + VALUES ('#{task_id}', '#{workspace_id}', 'Task', '', 'in_progress', 0, 0, 0); + """ + + {_, 0} = System.cmd(sqlite3_binary, [db_path, sql], stderr_to_stdout: true) + db_path + end + + defp sqlite_json!(db_path, sql) do + sqlite3_binary = System.find_executable("sqlite3") || raise "sqlite3 is required for this test" + {output, 0} = System.cmd(sqlite3_binary, ["-json", db_path, sql], stderr_to_stdout: true) + + case String.trim(output) do + "" -> [] + trimmed -> Jason.decode!(trimmed) + end + end end diff --git a/elixir/test/symphony_elixir/extensions_test.exs b/elixir/test/symphony_elixir/extensions_test.exs index d6309c966..3ef2dd27b 100644 --- a/elixir/test/symphony_elixir/extensions_test.exs +++ b/elixir/test/symphony_elixir/extensions_test.exs @@ -5,6 +5,7 @@ defmodule SymphonyElixir.ExtensionsTest do import Phoenix.LiveViewTest alias SymphonyElixir.Linear.Adapter + alias SymphonyElixir.Tracker.CodexMonitor, as: CodexMonitorTracker alias SymphonyElixir.Tracker.Memory @endpoint SymphonyElixirWeb.Endpoint @@ -203,6 +204,15 @@ defmodule SymphonyElixir.ExtensionsTest do write_workflow_file!(Workflow.workflow_file_path(), tracker_kind: "linear") assert SymphonyElixir.Tracker.adapter() == Adapter + + write_workflow_file!(Workflow.workflow_file_path(), + tracker_kind: "codex_monitor", + tracker_api_token: nil, + tracker_project_slug: nil, + tracker_database_path: "/tmp/codex-monitor/tasks.db" + ) + + assert SymphonyElixir.Tracker.adapter() == CodexMonitorTracker end test "linear adapter delegates reads and validates mutation responses" do diff --git a/elixir/test/symphony_elixir/workspace_and_config_test.exs b/elixir/test/symphony_elixir/workspace_and_config_test.exs index 59ff0850b..f22fa9a5c 100644 --- a/elixir/test/symphony_elixir/workspace_and_config_test.exs +++ b/elixir/test/symphony_elixir/workspace_and_config_test.exs @@ -746,15 +746,8 @@ defmodule SymphonyElixir.WorkspaceAndConfigTest do assert config.agent.max_concurrent_agents == 10 assert config.codex.command == "codex app-server" - assert config.codex.approval_policy == %{ - "reject" => %{ - "sandbox_approval" => true, - "rules" => true, - "mcp_elicitations" => true - } - } - - assert config.codex.thread_sandbox == "workspace-write" + assert config.codex.approval_policy == "never" + assert config.codex.thread_sandbox == "danger-full-access" assert {:ok, canonical_default_workspace_root} = SymphonyElixir.PathSafety.canonicalize(Path.join(System.tmp_dir!(), "symphony_workspaces"))