diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index ef5caeba8..1f723f8ca 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -8,6 +8,18 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -622,6 +634,7 @@ dependencies = [ "objc2-foundation", "portable-pty", "reqwest 0.12.28", + "rusqlite", "serde", "serde_json", "sha2", @@ -1166,6 +1179,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + +[[package]] +name = "fallible-streaming-iterator" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" + [[package]] name = "fastrand" version = "2.3.0" @@ -1720,12 +1745,30 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +[[package]] +name = "hashlink" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" +dependencies = [ + "hashbrown 0.14.5", +] + [[package]] name = "heck" version = "0.4.1" @@ -2346,6 +2389,17 @@ dependencies = [ "redox_syscall 0.7.0", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libssh2-sys" version = "0.3.1" @@ -3950,6 +4004,20 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rusqlite" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" +dependencies = [ + "bitflags 2.11.0", + "fallible-iterator", + "fallible-streaming-iterator", + "hashlink", + "libsqlite3-sys", + "smallvec", +] + [[package]] name = "rustc-hash" version = "1.1.0" diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index af0979c11..4863068b1 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -43,6 +43,7 @@ libc = "0.2" chrono = { version = "0.4", features = ["clock"] } shell-words = "1.1" toml_edit = "0.20.2" +rusqlite = { version = "0.32", features = ["bundled"] } [target."cfg(not(any(target_os = \"android\", target_os = \"ios\")))".dependencies] tauri-plugin-updater = "2.10.0" diff --git a/src-tauri/resources/symphony/manifest.json b/src-tauri/resources/symphony/manifest.json new file mode 100644 index 000000000..6fecf32a8 --- /dev/null +++ b/src-tauri/resources/symphony/manifest.json @@ -0,0 +1,10 @@ +{ + "version": "0.1.0-codexmonitor", + "binaries": { + "darwin-aarch64": "codex_monitor_symphony", + "darwin-x86_64": "codex_monitor_symphony", + "linux-x86_64": "codex_monitor_symphony", + "windows-x86_64": "codex_monitor_symphony.exe" + }, + "notes": "Place pinned CodexMonitor Symphony release binaries in this directory for bundle builds." +} diff --git a/src-tauri/src/backend/events.rs b/src-tauri/src/backend/events.rs index 3fe03b2e7..92864c4db 100644 --- a/src-tauri/src/backend/events.rs +++ b/src-tauri/src/backend/events.rs @@ -1,6 +1,8 @@ use serde::Serialize; use serde_json::Value; +use crate::types::WorkspaceSymphonyEvent; + #[derive(Serialize, Clone)] pub(crate) struct AppServerEvent { pub(crate) workspace_id: String, @@ -28,4 +30,5 @@ pub(crate) trait EventSink: Clone + Send + Sync + 'static { fn emit_app_server_event(&self, event: AppServerEvent); fn emit_terminal_output(&self, event: TerminalOutput); fn emit_terminal_exit(&self, event: TerminalExit); + fn emit_workspace_symphony_event(&self, event: WorkspaceSymphonyEvent); } diff --git a/src-tauri/src/bin/codex_monitor_daemon.rs b/src-tauri/src/bin/codex_monitor_daemon.rs index 914043ac6..93bc5f90a 100644 --- a/src-tauri/src/bin/codex_monitor_daemon.rs +++ b/src-tauri/src/bin/codex_monitor_daemon.rs @@ -23,6 +23,8 @@ mod rules; mod shared; #[path = "../storage.rs"] mod storage; +#[path = "../symphony_binary.rs"] +mod symphony_binary; #[path = "codex_monitor_daemon/transport.rs"] mod transport; #[allow(dead_code)] @@ -82,13 +84,13 @@ use shared::process_core::kill_child_process_tree; use shared::prompts_core::{self, CustomPromptEntry}; use shared::{ agents_config_core, codex_aux_core, codex_core, files_core, git_core, git_ui_core, - local_usage_core, settings_core, workspaces_core, worktree_core, + local_usage_core, settings_core, symphony_core, workspaces_core, worktree_core, }; use storage::{read_settings, read_workspaces}; use types::{ AppSettings, GitCommitDiff, GitFileDiff, GitHubIssuesResponse, GitHubPullRequestComment, GitHubPullRequestDiff, GitHubPullRequestsResponse, GitLogResponse, LocalUsageSnapshot, - WorkspaceEntry, WorkspaceInfo, WorkspaceSettings, WorktreeSetupStatus, + WorkspaceEntry, WorkspaceInfo, WorkspaceSettings, WorkspaceSymphonyEvent, WorktreeSetupStatus, }; use workspace_settings::apply_workspace_settings_update; @@ -122,6 +124,7 @@ struct DaemonEventSink { #[derive(Clone)] enum DaemonEvent { AppServer(AppServerEvent), + WorkspaceSymphony(WorkspaceSymphonyEvent), #[allow(dead_code)] TerminalOutput(TerminalOutput), #[allow(dead_code)] @@ -140,6 +143,10 @@ impl EventSink for DaemonEventSink { fn emit_terminal_exit(&self, event: TerminalExit) { let _ = self.tx.send(DaemonEvent::TerminalExit(event)); } + + fn emit_workspace_symphony_event(&self, event: WorkspaceSymphonyEvent) { + let _ = self.tx.send(DaemonEvent::WorkspaceSymphony(event)); + } } struct DaemonConfig { @@ -158,6 +165,7 @@ struct DaemonState { event_sink: DaemonEventSink, codex_login_cancels: Mutex>, daemon_binary_path: Option, + symphony_runtimes: Arc, } #[derive(Serialize, Deserialize)] @@ -185,6 +193,7 @@ impl DaemonState { event_sink, codex_login_cancels: Mutex::new(HashMap::new()), daemon_binary_path, + symphony_runtimes: Arc::new(Mutex::new(HashMap::new())), } } @@ -757,8 +766,7 @@ impl DaemonState { limit: Option, sort_key: Option, ) -> Result { - codex_core::list_threads_core(&self.sessions, workspace_id, cursor, limit, sort_key) - .await + codex_core::list_threads_core(&self.sessions, workspace_id, cursor, limit, sort_key).await } async fn list_mcp_server_status( diff --git a/src-tauri/src/bin/codex_monitor_daemon/rpc.rs b/src-tauri/src/bin/codex_monitor_daemon/rpc.rs index a7ce31812..3c1614d65 100644 --- a/src-tauri/src/bin/codex_monitor_daemon/rpc.rs +++ b/src-tauri/src/bin/codex_monitor_daemon/rpc.rs @@ -41,6 +41,10 @@ fn build_event_notification(event: DaemonEvent) -> Option { "method": "app-server-event", "params": payload, }), + DaemonEvent::WorkspaceSymphony(payload) => json!({ + "method": "workspace-symphony-event", + "params": payload, + }), DaemonEvent::TerminalOutput(payload) => json!({ "method": "terminal-output", "params": payload, @@ -85,10 +89,7 @@ pub(super) fn parse_optional_string(value: &Value, key: &str) -> Option } } -pub(super) fn parse_optional_nullable_string( - value: &Value, - key: &str, -) -> Option> { +pub(super) fn parse_optional_nullable_string(value: &Value, key: &str) -> Option> { match value { Value::Object(map) => match map.get(key) { Some(Value::Null) => Some(None), diff --git a/src-tauri/src/bin/codex_monitor_daemon/rpc/workspace.rs b/src-tauri/src/bin/codex_monitor_daemon/rpc/workspace.rs index 3f18527f7..998624724 100644 --- a/src-tauri/src/bin/codex_monitor_daemon/rpc/workspace.rs +++ b/src-tauri/src/bin/codex_monitor_daemon/rpc/workspace.rs @@ -188,6 +188,218 @@ pub(super) async fn try_handle( .await, ) } + "get_workspace_symphony_status" => { + let request = parse_request_or_err!(params, workspace_rpc::WorkspaceIdRequest); + Some( + serialize_result(symphony_core::get_workspace_symphony_snapshot_core( + &state.symphony_runtimes, + &state.storage_path, + &request.workspace_id, + )) + .await, + ) + } + "start_workspace_symphony" => { + let request = parse_request_or_err!(params, workspace_rpc::WorkspaceIdRequest); + let binary_path = match symphony_binary::resolve_symphony_binary_path() { + Ok(path) => path, + Err(err) => return Some(Err(err)), + }; + Some( + serialize_result(symphony_core::start_workspace_symphony_core( + state.symphony_runtimes.clone(), + &state.workspaces, + &state.storage_path, + &request.workspace_id, + state.event_sink.clone(), + binary_path, + )) + .await, + ) + } + "stop_workspace_symphony" => { + let request = parse_request_or_err!(params, workspace_rpc::WorkspaceIdRequest); + Some( + match symphony_core::stop_workspace_symphony_core( + &state.symphony_runtimes, + &request.workspace_id, + ) + .await + { + Ok(status) => { + state + .event_sink + .emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id: request.workspace_id, + kind: "runtime_stopped".to_string(), + status: Some(status.clone()), + task: None, + telemetry: None, + message: None, + }); + serialize_value(status) + } + Err(err) => Err(err), + }, + ) + } + "list_workspace_symphony_tasks" => { + let request = parse_request_or_err!(params, workspace_rpc::WorkspaceIdRequest); + Some( + serialize_result(symphony_core::list_workspace_symphony_tasks_core( + &state.storage_path, + &request.workspace_id, + )) + .await, + ) + } + "create_workspace_symphony_task" => { + let request = + parse_request_or_err!(params, workspace_rpc::CreateWorkspaceSymphonyTaskRequest); + Some( + match symphony_core::create_workspace_symphony_task_core( + &state.storage_path, + &request.workspace_id, + request.input, + ) + .await + { + Ok(task) => { + state + .event_sink + .emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id: request.workspace_id, + kind: "task_created".to_string(), + status: None, + task: Some(task.clone()), + telemetry: None, + message: None, + }); + serialize_value(task) + } + Err(err) => Err(err), + }, + ) + } + "update_workspace_symphony_task" => { + let request = + parse_request_or_err!(params, workspace_rpc::UpdateWorkspaceSymphonyTaskRequest); + Some( + match symphony_core::update_workspace_symphony_task_core( + &state.storage_path, + &request.workspace_id, + request.input, + ) + .await + { + Ok(task) => { + state + .event_sink + .emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id: request.workspace_id, + kind: "task_updated".to_string(), + status: None, + task: Some(task.clone()), + telemetry: None, + message: None, + }); + serialize_value(task) + } + Err(err) => Err(err), + }, + ) + } + "move_workspace_symphony_task" => { + let request = + parse_request_or_err!(params, workspace_rpc::MoveWorkspaceSymphonyTaskRequest); + Some( + match symphony_core::move_workspace_symphony_task_core( + &state.storage_path, + &request.workspace_id, + request.input, + ) + .await + { + Ok(task) => { + state + .event_sink + .emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id: request.workspace_id, + kind: "task_moved".to_string(), + status: None, + task: Some(task.clone()), + telemetry: None, + message: None, + }); + serialize_value(task) + } + Err(err) => Err(err), + }, + ) + } + "delete_workspace_symphony_task" => { + let request = parse_request_or_err!(params, workspace_rpc::TaskIdRequest); + Some( + match symphony_core::delete_workspace_symphony_task_core( + &state.storage_path, + &request.workspace_id, + &request.task_id, + ) + .await + { + Ok(()) => { + state + .event_sink + .emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id: request.workspace_id, + kind: "task_deleted".to_string(), + status: None, + task: None, + telemetry: None, + message: Some(request.task_id), + }); + Ok(json!({ "ok": true })) + } + Err(err) => Err(err), + }, + ) + } + "get_workspace_symphony_telemetry" => { + let request = parse_request_or_err!(params, workspace_rpc::TaskIdRequest); + Some( + serialize_result(symphony_core::get_workspace_symphony_telemetry_core( + &state.storage_path, + &request.workspace_id, + &request.task_id, + )) + .await, + ) + } + "read_workspace_symphony_workflow_override" => { + let request = parse_request_or_err!(params, workspace_rpc::WorkspaceIdRequest); + Some( + serialize_result(symphony_core::read_workspace_symphony_workflow_override_core( + &state.workspaces, + &state.storage_path, + &request.workspace_id, + )) + .await, + ) + } + "write_workspace_symphony_workflow_override" => { + let request = parse_request_or_err!( + params, + workspace_rpc::WriteWorkspaceSymphonyWorkflowOverrideRequest + ); + Some( + serialize_ok(symphony_core::write_workspace_symphony_workflow_override_core( + &state.storage_path, + &request.workspace_id, + &request.content, + )) + .await, + ) + } "add_clone" => { let request = parse_request_or_err!(params, workspace_rpc::AddCloneRequest); Some( diff --git a/src-tauri/src/event_sink.rs b/src-tauri/src/event_sink.rs index f27b0d318..8415c70d5 100644 --- a/src-tauri/src/event_sink.rs +++ b/src-tauri/src/event_sink.rs @@ -1,6 +1,7 @@ use tauri::{AppHandle, Emitter}; use crate::backend::events::{AppServerEvent, EventSink, TerminalExit, TerminalOutput}; +use crate::types::WorkspaceSymphonyEvent; #[derive(Clone)] pub(crate) struct TauriEventSink { @@ -25,4 +26,8 @@ impl EventSink for TauriEventSink { fn emit_terminal_exit(&self, event: TerminalExit) { let _ = self.app.emit("terminal-exit", event); } + + fn emit_workspace_symphony_event(&self, event: WorkspaceSymphonyEvent) { + let _ = self.app.emit("workspace-symphony-event", event); + } } diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index daa52c483..f4022ba26 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -28,6 +28,8 @@ mod settings; mod shared; mod state; mod storage; +mod symphony; +mod symphony_binary; mod tailscale; #[cfg(desktop)] mod terminal; @@ -295,6 +297,17 @@ pub fn run() { dictation::dictation_stop, dictation::dictation_cancel, local_usage::local_usage_snapshot, + symphony::get_workspace_symphony_status, + symphony::start_workspace_symphony, + symphony::stop_workspace_symphony, + symphony::list_workspace_symphony_tasks, + symphony::create_workspace_symphony_task, + symphony::update_workspace_symphony_task, + symphony::move_workspace_symphony_task, + symphony::delete_workspace_symphony_task, + symphony::get_workspace_symphony_telemetry, + symphony::read_workspace_symphony_workflow_override, + symphony::write_workspace_symphony_workflow_override, notifications::is_macos_debug_build, notifications::app_build_type, notifications::send_notification_fallback, diff --git a/src-tauri/src/remote_backend/mod.rs b/src-tauri/src/remote_backend/mod.rs index 4ae3868d9..18c0fd631 100644 --- a/src-tauri/src/remote_backend/mod.rs +++ b/src-tauri/src/remote_backend/mod.rs @@ -150,6 +150,10 @@ fn can_retry_after_disconnect(method: &str) -> bool { | "apps_list" | "collaboration_mode_list" | "connect_workspace" + | "get_workspace_symphony_status" + | "read_workspace_symphony_workflow_override" + | "list_workspace_symphony_tasks" + | "get_workspace_symphony_telemetry" | "experimental_feature_list" | "set_workspace_runtime_codex_args" | "file_read" diff --git a/src-tauri/src/remote_backend/transport.rs b/src-tauri/src/remote_backend/transport.rs index 57e7a4cbf..708c8cde9 100644 --- a/src-tauri/src/remote_backend/transport.rs +++ b/src-tauri/src/remote_backend/transport.rs @@ -135,6 +135,9 @@ pub(crate) async fn dispatch_incoming_line( "app-server-event" => { let _ = app.emit("app-server-event", params); } + "workspace-symphony-event" => { + let _ = app.emit("workspace-symphony-event", params); + } "terminal-output" => { let _ = app.emit("terminal-output", params); } diff --git a/src-tauri/src/shared/mod.rs b/src-tauri/src/shared/mod.rs index 87e639fbf..d52a23b1a 100644 --- a/src-tauri/src/shared/mod.rs +++ b/src-tauri/src/shared/mod.rs @@ -12,6 +12,7 @@ pub(crate) mod local_usage_core; pub(crate) mod process_core; pub(crate) mod prompts_core; pub(crate) mod settings_core; +pub(crate) mod symphony_core; pub(crate) mod workspace_rpc; pub(crate) mod workspaces_core; pub(crate) mod worktree_core; diff --git a/src-tauri/src/shared/symphony_core.rs b/src-tauri/src/shared/symphony_core.rs new file mode 100644 index 000000000..4d817a77c --- /dev/null +++ b/src-tauri/src/shared/symphony_core.rs @@ -0,0 +1,1561 @@ +use std::collections::HashMap; +use std::fs::{self, File, OpenOptions}; +use std::io::{BufRead, BufReader as StdBufReader, Write}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use git2::Repository; +use rusqlite::{params, Connection, OptionalExtension}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::{Child, Command}; +use tokio::sync::Mutex; +use tokio::time::{sleep, Duration}; + +use crate::backend::events::EventSink; +use crate::files::io::{read_text_file_within, write_text_file_within, TextFileResponse}; +use crate::shared::process_core::kill_child_process_tree; +use crate::types::{ + CreateWorkspaceTaskInput, MoveWorkspaceTaskInput, UpdateWorkspaceTaskInput, WorkspaceEntry, + WorkspaceSymphonyEvent, WorkspaceSymphonyHealth, WorkspaceSymphonyRuntimeState, + WorkspaceSymphonySnapshot, WorkspaceSymphonyStatus, WorkspaceTask, WorkspaceTaskEvent, + WorkspaceTaskLiveRun, WorkspaceTaskRun, WorkspaceTaskStatus, WorkspaceTaskTelemetry, +}; + +#[derive(Debug)] +pub(crate) struct ManagedSymphonyRuntime { + pub(crate) child: Arc>, + pub(crate) status: WorkspaceSymphonyStatus, +} + +pub(crate) type SymphonyRuntimeRegistry = Mutex>; + +const TASK_DB_NAME: &str = "tasks.db"; +const WORKFLOW_FILE_NAME: &str = "WORKFLOW.codexmonitor.local.md"; +const WORKFLOW_OVERRIDE_FILE_NAME: &str = "WORKFLOW.override.md"; +const LOG_FILE_NAME: &str = "symphony.log"; +const SYMPHONY_GUARDRAILS_ACK_FLAG: &str = + "--i-understand-that-this-will-be-running-without-the-usual-guardrails"; +const SYMPHONY_STALE_HEALTH_THRESHOLD_MS: i64 = 15_000; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum WorkflowProfile { + Generic, + CodexMonitor, +} + +pub(crate) fn automation_root_for_workspace(storage_path: &Path, workspace_id: &str) -> PathBuf { + let data_dir = storage_path + .parent() + .map(Path::to_path_buf) + .unwrap_or_else(|| PathBuf::from(".")); + data_dir.join("automation").join(workspace_id) +} + +fn task_db_path(storage_path: &Path, workspace_id: &str) -> PathBuf { + automation_root_for_workspace(storage_path, workspace_id).join(TASK_DB_NAME) +} + +fn workflow_path(storage_path: &Path, workspace_id: &str) -> PathBuf { + automation_root_for_workspace(storage_path, workspace_id).join(WORKFLOW_FILE_NAME) +} + +fn log_path(storage_path: &Path, workspace_id: &str) -> PathBuf { + automation_root_for_workspace(storage_path, workspace_id).join(LOG_FILE_NAME) +} + +fn now_unix_ms() -> i64 { + chrono::Utc::now().timestamp_millis() +} + +fn derive_symphony_health(status: &WorkspaceSymphonyStatus) -> WorkspaceSymphonyHealth { + match status.state { + WorkspaceSymphonyRuntimeState::Error => WorkspaceSymphonyHealth::Error, + WorkspaceSymphonyRuntimeState::Stopped => WorkspaceSymphonyHealth::Stopped, + WorkspaceSymphonyRuntimeState::Starting => WorkspaceSymphonyHealth::Stale, + WorkspaceSymphonyRuntimeState::Running => { + if status.last_error.is_some() { + return WorkspaceSymphonyHealth::Error; + } + let now = now_unix_ms(); + match status.last_heartbeat_at_ms { + Some(last_heartbeat) + if now.saturating_sub(last_heartbeat) <= SYMPHONY_STALE_HEALTH_THRESHOLD_MS => + { + WorkspaceSymphonyHealth::Healthy + } + Some(_) => WorkspaceSymphonyHealth::Stale, + None => WorkspaceSymphonyHealth::Stale, + } + } + } +} + +fn hydrate_runtime_status(mut status: WorkspaceSymphonyStatus) -> WorkspaceSymphonyStatus { + status.uptime_ms = status + .started_at_ms + .map(|started_at_ms| now_unix_ms().saturating_sub(started_at_ms)); + status.health = derive_symphony_health(&status); + status +} + +fn automation_dir_ready(storage_path: &Path, workspace_id: &str) -> Result { + let root = automation_root_for_workspace(storage_path, workspace_id); + fs::create_dir_all(&root).map_err(|err| err.to_string())?; + Ok(root) +} + +fn open_connection(path: &Path) -> Result { + let connection = Connection::open(path).map_err(|err| err.to_string())?; + connection + .execute_batch( + " + PRAGMA journal_mode = WAL; + CREATE TABLE IF NOT EXISTS tasks ( + id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, + title TEXT NOT NULL, + description TEXT, + status TEXT NOT NULL, + order_index INTEGER NOT NULL, + created_at_ms INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL + ); + CREATE TABLE IF NOT EXISTS task_events ( + id TEXT PRIMARY KEY, + task_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, + message TEXT NOT NULL, + created_at_ms INTEGER NOT NULL + ); + CREATE TABLE IF NOT EXISTS task_runs ( + id TEXT PRIMARY KEY, + task_id TEXT NOT NULL, + workspace_id TEXT NOT NULL, + thread_id TEXT, + worktree_workspace_id TEXT, + branch_name TEXT, + pull_request_url TEXT, + session_id TEXT, + last_event TEXT, + last_message TEXT, + last_error TEXT, + retry_count INTEGER NOT NULL DEFAULT 0, + token_total INTEGER NOT NULL DEFAULT 0, + started_at_ms INTEGER NOT NULL, + updated_at_ms INTEGER NOT NULL + ); + CREATE TABLE IF NOT EXISTS runtime_snapshots ( + id TEXT PRIMARY KEY, + workspace_id TEXT NOT NULL, + payload_json TEXT NOT NULL, + created_at_ms INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_tasks_workspace_status_order + ON tasks(workspace_id, status, order_index); + CREATE INDEX IF NOT EXISTS idx_task_events_task_created + ON task_events(task_id, created_at_ms DESC); + CREATE INDEX IF NOT EXISTS idx_task_runs_task_started + ON task_runs(task_id, started_at_ms DESC); + ", + ) + .map_err(|err| err.to_string())?; + Ok(connection) +} + +fn ensure_task_store(storage_path: &Path, workspace_id: &str) -> Result { + let _ = automation_dir_ready(storage_path, workspace_id)?; + let db_path = task_db_path(storage_path, workspace_id); + let _ = open_connection(&db_path)?; + Ok(db_path) +} + +fn status_from_str(value: &str) -> WorkspaceTaskStatus { + match value { + "todo" => WorkspaceTaskStatus::Todo, + "in_progress" => WorkspaceTaskStatus::InProgress, + "human_review" => WorkspaceTaskStatus::HumanReview, + "rework" => WorkspaceTaskStatus::Rework, + "merging" => WorkspaceTaskStatus::Merging, + "done" => WorkspaceTaskStatus::Done, + _ => WorkspaceTaskStatus::Backlog, + } +} + +fn can_user_move_task(from: &WorkspaceTaskStatus, to: &WorkspaceTaskStatus) -> bool { + let _ = from; + let _ = to; + true +} + +fn next_order_index( + connection: &Connection, + workspace_id: &str, + status: &WorkspaceTaskStatus, +) -> Result { + let max = connection + .query_row( + "SELECT COALESCE(MAX(order_index), -1) FROM tasks WHERE workspace_id = ?1 AND status = ?2", + params![workspace_id, status.as_str()], + |row| row.get::<_, i64>(0), + ) + .map_err(|err| err.to_string())?; + Ok(max + 1) +} + +fn append_task_event( + connection: &Connection, + workspace_id: &str, + task_id: &str, + message: &str, +) -> Result<(), String> { + connection + .execute( + "INSERT INTO task_events (id, task_id, workspace_id, message, created_at_ms) + VALUES (?1, ?2, ?3, ?4, ?5)", + params![ + uuid::Uuid::new_v4().to_string(), + task_id, + workspace_id, + message, + now_unix_ms() + ], + ) + .map_err(|err| err.to_string())?; + Ok(()) +} + +fn read_active_run( + connection: &Connection, + workspace_id: &str, + task_id: &str, +) -> Result, String> { + connection + .query_row( + "SELECT id, task_id, workspace_id, thread_id, worktree_workspace_id, branch_name, + pull_request_url, session_id, last_event, last_message, last_error, + retry_count, token_total, started_at_ms, updated_at_ms + FROM task_runs + WHERE workspace_id = ?1 AND task_id = ?2 + ORDER BY started_at_ms DESC + LIMIT 1", + params![workspace_id, task_id], + |row| { + Ok(WorkspaceTaskRun { + id: row.get(0)?, + task_id: row.get(1)?, + workspace_id: row.get(2)?, + thread_id: row.get(3)?, + worktree_workspace_id: row.get(4)?, + branch_name: row.get(5)?, + pull_request_url: row.get(6)?, + session_id: row.get(7)?, + last_event: row.get(8)?, + last_message: row.get(9)?, + last_error: row.get(10)?, + retry_count: row.get(11)?, + token_total: row.get(12)?, + started_at_ms: row.get(13)?, + updated_at_ms: row.get(14)?, + }) + }, + ) + .optional() + .map_err(|err| err.to_string()) +} + +fn read_task_claimed_at_ms( + connection: &Connection, + workspace_id: &str, + task_id: &str, +) -> Result, String> { + connection + .query_row( + "SELECT created_at_ms + FROM task_events + WHERE workspace_id = ?1 AND task_id = ?2 + AND ( + message = 'Symphony moved the task to In Progress.' + OR message LIKE 'Task moved to in_progress.%' + ) + ORDER BY created_at_ms DESC + LIMIT 1", + params![workspace_id, task_id], + |row| row.get::<_, i64>(0), + ) + .optional() + .map_err(|err| err.to_string()) +} + +fn strip_terminal_control_sequences(input: &str) -> String { + let bytes = input.as_bytes(); + let mut output = Vec::with_capacity(input.len()); + let mut index = 0usize; + + while index < bytes.len() { + if bytes[index] == 0x1b { + index += 1; + if index >= bytes.len() { + break; + } + if bytes[index] == b'[' { + index += 1; + while index < bytes.len() { + let byte = bytes[index]; + index += 1; + if (0x40..=0x7e).contains(&byte) { + break; + } + } + continue; + } + continue; + } + output.push(bytes[index]); + index += 1; + } + + String::from_utf8_lossy(&output).into_owned() +} + +fn parse_count_token(value: &str) -> Option { + let digits = value + .chars() + .filter(|character| character.is_ascii_digit()) + .collect::(); + if digits.is_empty() { + None + } else { + digits.parse::().ok() + } +} + +fn parse_runtime_status_metrics( + line: &str, +) -> Option<(Option<(usize, usize)>, Option<(i64, i64, i64)>)> { + let sanitized = strip_terminal_control_sequences(line); + let trimmed = sanitized.trim(); + if trimmed.is_empty() { + return None; + } + + let agents = trimmed + .split_once("Agents:") + .and_then(|(_, suffix)| suffix.trim().split_once('/')) + .and_then(|(active, max)| { + let active = active.trim().parse::().ok()?; + let max = max + .split_whitespace() + .next() + .and_then(|value| value.parse::().ok())?; + Some((active, max)) + }); + + let tokens = trimmed.split_once("Tokens:").and_then(|(_, suffix)| { + let parts = suffix.split('|').map(str::trim).collect::>(); + if parts.len() < 3 { + return None; + } + let input_tokens = parts + .first() + .and_then(|value| value.strip_prefix("in")) + .and_then(parse_count_token)?; + let output_tokens = parts + .get(1) + .and_then(|value| value.strip_prefix("out")) + .and_then(parse_count_token)?; + let total_tokens = parts + .get(2) + .and_then(|value| value.strip_prefix("total")) + .and_then(parse_count_token)?; + Some((input_tokens, output_tokens, total_tokens)) + }); + + (agents.is_some() || tokens.is_some()).then_some((agents, tokens)) +} + +fn parse_task_live_run_from_dashboard_line( + line: &str, + task_id: &str, + claimed_at_ms: Option, +) -> Option { + let sanitized = strip_terminal_control_sequences(line); + let trimmed = sanitized.trim(); + if trimmed.is_empty() || !trimmed.contains("...") { + return None; + } + + let tokens = trimmed.split_whitespace().collect::>(); + let id_index = tokens.iter().position(|token| { + token.ends_with("...") && task_id.starts_with(token.trim_end_matches("...")) + })?; + let pid_index = tokens + .iter() + .enumerate() + .skip(id_index + 1) + .find_map(|(index, token)| { + token + .chars() + .all(|character| character.is_ascii_digit()) + .then_some(index) + })?; + let slash_index = tokens + .iter() + .enumerate() + .skip(pid_index + 1) + .find_map(|(index, token)| (*token == "/").then_some(index))?; + + let stage = tokens + .get(id_index + 1..pid_index)? + .join(" ") + .trim() + .to_string(); + if stage.is_empty() { + return None; + } + + let age_label = tokens + .get(pid_index + 1..slash_index) + .map(|slice| slice.join(" ")) + .filter(|value| !value.trim().is_empty()); + let turn_count = tokens + .get(slash_index + 1) + .and_then(|value| value.parse::().ok()); + let token_total = tokens + .get(slash_index + 2) + .map(|value| value.replace(',', "")) + .and_then(|value| value.parse::().ok()); + let session_id = tokens.get(slash_index + 3).map(|value| (*value).to_string()); + let current_event = if tokens.len() > slash_index + 4 { + Some(tokens[slash_index + 4..].join(" ")) + } else { + None + }; + + Some(WorkspaceTaskLiveRun { + stage, + agent_pid: tokens + .get(pid_index) + .and_then(|value| value.parse::().ok()), + age_label, + turn_count, + token_total, + session_id, + current_event, + claimed_at_ms, + observed_at_ms: now_unix_ms(), + }) +} + +fn read_live_task_run_from_log( + storage_path: &Path, + workspace_id: &str, + task_id: &str, + claimed_at_ms: Option, +) -> Option { + let log_file = log_path(storage_path, workspace_id); + let contents = fs::read_to_string(log_file).ok()?; + for line in contents.lines().rev() { + if let Some(parsed) = parse_task_live_run_from_dashboard_line(line, task_id, claimed_at_ms) + { + return Some(parsed); + } + } + None +} + +fn read_task( + connection: &Connection, + workspace_id: &str, + task_id: &str, +) -> Result { + let mut task = connection + .query_row( + "SELECT id, workspace_id, title, description, status, order_index, created_at_ms, updated_at_ms + FROM tasks + WHERE workspace_id = ?1 AND id = ?2", + params![workspace_id, task_id], + |row| { + Ok(WorkspaceTask { + id: row.get(0)?, + workspace_id: row.get(1)?, + title: row.get(2)?, + description: row.get(3)?, + status: status_from_str(&row.get::<_, String>(4)?), + order_index: row.get(5)?, + created_at_ms: row.get(6)?, + updated_at_ms: row.get(7)?, + active_run: None, + }) + }, + ) + .map_err(|err| err.to_string())?; + task.active_run = read_active_run(connection, workspace_id, task_id)?; + Ok(task) +} + +fn list_tasks_from_db( + storage_path: &Path, + workspace_id: &str, +) -> Result, String> { + let db_path = ensure_task_store(storage_path, workspace_id)?; + let connection = open_connection(&db_path)?; + let mut statement = connection + .prepare( + "SELECT id, workspace_id, title, description, status, order_index, created_at_ms, updated_at_ms + FROM tasks + WHERE workspace_id = ?1 + ORDER BY + CASE status + WHEN 'backlog' THEN 0 + WHEN 'todo' THEN 1 + WHEN 'in_progress' THEN 2 + WHEN 'human_review' THEN 3 + WHEN 'rework' THEN 4 + WHEN 'merging' THEN 5 + ELSE 6 + END, + order_index, + updated_at_ms DESC", + ) + .map_err(|err| err.to_string())?; + + let task_rows = statement + .query_map(params![workspace_id], |row| { + Ok(WorkspaceTask { + id: row.get(0)?, + workspace_id: row.get(1)?, + title: row.get(2)?, + description: row.get(3)?, + status: status_from_str(&row.get::<_, String>(4)?), + order_index: row.get(5)?, + created_at_ms: row.get(6)?, + updated_at_ms: row.get(7)?, + active_run: None, + }) + }) + .map_err(|err| err.to_string())?; + + let mut tasks = Vec::new(); + for row in task_rows { + let mut task = row.map_err(|err| err.to_string())?; + task.active_run = read_active_run(&connection, workspace_id, &task.id)?; + tasks.push(task); + } + Ok(tasks) +} + +fn update_runtime_snapshot( + storage_path: &Path, + workspace_id: &str, + payload_json: &str, +) -> Result<(), String> { + let db_path = ensure_task_store(storage_path, workspace_id)?; + let connection = open_connection(&db_path)?; + connection + .execute( + "DELETE FROM runtime_snapshots WHERE workspace_id = ?1", + params![workspace_id], + ) + .map_err(|err| err.to_string())?; + connection + .execute( + "INSERT INTO runtime_snapshots (id, workspace_id, payload_json, created_at_ms) + VALUES (?1, ?2, ?3, ?4)", + params![ + uuid::Uuid::new_v4().to_string(), + workspace_id, + payload_json, + now_unix_ms() + ], + ) + .map_err(|err| err.to_string())?; + Ok(()) +} + +async fn apply_runtime_status_update( + runtimes: Arc, + storage_path: PathBuf, + workspace_id: String, + line: &str, +) -> Option { + let parsed = parse_runtime_status_metrics(line); + let mut status_to_emit = None; + { + let mut guard = runtimes.lock().await; + if let Some(runtime) = guard.get_mut(&workspace_id) { + runtime.status.last_heartbeat_at_ms = Some(now_unix_ms()); + runtime.status.last_activity_at_ms = Some(now_unix_ms()); + if let Some((agents, tokens)) = parsed { + if let Some((active_agents, max_agents)) = agents { + runtime.status.active_agents = active_agents; + runtime.status.max_agents = max_agents; + } + if let Some((input_tokens, output_tokens, total_tokens)) = tokens { + runtime.status.input_tokens = input_tokens; + runtime.status.output_tokens = output_tokens; + runtime.status.total_tokens = total_tokens; + } + } + runtime.status = hydrate_runtime_status(runtime.status.clone()); + status_to_emit = Some(runtime.status.clone()); + } + } + if let Some(status) = status_to_emit.as_ref() { + let payload = serde_json::to_string(status).unwrap_or_else(|_| "{}".to_string()); + let _ = update_runtime_snapshot(&storage_path, &workspace_id, &payload); + } + status_to_emit +} + +fn probe_binary_version(binary_path: &Path) -> Option { + std::process::Command::new(binary_path) + .arg("--version") + .output() + .ok() + .and_then(|output| { + if output.status.success() { + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); + if stdout.is_empty() { + None + } else { + Some(stdout) + } + } else { + None + } + }) +} + +fn is_escript_wrapper(binary_path: &Path) -> bool { + let file = match File::open(binary_path) { + Ok(file) => file, + Err(_) => return false, + }; + let mut reader = StdBufReader::new(file); + let mut first_line = Vec::new(); + if reader.read_until(b'\n', &mut first_line).is_err() { + return false; + } + String::from_utf8_lossy(&first_line).contains("escript") +} + +fn candidate_escript_search_dirs() -> Vec { + let mut dirs = Vec::new(); + + if let Some(path_var) = std::env::var_os("PATH") { + dirs.extend(std::env::split_paths(&path_var)); + } + + #[cfg(target_os = "macos")] + { + dirs.push(PathBuf::from("/opt/homebrew/bin")); + dirs.push(PathBuf::from("/usr/local/bin")); + dirs.push(PathBuf::from("/usr/bin")); + } + + if let Some(home) = std::env::var_os("HOME").map(PathBuf::from) { + let mise_erlang_root = home.join(".local/share/mise/installs/erlang"); + dirs.push(mise_erlang_root.join("latest").join("bin")); + + if let Ok(entries) = std::fs::read_dir(&mise_erlang_root) { + let mut version_dirs = entries + .filter_map(Result::ok) + .map(|entry| entry.path()) + .filter(|path| path.is_dir()) + .collect::>(); + version_dirs.sort(); + version_dirs.reverse(); + for version_dir in version_dirs { + dirs.push(version_dir.join("bin")); + } + } + } + + let mut unique = Vec::new(); + for dir in dirs { + if !unique.iter().any(|existing| existing == &dir) { + unique.push(dir); + } + } + unique +} + +fn resolve_escript_path() -> Option { + candidate_escript_search_dirs() + .into_iter() + .map(|dir| dir.join("escript")) + .find(|path| path.is_file()) +} + +fn build_symphony_launch_command( + binary_path: &Path, + workflow_path: &Path, +) -> Result { + if is_escript_wrapper(binary_path) { + if let Some(escript_path) = resolve_escript_path() { + let mut command = Command::new(escript_path); + command + .arg(binary_path) + .arg(SYMPHONY_GUARDRAILS_ACK_FLAG) + .arg(workflow_path); + return Ok(command); + } + + return Err( + "Unable to resolve `escript` needed to launch the Symphony debug binary." + .to_string(), + ); + } + + let mut command = Command::new(binary_path); + command.arg(SYMPHONY_GUARDRAILS_ACK_FLAG).arg(workflow_path); + Ok(command) +} + +fn build_default_workflow_content(storage_path: &Path, entry: &WorkspaceEntry) -> String { + let workspace_root = automation_root_for_workspace(storage_path, &entry.id).join("workspaces"); + let db_path = task_db_path(storage_path, &entry.id); + let profile = detect_workflow_profile(entry); + let repo_name = workspace_repo_name(entry); + let clone_source = workspace_clone_source(entry); + let max_concurrent_agents = match profile { + WorkflowProfile::Generic => 1, + WorkflowProfile::CodexMonitor => 10, + }; + let codex_command = match profile { + WorkflowProfile::Generic => "codex app-server", + WorkflowProfile::CodexMonitor => { + "codex --config shell_environment_policy.inherit=all --config model_reasoning_effort=xhigh app-server" + } + }; + let profile_guidance = match profile { + WorkflowProfile::Generic => format!( + "## Repository-specific guidance\n\n- Read `AGENTS.md` if present before changing architecture or running broad validation.\n- Preserve the existing architecture and external contracts unless the task explicitly requires a change.\n- Prefer focused validation that proves the changed behavior directly.\n- Keep edits scoped to the current repository copy at `{}`.\n", + entry.path + ), + WorkflowProfile::CodexMonitor => "- Read `AGENTS.md`, `docs/codebase-map.md`, and `README.md` before changing architecture or running broad validation.\n- Preserve the app and daemon shared-core architecture described in `AGENTS.md`.\n- Keep backend behavior changes in `src-tauri/src/shared/*` first when logic is cross-runtime.\n- Keep frontend IPC in `src/services/tauri.ts` and preserve app/daemon parity.\n- Use the existing validation matrix from `AGENTS.md`: always run `npm run typecheck`, run `npm run test` for frontend behavior changes, and run `cd src-tauri && cargo check` for Rust changes.\n- Avoid unrelated refactors in `src/App.tsx` and the listed hotspot files unless the task demands it.\n".to_string(), + }; + + format!( + "---\ntracker:\n kind: codex_monitor\n database_path: {}\n workspace_id: {}\n active_states:\n - Todo\n - In Progress\n - Human Review\n - Rework\n - Merging\n terminal_states:\n - Done\npolling:\n interval_ms: 5000\nworkspace:\n root: {}\nhooks:\n after_create: |\n git clone --reference-if-able {} {} .\nagent:\n max_concurrent_agents: {}\n max_turns: 20\ncodex:\n command: {}\n approval_policy: never\n thread_sandbox: danger-full-access\n turn_sandbox_policy:\n type: dangerFullAccess\n---\n\nYou are working on a CodexMonitor local task `{{{{ issue.identifier }}}}` for the `{}` repository.\n\n{{% if attempt %}}\nContinuation context:\n\n- This is retry attempt #{{{{ attempt }}}} because the task is still in an active state.\n- Resume from the current workspace state instead of restarting from scratch.\n- Do not repeat already-completed investigation or validation unless needed for new code changes.\n- Do not stop while the task remains active unless you are blocked by missing required tools, auth, permissions, or secrets.\n{{% endif %}}\n\nTask context:\nIdentifier: {{{{ issue.identifier }}}}\nTitle: {{{{ issue.title }}}}\nCurrent status: {{{{ issue.state }}}}\n\nDescription:\n{{% if issue.description %}}\n{{{{ issue.description }}}}\n{{% else %}}\nNo description provided.\n{{% endif %}}\n\nInstructions:\n\n1. This is an unattended orchestration session. Never ask a human to perform follow-up actions.\n2. Only stop early for a true blocker: missing required auth, permissions, tools, or secrets.\n3. Final message must report completed actions and blockers only. Do not include user follow-up steps.\n4. Work only in the provided repository copy. Do not touch any other path.\n5. Use the `codex_monitor_task` dynamic tool as your source of truth for task state, worklog, and run telemetry.\n\n## Default posture\n\n- Start by reading the current task, the repo state, and the task status, then follow the matching flow for that status.\n- Reproduce first and record the concrete signal in the task worklog before editing code.\n- Keep the task worklog current with your plan, validation evidence, blockers, and final outcome.\n- Treat any task-authored `Validation`, `Test Plan`, or `Testing` section as mandatory.\n- Operate autonomously end-to-end unless blocked by missing tools, auth, or secrets.\n\n## Status map\n\n- `Backlog` -> out of scope for this workflow; do not modify.\n- `Todo` -> claim and continue. Symphony moves claimed Todo tasks to `In Progress` automatically.\n- `In Progress` -> continue implementation, validation, and PR work.\n- `Human Review` -> do not code by default; poll for review outcomes and wait for human approval.\n- `Rework` -> reviewer requested changes; resume implementation, validation, and PR updates.\n- `Merging` -> approved by human; land the change using the repository's documented landing flow, then move the task to `Done`.\n- `Done` -> terminal state; do nothing and exit.\n\n## Execution rules\n\n1. Start by calling `codex_monitor_task` with `get_task` and keep the task state aligned with the real work.\n2. Use `append_worklog` to record meaningful milestones instead of keeping progress only in your head.\n3. Use `update_run` whenever branch, PR, worktree, session, or other run metadata changes materially.\n4. Before editing code:\n - record reproduction evidence in the worklog\n - sync with latest `origin/main` when the repository uses it and note the result in the worklog\n5. During implementation:\n - keep the plan and validation notes current in the worklog\n - address actionable PR feedback if a PR already exists\n - rerun validation after feedback-driven changes\n6. Before moving to `Human Review`:\n - required validation is green\n - required acceptance items are satisfied\n - task state and run metadata are current\n - the worklog clearly explains what changed and how it was validated\n7. In `Human Review`, do not change code unless new review feedback requires another implementation pass. If that happens, move the task to `Rework` and continue from the existing workspace state.\n8. In `Rework`, address the requested changes, rerun the relevant validation, update the PR if present, and return to `Human Review` only when feedback is resolved.\n9. In `Merging`, follow the repository landing flow instead of waiting for more implementation work; after merge is complete, move the task to `Done`.\n\n{}\n", + db_path.display(), + entry.id, + workspace_root.display(), + entry.path, + clone_source, + max_concurrent_agents, + codex_command, + repo_name, + profile_guidance + ) +} + +fn workspace_repo_name(entry: &WorkspaceEntry) -> String { + Path::new(&entry.path) + .file_name() + .and_then(|value| value.to_str()) + .filter(|value| !value.trim().is_empty()) + .unwrap_or(entry.name.as_str()) + .to_string() +} + +fn detect_workflow_profile(entry: &WorkspaceEntry) -> WorkflowProfile { + let repo_name = workspace_repo_name(entry).to_ascii_lowercase(); + let workspace_name = entry.name.to_ascii_lowercase(); + if repo_name == "codexmonitor" + || repo_name == "codex-monitor" + || workspace_name == "codexmonitor" + || workspace_name == "codex monitor" + { + WorkflowProfile::CodexMonitor + } else { + WorkflowProfile::Generic + } +} + +fn workspace_clone_source(entry: &WorkspaceEntry) -> String { + preferred_remote_clone_source(Path::new(&entry.path)).unwrap_or_else(|| entry.path.clone()) +} + +fn preferred_remote_clone_source(repo_root: &Path) -> Option { + let repo = Repository::open(repo_root).ok()?; + let remotes = repo.remotes().ok()?; + let mut names = Vec::new(); + + if remotes.iter().any(|remote| remote == Some("origin")) { + names.push("origin".to_string()); + } + + for name in remotes.iter().flatten() { + if name != "origin" { + names.push(name.to_string()); + } + } + + names.into_iter().find_map(|name| { + let remote = repo.find_remote(&name).ok()?; + let url = remote.url()?.trim(); + if url.is_empty() || is_local_clone_source(url) { + None + } else { + Some(url.to_string()) + } + }) +} + +fn is_local_clone_source(source: &str) -> bool { + let trimmed = source.trim(); + if trimmed.is_empty() { + return true; + } + + trimmed.starts_with("file://") + || trimmed.starts_with('/') + || trimmed.starts_with("./") + || trimmed.starts_with("../") +} + +fn read_workflow_override_response( + storage_path: &Path, + entry: &WorkspaceEntry, +) -> Result { + let root = automation_root_for_workspace(storage_path, &entry.id); + let response = read_text_file_within( + &root, + WORKFLOW_OVERRIDE_FILE_NAME, + true, + "Symphony workspace automation directory", + "Symphony workflow override", + false, + )?; + if response.exists { + return Ok(response); + } + Ok(TextFileResponse { + exists: false, + content: build_default_workflow_content(storage_path, entry), + truncated: false, + }) +} + +fn write_workflow_override( + storage_path: &Path, + workspace_id: &str, + content: &str, +) -> Result<(), String> { + let root = automation_root_for_workspace(storage_path, workspace_id); + write_text_file_within( + &root, + WORKFLOW_OVERRIDE_FILE_NAME, + content, + true, + "Symphony workspace automation directory", + "Symphony workflow override", + false, + ) +} + +fn write_generated_workflow( + storage_path: &Path, + entry: &WorkspaceEntry, +) -> Result { + let path = workflow_path(storage_path, &entry.id); + let workspace_root = automation_root_for_workspace(storage_path, &entry.id).join("workspaces"); + fs::create_dir_all(&workspace_root).map_err(|err| err.to_string())?; + let content = read_workflow_override_response(storage_path, entry)?.content; + fs::write(&path, content).map_err(|err| err.to_string())?; + Ok(path) +} + +async fn append_log_line(log_path: PathBuf, line: String) { + if let Ok(mut file) = OpenOptions::new().create(true).append(true).open(log_path) { + let _ = writeln!(file, "{line}"); + } +} + +async fn spawn_process_watchers( + runtimes: Arc, + storage_path: PathBuf, + workspace_id: String, + child: Arc>, + log_path: PathBuf, + event_sink: impl EventSink, +) { + let stdout = { + let mut locked = child.lock().await; + locked.stdout.take() + }; + let stderr = { + let mut locked = child.lock().await; + locked.stderr.take() + }; + + if let Some(stdout) = stdout { + let runtimes_clone = Arc::clone(&runtimes); + let storage_path_clone = storage_path.clone(); + let event_sink_clone = event_sink.clone(); + let log_path_clone = log_path.clone(); + let workspace_id_clone = workspace_id.clone(); + tokio::spawn(async move { + let mut lines = BufReader::new(stdout).lines(); + while let Ok(Some(line)) = lines.next_line().await { + append_log_line(log_path_clone.clone(), line.clone()).await; + let status = apply_runtime_status_update( + Arc::clone(&runtimes_clone), + storage_path_clone.clone(), + workspace_id_clone.clone(), + &line, + ) + .await; + event_sink_clone.emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id: workspace_id_clone.clone(), + kind: "heartbeat".to_string(), + status, + task: None, + telemetry: None, + message: None, + }); + } + }); + } + + if let Some(stderr) = stderr { + let runtimes_clone = Arc::clone(&runtimes); + let storage_path_clone = storage_path.clone(); + let event_sink_clone = event_sink.clone(); + let log_path_clone = log_path.clone(); + let workspace_id_clone = workspace_id.clone(); + tokio::spawn(async move { + let mut lines = BufReader::new(stderr).lines(); + while let Ok(Some(line)) = lines.next_line().await { + append_log_line(log_path_clone.clone(), line.clone()).await; + let status = apply_runtime_status_update( + Arc::clone(&runtimes_clone), + storage_path_clone.clone(), + workspace_id_clone.clone(), + &line, + ) + .await; + event_sink_clone.emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id: workspace_id_clone.clone(), + kind: "heartbeat".to_string(), + status, + task: None, + telemetry: None, + message: None, + }); + } + }); + } + + let event_sink_clone = event_sink.clone(); + tokio::spawn(async move { + let exit_status = loop { + let status = { + let mut locked = child.lock().await; + locked.try_wait().ok().flatten() + }; + if status.is_some() { + break status; + } + sleep(Duration::from_millis(250)).await; + }; + let mut status_to_emit = None; + { + let mut guard = runtimes.lock().await; + if let Some(runtime) = guard.remove(&workspace_id) { + let mut status = runtime.status; + status.state = WorkspaceSymphonyRuntimeState::Stopped; + status.pid = None; + status.active_agents = 0; + if let Some(exit_status) = exit_status { + if !exit_status.success() { + status.last_error = + Some(format!("Symphony exited with status {exit_status}")); + } + } + status_to_emit = Some(hydrate_runtime_status(status)); + } + } + if let Some(status) = status_to_emit { + event_sink_clone.emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id, + kind: "runtime_stopped".to_string(), + status: Some(status), + task: None, + telemetry: None, + message: None, + }); + } + }); +} + +pub(crate) async fn get_workspace_symphony_status_core( + runtimes: &SymphonyRuntimeRegistry, + storage_path: &Path, + workspace_id: &str, +) -> Result { + let tasks = list_tasks_from_db(storage_path, workspace_id).unwrap_or_default(); + let active_tasks = tasks + .iter() + .filter(|task| { + matches!( + task.status, + WorkspaceTaskStatus::InProgress + | WorkspaceTaskStatus::HumanReview + | WorkspaceTaskStatus::Rework + | WorkspaceTaskStatus::Merging + ) + }) + .count(); + let retrying_tasks = tasks + .iter() + .filter(|task| { + task.active_run + .as_ref() + .and_then(|run| run.last_error.as_ref()) + .is_some() + }) + .count(); + + let mut guard = runtimes.lock().await; + if let Some(runtime) = guard.get_mut(workspace_id) { + runtime.status.total_tasks = tasks.len(); + runtime.status.active_tasks = active_tasks; + runtime.status.retrying_tasks = retrying_tasks; + runtime.status = hydrate_runtime_status(runtime.status.clone()); + return Ok(runtime.status.clone()); + } + + Ok(hydrate_runtime_status(WorkspaceSymphonyStatus { + workspace_id: workspace_id.to_string(), + state: WorkspaceSymphonyRuntimeState::Stopped, + health: WorkspaceSymphonyHealth::Stopped, + binary_path: None, + binary_version: None, + pid: None, + started_at_ms: None, + last_heartbeat_at_ms: None, + last_error: None, + log_path: Some(log_path(storage_path, workspace_id).display().to_string()), + total_tasks: tasks.len(), + active_tasks, + retrying_tasks, + active_agents: 0, + max_agents: 0, + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + uptime_ms: None, + last_activity_at_ms: None, + })) +} + +pub(crate) async fn get_workspace_symphony_snapshot_core( + runtimes: &SymphonyRuntimeRegistry, + storage_path: &Path, + workspace_id: &str, +) -> Result { + let tasks = list_tasks_from_db(storage_path, workspace_id)?; + let status = get_workspace_symphony_status_core(runtimes, storage_path, workspace_id).await?; + Ok(WorkspaceSymphonySnapshot { status, tasks }) +} + +pub(crate) async fn list_workspace_symphony_tasks_core( + storage_path: &Path, + workspace_id: &str, +) -> Result, String> { + list_tasks_from_db(storage_path, workspace_id) +} + +pub(crate) async fn create_workspace_symphony_task_core( + storage_path: &Path, + workspace_id: &str, + input: CreateWorkspaceTaskInput, +) -> Result { + let title = input.title.trim(); + if title.is_empty() { + return Err("Task title is required.".to_string()); + } + let db_path = ensure_task_store(storage_path, workspace_id)?; + let connection = open_connection(&db_path)?; + let task_id = uuid::Uuid::new_v4().to_string(); + let now = now_unix_ms(); + let order_index = next_order_index(&connection, workspace_id, &WorkspaceTaskStatus::Backlog)?; + connection + .execute( + "INSERT INTO tasks (id, workspace_id, title, description, status, order_index, created_at_ms, updated_at_ms) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)", + params![ + task_id, + workspace_id, + title, + input.description.map(|value| value.trim().to_string()), + WorkspaceTaskStatus::Backlog.as_str(), + order_index, + now, + now + ], + ) + .map_err(|err| err.to_string())?; + append_task_event( + &connection, + workspace_id, + &task_id, + "Task created in Backlog.", + )?; + read_task(&connection, workspace_id, &task_id) +} + +pub(crate) async fn update_workspace_symphony_task_core( + storage_path: &Path, + workspace_id: &str, + input: UpdateWorkspaceTaskInput, +) -> Result { + let db_path = ensure_task_store(storage_path, workspace_id)?; + let connection = open_connection(&db_path)?; + let current = read_task(&connection, workspace_id, &input.task_id)?; + let next_title = input + .title + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .unwrap_or(current.title.clone()); + let next_description = input + .description + .map(|value| value.trim().to_string()) + .or_else(|| current.description.clone()); + connection + .execute( + "UPDATE tasks SET title = ?1, description = ?2, updated_at_ms = ?3 WHERE workspace_id = ?4 AND id = ?5", + params![next_title, next_description, now_unix_ms(), workspace_id, input.task_id], + ) + .map_err(|err| err.to_string())?; + append_task_event( + &connection, + workspace_id, + &input.task_id, + "Task details updated.", + )?; + read_task(&connection, workspace_id, &input.task_id) +} + +pub(crate) async fn move_workspace_symphony_task_core( + storage_path: &Path, + workspace_id: &str, + input: MoveWorkspaceTaskInput, +) -> Result { + let db_path = ensure_task_store(storage_path, workspace_id)?; + let connection = open_connection(&db_path)?; + let current = read_task(&connection, workspace_id, &input.task_id)?; + if !can_user_move_task(¤t.status, &input.status) { + return Err("That move is not allowed from the board.".to_string()); + } + + let order_index = if let Some(position) = input.position { + i64::from(position) + } else { + next_order_index(&connection, workspace_id, &input.status)? + }; + + connection + .execute( + "UPDATE tasks SET status = ?1, order_index = ?2, updated_at_ms = ?3 WHERE workspace_id = ?4 AND id = ?5", + params![ + input.status.as_str(), + order_index, + now_unix_ms(), + workspace_id, + input.task_id + ], + ) + .map_err(|err| err.to_string())?; + append_task_event( + &connection, + workspace_id, + &input.task_id, + &format!("Task moved to {}.", input.status.as_str()), + )?; + read_task(&connection, workspace_id, &input.task_id) +} + +pub(crate) async fn delete_workspace_symphony_task_core( + storage_path: &Path, + workspace_id: &str, + task_id: &str, +) -> Result<(), String> { + let db_path = ensure_task_store(storage_path, workspace_id)?; + let connection = open_connection(&db_path)?; + connection + .execute( + "DELETE FROM task_events WHERE workspace_id = ?1 AND task_id = ?2", + params![workspace_id, task_id], + ) + .map_err(|err| err.to_string())?; + connection + .execute( + "DELETE FROM task_runs WHERE workspace_id = ?1 AND task_id = ?2", + params![workspace_id, task_id], + ) + .map_err(|err| err.to_string())?; + connection + .execute( + "DELETE FROM tasks WHERE workspace_id = ?1 AND id = ?2", + params![workspace_id, task_id], + ) + .map_err(|err| err.to_string())?; + Ok(()) +} + +pub(crate) async fn get_workspace_symphony_telemetry_core( + storage_path: &Path, + workspace_id: &str, + task_id: &str, +) -> Result { + let db_path = ensure_task_store(storage_path, workspace_id)?; + let connection = open_connection(&db_path)?; + let task = read_task(&connection, workspace_id, task_id)?; + let mut statement = connection + .prepare( + "SELECT id, task_id, workspace_id, message, created_at_ms + FROM task_events + WHERE workspace_id = ?1 AND task_id = ?2 + ORDER BY created_at_ms DESC + LIMIT 100", + ) + .map_err(|err| err.to_string())?; + let rows = statement + .query_map(params![workspace_id, task_id], |row| { + Ok(WorkspaceTaskEvent { + id: row.get(0)?, + task_id: row.get(1)?, + workspace_id: row.get(2)?, + message: row.get(3)?, + created_at_ms: row.get(4)?, + }) + }) + .map_err(|err| err.to_string())?; + let mut events = Vec::new(); + for row in rows { + events.push(row.map_err(|err| err.to_string())?); + } + let claimed_at_ms = read_task_claimed_at_ms(&connection, workspace_id, task_id)?; + let live_run = read_live_task_run_from_log(storage_path, workspace_id, task_id, claimed_at_ms); + Ok(WorkspaceTaskTelemetry { + task, + events, + live_run, + }) +} + +pub(crate) async fn read_workspace_symphony_workflow_override_core( + workspaces: &Mutex>, + storage_path: &Path, + workspace_id: &str, +) -> Result { + let entry = { + let guard = workspaces.lock().await; + guard + .get(workspace_id) + .cloned() + .ok_or_else(|| "workspace not found".to_string())? + }; + read_workflow_override_response(storage_path, &entry) +} + +pub(crate) async fn write_workspace_symphony_workflow_override_core( + storage_path: &Path, + workspace_id: &str, + content: &str, +) -> Result<(), String> { + write_workflow_override(storage_path, workspace_id, content) +} + +pub(crate) async fn start_workspace_symphony_core( + runtimes: Arc, + workspaces: &Mutex>, + storage_path: &Path, + workspace_id: &str, + event_sink: impl EventSink, + binary_path: PathBuf, +) -> Result { + let existing = { + runtimes + .lock() + .await + .get(workspace_id) + .map(|runtime| runtime.status.clone()) + }; + if let Some(status) = existing { + if status.state == WorkspaceSymphonyRuntimeState::Running { + return Ok(status); + } + } + + let entry = { + let guard = workspaces.lock().await; + guard + .get(workspace_id) + .cloned() + .ok_or_else(|| "workspace not found".to_string())? + }; + + let _ = ensure_task_store(storage_path, workspace_id)?; + let workflow = write_generated_workflow(storage_path, &entry)?; + let log_path = log_path(storage_path, workspace_id); + let _ = File::create(&log_path).map_err(|err| err.to_string())?; + + let mut command = build_symphony_launch_command(&binary_path, &workflow)?; + command.current_dir(automation_root_for_workspace(storage_path, workspace_id)); + command.stdout(std::process::Stdio::piped()); + command.stderr(std::process::Stdio::piped()); + let child = command.spawn().map_err(|err| err.to_string())?; + let pid = child.id(); + let child = Arc::new(Mutex::new(child)); + let status = hydrate_runtime_status(WorkspaceSymphonyStatus { + workspace_id: workspace_id.to_string(), + state: WorkspaceSymphonyRuntimeState::Running, + health: WorkspaceSymphonyHealth::Healthy, + binary_path: Some(binary_path.display().to_string()), + binary_version: probe_binary_version(&binary_path), + pid, + started_at_ms: Some(now_unix_ms()), + last_heartbeat_at_ms: Some(now_unix_ms()), + last_error: None, + log_path: Some(log_path.display().to_string()), + total_tasks: list_tasks_from_db(storage_path, workspace_id) + .unwrap_or_default() + .len(), + active_tasks: 0, + retrying_tasks: 0, + active_agents: 0, + max_agents: 1, + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + uptime_ms: None, + last_activity_at_ms: Some(now_unix_ms()), + }); + + { + let mut guard = runtimes.lock().await; + guard.insert( + workspace_id.to_string(), + ManagedSymphonyRuntime { + child: Arc::clone(&child), + status: status.clone(), + }, + ); + } + + let status_json = serde_json::to_string(&status).unwrap_or_else(|_| "{}".to_string()); + let _ = update_runtime_snapshot(storage_path, workspace_id, &status_json); + event_sink.emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id: workspace_id.to_string(), + kind: "runtime_started".to_string(), + status: Some(status.clone()), + task: None, + telemetry: None, + message: None, + }); + + spawn_process_watchers( + Arc::clone(&runtimes), + storage_path.to_path_buf(), + workspace_id.to_string(), + child, + log_path, + event_sink.clone(), + ) + .await; + + Ok(status) +} + +pub(crate) async fn stop_workspace_symphony_core( + runtimes: &SymphonyRuntimeRegistry, + workspace_id: &str, +) -> Result { + let runtime = { + let mut guard = runtimes.lock().await; + guard.remove(workspace_id) + }; + + let Some(runtime) = runtime else { + return Ok(hydrate_runtime_status(WorkspaceSymphonyStatus { + workspace_id: workspace_id.to_string(), + state: WorkspaceSymphonyRuntimeState::Stopped, + health: WorkspaceSymphonyHealth::Stopped, + binary_path: None, + binary_version: None, + pid: None, + started_at_ms: None, + last_heartbeat_at_ms: None, + last_error: None, + log_path: None, + total_tasks: 0, + active_tasks: 0, + retrying_tasks: 0, + active_agents: 0, + max_agents: 0, + input_tokens: 0, + output_tokens: 0, + total_tokens: 0, + uptime_ms: None, + last_activity_at_ms: None, + })); + }; + + { + let mut child = runtime.child.lock().await; + let _ = kill_child_process_tree(&mut child).await; + } + + let mut status = runtime.status; + status.state = WorkspaceSymphonyRuntimeState::Stopped; + status.pid = None; + status.active_agents = 0; + Ok(hydrate_runtime_status(status)) +} + +#[cfg(test)] +mod tests { + use super::{ + build_default_workflow_content, detect_workflow_profile, is_local_clone_source, + preferred_remote_clone_source, WorkflowProfile, + }; + use crate::types::{WorkspaceEntry, WorkspaceKind, WorkspaceSettings}; + use git2::Repository; + use std::fs; + use std::path::Path; + + fn sample_workspace_entry(name: &str, path: &str) -> WorkspaceEntry { + WorkspaceEntry { + id: "ws-1".to_string(), + name: name.to_string(), + path: path.to_string(), + kind: WorkspaceKind::Main, + parent_id: None, + worktree: None, + settings: WorkspaceSettings::default(), + } + } + + fn temp_repo_root(name: &str) -> std::path::PathBuf { + let path = std::env::temp_dir().join(format!( + "codex-monitor-symphony-core-tests-{}-{}", + name, + uuid::Uuid::new_v4() + )); + fs::create_dir_all(&path).expect("create temp repo root"); + path + } + + #[test] + fn detects_codexmonitor_profile_from_repo_name() { + let entry = sample_workspace_entry( + "CodexMonitor", + "/Users/dimillian/Documents/Dev/CodexMonitor", + ); + + assert_eq!(detect_workflow_profile(&entry), WorkflowProfile::CodexMonitor); + } + + #[test] + fn detects_generic_profile_for_other_repositories() { + let entry = sample_workspace_entry("MyApp", "/Users/dimillian/Documents/Dev/MyApp"); + + assert_eq!(detect_workflow_profile(&entry), WorkflowProfile::Generic); + } + + #[test] + fn codexmonitor_default_workflow_uses_profile_guidance() { + let entry = sample_workspace_entry( + "CodexMonitor", + "/Users/dimillian/Documents/Dev/CodexMonitor", + ); + let workflow = build_default_workflow_content(Path::new("/tmp/codex-monitor.json"), &entry); + + assert!(workflow.contains("docs/codebase-map.md")); + assert!(workflow.contains("max_concurrent_agents: 10")); + assert!(workflow.contains("model_reasoning_effort=xhigh")); + assert!(workflow.contains("Human Review")); + } + + #[test] + fn generic_default_workflow_avoids_codexmonitor_specific_guidance() { + let entry = sample_workspace_entry("MyApp", "/Users/dimillian/Documents/Dev/MyApp"); + let workflow = build_default_workflow_content(Path::new("/tmp/codex-monitor.json"), &entry); + + assert!(workflow.contains("Read `AGENTS.md` if present")); + assert!(!workflow.contains("docs/codebase-map.md")); + assert!(workflow.contains("max_concurrent_agents: 1")); + assert!(workflow.contains("codex app-server")); + } + + #[test] + fn workflow_prefers_non_local_origin_remote_for_clone_source() { + let temp = temp_repo_root("origin-remote"); + let repo = Repository::init(&temp).expect("init repo"); + repo.remote("origin", "https://github.com/Dimillian/CodexMonitor.git") + .expect("add remote"); + + let entry = sample_workspace_entry("CodexMonitor", temp.to_str().expect("utf8 path")); + let workflow = build_default_workflow_content(Path::new("/tmp/codex-monitor.json"), &entry); + + assert!(workflow.contains("git clone --reference-if-able")); + assert!(workflow.contains("https://github.com/Dimillian/CodexMonitor.git .")); + + fs::remove_dir_all(temp).expect("cleanup temp repo"); + } + + #[test] + fn workflow_ignores_local_origin_and_uses_other_network_remote() { + let temp = temp_repo_root("mixed-remotes"); + let local_remote = temp.join("mirror.git"); + fs::create_dir_all(&local_remote).expect("create local remote dir"); + let repo = Repository::init(&temp).expect("init repo"); + repo.remote("origin", local_remote.to_str().expect("utf8 local path")) + .expect("add local origin"); + repo.remote("upstream", "git@github.com:Dimillian/CodexMonitor.git") + .expect("add upstream"); + + let resolved = preferred_remote_clone_source(&temp); + assert_eq!( + resolved.as_deref(), + Some("git@github.com:Dimillian/CodexMonitor.git") + ); + + fs::remove_dir_all(temp).expect("cleanup temp repo"); + } + + #[test] + fn workflow_falls_back_to_local_path_when_no_network_remote_exists() { + let temp = temp_repo_root("local-only"); + let repo = Repository::init(&temp).expect("init repo"); + let local_remote = temp.join("mirror.git"); + fs::create_dir_all(&local_remote).expect("create local remote dir"); + repo.remote("origin", local_remote.to_str().expect("utf8 local path")) + .expect("add local origin"); + + let entry = sample_workspace_entry("MyApp", temp.to_str().expect("utf8 path")); + let workflow = build_default_workflow_content(Path::new("/tmp/codex-monitor.json"), &entry); + + assert!(workflow.contains(&format!( + "git clone --reference-if-able {} {} .", + temp.display(), + temp.display() + ))); + + fs::remove_dir_all(temp).expect("cleanup temp repo"); + } + + #[test] + fn local_clone_source_detection_matches_path_style_urls() { + assert!(is_local_clone_source("/tmp/repo")); + assert!(is_local_clone_source("./repo")); + assert!(is_local_clone_source("../repo")); + assert!(is_local_clone_source("file:///tmp/repo")); + assert!(!is_local_clone_source("https://github.com/Dimillian/CodexMonitor.git")); + assert!(!is_local_clone_source("git@github.com:Dimillian/CodexMonitor.git")); + } +} diff --git a/src-tauri/src/shared/workspace_rpc.rs b/src-tauri/src/shared/workspace_rpc.rs index 85acbf46e..33219f674 100644 --- a/src-tauri/src/shared/workspace_rpc.rs +++ b/src-tauri/src/shared/workspace_rpc.rs @@ -2,7 +2,9 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::Value; -use crate::types::WorkspaceSettings; +use crate::types::{ + CreateWorkspaceTaskInput, MoveWorkspaceTaskInput, UpdateWorkspaceTaskInput, WorkspaceSettings, +}; #[allow(dead_code)] pub(crate) fn to_params(request: &T) -> Result { @@ -60,6 +62,20 @@ pub(crate) struct WorkspaceIdRequest { pub(crate) workspace_id: String, } +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct TaskIdRequest { + pub(crate) workspace_id: String, + pub(crate) task_id: String, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct WriteWorkspaceSymphonyWorkflowOverrideRequest { + pub(crate) workspace_id: String, + pub(crate) content: String, +} + #[derive(Debug, Serialize, Deserialize)] pub(crate) struct IdRequest { pub(crate) id: String, @@ -108,3 +124,24 @@ pub(crate) struct OpenWorkspaceInRequest { pub(crate) struct GetOpenAppIconRequest { pub(crate) app_name: String, } + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct CreateWorkspaceSymphonyTaskRequest { + pub(crate) workspace_id: String, + pub(crate) input: CreateWorkspaceTaskInput, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct UpdateWorkspaceSymphonyTaskRequest { + pub(crate) workspace_id: String, + pub(crate) input: UpdateWorkspaceTaskInput, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct MoveWorkspaceSymphonyTaskRequest { + pub(crate) workspace_id: String, + pub(crate) input: MoveWorkspaceTaskInput, +} diff --git a/src-tauri/src/state.rs b/src-tauri/src/state.rs index e28cf4ed7..a7d973813 100644 --- a/src-tauri/src/state.rs +++ b/src-tauri/src/state.rs @@ -7,6 +7,7 @@ use tokio::sync::Mutex; use crate::dictation::DictationState; use crate::shared::codex_core::CodexLoginCancelState; +use crate::shared::symphony_core::SymphonyRuntimeRegistry; use crate::storage::{read_settings, read_workspaces}; use crate::types::{AppSettings, TcpDaemonState, TcpDaemonStatus, WorkspaceEntry}; @@ -41,6 +42,7 @@ pub(crate) struct AppState { pub(crate) dictation: Mutex, pub(crate) codex_login_cancels: Mutex>, pub(crate) tcp_daemon: Mutex, + pub(crate) symphony_runtimes: Arc, } impl AppState { @@ -64,6 +66,7 @@ impl AppState { dictation: Mutex::new(DictationState::default()), codex_login_cancels: Mutex::new(HashMap::new()), tcp_daemon: Mutex::new(TcpDaemonRuntime::default()), + symphony_runtimes: Arc::new(Mutex::new(HashMap::new())), } } } diff --git a/src-tauri/src/symphony.rs b/src-tauri/src/symphony.rs new file mode 100644 index 000000000..fb38b667a --- /dev/null +++ b/src-tauri/src/symphony.rs @@ -0,0 +1,374 @@ +use tauri::{AppHandle, State}; + +use crate::backend::events::EventSink; +use crate::event_sink::TauriEventSink; +use crate::remote_backend; +use crate::shared::{symphony_core, workspace_rpc}; +use crate::state::AppState; +use crate::symphony_binary::resolve_symphony_binary_path; +use crate::types::{ + CreateWorkspaceTaskInput, MoveWorkspaceTaskInput, UpdateWorkspaceTaskInput, + WorkspaceSymphonyEvent, WorkspaceSymphonySnapshot, WorkspaceSymphonyStatus, WorkspaceTask, + WorkspaceTaskTelemetry, +}; +use crate::files::io::TextFileResponse; + +fn workspace_remote_params(request: &T) -> Result { + workspace_rpc::to_params(request) +} + +#[tauri::command] +pub(crate) async fn get_workspace_symphony_status( + workspace_id: String, + state: State<'_, AppState>, + app: AppHandle, +) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::WorkspaceIdRequest { workspace_id }; + let response = remote_backend::call_remote( + &*state, + app, + "get_workspace_symphony_status", + workspace_remote_params(&request)?, + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + + symphony_core::get_workspace_symphony_snapshot_core( + &state.symphony_runtimes, + &state.storage_path, + &workspace_id, + ) + .await +} + +#[tauri::command] +pub(crate) async fn start_workspace_symphony( + workspace_id: String, + state: State<'_, AppState>, + app: AppHandle, +) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::WorkspaceIdRequest { workspace_id }; + let response = remote_backend::call_remote( + &*state, + app, + "start_workspace_symphony", + workspace_remote_params(&request)?, + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + + let binary_path = resolve_symphony_binary_path()?; + symphony_core::start_workspace_symphony_core( + state.symphony_runtimes.clone(), + &state.workspaces, + &state.storage_path, + &workspace_id, + TauriEventSink::new(app), + binary_path, + ) + .await +} + +#[tauri::command] +pub(crate) async fn stop_workspace_symphony( + workspace_id: String, + state: State<'_, AppState>, + app: AppHandle, +) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::WorkspaceIdRequest { workspace_id }; + let response = remote_backend::call_remote( + &*state, + app, + "stop_workspace_symphony", + workspace_remote_params(&request)?, + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + + let status = + symphony_core::stop_workspace_symphony_core(&state.symphony_runtimes, &workspace_id) + .await?; + TauriEventSink::new(app).emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id, + kind: "runtime_stopped".to_string(), + status: Some(status.clone()), + task: None, + telemetry: None, + message: None, + }); + Ok(status) +} + +#[tauri::command] +pub(crate) async fn list_workspace_symphony_tasks( + workspace_id: String, + state: State<'_, AppState>, + app: AppHandle, +) -> Result, String> { + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::WorkspaceIdRequest { workspace_id }; + let response = remote_backend::call_remote( + &*state, + app, + "list_workspace_symphony_tasks", + workspace_remote_params(&request)?, + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + + symphony_core::list_workspace_symphony_tasks_core(&state.storage_path, &workspace_id).await +} + +#[tauri::command] +pub(crate) async fn create_workspace_symphony_task( + workspace_id: String, + input: CreateWorkspaceTaskInput, + state: State<'_, AppState>, + app: AppHandle, +) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::CreateWorkspaceSymphonyTaskRequest { + workspace_id, + input, + }; + let response = remote_backend::call_remote( + &*state, + app, + "create_workspace_symphony_task", + workspace_remote_params(&request)?, + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + + let task = symphony_core::create_workspace_symphony_task_core( + &state.storage_path, + &workspace_id, + input, + ) + .await?; + TauriEventSink::new(app).emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id, + kind: "task_created".to_string(), + status: None, + task: Some(task.clone()), + telemetry: None, + message: None, + }); + Ok(task) +} + +#[tauri::command] +pub(crate) async fn update_workspace_symphony_task( + workspace_id: String, + input: UpdateWorkspaceTaskInput, + state: State<'_, AppState>, + app: AppHandle, +) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::UpdateWorkspaceSymphonyTaskRequest { + workspace_id, + input, + }; + let response = remote_backend::call_remote( + &*state, + app, + "update_workspace_symphony_task", + workspace_remote_params(&request)?, + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + + let task = symphony_core::update_workspace_symphony_task_core( + &state.storage_path, + &workspace_id, + input, + ) + .await?; + TauriEventSink::new(app).emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id, + kind: "task_updated".to_string(), + status: None, + task: Some(task.clone()), + telemetry: None, + message: None, + }); + Ok(task) +} + +#[tauri::command] +pub(crate) async fn move_workspace_symphony_task( + workspace_id: String, + task_id: String, + status: crate::types::WorkspaceTaskStatus, + position: Option, + state: State<'_, AppState>, + app: AppHandle, +) -> Result { + let input = MoveWorkspaceTaskInput { + task_id, + status, + position, + }; + + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::MoveWorkspaceSymphonyTaskRequest { + workspace_id, + input, + }; + let response = remote_backend::call_remote( + &*state, + app, + "move_workspace_symphony_task", + workspace_remote_params(&request)?, + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + + let task = + symphony_core::move_workspace_symphony_task_core(&state.storage_path, &workspace_id, input) + .await?; + TauriEventSink::new(app).emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id, + kind: "task_moved".to_string(), + status: None, + task: Some(task.clone()), + telemetry: None, + message: None, + }); + Ok(task) +} + +#[tauri::command] +pub(crate) async fn delete_workspace_symphony_task( + workspace_id: String, + task_id: String, + state: State<'_, AppState>, + app: AppHandle, +) -> Result<(), String> { + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::TaskIdRequest { + workspace_id, + task_id, + }; + remote_backend::call_remote( + &*state, + app, + "delete_workspace_symphony_task", + workspace_remote_params(&request)?, + ) + .await?; + return Ok(()); + } + + symphony_core::delete_workspace_symphony_task_core( + &state.storage_path, + &workspace_id, + &task_id, + ) + .await?; + TauriEventSink::new(app).emit_workspace_symphony_event(WorkspaceSymphonyEvent { + workspace_id, + kind: "task_deleted".to_string(), + status: None, + task: None, + telemetry: None, + message: Some(task_id), + }); + Ok(()) +} + +#[tauri::command] +pub(crate) async fn get_workspace_symphony_telemetry( + workspace_id: String, + task_id: String, + state: State<'_, AppState>, + app: AppHandle, +) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::TaskIdRequest { + workspace_id, + task_id, + }; + let response = remote_backend::call_remote( + &*state, + app, + "get_workspace_symphony_telemetry", + workspace_remote_params(&request)?, + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + + symphony_core::get_workspace_symphony_telemetry_core( + &state.storage_path, + &workspace_id, + &task_id, + ) + .await +} + +#[tauri::command] +pub(crate) async fn read_workspace_symphony_workflow_override( + workspace_id: String, + state: State<'_, AppState>, + app: AppHandle, +) -> Result { + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::WorkspaceIdRequest { workspace_id }; + let response = remote_backend::call_remote( + &*state, + app, + "read_workspace_symphony_workflow_override", + workspace_remote_params(&request)?, + ) + .await?; + return serde_json::from_value(response).map_err(|err| err.to_string()); + } + + symphony_core::read_workspace_symphony_workflow_override_core( + &state.workspaces, + &state.storage_path, + &workspace_id, + ) + .await +} + +#[tauri::command] +pub(crate) async fn write_workspace_symphony_workflow_override( + workspace_id: String, + content: String, + state: State<'_, AppState>, + app: AppHandle, +) -> Result<(), String> { + if remote_backend::is_remote_mode(&*state).await { + let request = workspace_rpc::WriteWorkspaceSymphonyWorkflowOverrideRequest { + workspace_id, + content, + }; + remote_backend::call_remote( + &*state, + app, + "write_workspace_symphony_workflow_override", + workspace_remote_params(&request)?, + ) + .await?; + return Ok(()); + } + + symphony_core::write_workspace_symphony_workflow_override_core( + &state.storage_path, + &workspace_id, + &content, + ) + .await +} diff --git a/src-tauri/src/symphony_binary.rs b/src-tauri/src/symphony_binary.rs new file mode 100644 index 000000000..3649719bc --- /dev/null +++ b/src-tauri/src/symphony_binary.rs @@ -0,0 +1,131 @@ +use std::path::{Path, PathBuf}; + +pub(crate) fn symphony_binary_candidates() -> &'static [&'static str] { + if cfg!(windows) { + &[ + "codex_monitor_symphony.exe", + "codex-monitor-symphony.exe", + "symphony.exe", + ] + } else { + &[ + "codex_monitor_symphony", + "codex-monitor-symphony", + "symphony", + ] + } +} + +fn push_unique(dirs: &mut Vec, path: PathBuf) { + if !dirs.iter().any(|entry| entry == &path) { + dirs.push(path); + } +} + +fn symphony_dev_candidates(current_dir: &Path) -> Vec { + let mut candidates = Vec::new(); + + // In dev, the process cwd can be either the repo root or `src-tauri/`. + // Walk a few ancestor levels so the sibling `../symphony` repo is found + // from both layouts without hard-coding only one relative hop. + for ancestor in current_dir.ancestors().take(4) { + push_unique( + &mut candidates, + ancestor.join("../symphony/elixir/bin/symphony"), + ); + } + + candidates +} + +fn symphony_search_dirs(executable_dir: &Path) -> Vec { + let mut dirs = Vec::new(); + push_unique(&mut dirs, executable_dir.to_path_buf()); + push_unique(&mut dirs, executable_dir.join("resources").join("symphony")); + push_unique(&mut dirs, executable_dir.join("symphony")); + + #[cfg(target_os = "macos")] + { + if let Some(contents_dir) = executable_dir.parent() { + push_unique(&mut dirs, contents_dir.join("Resources")); + push_unique( + &mut dirs, + contents_dir + .join("Resources") + .join("resources") + .join("symphony"), + ); + push_unique(&mut dirs, contents_dir.join("Resources").join("symphony")); + } + push_unique(&mut dirs, PathBuf::from("/opt/homebrew/bin")); + push_unique(&mut dirs, PathBuf::from("/usr/local/bin")); + } + + #[cfg(target_os = "linux")] + { + push_unique(&mut dirs, PathBuf::from("/usr/local/bin")); + push_unique(&mut dirs, PathBuf::from("/usr/bin")); + } + + dirs +} + +pub(crate) fn resolve_symphony_binary_path() -> Result { + let candidate_names = symphony_binary_candidates(); + let mut attempted_paths: Vec = Vec::new(); + + if let Ok(explicit_raw) = std::env::var("CODEX_MONITOR_SYMPHONY_PATH") { + let explicit = explicit_raw.trim(); + if !explicit.is_empty() { + let explicit_path = PathBuf::from(explicit); + if explicit_path.is_file() { + return Ok(explicit_path); + } + if explicit_path.is_dir() { + for name in candidate_names { + let candidate = explicit_path.join(name); + if candidate.is_file() { + return Ok(candidate); + } + attempted_paths.push(candidate); + } + } else { + attempted_paths.push(explicit_path); + } + } + } + + if let Ok(current_dir) = std::env::current_dir() { + for dev_candidate in symphony_dev_candidates(¤t_dir) { + if dev_candidate.is_file() { + return Ok(dev_candidate); + } + attempted_paths.push(dev_candidate); + } + } + + let current_exe = std::env::current_exe().map_err(|err| err.to_string())?; + let executable_dir = current_exe + .parent() + .ok_or_else(|| "Unable to resolve executable directory".to_string())?; + for search_dir in symphony_search_dirs(executable_dir) { + for name in candidate_names { + let candidate = search_dir.join(name); + if candidate.is_file() { + return Ok(candidate); + } + attempted_paths.push(candidate); + } + } + + let attempted = attempted_paths + .iter() + .map(|path| path.display().to_string()) + .collect::>() + .join(", "); + + Err(format!( + "Unable to locate Symphony binary (tried: {})", + attempted + )) +} diff --git a/src-tauri/src/types.rs b/src-tauri/src/types.rs index 6b595106b..b469da2dc 100644 --- a/src-tauri/src/types.rs +++ b/src-tauri/src/types.rs @@ -315,6 +315,8 @@ pub(crate) struct WorkspaceGroup { pub(crate) struct WorkspaceSettings { #[serde(default, rename = "sidebarCollapsed")] pub(crate) sidebar_collapsed: bool, + #[serde(default, rename = "workspaceHomeTab")] + pub(crate) workspace_home_tab: Option, #[serde(default, rename = "sortOrder")] pub(crate) sort_order: Option, #[serde(default, rename = "groupId")] @@ -333,6 +335,235 @@ pub(crate) struct WorkspaceSettings { pub(crate) worktrees_folder: Option, } +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub(crate) enum WorkspaceHomeTab { + Configuration, + Symphony, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub(crate) enum WorkspaceTaskStatus { + Backlog, + Todo, + InProgress, + HumanReview, + Rework, + Merging, + Done, +} + +impl WorkspaceTaskStatus { + pub(crate) fn as_str(&self) -> &'static str { + match self { + Self::Backlog => "backlog", + Self::Todo => "todo", + Self::InProgress => "in_progress", + Self::HumanReview => "human_review", + Self::Rework => "rework", + Self::Merging => "merging", + Self::Done => "done", + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct WorkspaceTask { + pub(crate) id: String, + pub(crate) workspace_id: String, + pub(crate) title: String, + #[serde(default)] + pub(crate) description: Option, + pub(crate) status: WorkspaceTaskStatus, + pub(crate) order_index: i64, + pub(crate) created_at_ms: i64, + pub(crate) updated_at_ms: i64, + #[serde(default)] + pub(crate) active_run: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct WorkspaceTaskRun { + pub(crate) id: String, + pub(crate) task_id: String, + pub(crate) workspace_id: String, + #[serde(default)] + pub(crate) thread_id: Option, + #[serde(default)] + pub(crate) worktree_workspace_id: Option, + #[serde(default)] + pub(crate) branch_name: Option, + #[serde(default)] + pub(crate) pull_request_url: Option, + #[serde(default)] + pub(crate) session_id: Option, + #[serde(default)] + pub(crate) last_event: Option, + #[serde(default)] + pub(crate) last_message: Option, + #[serde(default)] + pub(crate) last_error: Option, + pub(crate) retry_count: i64, + pub(crate) token_total: i64, + pub(crate) started_at_ms: i64, + #[serde(default)] + pub(crate) updated_at_ms: i64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct WorkspaceTaskEvent { + pub(crate) id: String, + pub(crate) task_id: String, + pub(crate) workspace_id: String, + pub(crate) message: String, + pub(crate) created_at_ms: i64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct WorkspaceTaskTelemetry { + pub(crate) task: WorkspaceTask, + #[serde(default)] + pub(crate) events: Vec, + #[serde(default)] + pub(crate) live_run: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct WorkspaceTaskLiveRun { + pub(crate) stage: String, + #[serde(default)] + pub(crate) agent_pid: Option, + #[serde(default)] + pub(crate) age_label: Option, + #[serde(default)] + pub(crate) turn_count: Option, + #[serde(default)] + pub(crate) token_total: Option, + #[serde(default)] + pub(crate) session_id: Option, + #[serde(default)] + pub(crate) current_event: Option, + #[serde(default)] + pub(crate) claimed_at_ms: Option, + pub(crate) observed_at_ms: i64, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub(crate) enum WorkspaceSymphonyRuntimeState { + Stopped, + Starting, + Running, + Error, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub(crate) enum WorkspaceSymphonyHealth { + Healthy, + Stale, + Error, + Stopped, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct WorkspaceSymphonyStatus { + pub(crate) workspace_id: String, + pub(crate) state: WorkspaceSymphonyRuntimeState, + pub(crate) health: WorkspaceSymphonyHealth, + #[serde(default)] + pub(crate) binary_path: Option, + #[serde(default)] + pub(crate) binary_version: Option, + #[serde(default)] + pub(crate) pid: Option, + #[serde(default)] + pub(crate) started_at_ms: Option, + #[serde(default)] + pub(crate) last_heartbeat_at_ms: Option, + #[serde(default)] + pub(crate) last_error: Option, + #[serde(default)] + pub(crate) log_path: Option, + #[serde(default)] + pub(crate) total_tasks: usize, + #[serde(default)] + pub(crate) active_tasks: usize, + #[serde(default)] + pub(crate) retrying_tasks: usize, + #[serde(default)] + pub(crate) active_agents: usize, + #[serde(default)] + pub(crate) max_agents: usize, + #[serde(default)] + pub(crate) input_tokens: i64, + #[serde(default)] + pub(crate) output_tokens: i64, + #[serde(default)] + pub(crate) total_tokens: i64, + #[serde(default)] + pub(crate) uptime_ms: Option, + #[serde(default)] + pub(crate) last_activity_at_ms: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct WorkspaceSymphonySnapshot { + pub(crate) status: WorkspaceSymphonyStatus, + #[serde(default)] + pub(crate) tasks: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct CreateWorkspaceTaskInput { + pub(crate) title: String, + #[serde(default)] + pub(crate) description: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct UpdateWorkspaceTaskInput { + pub(crate) task_id: String, + #[serde(default)] + pub(crate) title: Option, + #[serde(default)] + pub(crate) description: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct MoveWorkspaceTaskInput { + pub(crate) task_id: String, + pub(crate) status: WorkspaceTaskStatus, + #[serde(default)] + pub(crate) position: Option, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub(crate) struct WorkspaceSymphonyEvent { + pub(crate) workspace_id: String, + pub(crate) kind: String, + #[serde(default)] + pub(crate) status: Option, + #[serde(default)] + pub(crate) task: Option, + #[serde(default)] + pub(crate) telemetry: Option, + #[serde(default)] + pub(crate) message: Option, +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub(crate) struct LaunchScriptEntry { pub(crate) id: String, diff --git a/src-tauri/src/workspaces/tests.rs b/src-tauri/src/workspaces/tests.rs index 01311ae70..17a734b86 100644 --- a/src-tauri/src/workspaces/tests.rs +++ b/src-tauri/src/workspaces/tests.rs @@ -49,6 +49,7 @@ fn workspace_with_id_and_kind( worktree, settings: WorkspaceSettings { sidebar_collapsed: false, + workspace_home_tab: None, sort_order, group_id: None, clone_source_workspace_id: None, diff --git a/src-tauri/tauri.conf.json b/src-tauri/tauri.conf.json index 41ca09949..071afba8c 100644 --- a/src-tauri/tauri.conf.json +++ b/src-tauri/tauri.conf.json @@ -43,6 +43,9 @@ "active": true, "targets": "all", "createUpdaterArtifacts": true, + "resources": [ + "./resources/symphony/**/*" + ], "iOS": { "frameworks": [ "libz.tbd", diff --git a/src/features/app/components/MainApp.tsx b/src/features/app/components/MainApp.tsx index 029a1f78d..e8821b40c 100644 --- a/src/features/app/components/MainApp.tsx +++ b/src/features/app/components/MainApp.tsx @@ -47,7 +47,9 @@ import { useMainAppSidebarMenuOrchestration } from "@app/hooks/useMainAppSidebar import { useMainAppWorktreeState } from "@app/hooks/useMainAppWorktreeState"; import { useMainAppWorkspaceActions } from "@app/hooks/useMainAppWorkspaceActions"; import { useMainAppWorkspaceLifecycle } from "@app/hooks/useMainAppWorkspaceLifecycle"; +import { useWorkspaceSymphonyStatuses } from "@app/hooks/useWorkspaceSymphonyStatuses"; import { useHomeAccount } from "@app/hooks/useHomeAccount"; +import { useWorkspaceSymphonyWorkflow } from "@/features/workspaces/hooks/useWorkspaceSymphonyWorkflow"; import type { ComposerEditorSettings, ServiceTier, @@ -1315,6 +1317,22 @@ export default function MainApp() { refresh: refreshAgentMd, save: saveAgentMd, } = agentMdState; + const { + content: workflowContent, + exists: workflowExists, + truncated: workflowTruncated, + isLoading: workflowLoading, + isSaving: workflowSaving, + error: workflowError, + isDirty: workflowDirty, + setContent: setWorkflowContent, + refresh: refreshWorkflow, + save: saveWorkflow, + } = useWorkspaceSymphonyWorkflow({ + activeWorkspace, + onDebug: addDebugEntry, + }); + const workspaceSymphonyStatusByWorkspace = useWorkspaceSymphonyStatuses(workspaces); const promptActions = useMainAppPromptActions({ activeWorkspace, connectWorkspace, @@ -1690,6 +1708,25 @@ export default function MainApp() { onAgentMdSave: () => { void saveAgentMd(); }, + workflowContent, + workflowExists, + workflowTruncated, + workflowLoading, + workflowSaving, + workflowError, + workflowDirty, + onWorkflowChange: setWorkflowContent, + onWorkflowRefresh: () => { + void refreshWorkflow(); + }, + onWorkflowSave: () => { + void saveWorkflow(); + }, + onWorkspaceHomeTabChange: async (tab) => { + await updateWorkspaceSettings(activeWorkspace.id, { + workspaceHomeTab: tab, + }); + }, } : null, }); @@ -1732,6 +1769,7 @@ export default function MainApp() { activeWorkspace, activeWorkspaceId, activeThreadId, + workspaceSymphonyStatusByWorkspace, activeItems, userInputRequests, approvals, diff --git a/src/features/app/components/Sidebar.tsx b/src/features/app/components/Sidebar.tsx index 53ce98a06..75e96e988 100644 --- a/src/features/app/components/Sidebar.tsx +++ b/src/features/app/components/Sidebar.tsx @@ -6,6 +6,7 @@ import type { ThreadListSortKey, ThreadSummary, WorkspaceInfo, + WorkspaceSymphonyStatus, } from "../../../types"; import { createPortal } from "react-dom"; import { memo, useCallback, useEffect, useMemo, useState } from "react"; @@ -75,6 +76,7 @@ type SidebarProps = { threadListOrganizeMode: ThreadListOrganizeMode; onSetThreadListOrganizeMode: (organizeMode: ThreadListOrganizeMode) => void; onRefreshAllThreads: () => void; + workspaceSymphonyStatusByWorkspace?: Record; activeWorkspaceId: string | null; activeThreadId: string | null; userInputRequests?: RequestUserInputRequest[]; @@ -136,6 +138,7 @@ export const Sidebar = memo(function Sidebar({ threadListOrganizeMode, onSetThreadListOrganizeMode, onRefreshAllThreads, + workspaceSymphonyStatusByWorkspace = {}, activeWorkspaceId, activeThreadId, userInputRequests = [], @@ -1002,6 +1005,7 @@ export const Sidebar = memo(function Sidebar({ key={entry.id} workspace={entry} workspaceName={renderHighlightedName(entry.name)} + symphonyStatus={workspaceSymphonyStatusByWorkspace[entry.id] ?? null} isActive={entry.id === activeWorkspaceId} isCollapsed={isCollapsed} addMenuOpen={addMenuOpen} diff --git a/src/features/app/components/WorkspaceCard.tsx b/src/features/app/components/WorkspaceCard.tsx index 670384ae4..d9cc99ad1 100644 --- a/src/features/app/components/WorkspaceCard.tsx +++ b/src/features/app/components/WorkspaceCard.tsx @@ -1,10 +1,11 @@ import type { MouseEvent } from "react"; -import type { WorkspaceInfo } from "../../../types"; +import type { WorkspaceInfo, WorkspaceSymphonyStatus } from "../../../types"; type WorkspaceCardProps = { workspace: WorkspaceInfo; workspaceName?: React.ReactNode; + symphonyStatus?: WorkspaceSymphonyStatus | null; isActive: boolean; isCollapsed: boolean; addMenuOpen: boolean; @@ -25,6 +26,7 @@ type WorkspaceCardProps = { export function WorkspaceCard({ workspace, workspaceName, + symphonyStatus = null, isActive, isCollapsed, addMenuOpen, @@ -53,7 +55,7 @@ export function WorkspaceCard({ } }} > -
+
{workspaceName ?? workspace.name} @@ -98,6 +100,22 @@ export function WorkspaceCard({ +
+ {symphonyStatus?.state === "running" ? ( +
+ + + {symphonyStatus.activeAgents === 1 + ? "1 agent" + : `${symphonyStatus.activeAgents} agents`} + {symphonyStatus.activeTasks > 0 + ? ` • ${symphonyStatus.activeTasks} active` + : ""} + +
+ ) : null}
{!workspace.connected && ( ; activeItems: LayoutNodesOptions["primary"]["messagesProps"]["items"]; userInputRequests: SidebarProps["userInputRequests"]; approvals: LayoutNodesOptions["primary"]["approvalToastsProps"]["approvals"]; @@ -251,6 +257,7 @@ export function useMainAppLayoutSurfaces({ activeWorkspace, activeWorkspaceId, activeThreadId, + workspaceSymphonyStatusByWorkspace, activeItems, userInputRequests, approvals, @@ -411,6 +418,7 @@ export function useMainAppLayoutSurfaces({ threadListOrganizeMode, onSetThreadListOrganizeMode, onRefreshAllThreads, + workspaceSymphonyStatusByWorkspace, activeWorkspaceId, activeThreadId, userInputRequests, diff --git a/src/features/app/hooks/useWorkspaceSymphonyStatuses.ts b/src/features/app/hooks/useWorkspaceSymphonyStatuses.ts new file mode 100644 index 000000000..dd0d0c872 --- /dev/null +++ b/src/features/app/hooks/useWorkspaceSymphonyStatuses.ts @@ -0,0 +1,86 @@ +import { useCallback, useEffect, useState } from "react"; +import type { WorkspaceInfo, WorkspaceSymphonyEvent, WorkspaceSymphonyStatus } from "@/types"; +import { subscribeWorkspaceSymphonyEvents } from "@services/events"; +import { getWorkspaceSymphonyStatus } from "@services/tauri"; +import { useTauriEvent } from "@app/hooks/useTauriEvent"; + +type WorkspaceSymphonyStatusMap = Record; + +export function useWorkspaceSymphonyStatuses(workspaces: WorkspaceInfo[]) { + const [statusByWorkspace, setStatusByWorkspace] = useState({}); + + const refreshWorkspaceStatus = useCallback(async (workspaceId: string) => { + try { + const snapshot = await getWorkspaceSymphonyStatus(workspaceId); + setStatusByWorkspace((prev) => { + if (snapshot.status.state !== "running") { + if (!(workspaceId in prev)) { + return prev; + } + const next = { ...prev }; + delete next[workspaceId]; + return next; + } + return { + ...prev, + [workspaceId]: snapshot.status, + }; + }); + } catch { + setStatusByWorkspace((prev) => { + if (!(workspaceId in prev)) { + return prev; + } + const next = { ...prev }; + delete next[workspaceId]; + return next; + }); + } + }, []); + + useEffect(() => { + let cancelled = false; + const workspaceIds = workspaces + .filter((workspace) => (workspace.kind ?? "main") !== "worktree") + .map((workspace) => workspace.id); + + if (workspaceIds.length === 0) { + setStatusByWorkspace({}); + return; + } + + void Promise.all( + workspaceIds.map(async (workspaceId) => { + try { + const snapshot = await getWorkspaceSymphonyStatus(workspaceId); + return snapshot.status.state === "running" + ? ([workspaceId, snapshot.status] as const) + : null; + } catch { + return null; + } + }), + ).then((entries) => { + if (cancelled) { + return; + } + setStatusByWorkspace( + Object.fromEntries(entries.filter((entry): entry is readonly [string, WorkspaceSymphonyStatus] => Boolean(entry))), + ); + }); + + return () => { + cancelled = true; + }; + }, [workspaces]); + + useTauriEvent( + subscribeWorkspaceSymphonyEvents, + (event: WorkspaceSymphonyEvent) => { + void refreshWorkspaceStatus(event.workspaceId); + }, + { enabled: workspaces.length > 0 }, + ); + + return statusByWorkspace; +} diff --git a/src/features/workspaces/components/WorkspaceHome.tsx b/src/features/workspaces/components/WorkspaceHome.tsx index 529e8b053..ae1e6e26b 100644 --- a/src/features/workspaces/components/WorkspaceHome.tsx +++ b/src/features/workspaces/components/WorkspaceHome.tsx @@ -14,6 +14,7 @@ import type { DictationTranscript, ModelOption, SkillOption, + WorkspaceHomeTab, WorkspaceInfo, } from "../../../types"; import { ComposerInput } from "../../composer/components/ComposerInput"; @@ -31,6 +32,7 @@ import { FileEditorCard } from "../../shared/components/FileEditorCard"; import { WorkspaceHomeRunControls } from "./WorkspaceHomeRunControls"; import { WorkspaceHomeHistory } from "./WorkspaceHomeHistory"; import { WorkspaceHomeGitInitBanner } from "./WorkspaceHomeGitInitBanner"; +import { WorkspaceHomeSymphonySection } from "./WorkspaceHomeSymphonySection"; import { buildIconPath } from "./workspaceHomeHelpers"; import { useWorkspaceHomeSuggestionsStyle } from "../hooks/useWorkspaceHomeSuggestionsStyle"; import type { ThreadStatusById } from "../../../utils/threadStatus"; @@ -96,6 +98,17 @@ type WorkspaceHomeProps = { onAgentMdChange: (value: string) => void; onAgentMdRefresh: () => void; onAgentMdSave: () => void; + workflowContent: string; + workflowExists: boolean; + workflowTruncated: boolean; + workflowLoading: boolean; + workflowSaving: boolean; + workflowError: string | null; + workflowDirty: boolean; + onWorkflowChange: (value: string) => void; + onWorkflowRefresh: () => void; + onWorkflowSave: () => void; + onWorkspaceHomeTabChange: (tab: WorkspaceHomeTab) => void | Promise; }; export function WorkspaceHome({ @@ -159,6 +172,17 @@ export function WorkspaceHome({ onAgentMdChange, onAgentMdRefresh, onAgentMdSave, + workflowContent, + workflowExists, + workflowTruncated, + workflowLoading, + workflowSaving, + workflowError, + workflowDirty, + onWorkflowChange, + onWorkflowRefresh, + onWorkflowSave, + onWorkspaceHomeTabChange, }: WorkspaceHomeProps) { const [showIcon, setShowIcon] = useState(true); const [selectionStart, setSelectionStart] = useState(null); @@ -346,9 +370,32 @@ export function WorkspaceHome({ const agentMdSaveLabel = agentMdExists ? "Save" : "Create"; const agentMdSaveDisabled = agentMdLoading || agentMdSaving || !agentMdDirty; const agentMdRefreshDisabled = agentMdLoading || agentMdSaving; + const selectedTab = workspace.settings.workspaceHomeTab ?? "configuration"; + const workflowStatus = workflowLoading + ? "Loading…" + : workflowSaving + ? "Saving…" + : workflowExists + ? "Override" + : "Default"; + const workflowMetaParts: string[] = []; + if (workflowStatus) { + workflowMetaParts.push(workflowStatus); + } + if (workflowTruncated) { + workflowMetaParts.push("Truncated"); + } + const workflowMeta = workflowMetaParts.join(" · "); + const workflowSaveLabel = workflowExists ? "Save" : "Create"; + const workflowSaveDisabled = workflowLoading || workflowSaving || !workflowDirty; + const workflowRefreshDisabled = workflowLoading || workflowSaving; return ( -
+
{showIcon && (
- {showGitInitBanner && ( - - )} - -
-
- 0 || activeImages.length > 0} - isProcessing={isSubmitting} - onStop={() => {}} - onSend={() => { - void handleRunSubmit(); +
+ +
- {error &&
{error}
} + Symphony +
- + {selectedTab === "configuration" ? ( + <> + {showGitInitBanner && ( + + )} -
- {agentMdTruncated && ( -
- Showing the first part of a large file. +
+
+ 0 || activeImages.length > 0} + isProcessing={isSubmitting} + onStop={() => {}} + onSend={() => { + void handleRunSubmit(); + }} + dictationState={dictationState} + dictationLevel={dictationLevel} + dictationEnabled={dictationEnabled} + onToggleDictation={onToggleDictation} + onCancelDictation={onCancelDictation} + onOpenDictationSettings={onOpenDictationSettings} + dictationError={dictationError} + onDismissDictationError={onDismissDictationError} + dictationHint={dictationHint} + onDismissDictationHint={onDismissDictationHint} + attachments={activeImages} + onAddAttachment={() => { + void pickImages(); + }} + onAttachImages={attachImages} + onRemoveAttachment={removeImage} + onTextChange={handleTextChangeWithHistory} + onSelectionChange={handleSelectionChange} + onKeyDown={handleComposerKeyDown} + isExpanded={false} + onToggleExpand={undefined} + textareaRef={textareaRef} + suggestionsOpen={isAutocompleteOpen} + suggestions={autocompleteMatches} + highlightIndex={highlightIndex} + onHighlightIndex={setHighlightIndex} + onSelectSuggestion={applyAutocomplete} + suggestionsStyle={suggestionsStyle} + /> +
+ {error &&
{error}
}
- )} - -
- + + +
+ {agentMdTruncated && ( +
+ Showing the first part of a large file. +
+ )} + +
+ + + + ) : ( + + )}
); } diff --git a/src/features/workspaces/components/WorkspaceHomeSymphonySection.tsx b/src/features/workspaces/components/WorkspaceHomeSymphonySection.tsx new file mode 100644 index 000000000..699134eef --- /dev/null +++ b/src/features/workspaces/components/WorkspaceHomeSymphonySection.tsx @@ -0,0 +1,888 @@ +import { useCallback, useEffect, useMemo, useState } from "react"; +import { revealItemInDir } from "@tauri-apps/plugin-opener"; +import Plus from "lucide-react/dist/esm/icons/plus"; +import type { + WorkspaceInfo, + WorkspaceSymphonyHealth, + WorkspaceSymphonySnapshot, + WorkspaceSymphonyStatus, + WorkspaceTask, + WorkspaceTaskEvent, + WorkspaceTaskStatus, + WorkspaceTaskTelemetry, +} from "../../../types"; +import { + createWorkspaceSymphonyTask, + deleteWorkspaceSymphonyTask, + getWorkspaceSymphonyStatus, + getWorkspaceSymphonyTelemetry, + listWorkspaces, + moveWorkspaceSymphonyTask, + startWorkspaceSymphony, + stopWorkspaceSymphony, + updateWorkspaceSymphonyTask, +} from "../../../services/tauri"; +import { subscribeWorkspaceSymphonyEvents } from "../../../services/events"; +import { ModalShell } from "../../design-system/components/modal/ModalShell"; +import { FileEditorCard } from "../../shared/components/FileEditorCard"; + +const COLUMNS: { status: WorkspaceTaskStatus; label: string }[] = [ + { status: "backlog", label: "Backlog" }, + { status: "todo", label: "Todo" }, + { status: "in_progress", label: "In Progress" }, + { status: "human_review", label: "Human Review" }, + { status: "rework", label: "Rework" }, + { status: "merging", label: "Merging" }, + { status: "done", label: "Done" }, +]; + +type WorkspaceHomeSymphonySectionProps = { + workspace: WorkspaceInfo; + workflowContent: string; + workflowExists: boolean; + workflowTruncated: boolean; + workflowLoading: boolean; + workflowSaving: boolean; + workflowError: string | null; + workflowDirty: boolean; + workflowMeta: string; + workflowSaveLabel: string; + workflowSaveDisabled: boolean; + workflowRefreshDisabled: boolean; + onWorkflowChange: (value: string) => void; + onWorkflowRefresh: () => void; + onWorkflowSave: () => void; + onSelectInstance: (workspaceId: string, threadId: string) => void; +}; + +function formatRelativeTime(timestamp?: number | null) { + if (!timestamp) { + return "Never"; + } + const seconds = Math.max(0, Math.round((Date.now() - timestamp) / 1000)); + if (seconds < 60) { + return `${seconds}s ago`; + } + const minutes = Math.round(seconds / 60); + if (minutes < 60) { + return `${minutes}m ago`; + } + const hours = Math.round(minutes / 60); + if (hours < 48) { + return `${hours}h ago`; + } + const days = Math.round(hours / 24); + return `${days}d ago`; +} + +function formatAbsoluteTime(timestamp?: number | null) { + if (!timestamp) { + return "Unknown"; + } + return new Date(timestamp).toLocaleString([], { + month: "short", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + }); +} + +function formatDuration(durationMs?: number | null) { + if (!durationMs || durationMs <= 0) { + return "0m"; + } + const totalSeconds = Math.floor(durationMs / 1000); + const hours = Math.floor(totalSeconds / 3600); + const minutes = Math.floor((totalSeconds % 3600) / 60); + const seconds = totalSeconds % 60; + if (hours > 0) { + return `${hours}h ${minutes}m`; + } + if (minutes > 0) { + return `${minutes}m ${seconds}s`; + } + return `${seconds}s`; +} + +function formatTaskStatusLabel(status: WorkspaceTaskStatus) { + return status.replace(/_/g, " "); +} + +function formatStateLabel(state: WorkspaceSymphonyStatus["state"]) { + switch (state) { + case "running": + return "Running"; + case "starting": + return "Starting"; + case "error": + return "Error"; + default: + return "Stopped"; + } +} + +function formatHealthLabel(health: WorkspaceSymphonyHealth) { + switch (health) { + case "healthy": + return "Healthy"; + case "stale": + return "Stale"; + case "error": + return "Error"; + default: + return "Stopped"; + } +} + +function formatTokenCount(value?: number | null) { + return new Intl.NumberFormat().format(value ?? 0); +} + +function buildTaskSummary(task: WorkspaceTask) { + const run = task.activeRun; + if (!run) { + return task.status === "todo" + ? "Queued for Symphony." + : task.description?.trim() || "No activity yet."; + } + return ( + run.lastMessage || + run.lastEvent || + run.lastError || + task.description?.trim() || + "Live telemetry pending." + ); +} + +function buildTimelineEvents( + telemetry: WorkspaceTaskTelemetry | null, +): Array { + return (telemetry?.events ?? []).map((event) => ({ + ...event, + tone: /moved|created|deleted|claimed|review/i.test(event.message) + ? "transition" + : "telemetry", + })); +} + +export function WorkspaceHomeSymphonySection({ + workspace, + workflowContent, + workflowExists, + workflowTruncated, + workflowLoading, + workflowSaving, + workflowError, + workflowDirty, + workflowMeta, + workflowSaveLabel, + workflowSaveDisabled, + workflowRefreshDisabled, + onWorkflowChange, + onWorkflowRefresh, + onWorkflowSave, + onSelectInstance, +}: WorkspaceHomeSymphonySectionProps) { + const [snapshot, setSnapshot] = useState(null); + const [telemetry, setTelemetry] = useState(null); + const [selectedTaskId, setSelectedTaskId] = useState(null); + const [draftTaskId, setDraftTaskId] = useState(null); + const [showCreateModal, setShowCreateModal] = useState(false); + const [newTitle, setNewTitle] = useState(""); + const [newDescription, setNewDescription] = useState(""); + const [editTitle, setEditTitle] = useState(""); + const [editDescription, setEditDescription] = useState(""); + const [busyAction, setBusyAction] = useState(null); + const [error, setError] = useState(null); + const [workspacesById, setWorkspacesById] = useState>({}); + + const selectedTask = useMemo( + () => snapshot?.tasks.find((task) => task.id === selectedTaskId) ?? null, + [selectedTaskId, snapshot?.tasks], + ); + + useEffect(() => { + if (!selectedTask) { + if (!selectedTaskId) { + setDraftTaskId(null); + } + return; + } + if (draftTaskId === selectedTask.id) { + return; + } + setEditTitle(selectedTask.title); + setEditDescription(selectedTask.description ?? ""); + setDraftTaskId(selectedTask.id); + }, [draftTaskId, selectedTask, selectedTaskId]); + + const loadTelemetry = useCallback( + async (taskId: string | null) => { + if (!taskId) { + setTelemetry(null); + return; + } + const nextTelemetry = await getWorkspaceSymphonyTelemetry(workspace.id, taskId); + setTelemetry(nextTelemetry); + }, + [workspace.id], + ); + + const refreshSnapshot = useCallback( + async (taskIdOverride?: string | null) => { + const nextSnapshot = await getWorkspaceSymphonyStatus(workspace.id); + setSnapshot(nextSnapshot); + const nextSelectedTaskId = + taskIdOverride === undefined + ? selectedTaskId && nextSnapshot.tasks.some((task) => task.id === selectedTaskId) + ? selectedTaskId + : (nextSnapshot.tasks[0]?.id ?? null) + : taskIdOverride; + setSelectedTaskId(nextSelectedTaskId); + await loadTelemetry(nextSelectedTaskId); + return nextSnapshot; + }, + [loadTelemetry, selectedTaskId, workspace.id], + ); + + useEffect(() => { + let cancelled = false; + void listWorkspaces().then((entries) => { + if (cancelled) { + return; + } + setWorkspacesById(Object.fromEntries(entries.map((entry) => [entry.id, entry]))); + }); + return () => { + cancelled = true; + }; + }, []); + + useEffect(() => { + let cancelled = false; + + const load = async () => { + try { + const nextSnapshot = await getWorkspaceSymphonyStatus(workspace.id); + if (cancelled) { + return; + } + setSnapshot(nextSnapshot); + const nextSelectedTaskId = + selectedTaskId && nextSnapshot.tasks.some((task) => task.id === selectedTaskId) + ? selectedTaskId + : (nextSnapshot.tasks[0]?.id ?? null); + setSelectedTaskId(nextSelectedTaskId); + if (nextSelectedTaskId) { + const nextTelemetry = await getWorkspaceSymphonyTelemetry( + workspace.id, + nextSelectedTaskId, + ); + if (!cancelled) { + setTelemetry(nextTelemetry); + } + } else if (!cancelled) { + setTelemetry(null); + } + setError(null); + } catch (loadError) { + if (!cancelled) { + setError(loadError instanceof Error ? loadError.message : String(loadError)); + } + } + }; + + void load(); + const unsubscribe = subscribeWorkspaceSymphonyEvents((event) => { + if (event.workspaceId !== workspace.id) { + return; + } + void load(); + }); + + return () => { + cancelled = true; + unsubscribe(); + }; + }, [selectedTaskId, workspace.id]); + + const tasksByStatus = useMemo(() => { + const grouped = new Map(); + for (const column of COLUMNS) { + grouped.set(column.status, []); + } + for (const task of snapshot?.tasks ?? []) { + grouped.get(task.status)?.push(task); + } + return grouped; + }, [snapshot?.tasks]); + + const timelineEvents = useMemo(() => buildTimelineEvents(telemetry), [telemetry]); + + const runAction = async ( + actionKey: string, + action: () => Promise, + ) => { + setBusyAction(actionKey); + setError(null); + try { + const nextSelectedTaskId = await action(); + await refreshSnapshot(nextSelectedTaskId ?? undefined); + } catch (actionError) { + setError(actionError instanceof Error ? actionError.message : String(actionError)); + } finally { + setBusyAction(null); + } + }; + + const handleCreateTask = async () => { + const trimmedTitle = newTitle.trim(); + if (!trimmedTitle) { + return; + } + await runAction("create", async () => { + const task = await createWorkspaceSymphonyTask(workspace.id, { + title: trimmedTitle, + description: newDescription.trim() || null, + }); + setNewTitle(""); + setNewDescription(""); + setShowCreateModal(false); + return task.id; + }); + }; + + const handleSaveTask = async () => { + if (!selectedTask) { + return; + } + await runAction(`save:${selectedTask.id}`, async () => { + const task = await updateWorkspaceSymphonyTask(workspace.id, { + taskId: selectedTask.id, + title: editTitle.trim() || selectedTask.title, + description: editDescription.trim() || null, + }); + return task.id; + }); + }; + + const moveTask = async (task: WorkspaceTask, status: WorkspaceTaskStatus) => { + await runAction(`move:${task.id}:${status}`, async () => { + const moved = await moveWorkspaceSymphonyTask(workspace.id, task.id, status); + return moved.id; + }); + }; + + const handleDeleteTask = async (task: WorkspaceTask) => { + await runAction(`delete:${task.id}`, async () => { + await deleteWorkspaceSymphonyTask(workspace.id, task.id); + if (selectedTaskId === task.id) { + setDraftTaskId(null); + } + return null; + }); + }; + + const handleToggleRuntime = async () => { + if ((snapshot?.status.state ?? "stopped") === "running") { + await runAction("runtime:stop", async () => { + await stopWorkspaceSymphony(workspace.id); + }); + return; + } + await runAction("runtime:start", async () => { + await startWorkspaceSymphony(workspace.id); + }); + }; + + const runtimeStatus = snapshot?.status; + const selectedTelemetry = telemetry?.task.id === selectedTask?.id ? telemetry : null; + const selectedRun = selectedTask?.activeRun ?? null; + const selectedLiveRun = selectedTelemetry?.liveRun ?? null; + const selectedActivity = + selectedLiveRun?.currentEvent ?? selectedRun?.lastEvent ?? "Waiting for activity."; + const selectedMessage = + selectedRun?.lastMessage ?? + selectedRun?.lastError ?? + (selectedTask ? buildTaskSummary(selectedTask) : "No message yet."); + const linkedWorktree = + (selectedRun?.worktreeWorkspaceId && + workspacesById[selectedRun.worktreeWorkspaceId]) || + null; + const runtimeHeaderCards = [ + { + label: "Health", + value: formatHealthLabel(runtimeStatus?.health ?? "stopped"), + meta: formatStateLabel(runtimeStatus?.state ?? "stopped"), + tone: runtimeStatus?.health ?? "stopped", + }, + { + label: "Running", + value: formatDuration(runtimeStatus?.uptimeMs), + meta: `Heartbeat ${formatRelativeTime(runtimeStatus?.lastHeartbeatAtMs)}`, + tone: "neutral", + }, + { + label: "Agents", + value: `${runtimeStatus?.activeAgents ?? 0}/${runtimeStatus?.maxAgents ?? 0}`, + meta: `${runtimeStatus?.activeTasks ?? 0} active tasks`, + tone: "neutral", + }, + { + label: "Input Tokens", + value: formatTokenCount(runtimeStatus?.inputTokens), + meta: `Output ${formatTokenCount(runtimeStatus?.outputTokens)}`, + tone: "neutral", + }, + { + label: "Total Tokens", + value: formatTokenCount(runtimeStatus?.totalTokens), + meta: `${runtimeStatus?.retryingTasks ?? 0} retrying`, + tone: "neutral", + }, + ]; + + return ( +
+
+
+
+
Live controls
+
Status, health, tokens, and agents.
+
+
+ {runtimeStatus?.logPath ? ( + + ) : null} + +
+
+ +
+ {runtimeHeaderCards.map((card) => ( +
+
{card.label}
+
{card.value}
+
{card.meta}
+
+ ))} +
+ +
+ + {formatHealthLabel(runtimeStatus?.health ?? "stopped")} + + + Binary: {runtimeStatus?.binaryVersion ?? runtimeStatus?.binaryPath ?? "Not resolved"} + + {runtimeStatus?.lastActivityAtMs ? ( + + Last activity {formatRelativeTime(runtimeStatus.lastActivityAtMs)} + + ) : null} +
+ {runtimeStatus?.lastError ? ( +
{runtimeStatus.lastError}
+ ) : null} +
+ + {error ?
{error}
: null} + +
+
+
+
Kanban
+
Queue and live task flow.
+
+
+ +
+ {COLUMNS.map((column) => { + const tasks = tasksByStatus.get(column.status) ?? []; + return ( +
+
+
+ {column.label} + {column.status === "backlog" ? ( + + ) : null} +
+ {tasks.length} +
+
+ {tasks.length === 0 ? ( +
No tasks.
+ ) : null} + {tasks.map((task) => ( + + ))} +
+
+ ); + })} +
+
+ + {selectedTask ? ( +
+
+
+
Selected task
+
+ {selectedTask.title} · Updated {formatRelativeTime(selectedTask.updatedAtMs)} +
+
+
+
+ + +
+ +
+
+ +
+
+
+ setEditTitle(event.target.value)} + placeholder="Task title" + /> +