From ff008905266dbd41158bfbe33e23efa8bddf469e Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 00:29:26 +0000 Subject: [PATCH 1/2] Optimize WebhookExporter to be non-blocking using background thread - Replaced blocking `std::thread::spawn` + `recv_timeout` per request with a persistent background thread and Tokio runtime. - Implemented `tokio::sync::mpsc::channel` (bounded: 100) to queue insights. - `send_insights` now uses `try_send` to avoid blocking, dropping batches if the queue is full (load shedding). - Preserves synchronous `InsightExporter` trait signature while offloading I/O. - Added `tests/webhook_performance.rs` to verify non-blocking behavior. Co-authored-by: Tuntii <121901995+Tuntii@users.noreply.github.com> --- crates/rustapi-extras/src/insight/export.rs | 118 ++++++++++-------- .../tests/webhook_performance.rs | 50 ++++++++ 2 files changed, 117 insertions(+), 51 deletions(-) create mode 100644 crates/rustapi-extras/tests/webhook_performance.rs diff --git a/crates/rustapi-extras/src/insight/export.rs b/crates/rustapi-extras/src/insight/export.rs index 0591eeb..4574c25 100644 --- a/crates/rustapi-extras/src/insight/export.rs +++ b/crates/rustapi-extras/src/insight/export.rs @@ -222,73 +222,89 @@ pub struct WebhookExporter { config: WebhookConfig, buffer: Arc>>, #[cfg(feature = "webhook")] - client: reqwest::Client, + sender: tokio::sync::mpsc::Sender>, + #[cfg(not(feature = "webhook"))] + _marker: std::marker::PhantomData<()>, } impl WebhookExporter { /// Create a new webhook exporter. pub fn new(config: WebhookConfig) -> Self { #[cfg(feature = "webhook")] - let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(config.timeout_secs)) - .build() - .expect("Failed to build HTTP client"); + { + // Allow buffering up to 100 batches before dropping + let (tx, mut rx) = tokio::sync::mpsc::channel::>(100); + let config_clone = config.clone(); + + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(config_clone.timeout_secs)) + .build() + .expect("Failed to build HTTP client"); + + rt.block_on(async move { + while let Some(insights) = rx.recv().await { + let mut request = client.post(&config_clone.url).json(&insights); + + if let Some(ref auth_value) = config_clone.auth_header { + request = request.header("Authorization", auth_value); + } + + // Add custom headers + for (k, v) in &config_clone.headers { + request = request.header(k, v); + } + + match request.send().await { + Ok(response) => { + if !response.status().is_success() { + tracing::error!( + "Webhook returned status {}", + response.status() + ); + } + } + Err(e) => { + tracing::error!("Webhook error: {}", e); + } + } + } + }); + }); + + Self { + config, + buffer: Arc::new(Mutex::new(Vec::new())), + sender: tx, + } + } + #[cfg(not(feature = "webhook"))] Self { config, buffer: Arc::new(Mutex::new(Vec::new())), - #[cfg(feature = "webhook")] - client, + _marker: std::marker::PhantomData, } } /// Send insights to the webhook. #[cfg(feature = "webhook")] fn send_insights(&self, insights: &[InsightData]) -> ExportResult<()> { - use std::sync::mpsc; - - // Use a channel to get the result from the async context - let (tx, rx) = mpsc::channel(); - let client = self.client.clone(); - let url = self.config.url.clone(); - let auth = self.config.auth_header.clone(); - let insights = insights.to_vec(); - - // Spawn a blocking task to run the async request - std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - let result = rt.block_on(async { - let mut request = client.post(&url).json(&insights); - - if let Some(auth_value) = auth { - request = request.header("Authorization", auth_value); - } - - match request.send().await { - Ok(response) => { - if response.status().is_success() { - Ok(()) - } else { - Err(ExportError::Unavailable(format!( - "Webhook returned status {}", - response.status() - ))) - } - } - Err(e) => Err(ExportError::Unavailable(e.to_string())), - } - }); - - let _ = tx.send(result); - }); - - // Wait for the result with timeout - rx.recv_timeout(std::time::Duration::from_secs(self.config.timeout_secs + 1)) - .map_err(|_| ExportError::Unavailable("Webhook request timed out".to_string()))? + match self.sender.try_send(insights.to_vec()) { + Ok(_) => Ok(()), + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + tracing::warn!("Webhook exporter channel full, dropping batch"); + Ok(()) + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + Err(ExportError::Unavailable("Webhook worker channel closed".to_string())) + } + } } /// Send insights to the webhook (stub when webhook feature is disabled). diff --git a/crates/rustapi-extras/tests/webhook_performance.rs b/crates/rustapi-extras/tests/webhook_performance.rs new file mode 100644 index 0000000..56f27eb --- /dev/null +++ b/crates/rustapi-extras/tests/webhook_performance.rs @@ -0,0 +1,50 @@ +use rustapi_extras::insight::export::{WebhookConfig, WebhookExporter, InsightExporter}; +use rustapi_extras::insight::InsightData; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpListener; + +#[tokio::test] +async fn test_webhook_blocking_behavior() { + // Start a dummy server that sleeps + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + loop { + let (mut socket, _) = listener.accept().await.unwrap(); + tokio::spawn(async move { + let mut buf = [0; 1024]; + let _ = socket.read(&mut buf).await; + // Simulate slow processing + tokio::time::sleep(Duration::from_millis(500)).await; + let response = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"; + let _ = socket.write_all(response.as_bytes()).await; + }); + } + }); + + // Configure exporter with batch size 1 to trigger send immediately + let config = WebhookConfig::new(format!("http://{}", addr)) + .batch_size(1) + .timeout(2); + + let exporter = WebhookExporter::new(config); + let insight = InsightData::new("test", "GET", "/"); + + let start = Instant::now(); + // This should trigger a send because batch_size is 1. + // In current implementation, it blocks waiting for response. + match exporter.export(&insight) { + Ok(_) => println!("Export successful"), + Err(e) => println!("Export failed: {:?}", e), + } + let duration = start.elapsed(); + + println!("Export took: {:?}", duration); + + // If it blocks, it should take at least 500ms (due to 500ms server sleep or timeout) + if duration.as_millis() >= 400 { + panic!("Performance regression: Export blocked for {:?}. Expected non-blocking behavior.", duration); + } +} From 55912836f15ba14e6f0a3535034f77d60a1a181a Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 00:39:18 +0000 Subject: [PATCH 2/2] Fix CI lint and test failures for webhook exporter - Applied `cargo fmt` to `crates/rustapi-extras/src/insight/export.rs` and `crates/rustapi-extras/tests/webhook_performance.rs`. - Added `#![cfg(feature = "webhook")]` to `crates/rustapi-extras/tests/webhook_performance.rs` to prevent compilation errors when the `webhook` feature (and thus the `insight` module) is disabled. Co-authored-by: Tuntii <121901995+Tuntii@users.noreply.github.com> --- crates/rustapi-extras/src/insight/export.rs | 6 +++--- crates/rustapi-extras/tests/webhook_performance.rs | 9 +++++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/crates/rustapi-extras/src/insight/export.rs b/crates/rustapi-extras/src/insight/export.rs index 4574c25..33e797d 100644 --- a/crates/rustapi-extras/src/insight/export.rs +++ b/crates/rustapi-extras/src/insight/export.rs @@ -301,9 +301,9 @@ impl WebhookExporter { tracing::warn!("Webhook exporter channel full, dropping batch"); Ok(()) } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - Err(ExportError::Unavailable("Webhook worker channel closed".to_string())) - } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => Err( + ExportError::Unavailable("Webhook worker channel closed".to_string()), + ), } } diff --git a/crates/rustapi-extras/tests/webhook_performance.rs b/crates/rustapi-extras/tests/webhook_performance.rs index 56f27eb..ab510db 100644 --- a/crates/rustapi-extras/tests/webhook_performance.rs +++ b/crates/rustapi-extras/tests/webhook_performance.rs @@ -1,4 +1,6 @@ -use rustapi_extras::insight::export::{WebhookConfig, WebhookExporter, InsightExporter}; +#![cfg(feature = "webhook")] + +use rustapi_extras::insight::export::{InsightExporter, WebhookConfig, WebhookExporter}; use rustapi_extras::insight::InsightData; use std::time::{Duration, Instant}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -45,6 +47,9 @@ async fn test_webhook_blocking_behavior() { // If it blocks, it should take at least 500ms (due to 500ms server sleep or timeout) if duration.as_millis() >= 400 { - panic!("Performance regression: Export blocked for {:?}. Expected non-blocking behavior.", duration); + panic!( + "Performance regression: Export blocked for {:?}. Expected non-blocking behavior.", + duration + ); } }