From 664f8c7e035aa41973405c33ff103836a7e4d0ef Mon Sep 17 00:00:00 2001 From: alanpoon Date: Thu, 12 Mar 2026 11:42:05 +0800 Subject: [PATCH 1/3] flow_message --- .gitignore | 2 + Cargo.lock | 82 +++++++++-- Cargo.toml | 4 + src/bin/sse_server.rs | 299 ++++++++++++++++++++++++++++++++++++++++ src/home/room_screen.rs | 241 +++++++++++++++++++++++++++++++- 5 files changed, 616 insertions(+), 12 deletions(-) create mode 100644 src/bin/sse_server.rs diff --git a/.gitignore b/.gitignore index 1f891a019..5f0076523 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ .DS_Store CLAUDE.md proxychains.conf +docker +dm_invite_sessions \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index c615b8cb7..a3cc763c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -465,13 +465,47 @@ dependencies = [ "libloading", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5" dependencies = [ - "axum-core", + "axum-core 0.5.2", "bytes", "form_urlencoded", "futures-util", @@ -481,7 +515,7 @@ dependencies = [ "hyper", "hyper-util", "itoa", - "matchit", + "matchit 0.8.4", "memchr", "mime", "percent-encoding", @@ -499,6 +533,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-core" version = "0.5.2" @@ -1429,7 +1484,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.60.2", + "windows-sys 0.61.1", ] [[package]] @@ -1584,7 +1639,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -2814,7 +2869,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07033963ba89ebaf1584d767badaa2e8fcec21aedea6b8c0346d487d49c28667" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.53.4", ] [[package]] @@ -3347,6 +3402,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matchit" version = "0.8.4" @@ -3388,7 +3449,7 @@ dependencies = [ "async-once-cell", "async-stream", "async-trait", - "axum", + "axum 0.8.4", "backon", "bytes", "bytesize", @@ -4871,6 +4932,7 @@ version = "0.0.1-pre-alpha-4" dependencies = [ "anyhow", "aws-lc-rs", + "axum 0.7.9", "bitflags 2.10.0", "blurhash", "bytesize", @@ -4880,6 +4942,7 @@ dependencies = [ "crossbeam-queue", "eyeball", "eyeball-im", + "futures", "futures-util", "hashbrown 0.16.1", "htmlize", @@ -4907,6 +4970,7 @@ dependencies = [ "serde_json", "thiserror 2.0.17", "tokio", + "tokio-stream", "tracing-subscriber", "tsp_sdk", "unicode-segmentation", @@ -5146,7 +5210,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -5993,7 +6057,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -6974,7 +7038,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 5c041614c..0a6f2f112 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,6 +87,10 @@ reqwest = { version = "0.12", default-features = false, features = [ "macos-system-configuration", ] } +## Dependencies for SSE support. +axum = "0.7" +futures = "0.3" +tokio-stream = "0.1" [features] default = [] diff --git a/src/bin/sse_server.rs b/src/bin/sse_server.rs new file mode 100644 index 000000000..04a64f65f --- /dev/null +++ b/src/bin/sse_server.rs @@ -0,0 +1,299 @@ +//! SSE Server with Matrix Bot +//! +//! This binary runs: +//! 1. An SSE (Server-Sent Events) server at http://127.0.0.1:3001/events +//! 2. A Matrix bot (testuser2) that responds to "/sse" commands with SSE stream headers +//! +//! When the bot receives "/sse", it sends a message with format: +//! `!SSE|http://127.0.0.1:3001/events|` +//! +//! After streaming completes, the bot edits the message with the full content. + +#![recursion_limit = "256"] + +use axum::{ + response::sse::{Event, Sse}, + routing::get, + Router, +}; +use futures::stream::{self, Stream}; +use std::{collections::HashMap, convert::Infallible, sync::Arc, time::Duration}; +use tokio::sync::Mutex; +use tokio_stream::StreamExt; + +use matrix_sdk::{ + Client, + config::SyncSettings, + room::Room, + ruma::{ + OwnedEventId, OwnedRoomId, + events::room::message::{ + MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent, + ReplacementMetadata, + }, + }, +}; + +/// Shared state for tracking SSE messages that need to be edited +#[derive(Clone, Default)] +struct SseState { + /// Maps event_id to (room_id, accumulated_content) + pending_edits: Arc>>, +} + +/// The messages to stream via SSE +const SSE_MESSAGES: &[&str] = &[ + "Breaking: New Makepad release brings revolutionary UI performance!", + "Tech Update: Rust continues to dominate systems programming", + "Local News: Community embraces new SSE widget for real-time updates", + "Science: Researchers achieve breakthrough in quantum computing", + "Weather: Sunny skies expected throughout the week", + "[STREAM_END]", // Marker for end of stream +]; + +/// SSE handler that streams messages and signals completion +async fn sse_handler() -> Sse>> { + let stream = stream::iter(SSE_MESSAGES.iter().enumerate()) + .throttle(Duration::from_secs(1)) + .map(|(i, msg)| { + let data = if *msg == "[STREAM_END]" { + format!(r#"{{"id": {}, "status": "complete"}}"#, i) + } else { + format!( + r#"{{"id": {}, "title": "News #{}", "content": "{}"}}"#, + i, + i + 1, + msg + ) + }; + Ok(Event::default().data(data)) + }); + + Sse::new(stream).keep_alive( + axum::response::sse::KeepAlive::new() + .interval(Duration::from_secs(10)) + .text("keep-alive"), + ) +} + +/// Run the Matrix bot that responds to /sse commands +async fn run_matrix_bot(state: SseState) -> anyhow::Result<()> { + let homeserver_url = "http://localhost:8008"; + let username = "testuser2"; + let password = "testpassword"; + + println!("Matrix bot: Connecting to homeserver {}", homeserver_url); + + // Build the client + let client = Client::builder() + .homeserver_url(homeserver_url) + .build() + .await?; + + // Login + client + .matrix_auth() + .login_username(username, password) + .initial_device_display_name("sse-bot") + .send() + .await?; + + println!("Matrix bot: Logged in as {}", username); + + // Clone state for the event handler + let state_clone = state.clone(); + let client_clone = client.clone(); + + // Add event handler for room messages + client.add_event_handler( + move |event: OriginalSyncRoomMessageEvent, room: Room| { + let state = state_clone.clone(); + let client = client_clone.clone(); + async move { + handle_room_message(event, room, state, client).await; + } + }, + ); + + // Start syncing + println!("Matrix bot: Starting sync..."); + client.sync(SyncSettings::default()).await?; + + Ok(()) +} + +/// Handle incoming room messages +async fn handle_room_message( + event: OriginalSyncRoomMessageEvent, + room: Room, + state: SseState, + client: Client, +) { + // Only handle text messages + let MessageType::Text(text_content) = &event.content.msgtype else { + return; + }; + + let body = text_content.body.trim(); + + // Check if message is "/sse" command + if body == "/sse" { + println!("Matrix bot: Received /sse command from {}", event.sender); + + // Send SSE header message + let sse_message = "!SSE|http://127.0.0.1:3001/events|"; + let content = RoomMessageEventContent::text_plain(sse_message); + + match room.send(content).await { + Ok(response) => { + let event_id = response.response.event_id; + let room_id = room.room_id().to_owned(); + + println!( + "Matrix bot: Sent SSE header message, event_id: {}", + event_id + ); + + // Store the event_id for later editing + { + let mut pending = state.pending_edits.lock().await; + pending.insert(event_id.clone(), (room_id, String::new())); + } + + // Spawn a task to fetch SSE and accumulate content + let state_clone = state.clone(); + let client_clone = client.clone(); + tokio::spawn(async move { + fetch_sse_and_edit(event_id, state_clone, client_clone).await; + }); + } + Err(e) => { + eprintln!("Matrix bot: Failed to send message: {}", e); + } + } + } +} + +/// Fetch SSE content and edit the original message when complete +async fn fetch_sse_and_edit(event_id: OwnedEventId, state: SseState, client: Client) { + println!("Matrix bot: Starting SSE fetch for event {}", event_id); + + let sse_url = "http://127.0.0.1:3001/events"; + + // Use reqwest to fetch SSE + let response = match reqwest::Client::new() + .get(sse_url) + .header("Accept", "text/event-stream") + .send() + .await + { + Ok(resp) => resp, + Err(e) => { + eprintln!("Matrix bot: Failed to connect to SSE server: {}", e); + return; + } + }; + + let mut accumulated_content = String::new(); + let mut stream = response.bytes_stream(); + + while let Some(chunk_result) = stream.next().await { + match chunk_result { + Ok(chunk) => { + let text = String::from_utf8_lossy(&chunk); + // Parse SSE data lines + for line in text.lines() { + if let Some(data) = line.strip_prefix("data: ") { + // Parse JSON to extract content + if let Ok(json) = serde_json::from_str::(data) { + if let Some(content) = json.get("content").and_then(|c| c.as_str()) { + if !accumulated_content.is_empty() { + accumulated_content.push_str("\n"); + } + accumulated_content.push_str(content); + } + // Check for stream end + if json.get("status").and_then(|s| s.as_str()) == Some("complete") { + println!("Matrix bot: SSE stream completed"); + break; + } + } + } + } + } + Err(e) => { + eprintln!("Matrix bot: Error reading SSE stream: {}", e); + break; + } + } + } + + // Now edit the original message with accumulated content + let room_id = { + let pending = state.pending_edits.lock().await; + pending.get(&event_id).map(|(room_id, _)| room_id.clone()) + }; + + if let Some(room_id) = room_id { + if let Some(room) = client.get_room(&room_id) { + let final_content = format!( + "SSE Stream Complete:\n\n{}", + accumulated_content + ); + + // Create edit content using ReplacementMetadata + let metadata = ReplacementMetadata::new(event_id.clone(), None); + let new_content = RoomMessageEventContent::text_plain(&final_content) + .make_replacement(metadata); + + match room.send(new_content).await { + Ok(_) => { + println!("Matrix bot: Successfully edited message {}", event_id); + } + Err(e) => { + eprintln!("Matrix bot: Failed to edit message: {}", e); + } + } + + // Remove from pending edits + let mut pending = state.pending_edits.lock().await; + pending.remove(&event_id); + } + } +} + +#[tokio::main] +async fn main() { + println!("Starting SSE Server with Matrix Bot..."); + + // Create shared state + let state = SseState::default(); + + // Create SSE server + let app = Router::new().route("/events", get(sse_handler)); + + // Spawn SSE server + let sse_handle = tokio::spawn(async move { + let listener = tokio::net::TcpListener::bind("127.0.0.1:3001") + .await + .unwrap(); + println!("SSE Server running on http://127.0.0.1:3001/events"); + axum::serve(listener, app).await.unwrap(); + }); + + // Give SSE server a moment to start + tokio::time::sleep(Duration::from_millis(500)).await; + + // Run Matrix bot + let bot_handle = tokio::spawn(async move { + if let Err(e) = run_matrix_bot(state).await { + eprintln!("Matrix bot error: {}", e); + } + }); + + // Wait for both to complete (they won't under normal operation) + tokio::select! { + _ = sse_handle => println!("SSE server stopped"), + _ = bot_handle => println!("Matrix bot stopped"), + } +} diff --git a/src/home/room_screen.rs b/src/home/room_screen.rs index dc42a462e..9c87bef3c 100644 --- a/src/home/room_screen.rs +++ b/src/home/room_screen.rs @@ -2,6 +2,7 @@ //! of events (messages,state changes, etc.), along with an input bar at the bottom. use std::{borrow::Cow, cell::RefCell, ops::{DerefMut, Range}, sync::Arc}; +use futures_util::StreamExt as FuturesStreamExt; use bytesize::ByteSize; use hashbrown::{HashMap, HashSet}; @@ -1149,6 +1150,8 @@ impl Widget for RoomScreen { &self.pinned_events, item_drawn_status, room_screen_widget_uid, + &mut tl_state.sse_streams, + tl_state.update_receiver.clone(), ) }, // TODO: properly implement `Poll` as a regular Message-like timeline item. @@ -1617,6 +1620,19 @@ impl RoomScreen { tl.tombstone_info = Some(successor_room_details); } TimelineUpdate::LinkPreviewFetched => {} + TimelineUpdate::SseContentUpdate { event_id, content, is_complete } => { + // Update the SSE stream state for this event + if let Some(sse_state) = tl.sse_streams.get_mut(&event_id) { + sse_state.accumulated_content.push_str(&content); + if is_complete { + sse_state.is_complete = true; + sse_state.is_fetching = false; + } + } + // Clear the draw cache for all items so the message gets redrawn + // with the updated SSE content + tl.content_drawn_since_last_update.clear(); + } } } @@ -2286,6 +2302,7 @@ impl RoomScreen { scrolled_past_read_marker: false, latest_own_user_receipt: None, tombstone_info, + sse_streams: HashMap::new(), }; (tl_state, true) }; @@ -2759,6 +2776,15 @@ pub enum TimelineUpdate { Tombstoned(SuccessorRoomDetails), /// A notice that link preview data for a URL has been fetched and is now available. LinkPreviewFetched, + /// An update containing SSE (Server-Sent Events) content for a message. + SseContentUpdate { + /// The event ID of the message that contains the SSE stream. + event_id: OwnedEventId, + /// The new content to append to the message. + content: String, + /// Whether this is the final update (stream completed). + is_complete: bool, + }, } thread_local! { @@ -2877,6 +2903,34 @@ struct TimelineUiState { /// If `Some`, this room has been tombstoned and the details of its successor room /// are contained within. If `None`, the room has not been tombstoned. tombstone_info: Option, + + /// Tracks active SSE streams by event ID. + /// Maps event_id to (accumulated_content, is_fetching). + sse_streams: HashMap, +} + +/// State for tracking an SSE (Server-Sent Events) stream for a message. +#[derive(Debug, Clone)] +struct SseStreamState { + /// The URL of the SSE endpoint. + url: String, + /// Accumulated content from the SSE stream. + accumulated_content: String, + /// Whether the SSE stream is currently being fetched. + is_fetching: bool, + /// Whether the stream has completed. + is_complete: bool, +} + +impl Default for SseStreamState { + fn default() -> Self { + Self { + url: String::new(), + accumulated_content: String::new(), + is_fetching: false, + is_complete: false, + } + } } #[derive(Default, Debug)] @@ -3009,6 +3063,8 @@ fn populate_message_view( pinned_events: &[OwnedEventId], item_drawn_status: ItemDrawnStatus, room_screen_widget_uid: WidgetUid, + sse_streams: &mut HashMap, + update_receiver: crossbeam_channel::Receiver, ) -> (WidgetRef, ItemDrawnStatus) { let mut new_drawn_status = item_drawn_status; let ts_millis = event_tl_item.timestamp(); @@ -3051,11 +3107,49 @@ fn populate_message_view( if existed && item_drawn_status.content_drawn { (item, true) } else { + // Check for SSE pattern: !SSE|| + let (display_body, display_formatted) = if let Some(sse_url) = parse_sse_header(body) { + // Get or create SSE stream state + if let Some(event_id) = event_tl_item.event_id() { + let sse_state = sse_streams.entry(event_id.to_owned()).or_insert_with(|| { + SseStreamState { + url: sse_url.clone(), + accumulated_content: String::new(), + is_fetching: false, + is_complete: false, + } + }); + + // Start SSE fetch if not already fetching + if !sse_state.is_fetching && !sse_state.is_complete { + sse_state.is_fetching = true; + start_sse_fetch( + event_id.to_owned(), + sse_url, + update_receiver.clone(), + ); + } + + // Display accumulated content or loading message + if sse_state.accumulated_content.is_empty() { + (format!("Loading SSE from {}...", sse_state.url), None) + } else if sse_state.is_complete { + (format!("SSE Complete:\n\n{}", sse_state.accumulated_content), None) + } else { + (format!("SSE Streaming...\n\n{}", sse_state.accumulated_content), None) + } + } else { + (body.to_string(), formatted.clone()) + } + } else { + (body.to_string(), formatted.clone()) + }; + new_drawn_status.content_drawn = populate_text_message_content( cx, &item.html_or_plaintext(ids!(content.message)), - body, - formatted.as_ref(), + &display_body, + display_formatted.as_ref(), Some(&mut item.link_preview(ids!(content.link_preview_view))), Some(media_cache), Some(link_preview_cache), @@ -4787,10 +4881,151 @@ impl MessageRef { /// /// This function requires passing in a reference to `Cx`, /// which isn't used, but acts as a guarantee that this function -/// must only be called by the main UI thread. +/// must only be called by the main UI thread. pub fn clear_timeline_states(_cx: &mut Cx) { // Clear timeline states cache TIMELINE_STATES.with_borrow_mut(|states| { states.clear(); }); } + +/// Parses an SSE header from a message body. +/// +/// The expected format is: `!SSE||` +/// For example: `!SSE|http://127.0.0.1:3000/events|` +/// +/// Returns the URL if the pattern matches, otherwise None. +fn parse_sse_header(body: &str) -> Option { + let trimmed = body.trim(); + if trimmed.starts_with("!SSE|") { + // Find the closing | + if let Some(end_idx) = trimmed[5..].find('|') { + let url = &trimmed[5..5 + end_idx]; + if !url.is_empty() { + return Some(url.to_string()); + } + } + } + None +} + +/// Starts an SSE fetch in a background task. +/// +/// This function spawns an async task that: +/// 1. Connects to the SSE URL +/// 2. Reads the event stream +/// 3. Sends updates via the timeline update channel +fn start_sse_fetch( + event_id: OwnedEventId, + url: String, + _update_receiver: crossbeam_channel::Receiver, +) { + // We need to get the update sender, not receiver + // The sender should be obtained from the timeline state + // For now, we'll use a global approach with SignalToUI + + use tokio::runtime::Handle; + + // Check if we're in a tokio runtime context + if let Ok(handle) = Handle::try_current() { + let event_id_clone = event_id.clone(); + handle.spawn(async move { + log!("Starting SSE fetch for event {} from {}", event_id_clone, url); + + match fetch_sse_content(&url).await { + Ok(content_updates) => { + for (content, is_complete) in content_updates { + // We need to send updates through the timeline system + // Since we don't have direct access to the sender, + // we'll use the global TIMELINE_STATES to update + update_sse_content(&event_id_clone, &content, is_complete); + SignalToUI::set_ui_signal(); + + if is_complete { + break; + } + } + } + Err(e) => { + error!("SSE fetch error for event {}: {}", event_id_clone, e); + update_sse_content(&event_id_clone, &format!("Error: {}", e), true); + SignalToUI::set_ui_signal(); + } + } + }); + } else { + error!("No tokio runtime available for SSE fetch"); + } +} + +/// Fetches SSE content from the given URL. +/// +/// Returns a vector of (content, is_complete) tuples. +async fn fetch_sse_content(url: &str) -> Result, String> { + + let client = reqwest::Client::new(); + let response = client + .get(url) + .header("Accept", "text/event-stream") + .send() + .await + .map_err(|e| format!("Failed to connect: {}", e))?; + + let mut stream = response.bytes_stream(); + let mut updates = Vec::new(); + let mut accumulated = String::new(); + + while let Some(chunk_result) = stream.next().await { + match chunk_result { + Ok(chunk) => { + let text = String::from_utf8_lossy(&chunk); + for line in text.lines() { + if let Some(data) = line.strip_prefix("data: ") { + // Parse JSON to extract content + if let Ok(json) = serde_json::from_str::(data) { + if let Some(content) = json.get("content").and_then(|c| c.as_str()) { + if !accumulated.is_empty() { + accumulated.push('\n'); + } + accumulated.push_str(content); + updates.push((accumulated.clone(), false)); + } + // Check for stream end + if json.get("status").and_then(|s| s.as_str()) == Some("complete") { + updates.push((accumulated.clone(), true)); + return Ok(updates); + } + } + } + } + } + Err(e) => { + return Err(format!("Stream error: {}", e)); + } + } + } + + // Stream ended without explicit complete marker + updates.push((accumulated, true)); + Ok(updates) +} + +/// Updates the SSE content in the timeline state. +/// +/// This is called from the background SSE fetch task to update the accumulated content. +fn update_sse_content(event_id: &OwnedEventId, content: &str, is_complete: bool) { + TIMELINE_STATES.with_borrow_mut(|states| { + for tl_state in states.values_mut() { + if let Some(sse_state) = tl_state.sse_streams.get_mut(event_id) { + sse_state.accumulated_content = content.to_string(); + sse_state.is_complete = is_complete; + if is_complete { + sse_state.is_fetching = false; + } + // Clear the draw cache so the message gets redrawn + tl_state.content_drawn_since_last_update.clear(); + return; + } + } + }); +} From 90eb30a1896922843ccd7cb2e40b0a4aa6f46d4d Mon Sep 17 00:00:00 2001 From: alanpoon Date: Thu, 12 Mar 2026 13:10:19 +0800 Subject: [PATCH 2/3] added event_source --- Cargo.lock | 34 ++++++ Cargo.toml | 1 + src/bin/sse_server.rs | 232 ++++++++++++++++++++-------------------- src/home/room_screen.rs | 139 ++++-------------------- src/sliding_sync.rs | 124 +++++++++++++++++++++ 5 files changed, 297 insertions(+), 233 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a3cc763c9..9b8e3e521 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1683,6 +1683,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "eyeball" version = "0.8.8" @@ -1964,6 +1975,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -4795,6 +4812,22 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "reqwest-eventsource" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "632c55746dbb44275691640e7b40c907c16a2dc1a5842aa98aaec90da6ec6bde" +dependencies = [ + "eventsource-stream", + "futures-core", + "futures-timer", + "mime", + "nom", + "pin-project-lite", + "reqwest", + "thiserror 1.0.69", +] + [[package]] name = "resvg" version = "0.42.0" @@ -4960,6 +4993,7 @@ dependencies = [ "rand 0.8.5", "rangemap", "reqwest", + "reqwest-eventsource", "robius-directories", "robius-location", "robius-open", diff --git a/Cargo.toml b/Cargo.toml index 0a6f2f112..5bb968731 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,6 +91,7 @@ reqwest = { version = "0.12", default-features = false, features = [ axum = "0.7" futures = "0.3" tokio-stream = "0.1" +reqwest-eventsource = "0.6" [features] default = [] diff --git a/src/bin/sse_server.rs b/src/bin/sse_server.rs index 04a64f65f..533b89f79 100644 --- a/src/bin/sse_server.rs +++ b/src/bin/sse_server.rs @@ -12,6 +12,7 @@ #![recursion_limit = "256"] use axum::{ + extract::State, response::sse::{Event, Sse}, routing::get, Router, @@ -35,10 +36,21 @@ use matrix_sdk::{ }; /// Shared state for tracking SSE messages that need to be edited -#[derive(Clone, Default)] -struct SseState { +#[derive(Clone)] +struct AppState { /// Maps event_id to (room_id, accumulated_content) pending_edits: Arc>>, + /// Matrix client for editing messages + client: Arc>>, +} + +impl Default for AppState { + fn default() -> Self { + Self { + pending_edits: Arc::new(Mutex::new(HashMap::new())), + client: Arc::new(Mutex::new(None)), + } + } } /// The messages to stream via SSE @@ -48,15 +60,32 @@ const SSE_MESSAGES: &[&str] = &[ "Local News: Community embraces new SSE widget for real-time updates", "Science: Researchers achieve breakthrough in quantum computing", "Weather: Sunny skies expected throughout the week", - "[STREAM_END]", // Marker for end of stream ]; -/// SSE handler that streams messages and signals completion -async fn sse_handler() -> Sse>> { +/// SSE handler that streams messages and edits Matrix message when complete +async fn sse_handler( + State(state): State, +) -> Sse>> { + // Collect all content for the final edit + let all_content: String = SSE_MESSAGES + .iter() + .map(|s| *s) + .collect::>() + .join("\n"); + + // Create a channel to signal stream completion + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + let tx = Arc::new(Mutex::new(Some(tx))); + + let state_clone = state.clone(); + let tx_clone = tx.clone(); + let stream = stream::iter(SSE_MESSAGES.iter().enumerate()) + .chain(stream::once(async { (SSE_MESSAGES.len(), &"[STREAM_END]") })) .throttle(Duration::from_secs(1)) - .map(|(i, msg)| { - let data = if *msg == "[STREAM_END]" { + .map(move |(i, msg)| { + let is_end = *msg == "[STREAM_END]"; + let data = if is_end { format!(r#"{{"id": {}, "status": "complete"}}"#, i) } else { format!( @@ -66,9 +95,29 @@ async fn sse_handler() -> Sse>> { msg ) }; + + // Signal completion when STREAM_END is sent + if is_end { + let tx = tx_clone.clone(); + tokio::spawn(async move { + if let Some(sender) = tx.lock().await.take() { + let _ = sender.send(()); + } + }); + } + Ok(Event::default().data(data)) }); + // Spawn a task that waits for stream completion signal, then edits the message + let content_for_edit = all_content; + tokio::spawn(async move { + // Wait for the stream to signal completion + let _ = rx.await; + // Edit the pending Matrix message + edit_pending_message(state_clone, content_for_edit).await; + }); + Sse::new(stream).keep_alive( axum::response::sse::KeepAlive::new() .interval(Duration::from_secs(10)) @@ -76,12 +125,51 @@ async fn sse_handler() -> Sse>> { ) } -/// Run the Matrix bot that responds to /sse commands -async fn run_matrix_bot(state: SseState) -> anyhow::Result<()> { - let homeserver_url = "http://localhost:8008"; - let username = "testuser2"; - let password = "testpassword"; +/// Edit the most recent pending Matrix message with the accumulated content +async fn edit_pending_message(state: AppState, content: String) { + let client_guard = state.client.lock().await; + let Some(client) = client_guard.as_ref() else { + eprintln!("Matrix bot: No client available for editing"); + return; + }; + + // Get the first pending edit (there should be one per SSE request) + let pending_edit = { + let mut pending = state.pending_edits.lock().await; + pending.drain().next() + }; + + if let Some((event_id, (room_id, _))) = pending_edit { + if let Some(room) = client.get_room(&room_id) { + let final_content = format!("SSE Stream Complete:\n\n{}", content); + + // Create edit content using ReplacementMetadata + let metadata = ReplacementMetadata::new(event_id.clone(), None); + let new_content = RoomMessageEventContent::text_plain(&final_content) + .make_replacement(metadata); + match room.send(new_content).await { + Ok(_) => { + println!("Matrix bot: Successfully edited message {}", event_id); + } + Err(e) => { + eprintln!("Matrix bot: Failed to edit message: {}", e); + } + } + } + } else { + println!("Matrix bot: No pending message to edit"); + } +} + +/// Run the Matrix bot that responds to /sse commands +async fn run_matrix_bot(state: AppState) -> anyhow::Result<()> { + // let homeserver_url = "http://localhost:8008"; + // let username = "testuser2"; + // let password = "testpassword"; + let homeserver_url = "https://matrix.org"; + let username = "ruitobeta"; + let password = "!C6WS3hcPGM:GMa"; println!("Matrix bot: Connecting to homeserver {}", homeserver_url); // Build the client @@ -100,17 +188,21 @@ async fn run_matrix_bot(state: SseState) -> anyhow::Result<()> { println!("Matrix bot: Logged in as {}", username); + // Store the client in shared state for later use + { + let mut client_guard = state.client.lock().await; + *client_guard = Some(client.clone()); + } + // Clone state for the event handler let state_clone = state.clone(); - let client_clone = client.clone(); // Add event handler for room messages client.add_event_handler( move |event: OriginalSyncRoomMessageEvent, room: Room| { let state = state_clone.clone(); - let client = client_clone.clone(); async move { - handle_room_message(event, room, state, client).await; + handle_room_message(event, room, state).await; } }, ); @@ -126,8 +218,7 @@ async fn run_matrix_bot(state: SseState) -> anyhow::Result<()> { async fn handle_room_message( event: OriginalSyncRoomMessageEvent, room: Room, - state: SseState, - client: Client, + state: AppState, ) { // Only handle text messages let MessageType::Text(text_content) = &event.content.msgtype else { @@ -154,18 +245,11 @@ async fn handle_room_message( event_id ); - // Store the event_id for later editing + // Store the event_id for later editing (will be edited when SSE stream ends) { let mut pending = state.pending_edits.lock().await; - pending.insert(event_id.clone(), (room_id, String::new())); + pending.insert(event_id, (room_id, String::new())); } - - // Spawn a task to fetch SSE and accumulate content - let state_clone = state.clone(); - let client_clone = client.clone(); - tokio::spawn(async move { - fetch_sse_and_edit(event_id, state_clone, client_clone).await; - }); } Err(e) => { eprintln!("Matrix bot: Failed to send message: {}", e); @@ -174,103 +258,17 @@ async fn handle_room_message( } } -/// Fetch SSE content and edit the original message when complete -async fn fetch_sse_and_edit(event_id: OwnedEventId, state: SseState, client: Client) { - println!("Matrix bot: Starting SSE fetch for event {}", event_id); - - let sse_url = "http://127.0.0.1:3001/events"; - - // Use reqwest to fetch SSE - let response = match reqwest::Client::new() - .get(sse_url) - .header("Accept", "text/event-stream") - .send() - .await - { - Ok(resp) => resp, - Err(e) => { - eprintln!("Matrix bot: Failed to connect to SSE server: {}", e); - return; - } - }; - - let mut accumulated_content = String::new(); - let mut stream = response.bytes_stream(); - - while let Some(chunk_result) = stream.next().await { - match chunk_result { - Ok(chunk) => { - let text = String::from_utf8_lossy(&chunk); - // Parse SSE data lines - for line in text.lines() { - if let Some(data) = line.strip_prefix("data: ") { - // Parse JSON to extract content - if let Ok(json) = serde_json::from_str::(data) { - if let Some(content) = json.get("content").and_then(|c| c.as_str()) { - if !accumulated_content.is_empty() { - accumulated_content.push_str("\n"); - } - accumulated_content.push_str(content); - } - // Check for stream end - if json.get("status").and_then(|s| s.as_str()) == Some("complete") { - println!("Matrix bot: SSE stream completed"); - break; - } - } - } - } - } - Err(e) => { - eprintln!("Matrix bot: Error reading SSE stream: {}", e); - break; - } - } - } - - // Now edit the original message with accumulated content - let room_id = { - let pending = state.pending_edits.lock().await; - pending.get(&event_id).map(|(room_id, _)| room_id.clone()) - }; - - if let Some(room_id) = room_id { - if let Some(room) = client.get_room(&room_id) { - let final_content = format!( - "SSE Stream Complete:\n\n{}", - accumulated_content - ); - - // Create edit content using ReplacementMetadata - let metadata = ReplacementMetadata::new(event_id.clone(), None); - let new_content = RoomMessageEventContent::text_plain(&final_content) - .make_replacement(metadata); - - match room.send(new_content).await { - Ok(_) => { - println!("Matrix bot: Successfully edited message {}", event_id); - } - Err(e) => { - eprintln!("Matrix bot: Failed to edit message: {}", e); - } - } - - // Remove from pending edits - let mut pending = state.pending_edits.lock().await; - pending.remove(&event_id); - } - } -} - #[tokio::main] async fn main() { println!("Starting SSE Server with Matrix Bot..."); // Create shared state - let state = SseState::default(); + let state = AppState::default(); - // Create SSE server - let app = Router::new().route("/events", get(sse_handler)); + // Create SSE server with state + let app = Router::new() + .route("/events", get(sse_handler)) + .with_state(state.clone()); // Spawn SSE server let sse_handle = tokio::spawn(async move { diff --git a/src/home/room_screen.rs b/src/home/room_screen.rs index 9c87bef3c..695df99e4 100644 --- a/src/home/room_screen.rs +++ b/src/home/room_screen.rs @@ -2,7 +2,6 @@ //! of events (messages,state changes, etc.), along with an input bar at the bottom. use std::{borrow::Cow, cell::RefCell, ops::{DerefMut, Range}, sync::Arc}; -use futures_util::StreamExt as FuturesStreamExt; use bytesize::ByteSize; use hashbrown::{HashMap, HashSet}; @@ -1151,7 +1150,7 @@ impl Widget for RoomScreen { item_drawn_status, room_screen_widget_uid, &mut tl_state.sse_streams, - tl_state.update_receiver.clone(), + tl_state.update_sender.clone(), ) }, // TODO: properly implement `Poll` as a regular Message-like timeline item. @@ -1622,8 +1621,9 @@ impl RoomScreen { TimelineUpdate::LinkPreviewFetched => {} TimelineUpdate::SseContentUpdate { event_id, content, is_complete } => { // Update the SSE stream state for this event + // Note: content is already the full accumulated content from sliding_sync if let Some(sse_state) = tl.sse_streams.get_mut(&event_id) { - sse_state.accumulated_content.push_str(&content); + sse_state.accumulated_content = content; if is_complete { sse_state.is_complete = true; sse_state.is_fetching = false; @@ -1632,6 +1632,12 @@ impl RoomScreen { // Clear the draw cache for all items so the message gets redrawn // with the updated SSE content tl.content_drawn_since_last_update.clear(); + + // Auto-scroll to bottom while SSE is streaming + if !tl.items.is_empty() { + portal_list.set_first_id_and_scroll(tl.items.len().saturating_sub(1), 0.0); + portal_list.set_tail_range(true); + } } } } @@ -2290,6 +2296,7 @@ impl RoomScreen { content_drawn_since_last_update: RangeSet::new(), profile_drawn_since_last_update: RangeSet::new(), update_receiver, + update_sender: update_sender.clone(), request_sender, media_cache: MediaCache::new(Some(update_sender.clone())), link_preview_cache: LinkPreviewCache::new(Some(update_sender)), @@ -2846,6 +2853,9 @@ struct TimelineUiState { /// which is okay because a sender on an unbounded channel never needs to block. update_receiver: crossbeam_channel::Receiver, + /// The channel sender for timeline updates, used for SSE fetching. + update_sender: crossbeam_channel::Sender, + /// The sender for timeline requests from a RoomScreen showing this room /// to the background async task that handles this room's timeline updates. request_sender: TimelineRequestSender, @@ -3064,7 +3074,7 @@ fn populate_message_view( item_drawn_status: ItemDrawnStatus, room_screen_widget_uid: WidgetUid, sse_streams: &mut HashMap, - update_receiver: crossbeam_channel::Receiver, + update_sender: crossbeam_channel::Sender, ) -> (WidgetRef, ItemDrawnStatus) { let mut new_drawn_status = item_drawn_status; let ts_millis = event_tl_item.timestamp(); @@ -3126,7 +3136,7 @@ fn populate_message_view( start_sse_fetch( event_id.to_owned(), sse_url, - update_receiver.clone(), + update_sender.clone(), ); } @@ -4911,121 +4921,18 @@ fn parse_sse_header(body: &str) -> Option { /// Starts an SSE fetch in a background task. /// -/// This function spawns an async task that: -/// 1. Connects to the SSE URL -/// 2. Reads the event stream -/// 3. Sends updates via the timeline update channel +/// This function submits an async request to fetch SSE content. +/// Updates are sent via the timeline update channel. fn start_sse_fetch( event_id: OwnedEventId, url: String, - _update_receiver: crossbeam_channel::Receiver, + update_sender: crossbeam_channel::Sender, ) { - // We need to get the update sender, not receiver - // The sender should be obtained from the timeline state - // For now, we'll use a global approach with SignalToUI - - use tokio::runtime::Handle; - - // Check if we're in a tokio runtime context - if let Ok(handle) = Handle::try_current() { - let event_id_clone = event_id.clone(); - handle.spawn(async move { - log!("Starting SSE fetch for event {} from {}", event_id_clone, url); - - match fetch_sse_content(&url).await { - Ok(content_updates) => { - for (content, is_complete) in content_updates { - // We need to send updates through the timeline system - // Since we don't have direct access to the sender, - // we'll use the global TIMELINE_STATES to update - update_sse_content(&event_id_clone, &content, is_complete); - SignalToUI::set_ui_signal(); - - if is_complete { - break; - } - } - } - Err(e) => { - error!("SSE fetch error for event {}: {}", event_id_clone, e); - update_sse_content(&event_id_clone, &format!("Error: {}", e), true); - SignalToUI::set_ui_signal(); - } - } - }); - } else { - error!("No tokio runtime available for SSE fetch"); - } -} - -/// Fetches SSE content from the given URL. -/// -/// Returns a vector of (content, is_complete) tuples. -async fn fetch_sse_content(url: &str) -> Result, String> { - - let client = reqwest::Client::new(); - let response = client - .get(url) - .header("Accept", "text/event-stream") - .send() - .await - .map_err(|e| format!("Failed to connect: {}", e))?; - - let mut stream = response.bytes_stream(); - let mut updates = Vec::new(); - let mut accumulated = String::new(); - - while let Some(chunk_result) = stream.next().await { - match chunk_result { - Ok(chunk) => { - let text = String::from_utf8_lossy(&chunk); - for line in text.lines() { - if let Some(data) = line.strip_prefix("data: ") { - // Parse JSON to extract content - if let Ok(json) = serde_json::from_str::(data) { - if let Some(content) = json.get("content").and_then(|c| c.as_str()) { - if !accumulated.is_empty() { - accumulated.push('\n'); - } - accumulated.push_str(content); - updates.push((accumulated.clone(), false)); - } - // Check for stream end - if json.get("status").and_then(|s| s.as_str()) == Some("complete") { - updates.push((accumulated.clone(), true)); - return Ok(updates); - } - } - } - } - } - Err(e) => { - return Err(format!("Stream error: {}", e)); - } - } - } - - // Stream ended without explicit complete marker - updates.push((accumulated, true)); - Ok(updates) -} + use crate::sliding_sync::{submit_async_request, MatrixRequest}; -/// Updates the SSE content in the timeline state. -/// -/// This is called from the background SSE fetch task to update the accumulated content. -fn update_sse_content(event_id: &OwnedEventId, content: &str, is_complete: bool) { - TIMELINE_STATES.with_borrow_mut(|states| { - for tl_state in states.values_mut() { - if let Some(sse_state) = tl_state.sse_streams.get_mut(event_id) { - sse_state.accumulated_content = content.to_string(); - sse_state.is_complete = is_complete; - if is_complete { - sse_state.is_fetching = false; - } - // Clear the draw cache so the message gets redrawn - tl_state.content_drawn_since_last_update.clear(); - return; - } - } + submit_async_request(MatrixRequest::FetchSse { + event_id, + url, + update_sender, }); } diff --git a/src/sliding_sync.rs b/src/sliding_sync.rs index 4d4455b1f..55c2ab47c 100644 --- a/src/sliding_sync.rs +++ b/src/sliding_sync.rs @@ -664,6 +664,15 @@ pub enum MatrixRequest { destination: Arc>, update_sender: Option>, }, + /// Request to fetch SSE (Server-Sent Events) content from a URL. + FetchSse { + /// The event ID of the message containing the SSE header. + event_id: OwnedEventId, + /// The SSE endpoint URL to fetch from. + url: String, + /// The sender for timeline updates. + update_sender: crossbeam_channel::Sender, + }, } /// Submits a request to the worker thread to be executed asynchronously. @@ -1928,6 +1937,121 @@ async fn matrix_worker_task( SignalToUI::set_ui_signal(); }); } + + MatrixRequest::FetchSse { event_id, url, update_sender } => { + let _fetch_sse_task = Handle::current().spawn(async move { + use futures_util::StreamExt; + use reqwest_eventsource::{Event, EventSource}; + + log!("Starting SSE fetch for event {} from {}", event_id, url); + + let client = reqwest::Client::builder() + .no_proxy() + .build() + .unwrap_or_default(); + let request = client.get(&url); + + let mut es = match EventSource::new(request) { + Ok(es) => es, + Err(e) => { + error!("SSE: Failed to create EventSource for {}: {:?}", url, e); + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id, + content: format!("Error creating EventSource: {:?}", e), + is_complete: true, + }); + SignalToUI::set_ui_signal(); + return; + } + }; + + log!("SSE: Connected to {}, waiting for events...", url); + let mut accumulated = String::new(); + + while let Some(event) = es.next().await { + match event { + Ok(Event::Open) => { + log!("SSE: Connection opened for event {}", event_id); + } + Ok(Event::Message(msg)) => { + log!("SSE: Event '{}': {}", msg.event, msg.data); + + // Try to parse as JSON first + if let Ok(json) = serde_json::from_str::(&msg.data) { + // Check for content field + if let Some(content) = json.get("content").and_then(|c| c.as_str()) { + if !accumulated.is_empty() { + accumulated.push('\n'); + } + accumulated.push_str(content); + + // Send incremental update + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id: event_id.clone(), + content: accumulated.clone(), + is_complete: false, + }); + SignalToUI::set_ui_signal(); + } + + // Check for stream completion + if json.get("status").and_then(|s| s.as_str()) == Some("complete") + || json.get("type").and_then(|t| t.as_str()) == Some("complete") + { + log!("SSE: Stream completed for event {}", event_id); + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id: event_id.clone(), + content: accumulated.clone(), + is_complete: true, + }); + SignalToUI::set_ui_signal(); + es.close(); + return; + } + } else { + // Not JSON, treat as raw content + if !msg.data.is_empty() { + if !accumulated.is_empty() { + accumulated.push('\n'); + } + accumulated.push_str(&msg.data); + + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id: event_id.clone(), + content: accumulated.clone(), + is_complete: false, + }); + SignalToUI::set_ui_signal(); + } + } + } + Err(e) => { + error!("SSE: Error for event {}: {:?}", event_id, e); + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id, + content: if accumulated.is_empty() { + format!("SSE Error: {:?}", e) + } else { + format!("{}\n\nSSE Error: {:?}", accumulated, e) + }, + is_complete: true, + }); + SignalToUI::set_ui_signal(); + return; + } + } + } + + // Stream ended + log!("SSE: Connection closed for event {}", event_id); + let _ = update_sender.send(crate::home::room_screen::TimelineUpdate::SseContentUpdate { + event_id, + content: accumulated, + is_complete: true, + }); + SignalToUI::set_ui_signal(); + }); + } } } From d4c02fe69cad09cdb82ed9dbf65005c907813b21 Mon Sep 17 00:00:00 2001 From: alanpoon Date: Thu, 12 Mar 2026 13:35:04 +0800 Subject: [PATCH 3/3] remove unneccessary update_sender --- src/home/room_screen.rs | 90 ++++++++++++++++++++++++----------------- src/sliding_sync.rs | 12 ++++-- 2 files changed, 63 insertions(+), 39 deletions(-) diff --git a/src/home/room_screen.rs b/src/home/room_screen.rs index 695df99e4..5aff37d8d 100644 --- a/src/home/room_screen.rs +++ b/src/home/room_screen.rs @@ -1150,7 +1150,6 @@ impl Widget for RoomScreen { item_drawn_status, room_screen_widget_uid, &mut tl_state.sse_streams, - tl_state.update_sender.clone(), ) }, // TODO: properly implement `Poll` as a regular Message-like timeline item. @@ -1413,6 +1412,43 @@ impl RoomScreen { // log!("process_timeline_updates(): changed_indices: {changed_indices:?}, items len: {}\ncontent drawn: {:#?}\nprofile drawn: {:#?}", items.len(), tl.content_drawn_since_last_update, tl.profile_drawn_since_last_update); } tl.items = new_items; + + // Check new items for SSE messages and start fetching if needed + for item in tl.items.iter() { + if let Some(event_tl_item) = item.as_event() { + if let Some(event_id) = event_tl_item.event_id() { + // Check if this message contains an SSE header + if let TimelineItemContent::MsgLike(msg_like_content) = event_tl_item.content() { + if let MsgLikeKind::Message(message) = &msg_like_content.kind { + if let MessageType::Text(text_content) = message.msgtype() { + if let Some(sse_url) = parse_sse_header(&text_content.body) { + // Get or create SSE stream state + let sse_state = tl.sse_streams.entry(event_id.to_owned()).or_insert_with(|| { + SseStreamState { + url: sse_url.clone(), + accumulated_content: String::new(), + is_fetching: false, + is_complete: false, + } + }); + + // Start SSE fetch if not already fetching + if !sse_state.is_fetching && !sse_state.is_complete { + sse_state.is_fetching = true; + start_sse_fetch( + tl.kind.clone(), + event_id.to_owned(), + sse_url, + ); + } + } + } + } + } + } + } + } + done_loading = true; } TimelineUpdate::NewUnreadMessagesCount(unread_messages_count) => { @@ -2296,7 +2332,6 @@ impl RoomScreen { content_drawn_since_last_update: RangeSet::new(), profile_drawn_since_last_update: RangeSet::new(), update_receiver, - update_sender: update_sender.clone(), request_sender, media_cache: MediaCache::new(Some(update_sender.clone())), link_preview_cache: LinkPreviewCache::new(Some(update_sender)), @@ -2853,9 +2888,6 @@ struct TimelineUiState { /// which is okay because a sender on an unbounded channel never needs to block. update_receiver: crossbeam_channel::Receiver, - /// The channel sender for timeline updates, used for SSE fetching. - update_sender: crossbeam_channel::Sender, - /// The sender for timeline requests from a RoomScreen showing this room /// to the background async task that handles this room's timeline updates. request_sender: TimelineRequestSender, @@ -3074,7 +3106,6 @@ fn populate_message_view( item_drawn_status: ItemDrawnStatus, room_screen_widget_uid: WidgetUid, sse_streams: &mut HashMap, - update_sender: crossbeam_channel::Sender, ) -> (WidgetRef, ItemDrawnStatus) { let mut new_drawn_status = item_drawn_status; let ts_millis = event_tl_item.timestamp(); @@ -3117,36 +3148,23 @@ fn populate_message_view( if existed && item_drawn_status.content_drawn { (item, true) } else { - // Check for SSE pattern: !SSE|| - let (display_body, display_formatted) = if let Some(sse_url) = parse_sse_header(body) { - // Get or create SSE stream state + // Check for SSE pattern in the message and display SSE content if available. + // Note: SSE fetch is triggered in process_timeline_updates when NewItems arrive. + let (display_body, display_formatted) = if parse_sse_header(body).is_some() { + // Check if we have SSE state for this event if let Some(event_id) = event_tl_item.event_id() { - let sse_state = sse_streams.entry(event_id.to_owned()).or_insert_with(|| { - SseStreamState { - url: sse_url.clone(), - accumulated_content: String::new(), - is_fetching: false, - is_complete: false, + if let Some(sse_state) = sse_streams.get(event_id) { + // Display accumulated content or loading message + if sse_state.accumulated_content.is_empty() { + (format!("Loading SSE from {}...", sse_state.url), None) + } else if sse_state.is_complete { + (format!("SSE Complete:\n\n{}", sse_state.accumulated_content), None) + } else { + (format!("SSE Streaming...\n\n{}", sse_state.accumulated_content), None) } - }); - - // Start SSE fetch if not already fetching - if !sse_state.is_fetching && !sse_state.is_complete { - sse_state.is_fetching = true; - start_sse_fetch( - event_id.to_owned(), - sse_url, - update_sender.clone(), - ); - } - - // Display accumulated content or loading message - if sse_state.accumulated_content.is_empty() { - (format!("Loading SSE from {}...", sse_state.url), None) - } else if sse_state.is_complete { - (format!("SSE Complete:\n\n{}", sse_state.accumulated_content), None) } else { - (format!("SSE Streaming...\n\n{}", sse_state.accumulated_content), None) + // SSE state not yet created (shouldn't happen normally) + (body.to_string(), formatted.clone()) } } else { (body.to_string(), formatted.clone()) @@ -4922,17 +4940,17 @@ fn parse_sse_header(body: &str) -> Option { /// Starts an SSE fetch in a background task. /// /// This function submits an async request to fetch SSE content. -/// Updates are sent via the timeline update channel. +/// Updates are sent via the timeline update channel obtained from the timeline_kind. fn start_sse_fetch( + timeline_kind: TimelineKind, event_id: OwnedEventId, url: String, - update_sender: crossbeam_channel::Sender, ) { use crate::sliding_sync::{submit_async_request, MatrixRequest}; submit_async_request(MatrixRequest::FetchSse { + timeline_kind, event_id, url, - update_sender, }); } diff --git a/src/sliding_sync.rs b/src/sliding_sync.rs index 55c2ab47c..5541fab9e 100644 --- a/src/sliding_sync.rs +++ b/src/sliding_sync.rs @@ -666,12 +666,12 @@ pub enum MatrixRequest { }, /// Request to fetch SSE (Server-Sent Events) content from a URL. FetchSse { + /// The timeline kind (room) where the SSE message is displayed. + timeline_kind: TimelineKind, /// The event ID of the message containing the SSE header. event_id: OwnedEventId, /// The SSE endpoint URL to fetch from. url: String, - /// The sender for timeline updates. - update_sender: crossbeam_channel::Sender, }, } @@ -1938,13 +1938,19 @@ async fn matrix_worker_task( }); } - MatrixRequest::FetchSse { event_id, url, update_sender } => { + MatrixRequest::FetchSse { timeline_kind, event_id, url } => { let _fetch_sse_task = Handle::current().spawn(async move { use futures_util::StreamExt; use reqwest_eventsource::{Event, EventSource}; log!("Starting SSE fetch for event {} from {}", event_id, url); + // Get the timeline sender using get_timeline_and_sender + let Some((_timeline, update_sender)) = get_timeline_and_sender(&timeline_kind) else { + error!("SSE: Failed to get timeline sender for {:?}", timeline_kind); + return; + }; + let client = reqwest::Client::builder() .no_proxy() .build()