diff --git a/.gitignore b/.gitignore index 1f891a01..5f007652 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ .DS_Store CLAUDE.md proxychains.conf +docker +dm_invite_sessions \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index c615b8cb..9b8e3e52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -465,13 +465,47 @@ dependencies = [ "libloading", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" dependencies = [ - "axum-core", + "axum-core 0.5.2", "bytes", "form_urlencoded", "futures-util", @@ -481,7 +515,7 @@ dependencies = [ "hyper", "hyper-util", "itoa", - "matchit", + "matchit 0.8.4", "memchr", "mime", "percent-encoding", @@ -499,6 +533,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-core" version = "0.5.2" @@ -1429,7 +1484,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.60.2", + "windows-sys 0.61.1", ] [[package]] @@ -1584,7 +1639,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -1628,6 +1683,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "eyeball" version = "0.8.8" @@ -1909,6 +1975,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -2814,7 +2886,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.53.4", ] [[package]] @@ -3347,6 +3419,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matchit" version = "0.8.4" @@ -3388,7 +3466,7 @@ dependencies = [ "async-once-cell", "async-stream", "async-trait", - "axum", + "axum 0.8.4", "backon", "bytes", "bytesize", @@ -4734,6 +4812,22 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "reqwest-eventsource" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "632c55746dbb44275691640e7b40c907c16a2dc1a5842aa98aaec90da6ec6bde" +dependencies = [ + "eventsource-stream", + "futures-core", + "futures-timer", + "mime", + "nom", + "pin-project-lite", + "reqwest", + "thiserror 1.0.69", +] + [[package]] name = "resvg" version = "0.42.0" @@ -4871,6 +4965,7 @@ version = "0.0.1-pre-alpha-4" dependencies = [ "anyhow", "aws-lc-rs", + "axum 0.7.9", "bitflags 2.10.0", "blurhash", "bytesize", @@ -4880,6 +4975,7 @@ dependencies = [ "crossbeam-queue", "eyeball", "eyeball-im", + "futures", "futures-util", "hashbrown 0.16.1", "htmlize", @@ -4897,6 +4993,7 @@ dependencies = [ "rand 0.8.5", "rangemap", "reqwest", + "reqwest-eventsource", "robius-directories", "robius-location", "robius-open", @@ -4907,6 +5004,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tokio", + "tokio-stream", "tracing-subscriber", "tsp_sdk", "unicode-segmentation", @@ -5146,7 +5244,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -5993,7 +6091,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -6974,7 +7072,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 5c041614..5bb96873 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,11 @@ reqwest = { version = "0.12", default-features = false, features = [ "macos-system-configuration", ] } +## Dependencies for SSE support. +axum = "0.7" +futures = "0.3" +tokio-stream = "0.1" +reqwest-eventsource = "0.6" [features] default = [] diff --git a/src/bin/sse_server.rs b/src/bin/sse_server.rs new file mode 100644 index 00000000..533b89f7 --- /dev/null +++ b/src/bin/sse_server.rs @@ -0,0 +1,297 @@ +//! SSE Server with Matrix Bot +//! +//! This binary runs: +//! 1. An SSE (Server-Sent Events) server at http://127.0.0.1:3001/events +//! 2. A Matrix bot (testuser2) that responds to "/sse" commands with SSE stream headers +//! +//! When the bot receives "/sse", it sends a message with format: +//! `!SSE|http://127.0.0.1:3001/events|` +//! +//! After streaming completes, the bot edits the message with the full content. + +#![recursion_limit = "256"] + +use axum::{ + extract::State, + response::sse::{Event, Sse}, + routing::get, + Router, +}; +use futures::stream::{self, Stream}; +use std::{collections::HashMap, convert::Infallible, sync::Arc, time::Duration}; +use tokio::sync::Mutex; +use tokio_stream::StreamExt; + +use matrix_sdk::{ + Client, + config::SyncSettings, + room::Room, + ruma::{ + OwnedEventId, OwnedRoomId, + events::room::message::{ + MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent, + ReplacementMetadata, + }, + }, +}; + +/// Shared state for tracking SSE messages that need to be edited +#[derive(Clone)] +struct AppState { + /// Maps event_id to (room_id, accumulated_content) + pending_edits: Arc>>, + /// Matrix client for editing messages + client: Arc>>, +} + +impl Default for AppState { + fn default() -> Self { + Self { + pending_edits: Arc::new(Mutex::new(HashMap::new())), + client: Arc::new(Mutex::new(None)), + } + } +} + +/// The messages to stream via SSE +const SSE_MESSAGES: &[&str] = &[ + "Breaking: New Makepad release brings revolutionary UI performance!", + "Tech Update: Rust continues to dominate systems programming", + "Local News: Community embraces new SSE widget for real-time updates", + "Science: Researchers achieve breakthrough in quantum computing", + "Weather: Sunny skies expected throughout the week", +]; + +/// SSE handler that streams messages and edits Matrix message when complete +async fn sse_handler( + State(state): State, +) -> Sse>> { + // Collect all content for the final edit + let all_content: String = SSE_MESSAGES + .iter() + .map(|s| *s) + .collect::>() + .join("\n"); + + // Create a channel to signal stream completion + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + let tx = Arc::new(Mutex::new(Some(tx))); + + let state_clone = state.clone(); + let tx_clone = tx.clone(); + + let stream = stream::iter(SSE_MESSAGES.iter().enumerate()) + .chain(stream::once(async { (SSE_MESSAGES.len(), &"[STREAM_END]") })) + .throttle(Duration::from_secs(1)) + .map(move |(i, msg)| { + let is_end = *msg == "[STREAM_END]"; + let data = if is_end { + format!(r#"{{"id": {}, "status": "complete"}}"#, i) + } else { + format!( + r#"{{"id": {}, "title": "News #{}", "content": "{}"}}"#, + i, + i + 1, + msg + ) + }; + + // Signal completion when STREAM_END is sent + if is_end { + let tx = tx_clone.clone(); + tokio::spawn(async move { + if let Some(sender) = tx.lock().await.take() { + let _ = sender.send(()); + } + }); + } + + Ok(Event::default().data(data)) + }); + + // Spawn a task that waits for stream completion signal, then edits the message + let content_for_edit = all_content; + tokio::spawn(async move { + // Wait for the stream to signal completion + let _ = rx.await; + // Edit the pending Matrix message + edit_pending_message(state_clone, content_for_edit).await; + }); + + Sse::new(stream).keep_alive( + axum::response::sse::KeepAlive::new() + .interval(Duration::from_secs(10)) + .text("keep-alive"), + ) +} + +/// Edit the most recent pending Matrix message with the accumulated content +async fn edit_pending_message(state: AppState, content: String) { + let client_guard = state.client.lock().await; + let Some(client) = client_guard.as_ref() else { + eprintln!("Matrix bot: No client available for editing"); + return; + }; + + // Get the first pending edit (there should be one per SSE request) + let pending_edit = { + let mut pending = state.pending_edits.lock().await; + pending.drain().next() + }; + + if let Some((event_id, (room_id, _))) = pending_edit { + if let Some(room) = client.get_room(&room_id) { + let final_content = format!("SSE Stream Complete:\n\n{}", content); + + // Create edit content using ReplacementMetadata + let metadata = ReplacementMetadata::new(event_id.clone(), None); + let new_content = RoomMessageEventContent::text_plain(&final_content) + .make_replacement(metadata); + + match room.send(new_content).await { + Ok(_) => { + println!("Matrix bot: Successfully edited message {}", event_id); + } + Err(e) => { + eprintln!("Matrix bot: Failed to edit message: {}", e); + } + } + } + } else { + println!("Matrix bot: No pending message to edit"); + } +} + +/// Run the Matrix bot that responds to /sse commands +async fn run_matrix_bot(state: AppState) -> anyhow::Result<()> { + // let homeserver_url = "http://localhost:8008"; + // let username = "testuser2"; + // let password = "testpassword"; + let homeserver_url = "https://matrix.org"; + let username = "ruitobeta"; + let password = "!C6WS3hcPGM:GMa"; + println!("Matrix bot: Connecting to homeserver {}", homeserver_url); + + // Build the client + let client = Client::builder() + .homeserver_url(homeserver_url) + .build() + .await?; + + // Login + client + .matrix_auth() + .login_username(username, password) + .initial_device_display_name("sse-bot") + .send() + .await?; + + println!("Matrix bot: Logged in as {}", username); + + // Store the client in shared state for later use + { + let mut client_guard = state.client.lock().await; + *client_guard = Some(client.clone()); + } + + // Clone state for the event handler + let state_clone = state.clone(); + + // Add event handler for room messages + client.add_event_handler( + move |event: OriginalSyncRoomMessageEvent, room: Room| { + let state = state_clone.clone(); + async move { + handle_room_message(event, room, state).await; + } + }, + ); + + // Start syncing + println!("Matrix bot: Starting sync..."); + client.sync(SyncSettings::default()).await?; + + Ok(()) +} + +/// Handle incoming room messages +async fn handle_room_message( + event: OriginalSyncRoomMessageEvent, + room: Room, + state: AppState, +) { + // Only handle text messages + let MessageType::Text(text_content) = &event.content.msgtype else { + return; + }; + + let body = text_content.body.trim(); + + // Check if message is "/sse" command + if body == "/sse" { + println!("Matrix bot: Received /sse command from {}", event.sender); + + // Send SSE header message + let sse_message = "!SSE|http://127.0.0.1:3001/events|"; + let content = RoomMessageEventContent::text_plain(sse_message); + + match room.send(content).await { + Ok(response) => { + let event_id = response.response.event_id; + let room_id = room.room_id().to_owned(); + + println!( + "Matrix bot: Sent SSE header message, event_id: {}", + event_id + ); + + // Store the event_id for later editing (will be edited when SSE stream ends) + { + let mut pending = state.pending_edits.lock().await; + pending.insert(event_id, (room_id, String::new())); + } + } + Err(e) => { + eprintln!("Matrix bot: Failed to send message: {}", e); + } + } + } +} + +#[tokio::main] +async fn main() { + println!("Starting SSE Server with Matrix Bot..."); + + // Create shared state + let state = AppState::default(); + + // Create SSE server with state + let app = Router::new() + .route("/events", get(sse_handler)) + .with_state(state.clone()); + + // Spawn SSE server + let sse_handle = tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind("127.0.0.1:3001") + .await + .unwrap(); + println!("SSE Server running on http://127.0.0.1:3001/events"); + axum::serve(listener, app).await.unwrap(); + }); + + // Give SSE server a moment to start + tokio::time::sleep(Duration::from_millis(500)).await; + + // Run Matrix bot + let bot_handle = tokio::spawn(async move { + if let Err(e) = run_matrix_bot(state).await { + eprintln!("Matrix bot error: {}", e); + } + }); + + // Wait for both to complete (they won't under normal operation) + tokio::select! { + _ = sse_handle => println!("SSE server stopped"), + _ = bot_handle => println!("Matrix bot stopped"), + } +} diff --git a/src/home/room_screen.rs b/src/home/room_screen.rs index dc42a462..5aff37d8 100644 --- a/src/home/room_screen.rs +++ b/src/home/room_screen.rs @@ -1149,6 +1149,7 @@ impl Widget for RoomScreen { &self.pinned_events, item_drawn_status, room_screen_widget_uid, + &mut tl_state.sse_streams, ) }, // TODO: properly implement `Poll` as a regular Message-like timeline item. @@ -1411,6 +1412,43 @@ impl RoomScreen { // log!("process_timeline_updates(): changed_indices: {changed_indices:?}, items len: {}\ncontent drawn: {:#?}\nprofile drawn: {:#?}", items.len(), tl.content_drawn_since_last_update, tl.profile_drawn_since_last_update); } tl.items = new_items; + + // Check new items for SSE messages and start fetching if needed + for item in tl.items.iter() { + if let Some(event_tl_item) = item.as_event() { + if let Some(event_id) = event_tl_item.event_id() { + // Check if this message contains an SSE header + if let TimelineItemContent::MsgLike(msg_like_content) = event_tl_item.content() { + if let MsgLikeKind::Message(message) = &msg_like_content.kind { + if let MessageType::Text(text_content) = message.msgtype() { + if let Some(sse_url) = parse_sse_header(&text_content.body) { + // Get or create SSE stream state + let sse_state = tl.sse_streams.entry(event_id.to_owned()).or_insert_with(|| { + SseStreamState { + url: sse_url.clone(), + accumulated_content: String::new(), + is_fetching: false, + is_complete: false, + } + }); + + // Start SSE fetch if not already fetching + if !sse_state.is_fetching && !sse_state.is_complete { + sse_state.is_fetching = true; + start_sse_fetch( + tl.kind.clone(), + event_id.to_owned(), + sse_url, + ); + } + } + } + } + } + } + } + } + done_loading = true; } TimelineUpdate::NewUnreadMessagesCount(unread_messages_count) => { @@ -1617,6 +1655,26 @@ impl RoomScreen { tl.tombstone_info = Some(successor_room_details); } TimelineUpdate::LinkPreviewFetched => {} + TimelineUpdate::SseContentUpdate { event_id, content, is_complete } => { + // Update the SSE stream state for this event + // Note: content is already the full accumulated content from sliding_sync + if let Some(sse_state) = tl.sse_streams.get_mut(&event_id) { + sse_state.accumulated_content = content; + if is_complete { + sse_state.is_complete = true; + sse_state.is_fetching = false; + } + } + // Clear the draw cache for all items so the message gets redrawn + // with the updated SSE content + tl.content_drawn_since_last_update.clear(); + + // Auto-scroll to bottom while SSE is streaming + if !tl.items.is_empty() { + portal_list.set_first_id_and_scroll(tl.items.len().saturating_sub(1), 0.0); + portal_list.set_tail_range(true); + } + } } } @@ -2286,6 +2344,7 @@ impl RoomScreen { scrolled_past_read_marker: false, latest_own_user_receipt: None, tombstone_info, + sse_streams: HashMap::new(), }; (tl_state, true) }; @@ -2759,6 +2818,15 @@ pub enum TimelineUpdate { Tombstoned(SuccessorRoomDetails), /// A notice that link preview data for a URL has been fetched and is now available. LinkPreviewFetched, + /// An update containing SSE (Server-Sent Events) content for a message. + SseContentUpdate { + /// The event ID of the message that contains the SSE stream. + event_id: OwnedEventId, + /// The new content to append to the message. + content: String, + /// Whether this is the final update (stream completed). + is_complete: bool, + }, } thread_local! { @@ -2877,6 +2945,34 @@ struct TimelineUiState { /// If `Some`, this room has been tombstoned and the details of its successor room /// are contained within. If `None`, the room has not been tombstoned. tombstone_info: Option, + + /// Tracks active SSE streams by event ID. + /// Maps event_id to (accumulated_content, is_fetching). + sse_streams: HashMap, +} + +/// State for tracking an SSE (Server-Sent Events) stream for a message. +#[derive(Debug, Clone)] +struct SseStreamState { + /// The URL of the SSE endpoint. + url: String, + /// Accumulated content from the SSE stream. + accumulated_content: String, + /// Whether the SSE stream is currently being fetched. + is_fetching: bool, + /// Whether the stream has completed. + is_complete: bool, +} + +impl Default for SseStreamState { + fn default() -> Self { + Self { + url: String::new(), + accumulated_content: String::new(), + is_fetching: false, + is_complete: false, + } + } } #[derive(Default, Debug)] @@ -3009,6 +3105,7 @@ fn populate_message_view( pinned_events: &[OwnedEventId], item_drawn_status: ItemDrawnStatus, room_screen_widget_uid: WidgetUid, + sse_streams: &mut HashMap, ) -> (WidgetRef, ItemDrawnStatus) { let mut new_drawn_status = item_drawn_status; let ts_millis = event_tl_item.timestamp(); @@ -3051,11 +3148,36 @@ fn populate_message_view( if existed && item_drawn_status.content_drawn { (item, true) } else { + // Check for SSE pattern in the message and display SSE content if available. + // Note: SSE fetch is triggered in process_timeline_updates when NewItems arrive. + let (display_body, display_formatted) = if parse_sse_header(body).is_some() { + // Check if we have SSE state for this event + if let Some(event_id) = event_tl_item.event_id() { + if let Some(sse_state) = sse_streams.get(event_id) { + // Display accumulated content or loading message + if sse_state.accumulated_content.is_empty() { + (format!("Loading SSE from {}...", sse_state.url), None) + } else if sse_state.is_complete { + (format!("SSE Complete:\n\n{}", sse_state.accumulated_content), None) + } else { + (format!("SSE Streaming...\n\n{}", sse_state.accumulated_content), None) + } + } else { + // SSE state not yet created (shouldn't happen normally) + (body.to_string(), formatted.clone()) + } + } else { + (body.to_string(), formatted.clone()) + } + } else { + (body.to_string(), formatted.clone()) + }; + new_drawn_status.content_drawn = populate_text_message_content( cx, &item.html_or_plaintext(ids!(content.message)), - body, - formatted.as_ref(), + &display_body, + display_formatted.as_ref(), Some(&mut item.link_preview(ids!(content.link_preview_view))), Some(media_cache), Some(link_preview_cache), @@ -4787,10 +4909,48 @@ impl MessageRef { /// /// This function requires passing in a reference to `Cx`, /// which isn't used, but acts as a guarantee that this function -/// must only be called by the main UI thread. +/// must only be called by the main UI thread. pub fn clear_timeline_states(_cx: &mut Cx) { // Clear timeline states cache TIMELINE_STATES.with_borrow_mut(|states| { states.clear(); }); } + +/// Parses an SSE header from a message body. +/// +/// The expected format is: `!SSE||` +/// For example: `!SSE|http://127.0.0.1:3000/events|` +/// +/// Returns the URL if the pattern matches, otherwise None. +fn parse_sse_header(body: &str) -> Option { + let trimmed = body.trim(); + if trimmed.starts_with("!SSE|") { + // Find the closing | + if let Some(end_idx) = trimmed[5..].find('|') { + let url = &trimmed[5..5 + end_idx]; + if !url.is_empty() { + return Some(url.to_string()); + } + } + } + None +} + +/// Starts an SSE fetch in a background task. +/// +/// This function submits an async request to fetch SSE content. +/// Updates are sent via the timeline update channel obtained from the timeline_kind. +fn start_sse_fetch( + timeline_kind: TimelineKind, + event_id: OwnedEventId, + url: String, +) { + use crate::sliding_sync::{submit_async_request, MatrixRequest}; + + submit_async_request(MatrixRequest::FetchSse { + timeline_kind, + event_id, + url, + }); +} diff --git a/src/sliding_sync.rs b/src/sliding_sync.rs index 4d4455b1..5541fab9 100644 --- a/src/sliding_sync.rs +++ b/src/sliding_sync.rs @@ -664,6 +664,15 @@ pub enum MatrixRequest { destination: Arc>, update_sender: Option>, }, + /// Request to fetch SSE (Server-Sent Events) content from a URL. + FetchSse { + /// The timeline kind (room) where the SSE message is displayed. + timeline_kind: TimelineKind, + /// The event ID of the message containing the SSE header. + event_id: OwnedEventId, + /// The SSE endpoint URL to fetch from. + url: String, + }, } /// Submits a request to the worker thread to be executed asynchronously. @@ -1928,6 +1937,127 @@ async fn matrix_worker_task( SignalToUI::set_ui_signal(); }); } + + MatrixRequest::FetchSse { timeline_kind, event_id, url } => { + let _fetch_sse_task = Handle::current().spawn(async move { + use futures_util::StreamExt; + use reqwest_eventsource::{Event, EventSource}; + + log!("Starting SSE fetch for event {} from {}", event_id, url); + + // Get the timeline sender using get_timeline_and_sender + let Some((_timeline, update_sender)) = get_timeline_and_sender(&timeline_kind) else { + error!("SSE: Failed to get timeline sender for {:?}", timeline_kind); + return; + }; + + let client = reqwest::Client::builder() + .no_proxy() + .build() + .unwrap_or_default(); + let request = client.get(&url); + + let mut es = match EventSource::new(request) { + Ok(es) => es, + Err(e) => { + error!("SSE: Failed to create EventSource for {}: {:?}", url, e); + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id, + content: format!("Error creating EventSource: {:?}", e), + is_complete: true, + }); + SignalToUI::set_ui_signal(); + return; + } + }; + + log!("SSE: Connected to {}, waiting for events...", url); + let mut accumulated = String::new(); + + while let Some(event) = es.next().await { + match event { + Ok(Event::Open) => { + log!("SSE: Connection opened for event {}", event_id); + } + Ok(Event::Message(msg)) => { + log!("SSE: Event '{}': {}", msg.event, msg.data); + + // Try to parse as JSON first + if let Ok(json) = serde_json::from_str::(&msg.data) { + // Check for content field + if let Some(content) = json.get("content").and_then(|c| c.as_str()) { + if !accumulated.is_empty() { + accumulated.push('\n'); + } + accumulated.push_str(content); + + // Send incremental update + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id: event_id.clone(), + content: accumulated.clone(), + is_complete: false, + }); + SignalToUI::set_ui_signal(); + } + + // Check for stream completion + if json.get("status").and_then(|s| s.as_str()) == Some("complete") + || json.get("type").and_then(|t| t.as_str()) == Some("complete") + { + log!("SSE: Stream completed for event {}", event_id); + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id: event_id.clone(), + content: accumulated.clone(), + is_complete: true, + }); + SignalToUI::set_ui_signal(); + es.close(); + return; + } + } else { + // Not JSON, treat as raw content + if !msg.data.is_empty() { + if !accumulated.is_empty() { + accumulated.push('\n'); + } + accumulated.push_str(&msg.data); + + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id: event_id.clone(), + content: accumulated.clone(), + is_complete: false, + }); + SignalToUI::set_ui_signal(); + } + } + } + Err(e) => { + error!("SSE: Error for event {}: {:?}", event_id, e); + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id, + content: if accumulated.is_empty() { + format!("SSE Error: {:?}", e) + } else { + format!("{}\n\nSSE Error: {:?}", accumulated, e) + }, + is_complete: true, + }); + SignalToUI::set_ui_signal(); + return; + } + } + } + + // Stream ended + log!("SSE: Connection closed for event {}", event_id); + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id, + content: accumulated, + is_complete: true, + }); + SignalToUI::set_ui_signal(); + }); + } } }