diff --git a/src/common/orpc/schemas/project.ts b/src/common/orpc/schemas/project.ts index eb32c81df8..3c63ba98dd 100644 --- a/src/common/orpc/schemas/project.ts +++ b/src/common/orpc/schemas/project.ts @@ -74,10 +74,13 @@ export const WorkspaceConfigSchema = z.object({ description: "When true, switch_agent tool is enabled for this workspace (set when session starts from Auto agent).", }), - taskStatus: z.enum(["queued", "running", "awaiting_report", "reported"]).optional().meta({ - description: - "Agent task lifecycle status for child workspaces (queued|running|awaiting_report|reported).", - }), + taskStatus: z + .enum(["queued", "running", "awaiting_report", "interrupted", "reported"]) + .optional() + .meta({ + description: + "Agent task lifecycle status for child workspaces (queued|running|awaiting_report|interrupted|reported).", + }), reportedAt: z.string().optional().meta({ description: "ISO 8601 timestamp for when an agent task reported completion (optional).", }), diff --git a/src/common/orpc/schemas/workspace.ts b/src/common/orpc/schemas/workspace.ts index 139a92535e..d3a9e4096c 100644 --- a/src/common/orpc/schemas/workspace.ts +++ b/src/common/orpc/schemas/workspace.ts @@ -46,10 +46,13 @@ export const WorkspaceMetadataSchema = z.object({ description: 'If set, selects an agent definition for this workspace (e.g., "explore" or "exec").', }), - taskStatus: z.enum(["queued", "running", "awaiting_report", "reported"]).optional().meta({ - description: - "Agent task lifecycle status for child workspaces (queued|running|awaiting_report|reported).", - }), + taskStatus: z + .enum(["queued", "running", "awaiting_report", "interrupted", "reported"]) + .optional() + .meta({ + description: + "Agent task lifecycle status for child workspaces (queued|running|awaiting_report|interrupted|reported).", + }), reportedAt: z.string().optional().meta({ description: "ISO 8601 timestamp for when an agent task reported completion (optional).", }), diff --git a/src/common/utils/tools/toolDefinitions.ts b/src/common/utils/tools/toolDefinitions.ts index 255a59baa3..9825341a65 100644 --- a/src/common/utils/tools/toolDefinitions.ts +++ b/src/common/utils/tools/toolDefinitions.ts @@ -481,7 +481,13 @@ export const TaskTerminateToolResultSchema = z // task_list (list descendant sub-agent tasks) // ----------------------------------------------------------------------------- -const TaskListStatusSchema = z.enum(["queued", "running", "awaiting_report", "reported"]); +const TaskListStatusSchema = z.enum([ + "queued", + "running", + "awaiting_report", + "interrupted", + "reported", +]); const TaskListThinkingLevelSchema = z.enum(["off", "low", "medium", "high", "xhigh", "max"]); export const TaskListToolArgsSchema = z diff --git a/src/node/services/taskService.test.ts b/src/node/services/taskService.test.ts index f7c20e0f33..c3ac4a51ec 100644 --- a/src/node/services/taskService.test.ts +++ b/src/node/services/taskService.test.ts @@ -157,6 +157,7 @@ function createWorkspaceServiceMocks( overrides?: Partial<{ sendMessage: ReturnType; resumeStream: ReturnType; + clearQueue: ReturnType; remove: ReturnType; emit: ReturnType; getInfo: ReturnType; @@ -167,6 +168,7 @@ function createWorkspaceServiceMocks( workspaceService: WorkspaceService; sendMessage: ReturnType; resumeStream: ReturnType; + clearQueue: ReturnType; remove: ReturnType; emit: ReturnType; getInfo: ReturnType; @@ -178,6 +180,7 @@ function createWorkspaceServiceMocks( const resumeStream = overrides?.resumeStream ?? mock((): Promise> => Promise.resolve(Ok({ started: true }))); + const clearQueue = overrides?.clearQueue ?? mock((): Result => Ok(undefined)); const remove = overrides?.remove ?? mock((): Promise> => Promise.resolve(Ok(undefined))); const emit = overrides?.emit ?? mock(() => true); @@ -191,6 +194,7 @@ function createWorkspaceServiceMocks( workspaceService: { sendMessage, resumeStream, + clearQueue, remove, emit, getInfo, @@ -199,6 +203,7 @@ function createWorkspaceServiceMocks( } as unknown as WorkspaceService, sendMessage, resumeStream, + clearQueue, remove, emit, getInfo, @@ -1840,7 +1845,7 @@ describe("TaskService", () => { expect(remove).toHaveBeenNthCalledWith(2, parentTaskId, true); }); - test("terminateAllDescendantAgentTasks terminates entire subtree leaf-first", async () => { + test("terminateAllDescendantAgentTasks interrupts entire subtree leaf-first", async () => { const config = await createTestConfig(rootDir); const projectPath = path.join(rootDir, "repo"); @@ -1878,13 +1883,25 @@ describe("TaskService", () => { taskSettings: { maxParallelAgentTasks: 3, maxTaskNestingDepth: 3 }, }); - const { aiService, stopStream } = createAIServiceMocks(config); - const { workspaceService, remove } = createWorkspaceServiceMocks(); + const callOrder: string[] = []; + const clearQueue = mock((workspaceId: string): Result => { + callOrder.push(`clear:${workspaceId}`); + return Ok(undefined); + }); + const stopStream = mock((workspaceId: string): Promise> => { + callOrder.push(`stop:${workspaceId}`); + return Promise.resolve(Ok(undefined)); + }); + + const { aiService } = createAIServiceMocks(config, { stopStream }); + const { workspaceService, remove } = createWorkspaceServiceMocks({ clearQueue }); const { taskService } = createTaskServiceHarness(config, { aiService, workspaceService }); - const terminatedTaskIds = await taskService.terminateAllDescendantAgentTasks(rootWorkspaceId); - expect(terminatedTaskIds).toEqual([childTaskId, parentTaskId]); + const interruptedTaskIds = await taskService.terminateAllDescendantAgentTasks(rootWorkspaceId); + expect(interruptedTaskIds).toEqual([childTaskId, parentTaskId]); + expect(clearQueue).toHaveBeenNthCalledWith(1, childTaskId); + expect(clearQueue).toHaveBeenNthCalledWith(2, parentTaskId); expect(stopStream).toHaveBeenNthCalledWith( 1, childTaskId, @@ -1895,8 +1912,20 @@ describe("TaskService", () => { parentTaskId, expect.objectContaining({ abandonPartial: true }) ); - expect(remove).toHaveBeenNthCalledWith(1, childTaskId, true); - expect(remove).toHaveBeenNthCalledWith(2, parentTaskId, true); + expect(callOrder).toEqual([ + `clear:${childTaskId}`, + `stop:${childTaskId}`, + `clear:${parentTaskId}`, + `stop:${parentTaskId}`, + ]); + expect(remove).not.toHaveBeenCalled(); + + const saved = config.loadConfigOrDefault(); + const tasks = saved.projects.get(projectPath)?.workspaces ?? []; + const parentTask = tasks.find((workspace) => workspace.id === parentTaskId); + const childTask = tasks.find((workspace) => workspace.id === childTaskId); + expect(parentTask?.taskStatus).toBe("interrupted"); + expect(childTask?.taskStatus).toBe("interrupted"); }); test("terminateAllDescendantAgentTasks is a no-op with no descendants", async () => { @@ -1929,6 +1958,172 @@ describe("TaskService", () => { expect(remove).not.toHaveBeenCalled(); }); + test("terminateAllDescendantAgentTasks preserves queued task prompts across repeated interrupts", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const rootWorkspaceId = "root-111"; + const queuedTaskId = "task-queued"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "root"), id: rootWorkspaceId, name: "root" }, + { + path: path.join(projectPath, "queued-task"), + id: queuedTaskId, + name: "agent_exec_queued", + parentWorkspaceId: rootWorkspaceId, + agentType: "exec", + taskStatus: "queued", + taskPrompt: "resume me later", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 }, + }); + + const { taskService } = createTaskServiceHarness(config); + + const firstInterruptedTaskIds = + await taskService.terminateAllDescendantAgentTasks(rootWorkspaceId); + expect(firstInterruptedTaskIds).toEqual([queuedTaskId]); + + const secondInterruptedTaskIds = + await taskService.terminateAllDescendantAgentTasks(rootWorkspaceId); + expect(secondInterruptedTaskIds).toEqual([queuedTaskId]); + + const saved = config.loadConfigOrDefault(); + const tasks = saved.projects.get(projectPath)?.workspaces ?? []; + const queuedTask = tasks.find((workspace) => workspace.id === queuedTaskId); + expect(queuedTask?.taskStatus).toBe("interrupted"); + expect(queuedTask?.taskPrompt).toBe("resume me later"); + }); + + test("markInterruptedTaskRunning restores interrupted descendant tasks to running without clearing prompt", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const rootWorkspaceId = "root-111"; + const childTaskId = "task-child"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "root"), id: rootWorkspaceId, name: "root" }, + { + path: path.join(projectPath, "child-task"), + id: childTaskId, + name: "agent_explore_child", + parentWorkspaceId: rootWorkspaceId, + agentType: "explore", + taskStatus: "interrupted", + taskPrompt: "stale prompt", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 }, + }); + + const { taskService } = createTaskServiceHarness(config); + + const transitioned = await taskService.markInterruptedTaskRunning(childTaskId); + expect(transitioned).toBe(true); + + const saved = config.loadConfigOrDefault(); + const tasks = saved.projects.get(projectPath)?.workspaces ?? []; + const childTask = tasks.find((workspace) => workspace.id === childTaskId); + expect(childTask?.taskStatus).toBe("running"); + expect(childTask?.taskPrompt).toBe("stale prompt"); + }); + + test("markInterruptedTaskRunning is a no-op for non-interrupted workspaces", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const rootWorkspaceId = "root-111"; + const childTaskId = "task-child"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "root"), id: rootWorkspaceId, name: "root" }, + { + path: path.join(projectPath, "child-task"), + id: childTaskId, + name: "agent_explore_child", + parentWorkspaceId: rootWorkspaceId, + agentType: "explore", + taskStatus: "running", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 }, + }); + + const editConfigSpy = spyOn(config, "editConfig"); + const { taskService } = createTaskServiceHarness(config); + + const transitioned = await taskService.markInterruptedTaskRunning(childTaskId); + + expect(transitioned).toBe(false); + expect(editConfigSpy).not.toHaveBeenCalled(); + }); + + test("restoreInterruptedTaskAfterResumeFailure reverts running descendant tasks", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const rootWorkspaceId = "root-111"; + const childTaskId = "task-child"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "root"), id: rootWorkspaceId, name: "root" }, + { + path: path.join(projectPath, "child-task"), + id: childTaskId, + name: "agent_explore_child", + parentWorkspaceId: rootWorkspaceId, + agentType: "explore", + taskStatus: "running", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 }, + }); + + const { taskService } = createTaskServiceHarness(config); + + await taskService.restoreInterruptedTaskAfterResumeFailure(childTaskId); + + const saved = config.loadConfigOrDefault(); + const tasks = saved.projects.get(projectPath)?.workspaces ?? []; + const childTask = tasks.find((workspace) => workspace.id === childTaskId); + expect(childTask?.taskStatus).toBe("interrupted"); + }); + test("initialize resumes awaiting_report tasks after restart", async () => { const config = await createTestConfig(rootDir); @@ -2093,6 +2288,50 @@ describe("TaskService", () => { expect(report.reportMarkdown).toBe("ok"); }); + test("waitForAgentReport rejects interrupted tasks without waiting", async () => { + const config = await createTestConfig(rootDir); + + const projectPath = path.join(rootDir, "repo"); + const parentId = "parent-111"; + const childId = "child-222"; + + await config.saveConfig({ + projects: new Map([ + [ + projectPath, + { + workspaces: [ + { path: path.join(projectPath, "parent"), id: parentId, name: "parent" }, + { + path: path.join(projectPath, "child"), + id: childId, + name: "agent_explore_child", + parentWorkspaceId: parentId, + agentType: "explore", + taskStatus: "interrupted", + }, + ], + }, + ], + ]), + taskSettings: { maxParallelAgentTasks: 1, maxTaskNestingDepth: 3 }, + }); + + const { taskService } = createTaskServiceHarness(config); + + let caught: unknown = null; + try { + await taskService.waitForAgentReport(childId, { timeoutMs: 10_000 }); + } catch (error: unknown) { + caught = error; + } + + expect(caught).toBeInstanceOf(Error); + if (caught instanceof Error) { + expect(caught.message).toMatch(/Task interrupted/); + } + }); + test("waitForAgentReport returns persisted report after workspace is removed", async () => { const config = await createTestConfig(rootDir); diff --git a/src/node/services/taskService.ts b/src/node/services/taskService.ts index fee18682a7..d92e1983fd 100644 --- a/src/node/services/taskService.ts +++ b/src/node/services/taskService.ts @@ -1160,10 +1160,16 @@ export class TaskService { } /** - * Terminate all descendant agent tasks for a workspace (leaf-first). + * Interrupt all descendant agent tasks for a workspace (leaf-first). * * Rationale: when a user hard-interrupts a parent workspace, descendants must * also stop so they cannot later auto-resume the interrupted parent. + * + * Keep interrupted task workspaces on disk so users can inspect or manually + * resume them later. + * + * Legacy naming note: this method retains the original "terminate" name for + * compatibility with existing call sites. */ async terminateAllDescendantAgentTasks(workspaceId: string): Promise { assert( @@ -1171,7 +1177,7 @@ export class TaskService { "terminateAllDescendantAgentTasks: workspaceId must be non-empty" ); - const terminatedTaskIds: string[] = []; + const interruptedTaskIds: string[] = []; { await using _lock = await this.mutex.acquire(); @@ -1180,10 +1186,10 @@ export class TaskService { const index = this.buildAgentTaskIndex(cfg); const descendants = this.listDescendantAgentTaskIdsFromIndex(index, workspaceId); if (descendants.length === 0) { - return terminatedTaskIds; + return interruptedTaskIds; } - // Delete leaves first to avoid leaving children with missing parents. + // Interrupt leaves first to avoid descendant/ancestor status races. const parentById = index.parentById; const depthById = new Map(); for (const id of descendants) { @@ -1191,9 +1197,23 @@ export class TaskService { } descendants.sort((a, b) => (depthById.get(b) ?? 0) - (depthById.get(a) ?? 0)); - const terminationError = new Error("Parent workspace interrupted"); + const interruptionError = new Error("Parent workspace interrupted"); for (const id of descendants) { + // Best-effort: clear queue first. AgentSession stream-end cleanup auto-flushes + // queued messages, so descendants must not keep pending input after a hard interrupt. + try { + const clearQueueResult = this.workspaceService.clearQueue(id); + if (!clearQueueResult.success) { + log.debug("terminateAllDescendantAgentTasks: clearQueue failed", { + taskId: id, + error: clearQueueResult.error, + }); + } + } catch (error: unknown) { + log.debug("terminateAllDescendantAgentTasks: clearQueue threw", { taskId: id, error }); + } + // Best-effort: stop any active stream immediately to avoid further token usage. try { const stopResult = await this.aiService.stopStream(id, { abandonPartial: true }); @@ -1206,25 +1226,47 @@ export class TaskService { this.remindedAwaitingReport.delete(id); this.completedReportsByTaskId.delete(id); - this.rejectWaiters(id, terminationError); + this.rejectWaiters(id, interruptionError); - const removeResult = await this.workspaceService.remove(id, true); - if (!removeResult.success) { - log.error("terminateAllDescendantAgentTasks: failed to remove task workspace", { + const updated = await this.editWorkspaceEntry( + id, + (ws) => { + const previousStatus = ws.taskStatus; + const persistedQueuedPrompt = coerceNonEmptyString(ws.taskPrompt); + ws.taskStatus = "interrupted"; + + // Queued tasks persist their initial prompt in config until first start. + // Preserve that prompt when interrupting queued descendants so users can + // still inspect/resume the preserved workspace intent. + // + // Also preserve across repeated hard interrupts: once a never-started task + // is first interrupted, its status becomes "interrupted". Later cascades + // must not clear the same persisted prompt. + if (previousStatus !== "queued" && !persistedQueuedPrompt) { + ws.taskPrompt = undefined; + } + }, + { allowMissing: true } + ); + if (!updated) { + log.debug("terminateAllDescendantAgentTasks: descendant workspace missing", { taskId: id, - error: removeResult.error, }); continue; } - terminatedTaskIds.push(id); + interruptedTaskIds.push(id); } } + for (const taskId of interruptedTaskIds) { + await this.emitWorkspaceMetadata(taskId); + } + // Free slots and start any queued tasks (best-effort). await this.maybeStartQueuedTasks(); - return terminatedTaskIds; + return interruptedTaskIds; } private async rollbackFailedTaskCreate( @@ -1345,12 +1387,17 @@ export class TaskService { // persisted artifact from the requesting workspace session dir. const cfg = this.config.loadConfigOrDefault(); const taskWorkspaceEntry = findWorkspaceEntry(cfg, taskId); - if (!taskWorkspaceEntry || taskWorkspaceEntry.workspace.taskStatus === "reported") { + const taskStatus = taskWorkspaceEntry?.workspace.taskStatus; + if (!taskWorkspaceEntry || taskStatus === "reported" || taskStatus === "interrupted") { const persisted = await tryReadPersistedReport(); if (persisted) { return persisted; } + if (taskStatus === "interrupted") { + throw new Error("Task interrupted"); + } + throw new Error("Task not found"); } @@ -1370,14 +1417,23 @@ export class TaskService { return; } - if (taskWorkspaceEntry.workspace.taskStatus === "reported") { + if ( + taskWorkspaceEntry.workspace.taskStatus === "reported" || + taskWorkspaceEntry.workspace.taskStatus === "interrupted" + ) { const persisted = await tryReadPersistedReport(); if (persisted) { resolve(persisted); return; } - reject(new Error("Task not found")); + reject( + new Error( + taskWorkspaceEntry.workspace.taskStatus === "interrupted" + ? "Task interrupted" + : "Task not found" + ) + ); return; } @@ -2292,6 +2348,87 @@ export class TaskService { this.interruptedParentWorkspaceIds.add(workspaceId); } + /** + * If a preserved descendant task workspace was previously interrupted and the user manually + * resumes it, restore taskStatus=running so stream-end finalization can proceed normally. + * + * Returns true only when a state transition happened. + */ + async markInterruptedTaskRunning(workspaceId: string): Promise { + assert(workspaceId.length > 0, "markInterruptedTaskRunning: workspaceId must be non-empty"); + + const configAtStart = this.config.loadConfigOrDefault(); + const entryAtStart = findWorkspaceEntry(configAtStart, workspaceId); + if (!entryAtStart?.workspace.parentWorkspaceId) { + return false; + } + if (entryAtStart.workspace.taskStatus !== "interrupted") { + return false; + } + + let transitionedToRunning = false; + await this.editWorkspaceEntry( + workspaceId, + (ws) => { + // Only descendant task workspaces have task lifecycle status. + if (!ws.parentWorkspaceId) { + return; + } + if (ws.taskStatus !== "interrupted") { + return; + } + + // Preserve taskPrompt here: interrupted queued tasks store their only initial + // prompt in config. If send/resume fails, restoreInterruptedTaskAfterResumeFailure + // must be able to retain that original prompt for inspection/retry. + ws.taskStatus = "running"; + transitionedToRunning = true; + }, + { allowMissing: true } + ); + + if (!transitionedToRunning) { + return false; + } + + await this.emitWorkspaceMetadata(workspaceId); + return true; + } + + /** + * Revert a pre-stream interrupted->running transition when send/resume fails to start + * or complete. This preserves fail-fast interrupted semantics for task_await. + */ + async restoreInterruptedTaskAfterResumeFailure(workspaceId: string): Promise { + assert( + workspaceId.length > 0, + "restoreInterruptedTaskAfterResumeFailure: workspaceId must be non-empty" + ); + + let revertedToInterrupted = false; + await this.editWorkspaceEntry( + workspaceId, + (ws) => { + if (!ws.parentWorkspaceId) { + return; + } + if (ws.taskStatus !== "running") { + return; + } + + ws.taskStatus = "interrupted"; + revertedToInterrupted = true; + }, + { allowMissing: true } + ); + + if (!revertedToInterrupted) { + return; + } + + await this.emitWorkspaceMetadata(workspaceId); + } + private async handleStreamEnd(event: StreamEndEvent): Promise { const workspaceId = event.workspaceId; @@ -2363,6 +2500,9 @@ export class TaskService { } const status = entry.workspace.taskStatus; + if (status === "interrupted") { + return; + } if (status === "reported") { await this.finalizeTerminationPhaseForReportedTask(workspaceId); return; diff --git a/src/node/services/workspaceService.test.ts b/src/node/services/workspaceService.test.ts index 73e762abc1..fab0ad968b 100644 --- a/src/node/services/workspaceService.test.ts +++ b/src/node/services/workspaceService.test.ts @@ -192,6 +192,7 @@ describe("WorkspaceService sendMessage status clearing", () => { isBusy: ReturnType; queueMessage: ReturnType; sendMessage: ReturnType; + resumeStream: ReturnType; }; beforeEach(async () => { @@ -262,6 +263,7 @@ describe("WorkspaceService sendMessage status clearing", () => { isBusy: mock(() => true), queueMessage: mock(() => undefined), sendMessage: mock(() => Promise.resolve(Ok(undefined))), + resumeStream: mock(() => Promise.resolve(Ok({ started: true }))), }; ( @@ -326,6 +328,186 @@ describe("WorkspaceService sendMessage status clearing", () => { expect(updateAgentStatus).not.toHaveBeenCalled(); }); + test("sendMessage restores interrupted task status before successful send", async () => { + fakeSession.isBusy.mockReturnValue(false); + + const markInterruptedTaskRunning = mock(() => Promise.resolve(true)); + const restoreInterruptedTaskAfterResumeFailure = mock(() => Promise.resolve()); + workspaceService.setTaskService({ + markInterruptedTaskRunning, + restoreInterruptedTaskAfterResumeFailure, + resetAutoResumeCount: mock(() => undefined), + } as unknown as TaskService); + + const result = await workspaceService.sendMessage("test-workspace", "hello", { + model: "openai:gpt-4o-mini", + agentId: "exec", + }); + + expect(result.success).toBe(true); + expect(markInterruptedTaskRunning).toHaveBeenCalledWith("test-workspace"); + expect(restoreInterruptedTaskAfterResumeFailure).not.toHaveBeenCalled(); + }); + + test("resumeStream restores interrupted task status before successful resume", async () => { + const markInterruptedTaskRunning = mock(() => Promise.resolve(true)); + const restoreInterruptedTaskAfterResumeFailure = mock(() => Promise.resolve()); + workspaceService.setTaskService({ + markInterruptedTaskRunning, + restoreInterruptedTaskAfterResumeFailure, + resetAutoResumeCount: mock(() => undefined), + } as unknown as TaskService); + + const result = await workspaceService.resumeStream("test-workspace", { + model: "openai:gpt-4o-mini", + agentId: "exec", + }); + + expect(result.success).toBe(true); + expect(markInterruptedTaskRunning).toHaveBeenCalledWith("test-workspace"); + expect(restoreInterruptedTaskAfterResumeFailure).not.toHaveBeenCalled(); + }); + + test("resumeStream keeps interrupted task status when no stream starts", async () => { + fakeSession.resumeStream.mockResolvedValue(Ok({ started: false })); + + const markInterruptedTaskRunning = mock(() => Promise.resolve(true)); + const restoreInterruptedTaskAfterResumeFailure = mock(() => Promise.resolve()); + workspaceService.setTaskService({ + markInterruptedTaskRunning, + restoreInterruptedTaskAfterResumeFailure, + resetAutoResumeCount: mock(() => undefined), + } as unknown as TaskService); + + const result = await workspaceService.resumeStream("test-workspace", { + model: "openai:gpt-4o-mini", + agentId: "exec", + }); + + expect(result.success).toBe(true); + if (result.success) { + expect(result.data.started).toBe(false); + } + expect(markInterruptedTaskRunning).toHaveBeenCalledWith("test-workspace"); + expect(restoreInterruptedTaskAfterResumeFailure).toHaveBeenCalledWith("test-workspace"); + }); + + test("resumeStream does not start interrupted tasks while still busy", async () => { + const getAgentTaskStatus = mock(() => "interrupted" as const); + const markInterruptedTaskRunning = mock(() => Promise.resolve(false)); + workspaceService.setTaskService({ + getAgentTaskStatus, + markInterruptedTaskRunning, + resetAutoResumeCount: mock(() => undefined), + } as unknown as TaskService); + + const result = await workspaceService.resumeStream("test-workspace", { + model: "openai:gpt-4o-mini", + agentId: "exec", + }); + + expect(result.success).toBe(false); + if (!result.success && result.error.type === "unknown") { + expect(result.error.raw).toContain("Interrupted task is still winding down"); + } + expect(getAgentTaskStatus).toHaveBeenCalledWith("test-workspace"); + expect(markInterruptedTaskRunning).not.toHaveBeenCalled(); + expect(fakeSession.resumeStream).not.toHaveBeenCalled(); + }); + + test("sendMessage does not queue interrupted tasks while still busy", async () => { + const getAgentTaskStatus = mock(() => "interrupted" as const); + const markInterruptedTaskRunning = mock(() => Promise.resolve(false)); + workspaceService.setTaskService({ + getAgentTaskStatus, + markInterruptedTaskRunning, + resetAutoResumeCount: mock(() => undefined), + } as unknown as TaskService); + + const result = await workspaceService.sendMessage("test-workspace", "hello", { + model: "openai:gpt-4o-mini", + agentId: "exec", + }); + + expect(result.success).toBe(false); + if (!result.success && result.error.type === "unknown") { + expect(result.error.raw).toContain("Interrupted task is still winding down"); + } + expect(getAgentTaskStatus).toHaveBeenCalledWith("test-workspace"); + expect(markInterruptedTaskRunning).not.toHaveBeenCalled(); + expect(fakeSession.queueMessage).not.toHaveBeenCalled(); + }); + + test("sendMessage restores interrupted status when resumed send fails", async () => { + fakeSession.isBusy.mockReturnValue(false); + fakeSession.sendMessage.mockResolvedValue( + Err({ + type: "unknown" as const, + raw: "runtime startup failed after user turn persisted", + }) + ); + + const markInterruptedTaskRunning = mock(() => Promise.resolve(true)); + const restoreInterruptedTaskAfterResumeFailure = mock(() => Promise.resolve()); + workspaceService.setTaskService({ + markInterruptedTaskRunning, + restoreInterruptedTaskAfterResumeFailure, + resetAutoResumeCount: mock(() => undefined), + } as unknown as TaskService); + + const result = await workspaceService.sendMessage("test-workspace", "hello", { + model: "openai:gpt-4o-mini", + agentId: "exec", + }); + + expect(result.success).toBe(false); + expect(markInterruptedTaskRunning).toHaveBeenCalledWith("test-workspace"); + expect(restoreInterruptedTaskAfterResumeFailure).toHaveBeenCalledWith("test-workspace"); + }); + + test("sendMessage restores interrupted status when resumed send throws", async () => { + fakeSession.isBusy.mockReturnValue(false); + fakeSession.sendMessage.mockRejectedValue(new Error("send explode")); + + const markInterruptedTaskRunning = mock(() => Promise.resolve(true)); + const restoreInterruptedTaskAfterResumeFailure = mock(() => Promise.resolve()); + workspaceService.setTaskService({ + markInterruptedTaskRunning, + restoreInterruptedTaskAfterResumeFailure, + resetAutoResumeCount: mock(() => undefined), + } as unknown as TaskService); + + const result = await workspaceService.sendMessage("test-workspace", "hello", { + model: "openai:gpt-4o-mini", + agentId: "exec", + }); + + expect(result.success).toBe(false); + expect(markInterruptedTaskRunning).toHaveBeenCalledWith("test-workspace"); + expect(restoreInterruptedTaskAfterResumeFailure).toHaveBeenCalledWith("test-workspace"); + }); + + test("resumeStream restores interrupted status when resumed stream throws", async () => { + fakeSession.resumeStream.mockRejectedValue(new Error("resume explode")); + + const markInterruptedTaskRunning = mock(() => Promise.resolve(true)); + const restoreInterruptedTaskAfterResumeFailure = mock(() => Promise.resolve()); + workspaceService.setTaskService({ + markInterruptedTaskRunning, + restoreInterruptedTaskAfterResumeFailure, + resetAutoResumeCount: mock(() => undefined), + } as unknown as TaskService); + + const result = await workspaceService.resumeStream("test-workspace", { + model: "openai:gpt-4o-mini", + agentId: "exec", + }); + + expect(result.success).toBe(false); + expect(markInterruptedTaskRunning).toHaveBeenCalledWith("test-workspace"); + expect(restoreInterruptedTaskAfterResumeFailure).toHaveBeenCalledWith("test-workspace"); + }); + test("does not clear persisted agent status directly when direct send fails after turn acceptance", async () => { fakeSession.isBusy.mockReturnValue(false); fakeSession.sendMessage.mockResolvedValue( diff --git a/src/node/services/workspaceService.ts b/src/node/services/workspaceService.ts index f3998ce4af..3f1bab1ec0 100644 --- a/src/node/services/workspaceService.ts +++ b/src/node/services/workspaceService.ts @@ -3187,6 +3187,7 @@ export class WorkspaceService extends EventEmitter { options, }); + let resumedInterruptedTask = false; try { // Block streaming while workspace is being renamed to prevent path conflicts if (this.renamingWorkspaces.has(workspaceId)) { @@ -3296,6 +3297,14 @@ export class WorkspaceService extends EventEmitter { const shouldQueue = !normalizedOptions?.editMessageId && session.isBusy(); if (shouldQueue) { + const taskStatus = this.taskService?.getAgentTaskStatus?.(workspaceId); + if (taskStatus === "interrupted") { + return Err({ + type: "unknown", + raw: "Interrupted task is still winding down. Wait until it is idle, then try again.", + }); + } + if (internal?.requireIdle) { return Err({ type: "unknown", @@ -3329,6 +3338,20 @@ export class WorkspaceService extends EventEmitter { if (!internal?.skipAutoResumeReset) { this.taskService?.resetAutoResumeCount(workspaceId); } + + // Non-destructive interrupt cascades preserve descendant task workspaces with + // taskStatus=interrupted. Transition before starting a new stream so TaskService + // stream-end handling does not early-return on interrupted status. + try { + resumedInterruptedTask = + (await this.taskService?.markInterruptedTaskRunning?.(workspaceId)) ?? false; + } catch (error: unknown) { + log.error("Failed to restore interrupted task status before sendMessage", { + workspaceId, + error, + }); + } + const result = await session.sendMessage(message, normalizedOptions, { synthetic: internal?.synthetic, }); @@ -3338,11 +3361,33 @@ export class WorkspaceService extends EventEmitter { error: result.error, }); + if (resumedInterruptedTask) { + try { + await this.taskService?.restoreInterruptedTaskAfterResumeFailure?.(workspaceId); + } catch (error: unknown) { + log.error("Failed to restore interrupted task status after sendMessage failure", { + workspaceId, + error, + }); + } + } + return result; } return result; } catch (error) { + if (resumedInterruptedTask) { + try { + await this.taskService?.restoreInterruptedTaskAfterResumeFailure?.(workspaceId); + } catch (restoreError: unknown) { + log.error("Failed to restore interrupted task status after sendMessage throw", { + workspaceId, + error: restoreError, + }); + } + } + const errorMessage = error instanceof Error ? error.message : JSON.stringify(error, null, 2); log.error("Unexpected error in sendMessage handler:", error); @@ -3368,6 +3413,7 @@ export class WorkspaceService extends EventEmitter { options: SendMessageOptions, internal?: { allowQueuedAgentTask?: boolean } ): Promise> { + let resumedInterruptedTask = false; try { // Block streaming while workspace is being renamed to prevent path conflicts if (this.renamingWorkspaces.has(workspaceId)) { @@ -3423,20 +3469,80 @@ export class WorkspaceService extends EventEmitter { const session = this.getOrCreateSession(workspaceId); + const taskStatus = this.taskService?.getAgentTaskStatus?.(workspaceId); + if (taskStatus === "interrupted" && session.isBusy()) { + return Err({ + type: "unknown", + raw: "Interrupted task is still winding down. Wait until it is idle, then try again.", + }); + } + const normalizedOptions = this.normalizeSendMessageAgentId(options); // Persist last-used model + thinking level for cross-device consistency. await this.maybePersistAISettingsFromOptions(workspaceId, normalizedOptions, "resume"); + // Non-destructive interrupt cascades preserve descendant task workspaces with + // taskStatus=interrupted. Transition before stream start so TaskService stream-end + // handling does not early-return on interrupted status. + try { + resumedInterruptedTask = + (await this.taskService?.markInterruptedTaskRunning?.(workspaceId)) ?? false; + } catch (error: unknown) { + log.error("Failed to restore interrupted task status before resumeStream", { + workspaceId, + error, + }); + } + const result = await session.resumeStream(normalizedOptions); if (!result.success) { log.error("resumeStream handler: session returned error", { workspaceId, error: result.error, }); + if (resumedInterruptedTask) { + try { + await this.taskService?.restoreInterruptedTaskAfterResumeFailure?.(workspaceId); + } catch (error: unknown) { + log.error("Failed to restore interrupted task status after resumeStream failure", { + workspaceId, + error, + }); + } + } + return result; } + + // resumeStream can succeed without starting a new stream when the session is + // still busy (started=false). Keep interrupted semantics in that case. + if (!result.data.started) { + if (resumedInterruptedTask) { + try { + await this.taskService?.restoreInterruptedTaskAfterResumeFailure?.(workspaceId); + } catch (error: unknown) { + log.error("Failed to restore interrupted task status after no-op resumeStream", { + workspaceId, + error, + }); + } + } + return result; + } + return result; } catch (error) { + if (resumedInterruptedTask) { + try { + await this.taskService?.restoreInterruptedTaskAfterResumeFailure?.(workspaceId); + } catch (restoreError: unknown) { + log.error("Failed to restore interrupted task status after resumeStream throw", { + workspaceId, + error: restoreError, + }); + } + } + const errorMessage = getErrorMessage(error); log.error("Unexpected error in resumeStream handler:", error); @@ -3531,16 +3637,16 @@ export class WorkspaceService extends EventEmitter { // descendant sub-agents cannot finish later and auto-resume this workspace. if (!options?.soft) { try { - const terminatedTaskIds = + const interruptedTaskIds = await this.taskService?.terminateAllDescendantAgentTasks?.(workspaceId); - if (terminatedTaskIds && terminatedTaskIds.length > 0) { - log.debug("Cascade-terminated descendant tasks on interrupt", { + if (interruptedTaskIds && interruptedTaskIds.length > 0) { + log.debug("Cascade-interrupted descendant tasks on interrupt", { workspaceId, - terminatedTaskIds, + interruptedTaskIds, }); } } catch (error: unknown) { - log.error("Failed to cascade-terminate descendant tasks on interrupt", { + log.error("Failed to cascade-interrupt descendant tasks on interrupt", { workspaceId, error, });