From f419e224201e05b941007e18ca7c02f261e74d6d Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 01:47:29 +0000 Subject: [PATCH 1/2] Optimize InsightStore with async RwLock and O(1) storage - Replace blocking `std::sync::RwLock` with `tokio::sync::RwLock` - Make `InsightStore` trait async using `async-trait` - Optimize `InMemoryInsightStore` to avoid O(N) index rebuilding on eviction - Update `InsightLayer` to use async store methods Co-authored-by: Tuntii <121901995+Tuntii@users.noreply.github.com> --- Cargo.lock | 1 + crates/rustapi-extras/Cargo.toml | 1 + crates/rustapi-extras/src/insight/layer.rs | 16 +- crates/rustapi-extras/src/insight/store.rs | 219 ++++++++++----------- 4 files changed, 110 insertions(+), 127 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d2e5d32..2a00251 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3084,6 +3084,7 @@ dependencies = [ name = "rustapi-extras" version = "0.1.233" dependencies = [ + "async-trait", "base64 0.22.1", "bytes", "cookie", diff --git a/crates/rustapi-extras/Cargo.toml b/crates/rustapi-extras/Cargo.toml index 04a3138..1374564 100644 --- a/crates/rustapi-extras/Cargo.toml +++ b/crates/rustapi-extras/Cargo.toml @@ -17,6 +17,7 @@ rustapi-openapi = { workspace = true } # Async tokio = { workspace = true } futures-util = { workspace = true } +async-trait = { workspace = true } # HTTP http = { workspace = true } diff --git a/crates/rustapi-extras/src/insight/layer.rs b/crates/rustapi-extras/src/insight/layer.rs index 5eeb7d8..e726797 100644 --- a/crates/rustapi-extras/src/insight/layer.rs +++ b/crates/rustapi-extras/src/insight/layer.rs @@ -185,12 +185,12 @@ impl InsightLayer { } /// Create dashboard response with recent insights. - fn create_dashboard_response(store: &dyn InsightStore, limit: usize) -> Response { - let insights = store.get_recent(limit); + async fn create_dashboard_response(store: &dyn InsightStore, limit: usize) -> Response { + let insights = store.get_recent(limit).await; let body = json!({ "insights": insights, "count": insights.len(), - "total": store.count() + "total": store.count().await }); let body_bytes = serde_json::to_vec(&body).unwrap_or_default(); @@ -202,8 +202,8 @@ impl InsightLayer { } /// Create stats response. - fn create_stats_response(store: &dyn InsightStore) -> Response { - let stats = store.get_stats(); + async fn create_stats_response(store: &dyn InsightStore) -> Response { + let stats = store.get_stats().await; let body_bytes = serde_json::to_vec(&stats).unwrap_or_default(); http::Response::builder() .status(StatusCode::OK) @@ -240,13 +240,13 @@ impl MiddlewareLayer for InsightLayer { .get("limit") .and_then(|v| v.parse().ok()) .unwrap_or(100); - return InsightLayer::create_dashboard_response(store.as_ref(), limit); + return InsightLayer::create_dashboard_response(store.as_ref(), limit).await; } } if let Some(ref stats_path) = config.stats_path { if path == *stats_path && method == "GET" { - return InsightLayer::create_stats_response(store.as_ref()); + return InsightLayer::create_stats_response(store.as_ref()).await; } } @@ -359,7 +359,7 @@ impl MiddlewareLayer for InsightLayer { } // Store the insight - store.store(insight); + store.store(insight).await; // Reconstruct response http::Response::from_parts(resp_parts, ResponseBody::Full(Full::new(resp_body_bytes))) diff --git a/crates/rustapi-extras/src/insight/store.rs b/crates/rustapi-extras/src/insight/store.rs index 47c3758..db27d29 100644 --- a/crates/rustapi-extras/src/insight/store.rs +++ b/crates/rustapi-extras/src/insight/store.rs @@ -4,37 +4,40 @@ //! for storing and retrieving insight data. use super::data::{InsightData, InsightStats}; +use async_trait::async_trait; use dashmap::DashMap; use std::collections::VecDeque; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; +use tokio::sync::RwLock; /// Trait for storing and retrieving insight data. /// /// Implement this trait to create custom storage backends (e.g., database, Redis). +#[async_trait] pub trait InsightStore: Send + Sync + 'static { /// Store a new insight entry. - fn store(&self, insight: InsightData); + async fn store(&self, insight: InsightData); /// Get recent insights (up to `limit` entries). - fn get_recent(&self, limit: usize) -> Vec; + async fn get_recent(&self, limit: usize) -> Vec; /// Get all stored insights. - fn get_all(&self) -> Vec; + async fn get_all(&self) -> Vec; /// Get insights filtered by path pattern. - fn get_by_path(&self, path_pattern: &str) -> Vec; + async fn get_by_path(&self, path_pattern: &str) -> Vec; /// Get insights filtered by status code range. - fn get_by_status(&self, min_status: u16, max_status: u16) -> Vec; + async fn get_by_status(&self, min_status: u16, max_status: u16) -> Vec; /// Get aggregated statistics. - fn get_stats(&self) -> InsightStats; + async fn get_stats(&self) -> InsightStats; /// Clear all stored insights. - fn clear(&self); + async fn clear(&self); /// Get the current count of stored insights. - fn count(&self) -> usize; + async fn count(&self) -> usize; /// Clone this store into a boxed trait object. fn clone_store(&self) -> Box; @@ -54,12 +57,12 @@ pub trait InsightStore: Send + Sync + 'static { /// ``` #[derive(Clone)] pub struct InMemoryInsightStore { - /// Ring buffer holding insights - buffer: Arc>>, + /// Ring buffer holding insights (in order) + buffer: Arc>>>, /// Maximum capacity of the buffer capacity: usize, /// Index for quick lookup by request_id - index: Arc>, + index: Arc>>, } impl InMemoryInsightStore { @@ -88,9 +91,8 @@ impl InMemoryInsightStore { /// Get an insight by request ID. pub fn get_by_request_id(&self, request_id: &str) -> Option { - let idx = self.index.get(request_id)?; - let buffer = self.buffer.read().ok()?; - buffer.get(*idx).cloned() + // Look up directly in index (O(1)) + self.index.get(request_id).map(|r| r.as_ref().clone()) } } @@ -100,89 +102,67 @@ impl Default for InMemoryInsightStore { } } +#[async_trait] impl InsightStore for InMemoryInsightStore { - fn store(&self, insight: InsightData) { - let mut buffer = match self.buffer.write() { - Ok(b) => b, - Err(_) => return, // Poisoned lock, skip storage - }; + async fn store(&self, insight: InsightData) { + let insight_arc = Arc::new(insight); + let request_id = insight_arc.request_id.clone(); + + let mut buffer = self.buffer.write().await; // If at capacity, remove oldest entry if buffer.len() >= self.capacity { if let Some(old) = buffer.pop_front() { self.index.remove(&old.request_id); } - // Rebuild indices after removal (indices shift) - self.index.clear(); - for (i, item) in buffer.iter().enumerate() { - self.index.insert(item.request_id.clone(), i); - } } // Add new insight - let idx = buffer.len(); - self.index.insert(insight.request_id.clone(), idx); - buffer.push_back(insight); + buffer.push_back(insight_arc.clone()); + self.index.insert(request_id, insight_arc); } - fn get_recent(&self, limit: usize) -> Vec { - let buffer = match self.buffer.read() { - Ok(b) => b, - Err(_) => return Vec::new(), - }; - - buffer.iter().rev().take(limit).cloned().collect() + async fn get_recent(&self, limit: usize) -> Vec { + let buffer = self.buffer.read().await; + buffer.iter().rev().take(limit).map(|i| i.as_ref().clone()).collect() } - fn get_all(&self) -> Vec { - let buffer = match self.buffer.read() { - Ok(b) => b, - Err(_) => return Vec::new(), - }; - - buffer.iter().cloned().collect() + async fn get_all(&self) -> Vec { + let buffer = self.buffer.read().await; + buffer.iter().map(|i| i.as_ref().clone()).collect() } - fn get_by_path(&self, path_pattern: &str) -> Vec { - let buffer = match self.buffer.read() { - Ok(b) => b, - Err(_) => return Vec::new(), - }; - + async fn get_by_path(&self, path_pattern: &str) -> Vec { + let buffer = self.buffer.read().await; buffer .iter() .filter(|i| i.path.contains(path_pattern)) - .cloned() + .map(|i| i.as_ref().clone()) .collect() } - fn get_by_status(&self, min_status: u16, max_status: u16) -> Vec { - let buffer = match self.buffer.read() { - Ok(b) => b, - Err(_) => return Vec::new(), - }; - + async fn get_by_status(&self, min_status: u16, max_status: u16) -> Vec { + let buffer = self.buffer.read().await; buffer .iter() .filter(|i| i.status >= min_status && i.status <= max_status) - .cloned() + .map(|i| i.as_ref().clone()) .collect() } - fn get_stats(&self) -> InsightStats { - let all = self.get_all(); + async fn get_stats(&self) -> InsightStats { + let all = self.get_all().await; InsightStats::from_insights(&all) } - fn clear(&self) { - if let Ok(mut buffer) = self.buffer.write() { - buffer.clear(); - } + async fn clear(&self) { + let mut buffer = self.buffer.write().await; + buffer.clear(); self.index.clear(); } - fn count(&self) -> usize { - self.buffer.read().map(|b| b.len()).unwrap_or(0) + async fn count(&self) -> usize { + self.buffer.read().await.len() } fn clone_store(&self) -> Box { @@ -196,34 +176,35 @@ impl InsightStore for InMemoryInsightStore { #[derive(Clone, Copy, Default)] pub struct NullInsightStore; +#[async_trait] impl InsightStore for NullInsightStore { - fn store(&self, _insight: InsightData) { + async fn store(&self, _insight: InsightData) { // Discard } - fn get_recent(&self, _limit: usize) -> Vec { + async fn get_recent(&self, _limit: usize) -> Vec { Vec::new() } - fn get_all(&self) -> Vec { + async fn get_all(&self) -> Vec { Vec::new() } - fn get_by_path(&self, _path_pattern: &str) -> Vec { + async fn get_by_path(&self, _path_pattern: &str) -> Vec { Vec::new() } - fn get_by_status(&self, _min_status: u16, _max_status: u16) -> Vec { + async fn get_by_status(&self, _min_status: u16, _max_status: u16) -> Vec { Vec::new() } - fn get_stats(&self) -> InsightStats { + async fn get_stats(&self) -> InsightStats { InsightStats::default() } - fn clear(&self) {} + async fn clear(&self) {} - fn count(&self) -> usize { + async fn count(&self) -> usize { 0 } @@ -243,34 +224,34 @@ mod tests { .with_duration(Duration::from_millis(10)) } - #[test] - fn test_in_memory_store_basic() { + #[tokio::test] + async fn test_in_memory_store_basic() { let store = InMemoryInsightStore::new(10); - store.store(create_test_insight("1", "/users", 200)); - store.store(create_test_insight("2", "/items", 201)); + store.store(create_test_insight("1", "/users", 200)).await; + store.store(create_test_insight("2", "/items", 201)).await; - assert_eq!(store.count(), 2); + assert_eq!(store.count().await, 2); - let recent = store.get_recent(10); + let recent = store.get_recent(10).await; assert_eq!(recent.len(), 2); // Most recent first assert_eq!(recent[0].request_id, "2"); assert_eq!(recent[1].request_id, "1"); } - #[test] - fn test_ring_buffer_eviction() { + #[tokio::test] + async fn test_ring_buffer_eviction() { let store = InMemoryInsightStore::new(3); - store.store(create_test_insight("1", "/a", 200)); - store.store(create_test_insight("2", "/b", 200)); - store.store(create_test_insight("3", "/c", 200)); - store.store(create_test_insight("4", "/d", 200)); // Should evict "1" + store.store(create_test_insight("1", "/a", 200)).await; + store.store(create_test_insight("2", "/b", 200)).await; + store.store(create_test_insight("3", "/c", 200)).await; + store.store(create_test_insight("4", "/d", 200)).await; // Should evict "1" - assert_eq!(store.count(), 3); + assert_eq!(store.count().await, 3); - let all = store.get_all(); + let all = store.get_all().await; let ids: Vec<_> = all.iter().map(|i| i.request_id.as_str()).collect(); assert!(!ids.contains(&"1")); assert!(ids.contains(&"2")); @@ -278,71 +259,71 @@ mod tests { assert!(ids.contains(&"4")); } - #[test] - fn test_filter_by_path() { + #[tokio::test] + async fn test_filter_by_path() { let store = InMemoryInsightStore::new(10); - store.store(create_test_insight("1", "/users/123", 200)); - store.store(create_test_insight("2", "/items/456", 200)); - store.store(create_test_insight("3", "/users/789", 200)); + store.store(create_test_insight("1", "/users/123", 200)).await; + store.store(create_test_insight("2", "/items/456", 200)).await; + store.store(create_test_insight("3", "/users/789", 200)).await; - let user_insights = store.get_by_path("/users"); + let user_insights = store.get_by_path("/users").await; assert_eq!(user_insights.len(), 2); } - #[test] - fn test_filter_by_status() { + #[tokio::test] + async fn test_filter_by_status() { let store = InMemoryInsightStore::new(10); - store.store(create_test_insight("1", "/a", 200)); - store.store(create_test_insight("2", "/b", 404)); - store.store(create_test_insight("3", "/c", 500)); - store.store(create_test_insight("4", "/d", 201)); + store.store(create_test_insight("1", "/a", 200)).await; + store.store(create_test_insight("2", "/b", 404)).await; + store.store(create_test_insight("3", "/c", 500)).await; + store.store(create_test_insight("4", "/d", 201)).await; - let errors = store.get_by_status(400, 599); + let errors = store.get_by_status(400, 599).await; assert_eq!(errors.len(), 2); - let success = store.get_by_status(200, 299); + let success = store.get_by_status(200, 299).await; assert_eq!(success.len(), 2); } - #[test] - fn test_clear() { + #[tokio::test] + async fn test_clear() { let store = InMemoryInsightStore::new(10); - store.store(create_test_insight("1", "/a", 200)); - store.store(create_test_insight("2", "/b", 200)); + store.store(create_test_insight("1", "/a", 200)).await; + store.store(create_test_insight("2", "/b", 200)).await; - assert_eq!(store.count(), 2); + assert_eq!(store.count().await, 2); - store.clear(); + store.clear().await; - assert_eq!(store.count(), 0); - assert!(store.get_all().is_empty()); + assert_eq!(store.count().await, 0); + assert!(store.get_all().await.is_empty()); } - #[test] - fn test_stats() { + #[tokio::test] + async fn test_stats() { let store = InMemoryInsightStore::new(10); - store.store(create_test_insight("1", "/users", 200)); - store.store(create_test_insight("2", "/users", 201)); - store.store(create_test_insight("3", "/items", 404)); + store.store(create_test_insight("1", "/users", 200)).await; + store.store(create_test_insight("2", "/users", 201)).await; + store.store(create_test_insight("3", "/items", 404)).await; - let stats = store.get_stats(); + let stats = store.get_stats().await; assert_eq!(stats.total_requests, 3); assert_eq!(stats.successful_requests, 2); assert_eq!(stats.client_errors, 1); } - #[test] - fn test_null_store() { + #[tokio::test] + async fn test_null_store() { let store = NullInsightStore; - store.store(create_test_insight("1", "/a", 200)); + store.store(create_test_insight("1", "/a", 200)).await; - assert_eq!(store.count(), 0); - assert!(store.get_all().is_empty()); + assert_eq!(store.count().await, 0); + assert!(store.get_all().await.is_empty()); } } From c348dd2d6993a158e5da800fee9289d3675ab170 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sat, 31 Jan 2026 01:55:43 +0000 Subject: [PATCH 2/2] Fix formatting in InsightStore Run `cargo fmt` to fix CI failures in `crates/rustapi-extras/src/insight/store.rs` introduced by previous commit. Co-authored-by: Tuntii <121901995+Tuntii@users.noreply.github.com> --- crates/rustapi-extras/src/insight/store.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/crates/rustapi-extras/src/insight/store.rs b/crates/rustapi-extras/src/insight/store.rs index db27d29..1e18d7d 100644 --- a/crates/rustapi-extras/src/insight/store.rs +++ b/crates/rustapi-extras/src/insight/store.rs @@ -124,7 +124,12 @@ impl InsightStore for InMemoryInsightStore { async fn get_recent(&self, limit: usize) -> Vec { let buffer = self.buffer.read().await; - buffer.iter().rev().take(limit).map(|i| i.as_ref().clone()).collect() + buffer + .iter() + .rev() + .take(limit) + .map(|i| i.as_ref().clone()) + .collect() } async fn get_all(&self) -> Vec { @@ -263,9 +268,15 @@ mod tests { async fn test_filter_by_path() { let store = InMemoryInsightStore::new(10); - store.store(create_test_insight("1", "/users/123", 200)).await; - store.store(create_test_insight("2", "/items/456", 200)).await; - store.store(create_test_insight("3", "/users/789", 200)).await; + store + .store(create_test_insight("1", "/users/123", 200)) + .await; + store + .store(create_test_insight("2", "/items/456", 200)) + .await; + store + .store(create_test_insight("3", "/users/789", 200)) + .await; let user_insights = store.get_by_path("/users").await; assert_eq!(user_insights.len(), 2);