Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ await this.RaiseWorkflowEventAsync(this.StepTracer.Complete(this.RunContext.Next
}

private WorkflowInfo? _workflowInfoCache;
private CheckpointInfo? _lastCheckpointInfo;
private readonly List<CheckpointInfo> _checkpoints = [];
internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = default)
{
Expand All @@ -270,10 +271,10 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
RunnerStateData runnerData = await this.RunContext.ExportStateAsync().ConfigureAwait(false);
Dictionary<ScopeKey, PortableValue> stateData = await this.RunContext.StateManager.ExportStateAsync().ConfigureAwait(false);

Checkpoint checkpoint = new(this.StepTracer.StepNumber, this._workflowInfoCache, runnerData, stateData, edgeData);
CheckpointInfo checkpointInfo = await this.CheckpointManager.CommitCheckpointAsync(this.RunId, checkpoint).ConfigureAwait(false);
this.StepTracer.TraceCheckpointCreated(checkpointInfo);
this._checkpoints.Add(checkpointInfo);
Checkpoint checkpoint = new(this.StepTracer.StepNumber, this._workflowInfoCache, runnerData, stateData, edgeData, this._lastCheckpointInfo);
this._lastCheckpointInfo = await this.CheckpointManager.CommitCheckpointAsync(this.RunId, checkpoint).ConfigureAwait(false);
this.StepTracer.TraceCheckpointCreated(this._lastCheckpointInfo);
this._checkpoints.Add(this._lastCheckpointInfo);
}

public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -304,6 +305,7 @@ public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, Can
await this.EdgeMap.ImportStateAsync(checkpoint).ConfigureAwait(false);
await Task.WhenAll(executorNotifyTask, republishRequestsTask.AsTask()).ConfigureAwait(false);

this._lastCheckpointInfo = checkpointInfo;
this.StepTracer.Reload(this.StepTracer.StepNumber);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.Checkpointing;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

/// <summary>
/// Tests verifying that the <c>Parent</c> of each stored <see cref="Checkpoint"/> is properly populated
/// when checkpoints are created during workflow execution (GH #3796).
/// </summary>
/// <remarks>
/// Each test streams a workflow with an in-memory <see cref="CheckpointManager"/>, collects the
/// <see cref="CheckpointInfo"/> reported by every <see cref="SuperStepCompletedEvent"/>, and then looks the
/// checkpoint back up through <see cref="ICheckpointManager"/> to inspect its stored parent.
/// </remarks>
public class CheckpointParentTests
{
    /// <summary>
    /// Looks up the stored <see cref="Checkpoint"/> identified by <paramref name="checkpointInfo"/>.
    /// </summary>
    /// <param name="checkpointManager">The manager the checkpoint was committed to.</param>
    /// <param name="checkpointInfo">The checkpoint to look up; its own <c>RunId</c> is used as the run id.</param>
    /// <returns>The checkpoint returned by <c>LookupCheckpointAsync</c>.</returns>
    private static async Task<Checkpoint> LookupStoredCheckpointAsync(CheckpointManager checkpointManager, CheckpointInfo checkpointInfo)
    {
        // The tests reach the lookup through the ICheckpointManager interface, hence the cast.
        return await ((ICheckpointManager)checkpointManager)
            .LookupCheckpointAsync(checkpointInfo.RunId, checkpointInfo);
    }

    /// <summary>
    /// The first checkpoint of a fresh run must be stored with a null parent.
    /// </summary>
    /// <param name="environment">The execution environment under test.</param>
    [Theory]
    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
    internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(ExecutionEnvironment environment)
    {
        // Arrange: A simple two-step workflow that will produce at least one checkpoint.
        ForwardMessageExecutor<string> executorA = new("A");
        ForwardMessageExecutor<string> executorB = new("B");

        Workflow workflow = new WorkflowBuilder(executorA)
            .AddEdge(executorA, executorB)
            .Build();

        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
        IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

        // Act
        // Dispose the run when the test ends, consistent with the other tests in this class.
        await using Checkpointed<StreamingRun> checkpointed =
            await env.StreamAsync(workflow, "Hello", checkpointManager);

        List<CheckpointInfo> checkpoints = [];
        await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync())
        {
            if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
            {
                checkpoints.Add(cp);
            }
        }

        // Assert: The first checkpoint should have been created and stored with a null parent.
        checkpoints.Should().NotBeEmpty("at least one checkpoint should have been created");

        Checkpoint storedFirst = await LookupStoredCheckpointAsync(checkpointManager, checkpoints[0]);
        storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent");
    }

    /// <summary>
    /// Consecutive checkpoints of a single run must form a chain: each stored checkpoint's parent
    /// is the checkpoint reported immediately before it.
    /// </summary>
    /// <param name="environment">The execution environment under test.</param>
    [Theory]
    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
    internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(ExecutionEnvironment environment)
    {
        // Arrange: A workflow with a loop that will produce multiple checkpoints.
        ForwardMessageExecutor<string> executorA = new("A");
        ForwardMessageExecutor<string> executorB = new("B");

        // A -> B -> A (loop) to generate multiple supersteps/checkpoints.
        Workflow workflow = new WorkflowBuilder(executorA)
            .AddEdge(executorA, executorB)
            .AddEdge(executorB, executorA)
            .Build();

        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
        IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

        // Act
        await using Checkpointed<StreamingRun> checkpointed =
            await env.StreamAsync(workflow, "Hello", checkpointManager);

        List<CheckpointInfo> checkpoints = [];
        using CancellationTokenSource cts = new();

        await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync(cts.Token))
        {
            if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
            {
                checkpoints.Add(cp);

                // The workflow loops, so request cancellation once enough checkpoints were seen.
                if (checkpoints.Count >= 3)
                {
                    cts.Cancel();
                }
            }
        }

        // Assert: We should have at least 3 checkpoints
        checkpoints.Should().HaveCountGreaterThanOrEqualTo(3);

        // Verify the parent chain
        Checkpoint stored0 = await LookupStoredCheckpointAsync(checkpointManager, checkpoints[0]);
        stored0.Parent.Should().BeNull("the first checkpoint should have no parent");

        Checkpoint stored1 = await LookupStoredCheckpointAsync(checkpointManager, checkpoints[1]);
        stored1.Parent.Should().NotBeNull("the second checkpoint should have a parent");
        stored1.Parent.Should().Be(checkpoints[0], "the second checkpoint's parent should be the first checkpoint");

        Checkpoint stored2 = await LookupStoredCheckpointAsync(checkpointManager, checkpoints[2]);
        stored2.Parent.Should().NotBeNull("the third checkpoint should have a parent");
        stored2.Parent.Should().Be(checkpoints[1], "the third checkpoint's parent should be the second checkpoint");
    }

    /// <summary>
    /// After resuming from a checkpoint, the first newly created checkpoint must be stored with the
    /// resumed-from checkpoint as its parent.
    /// </summary>
    /// <param name="environment">The execution environment under test.</param>
    [Theory]
    [InlineData(ExecutionEnvironment.InProcess_Lockstep)]
    [InlineData(ExecutionEnvironment.InProcess_OffThread)]
    internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAsync(ExecutionEnvironment environment)
    {
        // Arrange: A looping workflow that produces checkpoints.
        ForwardMessageExecutor<string> executorA = new("A");
        ForwardMessageExecutor<string> executorB = new("B");

        Workflow workflow = new WorkflowBuilder(executorA)
            .AddEdge(executorA, executorB)
            .AddEdge(executorB, executorA)
            .Build();

        CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
        IWorkflowExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

        // First run: collect a checkpoint to resume from
        List<CheckpointInfo> firstRunCheckpoints = [];
        using CancellationTokenSource cts = new();

        // The scoped block disposes the first run exactly once, releasing workflow ownership
        // before the resume below.
        await using (Checkpointed<StreamingRun> checkpointed =
            await env.StreamAsync(workflow, "Hello", checkpointManager))
        {
            await foreach (WorkflowEvent evt in checkpointed.Run.WatchStreamAsync(cts.Token))
            {
                if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
                {
                    firstRunCheckpoints.Add(cp);
                    if (firstRunCheckpoints.Count >= 2)
                    {
                        cts.Cancel();
                    }
                }
            }
        }

        firstRunCheckpoints.Should().HaveCountGreaterThanOrEqualTo(2);
        CheckpointInfo resumePoint = firstRunCheckpoints[0];

        // Act: Resume from the first checkpoint
        await using Checkpointed<StreamingRun> resumed =
            await env.ResumeStreamAsync(workflow, resumePoint, checkpointManager);

        List<CheckpointInfo> resumedCheckpoints = [];
        using CancellationTokenSource cts2 = new();
        await foreach (WorkflowEvent evt in resumed.Run.WatchStreamAsync(cts2.Token))
        {
            if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
            {
                resumedCheckpoints.Add(cp);

                // Only the first checkpoint after the resume is needed.
                if (resumedCheckpoints.Count >= 1)
                {
                    cts2.Cancel();
                }
            }
        }

        // Assert: The first checkpoint after resume should have the resume point as its parent.
        resumedCheckpoints.Should().NotBeEmpty();
        Checkpoint storedResumed = await LookupStoredCheckpointAsync(checkpointManager, resumedCheckpoints[0]);
        storedResumed.Parent.Should().NotBeNull("checkpoint created after resume should have a parent");
        storedResumed.Parent.Should().Be(resumePoint, "checkpoint after resume should reference the checkpoint we resumed from");
    }
}
Loading