Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 125 additions & 54 deletions livekit/livekit_agent.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions protobufs/livekit_agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,19 @@ message JobTermination {
string job_id = 1;
}

message AgentSessionStateDelta {
Copy link
Contributor

@paulwe paulwe Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message AgentSessionState {
  uint64 version = 1;
  bytes snapshot = 2;
  bytes delta = 3;
}

request

  • version + snapshot - cold start request for worker that is not expected to have the state already
  • version - request for worker expected to have up to date state (recently served the same session)

response

  • version + snapshot - initial response or resync after schema change
  • version + delta - response to ongoing session

Copy link
Contributor Author

@longcw longcw Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there could be multiple deltas if the cloud doesn't combine them into a single one. maybe we can have only deltas and use the v1 as the base snapshot, what do you think

message AgentSessionState {
  // the version of the latest delta, version_1 means the delta from empty state
  uint64 version = 1;
  repeated bytes deltas = 2;
}

request

  • deltas empty - request for worker to have up to date state, which is the same as version in the request
  • deltas from 1 up to the version - cold start

response

  • version 1 + delta - initial response or request to drop previous deltas in cloud if any
  • version X + delta - response to ongoing session
  • version 0 + error - ask for cold start (potentially return a version X that requests deltas after that version)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

based on benchmarks merging the changesets is inexpensive enough that we can always deliver snapshots for a cold start request.
we shouldn't expect the server to maintain the full changeset history. the minimum requirement is changeset version n and snapshot n-1 (in case we have to cold start a worker for a client retry).
separating the snapshot and delta fields makes the api more explicit than relying on version number to interpret the meaning of the data field.

Copy link
Contributor Author

@longcw longcw Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean the cloud will merge the changesets into a single changeset, or merge the changesets to the snapshot? I think it makes a lot of sense if the cloud can merge the changesets to the snapshot.

Copy link
Contributor

@paulwe paulwe Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's what i mean. we'll only ever send the agent a snapshot for cold start

// if base_version is empty, this is the delta from the initial state
optional string base_version = 1;
string new_version = 2;
Comment on lines +192 to +193
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we keep this simple by making these uint64s and incrementing by 1 for every turn?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it could be int

bytes changeset = 3;
}

message TextMessageRequest {
string message_id = 1;
string session_id = 2;
string agent_name = 3;
string metadata = 4;
bytes session_data = 5;
repeated AgentSessionStateDelta session_state = 5;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two options for sync state from cloud to worker:

  1. always send the full db
  2. ask the worker what version you have and sync a partial of the delta to worker

for the second case we may also need a protocol to ask and answer the db version

Copy link
Contributor

@paulwe paulwe Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can maintain sticky routing with state on the server that includes which state version a worker has.
in the TextWorkerRequest when we think the state already exists we can send the state version instead of session data.
the agent can optionally respond with an error instructing the server to retry with a state snapshot - this should be very uncommon.

string text = 6;
}

Expand All @@ -207,6 +214,6 @@ message PushTextRequest {
message TextMessageResponse {
// Indicate the request is completed
string message_id = 1;
bytes session_data = 2;
AgentSessionStateDelta session_state = 2;
string error = 3;
}