From bfee13c82a5a17ef188d8171485767ada6e0df8a Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Mon, 16 Mar 2026 15:12:44 +0000 Subject: [PATCH 1/7] IO Scheduler experiment Signed-off-by: Adam Gutglick --- vortex-array/src/arrow/executor/list.rs | 19 + vortex-datafusion/src/persistent/opener.rs | 4 + .../src/e2e_test/vortex_scan_test.rs | 301 ++++++ vortex-duckdb/src/exporter/varbinview.rs | 1 - vortex-layout/public-api.lock | 8 + vortex-layout/src/layouts/chunked/reader.rs | 38 + vortex-layout/src/layouts/dict/reader.rs | 9 + vortex-layout/src/layouts/flat/reader.rs | 69 +- vortex-layout/src/layouts/row_idx/mod.rs | 9 + vortex-layout/src/layouts/struct_/reader.rs | 67 ++ vortex-layout/src/layouts/zoned/reader.rs | 9 + vortex-layout/src/reader.rs | 86 ++ vortex-scan/public-api.lock | 16 +- vortex-scan/src/api.rs | 18 + vortex-scan/src/layout.rs | 21 +- vortex-scan/src/multi.rs | 14 +- vortex-scan/src/repeated_scan.rs | 950 ++++++++++++++++-- vortex-scan/src/scan_builder.rs | 592 +++++++++-- vortex-scan/src/split_by.rs | 49 +- vortex-scan/src/splits.rs | 71 +- vortex-scan/src/tasks.rs | 359 ++++++- 21 files changed, 2509 insertions(+), 201 deletions(-) diff --git a/vortex-array/src/arrow/executor/list.rs b/vortex-array/src/arrow/executor/list.rs index a3326115608..f035f997994 100644 --- a/vortex-array/src/arrow/executor/list.rs +++ b/vortex-array/src/arrow/executor/list.rs @@ -17,6 +17,7 @@ use crate::ArrayRef; use crate::Canonical; use crate::DynArray; use crate::ExecutionCtx; +use crate::arrays::Chunked; use crate::arrays::List; use crate::arrays::ListArray; use crate::arrays::ListView; @@ -55,6 +56,24 @@ pub(super) fn to_arrow_list( Err(a) => a, }; + // Handle ChunkedArray by converting each chunk individually. + // This preserves the fast list_to_list path for inner ListArray chunks + // instead of falling through to the expensive execute:: path. + if let Some(chunked) = array.as_opt::() { + let mut arrow_chunks: Vec = Vec::with_capacity(chunked.nchunks()); + for chunk in chunked.chunks() { + arrow_chunks.push(to_arrow_list::(chunk.clone(), elements_field, ctx)?); + } + if arrow_chunks.len() == 1 { + return Ok(arrow_chunks + .into_iter() + .next() + .vortex_expect("known length")); + } + let refs: Vec<&dyn arrow_array::Array> = arrow_chunks.iter().map(|a| a.as_ref()).collect(); + return Ok(arrow_select::concat::concat(&refs)?); + } + // Otherwise, we execute the array to become a ListViewArray, then rebuild to ZCTL. // Note: arrow_cast::cast supports ListView → List (apache/arrow-rs#8735), but it // unconditionally uses take. Our rebuild uses a heuristic that picks list-by-list diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 5986a06da49..006bf51aaf1 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -42,6 +42,8 @@ use vortex::layout::LayoutReader; use vortex::metrics::Label; use vortex::metrics::MetricsRegistry; use vortex::scan::ScanBuilder; +use vortex::scan::api::DEFAULT_TARGET_OUTPUT_BYTES_HINT; +use vortex::scan::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT; use vortex::session::VortexSession; use vortex_utils::aliases::dash_map::DashMap; use vortex_utils::aliases::dash_map::Entry; @@ -361,6 +363,8 @@ impl FileOpener for VortexOpener { .with_projection(scan_projection) .with_some_filter(filter) .with_ordered(has_output_ordering) + .with_target_output_rows(DEFAULT_TARGET_OUTPUT_ROWS_HINT) + .with_target_output_bytes(DEFAULT_TARGET_OUTPUT_BYTES_HINT) .map(move |chunk| { let mut ctx = session.create_execution_ctx(); chunk.execute_record_batch(&stream_schema, &mut ctx) diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs index 75db1ac3cd6..5209f9c7829 100644 --- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs +++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs @@ -3,6 +3,7 @@ //! This module contains tests for the `vortex_scan` table function. +use std::collections::BTreeMap; use std::ffi::CStr; use std::io::Write; use std::net::TcpListener; @@ -174,6 +175,28 @@ async fn write_vortex_file_to_dir( temp_file_path } +async fn write_vortex_struct_file_to_dir( + dir: &Path, + prefix: &str, + iter: impl Iterator, impl IntoArray)>, +) -> NamedTempFile { + let struct_array = StructArray::try_from_iter(iter).unwrap(); + let temp_file_path = tempfile::Builder::new() + .prefix(prefix) + .suffix(".vortex") + .tempfile_in(dir) + .unwrap(); + + let mut file = async_fs::File::create(&temp_file_path).await.unwrap(); + SESSION + .write_options() + .write(&mut file, struct_array.to_array_stream()) + .await + .unwrap(); + + temp_file_path +} + #[test] fn test_scan_function_registration() { let conn = database_connection(); @@ -350,6 +373,284 @@ fn test_vortex_scan_multiple_files() { assert_eq!(total_sum, 21); } +#[test] +fn test_vortex_scan_tpch_q13_style_left_join_over_multifile_orders() { + let (tempdir, _customer_file, _orders_file1, _orders_file2) = RUNTIME.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + + let customer_file = write_vortex_struct_file_to_dir( + tempdir.path(), + "customer_", + [ + ("c_custkey", buffer![1i64, 2, 3, 4, 5].into_array()), + ( + "c_comment", + VarBinArray::from(vec!["c1", "c2", "c3", "c4", "c5"]).into_array(), + ), + ] + .into_iter(), + ) + .await; + + let orders_file1 = write_vortex_struct_file_to_dir( + tempdir.path(), + "orders_", + [ + ("o_orderkey", buffer![10i64, 20, 30].into_array()), + ("o_custkey", buffer![1i64, 2, 3].into_array()), + ( + "o_comment", + VarBinArray::from(vec![ + "ordinary order", + "special handling requests", + "regular comment", + ]) + .into_array(), + ), + ] + .into_iter(), + ) + .await; + + let orders_file2 = write_vortex_struct_file_to_dir( + tempdir.path(), + "orders_", + [ + ("o_orderkey", buffer![11i64, 21, 40].into_array()), + ("o_custkey", buffer![1i64, 2, 4].into_array()), + ( + "o_comment", + VarBinArray::from(vec![ + "special service requests", + "another normal order", + "special packaging requests", + ]) + .into_array(), + ), + ] + .into_iter(), + ) + .await; + + (tempdir, customer_file, orders_file1, orders_file2) + }); + + let customer_glob = format!("{}/customer_*.vortex", tempdir.path().display()); + let orders_glob = format!("{}/orders_*.vortex", tempdir.path().display()); + + let conn = database_connection(); + conn.query(&format!( + "CREATE OR REPLACE VIEW customer AS SELECT * FROM read_vortex('{customer_glob}') WHERE c_custkey IS NOT NULL" + )) + .unwrap(); + conn.query(&format!( + "CREATE OR REPLACE VIEW orders AS SELECT * FROM read_vortex('{orders_glob}') WHERE o_orderkey IS NOT NULL" + )) + .unwrap(); + + let result = conn + .query( + " + SELECT + c_count, + count(*) AS custdist + FROM ( + SELECT + c_custkey, + count(o_orderkey) AS c_count + FROM + customer + LEFT OUTER JOIN orders + ON c_custkey = o_custkey + AND o_comment NOT LIKE '%special%requests%' + GROUP BY + c_custkey + ) AS c_orders(c_custkey, c_count) + GROUP BY + c_count + ORDER BY + custdist DESC, + c_count DESC + ", + ) + .unwrap(); + + let mut rows = Vec::new(); + for chunk in result { + let len = chunk.len().as_(); + let counts = chunk.get_vector(0); + let dists = chunk.get_vector(1); + let counts = counts.as_slice_with_len::(len); + let dists = dists.as_slice_with_len::(len); + rows.extend(counts.iter().copied().zip(dists.iter().copied())); + } + + assert_eq!(rows, vec![(1, 3), (0, 2)]); +} + +#[test] +fn test_vortex_scan_tpch_q13_style_large_multifile_orders_matches_expected() { + const CUSTOMER_COUNT: i64 = 2048; + const ORDER_FILE_COUNT: usize = 3; + const INCLUDED_MODULUS: i64 = 64; + const EXCLUDED_MODULUS: i64 = 7; + + let mut expected_counts = BTreeMap::::new(); + let (tempdir, _customer_file, _order_files) = RUNTIME.block_on(async { + let tempdir = tempfile::tempdir().unwrap(); + + let customer_keys: Vec = (1..=CUSTOMER_COUNT).collect(); + let customer_comments: Vec = (1..=CUSTOMER_COUNT) + .map(|custkey| format!("customer {custkey}")) + .collect(); + let customer_file = write_vortex_struct_file_to_dir( + tempdir.path(), + "customer_", + [ + ( + "c_custkey", + customer_keys + .into_iter() + .collect::() + .into_array(), + ), + ( + "c_comment", + VarBinArray::from(customer_comments).into_array(), + ), + ] + .into_iter(), + ) + .await; + + let mut order_keys_per_file = vec![Vec::new(); ORDER_FILE_COUNT]; + let mut order_custkeys_per_file = vec![Vec::new(); ORDER_FILE_COUNT]; + let mut order_comments_per_file = vec![Vec::new(); ORDER_FILE_COUNT]; + let mut next_orderkey = 1_i64; + + for custkey in 1..=CUSTOMER_COUNT { + let included = custkey % INCLUDED_MODULUS; + let excluded = custkey % EXCLUDED_MODULUS; + *expected_counts.entry(included).or_default() += 1; + + for _ in 0..included { + let file_idx = usize::try_from(next_orderkey).unwrap() % ORDER_FILE_COUNT; + order_keys_per_file[file_idx].push(next_orderkey); + order_custkeys_per_file[file_idx].push(custkey); + order_comments_per_file[file_idx].push("ordinary order".to_string()); + next_orderkey += 1; + } + + for _ in 0..excluded { + let file_idx = usize::try_from(next_orderkey).unwrap() % ORDER_FILE_COUNT; + order_keys_per_file[file_idx].push(next_orderkey); + order_custkeys_per_file[file_idx].push(custkey); + order_comments_per_file[file_idx].push("special handling requests".to_string()); + next_orderkey += 1; + } + } + + let mut order_files = Vec::with_capacity(ORDER_FILE_COUNT); + for file_idx in 0..ORDER_FILE_COUNT { + order_files.push( + write_vortex_struct_file_to_dir( + tempdir.path(), + "orders_", + [ + ( + "o_orderkey", + std::mem::take(&mut order_keys_per_file[file_idx]) + .into_iter() + .collect::() + .into_array(), + ), + ( + "o_custkey", + std::mem::take(&mut order_custkeys_per_file[file_idx]) + .into_iter() + .collect::() + .into_array(), + ), + ( + "o_comment", + VarBinArray::from(std::mem::take( + &mut order_comments_per_file[file_idx], + )) + .into_array(), + ), + ] + .into_iter(), + ) + .await, + ); + } + + (tempdir, customer_file, order_files) + }); + + let customer_glob = format!("{}/customer_*.vortex", tempdir.path().display()); + let orders_glob = format!("{}/orders_*.vortex", tempdir.path().display()); + + let mut expected_rows: Vec<(i64, i64)> = expected_counts.into_iter().collect(); + expected_rows.sort_by(|(left_count, left_dist), (right_count, right_dist)| { + right_dist + .cmp(left_dist) + .then_with(|| right_count.cmp(left_count)) + }); + + let conn = database_connection(); + conn.query(&format!( + "CREATE OR REPLACE VIEW customer AS SELECT * FROM read_vortex('{customer_glob}')" + )) + .unwrap(); + conn.query(&format!( + "CREATE OR REPLACE VIEW orders AS SELECT * FROM read_vortex('{orders_glob}')" + )) + .unwrap(); + + let result = conn + .query( + " + SELECT + c_count, + count(*) AS custdist + FROM ( + SELECT + c_custkey, + count(o_orderkey) AS c_count + FROM + customer + LEFT OUTER JOIN orders + ON c_custkey = o_custkey + AND o_comment NOT LIKE '%special%requests%' + GROUP BY + c_custkey + ) AS c_orders(c_custkey, c_count) + GROUP BY + c_count + ORDER BY + custdist DESC, + c_count DESC + ", + ) + .unwrap(); + + let row_count = result.row_count(); + let mut actual_rows = Vec::new(); + for chunk in result { + let len = chunk.len().as_(); + let counts = chunk.get_vector(0); + let dists = chunk.get_vector(1); + let counts = counts.as_slice_with_len::(len); + let dists = dists.as_slice_with_len::(len); + actual_rows.extend(counts.iter().copied().zip(dists.iter().copied())); + } + + assert_eq!(usize::try_from(row_count).unwrap(), expected_rows.len()); + assert_eq!(actual_rows.len(), expected_rows.len()); + assert_eq!(actual_rows, expected_rows); +} + #[test] fn test_vortex_scan_over_http() { let file = RUNTIME.block_on(async { diff --git a/vortex-duckdb/src/exporter/varbinview.rs b/vortex-duckdb/src/exporter/varbinview.rs index 13a59d242ef..6ff01dac10b 100644 --- a/vortex-duckdb/src/exporter/varbinview.rs +++ b/vortex-duckdb/src/exporter/varbinview.rs @@ -123,7 +123,6 @@ fn to_ptr_binary_view<'a>( _ref: PtrRef { size: v.len(), prefix: view.prefix, - // TODO(joe) verify this. ptr: unsafe { buffers[view.buffer_index as usize] .as_ptr() diff --git a/vortex-layout/public-api.lock b/vortex-layout/public-api.lock index 2cb3dec9d9a..398a0c49365 100644 --- a/vortex-layout/public-api.lock +++ b/vortex-layout/public-api.lock @@ -638,6 +638,8 @@ pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::register_splits(&sel pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::row_count(&self) -> u64 +pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::split_points(&self, field_mask: alloc::vec::Vec, row_range: core::ops::range::Range) -> vortex_error::VortexResult + pub fn vortex_layout::layouts::row_idx::row_idx() -> vortex_array::expr::expression::Expression pub mod vortex_layout::layouts::struct_ @@ -1674,6 +1676,8 @@ pub fn vortex_layout::LayoutReader::register_splits(&self, field_mask: &[vortex_ pub fn vortex_layout::LayoutReader::row_count(&self) -> u64 +pub fn vortex_layout::LayoutReader::split_points(&self, field_mask: alloc::vec::Vec, row_range: core::ops::range::Range) -> vortex_error::VortexResult + impl vortex_layout::LayoutReader for vortex_layout::layouts::row_idx::RowIdxLayoutReader pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::dtype(&self) -> &vortex_array::dtype::DType @@ -1690,6 +1694,8 @@ pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::register_splits(&sel pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::row_count(&self) -> u64 +pub fn vortex_layout::layouts::row_idx::RowIdxLayoutReader::split_points(&self, field_mask: alloc::vec::Vec, row_range: core::ops::range::Range) -> vortex_error::VortexResult + pub trait vortex_layout::LayoutStrategy: 'static + core::marker::Send + core::marker::Sync pub fn vortex_layout::LayoutStrategy::buffered_bytes(&self) -> u64 @@ -1969,3 +1975,5 @@ pub type vortex_layout::LayoutId = arcref::ArcRef pub type vortex_layout::LayoutReaderRef = alloc::sync::Arc pub type vortex_layout::LayoutRef = alloc::sync::Arc + +pub type vortex_layout::SplitPointIter = alloc::boxed::Box<(dyn core::iter::traits::iterator::Iterator + core::marker::Send)> diff --git a/vortex-layout/src/layouts/chunked/reader.rs b/vortex-layout/src/layouts/chunked/reader.rs index 6a0c63b6eb5..b2030f4124f 100644 --- a/vortex-layout/src/layouts/chunked/reader.rs +++ b/vortex-layout/src/layouts/chunked/reader.rs @@ -27,6 +27,8 @@ use vortex_session::VortexSession; use crate::LayoutReaderRef; use crate::LazyReaderChildren; +use crate::SplitPointIter; +use crate::concat_split_point_iters; use crate::layouts::chunked::ChunkedLayout; use crate::reader::LayoutReader; use crate::segments::SegmentSource; @@ -202,6 +204,28 @@ impl LayoutReader for ChunkedReader { Ok(()) } + fn split_points( + &self, + field_mask: Vec, + row_range: Range, + ) -> VortexResult { + if row_range.is_empty() { + return Ok(Box::new(std::iter::empty())); + } + + let mut iters = Vec::new(); + for (chunk_idx, chunk_range, _) in self.ranges(&row_range) { + let child = self.chunk_reader(chunk_idx)?.clone(); + let chunk_offset = self.chunk_offset(chunk_idx); + let split_points = child.split_points(field_mask.clone(), chunk_range)?; + iters.push( + Box::new(split_points.map(move |point| point + chunk_offset)) as SplitPointIter, + ); + } + + Ok(concat_split_point_iters(iters)) + } + fn pruning_evaluation( &self, row_range: &Range, @@ -338,6 +362,7 @@ mod test { use vortex_array::MaskFuture; use vortex_array::assert_arrays_eq; use vortex_array::dtype::DType; + use vortex_array::dtype::FieldMask; use vortex_array::dtype::Nullability::NonNullable; use vortex_array::dtype::PType; use vortex_array::expr::root; @@ -406,4 +431,17 @@ mod test { assert_arrays_eq!(result.as_ref(), expected.as_ref()); }) } + + #[rstest] + fn test_chunked_split_points_are_absolute( + #[from(chunked_layout)] (segments, layout): (Arc, LayoutRef), + ) { + let reader = layout.new_reader("".into(), segments, &SESSION).unwrap(); + let split_points = reader + .split_points(vec![FieldMask::All], 2..8) + .unwrap() + .collect::>(); + + assert_eq!(split_points, vec![3, 6, 8]); + } } diff --git a/vortex-layout/src/layouts/dict/reader.rs b/vortex-layout/src/layouts/dict/reader.rs index ded15f6ace0..ce2b9673cca 100644 --- a/vortex-layout/src/layouts/dict/reader.rs +++ b/vortex-layout/src/layouts/dict/reader.rs @@ -33,6 +33,7 @@ use vortex_utils::aliases::dash_map::DashMap; use super::DictLayout; use crate::LayoutReader; use crate::LayoutReaderRef; +use crate::SplitPointIter; use crate::layouts::SharedArrayFuture; use crate::segments::SegmentSource; @@ -173,6 +174,14 @@ impl LayoutReader for DictReader { self.codes.register_splits(field_mask, row_range, splits) } + fn split_points( + &self, + field_mask: Vec, + row_range: Range, + ) -> VortexResult { + self.codes.split_points(field_mask, row_range) + } + fn pruning_evaluation( &self, _row_range: &Range, diff --git a/vortex-layout/src/layouts/flat/reader.rs b/vortex-layout/src/layouts/flat/reader.rs index 727566d3db8..7a370bf1a85 100644 --- a/vortex-layout/src/layouts/flat/reader.rs +++ b/vortex-layout/src/layouts/flat/reader.rs @@ -5,6 +5,7 @@ use std::collections::BTreeSet; use std::ops::BitAnd; use std::ops::Range; use std::sync::Arc; +use std::sync::OnceLock; use futures::FutureExt; use futures::future::BoxFuture; @@ -22,6 +23,7 @@ use vortex_mask::Mask; use vortex_session::VortexSession; use crate::LayoutReader; +use crate::SplitPointIter; use crate::layouts::SharedArrayFuture; use crate::layouts::flat::FlatLayout; use crate::segments::SegmentSource; @@ -38,6 +40,7 @@ pub struct FlatReader { name: Arc, segment_source: Arc, session: VortexSession, + array: OnceLock, } impl FlatReader { @@ -52,38 +55,38 @@ impl FlatReader { name, segment_source, session, + array: Default::default(), } } /// Register the segment request and return a future that would resolve into the deserialised array. fn array_future(&self) -> SharedArrayFuture { - let row_count = - usize::try_from(self.layout.row_count()).vortex_expect("row count must fit in usize"); - - // We create the segment_fut here to ensure we give the segment reader visibility into - // how to prioritize this segment, even if the `array` future has already been initialized. - // This is gross... see the function's TODO for a maybe better solution? - let segment_fut = self.segment_source.request(self.layout.segment_id()); - - let ctx = self.layout.array_ctx().clone(); - let session = self.session.clone(); - let dtype = self.layout.dtype().clone(); - let array_tree = self.layout.array_tree().cloned(); - async move { - let segment = segment_fut.await?; - let parts = if let Some(array_tree) = array_tree { - // Use the pre-stored flatbuffer from layout metadata combined with segment buffers. - ArrayParts::from_flatbuffer_and_segment(array_tree, segment)? - } else { - // Parse the flatbuffer from the segment itself. - ArrayParts::try_from(segment)? - }; - parts - .decode(&dtype, row_count, &ctx, &session) - .map_err(Arc::new) - } - .boxed() - .shared() + self.array + .get_or_init(|| { + let row_count = usize::try_from(self.layout.row_count()) + .vortex_expect("row count must fit in usize"); + let segment_fut = self.segment_source.request(self.layout.segment_id()); + let ctx = self.layout.array_ctx().clone(); + let session = self.session.clone(); + let dtype = self.layout.dtype().clone(); + let array_tree = self.layout.array_tree().cloned(); + async move { + let segment = segment_fut.await?; + let parts = if let Some(array_tree) = array_tree { + // Use the pre-stored flatbuffer from layout metadata combined with segment buffers. + ArrayParts::from_flatbuffer_and_segment(array_tree, segment)? + } else { + // Parse the flatbuffer from the segment itself. + ArrayParts::try_from(segment)? + }; + parts + .decode(&dtype, row_count, &ctx, &session) + .map_err(Arc::new) + } + .boxed() + .shared() + }) + .clone() } } @@ -110,6 +113,18 @@ impl LayoutReader for FlatReader { Ok(()) } + fn split_points( + &self, + _field_mask: Vec, + row_range: Range, + ) -> VortexResult { + if row_range.is_empty() { + return Ok(Box::new(std::iter::empty())); + } + + Ok(Box::new(std::iter::once(row_range.end))) + } + fn pruning_evaluation( &self, _row_range: &Range, diff --git a/vortex-layout/src/layouts/row_idx/mod.rs b/vortex-layout/src/layouts/row_idx/mod.rs index 8ce4a2f5f4b..d9f618575a8 100644 --- a/vortex-layout/src/layouts/row_idx/mod.rs +++ b/vortex-layout/src/layouts/row_idx/mod.rs @@ -42,6 +42,7 @@ use vortex_utils::aliases::dash_map::DashMap; use crate::ArrayFuture; use crate::LayoutReader; +use crate::SplitPointIter; use crate::layouts::partitioned::PartitionedExprEval; pub struct RowIdxLayoutReader { @@ -175,6 +176,14 @@ impl LayoutReader for RowIdxLayoutReader { self.child.register_splits(field_mask, row_range, splits) } + fn split_points( + &self, + field_mask: Vec, + row_range: Range, + ) -> VortexResult { + self.child.split_points(field_mask, row_range) + } + fn pruning_evaluation( &self, row_range: &Range, diff --git a/vortex-layout/src/layouts/struct_/reader.rs b/vortex-layout/src/layouts/struct_/reader.rs index bb069df9723..687cdaf4b15 100644 --- a/vortex-layout/src/layouts/struct_/reader.rs +++ b/vortex-layout/src/layouts/struct_/reader.rs @@ -43,8 +43,10 @@ use crate::ArrayFuture; use crate::LayoutReader; use crate::LayoutReaderRef; use crate::LazyReaderChildren; +use crate::SplitPointIter; use crate::layouts::partitioned::PartitionedExprEval; use crate::layouts::struct_::StructLayout; +use crate::merge_split_point_iters; use crate::segments::SegmentSource; pub struct StructReader { @@ -151,6 +153,49 @@ impl StructReader { .transpose() } + fn split_children( + &self, + field_mask: Vec, + ) -> VortexResult)>> { + let mut children = Vec::new(); + + if let Some(validity) = self.validity()? { + children.push((validity.clone(), vec![FieldMask::All])); + } + + if field_mask.iter().any(FieldMask::matches_all) { + for idx in 0..self.struct_fields().nfields() { + children.push(( + self.field_reader_by_index(idx)?.clone(), + vec![FieldMask::All], + )); + } + return Ok(children); + } + + let mut grouped = HashMap::>::default(); + for mask in field_mask { + let Some(field) = mask.starting_field()? else { + continue; + }; + let idx = self + .struct_fields() + .find( + field + .as_name() + .vortex_expect("struct fields are always named"), + ) + .ok_or_else(|| vortex_err!("Field not found: {field:?}"))?; + grouped.entry(idx).or_default().push(mask.step_into()?); + } + + for (idx, masks) in grouped { + children.push((self.field_reader_by_index(idx)?.clone(), masks)); + } + + Ok(children) + } + /// Utility for partitioning an expression over the fields of a struct. fn partition_expr(&self, expr: Expression) -> Partitioned { let key = ExactExpr(expr.clone()); @@ -258,6 +303,28 @@ impl LayoutReader for StructReader { }) } + fn split_points( + &self, + field_mask: Vec, + row_range: Range, + ) -> VortexResult { + if row_range.is_empty() { + return Ok(Box::new(std::iter::empty())); + } + + let children = self.split_children(field_mask)?; + if children.is_empty() { + return Ok(Box::new(std::iter::once(row_range.end))); + } + + let mut iters = Vec::with_capacity(children.len()); + for (child, masks) in children { + iters.push(child.split_points(masks, row_range.clone())?); + } + + Ok(merge_split_point_iters(iters)) + } + fn pruning_evaluation( &self, row_range: &Range, diff --git a/vortex-layout/src/layouts/zoned/reader.rs b/vortex-layout/src/layouts/zoned/reader.rs index b4c30610467..902cb6ebcf9 100644 --- a/vortex-layout/src/layouts/zoned/reader.rs +++ b/vortex-layout/src/layouts/zoned/reader.rs @@ -37,6 +37,7 @@ use vortex_utils::aliases::dash_map::DashMap; use crate::LayoutReader; use crate::LayoutReaderRef; use crate::LazyReaderChildren; +use crate::SplitPointIter; use crate::layouts::zoned::ZonedLayout; use crate::layouts::zoned::zone_map::ZoneMap; use crate::segments::SegmentSource; @@ -236,6 +237,14 @@ impl LayoutReader for ZonedReader { .register_splits(field_mask, row_range, splits) } + fn split_points( + &self, + field_mask: Vec, + row_range: Range, + ) -> VortexResult { + self.data_child()?.split_points(field_mask, row_range) + } + fn pruning_evaluation( &self, row_range: &Range, diff --git a/vortex-layout/src/reader.rs b/vortex-layout/src/reader.rs index 9ccd695c572..5789aaf7056 100644 --- a/vortex-layout/src/reader.rs +++ b/vortex-layout/src/reader.rs @@ -24,6 +24,7 @@ use crate::children::LayoutChildren; use crate::segments::SegmentSource; pub type LayoutReaderRef = Arc; +pub type SplitPointIter = Box + Send>; /// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple /// evaluation operations. @@ -46,6 +47,35 @@ pub trait LayoutReader: 'static + Send + Sync { splits: &mut BTreeSet, ) -> VortexResult<()>; + /// Return ordered split points for the requested row range. + /// + /// Split points are absolute row offsets greater than `row_range.start` and less than or equal + /// to `row_range.end`. The final split point must be `row_range.end` when the range is + /// non-empty. + fn split_points( + &self, + field_mask: Vec, + row_range: Range, + ) -> VortexResult { + if row_range.is_empty() { + return Ok(Box::new(std::iter::empty())); + } + + let mut splits = BTreeSet::new(); + self.register_splits(&field_mask, &row_range, &mut splits)?; + + let mut points = splits + .into_iter() + .filter(|point| *point > row_range.start && *point <= row_range.end) + .collect::>(); + + if points.last().copied() != Some(row_range.end) { + points.push(row_range.end); + } + + Ok(Box::new(points.into_iter())) + } + /// Returns a mask where all false values are proven to be false in the given expression. /// /// The returned mask **does not** need to have been intersected with the input mask. @@ -89,6 +119,62 @@ pub trait LayoutReader: 'static + Send + Sync { ) -> VortexResult; } +pub(crate) fn concat_split_point_iters(iters: Vec) -> SplitPointIter { + Box::new(ConcatSplitPointIter { iters, current: 0 }) +} + +pub(crate) fn merge_split_point_iters(iters: Vec) -> SplitPointIter { + Box::new(MergeSplitPointIter::new(iters)) +} + +struct ConcatSplitPointIter { + iters: Vec, + current: usize, +} + +impl Iterator for ConcatSplitPointIter { + type Item = u64; + + fn next(&mut self) -> Option { + while let Some(iter) = self.iters.get_mut(self.current) { + if let Some(point) = iter.next() { + return Some(point); + } + self.current += 1; + } + + None + } +} + +struct MergeSplitPointIter { + iters: Vec, + heads: Vec>, +} + +impl MergeSplitPointIter { + fn new(mut iters: Vec) -> Self { + let heads = iters.iter_mut().map(Iterator::next).collect(); + Self { iters, heads } + } +} + +impl Iterator for MergeSplitPointIter { + type Item = u64; + + fn next(&mut self) -> Option { + let next_point = self.heads.iter().flatten().min().copied()?; + + for (head, iter) in self.heads.iter_mut().zip(self.iters.iter_mut()) { + if *head == Some(next_point) { + *head = iter.next(); + } + } + + Some(next_point) + } +} + pub type ArrayFuture = BoxFuture<'static, VortexResult>; pub trait ArrayFutureExt { diff --git a/vortex-scan/public-api.lock b/vortex-scan/public-api.lock index 7456d12de14..3cb1234e4b3 100644 --- a/vortex-scan/public-api.lock +++ b/vortex-scan/public-api.lock @@ -16,6 +16,10 @@ pub vortex_scan::api::ScanRequest::row_range: core::option::Option + +pub vortex_scan::api::ScanRequest::target_output_rows: core::option::Option + impl core::clone::Clone for vortex_scan::api::ScanRequest pub fn vortex_scan::api::ScanRequest::clone(&self) -> vortex_scan::api::ScanRequest @@ -28,6 +32,10 @@ impl core::fmt::Debug for vortex_scan::api::ScanRequest pub fn vortex_scan::api::ScanRequest::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +pub const vortex_scan::api::DEFAULT_TARGET_OUTPUT_BYTES_HINT: usize + +pub const vortex_scan::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT: usize + pub trait vortex_scan::api::DataSource: 'static + core::marker::Send + core::marker::Sync pub fn vortex_scan::api::DataSource::byte_size(&self) -> core::option::Option> @@ -234,6 +242,8 @@ pub vortex_scan::SplitBy::RowCount(usize) impl vortex_scan::SplitBy +pub fn vortex_scan::SplitBy::split_points(&self, layout_reader: &dyn vortex_layout::reader::LayoutReader, row_range: core::ops::range::Range, field_mask: alloc::vec::Vec) -> vortex_error::VortexResult + pub fn vortex_scan::SplitBy::splits(&self, layout_reader: &dyn vortex_layout::reader::LayoutReader, row_range: &core::ops::range::Range, field_mask: &[vortex_array::dtype::field_mask::FieldMask]) -> vortex_error::VortexResult> impl core::clone::Clone for vortex_scan::SplitBy @@ -264,8 +274,6 @@ impl vortex_scan::RepeatedScan pub fn vortex_scan::RepeatedScan::execute(&self, row_range: core::option::Option>) -> vortex_error::VortexResult>>>> -pub fn vortex_scan::RepeatedScan::execute_stream(&self, row_range: core::option::Option>) -> vortex_error::VortexResult> + core::marker::Send + 'static + use> - pub struct vortex_scan::ScanBuilder impl vortex_scan::ScanBuilder @@ -329,3 +337,7 @@ pub fn vortex_scan::ScanBuilder::with_some_limit(self, limit: core::option::O pub fn vortex_scan::ScanBuilder::with_some_metrics_registry(self, metrics: core::option::Option>) -> Self pub fn vortex_scan::ScanBuilder::with_split_by(self, split_by: vortex_scan::SplitBy) -> Self + +pub fn vortex_scan::ScanBuilder::with_target_output_bytes(self, target_output_bytes: usize) -> Self + +pub fn vortex_scan::ScanBuilder::with_target_output_rows(self, target_output_rows: usize) -> Self diff --git a/vortex-scan/src/api.rs b/vortex-scan/src/api.rs index f090dd2cd79..09a3a8ab6bf 100644 --- a/vortex-scan/src/api.rs +++ b/vortex-scan/src/api.rs @@ -41,6 +41,18 @@ use crate::Selection; /// A sendable stream of partitions. pub type PartitionStream = BoxStream<'static, VortexResult>; +/// Shared default lower bound for rows accumulated before materializing projected data. +/// +/// Engines that drive scans via [`ScanRequest`] can use this to align with the tuned Vortex +/// execution path used by DataFusion. +pub const DEFAULT_TARGET_OUTPUT_ROWS_HINT: usize = 64 * 1024; + +/// Shared default lower bound for projected payload bytes accumulated before materialization. +/// +/// Engines that drive scans via [`ScanRequest`] can use this to align with the tuned Vortex +/// execution path used by DataFusion. +pub const DEFAULT_TARGET_OUTPUT_BYTES_HINT: usize = 4 << 20; + /// Opens a Vortex [`DataSource`] from a URI. /// /// Configuration can be passed via the URI query parameters, similar to JDBC / ADBC. @@ -126,6 +138,10 @@ pub struct ScanRequest { /// Optional limit on the number of rows returned by scan. Limits are applied after all /// filtering and row selection. pub limit: Option, + /// Preferred lower bound for rows accumulated before materializing projected data. + pub target_output_rows: Option, + /// Preferred lower bound for projected payload bytes accumulated before materialization. + pub target_output_bytes: Option, } impl Default for ScanRequest { @@ -137,6 +153,8 @@ impl Default for ScanRequest { selection: Selection::default(), ordered: false, limit: None, + target_output_rows: None, + target_output_bytes: None, } } } diff --git a/vortex-scan/src/layout.rs b/vortex-scan/src/layout.rs index e2f11b30201..c9dc5e07d2e 100644 --- a/vortex-scan/src/layout.rs +++ b/vortex-scan/src/layout.rs @@ -34,6 +34,8 @@ use vortex_session::VortexSession; use crate::ScanBuilder; use crate::Selection; +use crate::api::DEFAULT_TARGET_OUTPUT_BYTES_HINT; +use crate::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT; use crate::api::DataSource; use crate::api::DataSourceScan; use crate::api::DataSourceScanRef; @@ -166,6 +168,8 @@ impl DataSource for LayoutReaderDataSource { limit: scan_request.limit, selection: scan_request.selection, ordered: scan_request.ordered, + target_output_rows: scan_request.target_output_rows, + target_output_bytes: scan_request.target_output_bytes, metrics_registry: self.metrics_registry.clone(), next_row: row_range.start, end_row: row_range.end, @@ -187,6 +191,8 @@ struct LayoutReaderScan { limit: Option, ordered: bool, selection: Selection, + target_output_rows: Option, + target_output_bytes: Option, metrics_registry: Option>, next_row: u64, end_row: u64, @@ -254,6 +260,8 @@ impl Stream for LayoutReaderScan { ordered: this.ordered, row_range, selection: this.selection.clone(), + target_output_rows: this.target_output_rows, + target_output_bytes: this.target_output_bytes, metrics_registry: this.metrics_registry.clone(), }) as PartitionRef; @@ -281,6 +289,8 @@ struct LayoutReaderSplit { ordered: bool, row_range: Range, selection: Selection, + target_output_rows: Option, + target_output_bytes: Option, metrics_registry: Option>, } @@ -312,8 +322,17 @@ impl Partition for LayoutReaderSplit { .with_projection(self.projection) .with_some_filter(self.filter) .with_some_limit(self.limit) - .with_some_metrics_registry(self.metrics_registry) .with_ordered(self.ordered); + let builder = builder + .with_target_output_rows( + self.target_output_rows + .unwrap_or(DEFAULT_TARGET_OUTPUT_ROWS_HINT), + ) + .with_target_output_bytes( + self.target_output_bytes + .unwrap_or(DEFAULT_TARGET_OUTPUT_BYTES_HINT), + ); + let builder = builder.with_some_metrics_registry(self.metrics_registry); let dtype = builder.dtype()?; // Use into_stream() which creates a LazyScanStream that spawns individual I/O diff --git a/vortex-scan/src/multi.rs b/vortex-scan/src/multi.rs index df405011b6b..4a20d7fe12f 100644 --- a/vortex-scan/src/multi.rs +++ b/vortex-scan/src/multi.rs @@ -48,6 +48,8 @@ use vortex_mask::Mask; use vortex_session::VortexSession; use crate::ScanBuilder; +use crate::api::DEFAULT_TARGET_OUTPUT_BYTES_HINT; +use crate::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT; use crate::api::DataSource; use crate::api::DataSourceScan; use crate::api::DataSourceScanRef; @@ -397,7 +399,17 @@ impl Partition for MultiLayoutPartition { .with_projection(request.projection) .with_some_filter(request.filter) .with_some_limit(request.limit) - .with_ordered(request.ordered); + .with_ordered(request.ordered) + .with_target_output_rows( + request + .target_output_rows + .unwrap_or(DEFAULT_TARGET_OUTPUT_ROWS_HINT), + ) + .with_target_output_bytes( + request + .target_output_bytes + .unwrap_or(DEFAULT_TARGET_OUTPUT_BYTES_HINT), + ); if let Some(row_range) = request.row_range { builder = builder.with_row_range(row_range); diff --git a/vortex-scan/src/repeated_scan.rs b/vortex-scan/src/repeated_scan.rs index f87849e605a..dd165bd0c14 100644 --- a/vortex-scan/src/repeated_scan.rs +++ b/vortex-scan/src/repeated_scan.rs @@ -2,23 +2,31 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::cmp; -use std::iter; +use std::collections::BTreeMap; +use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; +use std::task::Context; +use std::task::Poll; -use futures::Stream; +use futures::StreamExt; use futures::future::BoxFuture; -use itertools::Either; -use itertools::Itertools; +use futures::stream; +use futures::stream::BoxStream; +use futures::stream::FuturesUnordered; use vortex_array::ArrayRef; use vortex_array::dtype::DType; +use vortex_array::dtype::FieldMask; use vortex_array::expr::Expression; use vortex_array::iter::ArrayIterator; use vortex_array::iter::ArrayIteratorAdapter; use vortex_array::stream::ArrayStream; use vortex_array::stream::ArrayStreamAdapter; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_io::runtime::BlockingRuntime; +use vortex_io::runtime::Handle; +use vortex_io::runtime::Task; use vortex_io::session::RuntimeSessionExt; use vortex_layout::LayoutReaderRef; use vortex_session::VortexSession; @@ -26,9 +34,27 @@ use vortex_session::VortexSession; use crate::filter::FilterExpr; use crate::selection::Selection; use crate::splits::Splits; +use crate::tasks::FilteredSplit; use crate::tasks::TaskContext; +use crate::tasks::filter_split; +use crate::tasks::project_filtered_split; use crate::tasks::split_exec; +const ADAPTIVE_SELECTIVITY_SAMPLE_SPLITS: usize = 4; +const HIGH_SURVIVOR_RATIO: f64 = 0.75; +const MAX_PROJECTION_SUBSPLITS_PER_SPLIT: usize = 8; +const IMMEDIATE_PROJECTION_FILTER_AHEAD_MULTIPLIER: usize = 2; + +fn should_prefer_immediate_projection( + observed_filter_splits: usize, + observed_filter_rows: usize, + observed_surviving_rows: usize, +) -> bool { + observed_filter_splits >= ADAPTIVE_SELECTIVITY_SAMPLE_SPLITS + && observed_filter_rows > 0 + && (observed_surviving_rows as f64 / observed_filter_rows as f64) >= HIGH_SURVIVOR_RATIO +} + /// A projected subset (by indices, range, and filter) of rows from a Vortex data source. /// /// The method of this struct enable, possibly concurrent, scanning of multiple row ranges of this @@ -53,6 +79,13 @@ pub struct RepeatedScan { limit: Option, /// The dtype of the projected arrays. dtype: DType, + projection_field_mask: Vec, + /// Preferred lower bound for rows accumulated before starting projection. + target_output_rows: usize, + /// Preferred lower bound for projected payload bytes accumulated before starting projection. + target_output_bytes: usize, + /// Estimated incremental projection payload bytes per surviving row. + projection_row_cost_bytes: usize, } impl RepeatedScan { @@ -66,7 +99,7 @@ impl RepeatedScan { runtime: &B, ) -> VortexResult { let dtype = self.dtype.clone(); - let stream = self.execute_stream(row_range)?; + let stream = self.default_stream(row_range)?; let iter = runtime.block_on_stream(stream); Ok(ArrayIteratorAdapter::new(dtype, iter)) } @@ -76,12 +109,95 @@ impl RepeatedScan { row_range: Option>, ) -> VortexResult { let dtype = self.dtype.clone(); - let stream = self.execute_stream(row_range)?; + let stream = self.default_stream(row_range)?; Ok(ArrayStreamAdapter::new(dtype, stream)) } } impl RepeatedScan { + fn task_context(&self) -> Arc> { + Arc::new(TaskContext { + selection: self.selection.clone(), + filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))), + reader: self.layout_reader.clone(), + projection: self.projection.clone(), + projection_field_mask: self.projection_field_mask.clone(), + mapper: self.map_fn.clone(), + }) + } + + fn effective_row_range(&self, row_range: Option>) -> Option> { + intersect_ranges(self.row_range.as_ref(), row_range) + } + + fn split_ranges( + &self, + row_range: Option>, + ) -> VortexResult> + Send>> { + let Some(row_range) = self + .effective_row_range(row_range) + .or_else(|| Some(0..self.layout_reader.row_count())) + else { + return Ok(Box::new(std::iter::empty())); + }; + + self.splits.iter(self.layout_reader.as_ref(), row_range) + } + + fn default_stream( + &self, + row_range: Option>, + ) -> VortexResult>> { + let num_workers = std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1); + let concurrency = self.concurrency * num_workers; + self.execute_stream(row_range, concurrency, self.ordered, self.session.handle()) + } + + fn legacy_stream_from_ranges( + &self, + ctx: Arc>, + split_ranges: Vec>, + concurrency: usize, + ordered: bool, + handle: Handle, + ) -> VortexResult>> { + let mut limit = self.limit; + let mut tasks = Vec::with_capacity(split_ranges.len()); + + for range in split_ranges { + if range.start >= range.end { + continue; + } + + if limit.is_some_and(|value| value == 0) { + break; + } + + tasks.push(split_exec(ctx.clone(), range, limit.as_mut())?); + } + + let spawned = tasks.into_iter().map(move |task| handle.spawn(task)); + let stream = if ordered { + stream::iter(spawned).buffered(concurrency).left_stream() + } else { + stream::iter(spawned) + .buffer_unordered(concurrency) + .right_stream() + }; + + Ok(stream + .filter_map(|result| async move { + match result { + Ok(Some(value)) => Some(Ok(value)), + Ok(None) => None, + Err(err) => Some(Err(err)), + } + }) + .boxed()) + } + /// Constructor just to allow `scan_builder` to create a `RepeatedScan`. #[expect( clippy::too_many_arguments, @@ -100,6 +216,10 @@ impl RepeatedScan { map_fn: Arc VortexResult + Send + Sync>, limit: Option, dtype: DType, + projection_field_mask: Vec, + target_output_rows: usize, + target_output_bytes: usize, + projection_row_cost_bytes: usize, ) -> Self { Self { session, @@ -114,6 +234,10 @@ impl RepeatedScan { map_fn, limit, dtype, + projection_field_mask, + target_output_rows, + target_output_bytes, + projection_row_cost_bytes, } } @@ -121,49 +245,8 @@ impl RepeatedScan { &self, row_range: Option>, ) -> VortexResult>>>> { - let ctx = Arc::new(TaskContext { - selection: self.selection.clone(), - filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))), - reader: self.layout_reader.clone(), - projection: self.projection.clone(), - mapper: self.map_fn.clone(), - }); - - let row_range = intersect_ranges(self.row_range.as_ref(), row_range); - - let ranges = match &self.splits { - Splits::Natural(btree_set) => { - let splits_iter = match row_range { - None => Either::Left(btree_set.iter().copied()), - Some(range) => { - if range.is_empty() { - return Ok(Vec::new()); - } - Either::Right( - iter::once(range.start) - .chain(btree_set.range(range.clone()).copied()) - .chain(iter::once(range.end)), - ) - } - }; - - Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end)) - } - Splits::Ranges(ranges) => Either::Right(match row_range { - None => Either::Left(ranges.iter().cloned()), - Some(range) => { - if range.is_empty() { - return Ok(Vec::new()); - } - Either::Right(ranges.iter().filter_map(move |r| { - let start = cmp::max(r.start, range.start); - let end = cmp::min(r.end, range.end); - (start < end).then_some(start..end) - })) - } - }), - }; - + let ctx = self.task_context(); + let ranges = self.split_ranges(row_range)?; let mut limit = self.limit; let mut tasks = Vec::new(); @@ -182,27 +265,590 @@ impl RepeatedScan { Ok(tasks) } - pub fn execute_stream( + pub(crate) fn execute_stream( &self, row_range: Option>, - ) -> VortexResult> + Send + 'static + use> { - use futures::StreamExt; - let num_workers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - let concurrency = self.concurrency * num_workers; - let handle = self.session.handle(); + concurrency: usize, + ordered: bool, + handle: Handle, + ) -> VortexResult>> { + let ctx = self.task_context(); + let concurrency = concurrency.max(1); + let filter_ahead = filter_ahead_for(concurrency, self.filter.is_some()); + let mut split_ranges = self.split_ranges(row_range)?; + let mut prefetched_ranges = Vec::with_capacity(filter_ahead.saturating_add(1)); + let mut split_ranges_exhausted = true; + + while prefetched_ranges.len() <= filter_ahead { + let Some(range) = split_ranges.next() else { + break; + }; + if range.start >= range.end { + continue; + } + prefetched_ranges.push(range); + if prefetched_ranges.len() > filter_ahead { + split_ranges_exhausted = false; + break; + } + } + + if should_fallback_to_legacy_stream( + prefetched_ranges.len(), + split_ranges_exhausted, + filter_ahead, + ) { + return self.legacy_stream_from_ranges( + ctx, + prefetched_ranges, + concurrency, + ordered, + handle, + ); + } + + let split_ranges = Box::new(prefetched_ranges.into_iter().chain(split_ranges)) + as Box> + Send>; + let mut staged = StagedSplitStream::new( + ctx, + split_ranges, + self.limit, + concurrency, + ordered, + handle, + self.filter.is_some(), + self.target_output_rows, + self.target_output_bytes, + self.projection_row_cost_bytes, + ); + + Ok(stream::poll_fn(move |cx| staged.poll_next(cx)).boxed()) + } +} + +/// Two-phase concurrent split processor. +/// +/// Splits flow through a pipeline: +/// +/// split_ranges → filter.in_flight → filter.ready → projection.in_flight → emit +/// +/// Filter tasks run ahead (up to `filter_ahead` splits) to discover which rows +/// survive before committing to projection IO. Projection starts once enough +/// filtered rows accumulate (by row count or byte estimate) or the filter +/// frontier is full. A fraction of the concurrency budget is reserved for filter +/// tasks to keep the pipeline fed. +struct StagedSplitStream { + ctx: Arc>, + split_ranges: Box> + Send>, + limit: Option, + concurrency: usize, + handle: Handle, + filter_enabled: bool, + filter_ahead: usize, + split_ranges_exhausted: bool, + next_split_idx: usize, + prefer_immediate_projection: bool, + observed_filter_splits: usize, + observed_filter_rows: usize, + observed_surviving_rows: usize, + filter: FilterQueue, + projection: ProjectionQueue, + emit: EmitQueue, +} + +type FilterTaskResult = (usize, usize, VortexResult>); +type ProjectionTaskResult = (usize, usize, usize, VortexResult); + +struct FilterQueue { + in_flight: FuturesUnordered>, + ready: BTreeMap, + ready_rows: usize, + ready_projection_bytes: usize, +} + +impl FilterQueue { + fn frontier_len(&self) -> usize { + self.in_flight.len() + self.ready.len() + } + + fn push_ready(&mut self, idx: usize, filtered: FilteredSplit, row_cost_bytes: usize) { + let rows = filtered.mask.true_count(); + self.ready_rows += rows; + self.ready_projection_bytes = self + .ready_projection_bytes + .saturating_add(rows.saturating_mul(row_cost_bytes)); + self.ready.insert(idx, filtered); + } + + fn take_ready(&mut self, row_cost_bytes: usize) -> Option<(usize, FilteredSplit)> { + let (idx, filtered) = self.ready.pop_first()?; + let rows = filtered.mask.true_count(); + self.ready_rows = self.ready_rows.saturating_sub(rows); + self.ready_projection_bytes = self + .ready_projection_bytes + .saturating_sub(rows.saturating_mul(row_cost_bytes)); + Some((idx, filtered)) + } +} + +struct ProjectionQueue { + in_flight: FuturesUnordered>>, + target_rows: usize, + target_bytes: usize, + row_cost_bytes: usize, + subsplit_rows: usize, +} - let stream = - futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task)); +struct EmitQueue { + ordered: bool, + next_split_idx: usize, + next_part_idx: usize, + unordered: VecDeque>, + ordered_map: BTreeMap>, +} - let stream = if self.ordered { - stream.buffered(concurrency).boxed() +struct OrderedSplitOutput { + total_parts: usize, + parts: BTreeMap>, +} + +impl EmitQueue { + fn queue(&mut self, idx: usize, value: Option>) { + self.queue_split(idx, value.into_iter().collect()); + } + + fn queue_split(&mut self, idx: usize, values: Vec>) { + if self.ordered { + let parts: BTreeMap> = values.into_iter().enumerate().collect(); + self.ordered_map.insert( + idx, + OrderedSplitOutput { + total_parts: parts.len(), + parts, + }, + ); } else { - stream.buffer_unordered(concurrency).boxed() - }; + self.unordered.extend(values); + } + } + + fn queue_part( + &mut self, + idx: usize, + part_idx: usize, + total_parts: usize, + value: VortexResult, + ) { + if self.ordered { + let split = self + .ordered_map + .entry(idx) + .or_insert_with(|| OrderedSplitOutput { + total_parts, + parts: BTreeMap::new(), + }); + split.total_parts = total_parts; + split.parts.insert(part_idx, value); + } else { + self.unordered.push_back(value); + } + } + + fn pop(&mut self) -> Option> { + if self.ordered { + loop { + let current = self.ordered_map.get_mut(&self.next_split_idx)?; + if self.next_part_idx < current.total_parts { + if let Some(value) = current.parts.remove(&self.next_part_idx) { + self.next_part_idx += 1; + return Some(value); + } + return None; + } + + self.ordered_map.remove(&self.next_split_idx); + self.next_split_idx += 1; + self.next_part_idx = 0; + } + } else { + self.unordered.pop_front() + } + } + + fn is_empty(&self) -> bool { + self.unordered.is_empty() && self.ordered_map.is_empty() + } +} + +impl StagedSplitStream { + #[expect( + clippy::too_many_arguments, + reason = "all arguments are part of staged scheduler construction" + )] + fn new( + ctx: Arc>, + split_ranges: Box> + Send>, + limit: Option, + concurrency: usize, + ordered: bool, + handle: Handle, + filter_enabled: bool, + target_output_rows: usize, + target_output_bytes: usize, + projection_row_cost_bytes: usize, + ) -> Self { + let concurrency = concurrency.max(1); + let filter_ahead = filter_ahead_for(concurrency, filter_enabled); + + Self { + ctx, + split_ranges, + limit, + concurrency, + handle, + filter_enabled, + filter_ahead, + split_ranges_exhausted: false, + next_split_idx: 0, + prefer_immediate_projection: false, + observed_filter_splits: 0, + observed_filter_rows: 0, + observed_surviving_rows: 0, + filter: FilterQueue { + in_flight: FuturesUnordered::new(), + ready: BTreeMap::new(), + ready_rows: 0, + ready_projection_bytes: 0, + }, + projection: ProjectionQueue { + in_flight: FuturesUnordered::new(), + target_rows: target_output_rows.max(1), + target_bytes: target_output_bytes, + row_cost_bytes: projection_row_cost_bytes, + subsplit_rows: target_output_rows.div_ceil(4).max(1), + }, + emit: EmitQueue { + ordered, + next_split_idx: 0, + next_part_idx: 0, + unordered: VecDeque::new(), + ordered_map: BTreeMap::new(), + }, + } + } + + fn effective_filter_ahead(&self) -> usize { + if self.prefer_immediate_projection { + immediate_projection_filter_ahead(self.filter_ahead, self.concurrency) + } else { + self.filter_ahead + } + } + + fn record_filter_observation(&mut self, candidate_rows: usize, surviving_rows: usize) { + self.observed_filter_splits = self.observed_filter_splits.saturating_add(1); + self.observed_filter_rows = self.observed_filter_rows.saturating_add(candidate_rows); + self.observed_surviving_rows = self.observed_surviving_rows.saturating_add(surviving_rows); + + if self.prefer_immediate_projection || !self.filter_enabled { + return; + } + + if should_prefer_immediate_projection( + self.observed_filter_splits, + self.observed_filter_rows, + self.observed_surviving_rows, + ) { + self.prefer_immediate_projection = true; + } + } + + /// Compute how many projection tasks can be spawned right now. + /// + /// Reserves ceil(concurrency/4) slots for filter tasks while the filter frontier + /// is below the lookahead threshold, to keep the pipeline fed. + fn available_projection_slots(&self) -> usize { + let in_flight = self.filter.in_flight.len() + self.projection.in_flight.len(); + let available = self.concurrency.saturating_sub(in_flight); + + let needs_reserve = !self.prefer_immediate_projection + && self.filter_enabled + && !self.split_ranges_exhausted + && self.filter.frontier_len() < self.effective_filter_ahead(); + if !needs_reserve { + return available; + } + let reserved = self + .concurrency + .div_ceil(4) + .saturating_sub(self.filter.in_flight.len()); + available.saturating_sub(reserved) + } + + fn should_start_projection(&self) -> bool { + if self.filter.ready.is_empty() { + return false; + } + + if !self.filter_enabled { + return true; + } + + if self.prefer_immediate_projection { + return true; + } + + if self.filter.ready_rows >= self.projection.target_rows { + return true; + } + + if self.filter.ready_projection_bytes >= self.projection.target_bytes { + return true; + } + + if self.filter.frontier_len() >= self.effective_filter_ahead() { + return true; + } + + self.split_ranges_exhausted && self.filter.in_flight.is_empty() + } + + fn spawn_projection_tasks(&mut self) -> bool { + let mut progress = false; + while self.available_projection_slots() > 0 && self.should_start_projection() { + let Some((idx, filtered)) = self.filter.take_ready(self.projection.row_cost_bytes) + else { + break; + }; + let parts = split_filtered_for_projection(filtered, self.projection.subsplit_rows); + let total_parts = parts.len(); + + for (part_idx, part) in parts.into_iter().enumerate() { + progress |= self.spawn_projection_part(idx, part_idx, total_parts, part); + } + } + progress + } + + fn spawn_projection_part( + &mut self, + idx: usize, + part_idx: usize, + total_parts: usize, + filtered: FilteredSplit, + ) -> bool { + match project_filtered_split(self.ctx.clone(), filtered) { + Ok(task) => { + self.projection.in_flight.push( + self.handle + .spawn(async move { (idx, part_idx, total_parts, task.await) }), + ); + true + } + Err(err) => { + self.emit.queue_part(idx, part_idx, total_parts, Err(err)); + true + } + } + } + + fn spawn_filter_tasks(&mut self) -> bool { + let mut progress = false; + + while !self.split_ranges_exhausted + && self.filter.frontier_len() < self.effective_filter_ahead() + && self.filter.in_flight.len() + self.projection.in_flight.len() < self.concurrency + { + let Some(range) = self.split_ranges.next() else { + self.split_ranges_exhausted = true; + break; + }; + + if range.start >= range.end { + continue; + } + + if self.limit.is_some_and(|value| value == 0) { + self.split_ranges_exhausted = true; + break; + } + + let idx = self.next_split_idx; + self.next_split_idx += 1; + let split_rows = + usize::try_from(range.end.saturating_sub(range.start)).unwrap_or(usize::MAX); + + match filter_split(self.ctx.clone(), range, self.limit.as_mut()) { + Ok(task) => { + self.filter.in_flight.push( + self.handle + .spawn(async move { (idx, split_rows, task.await) }), + ); + } + Err(err) => self.emit.queue(idx, Some(Err(err))), + } + progress = true; + } + + progress + } + + fn poll_projection_completions(&mut self, cx: &mut Context<'_>) -> bool { + let mut progress = false; + while let Poll::Ready(Some((idx, part_idx, total_parts, value))) = + self.projection.in_flight.poll_next_unpin(cx) + { + self.emit.queue_part(idx, part_idx, total_parts, value); + progress = true; + } + progress + } + + fn poll_filter_completions(&mut self, cx: &mut Context<'_>) -> bool { + let mut progress = false; + while let Poll::Ready(Some(result)) = self.filter.in_flight.poll_next_unpin(cx) { + match result { + (idx, split_rows, Ok(Some(filtered))) => { + self.record_filter_observation(split_rows, filtered.mask.true_count()); + self.filter + .push_ready(idx, filtered, self.projection.row_cost_bytes); + } + (idx, split_rows, Ok(None)) => { + self.record_filter_observation(split_rows, 0); + self.emit.queue(idx, None); + } + (idx, split_rows, Err(err)) => { + self.record_filter_observation(split_rows, 0); + self.emit.queue(idx, Some(Err(err))); + } + } + progress = true; + } + progress + } + + fn is_finished(&self) -> bool { + self.split_ranges_exhausted + && self.filter.in_flight.is_empty() + && self.filter.ready.is_empty() + && self.projection.in_flight.is_empty() + && self.emit.is_empty() + } + + fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + // Each step below can produce ready items: poll_* via completed futures, and + // spawn_* via synchronous error paths. We check emit.pop() after every step + // so we yield results as early as possible rather than doing unnecessary work. + loop { + if let Some(value) = self.emit.pop() { + return Poll::Ready(Some(value)); + } + + let mut progress = false; + progress |= self.poll_projection_completions(cx); + if let Some(value) = self.emit.pop() { + return Poll::Ready(Some(value)); + } + + progress |= self.poll_filter_completions(cx); + if let Some(value) = self.emit.pop() { + return Poll::Ready(Some(value)); + } + + progress |= self.spawn_projection_tasks(); + if let Some(value) = self.emit.pop() { + return Poll::Ready(Some(value)); + } + + progress |= self.spawn_filter_tasks(); + if let Some(value) = self.emit.pop() { + return Poll::Ready(Some(value)); + } + + if self.is_finished() { + return Poll::Ready(None); + } + + if !progress { + return Poll::Pending; + } + } + } +} + +fn filter_ahead_for(concurrency: usize, filter_enabled: bool) -> usize { + if filter_enabled { + concurrency.clamp(4, 16) + } else { + concurrency + } +} + +fn should_fallback_to_legacy_stream( + prefetched_split_count: usize, + split_ranges_exhausted: bool, + filter_ahead: usize, +) -> bool { + split_ranges_exhausted && prefetched_split_count <= filter_ahead +} + +fn immediate_projection_filter_ahead(filter_ahead: usize, concurrency: usize) -> usize { + filter_ahead + .saturating_mul(IMMEDIATE_PROJECTION_FILTER_AHEAD_MULTIPLIER) + .min(concurrency) + .max(1) +} + +fn split_filtered_for_projection( + filtered: FilteredSplit, + _target_rows: usize, +) -> Vec { + if filtered.projection_ranges.len() <= 1 || filtered.mask.true_count() == 0 { + return vec![filtered]; + } + + let FilteredSplit { + row_range, + mask, + projection_ranges, + } = filtered; + let group_size = projection_ranges + .len() + .div_ceil(MAX_PROJECTION_SUBSPLITS_PER_SPLIT) + .max(1); + let mut parts = Vec::with_capacity(projection_ranges.len().div_ceil(group_size)); + for grouped_ranges in projection_ranges.chunks(group_size) { + let part_row_range = grouped_ranges + .first() + .vortex_expect("grouped projection ranges are non-empty") + .start + ..grouped_ranges + .last() + .vortex_expect("grouped projection ranges are non-empty") + .end; + let start_offset = usize::try_from(part_row_range.start.saturating_sub(row_range.start)) + .unwrap_or(usize::MAX); + let end_offset = usize::try_from(part_row_range.end.saturating_sub(row_range.start)) + .unwrap_or(usize::MAX); + let part_mask = mask.slice(start_offset..end_offset); + if part_mask.all_false() { + continue; + } + parts.push(FilteredSplit { + row_range: part_row_range.clone(), + mask: part_mask, + projection_ranges: vec![part_row_range], + }); + } - Ok(stream.filter_map(|chunk| async move { chunk.transpose() })) + if parts.is_empty() { + vec![FilteredSplit { + row_range: row_range.clone(), + mask, + projection_ranges: vec![row_range], + }] + } else { + parts } } @@ -214,3 +860,179 @@ fn intersect_ranges(left: Option<&Range>, right: Option>) -> Opt (Some(l), Some(r)) => Some(cmp::max(l.start, r.start)..cmp::min(l.end, r.end)), } } + +#[cfg(test)] +mod test { + use std::collections::BTreeMap; + + use futures::stream::FuturesUnordered; + use vortex_mask::Mask; + + use crate::tasks::FilteredSplit; + + fn projection_slots_for( + concurrency: usize, + filter_enabled: bool, + split_ranges_exhausted: bool, + filter_ahead: usize, + filter_in_flight: usize, + filtered_ready: usize, + projection_in_flight: usize, + ) -> usize { + let available_slots = concurrency.saturating_sub(filter_in_flight + projection_in_flight); + if !filter_enabled + || split_ranges_exhausted + || filter_in_flight + filtered_ready >= filter_ahead + { + return available_slots; + } + + let desired_filter_slots = concurrency.div_ceil(4); + let reserved_filter_slots = desired_filter_slots.saturating_sub(filter_in_flight); + available_slots.saturating_sub(reserved_filter_slots) + } + + #[test] + fn projection_reserves_capacity_for_filter_frontier() { + assert_eq!(projection_slots_for(8, true, false, 8, 0, 0, 6), 0); + assert_eq!(projection_slots_for(8, true, false, 8, 1, 0, 5), 1); + assert_eq!(projection_slots_for(8, true, false, 8, 2, 0, 4), 2); + } + + #[test] + fn projection_uses_all_slots_when_filter_no_longer_needs_reserve() { + assert_eq!(projection_slots_for(8, false, false, 8, 0, 0, 6), 2); + assert_eq!(projection_slots_for(8, true, true, 8, 0, 0, 6), 2); + assert_eq!(projection_slots_for(8, true, false, 2, 0, 2, 6), 2); + } + + #[test] + fn small_split_sets_fallback_to_legacy_stream() { + assert!(super::should_fallback_to_legacy_stream(0, true, 4)); + assert!(super::should_fallback_to_legacy_stream(4, true, 4)); + assert!(!super::should_fallback_to_legacy_stream(5, true, 4)); + assert!(!super::should_fallback_to_legacy_stream(4, false, 4)); + } + + #[test] + fn high_survivor_ratio_prefers_immediate_projection() { + assert!(!super::should_prefer_immediate_projection(3, 300, 300)); + assert!(super::should_prefer_immediate_projection(4, 400, 400)); + assert!(super::should_prefer_immediate_projection(4, 400, 320)); + } + + #[test] + fn immediate_projection_softens_filter_ahead_expansion() { + assert_eq!(super::immediate_projection_filter_ahead(16, 32), 32); + assert_eq!(super::immediate_projection_filter_ahead(8, 12), 12); + assert_eq!(super::immediate_projection_filter_ahead(0, 0), 1); + } + + #[test] + fn low_survivor_ratio_keeps_filter_ahead() { + assert!(!super::should_prefer_immediate_projection(4, 400, 200)); + assert!(!super::should_prefer_immediate_projection(8, 800, 300)); + } + + #[test] + fn projection_splitter_subdivides_dense_masks() { + let parts = super::split_filtered_for_projection( + FilteredSplit { + row_range: 100..164, + mask: Mask::new_true(64), + projection_ranges: vec![100..116, 116..132, 132..148, 148..164], + }, + 16, + ); + + assert_eq!(parts.len(), 4); + assert_eq!(parts[0].row_range, 100..116); + assert_eq!(parts[1].row_range, 116..132); + assert_eq!(parts[2].row_range, 132..148); + assert_eq!(parts[3].row_range, 148..164); + assert!(parts.iter().all(|part| part.mask.all_true())); + } + + #[test] + fn projection_splitter_subdivides_by_surviving_rows() { + let parts = super::split_filtered_for_projection( + FilteredSplit { + row_range: 200..216, + mask: Mask::from_indices(16, vec![1, 2, 3, 10, 12, 15]), + projection_ranges: vec![200..203, 203..212, 212..216], + }, + 2, + ); + + assert_eq!(parts.len(), 3); + assert_eq!(parts[0].row_range, 200..203); + assert_eq!(parts[0].mask.true_count(), 2); + assert_eq!(parts[1].row_range, 203..212); + assert_eq!(parts[1].mask.true_count(), 2); + assert_eq!(parts[2].row_range, 212..216); + assert_eq!(parts[2].mask.true_count(), 2); + } + + #[test] + fn projection_splitter_keeps_all_ranges_when_capped() { + let projection_ranges = (0..10) + .map(|idx| { + let start = idx * 10; + start..start + 10 + }) + .collect::>(); + let parts = super::split_filtered_for_projection( + FilteredSplit { + row_range: 0..100, + mask: Mask::new_true(100), + projection_ranges, + }, + 10, + ); + + assert!(parts.len() <= super::MAX_PROJECTION_SUBSPLITS_PER_SPLIT); + assert_eq!(parts.first().expect("at least one part").row_range.start, 0); + assert_eq!(parts.last().expect("at least one part").row_range.end, 100); + assert_eq!( + parts + .iter() + .map(|part| part.mask.true_count()) + .sum::(), + 100 + ); + } + + #[test] + fn filter_queue_take_ready_pops_first_split() { + let mut filter = super::FilterQueue { + in_flight: FuturesUnordered::new(), + ready: BTreeMap::from([ + ( + 0, + FilteredSplit { + row_range: 0..10, + mask: Mask::new_true(10), + projection_ranges: std::iter::once(0..10).collect(), + }, + ), + ( + 1, + FilteredSplit { + row_range: 10..20, + mask: Mask::new_true(10), + projection_ranges: std::iter::once(10..20).collect(), + }, + ), + ]), + ready_rows: 20, + ready_projection_bytes: 20, + }; + + let (idx, filtered) = filter.take_ready(1).expect("expected one split"); + assert_eq!(idx, 0); + assert_eq!(filtered.row_range, 0..10); + assert_eq!(filter.ready.keys().copied().collect::>(), vec![1]); + assert_eq!(filter.ready_rows, 10); + assert_eq!(filter.ready_projection_bytes, 10); + } +} diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index 39510453004..f46cde87276 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -9,7 +9,6 @@ use std::task::Poll; use std::task::ready; use futures::Stream; -use futures::StreamExt; use futures::future::BoxFuture; use futures::stream::BoxStream; use itertools::Itertools; @@ -32,7 +31,6 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_io::runtime::BlockingRuntime; -use vortex_io::runtime::Handle; use vortex_io::runtime::Task; use vortex_io::session::RuntimeSessionExt; use vortex_layout::LayoutReader; @@ -42,6 +40,7 @@ use vortex_metrics::MetricsRegistry; use vortex_session::VortexSession; use crate::RepeatedScan; +use crate::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT; use crate::selection::Selection; use crate::split_by::SplitBy; use crate::splits::Splits; @@ -74,8 +73,18 @@ pub struct ScanBuilder { /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression, /// but not by the scan [`Selection`] which remains relative. row_offset: u64, + /// Preferred lower bound for rows accumulated before materializing projected data. + target_output_rows: Option, + /// Preferred lower bound for projected payload bytes accumulated before materialization. + target_output_bytes: Option, } +const DEFAULT_TARGET_OUTPUT_ROWS: usize = DEFAULT_TARGET_OUTPUT_ROWS_HINT; +const MIN_TARGET_OUTPUT_BYTES: usize = 1 << 20; +const MAX_TARGET_OUTPUT_BYTES: usize = 8 << 20; +const DEFAULT_VARIABLE_ROW_BYTES: usize = 32; +const NESTED_PROJECTION_TARGET_MULTIPLIER: usize = 2; + impl ScanBuilder { pub fn new(session: VortexSession, layout_reader: Arc) -> Self { Self { @@ -95,6 +104,8 @@ impl ScanBuilder { file_stats: None, limit: None, row_offset: 0, + target_output_rows: None, + target_output_bytes: None, } } @@ -233,10 +244,23 @@ impl ScanBuilder { file_stats: self.file_stats, limit: self.limit, row_offset: self.row_offset, + target_output_rows: self.target_output_rows, + target_output_bytes: self.target_output_bytes, map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)), } } + pub fn with_target_output_rows(mut self, target_output_rows: usize) -> Self { + assert!(target_output_rows > 0); + self.target_output_rows = Some(target_output_rows); + self + } + + pub fn with_target_output_bytes(mut self, target_output_bytes: usize) -> Self { + self.target_output_bytes = Some(target_output_bytes); + self + } + pub fn prepare(self) -> VortexResult> { let dtype = self.dtype()?; @@ -265,24 +289,38 @@ impl ScanBuilder { .map(|f| f.optimize_recursive(layout_reader.dtype())) .transpose()?; - // Construct field masks and compute the row splits of the scan. - let (filter_mask, projection_mask) = + // Construct field masks and defer split discovery until execution so scans can start + // producing work before we've traversed every touched layout. + let (filter_mask, projection_mask, projection_only_mask) = filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?; - let field_mask: Vec<_> = [filter_mask, projection_mask].concat(); + let split_field_mask = if filter_mask.is_empty() { + projection_only_mask.clone() + } else { + filter_mask + }; + let mut target_output_rows = self + .target_output_rows + .unwrap_or(DEFAULT_TARGET_OUTPUT_ROWS); + let projection_row_cost_bytes = + estimate_field_mask_row_bytes(layout_reader.dtype(), &projection_only_mask); + let mut target_output_bytes = self.target_output_bytes.unwrap_or_else(|| { + default_target_output_bytes(target_output_rows, projection_row_cost_bytes) + }); + if filter.is_some() && contains_nested_projection_dtype(&dtype) { + target_output_rows = + target_output_rows.saturating_mul(NESTED_PROJECTION_TARGET_MULTIPLIER); + target_output_bytes = + target_output_bytes.saturating_mul(NESTED_PROJECTION_TARGET_MULTIPLIER); + } let splits = if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) { Splits::Ranges(ranges) } else { - let split_range = self - .row_range - .clone() - .unwrap_or_else(|| 0..layout_reader.row_count()); - Splits::Natural(self.split_by.splits( - layout_reader.as_ref(), - &split_range, - &field_mask, - )?) + Splits::Natural { + split_by: self.split_by, + field_mask: split_field_mask, + } }; Ok(RepeatedScan::new( @@ -298,6 +336,10 @@ impl ScanBuilder { self.map_fn, self.limit, dtype, + projection_mask, + target_output_rows, + target_output_bytes, + projection_row_cost_bytes, )) } @@ -330,20 +372,11 @@ impl ScanBuilder { enum LazyScanState { Builder(Option>>), - Preparing(PreparingScan), + Preparing(Task>>>), Stream(BoxStream<'static, VortexResult>), Error(Option), } -type PreparedScanTasks = Vec>>>; - -struct PreparingScan { - ordered: bool, - concurrency: usize, - handle: Handle, - task: Task>>, -} - struct LazyScanStream { state: LazyScanState, } @@ -372,37 +405,17 @@ impl Stream for LazyScanStream { .unwrap_or(1); let concurrency = builder.concurrency * num_workers; let handle = builder.session.handle(); + let execute_handle = handle.clone(); let task = handle.spawn_blocking(move || { - builder.prepare().and_then(|scan| scan.execute(None)) - }); - self.state = LazyScanState::Preparing(PreparingScan { - ordered, - concurrency, - handle, - task, + let scan = builder.prepare()?; + scan.execute_stream(None, concurrency, ordered, execute_handle) }); + self.state = LazyScanState::Preparing(task); } - LazyScanState::Preparing(preparing) => { - match ready!(Pin::new(&mut preparing.task).poll(cx)) { - Ok(tasks) => { - let ordered = preparing.ordered; - let concurrency = preparing.concurrency; - let handle = preparing.handle.clone(); - let stream = - futures::stream::iter(tasks).map(move |task| handle.spawn(task)); - let stream = if ordered { - stream.buffered(concurrency).boxed() - } else { - stream.buffer_unordered(concurrency).boxed() - }; - let stream = stream - .filter_map(|chunk| async move { chunk.transpose() }) - .boxed(); - self.state = LazyScanState::Stream(stream); - } - Err(err) => self.state = LazyScanState::Error(Some(err)), - } - } + LazyScanState::Preparing(task) => match ready!(Pin::new(task).poll(cx)) { + Ok(stream) => self.state = LazyScanState::Stream(stream), + Err(err) => self.state = LazyScanState::Error(Some(err)), + }, LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx), LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)), } @@ -417,19 +430,25 @@ pub(crate) fn filter_and_projection_masks( projection: &Expression, filter: Option<&Expression>, dtype: &DType, -) -> VortexResult<(Vec, Vec)> { +) -> VortexResult<(Vec, Vec, Vec)> { let Some(struct_dtype) = dtype.as_struct_fields_opt() else { return Ok(match filter { - Some(_) => (vec![FieldMask::All], vec![FieldMask::All]), - None => (Vec::new(), vec![FieldMask::All]), + Some(_) => ( + vec![FieldMask::All], + vec![FieldMask::All], + vec![FieldMask::All], + ), + None => (Vec::new(), vec![FieldMask::All], vec![FieldMask::All]), }); }; let projection_mask = immediate_scope_access(projection, struct_dtype); + let projection_mask_vec = projection_mask + .iter() + .cloned() + .map(to_field_mask) + .collect_vec(); Ok(match filter { - None => ( - Vec::new(), - projection_mask.into_iter().map(to_field_mask).collect_vec(), - ), + None => (Vec::new(), projection_mask_vec.clone(), projection_mask_vec), Some(f) => { let filter_mask = immediate_scope_access(f, struct_dtype); let only_projection_mask = projection_mask @@ -439,6 +458,7 @@ pub(crate) fn filter_and_projection_masks( .collect_vec(); ( filter_mask.into_iter().map(to_field_mask).collect_vec(), + projection_mask_vec, only_projection_mask, ) } @@ -449,6 +469,73 @@ fn to_field_mask(field: FieldName) -> FieldMask { FieldMask::Prefix(FieldPath::from(Field::Name(field))) } +fn default_target_output_bytes( + target_output_rows: usize, + projection_row_cost_bytes: usize, +) -> usize { + if projection_row_cost_bytes == 0 { + return 0; + } + + target_output_rows + .saturating_mul(projection_row_cost_bytes) + .clamp(MIN_TARGET_OUTPUT_BYTES, MAX_TARGET_OUTPUT_BYTES) +} + +fn estimate_field_mask_row_bytes(dtype: &DType, field_masks: &[FieldMask]) -> usize { + if field_masks.is_empty() { + return 0; + } + + if field_masks.iter().any(FieldMask::matches_all) { + return estimate_dtype_row_bytes(dtype); + } + + field_masks.iter().fold(0usize, |sum, mask| { + sum.saturating_add(estimate_single_mask_row_bytes(dtype, mask)) + }) +} + +fn estimate_single_mask_row_bytes(dtype: &DType, field_mask: &FieldMask) -> usize { + match field_mask { + FieldMask::All => estimate_dtype_row_bytes(dtype), + FieldMask::Prefix(path) | FieldMask::Exact(path) => { + if path.is_root() { + return estimate_dtype_row_bytes(dtype); + } + + path.resolve(dtype.clone()) + .map(|dtype| estimate_dtype_row_bytes(&dtype)) + .unwrap_or_else(|| estimate_dtype_row_bytes(dtype)) + } + } +} + +fn estimate_dtype_row_bytes(dtype: &DType) -> usize { + dtype.element_size().unwrap_or_else(|| match dtype { + DType::Struct(fields, _) => fields.fields().fold(0usize, |sum, dtype| { + sum.saturating_add(estimate_dtype_row_bytes(&dtype)) + }), + DType::List(elem_dtype, _) => { + DEFAULT_VARIABLE_ROW_BYTES.saturating_add(estimate_dtype_row_bytes(elem_dtype)) + } + DType::FixedSizeList(elem_dtype, list_size, _) => { + estimate_dtype_row_bytes(elem_dtype).saturating_mul(*list_size as usize) + } + DType::Extension(ext_dtype) => estimate_dtype_row_bytes(ext_dtype.storage_dtype()), + DType::Utf8(_) | DType::Binary(_) => DEFAULT_VARIABLE_ROW_BYTES, + _ => DEFAULT_VARIABLE_ROW_BYTES, + }) +} + +fn contains_nested_projection_dtype(dtype: &DType) -> bool { + match dtype { + DType::List(..) | DType::FixedSizeList(..) | DType::Struct(..) => true, + DType::Extension(ext_dtype) => contains_nested_projection_dtype(ext_dtype.storage_dtype()), + _ => false, + } +} + #[cfg(test)] mod test { use std::collections::BTreeSet; @@ -470,9 +557,15 @@ mod test { use vortex_array::arrays::PrimitiveArray; use vortex_array::dtype::DType; use vortex_array::dtype::FieldMask; + use vortex_array::dtype::FieldPath; use vortex_array::dtype::Nullability; use vortex_array::dtype::PType; + use vortex_array::dtype::StructFields; use vortex_array::expr::Expression; + use vortex_array::expr::col; + use vortex_array::expr::eq; + use vortex_array::expr::lit; + use vortex_array::expr::root; use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_io::runtime::BlockingRuntime; @@ -482,6 +575,7 @@ mod test { use vortex_mask::Mask; use super::ScanBuilder; + use super::contains_nested_projection_dtype; #[derive(Debug)] struct CountingLayoutReader { @@ -568,6 +662,18 @@ mod test { assert_eq!(calls.load(Ordering::Relaxed), 0); } + #[test] + fn prepare_defers_split_discovery() { + let calls = Arc::new(AtomicUsize::new(0)); + let reader = Arc::new(CountingLayoutReader::new(calls.clone())); + + let session = crate::test::SCAN_SESSION.clone(); + + let _scan = ScanBuilder::new(session, reader).prepare().unwrap(); + + assert_eq!(calls.load(Ordering::Relaxed), 0); + } + #[derive(Debug)] struct SplittingLayoutReader { name: Arc, @@ -651,6 +757,85 @@ mod test { } } + #[derive(Debug)] + struct OrderedSkippingReader { + name: Arc, + dtype: DType, + row_count: u64, + } + + impl OrderedSkippingReader { + fn new() -> Self { + Self { + name: Arc::from("ordered-skipping"), + dtype: DType::Primitive(PType::I32, Nullability::NonNullable), + row_count: 4, + } + } + } + + impl LayoutReader for OrderedSkippingReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + splits.insert(row_range.end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + row_range: &Range, + _expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let row_start = row_range.start; + Ok(MaskFuture::new(mask.len(), async move { + let mask = mask.await?; + if row_start == 0 { + Ok(Mask::new_false(mask.len())) + } else { + Ok(mask) + } + })) + } + + fn projection_evaluation( + &self, + row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + let value = i32::try_from(row_range.start) + .map_err(|_| vortex_err!("row_range.start must fit in i32"))?; + let array = PrimitiveArray::from_iter([value]).into_array(); + Ok(Box::pin(async move { Ok(array) })) + } + } + #[test] fn into_stream_executes_after_prepare() -> VortexResult<()> { let calls = Arc::new(AtomicUsize::new(0)); @@ -667,12 +852,36 @@ mod test { values.push(chunk?.to_primitive().into_buffer::()[0]); } - assert_eq!(calls.load(Ordering::Relaxed), 1); + assert!(calls.load(Ordering::Relaxed) >= 1); assert_eq!(values.as_ref(), [0, 1, 2, 3]); Ok(()) } + #[test] + fn into_stream_ordered_advances_past_filtered_empty_split() -> VortexResult<()> { + let reader = Arc::new(OrderedSkippingReader::new()); + + let runtime = SingleThreadRuntime::default(); + let session = crate::test::session_with_handle(runtime.handle()); + + let stream = ScanBuilder::new(session, reader) + .with_split_by(crate::SplitBy::RowCount(1)) + .with_filter(eq(root(), lit(1i32))) + .into_stream() + .unwrap(); + let mut iter = runtime.block_on_stream(stream); + + let mut values = Vec::new(); + for chunk in &mut iter { + values.push(chunk?.to_primitive().into_buffer::()[0]); + } + + assert_eq!(values.as_slice(), [1, 2, 3]); + + Ok(()) + } + #[derive(Debug)] struct BlockingSplitsLayoutReader { name: Arc, @@ -755,7 +964,7 @@ mod test { let guard = gate.lock(); let calls = Arc::new(AtomicUsize::new(0)); - let reader = Arc::new(BlockingSplitsLayoutReader::new(gate.clone(), calls.clone())); + let reader = Arc::new(BlockingSplitsLayoutReader::new(gate.clone(), calls)); let runtime = SingleThreadRuntime::default(); let session = crate::test::session_with_handle(runtime.handle()); @@ -779,10 +988,267 @@ mod test { let polled_pending = polled_pending.expect("poll_next blocked; expected quick return"); assert!( polled_pending, - "expected Poll::Pending while prepare is blocked" + "expected Poll::Pending while stream construction is blocked" ); - assert_eq!(calls.load(Ordering::Relaxed), 0); drop(runtime); } + + #[test] + fn into_stream_second_poll_does_not_block_when_split_discovery_blocks() { + let gate = Arc::new(Mutex::new(())); + let guard = gate.lock(); + + let calls = Arc::new(AtomicUsize::new(0)); + let reader = Arc::new(BlockingSplitsLayoutReader::new(gate.clone(), calls)); + + let runtime = SingleThreadRuntime::default(); + let session = crate::test::session_with_handle(runtime.handle()); + + let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap(); + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + + assert!(matches!( + Pin::new(&mut stream).poll_next(&mut cx), + Poll::Pending + )); + + let (send, recv) = std::sync::mpsc::channel::(); + let join = std::thread::spawn(move || { + let waker = noop_waker_ref(); + let mut cx = Context::from_waker(waker); + let poll = Pin::new(&mut stream).poll_next(&mut cx); + let _ = send.send(matches!(poll, Poll::Pending)); + }); + + let polled_pending = recv.recv_timeout(Duration::from_secs(1)).ok(); + + drop(guard); + drop(join.join()); + + let polled_pending = + polled_pending.expect("second poll_next blocked; expected quick return"); + assert!( + polled_pending, + "expected Poll::Pending while split discovery is blocked in spawn_blocking" + ); + + drop(runtime); + } + + #[derive(Debug)] + struct RecordingSplitMaskReader { + name: Arc, + dtype: DType, + seen_masks: Arc>>>, + } + + impl RecordingSplitMaskReader { + fn new(seen_masks: Arc>>>) -> Self { + Self { + name: Arc::from("recording-split-mask"), + dtype: DType::Struct( + StructFields::from_iter([ + ("a", DType::Primitive(PType::I32, Nullability::NonNullable)), + ("b", DType::Primitive(PType::I32, Nullability::NonNullable)), + ]), + Nullability::NonNullable, + ), + seen_masks, + } + } + } + + impl LayoutReader for RecordingSplitMaskReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + 1 + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + splits.insert(row_range.end); + Ok(()) + } + + fn split_points( + &self, + field_mask: Vec, + row_range: Range, + ) -> VortexResult { + self.seen_masks.lock().push(field_mask); + Ok(Box::new(std::iter::once(row_range.end))) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: Mask, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn filter_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + + fn projection_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + unimplemented!("not needed for this test"); + } + } + + #[test] + fn filtered_split_discovery_prefers_filter_fields() -> VortexResult<()> { + let seen_masks = Arc::new(Mutex::new(Vec::new())); + let reader = Arc::new(RecordingSplitMaskReader::new(seen_masks.clone())); + let session = crate::test::SCAN_SESSION.clone(); + + let scan = ScanBuilder::new(session, reader) + .with_filter(eq(col("a"), lit(1i32))) + .with_projection(col("b")) + .prepare()?; + drop(scan.execute(None)?); + + let seen_masks = seen_masks.lock(); + assert_eq!(seen_masks.len(), 1); + assert!( + seen_masks[0].contains(&FieldMask::Prefix(FieldPath::from_name("a"))), + "expected split discovery to include filter field" + ); + assert_eq!(seen_masks[0].len(), 1); + + Ok(()) + } + + #[test] + fn unfiltered_split_discovery_uses_projection_fields() -> VortexResult<()> { + let seen_masks = Arc::new(Mutex::new(Vec::new())); + let reader = Arc::new(RecordingSplitMaskReader::new(seen_masks.clone())); + let session = crate::test::SCAN_SESSION.clone(); + + let scan = ScanBuilder::new(session, reader) + .with_projection(col("b")) + .prepare()?; + drop(scan.execute(None)?); + + let seen_masks = seen_masks.lock(); + assert_eq!(seen_masks.len(), 1); + assert_eq!( + seen_masks[0], + vec![FieldMask::Prefix(FieldPath::from_name("b"))] + ); + + Ok(()) + } + + #[test] + fn estimate_field_mask_row_bytes_uses_only_requested_fields() { + let dtype = DType::Struct( + StructFields::from_iter([ + ("a", DType::Primitive(PType::I32, Nullability::NonNullable)), + ("b", DType::Primitive(PType::I64, Nullability::NonNullable)), + ("c", DType::Utf8(Nullability::NonNullable)), + ]), + Nullability::NonNullable, + ); + + let bytes = super::estimate_field_mask_row_bytes( + &dtype, + &[FieldMask::Prefix(FieldPath::from_name("b"))], + ); + let var_bytes = super::estimate_field_mask_row_bytes( + &dtype, + &[FieldMask::Prefix(FieldPath::from_name("c"))], + ); + + assert_eq!(bytes, 8); + assert_eq!(var_bytes, super::DEFAULT_VARIABLE_ROW_BYTES); + assert_eq!(super::estimate_field_mask_row_bytes(&dtype, &[]), 0); + } + + #[test] + fn estimate_dtype_row_bytes_recurses_into_lists() { + let list_dtype = DType::List( + Arc::new(DType::Primitive(PType::I32, Nullability::NonNullable)), + Nullability::Nullable, + ); + + assert_eq!( + super::estimate_dtype_row_bytes(&list_dtype), + super::DEFAULT_VARIABLE_ROW_BYTES + 4 + ); + } + + #[test] + fn estimate_dtype_row_bytes_sums_nested_struct_children() { + let dtype = DType::Struct( + StructFields::from_iter([ + ("a", DType::Primitive(PType::I32, Nullability::NonNullable)), + ( + "b", + DType::List( + Arc::new(DType::Primitive(PType::I64, Nullability::NonNullable)), + Nullability::Nullable, + ), + ), + ]), + Nullability::NonNullable, + ); + + assert_eq!( + super::estimate_dtype_row_bytes(&dtype), + 4 + super::DEFAULT_VARIABLE_ROW_BYTES + 8 + ); + } + + #[test] + fn default_target_output_bytes_clamps_to_reasonable_bounds() { + assert_eq!(super::default_target_output_bytes(32 * 1024, 0), 0); + assert_eq!( + super::default_target_output_bytes(32 * 1024, 1), + super::MIN_TARGET_OUTPUT_BYTES + ); + assert_eq!( + super::default_target_output_bytes(32 * 1024, 1024), + super::MAX_TARGET_OUTPUT_BYTES + ); + } + + #[test] + fn nested_projection_detection_matches_struct_and_list_outputs() { + let primitive = DType::Primitive(PType::I32, Nullability::NonNullable); + assert!(!contains_nested_projection_dtype(&primitive)); + + let list = DType::List(Arc::new(primitive.clone()), Nullability::Nullable); + assert!(contains_nested_projection_dtype(&list)); + + let struct_dtype = DType::Struct( + StructFields::from_iter([("x", primitive)]), + Nullability::NonNullable, + ); + assert!(contains_nested_projection_dtype(&struct_dtype)); + } } diff --git a/vortex-scan/src/split_by.rs b/vortex-scan/src/split_by.rs index 81e1c935852..59c77720167 100644 --- a/vortex-scan/src/split_by.rs +++ b/vortex-scan/src/split_by.rs @@ -2,12 +2,12 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::collections::BTreeSet; -use std::iter::once; use std::ops::Range; use vortex_array::dtype::FieldMask; use vortex_error::VortexResult; use vortex_layout::LayoutReader; +use vortex_layout::SplitPointIter; /// Defines how the Vortex file is split into batches for reading. /// @@ -23,30 +23,43 @@ pub enum SplitBy { } impl SplitBy { + pub fn split_points( + &self, + layout_reader: &dyn LayoutReader, + row_range: Range, + field_mask: Vec, + ) -> VortexResult { + if row_range.is_empty() { + return Ok(Box::new(std::iter::empty())); + } + + Ok(match *self { + SplitBy::Layout => layout_reader.split_points(field_mask, row_range)?, + SplitBy::RowCount(n) => { + let mut points: Vec = row_range.clone().step_by(n).skip(1).collect(); + if points.last().copied() != Some(row_range.end) { + points.push(row_range.end); + } + Box::new(points.into_iter()) + } + }) + } + /// Compute the splits for the given layout. - // TODO(ngates): remove this once layout readers are stream based. pub fn splits( &self, layout_reader: &dyn LayoutReader, row_range: &Range, field_mask: &[FieldMask], ) -> VortexResult> { - Ok(match *self { - SplitBy::Layout => { - let mut row_splits = BTreeSet::::new(); - row_splits.insert(row_range.start); - - // Register all splits in the row range for all layouts that are needed - // to read the field mask. - layout_reader.register_splits(field_mask, row_range, &mut row_splits)?; - row_splits - } - SplitBy::RowCount(n) => row_range - .clone() - .step_by(n) - .chain(once(row_range.end)) - .collect(), - }) + let mut row_splits = BTreeSet::::new(); + row_splits.insert(row_range.start); + row_splits.extend(self.split_points( + layout_reader, + row_range.clone(), + field_mask.to_vec(), + )?); + Ok(row_splits) } } diff --git a/vortex-scan/src/splits.rs b/vortex-scan/src/splits.rs index 2f57c5acee0..c88e6663c8b 100644 --- a/vortex-scan/src/splits.rs +++ b/vortex-scan/src/splits.rs @@ -1,11 +1,16 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::collections::BTreeSet; use std::ops::Range; +use vortex_array::dtype::FieldMask; +use vortex_error::VortexResult; +use vortex_layout::LayoutReader; +use vortex_layout::SplitPointIter; + use crate::IDEAL_SPLIT_SIZE; use crate::selection::Selection; +use crate::split_by::SplitBy; /// The maximum number of rows in a single range. This is somewhat arbitrarily chosen. const MAX_RANGE_SIZE: u64 = IDEAL_SPLIT_SIZE / 25; @@ -17,7 +22,10 @@ const MIN_GAP_BETWEEN_RANGES: u64 = IDEAL_SPLIT_SIZE / 2; pub(super) enum Splits { /// Natural splits computed by the layout reader (e.g., computing splits across different-sized /// column chunks). - Natural(BTreeSet), + Natural { + split_by: SplitBy, + field_mask: Vec, + }, /// Exact split ranges. /// @@ -26,6 +34,65 @@ pub(super) enum Splits { Ranges(Vec>), } +impl Splits { + pub(super) fn iter( + &self, + layout_reader: &dyn LayoutReader, + row_range: Range, + ) -> VortexResult> + Send>> { + if row_range.is_empty() { + return Ok(Box::new(std::iter::empty())); + } + + match self { + Splits::Natural { + split_by, + field_mask, + } => { + let points = + split_by.split_points(layout_reader, row_range.clone(), field_mask.clone())?; + Ok(Box::new(SplitRangeIter::new(row_range.start, points))) + } + Splits::Ranges(ranges) => { + Ok(Box::new(ranges.clone().into_iter().filter_map( + move |range| intersect_range(&range, &row_range), + ))) + } + } + } +} + +struct SplitRangeIter { + start: u64, + split_points: SplitPointIter, +} + +impl SplitRangeIter { + fn new(start: u64, split_points: SplitPointIter) -> Self { + Self { + start, + split_points, + } + } +} + +impl Iterator for SplitRangeIter { + type Item = Range; + + fn next(&mut self) -> Option { + let end = self.split_points.next()?; + let range = self.start..end; + self.start = end; + Some(range) + } +} + +fn intersect_range(left: &Range, right: &Range) -> Option> { + let start = left.start.max(right.start); + let end = left.end.min(right.end); + (start < end).then_some(start..end) +} + /// Attempts to compute split ranges from the given selection. pub(super) fn attempt_split_ranges( selection: &Selection, diff --git a/vortex-scan/src/tasks.rs b/vortex-scan/src/tasks.rs index 9ba1e309948..98a000a66ef 100644 --- a/vortex-scan/src/tasks.rs +++ b/vortex-scan/src/tasks.rs @@ -13,6 +13,7 @@ use futures::future::BoxFuture; use futures::future::ok; use vortex_array::ArrayRef; use vortex_array::MaskFuture; +use vortex_array::dtype::FieldMask; use vortex_array::expr::Expression; use vortex_error::VortexResult; use vortex_layout::LayoutReader; @@ -23,23 +24,20 @@ use crate::selection::Selection; pub type TaskFuture = BoxFuture<'static, VortexResult>; -/// Logic for executing a single split reading task. -/// -/// # Task execution flow -/// -/// First, the task's row range (split) is intersected with the global file row-range requested, -/// if any. -/// -/// The intersected row range is then further reduced via expression-based pruning. After pruning -/// has eliminated more blocks, the full filter is executed over the remainder of the split. -/// -/// This mask is then provided to the reader to perform a filtered projection over the split data, -/// finally mapping the Vortex columnar record batches into some result type `A`. -pub(super) fn split_exec( +/// A split whose selection, pruning, and filter stages have already completed. +#[derive(Debug)] +pub(super) struct FilteredSplit { + pub(super) row_range: Range, + pub(super) mask: Mask, + pub(super) projection_ranges: Vec>, +} + +/// Execute the selection, pruning, and filter stages for a single split. +pub(super) fn filter_split( ctx: Arc>, split: Range, limit: Option<&mut u64>, -) -> VortexResult>> { +) -> VortexResult>> { // Apply the selection to calculate a read mask let read_mask = ctx.selection.row_mask(&split); let row_range = read_mask.row_range(); @@ -136,20 +134,71 @@ pub(super) fn split_exec( } }; - // Step 4: execute the projection, only at the mask for rows which match the filter - let projection_future = - ctx.reader - .projection_evaluation(&row_range, &ctx.projection, filter_mask.clone())?; - - let mapper = ctx.mapper.clone(); - let array_fut = async move { + let filtered = async move { let mask = filter_mask.await?; if mask.all_false() { return Ok(None); } + let projection_ranges = + projection_split_ranges(ctx.reader.as_ref(), &ctx.projection_field_mask, &row_range)?; + Ok(Some(FilteredSplit { + row_range, + mask, + projection_ranges, + })) + }; + + Ok(filtered.boxed()) +} + +/// Project and map a split after its filter mask has already been resolved. +pub(super) fn project_filtered_split( + ctx: Arc>, + filtered: FilteredSplit, +) -> VortexResult> { + let reader = ctx.reader.clone(); + let projection = ctx.projection.clone(); + let mapper = ctx.mapper.clone(); + let FilteredSplit { + row_range, mask, .. + } = filtered; + + let array_fut = async move { + // Only schedule payload reads once the filter has resolved for this split. + let projection_future = + reader.projection_evaluation(&row_range, &projection, MaskFuture::ready(mask))?; let array = projection_future.await?; - mapper(array).map(Some) + mapper(array) + }; + + Ok(array_fut.boxed()) +} + +/// Logic for executing a single split reading task. +/// +/// # Task execution flow +/// +/// First, the task's row range (split) is intersected with the global file row-range requested, +/// if any. +/// +/// The intersected row range is then further reduced via expression-based pruning. After pruning +/// has eliminated more blocks, the full filter is executed over the remainder of the split. +/// +/// This mask is then provided to the reader to perform a filtered projection over the split data, +/// finally mapping the Vortex columnar record batches into some result type `A`. +pub(super) fn split_exec( + ctx: Arc>, + split: Range, + limit: Option<&mut u64>, +) -> VortexResult>> { + let filtered = filter_split(ctx.clone(), split, limit)?; + let array_fut = async move { + let Some(filtered) = filtered.await? else { + return Ok(None); + }; + let array = project_filtered_split(ctx, filtered)?.await?; + Ok(Some(array)) }; Ok(array_fut.boxed()) @@ -165,6 +214,272 @@ pub(super) struct TaskContext { pub(super) reader: Arc, /// The projection expression to apply to gather the scanned rows. pub(super) projection: Expression, + /// Field mask for the projected columns, used to discover projection boundaries. + pub(super) projection_field_mask: Vec, /// Function that maps into an A. pub(super) mapper: Arc VortexResult + Send + Sync>, } + +fn projection_split_ranges( + reader: &dyn LayoutReader, + projection_field_mask: &[FieldMask], + row_range: &Range, +) -> VortexResult>> { + if row_range.is_empty() { + return Ok(Vec::new()); + } + + let mut start = row_range.start; + let mut ranges = Vec::new(); + let split_points = reader.split_points(projection_field_mask.to_vec(), row_range.clone())?; + for end in split_points { + if end > start { + ranges.push(start..end); + start = end; + } + } + + if ranges.is_empty() { + ranges.push(row_range.clone()); + } + + Ok(ranges) +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + use std::ops::BitAnd; + use std::ops::Range; + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + + use futures::executor::block_on; + use parking_lot::Mutex; + use vortex_array::ArrayRef; + use vortex_array::IntoArray; + use vortex_array::MaskFuture; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::dtype::DType; + use vortex_array::dtype::FieldMask; + use vortex_array::dtype::Nullability; + use vortex_array::dtype::PType; + use vortex_array::expr::Expression; + use vortex_array::expr::eq; + use vortex_array::expr::lit; + use vortex_array::expr::root; + use vortex_buffer::buffer; + use vortex_error::VortexResult; + use vortex_layout::ArrayFuture; + use vortex_layout::LayoutReader; + use vortex_mask::Mask; + + use super::TaskContext; + use crate::filter::FilterExpr; + use crate::selection::Selection; + use crate::tasks::filter_split; + use crate::tasks::project_filtered_split; + use crate::tasks::split_exec; + + struct ProjectionCountingReader { + name: Arc, + dtype: DType, + projection_calls: Arc, + } + + impl ProjectionCountingReader { + fn new(projection_calls: Arc) -> Self { + Self { + name: Arc::from("projection-counting"), + dtype: DType::Primitive(PType::I32, Nullability::NonNullable), + projection_calls, + } + } + } + + impl LayoutReader for ProjectionCountingReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + 4 + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + splits.insert(row_range.end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let len = mask.len(); + Ok(MaskFuture::new(len, async move { + drop(mask.await?); + Ok(Mask::new_false(len)) + })) + } + + fn projection_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + _mask: MaskFuture, + ) -> VortexResult { + self.projection_calls.fetch_add(1, Ordering::Relaxed); + let array = PrimitiveArray::from_iter(buffer![1i32, 2, 3, 4]).into_array(); + Ok(Box::pin(async move { Ok(array) })) + } + } + + struct MaskForwardingReader { + name: Arc, + dtype: DType, + filter_mask: Mask, + projected_mask: Arc>>, + } + + impl MaskForwardingReader { + fn new(filter_mask: Mask, projected_mask: Arc>>) -> Self { + Self { + name: Arc::from("mask-forwarding"), + dtype: DType::Primitive(PType::I32, Nullability::NonNullable), + filter_mask, + projected_mask, + } + } + } + + impl LayoutReader for MaskForwardingReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn row_count(&self) -> u64 { + 4 + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + splits.insert(row_range.end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let expected = self.filter_mask.clone(); + Ok(MaskFuture::new(mask.len(), async move { + let mask = mask.await?; + Ok(mask.bitand(&expected)) + })) + } + + fn projection_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let projected_mask = self.projected_mask.clone(); + let array = PrimitiveArray::from_iter(buffer![20i32, 40]).into_array(); + Ok(Box::pin(async move { + let mask = mask.await?; + *projected_mask.lock() = Some(mask); + Ok(array) + })) + } + } + + #[test] + fn split_exec_skips_projection_for_all_false_filter() { + let projection_calls = Arc::new(AtomicUsize::new(0)); + let reader = Arc::new(ProjectionCountingReader::new(projection_calls.clone())); + let ctx = Arc::new(TaskContext { + selection: Selection::default(), + filter: Some(Arc::new(FilterExpr::new(eq(root(), lit(1i32))))), + reader, + projection: root(), + projection_field_mask: vec![FieldMask::All], + mapper: Arc::new(|array: ArrayRef| Ok(array)), + }); + + let result = block_on(split_exec(ctx, 0..4, None).unwrap()).unwrap(); + assert!(result.is_none()); + assert_eq!(projection_calls.load(Ordering::Relaxed), 0); + } + + #[test] + fn filtered_split_mask_is_forwarded_unchanged_to_projection() { + let filter_mask = Mask::from_indices(4, vec![1, 3]); + let projected_mask = Arc::new(Mutex::new(None)); + let reader = Arc::new(MaskForwardingReader::new( + filter_mask, + projected_mask.clone(), + )); + let ctx = Arc::new(TaskContext { + selection: Selection::default(), + filter: Some(Arc::new(FilterExpr::new(eq(root(), lit(1i32))))), + reader, + projection: root(), + projection_field_mask: vec![FieldMask::All], + mapper: Arc::new(|array: ArrayRef| Ok(array)), + }); + + let filtered = block_on(filter_split(ctx.clone(), 0..4, None).unwrap()) + .unwrap() + .unwrap(); + assert_eq!(filtered.row_range, 0..4); + assert_eq!(filtered.mask.values().unwrap().indices(), &[1, 3]); + + let result = block_on(project_filtered_split(ctx, filtered).unwrap()).unwrap(); + assert_eq!(result.len(), 2); + + let projected_mask = projected_mask.lock(); + let projected_mask = projected_mask.as_ref().unwrap(); + assert_eq!(projected_mask.values().unwrap().indices(), &[1, 3]); + } +} From 69b5bbc48e9540006f61434e071e767a423e31bd Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Tue, 17 Mar 2026 17:04:27 +0000 Subject: [PATCH 2/7] step one Signed-off-by: Adam Gutglick --- scripts/run_clickbench_rss.sh | 57 ++++++++++++++++++++++ vortex-datafusion/src/persistent/format.rs | 2 +- vortex-layout/src/layouts/flat/reader.rs | 51 +++++++++---------- vortex-scan/src/multi.rs | 14 ++---- vortex-scan/src/repeated_scan.rs | 13 ++--- vortex-scan/src/scan_builder.rs | 17 +++---- 6 files changed, 97 insertions(+), 57 deletions(-) create mode 100755 scripts/run_clickbench_rss.sh diff --git a/scripts/run_clickbench_rss.sh b/scripts/run_clickbench_rss.sh new file mode 100755 index 00000000000..c9e1b0c7bdb --- /dev/null +++ b/scripts/run_clickbench_rss.sh @@ -0,0 +1,57 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +if ! command -v gtime >/dev/null 2>&1; then + echo "error: gtime not found in PATH" >&2 + exit 1 +fi + +BINARY="${BINARY:-$ROOT_DIR/target/release/datafusion-bench}" +FORMAT="${FORMAT:-vortex}" +START_QUERY="${START_QUERY:-0}" +END_QUERY="${END_QUERY:-42}" +OUTPUT_DIR="${OUTPUT_DIR:-$ROOT_DIR/data/clickbench-rss}" + +mkdir -p "$OUTPUT_DIR" + +echo "building release benchmark binary..." +cargo build -p datafusion-bench --release + +timestamp="$(date +%Y%m%d-%H%M%S)" +csv_path="$OUTPUT_DIR/clickbench-rss-$timestamp.csv" + +cat >"$csv_path" <<'EOF' +query,max_rss_kb,elapsed,user_seconds,system_seconds,cpu_percent,exit_status +EOF + +for ((q = START_QUERY; q <= END_QUERY; q++)); do + log_path="$OUTPUT_DIR/q${q}.gtime.$timestamp.log" + echo "running clickbench query $q ..." + + gtime --verbose \ + "$BINARY" clickbench --formats "$FORMAT" --queries "$q" \ + >"$OUTPUT_DIR/q${q}.stdout.$timestamp.log" \ + 2>"$log_path" + + max_rss_kb="$(awk -F': ' '/Maximum resident set size/ {print $2}' "$log_path" | tr -d '[:space:]')" + elapsed="$(awk -F': ' '/Elapsed \(wall clock\) time/ {print $2}' "$log_path" | sed 's/^ *//')" + user_seconds="$(awk -F': ' '/User time \(seconds\)/ {print $2}' "$log_path" | tr -d '[:space:]')" + system_seconds="$(awk -F': ' '/System time \(seconds\)/ {print $2}' "$log_path" | tr -d '[:space:]')" + cpu_percent="$(awk -F': ' '/Percent of CPU this job got/ {print $2}' "$log_path" | tr -d '[:space:]')" + exit_status="$(awk -F': ' '/Exit status/ {print $2}' "$log_path" | tr -d '[:space:]')" + + printf '%s,%s,%s,%s,%s,%s,%s\n' \ + "$q" \ + "${max_rss_kb:-}" \ + "${elapsed:-}" \ + "${user_seconds:-}" \ + "${system_seconds:-}" \ + "${cpu_percent:-}" \ + "${exit_status:-}" \ + >>"$csv_path" +done + +echo "wrote summary to $csv_path" diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index fb3d5f9db11..3df2ccf6f2c 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -101,7 +101,7 @@ config_namespace! { /// all expressions are evaluated after the scan. pub projection_pushdown: bool, default = false /// The intra-partition scan concurrency, controlling the number of row splits to process - /// concurrently per-thread within each file. + /// concurrently within each file. /// /// This does not affect the overall parallelism /// across partitions, which is controlled by DataFusion's execution configuration. diff --git a/vortex-layout/src/layouts/flat/reader.rs b/vortex-layout/src/layouts/flat/reader.rs index 7a370bf1a85..575cc60df1e 100644 --- a/vortex-layout/src/layouts/flat/reader.rs +++ b/vortex-layout/src/layouts/flat/reader.rs @@ -5,7 +5,6 @@ use std::collections::BTreeSet; use std::ops::BitAnd; use std::ops::Range; use std::sync::Arc; -use std::sync::OnceLock; use futures::FutureExt; use futures::future::BoxFuture; @@ -40,7 +39,6 @@ pub struct FlatReader { name: Arc, segment_source: Arc, session: VortexSession, - array: OnceLock, } impl FlatReader { @@ -55,38 +53,33 @@ impl FlatReader { name, segment_source, session, - array: Default::default(), } } /// Register the segment request and return a future that would resolve into the deserialised array. fn array_future(&self) -> SharedArrayFuture { - self.array - .get_or_init(|| { - let row_count = usize::try_from(self.layout.row_count()) - .vortex_expect("row count must fit in usize"); - let segment_fut = self.segment_source.request(self.layout.segment_id()); - let ctx = self.layout.array_ctx().clone(); - let session = self.session.clone(); - let dtype = self.layout.dtype().clone(); - let array_tree = self.layout.array_tree().cloned(); - async move { - let segment = segment_fut.await?; - let parts = if let Some(array_tree) = array_tree { - // Use the pre-stored flatbuffer from layout metadata combined with segment buffers. - ArrayParts::from_flatbuffer_and_segment(array_tree, segment)? - } else { - // Parse the flatbuffer from the segment itself. - ArrayParts::try_from(segment)? - }; - parts - .decode(&dtype, row_count, &ctx, &session) - .map_err(Arc::new) - } - .boxed() - .shared() - }) - .clone() + let row_count = + usize::try_from(self.layout.row_count()).vortex_expect("row count must fit in usize"); + let segment_fut = self.segment_source.request(self.layout.segment_id()); + let ctx = self.layout.array_ctx().clone(); + let session = self.session.clone(); + let dtype = self.layout.dtype().clone(); + let array_tree = self.layout.array_tree().cloned(); + async move { + let segment = segment_fut.await?; + let parts = if let Some(array_tree) = array_tree { + // Use the pre-stored flatbuffer from layout metadata combined with segment buffers. + ArrayParts::from_flatbuffer_and_segment(array_tree, segment)? + } else { + // Parse the flatbuffer from the segment itself. + ArrayParts::try_from(segment)? + }; + parts + .decode(&dtype, row_count, &ctx, &session) + .map_err(Arc::new) + } + .boxed() + .shared() } } diff --git a/vortex-scan/src/multi.rs b/vortex-scan/src/multi.rs index 4a20d7fe12f..24fd93f7072 100644 --- a/vortex-scan/src/multi.rs +++ b/vortex-scan/src/multi.rs @@ -102,10 +102,6 @@ impl MultiLayoutDataSource { session: &VortexSession, ) -> Self { let dtype = first.dtype().clone(); - let concurrency = std::thread::available_parallelism() - .map(|v| v.get()) - .unwrap_or(DEFAULT_CONCURRENCY); - let mut children = Vec::with_capacity(1 + remaining.len()); children.push(MultiLayoutChild::Opened(first)); children.extend(remaining.into_iter().map(MultiLayoutChild::Deferred)); @@ -114,7 +110,7 @@ impl MultiLayoutDataSource { dtype, session: session.clone(), children, - concurrency, + concurrency: DEFAULT_CONCURRENCY, } } @@ -128,10 +124,6 @@ impl MultiLayoutDataSource { factories: Vec>, session: &VortexSession, ) -> Self { - let concurrency = std::thread::available_parallelism() - .map(|v| v.get()) - .unwrap_or(DEFAULT_CONCURRENCY); - Self { dtype, session: session.clone(), @@ -139,14 +131,14 @@ impl MultiLayoutDataSource { .into_iter() .map(MultiLayoutChild::Deferred) .collect(), - concurrency, + concurrency: DEFAULT_CONCURRENCY, } } /// Sets the concurrency for opening deferred readers. /// /// Controls how many file opens run in parallel via `buffer_unordered`. - /// Defaults to the number of available CPU cores. + /// Defaults to a fixed budget. pub fn with_concurrency(mut self, concurrency: usize) -> Self { self.concurrency = concurrency; self diff --git a/vortex-scan/src/repeated_scan.rs b/vortex-scan/src/repeated_scan.rs index dd165bd0c14..3a765172aa1 100644 --- a/vortex-scan/src/repeated_scan.rs +++ b/vortex-scan/src/repeated_scan.rs @@ -71,7 +71,7 @@ pub struct RepeatedScan { selection: Selection, /// The natural splits of the file. splits: Splits, - /// The number of splits to make progress on concurrently **per-thread**. + /// The total number of splits to make progress on concurrently. concurrency: usize, /// Function to apply to each [`ArrayRef`] within the spawned split tasks. map_fn: Arc VortexResult + Send + Sync>, @@ -148,11 +148,12 @@ impl RepeatedScan { &self, row_range: Option>, ) -> VortexResult>> { - let num_workers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - let concurrency = self.concurrency * num_workers; - self.execute_stream(row_range, concurrency, self.ordered, self.session.handle()) + self.execute_stream( + row_range, + self.concurrency, + self.ordered, + self.session.handle(), + ) } fn legacy_stream_from_ranges( diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index f46cde87276..9a7c6dedf65 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -61,7 +61,7 @@ pub struct ScanBuilder { selection: Selection, /// How to split the file for concurrent processing. split_by: SplitBy, - /// The number of splits to make progress on concurrently **per-thread**. + /// The total number of splits to make progress on concurrently. concurrency: usize, /// Function to apply to each [`ArrayRef`] within the spawned split tasks. map_fn: Arc VortexResult + Send + Sync>, @@ -80,6 +80,7 @@ pub struct ScanBuilder { } const DEFAULT_TARGET_OUTPUT_ROWS: usize = DEFAULT_TARGET_OUTPUT_ROWS_HINT; +const DEFAULT_SCAN_CONCURRENCY: usize = 16; const MIN_TARGET_OUTPUT_BYTES: usize = 1 << 20; const MAX_TARGET_OUTPUT_BYTES: usize = 8 << 20; const DEFAULT_VARIABLE_ROW_BYTES: usize = 32; @@ -96,9 +97,9 @@ impl ScanBuilder { row_range: None, selection: Default::default(), split_by: SplitBy::Layout, - // We default to four tasks per worker thread, which allows for some I/O lookahead - // without too much impact on work-stealing. - concurrency: 4, + // Keep one explicit scan-concurrency budget rather than multiplying by the + // runtime's worker count later. + concurrency: DEFAULT_SCAN_CONCURRENCY, map_fn: Arc::new(Ok), metrics_registry: None, file_stats: None, @@ -186,8 +187,7 @@ impl ScanBuilder { self.concurrency } - /// The number of row splits to make progress on concurrently per-thread, must - /// be greater than 0. + /// The total number of row splits to make progress on concurrently. pub fn with_concurrency(mut self, concurrency: usize) -> Self { assert!(concurrency > 0); self.concurrency = concurrency; @@ -400,10 +400,7 @@ impl Stream for LazyScanStream { LazyScanState::Builder(builder) => { let builder = builder.take().vortex_expect("polled after completion"); let ordered = builder.ordered; - let num_workers = std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1); - let concurrency = builder.concurrency * num_workers; + let concurrency = builder.concurrency; let handle = builder.session.handle(); let execute_handle = handle.clone(); let task = handle.spawn_blocking(move || { From 643070685e25eba8869eff7261baf5e2cfa78651 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 18 Mar 2026 18:57:12 +0000 Subject: [PATCH 3/7] Some changes Signed-off-by: Adam Gutglick --- vortex-datafusion/src/persistent/opener.rs | 7 +- vortex-file/src/file.rs | 11 +- vortex-file/src/read/driver.rs | 141 +++++++- vortex-file/src/segments/source.rs | 42 ++- vortex-layout/public-api.lock | 28 ++ vortex-layout/src/layouts/chunked/reader.rs | 7 + vortex-layout/src/layouts/struct_/reader.rs | 74 +++- vortex-layout/src/lib.rs | 1 + vortex-layout/src/reader.rs | 44 +++ vortex-layout/src/segments/cache.rs | 6 + vortex-layout/src/segments/mod.rs | 2 +- vortex-layout/src/segments/shared.rs | 4 + vortex-layout/src/segments/source.rs | 41 +++ vortex-mask/public-api.lock | 2 + vortex-mask/src/lib.rs | 17 + vortex-scan/public-api.lock | 12 - vortex-scan/src/api.rs | 18 - vortex-scan/src/fetch_plan.rs | 371 ++++++++++++++++++++ vortex-scan/src/layout.rs | 43 +-- vortex-scan/src/lib.rs | 2 + vortex-scan/src/multi.rs | 14 +- vortex-scan/src/repeated_scan.rs | 366 ++++++------------- vortex-scan/src/scan_builder.rs | 247 +++---------- vortex-scan/src/scan_metrics.rs | 32 ++ vortex-scan/src/tasks.rs | 224 ++++++++++-- 25 files changed, 1181 insertions(+), 575 deletions(-) create mode 100644 vortex-scan/src/fetch_plan.rs create mode 100644 vortex-scan/src/scan_metrics.rs diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 006bf51aaf1..056b13c716d 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -42,8 +42,6 @@ use vortex::layout::LayoutReader; use vortex::metrics::Label; use vortex::metrics::MetricsRegistry; use vortex::scan::ScanBuilder; -use vortex::scan::api::DEFAULT_TARGET_OUTPUT_BYTES_HINT; -use vortex::scan::api::DEFAULT_TARGET_OUTPUT_ROWS_HINT; use vortex::session::VortexSession; use vortex_utils::aliases::dash_map::DashMap; use vortex_utils::aliases::dash_map::Entry; @@ -298,7 +296,8 @@ impl FileOpener for VortexOpener { } }; - let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader); + let mut scan_builder = ScanBuilder::new(session.clone(), layout_reader) + .with_segment_source(vxf.segment_source()); if let Some(extensions) = file.extensions && let Some(vortex_plan) = extensions.downcast_ref::() @@ -363,8 +362,6 @@ impl FileOpener for VortexOpener { .with_projection(scan_projection) .with_some_filter(filter) .with_ordered(has_output_ordering) - .with_target_output_rows(DEFAULT_TARGET_OUTPUT_ROWS_HINT) - .with_target_output_bytes(DEFAULT_TARGET_OUTPUT_BYTES_HINT) .map(move |chunk| { let mut ctx = session.create_execution_ctx(); chunk.execute_record_batch(&stream_schema, &mut ctx) diff --git a/vortex-file/src/file.rs b/vortex-file/src/file.rs index d113cc8cf98..104a524f547 100644 --- a/vortex-file/src/file.rs +++ b/vortex-file/src/file.rs @@ -103,10 +103,10 @@ impl VortexFile { self.session.clone(), )); } - Ok(Arc::new(LayoutReaderDataSource::new( - reader, - self.session.clone(), - ))) + Ok(Arc::new( + LayoutReaderDataSource::new(reader, self.session.clone()) + .with_segment_source(self.segment_source()), + )) } /// Initiate a scan of the file, returning a builder for configuring the scan. @@ -114,7 +114,8 @@ impl VortexFile { Ok(ScanBuilder::new( self.session.clone(), self.layout_reader()?, - )) + ) + .with_segment_source(self.segment_source())) } /// Returns true if the expression will never match any rows in the file. diff --git a/vortex-file/src/read/driver.rs b/vortex-file/src/read/driver.rs index d8fd638adc8..04a919ec5cf 100644 --- a/vortex-file/src/read/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -140,6 +140,7 @@ impl State { tracing::debug!(?req, "ReadRequest dropped before registration"); return; } + self.metrics.registered_requests.add(1); self.requests_by_offset.insert((req.offset, req.id)); self.requests.insert(req.id, req); } @@ -149,20 +150,43 @@ impl State { self.requests_by_offset.remove(&(req.offset, req_id)); tracing::debug!(?req, "ReadRequest dropped before poll"); } else { + self.metrics.polled_requests.add(1); self.polled_requests.insert(req_id, req); } } } ReadEvent::Dropped(req_id) => { if let Some(req) = self.requests.remove(&req_id) { + self.metrics.dropped_requests.add(1); self.requests_by_offset.remove(&(req.offset, req_id)); tracing::debug!(?req, "ReadRequest dropped before poll"); } if let Some(req) = self.polled_requests.remove(&req_id) { + self.metrics.dropped_requests.add(1); self.requests_by_offset.remove(&(req.offset, req_id)); tracing::debug!(?req, "ReadRequest dropped after poll"); } } + ReadEvent::BatchBoundary => { + // Promote all registered-but-unpolled requests to polled status. + // This tells the coalescer that the entire batch is needed now, + // allowing it to form optimal coalesced reads. + let promoted = self.requests.len(); + if promoted > 0 { + tracing::debug!( + promoted, + "BatchBoundary: promoting registered requests to polled" + ); + self.metrics.polled_requests.add(promoted as u64); + for (req_id, req) in std::mem::take(&mut self.requests) { + if req.callback.is_closed() { + self.requests_by_offset.remove(&(req.offset, req_id)); + } else { + self.polled_requests.insert(req_id, req); + } + } + } + } } } @@ -215,6 +239,9 @@ impl State { let first_req = self.next_uncoalesced()?; let mut requests = vec![first_req]; + let mut payload_bytes = requests[0].length as u64; + let mut registered_only_requests = 0usize; + let mut polled_requests = 1usize; let mut current_start = requests[0].offset; let mut current_end = requests[0].offset + requests[0].length as u64; let align = *self.coalesced_buffer_alignment as u64; @@ -269,18 +296,28 @@ impl State { let new_total_size = new_end - aligned_start; if new_total_size > window.max_size { + self.metrics.batched_skipped_max_size.add(1); // Skip it but keep it available for future coalescing operations. continue; } current_start = new_start; current_end = new_end; - let req = self - .polled_requests - .remove(&req_id) - .or_else(|| self.requests.remove(&req_id)) - .vortex_expect("Missing request in requests_by_offset"); + let (req, was_polled) = if let Some(req) = self.polled_requests.remove(&req_id) + { + (req, true) + } else if let Some(req) = self.requests.remove(&req_id) { + (req, false) + } else { + unreachable!("Missing request in requests_by_offset"); + }; + payload_bytes = payload_bytes.saturating_add(req.length as u64); + if was_polled { + polled_requests = polled_requests.saturating_add(1); + } else { + registered_only_requests = registered_only_requests.saturating_add(1); + } requests.push(req); if ids_to_remove.insert(req_id) { keys_to_remove.push((req_offset, req_id)); @@ -302,6 +339,18 @@ impl State { requests.sort_unstable_by_key(|r| r.offset); let aligned_start = current_start - (current_start % align); + let range_bytes = current_end - aligned_start; + + self.metrics.batched_range_bytes.update(range_bytes as f64); + self.metrics + .batched_payload_bytes + .update(payload_bytes as f64); + self.metrics + .batched_registered_only_requests + .update(registered_only_requests as f64); + self.metrics + .batched_polled_requests + .update(polled_requests as f64); tracing::debug!( "Coalesced {} requests into range {}..{} (len={})", @@ -808,4 +857,86 @@ mod tests { assert_eq!(individual_count, 2, "Expected 2 individual requests"); assert_eq!(coalesced_operations, 0, "Expected 0 coalesced operations"); } + + #[tokio::test] + async fn test_metrics_record_registered_only_batch_members() { + let (req1, _rx1) = create_request(1, 0, 10); + let (req2, _rx2) = create_request(2, 50, 10); + let (req3, _rx3) = create_request(3, 100, 10); + + let events = vec![ + ReadEvent::Request(req1), + ReadEvent::Request(req2), + ReadEvent::Request(req3), + ReadEvent::Polled(2), + ]; + + let event_stream = stream::iter(events); + let metrics_registry = DefaultMetricsRegistry::default(); + let metrics = RequestMetrics::new(&metrics_registry, vec![]); + let io_stream = IoRequestStream::new( + event_stream, + Some(CoalesceConfig { + distance: 60, + max_size: 1024, + }), + Alignment::none(), + metrics, + ); + + let outputs: Vec = io_stream.collect().await; + assert_eq!(outputs.len(), 1); + + let snapshot = metrics_registry.snapshot(); + let mut registered = 0u64; + let mut polled = 0u64; + let mut coalesced = 0u64; + let mut registered_only_count = 0usize; + let mut registered_only_total = 0.0; + let mut polled_in_batch_count = 0usize; + let mut polled_in_batch_total = 0.0; + + for metric in snapshot.iter() { + match metric.value() { + MetricValue::Counter(counter) => match metric.name().as_ref() { + "io.requests.registered" => registered = counter.value(), + "io.requests.polled" => polled = counter.value(), + "io.requests.coalesced" => coalesced = counter.value(), + _ => {} + }, + MetricValue::Histogram(histogram) => match metric.name().as_ref() { + "io.requests.batched.registered_only_requests" => { + registered_only_count = histogram.count(); + registered_only_total = histogram.total(); + } + "io.requests.batched.polled_requests" => { + polled_in_batch_count = histogram.count(); + polled_in_batch_total = histogram.total(); + } + _ => {} + }, + _ => {} + } + } + + assert_eq!(registered, 3, "Expected 3 registered requests"); + assert_eq!(polled, 1, "Expected 1 polled request"); + assert_eq!(coalesced, 1, "Expected 1 coalesced operation"); + assert_eq!( + registered_only_count, 1, + "Expected one histogram sample for registered-only requests" + ); + assert_eq!( + registered_only_total, 2.0, + "Expected two registered-only requests in the coalesced batch" + ); + assert_eq!( + polled_in_batch_count, 1, + "Expected one histogram sample for polled requests" + ); + assert_eq!( + polled_in_batch_total, 1.0, + "Expected one polled request in the coalesced batch" + ); + } } diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index 8f83150c4bb..238496d5e7a 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -39,6 +39,10 @@ pub enum ReadEvent { Request(ReadRequest), Polled(RequestId), Dropped(RequestId), + /// Signals that a logical batch of requests has been fully registered. + /// The driver promotes all registered-but-unpolled requests to polled status, + /// allowing the coalescer to form optimal reads over the entire batch. + BatchBoundary, } /// A [`SegmentSource`] for file-like IO. @@ -134,6 +138,10 @@ impl FileSegmentSource { } impl SegmentSource for FileSegmentSource { + fn flush(&self) { + drop(self.events.unbounded_send(ReadEvent::BatchBoundary)); + } + fn request(&self, id: SegmentId) -> SegmentFuture { // We eagerly register the read request here assuming the behaviour of [`FileRead`], where // coalescing becomes effective prior to the future being polled. @@ -232,14 +240,31 @@ impl Drop for ReadFuture { } pub struct RequestMetrics { + pub registered_requests: Counter, + pub polled_requests: Counter, + pub dropped_requests: Counter, pub individual_requests: Counter, pub coalesced_requests: Counter, pub num_requests_coalesced: Histogram, + pub batched_range_bytes: Histogram, + pub batched_payload_bytes: Histogram, + pub batched_registered_only_requests: Histogram, + pub batched_polled_requests: Histogram, + pub batched_skipped_max_size: Counter, } impl RequestMetrics { pub fn new(metrics_registry: &dyn MetricsRegistry, labels: Vec::with_some_limit(self, limit: core::option::O pub fn vortex_scan::ScanBuilder::with_some_metrics_registry(self, metrics: core::option::Option>) -> Self pub fn vortex_scan::ScanBuilder::with_split_by(self, split_by: vortex_scan::SplitBy) -> Self - -pub fn vortex_scan::ScanBuilder::with_target_output_bytes(self, target_output_bytes: usize) -> Self - -pub fn vortex_scan::ScanBuilder::with_target_output_rows(self, target_output_rows: usize) -> Self diff --git a/vortex-scan/src/api.rs b/vortex-scan/src/api.rs index 09a3a8ab6bf..f090dd2cd79 100644 --- a/vortex-scan/src/api.rs +++ b/vortex-scan/src/api.rs @@ -41,18 +41,6 @@ use crate::Selection; /// A sendable stream of partitions. pub type PartitionStream = BoxStream<'static, VortexResult>; -/// Shared default lower bound for rows accumulated before materializing projected data. -/// -/// Engines that drive scans via [`ScanRequest`] can use this to align with the tuned Vortex -/// execution path used by DataFusion. -pub const DEFAULT_TARGET_OUTPUT_ROWS_HINT: usize = 64 * 1024; - -/// Shared default lower bound for projected payload bytes accumulated before materialization. -/// -/// Engines that drive scans via [`ScanRequest`] can use this to align with the tuned Vortex -/// execution path used by DataFusion. -pub const DEFAULT_TARGET_OUTPUT_BYTES_HINT: usize = 4 << 20; - /// Opens a Vortex [`DataSource`] from a URI. /// /// Configuration can be passed via the URI query parameters, similar to JDBC / ADBC. @@ -138,10 +126,6 @@ pub struct ScanRequest { /// Optional limit on the number of rows returned by scan. Limits are applied after all /// filtering and row selection. pub limit: Option, - /// Preferred lower bound for rows accumulated before materializing projected data. - pub target_output_rows: Option, - /// Preferred lower bound for projected payload bytes accumulated before materialization. - pub target_output_bytes: Option, } impl Default for ScanRequest { @@ -153,8 +137,6 @@ impl Default for ScanRequest { selection: Selection::default(), ordered: false, limit: None, - target_output_rows: None, - target_output_bytes: None, } } } diff --git a/vortex-scan/src/fetch_plan.rs b/vortex-scan/src/fetch_plan.rs new file mode 100644 index 00000000000..4fc4966b71c --- /dev/null +++ b/vortex-scan/src/fetch_plan.rs @@ -0,0 +1,371 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use itertools::Itertools; +use vortex_array::dtype::DType; +use vortex_array::dtype::Field; +use vortex_array::dtype::FieldMask; +use vortex_array::dtype::FieldNames; +use vortex_array::dtype::FieldPath; +use vortex_array::expr::Expression; +use vortex_array::expr::root; +use vortex_array::expr::select; +use vortex_array::scalar_fn::fns::root::Root; +use vortex_array::scalar_fn::fns::select::Select; +use vortex_error::VortexResult; +use vortex_layout::LayoutReader; +use vortex_layout::ProjectionFetchHint; + +const IMMEDIATE_FIELD_ROW_BYTES_THRESHOLD: usize = 16; +pub(crate) const DEFERRED_WAIT_BUDGET_BYTES: usize = 8 << 20; +pub(crate) const DEFERRED_IN_FLIGHT_BUDGET_BYTES: usize = 16 << 20; +const DEFAULT_VARIABLE_ROW_BYTES: usize = 32; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum MaterializationPlan { + Monolithic { + projected_row_bytes: usize, + projection_aligned_splits: bool, + }, + Deferred(DeferredMaterializationPlan), +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct DeferredMaterializationPlan { + final_fields: FieldNames, + immediate_fields: FieldNames, + deferred_groups: Vec, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct DeferredFieldGroup { + field_names: FieldNames, + field_masks: Vec, + fetch_row_bytes: usize, +} + +impl MaterializationPlan { + pub(crate) fn from_projection( + projection: &Expression, + dtype: &DType, + filter_present: bool, + projection_field_mask: &[FieldMask], + ) -> Self { + let projected_row_bytes = estimate_field_mask_row_bytes(dtype, projection_field_mask); + if !filter_present { + return Self::Monolithic { + projected_row_bytes, + projection_aligned_splits: false, + }; + } + + let Some(final_fields) = simple_root_projection_fields(projection, dtype) else { + return Self::Monolithic { + projected_row_bytes, + projection_aligned_splits: false, + }; + }; + if final_fields.is_empty() || !final_fields.iter().all_unique() { + return Self::Monolithic { + projected_row_bytes, + projection_aligned_splits: false, + }; + } + + let Some(struct_fields) = dtype.as_struct_fields_opt() else { + return Self::Monolithic { + projected_row_bytes, + projection_aligned_splits: false, + }; + }; + if final_fields.len() == struct_fields.nfields() + || final_fields.len().saturating_mul(2) >= struct_fields.nfields() + { + return Self::Monolithic { + projected_row_bytes, + projection_aligned_splits: true, + }; + } + + let mut immediate = Vec::new(); + let mut deferred_groups = Vec::new(); + let mut immediate_carry_cost = 0usize; + let mut deferred_carry_cost = 0usize; + + for name in final_fields.iter() { + let Some(field_dtype) = struct_fields.field(name) else { + return Self::Monolithic { + projected_row_bytes, + projection_aligned_splits: false, + }; + }; + + let carry_cost_bytes_per_row = estimate_dtype_row_bytes(&field_dtype); + if should_defer_field(&field_dtype, carry_cost_bytes_per_row) { + deferred_carry_cost = deferred_carry_cost.saturating_add(carry_cost_bytes_per_row); + deferred_groups.push(DeferredFieldGroup { + field_names: FieldNames::from([name.clone()]), + field_masks: vec![FieldMask::Prefix(FieldPath::from(Field::Name( + name.clone(), + )))], + fetch_row_bytes: carry_cost_bytes_per_row, + }); + } else { + immediate_carry_cost = + immediate_carry_cost.saturating_add(carry_cost_bytes_per_row); + immediate.push(name.clone()); + } + } + + if deferred_groups.is_empty() { + return Self::Monolithic { + projected_row_bytes, + projection_aligned_splits: false, + }; + } + + let total_carry_cost = immediate_carry_cost.saturating_add(deferred_carry_cost); + if total_carry_cost == 0 || deferred_carry_cost.saturating_mul(2) < total_carry_cost { + return Self::Monolithic { + projected_row_bytes, + projection_aligned_splits: false, + }; + } + + Self::Deferred(DeferredMaterializationPlan { + final_fields, + immediate_fields: FieldNames::from(immediate), + deferred_groups, + }) + } + + pub(crate) fn fetch_hints( + &self, + reader: &dyn LayoutReader, + projection_field_mask: &[FieldMask], + row_range: &Range, + ) -> VortexResult> { + match self { + Self::Monolithic { + projected_row_bytes, + .. + } => reader.projection_fetch_hints( + projection_field_mask.to_vec(), + row_range.clone(), + *projected_row_bytes, + ), + Self::Deferred(plan) => { + let mut hints = Vec::new(); + for group in &plan.deferred_groups { + hints.extend(reader.projection_fetch_hints( + group.field_masks.clone(), + row_range.clone(), + group.fetch_row_bytes, + )?); + } + Ok(hints) + } + } + } + + pub(crate) fn prefers_projection_aligned_splits(&self) -> bool { + match self { + Self::Monolithic { + projection_aligned_splits, + .. + } => *projection_aligned_splits, + Self::Deferred(_) => false, + } + } + + #[cfg(test)] + pub(crate) fn deferred(&self) -> Option<&DeferredMaterializationPlan> { + match self { + Self::Monolithic { .. } => None, + Self::Deferred(plan) => Some(plan), + } + } +} + +impl DeferredMaterializationPlan { + pub(crate) fn final_fields(&self) -> &FieldNames { + &self.final_fields + } + + pub(crate) fn immediate_expr(&self) -> Option { + (!self.immediate_fields.is_empty()).then(|| select(self.immediate_fields.clone(), root())) + } + + pub(crate) fn deferred_groups(&self) -> &[DeferredFieldGroup] { + &self.deferred_groups + } +} + +impl DeferredFieldGroup { + pub(crate) fn projection_expr(&self) -> Expression { + select(self.field_names.clone(), root()) + } +} + +fn simple_root_projection_fields(projection: &Expression, dtype: &DType) -> Option { + let struct_fields = dtype.as_struct_fields_opt()?; + if projection.is::() { + return Some(struct_fields.names().clone()); + } + + projection + .as_opt::