diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 8833155918..ae6ae7ac50 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -387,17 +387,23 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) { .await .expect("failed to initialize price estimator factory"); + let initial_prices = db_write.fetch_latest_prices().await.unwrap(); let native_price_estimator = price_estimator_factory .native_price_estimator( args.native_price_estimators.as_slice(), args.native_price_estimation_results_required, eth.contracts().weth().clone(), + initial_prices, ) .instrument(info_span!("native_price_estimator")) .await .unwrap(); - let prices = db_write.fetch_latest_prices().await.unwrap(); - native_price_estimator.initialize_cache(prices); + + let api_native_price_estimator = Arc::new( + shared::price_estimation::native_price_cache::QuoteCompetitionEstimator::new( + native_price_estimator.clone(), + ), + ); let price_estimator = price_estimator_factory .price_estimator( @@ -531,7 +537,7 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) { let (api_shutdown_sender, api_shutdown_receiver) = tokio::sync::oneshot::channel(); let api_task = tokio::spawn(infra::api::serve( args.api_address, - native_price_estimator.clone(), + api_native_price_estimator, args.price_estimation.quote_timeout, api_shutdown_receiver, )); diff --git a/crates/autopilot/src/solvable_orders.rs b/crates/autopilot/src/solvable_orders.rs index 458aab5523..2000c98949 100644 --- a/crates/autopilot/src/solvable_orders.rs +++ b/crates/autopilot/src/solvable_orders.rs @@ -913,9 +913,11 @@ mod tests { HEALTHY_PRICE_ESTIMATION_TIME, PriceEstimationError, native::MockNativePriceEstimating, + native_price_cache::{CacheStorage, RequiresUpdatingPrices}, }, signature_validator::{MockSignatureValidating, SignatureValidationError}, }, + std::sync::Arc, }; #[tokio::test] @@ -956,14 +958,14 @@ mod tests { .returning(|_, _| async { 
Ok(0.25) }.boxed()); let native_price_estimator = CachingNativePriceEstimator::new( - Box::new(native_price_estimator), - Duration::from_secs(10), - Duration::MAX, - None, - Default::default(), + Arc::new(native_price_estimator), + CacheStorage::new_without_maintenance( + Duration::from_secs(10), + Default::default(), + Default::default(), + ), 3, - Default::default(), - HEALTHY_PRICE_ESTIMATION_TIME, + RequiresUpdatingPrices::Yes, ); let metrics = Metrics::instance(observe::metrics::get_storage_registry()).unwrap(); @@ -1046,15 +1048,28 @@ mod tests { .withf(move |token, _| *token == token5) .returning(|_, _| async { Ok(5.) }.boxed()); - let native_price_estimator = CachingNativePriceEstimator::new( - Box::new(native_price_estimator), + let maintenance_estimator: Arc< + dyn shared::price_estimation::native::NativePriceEstimating, + > = Arc::new(native_price_estimator); + let cache = CacheStorage::new_with_maintenance( Duration::from_secs(10), - Duration::MAX, - None, Default::default(), - 1, Default::default(), - HEALTHY_PRICE_ESTIMATION_TIME, + shared::price_estimation::native_price_cache::MaintenanceConfig { + estimator: maintenance_estimator.clone(), + // Short interval to trigger background fetch quickly + update_interval: Duration::from_millis(1), + update_size: Default::default(), + prefetch_time: Default::default(), + concurrent_requests: 1, + quote_timeout: HEALTHY_PRICE_ESTIMATION_TIME, + }, + ); + let native_price_estimator = CachingNativePriceEstimator::new( + maintenance_estimator, + cache, + 1, + RequiresUpdatingPrices::Yes, ); let metrics = Metrics::instance(observe::metrics::get_storage_registry()).unwrap(); @@ -1144,15 +1159,15 @@ mod tests { .returning(|_, _| async { Ok(50.) 
}.boxed()); let native_price_estimator = CachingNativePriceEstimator::new( - Box::new(native_price_estimator), - Duration::from_secs(10), - Duration::MAX, - None, - Default::default(), - 3, + Arc::new(native_price_estimator), // Set to use native price approximations for the following tokens - HashMap::from([(token1, token_approx1), (token2, token_approx2)]), - HEALTHY_PRICE_ESTIMATION_TIME, + CacheStorage::new_without_maintenance( + Duration::from_secs(10), + Default::default(), + HashMap::from([(token1, token_approx1), (token2, token_approx2)]), + ), + 3, + RequiresUpdatingPrices::Yes, ); let metrics = Metrics::instance(observe::metrics::get_storage_registry()).unwrap(); diff --git a/crates/e2e/src/setup/proxy.rs b/crates/e2e/src/setup/proxy.rs index 9173213c26..0d018f3138 100644 --- a/crates/e2e/src/setup/proxy.rs +++ b/crates/e2e/src/setup/proxy.rs @@ -51,6 +51,7 @@ impl ProxyState { if let Some(current) = backends.pop_front() { backends.push_back(current); } + tracing::info!(backends = ?backends.iter().map(Url::as_str).collect::>(), "rotated backends"); } /// Returns the total number of backends configured. 
@@ -98,7 +99,7 @@ async fn serve(listen_addr: SocketAddr, backends: Vec, state: ProxyState) { let app = Router::new().fallback(proxy_handler); - tracing::info!(?listen_addr, ?backends, "starting reverse proxy"); + tracing::info!(%listen_addr, backends = ?backends.iter().map(Url::as_str).collect::>(), "starting reverse proxy"); axum::Server::bind(&listen_addr) .serve(app.into_make_service()) .await @@ -132,7 +133,7 @@ async fn handle_request( match try_backend(&client, &parts, body_bytes.to_vec(), &backend).await { Ok(response) => return response.into_response(), Err(err) => { - tracing::warn!(?err, ?backend, attempt, "backend failed, rotating to next"); + tracing::warn!(?err, %backend, attempt, "backend failed, rotating to next"); state.rotate_backends().await; } } diff --git a/crates/e2e/tests/e2e/autopilot_leader.rs b/crates/e2e/tests/e2e/autopilot_leader.rs index 7ff1e1fc7d..b26cd35572 100644 --- a/crates/e2e/tests/e2e/autopilot_leader.rs +++ b/crates/e2e/tests/e2e/autopilot_leader.rs @@ -160,12 +160,16 @@ async fn dual_autopilot_only_leader_produces_auctions(web3: Web3) { // Stop autopilot-leader, follower should take over manual_shutdown.shutdown(); - onchain.mint_block().await; assert!( tokio::time::timeout(Duration::from_secs(15), autopilot_leader) .await .is_ok() ); + // Ensure all the locks are released and follower has time to step up + tokio::time::sleep(Duration::from_secs(2)).await; + onchain.mint_block().await; + // Ensure the follower has stepped up as leader + tokio::time::sleep(Duration::from_secs(2)).await; // Run 10 txs, autopilot-backup is in charge // - only test_solver2 should participate and settle diff --git a/crates/orderbook/src/run.rs b/crates/orderbook/src/run.rs index c17bcc5bfd..d11a0ef9dc 100644 --- a/crates/orderbook/src/run.rs +++ b/crates/orderbook/src/run.rs @@ -315,16 +315,16 @@ pub async fn run(args: Arguments) { .await .expect("failed to initialize price estimator factory"); + let initial_prices = 
postgres_write.fetch_latest_prices().await.unwrap(); let native_price_estimator = price_estimator_factory .native_price_estimator( args.native_price_estimators.as_slice(), args.fast_price_estimation_results_required, native_token.clone(), + initial_prices, ) .await .unwrap(); - let prices = postgres_write.fetch_latest_prices().await.unwrap(); - native_price_estimator.initialize_cache(prices); let price_estimator = price_estimator_factory .price_estimator( diff --git a/crates/shared/src/price_estimation/factory.rs b/crates/shared/src/price_estimation/factory.rs index 305ecc11e8..129dfa3cfa 100644 --- a/crates/shared/src/price_estimation/factory.rs +++ b/crates/shared/src/price_estimation/factory.rs @@ -7,7 +7,7 @@ use { external::ExternalPriceEstimator, instrumented::InstrumentedPriceEstimator, native::{self, NativePriceEstimator}, - native_price_cache::CachingNativePriceEstimator, + native_price_cache::{CacheStorage, CachingNativePriceEstimator, MaintenanceConfig}, sanitized::SanitizedPriceEstimator, trade_verifier::{TradeVerifier, TradeVerifying}, }, @@ -24,12 +24,14 @@ use { buffered::{self, BufferedRequest, NativePriceBatchFetching}, competition::PriceRanking, native::NativePriceEstimating, + native_price_cache::RequiresUpdatingPrices, }, tenderly_api::TenderlyCodeSimulator, token_info::TokenInfoFetching, }, alloy::primitives::Address, anyhow::{Context as _, Result}, + bigdecimal::BigDecimal, contracts::alloy::WETH9, ethrpc::{alloy::ProviderLabelingExt, block_stream::CurrentBlockWatcher}, number::nonzero::NonZeroU256, @@ -360,45 +362,98 @@ impl<'a> PriceEstimatorFactory<'a> { )) } + /// Creates a native price estimator with a shared cache and background + /// maintenance task. + /// + /// The estimator is configured with `RequiresUpdatingPrices::Yes`, meaning + /// entries are actively maintained by the background task. 
For quote + /// competition use, wrap the returned estimator with + /// `QuoteCompetitionEstimator` to mark prices with `KeepPriceUpdated::No` + /// (cached but not actively maintained). + /// + /// The `initial_prices` are used to seed the cache before the estimator + /// starts. pub async fn native_price_estimator( &mut self, - native: &[Vec], + estimators: &[Vec], results_required: NonZeroUsize, weth: WETH9::Instance, + initial_prices: HashMap, ) -> Result> { anyhow::ensure!( self.args.native_price_cache_max_age > self.args.native_price_prefetch_time, "price cache prefetch time needs to be less than price cache max age" ); - let mut estimators = Vec::with_capacity(native.len()); - for stage in native.iter() { + // Create non-caching estimator + let estimator: Arc = Arc::new( + self.create_competition_native_estimator(estimators, results_required, &weth) + .await?, + ); + + let approximation_tokens = self + .args + .native_price_approximation_tokens + .iter() + .copied() + .collect(); + + // Create cache with background maintenance, which only refreshes + // entries marked with `KeepPriceUpdated::Yes` + let cache = CacheStorage::new_with_maintenance( + self.args.native_price_cache_max_age, + initial_prices, + approximation_tokens, + MaintenanceConfig { + estimator: estimator.clone(), + update_interval: self.args.native_price_cache_refresh, + update_size: self.args.native_price_cache_max_update_size, + prefetch_time: self.args.native_price_prefetch_time, + concurrent_requests: self.args.native_price_cache_concurrent_requests, + quote_timeout: self.args.quote_timeout, + }, + ); + + // Wrap with caching layer + Ok(self.wrap_with_cache(estimator, cache)) + } + + /// Wraps a native price estimator with caching functionality. + /// Uses `RequiresUpdatingPrices::Yes` so entries are actively maintained. 
+ fn wrap_with_cache( + &self, + estimator: Arc, + cache: Arc, + ) -> Arc { + Arc::new(CachingNativePriceEstimator::new( + estimator, + cache, + self.args.native_price_cache_concurrent_requests, + RequiresUpdatingPrices::Yes, + )) + } + + /// Helper to create a CompetitionEstimator for native price estimation. + async fn create_competition_native_estimator( + &mut self, + sources: &[Vec], + results_required: NonZeroUsize, + weth: &WETH9::Instance, + ) -> Result>> { + let mut estimators = Vec::with_capacity(sources.len()); + for stage in sources.iter() { let mut stages = Vec::with_capacity(stage.len()); for source in stage { - stages.push(self.create_native_estimator(source, &weth).await?); + stages.push(self.create_native_estimator(source, weth).await?); } estimators.push(stages); } - let competition_estimator = + Ok( CompetitionEstimator::new(estimators, PriceRanking::MaxOutAmount) .with_verification(self.args.quote_verification) - .with_early_return(results_required); - let native_estimator = Arc::new(CachingNativePriceEstimator::new( - Box::new(competition_estimator), - self.args.native_price_cache_max_age, - self.args.native_price_cache_refresh, - Some(self.args.native_price_cache_max_update_size), - self.args.native_price_prefetch_time, - self.args.native_price_cache_concurrent_requests, - self.args - .native_price_approximation_tokens - .iter() - .copied() - .collect(), - self.args.quote_timeout, - )); - Ok(native_estimator) + .with_early_return(results_required), + ) } } diff --git a/crates/shared/src/price_estimation/native_price_cache.rs b/crates/shared/src/price_estimation/native_price_cache.rs index 5e95f7e45f..13c25d26b4 100644 --- a/crates/shared/src/price_estimation/native_price_cache.rs +++ b/crates/shared/src/price_estimation/native_price_cache.rs @@ -13,17 +13,39 @@ use { rand::Rng, std::{ collections::{HashMap, hash_map::Entry}, - sync::{Arc, Mutex, MutexGuard, Weak}, + sync::{Arc, Mutex, Weak}, time::{Duration, Instant}, }, tokio::time, 
tracing::{Instrument, instrument}, }; +/// Determines whether the background maintenance task should +/// keep the token price up to date automatically. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +pub enum KeepPriceUpdated { + #[default] + Yes, + No, +} + +/// Determines whether we need the price of the token to be +/// actively kept up to date by the maintenance task. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RequiresUpdatingPrices { + /// The lookup does not care whether the price of the token + /// is actively being maintained. In other words the flag + /// of the token should not be changed. + DontCare, + /// The token will be marked to require active maintenance. + Yes, +} + #[derive(prometheus_metric_storage::MetricStorage)] struct Metrics { - /// native price cache hits misses - #[metric(labels("result"))] + /// native price cache hits misses by result and caller type + /// Labels: result=hits|misses, caller=auction|quote + #[metric(labels("result", "caller"))] native_price_cache_access: IntCounterVec, /// number of items in cache native_price_cache_size: IntGauge, @@ -31,29 +53,74 @@ struct Metrics { native_price_cache_background_updates: IntCounter, /// number of items in cache that are outdated native_price_cache_outdated_entries: IntGauge, + /// number of entries actively maintained by background task + /// (KeepPriceUpdated::Yes) + native_price_cache_maintained_entries: IntGauge, + /// number of entries passively cached but not maintained + /// (KeepPriceUpdated::No, i.e. quote-only tokens) + native_price_cache_passive_entries: IntGauge, } impl Metrics { fn get() -> &'static Self { Metrics::instance(observe::metrics::get_storage_registry()).unwrap() } + + /// Resets counters on startup to ensure clean metrics for this run. 
+ fn reset(&self) { + for caller in &["auction", "quote"] { + self.native_price_cache_access + .with_label_values(&["hits", caller]) + .reset(); + self.native_price_cache_access + .with_label_values(&["misses", caller]) + .reset(); + } + self.native_price_cache_background_updates.reset(); + } } -/// Wrapper around `Box` which caches successful price -/// estimates for some time and supports updating the cache in the background. -/// -/// The size of the underlying cache is unbounded. -/// -/// Is an Arc internally. -#[derive(Clone)] -pub struct CachingNativePriceEstimator(Arc); +/// Configuration for the background maintenance task that keeps the cache warm. +pub struct MaintenanceConfig { + /// Estimator used for maintenance updates. + /// Maintenance only refreshes entries marked with `KeepPriceUpdated::Yes`. + pub estimator: Arc, + /// How often to run the maintenance task. + pub update_interval: Duration, + /// Maximum number of prices to update per maintenance cycle. + /// 0 means unlimited. High-priority tokens are updated first, so if this + /// limit is smaller than the number of outdated high-priority tokens, + /// non-priority tokens won't be updated until the backlog clears. + pub update_size: usize, + /// How early before expiration to refresh prices. + pub prefetch_time: Duration, + /// Number of concurrent price fetch requests. + pub concurrent_requests: usize, + /// Timeout for individual price fetch requests. + pub quote_timeout: Duration, +} + +/// Type alias for backwards compatibility. +/// `NativePriceCache` is now `Arc` with methods on +/// `CacheStorage`. +pub type NativePriceCache = Arc; -struct Inner { +/// A cache storage for native price estimates. +/// +/// Can be shared between multiple `CachingNativePriceEstimator` instances, +/// allowing them to read/write from the same cache while using different +/// price estimation sources. 
+pub struct CacheStorage { cache: Mutex>, - high_priority: Mutex>, - estimator: Box, max_age: Duration, - concurrent_requests: usize, + /// Tokens that should be prioritized during maintenance updates. + /// + /// These tokens are updated before non-priority tokens during each + /// maintenance cycle. Note: If the number of outdated high-priority tokens + /// exceeds `MaintenanceConfig::update_size`, only that many will be updated + /// per cycle (in priority order), and non-priority tokens won't be updated + /// until the high-priority backlog clears. + high_priority: Mutex>, // TODO remove when implementing a less hacky solution /// Maps a requested token to an approximating token. If the system /// wants to get the native price for the requested token the native @@ -64,89 +131,156 @@ struct Inner { /// It's very important that the 2 tokens have the same number of decimals. /// After startup this is a read only value. approximation_tokens: HashMap, - quote_timeout: Duration, } -struct UpdateTask { - inner: Weak, - update_interval: Duration, - update_size: Option, - prefetch_time: Duration, -} +impl CacheStorage { + /// Creates a new cache with the given max age for entries and initial + /// prices. Entries are initialized with random ages to avoid expiration + /// spikes. + fn new( + max_age: Duration, + initial_prices: HashMap, + approximation_tokens: HashMap, + ) -> Arc { + let mut rng = rand::thread_rng(); + let now = std::time::Instant::now(); -type CacheEntry = Result; + let cache = initial_prices + .into_iter() + .filter_map(|(token, price)| { + // Generate random `updated_at` timestamp + // to avoid spikes of expired prices. 
+ let percent_expired = rng.gen_range(50..=90); + let age = max_age.as_secs() * percent_expired / 100; + let updated_at = now - Duration::from_secs(age); -#[derive(Debug, Clone)] -struct CachedResult { - result: CacheEntry, - updated_at: Instant, - requested_at: Instant, - accumulative_errors_count: u32, -} + Some(( + token, + CachedResult::new( + Ok(from_normalized_price(price)?), + updated_at, + now, + Default::default(), + KeepPriceUpdated::Yes, + ), + )) + }) + .collect::>(); -/// Defines how many consecutive errors are allowed before the cache starts -/// returning the error to the user without trying to fetch the price from the -/// estimator. -const ACCUMULATIVE_ERRORS_THRESHOLD: u32 = 5; + Arc::new(Self { + cache: Mutex::new(cache), + max_age, + high_priority: Default::default(), + approximation_tokens, + }) + } -impl CachedResult { - fn new( - result: CacheEntry, - updated_at: Instant, - requested_at: Instant, - current_accumulative_errors_count: u32, - ) -> Self { - let estimator_internal_errors_count = - matches!(result, Err(PriceEstimationError::EstimatorInternal(_))) - .then_some(current_accumulative_errors_count + 1) - .unwrap_or_default(); + /// Creates a new cache with background maintenance task. + /// + /// The maintenance task periodically refreshes cached prices before they + /// expire, using the provided estimator to fetch new prices. + pub fn new_with_maintenance( + max_age: Duration, + initial_prices: HashMap, + approximation_tokens: HashMap, + config: MaintenanceConfig, + ) -> Arc { + let cache = Self::new(max_age, initial_prices, approximation_tokens); + spawn_maintenance_task(&cache, config); + cache + } - Self { - result, - updated_at, - requested_at, - accumulative_errors_count: estimator_internal_errors_count, - } + /// Creates a new cache without background maintenance. + /// + /// This is only available for testing purposes. Production code should use + /// `new_with_maintenance` instead. 
+ #[cfg(any(test, feature = "test-util"))] + pub fn new_without_maintenance( + max_age: Duration, + initial_prices: HashMap, + approximation_tokens: HashMap, + ) -> Arc { + Self::new(max_age, initial_prices, approximation_tokens) } - /// The result is not ready if the estimator has returned an internal error - /// and consecutive errors are less than - /// `ESTIMATOR_INTERNAL_ERRORS_THRESHOLD`. - fn is_ready(&self) -> bool { - !matches!(self.result, Err(PriceEstimationError::EstimatorInternal(_))) - || self.accumulative_errors_count >= ACCUMULATIVE_ERRORS_THRESHOLD + /// Returns the max age configuration for this cache. + pub fn max_age(&self) -> Duration { + self.max_age + } + + /// Returns the approximation tokens mapping. + pub fn approximation_tokens(&self) -> &HashMap { + &self.approximation_tokens + } + + /// Returns the number of entries in the cache. + pub fn len(&self) -> usize { + self.cache.lock().unwrap().len() + } + + /// Returns true if the cache is empty. + pub fn is_empty(&self) -> bool { + self.cache.lock().unwrap().is_empty() } -} -impl Inner { - // Returns a single cached price and updates its `requested_at` field. - fn get_cached_price( + /// Returns counts of (maintained, passive) entries. + /// Maintained entries have `KeepPriceUpdated::Yes` and are actively + /// refreshed by the background task. Passive entries have + /// `KeepPriceUpdated::No` and are only cached (quote-only tokens). + fn count_by_maintenance_flag(&self) -> (usize, usize) { + let cache = self.cache.lock().unwrap(); + let maintained = cache + .values() + .filter(|c| c.update_price_continuously == KeepPriceUpdated::Yes) + .count(); + let passive = cache.len() - maintained; + (maintained, passive) + } + + /// Get a cached price with optional cache modifications. + /// Returns None if the price is not cached or is expired. 
+ /// + /// The `require_updating_price` parameter controls whether to mark the + /// token for active price maintenance: + /// - `DontCare`: Don't modify the token's maintenance flag + /// - `Yes`: Mark the token to require active price updates + fn lookup_cached_price( + cache: &mut HashMap, token: Address, now: Instant, - cache: &mut MutexGuard>, - max_age: &Duration, - create_missing_entry: bool, + max_age: Duration, + require_updating_price: RequiresUpdatingPrices, ) -> Option { match cache.entry(token) { Entry::Occupied(mut entry) => { - let entry = entry.get_mut(); - entry.requested_at = now; - let is_recent = now.saturating_duration_since(entry.updated_at) < *max_age; - is_recent.then_some(entry.clone()) + let cached = entry.get_mut(); + cached.requested_at = now; + + if cached.update_price_continuously == KeepPriceUpdated::No + && require_updating_price == RequiresUpdatingPrices::Yes + { + tracing::trace!(?token, "marking token for needing active maintenance"); + cached.update_price_continuously = KeepPriceUpdated::Yes; + } + + let is_recent = now.saturating_duration_since(cached.updated_at) < max_age; + is_recent.then_some(cached.clone()) } Entry::Vacant(entry) => { - if create_missing_entry { - // Create an outdated cache entry so the background task keeping the cache warm - // will fetch the price during the next maintenance cycle. + if require_updating_price == RequiresUpdatingPrices::Yes { + // Create an outdated cache entry so the background task keeping the + // cache warm will fetch the price during the next maintenance cycle. // This should happen only for prices missing while building the auction. - // Otherwise malicious actors could easily cause the cache size to blow up. - let outdated_timestamp = now.checked_sub(*max_age).unwrap(); + // Otherwise malicious actors could easily cause the cache size to blow + // up. 
+ let outdated_timestamp = now.checked_sub(max_age).unwrap_or(now); tracing::trace!(?token, "create outdated price entry"); entry.insert(CachedResult::new( Ok(0.), outdated_timestamp, now, Default::default(), + KeepPriceUpdated::Yes, )); } None @@ -154,80 +288,143 @@ impl Inner { } } - fn get_ready_to_use_cached_price( - token: Address, + /// Get cached prices that are ready to use (not in error accumulation + /// state). + /// + /// Returns a map of token -> cached result for tokens that have valid + /// cached prices. Missing tokens (not cached or expired) are not + /// included in the result. Also updates cache access metrics + /// (hits/misses). + /// + /// The `require_updating_price` parameter controls whether to mark tokens + /// for active price maintenance: + /// - `DontCare`: Don't modify the token's maintenance flag + /// - `Yes`: Mark the token to require active price updates. For existing + /// entries, upgrades `KeepPriceUpdated::No` to `Yes`. For missing tokens, + /// creates placeholder entries so the maintenance task will fetch them. 
+ fn get_ready_to_use_cached_prices( + &self, + tokens: &[Address], now: Instant, - cache: &mut MutexGuard>, - max_age: &Duration, - create_missing_entry: bool, - ) -> Option { - Self::get_cached_price(token, now, cache, max_age, create_missing_entry) - .filter(|cached| cached.is_ready()) - } + require_updating_price: RequiresUpdatingPrices, + ) -> HashMap { + let max_age = self.max_age; + let outdated_timestamp = now.checked_sub(max_age).unwrap_or(now); + let mut results = HashMap::new(); + let mut hits = 0u64; + let mut misses = 0u64; + + let mut cache = self.cache.lock().unwrap(); + for &token in tokens { + match cache.entry(token) { + Entry::Occupied(mut entry) => { + let cached = entry.get_mut(); + cached.requested_at = now; + + if cached.update_price_continuously == KeepPriceUpdated::No + && require_updating_price == RequiresUpdatingPrices::Yes + { + tracing::trace!(?token, "marking token for needing active maintenance"); + cached.update_price_continuously = KeepPriceUpdated::Yes; + } - /// Checks cache for the given tokens one by one. If the price is already - /// cached, it gets returned. If it's not in the cache, a new price - /// estimation request gets issued. We check the cache before each - /// request because they can take a long time and some other task might - /// have fetched some requested price in the meantime. 
- fn estimate_prices_and_update_cache<'a>( - &'a self, - tokens: &'a [Address], - max_age: Duration, - request_timeout: Duration, - ) -> futures::stream::BoxStream<'a, (Address, NativePriceEstimateResult)> { - let estimates = tokens.iter().map(move |token| async move { - let current_accumulative_errors_count = { - // check if the price is cached by now - let now = Instant::now(); - let mut cache = self.cache.lock().unwrap(); - - match Self::get_cached_price(*token, now, &mut cache, &max_age, false) { - Some(cached) if cached.is_ready() => { - return (*token, cached.result); + let is_recent = now.saturating_duration_since(cached.updated_at) < max_age; + if is_recent && cached.is_ready() { + results.insert(token, cached.clone()); + hits += 1; + } else { + misses += 1; } - Some(cached) => cached.accumulative_errors_count, - None => Default::default(), } - }; + Entry::Vacant(entry) => { + if require_updating_price == RequiresUpdatingPrices::Yes { + // Create an outdated cache entry so the background task keeping the + // cache warm will fetch the price during the next maintenance cycle. 
+ tracing::trace!(?token, "create outdated price entry for maintenance"); + entry.insert(CachedResult::new( + Ok(0.), + outdated_timestamp, + now, + Default::default(), + KeepPriceUpdated::Yes, + )); + } + misses += 1; + } + } + } - let token_to_fetch = *self.approximation_tokens.get(token).unwrap_or(token); + drop(cache); // Release lock before metrics update - let result = self - .estimator - .estimate_native_price(token_to_fetch, request_timeout) - .await; + let caller = match require_updating_price { + RequiresUpdatingPrices::Yes => "auction", + RequiresUpdatingPrices::DontCare => "quote", + }; + let metrics = Metrics::get(); + if hits > 0 { + metrics + .native_price_cache_access + .with_label_values(&["hits", caller]) + .inc_by(hits); + } + if misses > 0 { + metrics + .native_price_cache_access + .with_label_values(&["misses", caller]) + .inc_by(misses); + } - // update price in cache - if should_cache(&result) { - let now = Instant::now(); - let mut cache = self.cache.lock().unwrap(); + results + } - cache.insert( - *token, - CachedResult::new(result.clone(), now, now, current_accumulative_errors_count), - ); - }; + /// Insert or update a cached result. + /// + /// Note: This locks the cache. Do not call in a loop; prefer batch + /// operations instead. + fn insert(&self, token: Address, result: CachedResult) { + self.cache.lock().unwrap().insert(token, result); + } - (*token, result) - }); - futures::stream::iter(estimates) - .buffered(self.concurrent_requests) - .boxed() + /// Insert or update multiple cached results in a single lock acquisition. + fn insert_batch(&self, results: impl IntoIterator) { + let mut cache = self.cache.lock().unwrap(); + for (token, result) in results { + cache.insert(token, result); + } + } + + /// Get accumulative error counts for multiple tokens in a single lock. + /// Returns a map of token -> error count. Tokens not in cache return 0. 
+ fn get_accumulative_errors(&self, tokens: &[Address]) -> HashMap { + let cache = self.cache.lock().unwrap(); + tokens + .iter() + .map(|&token| { + let count = cache + .get(&token) + .map(|c| c.accumulative_errors_count) + .unwrap_or_default(); + (token, count) + }) + .collect() } - /// Tokens with highest priority first. - fn sorted_tokens_to_update(&self, max_age: Duration, now: Instant) -> Vec
{ +    /// Fetches all tokens that need to be updated sorted by priority. +    /// High-priority tokens (from `self.high_priority`) are returned first. +    fn prioritized_tokens_to_update(&self, max_age: Duration, now: Instant) -> Vec<Address>
{ +        let high_priority = self.high_priority.lock().unwrap(); let mut outdated: Vec<_> = self .cache .lock() .unwrap() .iter() -            .filter(|(_, cached)| now.saturating_duration_since(cached.updated_at) > max_age) +            .filter(|(_, cached)| { +                cached.update_price_continuously == KeepPriceUpdated::Yes +                    && now.saturating_duration_since(cached.updated_at) > max_age +            }) .map(|(token, cached)| (*token, cached.requested_at)) .collect(); -        let high_priority = self.high_priority.lock().unwrap().clone(); let index = |token: &Address| high_priority.get_index_of(token).unwrap_or(usize::MAX); outdated.sort_by_cached_key(|entry| { ( @@ -237,169 +434,374 @@ impl Inner { }); outdated.into_iter().map(|(token, _)| token).collect() } -} -fn should_cache(result: &Result<f64, PriceEstimationError>) -> bool { -    // We don't want to cache errors that we consider transient -    match result { -        Ok(_) -        | Err(PriceEstimationError::NoLiquidity) -        | Err(PriceEstimationError::UnsupportedToken { .. }) -        | Err(PriceEstimationError::EstimatorInternal(_)) => true, -        Err(PriceEstimationError::ProtocolInternal(_)) | Err(PriceEstimationError::RateLimited) => { -            false +    /// Replaces the set of high-priority tokens with the provided set. +    /// High-priority tokens are refreshed before other tokens in the cache. +    pub fn replace_high_priority(&self, tokens: IndexSet<Address>
) { + tracing::trace!(?tokens, "updated high priority tokens in cache"); + *self.high_priority.lock().unwrap() = tokens; + } + + /// Helper for estimating a price with cache check and update. + /// + /// Returns early if a valid cached price exists, otherwise calls the + /// provided fetch function and caches the result. + /// + /// This is the core logic shared by both on-demand price fetching and + /// background maintenance. + async fn estimate_with_cache_update( + &self, + token: Address, + require_updating_price: RequiresUpdatingPrices, + keep_updated: KeepPriceUpdated, + fetch: F, + ) -> NativePriceEstimateResult + where + F: FnOnce(u32) -> Fut, + Fut: std::future::Future, + { + let (current_accumulative_errors_count, existing_keep_updated) = { + let now = Instant::now(); + let mut cache = self.cache.lock().unwrap(); + + let cached = Self::lookup_cached_price( + &mut cache, + token, + now, + self.max_age, + require_updating_price, + ); + + match cached { + Some(cached) if cached.is_ready() => return cached.result, + Some(cached) => ( + cached.accumulative_errors_count, + cached.update_price_continuously, + ), + None => { + // Entry might exist but be expired - preserve its flag if so. + // If entry doesn't exist, use the caller's preference. + let existing_keep_updated = cache + .get(&token) + .map(|c| c.update_price_continuously) + .unwrap_or(KeepPriceUpdated::No); + (Default::default(), existing_keep_updated) + } + } + }; + + let result = fetch(current_accumulative_errors_count).await; + + if should_cache(&result) { + let now = Instant::now(); + // Preserve Yes if existing entry had it, otherwise use the requested + // keep_updated. This prevents downgrading auction-related tokens + // when QuoteCompetitionEstimator requests them after expiration. 
+ let final_keep_updated = if existing_keep_updated == KeepPriceUpdated::Yes { + KeepPriceUpdated::Yes + } else { + keep_updated + }; + self.insert( + token, + CachedResult::new( + result.clone(), + now, + now, + current_accumulative_errors_count, + final_keep_updated, + ), + ); } - Err(PriceEstimationError::UnsupportedOrderType(_)) => { - tracing::error!(?result, "Unexpected error in native price cache"); - false + + result + } + + /// Estimates prices for the given tokens and updates the cache. + /// Used by the background maintenance task. All tokens are processed using + /// the provided estimator and marked with `KeepPriceUpdated::Yes`. + /// + /// This method batches lock acquisitions: one lock to get error counts, + /// then concurrent fetches without locking, then one lock to insert + /// results. + async fn estimate_prices_and_update_cache_for_maintenance( + &self, + tokens: &[Address], + estimator: &Arc, + concurrent_requests: usize, + request_timeout: Duration, + ) -> usize { + if tokens.is_empty() { + return 0; } + + let error_counts = self.get_accumulative_errors(tokens); + let futures: Vec<_> = tokens + .iter() + .map(|&token| { + let estimator = estimator.clone(); + let token_to_fetch = *self.approximation_tokens.get(&token).unwrap_or(&token); + let error_count = error_counts.get(&token).copied().unwrap_or_default(); + async move { + let result = estimator + .estimate_native_price(token_to_fetch, request_timeout) + .await; + (token, result, error_count) + } + }) + .collect(); + + let results: Vec<_> = futures::stream::iter(futures) + .buffered(concurrent_requests) + .collect() + .await; + + let now = Instant::now(); + let to_insert = results + .iter() + .filter(|(_, result, _)| should_cache(result)) + .map(|(token, result, error_count)| { + ( + *token, + CachedResult::new( + result.clone(), + now, + now, + *error_count, + KeepPriceUpdated::Yes, + ), + ) + }); + self.insert_batch(to_insert); + + results.len() } } -impl UpdateTask { +/// Spawns a 
background maintenance task for the given cache. +fn spawn_maintenance_task(cache: &Arc, config: MaintenanceConfig) { + Metrics::get().reset(); + let update_task = CacheMaintenanceTask::new(Arc::downgrade(cache), config) + .run() + .instrument(tracing::info_span!("native_price_cache_maintenance")); + tokio::spawn(update_task); +} + +/// Background task that keeps the cache warm by periodically refreshing prices. +/// Only refreshes entries with `KeepPriceUpdated::Yes`; entries with +/// `KeepPriceUpdated::No` are cached but not maintained. +struct CacheMaintenanceTask { + cache: Weak, + /// Estimator used for maintenance updates. + estimator: Arc, + update_interval: Duration, + update_size: usize, + prefetch_time: Duration, + concurrent_requests: usize, + quote_timeout: Duration, +} + +impl CacheMaintenanceTask { + fn new(cache: Weak, config: MaintenanceConfig) -> Self { + CacheMaintenanceTask { + cache, + estimator: config.estimator, + update_interval: config.update_interval, + update_size: config.update_size, + prefetch_time: config.prefetch_time, + concurrent_requests: config.concurrent_requests, + quote_timeout: config.quote_timeout, + } + } + /// Single run of the background updating process. - async fn single_update(&self, inner: &Inner) { + /// Only updates entries with `KeepPriceUpdated::Yes`. 
+ async fn single_update(&self, cache: &Arc) { let metrics = Metrics::get(); metrics .native_price_cache_size - .set(i64::try_from(inner.cache.lock().unwrap().len()).unwrap_or(i64::MAX)); + .set(i64::try_from(cache.len()).unwrap_or(i64::MAX)); + + let (maintained, passive) = cache.count_by_maintenance_flag(); + metrics + .native_price_cache_maintained_entries + .set(i64::try_from(maintained).unwrap_or_default()); + metrics + .native_price_cache_passive_entries + .set(i64::try_from(passive).unwrap_or_default()); - let max_age = inner.max_age.saturating_sub(self.prefetch_time); - let mut outdated_entries = inner.sorted_tokens_to_update(max_age, Instant::now()); + let max_age = cache.max_age().saturating_sub(self.prefetch_time); + let mut outdated_entries = cache.prioritized_tokens_to_update(max_age, Instant::now()); - tracing::trace!(tokens = ?outdated_entries, first_n = ?self.update_size, "outdated prices to fetch"); + tracing::trace!(tokens = ?outdated_entries, first_n = ?self.update_size, "outdated auction prices to fetch"); metrics .native_price_cache_outdated_entries .set(i64::try_from(outdated_entries.len()).unwrap_or(i64::MAX)); - outdated_entries.truncate(self.update_size.unwrap_or(usize::MAX)); + if self.update_size > 0 { + outdated_entries.truncate(self.update_size); + } if outdated_entries.is_empty() { return; } - let mut stream = - inner.estimate_prices_and_update_cache(&outdated_entries, max_age, inner.quote_timeout); - while stream.next().await.is_some() {} + let updates_count = cache + .estimate_prices_and_update_cache_for_maintenance( + &outdated_entries, + &self.estimator, + self.concurrent_requests, + self.quote_timeout, + ) + .await as u64; metrics .native_price_cache_background_updates - .inc_by(outdated_entries.len() as u64); + .inc_by(updates_count); } - /// Runs background updates until inner is no longer alive. + /// Runs background updates until the cache is no longer alive. 
async fn run(self) { - while let Some(inner) = self.inner.upgrade() { + while let Some(cache) = self.cache.upgrade() { let now = Instant::now(); - self.single_update(&inner).await; + self.single_update(&cache).await; tokio::time::sleep(self.update_interval.saturating_sub(now.elapsed())).await; } } } -impl CachingNativePriceEstimator { - pub fn initialize_cache(&self, prices: HashMap) { - let mut rng = rand::thread_rng(); - let now = std::time::Instant::now(); +/// Wrapper around `Arc` which caches successful +/// price estimates for some time and supports updating the cache in the +/// background. +/// +/// The size of the underlying cache is unbounded. +/// +/// Is an Arc internally. +pub struct CachingNativePriceEstimator { + cache: NativePriceCache, + estimator: Arc, + concurrent_requests: usize, + require_updating_prices: RequiresUpdatingPrices, +} - let cache = prices - .into_iter() - .filter_map(|(token, price)| { - // Generate random `updated_at` timestamp - // to avoid spikes of expired prices. - let percent_expired = rng.gen_range(50..=90); - let age = self.0.max_age.as_secs() * percent_expired / 100; - let updated_at = now - Duration::from_secs(age); +type CacheEntry = Result; - Some(( - token, - CachedResult::new( - Ok(from_normalized_price(price)?), - updated_at, - now, - Default::default(), - ), - )) - }) - .collect::>(); +#[derive(Debug, Clone)] +struct CachedResult { + result: CacheEntry, + updated_at: Instant, + requested_at: Instant, + accumulative_errors_count: u32, + update_price_continuously: KeepPriceUpdated, +} + +/// Defines how many consecutive errors are allowed before the cache starts +/// returning the error to the user without trying to fetch the price from the +/// estimator. 
+const ACCUMULATIVE_ERRORS_THRESHOLD: u32 = 5; + +impl CachedResult { + fn new( + result: CacheEntry, + updated_at: Instant, + requested_at: Instant, + current_accumulative_errors_count: u32, + update_price_continuously: KeepPriceUpdated, + ) -> Self { + let estimator_internal_errors_count = + matches!(result, Err(PriceEstimationError::EstimatorInternal(_))) + .then_some(current_accumulative_errors_count + 1) + .unwrap_or_default(); + + Self { + result, + updated_at, + requested_at, + accumulative_errors_count: estimator_internal_errors_count, + update_price_continuously, + } + } + + /// The result is not ready if the estimator has returned an internal error + /// and consecutive errors are less than + /// `ESTIMATOR_INTERNAL_ERRORS_THRESHOLD`. + fn is_ready(&self) -> bool { + !matches!(self.result, Err(PriceEstimationError::EstimatorInternal(_))) + || self.accumulative_errors_count >= ACCUMULATIVE_ERRORS_THRESHOLD + } +} + +fn should_cache(result: &Result) -> bool { + // We don't want to cache errors that we consider transient + match result { + Ok(_) + | Err(PriceEstimationError::NoLiquidity) + | Err(PriceEstimationError::UnsupportedToken { .. }) + | Err(PriceEstimationError::EstimatorInternal(_)) => true, + Err(PriceEstimationError::ProtocolInternal(_)) | Err(PriceEstimationError::RateLimited) => { + false + } + Err(PriceEstimationError::UnsupportedOrderType(_)) => { + tracing::error!(?result, "Unexpected error in native price cache"); + false + } + } +} - *self.0.cache.lock().unwrap() = cache; +impl CachingNativePriceEstimator { + /// Returns a reference to the underlying shared cache. + /// This can be used to share the cache with other estimator instances. + pub fn cache(&self) -> &Arc { + &self.cache } - /// Creates new CachingNativePriceEstimator using `estimator` to calculate - /// native prices which get cached a duration of `max_age`. - /// Spawns a background task maintaining the cache once per - /// `update_interval`. 
Only soon to be outdated prices get updated and - /// recently used prices have a higher priority. If `update_size` is - /// `Some(n)` at most `n` prices get updated per interval. - /// If `update_size` is `None` no limit gets applied. - #[expect(clippy::too_many_arguments)] + /// Creates a new CachingNativePriceEstimator. + /// + /// The estimator will use the provided cache for lookups and will fetch + /// prices on-demand for cache misses. Background maintenance (keeping the + /// cache warm) is handled by the cache itself, not by this estimator. + /// + /// The `require_updating_prices` parameter controls whether entries fetched + /// by this estimator should be actively maintained by the background task. pub fn new( - estimator: Box, - max_age: Duration, - update_interval: Duration, - update_size: Option, - prefetch_time: Duration, + estimator: Arc, + cache: NativePriceCache, concurrent_requests: usize, - approximation_tokens: HashMap, - quote_timeout: Duration, + require_updating_prices: RequiresUpdatingPrices, ) -> Self { - let inner = Arc::new(Inner { + Self { estimator, - cache: Default::default(), - high_priority: Default::default(), - max_age, + cache, concurrent_requests, - approximation_tokens, - quote_timeout, - }); - - let update_task = UpdateTask { - inner: Arc::downgrade(&inner), - update_interval, - update_size, - prefetch_time, + require_updating_prices, } - .run() - .instrument(tracing::info_span!("caching_native_price_estimator")); - tokio::spawn(update_task); - - Self(inner) } /// Only returns prices that are currently cached. Missing prices will get /// prioritized to get fetched during the next cycles of the maintenance - /// background task. + /// background task (only if `require_updating_prices == Yes`). + /// + /// If `require_updating_prices == Yes` and a cached entry has + /// `KeepPriceUpdated::No`, it is upgraded to `KeepPriceUpdated::Yes`. 
fn get_cached_prices( &self, tokens: &[Address], ) -> HashMap> { let now = Instant::now(); - let mut cache = self.0.cache.lock().unwrap(); - let mut results = HashMap::default(); - for token in tokens { - let cached = Inner::get_ready_to_use_cached_price( - *token, - now, - &mut cache, - &self.0.max_age, - true, - ); - let label = if cached.is_some() { "hits" } else { "misses" }; - Metrics::get() - .native_price_cache_access - .with_label_values(&[label]) - .inc_by(1); - if let Some(result) = cached { - results.insert(*token, result.result); - } - } - results + let cached = + self.cache + .get_ready_to_use_cached_prices(tokens, now, self.require_updating_prices); + + cached + .into_iter() + .map(|(token, cached)| (token, cached.result)) + .collect() } + /// Updates the set of high-priority tokens for maintenance updates. + /// Forwards to the underlying cache. pub fn replace_high_priority(&self, tokens: IndexSet
) { - tracing::trace!(?tokens, "update high priority tokens"); - *self.0.high_priority.lock().unwrap() = tokens; + self.cache.replace_high_priority(tokens); } pub async fn estimate_native_prices_with_timeout<'a>( @@ -417,9 +819,7 @@ impl CachingNativePriceEstimator { .filter(|t| !prices.contains_key(*t)) .copied() .collect(); - let price_stream = - self.0 - .estimate_prices_and_update_cache(&uncached_tokens, self.0.max_age, timeout); + let price_stream = self.estimate_prices_and_update_cache(&uncached_tokens, timeout); let _ = time::timeout(timeout, async { let mut price_stream = price_stream; @@ -433,6 +833,53 @@ impl CachingNativePriceEstimator { // Return whatever was collected up to that point, regardless of the timeout prices } + + /// Checks cache for the given tokens one by one. If the price is already + /// cached, it gets returned. If it's not in the cache, a new price + /// estimation request gets issued. We check the cache before each + /// request because they can take a long time and some other task might + /// have fetched some requested price in the meantime. + /// + /// If `require_updating_prices == Yes` and a cached entry has + /// `KeepPriceUpdated::No`, it is upgraded to `KeepPriceUpdated::Yes`. 
+ fn estimate_prices_and_update_cache<'a>( + &'a self, + tokens: &'a [Address], + request_timeout: Duration, + ) -> futures::stream::BoxStream<'a, (Address, NativePriceEstimateResult)> { + let keep_updated = match self.require_updating_prices { + RequiresUpdatingPrices::Yes => KeepPriceUpdated::Yes, + RequiresUpdatingPrices::DontCare => KeepPriceUpdated::No, + }; + + let estimates = tokens.iter().cloned().map(move |token| async move { + let result = self + .cache + .estimate_with_cache_update( + token, + self.require_updating_prices, + keep_updated, + |_| self.fetch_price(token, request_timeout), + ) + .await; + (token, result) + }); + futures::stream::iter(estimates) + .buffered(self.concurrent_requests) + .boxed() + } + + /// Fetches a single price (without caching). + async fn fetch_price(&self, token: Address, timeout: Duration) -> NativePriceEstimateResult { + let token_to_fetch = *self + .cache + .approximation_tokens() + .get(&token) + .unwrap_or(&token); + self.estimator + .estimate_native_price(token_to_fetch, timeout) + .await + } } impl NativePriceEstimating for CachingNativePriceEstimator { @@ -443,24 +890,16 @@ impl NativePriceEstimating for CachingNativePriceEstimator { timeout: Duration, ) -> futures::future::BoxFuture<'_, NativePriceEstimateResult> { async move { - let cached = { - let now = Instant::now(); - let mut cache = self.0.cache.lock().unwrap(); - Inner::get_ready_to_use_cached_price(token, now, &mut cache, &self.0.max_age, false) - }; - - let label = if cached.is_some() { "hits" } else { "misses" }; - Metrics::get() - .native_price_cache_access - .with_label_values(&[label]) - .inc_by(1); - - if let Some(cached) = cached { + let now = Instant::now(); + if let Some(cached) = self + .cache + .get_ready_to_use_cached_prices(&[token], now, self.require_updating_prices) + .remove(&token) + { return cached.result; } - self.0 - .estimate_prices_and_update_cache(&[token], self.0.max_age, timeout) + self.estimate_prices_and_update_cache(&[token], 
timeout) .next() .await .unwrap() @@ -470,6 +909,58 @@ impl NativePriceEstimating for CachingNativePriceEstimator { } } +/// Wrapper around `CachingNativePriceEstimator` that marks all entries with +/// `KeepPriceUpdated::No`. Used for the autopilot API endpoints where prices +/// should be cached but not actively maintained by the background task. +#[derive(Clone)] +pub struct QuoteCompetitionEstimator(Arc); + +impl QuoteCompetitionEstimator { + /// Creates a new `QuoteCompetitionEstimator` wrapping the given estimator. + /// + /// Prices fetched through this wrapper will be cached with + /// `KeepPriceUpdated::No`, meaning they won't be actively refreshed by the + /// background maintenance task. However, if the same token is later + /// requested with `RequiresUpdatingPrices::Yes`, the entry will be upgraded + /// to `KeepPriceUpdated::Yes` and become actively maintained. + pub fn new(estimator: Arc) -> Self { + Self(estimator) + } +} + +impl NativePriceEstimating for QuoteCompetitionEstimator { + fn estimate_native_price( + &self, + token: Address, + timeout: Duration, + ) -> futures::future::BoxFuture<'_, NativePriceEstimateResult> { + async move { + let now = Instant::now(); + // Don't upgrade or create entries, just read from cache + if let Some(cached) = self + .0 + .cache + .get_ready_to_use_cached_prices(&[token], now, RequiresUpdatingPrices::DontCare) + .remove(&token) + { + return cached.result; + } + + // Cache the result but don't mark for active maintenance + self.0 + .cache + .estimate_with_cache_update( + token, + RequiresUpdatingPrices::DontCare, + KeepPriceUpdated::No, + |_| self.0.fetch_price(token, timeout), + ) + .await + } + .boxed() + } +} + #[cfg(test)] mod tests { use { @@ -499,22 +990,22 @@ mod tests { let prices = HashMap::from_iter((0..10).map(|t| (token(t), BigDecimal::try_from(1e18).unwrap()))); - let estimator = CachingNativePriceEstimator::new( - Box::new(inner), + let cache = CacheStorage::new_without_maintenance( 
Duration::from_secs(MAX_AGE_SECS), + prices, Default::default(), - None, - Default::default(), + ); + let estimator = CachingNativePriceEstimator::new( + Arc::new(inner), + cache, 1, - Default::default(), - HEALTHY_PRICE_ESTIMATION_TIME, + RequiresUpdatingPrices::Yes, ); - estimator.initialize_cache(prices); { // Check that `updated_at` timestamps are initialized with // reasonable values. - let cache = estimator.0.cache.lock().unwrap(); + let cache = estimator.cache.cache.lock().unwrap(); for value in cache.values() { let elapsed = value.updated_at.elapsed(); assert!(elapsed >= min_age && elapsed <= max_age); @@ -538,14 +1029,14 @@ mod tests { .returning(|_, _| async { Ok(1.0) }.boxed()); let estimator = CachingNativePriceEstimator::new( - Box::new(inner), - Duration::from_millis(30), - Default::default(), - None, - Default::default(), + Arc::new(inner), + CacheStorage::new_without_maintenance( + Duration::from_millis(30), + Default::default(), + Default::default(), + ), 1, - Default::default(), - HEALTHY_PRICE_ESTIMATION_TIME, + RequiresUpdatingPrices::Yes, ); for _ in 0..10 { @@ -576,18 +1067,18 @@ mod tests { .returning(|_, _| async { Ok(200.0) }.boxed()); let estimator = CachingNativePriceEstimator::new( - Box::new(inner), - Duration::from_millis(30), - Default::default(), - None, - Default::default(), - 1, + Arc::new(inner), // set token approximations for tokens 1 and 2 - HashMap::from([ - (Address::with_last_byte(1), Address::with_last_byte(100)), - (Address::with_last_byte(2), Address::with_last_byte(200)), - ]), - HEALTHY_PRICE_ESTIMATION_TIME, + CacheStorage::new_without_maintenance( + Duration::from_millis(30), + Default::default(), + HashMap::from([ + (Address::with_last_byte(1), Address::with_last_byte(100)), + (Address::with_last_byte(2), Address::with_last_byte(200)), + ]), + ), + 1, + RequiresUpdatingPrices::Yes, ); // no approximation token used for token 0 @@ -631,14 +1122,14 @@ mod tests { .returning(|_, _| async { 
Err(PriceEstimationError::NoLiquidity) }.boxed()); let estimator = CachingNativePriceEstimator::new( - Box::new(inner), - Duration::from_millis(30), - Default::default(), - None, - Default::default(), + Arc::new(inner), + CacheStorage::new_without_maintenance( + Duration::from_millis(30), + Default::default(), + Default::default(), + ), 1, - Default::default(), - HEALTHY_PRICE_ESTIMATION_TIME, + RequiresUpdatingPrices::Yes, ); for _ in 0..10 { @@ -702,14 +1193,14 @@ mod tests { }); let estimator = CachingNativePriceEstimator::new( - Box::new(inner), - Duration::from_millis(100), - Duration::from_millis(200), - None, - Default::default(), + Arc::new(inner), + CacheStorage::new_without_maintenance( + Duration::from_millis(100), + Default::default(), + Default::default(), + ), 1, - Default::default(), - HEALTHY_PRICE_ESTIMATION_TIME, + RequiresUpdatingPrices::Yes, ); // First 3 calls: The cache is not used. Counter gets increased. @@ -774,14 +1265,14 @@ mod tests { .returning(|_, _| async { Err(PriceEstimationError::RateLimited) }.boxed()); let estimator = CachingNativePriceEstimator::new( - Box::new(inner), - Duration::from_millis(30), - Default::default(), - None, - Default::default(), + Arc::new(inner), + CacheStorage::new_without_maintenance( + Duration::from_millis(30), + Default::default(), + Default::default(), + ), 1, - Default::default(), - HEALTHY_PRICE_ESTIMATION_TIME, + RequiresUpdatingPrices::Yes, ); for _ in 0..10 { @@ -797,49 +1288,53 @@ mod tests { #[tokio::test] async fn maintenance_can_limit_update_size_to_n() { - let mut inner = MockNativePriceEstimating::new(); - // first request from user - inner + // On-demand estimator for initial cache population + let mut on_demand = MockNativePriceEstimating::new(); + on_demand .expect_estimate_native_price() - .times(1) + .times(2) .returning(|passed_token, _| { - assert_eq!(passed_token, token(0)); - async { Ok(1.0) }.boxed() + let price = if passed_token == token(0) { 1.0 } else { 2.0 }; + async move { 
Ok(price) }.boxed() }); - // second request from user - inner + // After maintenance skips token(0), user request triggers on-demand fetch + on_demand .expect_estimate_native_price() .times(1) .returning(|passed_token, _| { - assert_eq!(passed_token, token(1)); - async { Ok(2.0) }.boxed() + assert_eq!(passed_token, token(0)); + async { Ok(3.0) }.boxed() }); - // maintenance task updates n=1 outdated prices - inner + + // Maintenance estimator updates n=1 outdated prices (most recently requested) + let mut maintenance = MockNativePriceEstimating::new(); + maintenance .expect_estimate_native_price() .times(1) .returning(|passed_token, _| { assert_eq!(passed_token, token(1)); async { Ok(4.0) }.boxed() }); - // user requested something which has been skipped by the maintenance task - inner - .expect_estimate_native_price() - .times(1) - .returning(|passed_token, _| { - assert_eq!(passed_token, token(0)); - async { Ok(3.0) }.boxed() - }); - let estimator = CachingNativePriceEstimator::new( - Box::new(inner), + let cache = CacheStorage::new_with_maintenance( Duration::from_millis(30), - Duration::from_millis(50), - Some(1), - Duration::default(), - 1, Default::default(), - HEALTHY_PRICE_ESTIMATION_TIME, + Default::default(), + MaintenanceConfig { + estimator: Arc::new(maintenance), + update_interval: Duration::from_millis(50), + update_size: 1, + prefetch_time: Default::default(), + concurrent_requests: 1, + quote_timeout: HEALTHY_PRICE_ESTIMATION_TIME, + }, + ); + + let estimator = CachingNativePriceEstimator::new( + Arc::new(on_demand), + cache, + 1, + RequiresUpdatingPrices::Yes, ); // fill cache with 2 different queries @@ -855,11 +1350,13 @@ mod tests { // wait for maintenance cycle tokio::time::sleep(Duration::from_millis(60)).await; + // token(0) was not updated by maintenance (n=1 limit), triggers on-demand let result = estimator .estimate_native_price(token(0), HEALTHY_PRICE_ESTIMATION_TIME) .await; assert_eq!(result.as_ref().unwrap().to_i64().unwrap(), 3); + // 
token(1) was updated by maintenance let result = estimator .estimate_native_price(token(1), HEALTHY_PRICE_ESTIMATION_TIME) .await; @@ -868,26 +1365,39 @@ mod tests { #[tokio::test] async fn maintenance_can_update_all_old_queries() { - let mut inner = MockNativePriceEstimating::new(); - inner + // On-demand estimator for initial cache population + let mut on_demand = MockNativePriceEstimating::new(); + on_demand .expect_estimate_native_price() .times(10) .returning(move |_, _| async { Ok(1.0) }.boxed()); - // background task updates all outdated prices - inner + + // Maintenance estimator updates all outdated prices + let mut maintenance = MockNativePriceEstimating::new(); + maintenance .expect_estimate_native_price() .times(10) .returning(move |_, _| async { Ok(2.0) }.boxed()); - let estimator = CachingNativePriceEstimator::new( - Box::new(inner), + let cache = CacheStorage::new_with_maintenance( Duration::from_millis(30), - Duration::from_millis(50), - None, - Duration::default(), - 1, Default::default(), - HEALTHY_PRICE_ESTIMATION_TIME, + Default::default(), + MaintenanceConfig { + estimator: Arc::new(maintenance), + update_interval: Duration::from_millis(50), + update_size: 0, + prefetch_time: Default::default(), + concurrent_requests: 1, + quote_timeout: HEALTHY_PRICE_ESTIMATION_TIME, + }, + ); + + let estimator = CachingNativePriceEstimator::new( + Arc::new(on_demand), + cache, + 1, + RequiresUpdatingPrices::Yes, ); let tokens: Vec<_> = (0..10).map(Address::with_last_byte).collect(); @@ -915,13 +1425,18 @@ mod tests { async fn maintenance_can_update_concurrently() { const WAIT_TIME_MS: u64 = 100; const BATCH_SIZE: usize = 100; - let mut inner = MockNativePriceEstimating::new(); - inner + + // On-demand estimator for initial cache population + let mut on_demand = MockNativePriceEstimating::new(); + on_demand .expect_estimate_native_price() .times(BATCH_SIZE) .returning(|_, _| async { Ok(1.0) }.boxed()); - // background task updates all outdated prices - inner + 
+ // Maintenance estimator updates all outdated prices (with delay to test + // concurrency) + let mut maintenance = MockNativePriceEstimating::new(); + maintenance .expect_estimate_native_price() .times(BATCH_SIZE) .returning(move |_, _| { @@ -932,15 +1447,25 @@ mod tests { .boxed() }); - let estimator = CachingNativePriceEstimator::new( - Box::new(inner), + let cache = CacheStorage::new_with_maintenance( Duration::from_millis(30), - Duration::from_millis(50), - None, - Duration::default(), - BATCH_SIZE, Default::default(), - HEALTHY_PRICE_ESTIMATION_TIME, + Default::default(), + MaintenanceConfig { + estimator: Arc::new(maintenance), + update_interval: Duration::from_millis(50), + update_size: 0, + prefetch_time: Default::default(), + concurrent_requests: BATCH_SIZE, + quote_timeout: HEALTHY_PRICE_ESTIMATION_TIME, + }, + ); + + let estimator = CachingNativePriceEstimator::new( + Arc::new(on_demand), + cache, + 1, + RequiresUpdatingPrices::Yes, ); let tokens: Vec<_> = (0..BATCH_SIZE as u64).map(token).collect(); @@ -972,33 +1497,106 @@ mod tests { let t0 = Address::with_last_byte(0); let t1 = Address::with_last_byte(1); let now = Instant::now(); - let inner = Inner { - cache: Mutex::new( - [ - (t0, CachedResult::new(Ok(0.), now, now, Default::default())), - (t1, CachedResult::new(Ok(0.), now, now, Default::default())), - ] - .into_iter() - .collect(), - ), - high_priority: Default::default(), - estimator: Box::new(MockNativePriceEstimating::new()), - max_age: Default::default(), - concurrent_requests: 1, - approximation_tokens: Default::default(), - quote_timeout: HEALTHY_PRICE_ESTIMATION_TIME, - }; + + // Create a cache and populate it directly with `KeepPriceUpdated::Yes` + // entries (since maintenance only updates those) + let cache = CacheStorage::new_without_maintenance( + Duration::from_secs(10), + Default::default(), + Default::default(), + ); + cache.insert( + t0, + CachedResult::new(Ok(0.), now, now, Default::default(), KeepPriceUpdated::Yes), + ); + 
cache.insert( + t1, + CachedResult::new(Ok(0.), now, now, Default::default(), KeepPriceUpdated::Yes), + ); let now = now + Duration::from_secs(1); - *inner.high_priority.lock().unwrap() = std::iter::once(t0).collect(); - let tokens = inner.sorted_tokens_to_update(Duration::from_secs(0), now); + cache.replace_high_priority(std::iter::once(t0).collect()); + let tokens = cache.prioritized_tokens_to_update(Duration::from_secs(0), now); assert_eq!(tokens[0], t0); assert_eq!(tokens[1], t1); - *inner.high_priority.lock().unwrap() = std::iter::once(t1).collect(); - let tokens = inner.sorted_tokens_to_update(Duration::from_secs(0), now); + cache.replace_high_priority(std::iter::once(t1).collect()); + let tokens = cache.prioritized_tokens_to_update(Duration::from_secs(0), now); assert_eq!(tokens[0], t1); assert_eq!(tokens[1], t0); } + + #[tokio::test] + async fn quote_competition_estimator_preserves_keep_updated_yes() { + // This test verifies that when QuoteCompetitionEstimator requests a token + // that was previously marked with KeepPriceUpdated::Yes, the flag is preserved + // even after the cache entry expires and needs to be re-fetched. 
+ + let mut inner = MockNativePriceEstimating::new(); + // First call: auction-related estimator fetches the price + inner + .expect_estimate_native_price() + .times(1) + .returning(|_, _| async { Ok(1.0) }.boxed()); + // Second call: QuoteCompetitionEstimator re-fetches after expiration + inner + .expect_estimate_native_price() + .times(1) + .returning(|_, _| async { Ok(2.0) }.boxed()); + + let cache = CacheStorage::new_without_maintenance( + Duration::from_millis(50), + Default::default(), + Default::default(), + ); + + // Create auction-related estimator (marks entries with KeepPriceUpdated::Yes) + let auction_estimator = CachingNativePriceEstimator::new( + Arc::new(inner), + cache.clone(), + 1, + RequiresUpdatingPrices::Yes, + ); + + // Create QuoteCompetitionEstimator (uses KeepPriceUpdated::No) + let quote_estimator = QuoteCompetitionEstimator::new(Arc::new(auction_estimator)); + + let t0 = token(0); + + // Step 1: Auction estimator fetches the price, marking it with Yes + let result = quote_estimator + .0 + .estimate_native_price(t0, HEALTHY_PRICE_ESTIMATION_TIME) + .await; + assert_eq!(result.unwrap().to_i64().unwrap(), 1); + + // Verify the entry has KeepPriceUpdated::Yes + { + let cache_guard = cache.cache.lock().unwrap(); + let entry = cache_guard.get(&t0).unwrap(); + assert_eq!(entry.update_price_continuously, KeepPriceUpdated::Yes); + } + + // Step 2: Wait for the cache entry to expire + tokio::time::sleep(Duration::from_millis(60)).await; + + // Step 3: QuoteCompetitionEstimator requests the same token (after expiration) + // This would previously downgrade the entry to KeepPriceUpdated::No + let result = quote_estimator + .estimate_native_price(t0, HEALTHY_PRICE_ESTIMATION_TIME) + .await; + assert_eq!(result.unwrap().to_i64().unwrap(), 2); + + // Step 4: Verify the entry STILL has KeepPriceUpdated::Yes (not downgraded) + { + let cache_guard = cache.cache.lock().unwrap(); + let entry = cache_guard.get(&t0).unwrap(); + assert_eq!( + 
entry.update_price_continuously, + KeepPriceUpdated::Yes, + "QuoteCompetitionEstimator should not downgrade KeepPriceUpdated::Yes to No" + ); + } + } }