From 39a44787ac20342b3cae70f2994369e311f6e317 Mon Sep 17 00:00:00 2001 From: Jacob Alber Date: Tue, 10 Feb 2026 14:08:31 +0000 Subject: [PATCH] Fix CheckpointInfo.Parent always null in InProcessRunner (#3796) Track the last CheckpointInfo in InProcessRunner so that newly created checkpoints reference their parent. When resuming from a checkpoint, the resumed-from checkpoint becomes the parent of the next checkpoint. Adds tests verifying: - First checkpoint has null parent - Subsequent checkpoints chain parents correctly - Checkpoint after resume references the resumed-from checkpoint --- .../InProc/InProcessRunner.cs | 10 +- .../CheckpointParentTests.cs | 177 ++++++++++++++++++ 2 files changed, 183 insertions(+), 4 deletions(-) create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs index 58e1890eed..efdcb786e5 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs @@ -245,6 +245,7 @@ await this.RaiseWorkflowEventAsync(this.StepTracer.Complete(this.RunContext.Next } private WorkflowInfo? _workflowInfoCache; + private CheckpointInfo? 
_lastCheckpointInfo; private readonly List _checkpoints = []; internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = default) { @@ -270,10 +271,10 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d RunnerStateData runnerData = await this.RunContext.ExportStateAsync().ConfigureAwait(false); Dictionary stateData = await this.RunContext.StateManager.ExportStateAsync().ConfigureAwait(false); - Checkpoint checkpoint = new(this.StepTracer.StepNumber, this._workflowInfoCache, runnerData, stateData, edgeData); - CheckpointInfo checkpointInfo = await this.CheckpointManager.CommitCheckpointAsync(this.RunId, checkpoint).ConfigureAwait(false); - this.StepTracer.TraceCheckpointCreated(checkpointInfo); - this._checkpoints.Add(checkpointInfo); + Checkpoint checkpoint = new(this.StepTracer.StepNumber, this._workflowInfoCache, runnerData, stateData, edgeData, this._lastCheckpointInfo); + this._lastCheckpointInfo = await this.CheckpointManager.CommitCheckpointAsync(this.RunId, checkpoint).ConfigureAwait(false); + this.StepTracer.TraceCheckpointCreated(this._lastCheckpointInfo); + this._checkpoints.Add(this._lastCheckpointInfo); } public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default) @@ -304,6 +305,7 @@ public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, Can await this.EdgeMap.ImportStateAsync(checkpoint).ConfigureAwait(false); await Task.WhenAll(executorNotifyTask, republishRequestsTask.AsTask()).ConfigureAwait(false); + this._lastCheckpointInfo = checkpointInfo; this.StepTracer.Reload(this.StepTracer.StepNumber); } diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs new file mode 100644 index 0000000000..223d0b585d --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/CheckpointParentTests.cs @@ 
-0,0 +1,177 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Microsoft.Agents.AI.Workflows.Checkpointing;
+
+namespace Microsoft.Agents.AI.Workflows.UnitTests;
+
+/// <summary>
+/// Tests for verifying that CheckpointInfo.Parent is properly populated
+/// when checkpoints are created during workflow execution (GH #3796).
+/// </summary>
+public class CheckpointParentTests
+{
+    [Theory]
+    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
+    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
+    internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(ExecutionEnvironment environment)
+    {
+        // Arrange: A simple two-step workflow that will produce at least one checkpoint.
+        ForwardMessageExecutor executorA = new("A");
+        ForwardMessageExecutor executorB = new("B");
+
+        Workflow workflow = new WorkflowBuilder(executorA)
+            .AddEdge(executorA, executorB)
+            .Build();
+
+        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
+        IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
+
+        // Act (await using: the run is disposed even when an assertion below throws).
+        await using Checkpointed<StreamingRun> checkpointed =
+            await env.StreamAsync(workflow, "Hello", checkpointManager);
+
+        List<CheckpointInfo> checkpoints = [];
+        await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync())
+        {
+            if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
+            {
+                checkpoints.Add(cp);
+            }
+        }
+
+        // Assert: The first checkpoint should have been created and stored with a null parent.
+        checkpoints.Should().NotBeEmpty("at least one checkpoint should have been created");
+
+        CheckpointInfo firstCheckpoint = checkpoints[0];
+        Checkpoint storedFirst = await ((ICheckpointManager)checkpointManager)
+            .LookupCheckpointAsync(firstCheckpoint.RunId, firstCheckpoint);
+        storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent");
+    }
+
+    [Theory]
+    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
+    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
+    internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(ExecutionEnvironment environment)
+    {
+        // Arrange: A workflow with a loop that will produce multiple checkpoints.
+        ForwardMessageExecutor executorA = new("A");
+        ForwardMessageExecutor executorB = new("B");
+
+        // A -> B -> A (loop) to generate multiple supersteps/checkpoints.
+        Workflow workflow = new WorkflowBuilder(executorA)
+            .AddEdge(executorA, executorB)
+            .AddEdge(executorB, executorA)
+            .Build();
+
+        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
+        IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
+
+        // Act
+        await using Checkpointed<StreamingRun> checkpointed =
+            await env.StreamAsync(workflow, "Hello", checkpointManager);
+
+        List<CheckpointInfo> checkpoints = [];
+        using CancellationTokenSource cts = new();
+
+        await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync(cts.Token))
+        {
+            if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
+            {
+                checkpoints.Add(cp);
+                if (checkpoints.Count >= 3)
+                {
+                    cts.Cancel();
+                }
+            }
+        }
+
+        // Assert: We should have at least 3 checkpoints
+        checkpoints.Should().HaveCountGreaterThanOrEqualTo(3);
+
+        // Verify the parent chain
+        Checkpoint stored0 = await ((ICheckpointManager)checkpointManager)
+            .LookupCheckpointAsync(checkpoints[0].RunId, checkpoints[0]);
+        stored0.Parent.Should().BeNull("the first checkpoint should have no parent");
+
+        Checkpoint stored1 = await ((ICheckpointManager)checkpointManager)
+            .LookupCheckpointAsync(checkpoints[1].RunId, checkpoints[1]);
+        stored1.Parent.Should().NotBeNull("the second checkpoint should have a parent");
+        stored1.Parent.Should().Be(checkpoints[0], "the second checkpoint's parent should be the first checkpoint");
+
+        Checkpoint stored2 = await ((ICheckpointManager)checkpointManager)
+            .LookupCheckpointAsync(checkpoints[2].RunId, checkpoints[2]);
+        stored2.Parent.Should().NotBeNull("the third checkpoint should have a parent");
+        stored2.Parent.Should().Be(checkpoints[1], "the third checkpoint's parent should be the second checkpoint");
+    }
+
+    [Theory]
+    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
+    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
+    internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAsync(ExecutionEnvironment environment)
+    {
+        // Arrange: A looping workflow that produces checkpoints.
+        ForwardMessageExecutor executorA = new("A");
+        ForwardMessageExecutor executorB = new("B");
+
+        Workflow workflow = new WorkflowBuilder(executorA)
+            .AddEdge(executorA, executorB)
+            .AddEdge(executorB, executorA)
+            .Build();
+
+        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
+        IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();
+
+        // First run: collect a checkpoint to resume from
+        await using Checkpointed<StreamingRun> checkpointed =
+            await env.StreamAsync(workflow, "Hello", checkpointManager);
+
+        List<CheckpointInfo> firstRunCheckpoints = [];
+        using CancellationTokenSource cts = new();
+        await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync(cts.Token))
+        {
+            if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
+            {
+                firstRunCheckpoints.Add(cp);
+                if (firstRunCheckpoints.Count >= 2)
+                {
+                    cts.Cancel();
+                }
+            }
+        }
+
+        firstRunCheckpoints.Should().HaveCountGreaterThanOrEqualTo(2);
+        CheckpointInfo resumePoint = firstRunCheckpoints[0];
+
+        // Dispose the first run to release workflow ownership before resuming.
+        await checkpointed.DisposeAsync();
+
+        // Act: Resume from the first checkpoint (await using so the resumed run is released too).
+        await using Checkpointed<StreamingRun> resumed =
+            await env.ResumeStreamAsync(workflow, resumePoint, checkpointManager);
+
+        List<CheckpointInfo> resumedCheckpoints = [];
+        using CancellationTokenSource cts2 = new();
+        await foreach (WorkflowEvent evt in resumed.Run.WatchStreamAsync(cts2.Token))
+        {
+            if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
+            {
+                resumedCheckpoints.Add(cp);
+                if (resumedCheckpoints.Count >= 1)
+                {
+                    cts2.Cancel();
+                }
+            }
+        }
+
+        // Assert: The first checkpoint after resume should have the resume point as its parent.
+        resumedCheckpoints.Should().NotBeEmpty();
+        Checkpoint storedResumed = await ((ICheckpointManager)checkpointManager)
+            .LookupCheckpointAsync(resumedCheckpoints[0].RunId, resumedCheckpoints[0]);
+        storedResumed.Parent.Should().NotBeNull("checkpoint created after resume should have a parent");
+        storedResumed.Parent.Should().Be(resumePoint, "checkpoint after resume should reference the checkpoint we resumed from");
+    }
+}