Skip to content
Open
2 changes: 2 additions & 0 deletions livekit-agents/livekit/agents/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ def make_session_report(self, session: AgentSession | None = None) -> SessionRep

sr = SessionReport(
enable_recording=session._enable_recording,
include_internal_events=session._include_internal_events,
job_id=self.job.id,
room_id=self.job.room.sid,
room=self.job.room.name,
Expand All @@ -273,6 +274,7 @@ def make_session_report(self, session: AgentSession | None = None) -> SessionRep
audio_recording_started_at=recorder_io.recording_started_at if recorder_io else None,
started_at=session._started_at,
events=session._recorded_events,
internal_events=session._recorded_internal_events,
chat_history=session.history.copy(),
)

Expand Down
3 changes: 2 additions & 1 deletion livekit-agents/livekit/agents/llm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
CompletionUsage,
FunctionToolCall,
LLMError,
LLMOutputEvent,
LLMStream,
)
from .realtime import (
Expand Down Expand Up @@ -97,8 +98,8 @@
"GenerationCreatedEvent",
"MessageGeneration",
"RealtimeSessionReconnectedEvent",
"RealtimeSessionRestoredEvent",
"LLMError",
"LLMOutputEvent",
]

# Cleanup docs of unexported modules
Expand Down
13 changes: 13 additions & 0 deletions livekit-agents/livekit/agents/llm/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
from abc import ABC, abstractmethod
from collections.abc import AsyncIterable, AsyncIterator
from dataclasses import dataclass
from datetime import datetime, timezone
from types import TracebackType
from typing import Any, ClassVar, Generic, Literal, TypeVar, Union
Expand All @@ -26,6 +27,7 @@
NOT_GIVEN,
APIConnectOptions,
NotGivenOr,
TimedString,
)
from ..utils import aio
from .chat_context import ChatContext, ChatRole
Expand Down Expand Up @@ -70,6 +72,17 @@ class ChatChunk(BaseModel):
usage: CompletionUsage | None = None


@dataclass
class LLMOutputEvent:
    """Tagged wrapper for a single piece of raw LLM output, used when
    collecting internal events for a session report.

    The ``type`` discriminator tells consumers how to interpret ``data``:
    ``llm_chunk_output`` pairs with a ``ChatChunk``, ``llm_str_output`` with a
    plain ``str``, ``llm_timed_string_output`` with a ``TimedString``, and
    ``realtime_audio_output`` with an ``rtc.AudioFrame``.
    """

    # Discriminator naming which node produced the payload in ``data``.
    type: Literal[
        "llm_chunk_output",
        "llm_str_output",
        "llm_timed_string_output",
        "realtime_audio_output",
    ]
    # Payload; its concrete type corresponds to ``type`` as described above.
    data: ChatChunk | str | TimedString | rtc.AudioFrame


class LLMError(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
type: Literal["llm_error"] = "llm_error"
Expand Down
6 changes: 5 additions & 1 deletion livekit-agents/livekit/agents/llm/realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

@dataclass
class InputSpeechStartedEvent:
    """Emitted by a realtime model when it detects the start of input speech.

    Carries no payload beyond its discriminator tag.
    """

    # Discriminator tag so the event can be identified after serialization
    # (e.g. in a recorded session report). The redundant `pass` left over
    # from the previous empty class body has been removed.
    type: Literal["input_speech_started"] = "input_speech_started"


@dataclass
class InputSpeechStoppedEvent:
    """Emitted by a realtime model when it detects the end of input speech."""

    # Presumably indicates whether the realtime model will also produce a
    # transcription of the input audio — TODO confirm against emitters.
    user_transcription_enabled: bool
    # Discriminator tag so the event can be identified after serialization.
    type: Literal["input_speech_stopped"] = "input_speech_stopped"


@dataclass
Expand All @@ -32,6 +33,7 @@ class MessageGeneration:
text_stream: AsyncIterable[str] # could be io.TimedString
audio_stream: AsyncIterable[rtc.AudioFrame]
modalities: Awaitable[list[Literal["text", "audio"]]]
type: Literal["message_generation"] = "message_generation"


@dataclass
Expand All @@ -42,6 +44,7 @@ class GenerationCreatedEvent:
"""True if the message was generated by the user using generate_reply()"""
response_id: str | None = None
"""The response ID associated with this generation, used for metrics attribution"""
type: Literal["generation_created"] = "generation_created"


class RealtimeModelError(BaseModel):
Expand Down Expand Up @@ -127,6 +130,7 @@ class InputTranscriptionCompleted:
transcript: str
"""transcript of the input audio"""
is_final: bool
type: Literal["input_transcription_completed"] = "input_transcription_completed"


@dataclass
Expand Down
1 change: 1 addition & 0 deletions livekit-agents/livekit/agents/tts/tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class SynthesizedAudio:
"""Segment ID, each segment is separated by a flush (streaming only)"""
delta_text: str = ""
"""Current segment of the synthesized audio (streaming only)"""
type: Literal["synthesized_audio"] = "synthesized_audio"


@dataclass
Expand Down
14 changes: 13 additions & 1 deletion livekit-agents/livekit/agents/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
_T = TypeVar("_T")


@dataclass
class FlushSentinel:
    """Sentinel pushed through streaming channels to signal a flush boundary.

    Carries no payload beyond its discriminator tag.
    """

    # Discriminator tag so the sentinel can be identified after serialization.
    # The redundant `pass` left over from the previous empty class body has
    # been removed.
    type: Literal["flush_sentinel"] = "flush_sentinel"


class NotGiven:
Expand Down Expand Up @@ -114,3 +115,14 @@ def __new__(
obj.confidence = confidence
obj.start_time_offset = start_time_offset
return obj

def to_dict(self) -> dict:
    """Return a serializable mapping of this timed string.

    Timing/confidence fields that were never supplied (still the
    ``NOT_GIVEN`` sentinel) are emitted as ``None`` so the result is
    safe to JSON-encode.
    """

    def _or_none(value):
        # Collapse the NOT_GIVEN sentinel to None for serialization.
        return None if value is NOT_GIVEN else value

    return {
        "text": self,
        "start_time": _or_none(self.start_time),
        "end_time": _or_none(self.end_time),
        "confidence": _or_none(self.confidence),
        "start_time_offset": _or_none(self.start_time_offset),
    }
21 changes: 19 additions & 2 deletions livekit-agents/livekit/agents/voice/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
from livekit import rtc

from .. import inference, llm, stt, tokenize, tts, utils, vad
from ..llm import ChatContext, RealtimeModel, find_function_tools
from ..llm import ChatContext, LLMOutputEvent, RealtimeModel, find_function_tools
from ..llm.chat_context import _ReadOnlyChatContext
from ..log import logger
from ..types import NOT_GIVEN, FlushSentinel, NotGivenOr
from ..utils import is_given, misc
from .io import TimedString
from .speech_handle import SpeechHandle

if TYPE_CHECKING:
Expand All @@ -22,7 +23,6 @@
from .agent_activity import AgentActivity
from .agent_session import AgentSession
from .audio_recognition import TurnDetectionMode
from .io import TimedString


@dataclass
Expand Down Expand Up @@ -391,6 +391,7 @@ async def _forward_input() -> None:
try:
async for event in stream:
yield event
activity.session.maybe_collect(event)
finally:
await utils.aio.cancel_and_wait(forward_task)

Expand All @@ -417,6 +418,9 @@ async def llm_node(
) as stream:
async for chunk in stream:
yield chunk
activity.session.maybe_collect(
LLMOutputEvent(type="llm_chunk_output", data=chunk)
)

@staticmethod
async def tts_node(
Expand Down Expand Up @@ -447,6 +451,7 @@ async def _forward_input() -> None:
try:
async for ev in stream:
yield ev.frame
activity.session.maybe_collect(ev)
finally:
await utils.aio.cancel_and_wait(forward_task)

Expand All @@ -455,8 +460,17 @@ async def transcription_node(
agent: Agent, text: AsyncIterable[str | TimedString], model_settings: ModelSettings
) -> AsyncGenerator[str | TimedString, None]:
"""Default implementation for `Agent.transcription_node`"""
activity = agent._get_activity_or_raise()
async for delta in text:
yield delta
if isinstance(delta, TimedString):
activity.session.maybe_collect(
LLMOutputEvent(type="llm_timed_string_output", data=delta)
)
else:
activity.session.maybe_collect(
LLMOutputEvent(type="llm_str_output", data=delta)
)

@staticmethod
async def realtime_audio_output_node(
Expand All @@ -470,6 +484,9 @@ async def realtime_audio_output_node(

async for frame in audio:
yield frame
activity.session.maybe_collect(
LLMOutputEvent(type="realtime_audio_output", data=frame)
)

@property
def realtime_llm_session(self) -> llm.RealtimeSession:
Expand Down
14 changes: 13 additions & 1 deletion livekit-agents/livekit/agents/voice/agent_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,8 @@ def _on_error(

self._session._on_error(error)

def _on_input_speech_started(self, _: llm.InputSpeechStartedEvent) -> None:
def _on_input_speech_started(self, ev: llm.InputSpeechStartedEvent) -> None:
self._session.maybe_collect(ev)
if self.vad is None:
self._session._update_user_state("speaking")

Expand All @@ -1119,6 +1120,7 @@ def _on_input_speech_started(self, _: llm.InputSpeechStartedEvent) -> None:
)

def _on_input_speech_stopped(self, ev: llm.InputSpeechStoppedEvent) -> None:
self._session.maybe_collect(ev)
if self.vad is None:
self._session._update_user_state("listening")

Expand All @@ -1128,6 +1130,7 @@ def _on_input_speech_stopped(self, ev: llm.InputSpeechStoppedEvent) -> None:
)

def _on_input_audio_transcription_completed(self, ev: llm.InputTranscriptionCompleted) -> None:
self._session.maybe_collect(ev)
self._session._user_input_transcribed(
UserInputTranscribedEvent(transcript=ev.transcript, is_final=ev.is_final)
)
Expand All @@ -1140,6 +1143,8 @@ def _on_input_audio_transcription_completed(self, ev: llm.InputTranscriptionComp
self._session._conversation_item_added(msg)

def _on_generation_created(self, ev: llm.GenerationCreatedEvent) -> None:
self._session.maybe_collect(ev)

if ev.user_initiated:
# user_initiated generations are directly handled inside _realtime_reply_task
return
Expand Down Expand Up @@ -1214,6 +1219,7 @@ def _interrupt_by_audio_activity(self) -> None:
def on_start_of_speech(self, ev: vad.VADEvent | None) -> None:
speech_start_time = time.time()
if ev:
self._session.maybe_collect(ev)
speech_start_time = speech_start_time - ev.speech_duration
self._session._update_user_state("speaking", last_speaking_time=speech_start_time)
self._user_silence_event.clear()
Expand All @@ -1226,6 +1232,7 @@ def on_start_of_speech(self, ev: vad.VADEvent | None) -> None:
def on_end_of_speech(self, ev: vad.VADEvent | None) -> None:
speech_end_time = time.time()
if ev:
self._session.maybe_collect(ev)
speech_end_time = speech_end_time - ev.silence_duration
self._session._update_user_state(
"listening",
Expand All @@ -1241,6 +1248,8 @@ def on_end_of_speech(self, ev: vad.VADEvent | None) -> None:
self._start_false_interruption_timer(timeout)

def on_vad_inference_done(self, ev: vad.VADEvent) -> None:
self._session.maybe_collect(ev)

if self._turn_detection in ("manual", "realtime_llm"):
# ignore vad inference done event if turn_detection is manual or realtime_llm
return
Expand All @@ -1258,6 +1267,8 @@ def on_vad_inference_done(self, ev: vad.VADEvent) -> None:
self._user_silence_event.set()

def on_interim_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None) -> None:
self._session.maybe_collect(ev)

if isinstance(self.llm, llm.RealtimeModel) and self.llm.capabilities.user_transcription:
# skip stt transcription if user_transcription is enabled on the realtime model
return
Expand Down Expand Up @@ -1286,6 +1297,7 @@ def on_interim_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None) -
self._start_false_interruption_timer(timeout)

def on_final_transcript(self, ev: stt.SpeechEvent, *, speaking: bool | None = None) -> None:
self._session.maybe_collect(ev)
if isinstance(self.llm, llm.RealtimeModel) and self.llm.capabilities.user_transcription:
# skip stt transcription if user_transcription is enabled on the realtime model
return
Expand Down
Loading