11 changes: 11 additions & 0 deletions crates/database/src/jit_orders.rs
@@ -6,6 +6,7 @@ use {
TransactionHash,
orders::{self, BuyTokenDestination, OrderKind, SellTokenSource, SigningScheme},
},
futures::stream::BoxStream,
sqlx::{
PgConnection,
QueryBuilder,
@@ -54,6 +55,16 @@ SELECT,
sqlx::query_as(QUERY).bind(uid).fetch_optional(ex).await
}

#[instrument(skip_all)]
pub async fn get_many_by_id<'a>(
ex: &'a mut PgConnection,
order_ids: &'a [OrderUid],
) -> BoxStream<'a, Result<orders::FullOrder, sqlx::Error>> {
const QUERY: &str =
const_format::concatcp!("SELECT ", SELECT, " FROM ", FROM, " WHERE o.uid = ANY($1)");
sqlx::query_as(QUERY).bind(order_ids).fetch(ex)
}

#[instrument(skip_all)]
pub async fn get_by_tx(
ex: &mut PgConnection,
40 changes: 27 additions & 13 deletions crates/database/src/orders.rs
@@ -633,6 +633,21 @@ COALESCE((SELECT executed_fee_token FROM order_execution oe WHERE oe.order_uid =
"#;

pub const FROM: &str = "orders o";
const FULL_ORDER_WITH_QUOTE: &str = const_format::concatcp!(
"SELECT ",
SELECT,
", o_quotes.sell_amount as quote_sell_amount",
", o_quotes.buy_amount as quote_buy_amount",
", o_quotes.gas_amount as quote_gas_amount",
", o_quotes.gas_price as quote_gas_price",
", o_quotes.sell_token_price as quote_sell_token_price",
", o_quotes.verified as quote_verified",
", o_quotes.metadata as quote_metadata",
", o_quotes.solver as solver",
" FROM ",
FROM,
" LEFT JOIN order_quotes o_quotes ON o.uid = o_quotes.order_uid",
);

#[instrument(skip_all)]
pub async fn single_full_order_with_quote(
@@ -641,22 +656,21 @@ pub async fn single_full_order_with_quote(
) -> Result<Option<FullOrderWithQuote>, sqlx::Error> {
#[rustfmt::skip]
const QUERY: &str = const_format::concatcp!(
"SELECT ", SELECT,
", o_quotes.sell_amount as quote_sell_amount",
", o_quotes.buy_amount as quote_buy_amount",
", o_quotes.gas_amount as quote_gas_amount",
", o_quotes.gas_price as quote_gas_price",
", o_quotes.sell_token_price as quote_sell_token_price",
", o_quotes.verified as quote_verified",
", o_quotes.metadata as quote_metadata",
", o_quotes.solver as solver",
" FROM ", FROM,
" LEFT JOIN order_quotes o_quotes ON o.uid = o_quotes.order_uid",
" WHERE o.uid = $1",
);
FULL_ORDER_WITH_QUOTE,
" WHERE o.uid = $1"
);
sqlx::query_as(QUERY).bind(uid).fetch_optional(ex).await
}

#[instrument(skip_all)]
pub async fn many_full_orders_with_quotes<'a>(
ex: &'a mut PgConnection,
order_ids: &'a [OrderUid],
) -> BoxStream<'a, Result<FullOrderWithQuote, sqlx::Error>> {
const QUERY: &str = const_format::concatcp!(FULL_ORDER_WITH_QUOTE, " WHERE o.uid = ANY($1)");
sqlx::query_as(QUERY).bind(order_ids).fetch(ex)
}

// Partial query for getting the log indices of events of a single settlement.
//
// This will fail if we ever have multiple settlements in the same transaction
2 changes: 1 addition & 1 deletion crates/database/src/trades.rs
@@ -705,7 +705,7 @@ mod tests {
}

// Sort expected trades by block_number DESC (matching query ORDER BY)
expected_trades.sort_by(|a, b| b.block_number.cmp(&a.block_number));
expected_trades.sort_by_key(|trade| std::cmp::Reverse(trade.block_number));

// Test limit: get first 2 trades (blocks 4 and 3 in DESC order)
let result = trades(&mut db, Some(&owner), None, 0, 2)
5 changes: 5 additions & 0 deletions crates/orderbook/src/api.rs
@@ -28,6 +28,7 @@ mod get_native_price;
mod get_order_by_uid;
mod get_order_status;
mod get_orders_by_tx;
mod get_orders_by_uid;
mod get_solver_competition;
mod get_solver_competition_v2;
mod get_token_metadata;
@@ -58,6 +59,10 @@ pub fn handle_all_routes(
"v1/create_order",
box_filter(post_order::post_order(orderbook.clone())),
),
(
"v1/get_orders",
box_filter(get_orders_by_uid::get_orders_by_uid(orderbook.clone())),
),
(
"v1/get_order",
box_filter(get_order_by_uid::get_order_by_uid(orderbook.clone())),
108 changes: 108 additions & 0 deletions crates/orderbook/src/api/get_orders_by_uid.rs
@@ -0,0 +1,108 @@
use {
crate::{
api::{error, extract_payload},
orderbook::Orderbook,
},
anyhow::Result,
model::order::{Order, OrderUid},
std::{convert::Infallible, sync::Arc},
warp::{Filter, Rejection, hyper::StatusCode, reply},
};

const MAX_ORDERS_LIMIT: usize = 5000;

#[derive(Debug, Eq, PartialEq)]
enum ValidationError {
TooManyOrders(usize),
}

fn validate(uids: Vec<OrderUid>) -> Result<Vec<OrderUid>, ValidationError> {
if uids.len() > MAX_ORDERS_LIMIT {
return Err(ValidationError::TooManyOrders(uids.len()));
}
Ok(uids)
}

fn get_orders_by_uid_request()
-> impl Filter<Extract = (Result<Vec<OrderUid>, ValidationError>,), Error = Rejection> + Clone {
warp::path!("v1" / "orders" / "lookup")
.and(warp::post())
.and(extract_payload())
.map(|uids: Vec<OrderUid>| validate(uids))
}

pub fn get_orders_by_uid_response(result: Result<Vec<Order>>) -> super::ApiReply {
let orders = match result {
Ok(orders) => orders,
Err(err) => {
tracing::error!(?err, "get_orders_by_uids_response");
return crate::api::internal_error_reply();
}
};
reply::with_status(reply::json(&orders), StatusCode::OK)
}

pub fn get_orders_by_uid(
orderbook: Arc<Orderbook>,
) -> impl Filter<Extract = (super::ApiReply,), Error = Rejection> + Clone {
get_orders_by_uid_request().and_then(
move |request_result: Result<Vec<OrderUid>, ValidationError>| {
let orderbook = orderbook.clone();
async move {
Result::<_, Infallible>::Ok(match request_result {
Ok(uids) => {
let result = orderbook.get_orders(&uids).await;

Review thread on this line:

Contributor: This will accumulate all the orders in memory, which is probably not a good idea. Can we instead return a stream of data directly from the DB?

Contributor (Author): The stream is problematic to implement since an order can be either a regular one or a JIT order. It is doable, although I am not sure it belongs in this PR. Additionally, if keeping that many orders in memory is too much, we can lower the limit I've set (5K) to a more acceptable level.

Contributor: Could you elaborate? sqlx::fetch already returns a stream, no? Then axum::Body::from_stream() should send a stream of data back to the client. (See the sketch after this function for what that could look like.)

get_orders_by_uid_response(result)
}
Err(ValidationError::TooManyOrders(requested)) => {
let err = error(
"TooManyOrders",
format!(
"Too many order UIDs requested: {requested}. Maximum allowed: \
{MAX_ORDERS_LIMIT}"
),
);
reply::with_status(err, StatusCode::BAD_REQUEST)
}
})
}
},
)
}
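
A minimal sketch of the streaming approach raised in the review thread above, under loud assumptions: it follows the reviewer's axum suggestion (this endpoint actually uses warp), and the handler name lookup_orders, the uid::text query, and the newline-delimited framing are all illustrative rather than part of this PR. It only shows how sqlx's fetch stream could be forwarded to the client without buffering the whole result set:

use {
    axum::body::Body,
    futures::TryStreamExt,
    sqlx::PgPool,
    tokio_stream::wrappers::ReceiverStream,
};

// Hypothetical handler: forwards rows to the client as they arrive from
// Postgres instead of collecting them into a Vec first.
async fn lookup_orders(pool: PgPool) -> Body {
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<String, sqlx::Error>>(16);
    tokio::spawn(async move {
        // `fetch` yields rows lazily; nothing is accumulated on the server.
        let mut rows =
            sqlx::query_scalar::<_, String>("SELECT uid::text FROM orders").fetch(&pool);
        loop {
            match rows.try_next().await {
                // One value per line; stop early if the client went away.
                Ok(Some(uid)) => {
                    if tx.send(Ok(format!("{uid}\n"))).await.is_err() {
                        break;
                    }
                }
                Ok(None) => break,
                Err(err) => {
                    let _ = tx.send(Err(err)).await;
                    break;
                }
            }
        }
    });
    // Body::from_stream accepts any TryStream whose Ok type converts to Bytes.
    Body::from_stream(ReceiverStream::new(rx))
}

One trade-off worth noting: once the response status line has been sent, a mid-stream database error can only surface to the client as a truncated body.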

#[cfg(test)]
mod tests {
use {super::*, warp::test::request};

#[tokio::test]
async fn get_orders_by_uid_request_ok() {
let uid = OrderUid::default();
let request = request()
.path("/v1/orders/lookup")
.method("POST")
.header("content-type", "application/json")
.json(&[uid]);

let filter = get_orders_by_uid_request();
let result = request.filter(&filter).await.unwrap().unwrap();
assert_eq!(result, [uid]);
}

#[tokio::test]
async fn get_orders_by_uid_request_too_many_orders() {
let mut uids = Vec::new();
for _ in 0..(MAX_ORDERS_LIMIT + 1) {
uids.push(OrderUid::default());
}
let request = request()
.path("/v1/orders/lookup")
.method("POST")
.header("content-type", "application/json")
.json(&uids);

let filter = get_orders_by_uid_request();
let result = request.filter(&filter).await;
// Assert that the error is a rejection.
assert!(result.is_err());
}
}
30 changes: 30 additions & 0 deletions crates/orderbook/src/database/orders.rs
@@ -75,6 +75,7 @@ pub trait OrderStoring: Send + Sync {
) -> Result<Vec<Order>>;
async fn latest_order_event(&self, order_uid: &OrderUid) -> Result<Option<OrderEvent>>;
async fn single_order(&self, uid: &OrderUid) -> Result<Option<Order>>;
async fn many_orders(&self, uids: &[OrderUid]) -> Result<Vec<Order>>;
}

#[derive(Debug)]
@@ -313,6 +314,35 @@ impl OrderStoring for Postgres {
.transpose()
}

async fn many_orders(&self, uids: &[OrderUid]) -> Result<Vec<Order>> {
let _timer = super::Metrics::get()
.database_queries
.with_label_values(&["many_orders"])
.start_timer();
let mut ex = self.pool.acquire().await?;
let uids = uids.iter().map(|uid| ByteArray(uid.0)).collect::<Vec<_>>();

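// A UID may belong to either a regular user order or a JIT order, so both
// tables are queried and the results merged.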
let orders: Vec<Result<Order>> =
orders::many_full_orders_with_quotes(&mut ex, uids.as_slice())
.await
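// Note: rows that fail to load or decode are dropped here (and in the
// JIT query below) instead of being returned as errors.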
.filter_map(async |order| order.ok())
.map(|order| {
let (order, quote) = order.into_order_and_quote();
full_order_with_quote_into_model_order(order, quote.as_ref())
})
.collect()
.await;
let jit_orders: Vec<Result<Order>> =
database::jit_orders::get_many_by_id(&mut ex, uids.as_slice())
.await
.filter_map(async |order| order.ok())
.map(full_order_into_model_order)
.collect()
.await;

orders.into_iter().chain(jit_orders).collect()
}

async fn orders_for_tx(&self, tx_hash: &B256) -> Result<Vec<Order>> {
tokio::try_join!(
self.user_order_for_tx(tx_hash),
4 changes: 4 additions & 0 deletions crates/orderbook/src/orderbook.rs
@@ -494,6 +494,10 @@ impl Orderbook {
self.database_replica.single_order(uid).await
}

pub async fn get_orders(&self, uids: &[OrderUid]) -> Result<Vec<Order>> {
self.database_replica.many_orders(uids).await
}

pub async fn get_orders_for_tx(&self, hash: &B256) -> Result<Vec<Order>> {
self.database_replica.orders_for_tx(hash).await
}