From c1c2c5cda3d7647b2fe8edd185dd26b0925a7e40 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Fri, 20 Mar 2026 17:40:25 +0000 Subject: [PATCH] C Scan API: initial --- proposed/0030-c-scan-api.md | 393 ++++++++++++++++++++++++++++++++++++ 1 file changed, 393 insertions(+) create mode 100644 proposed/0030-c-scan-api.md diff --git a/proposed/0030-c-scan-api.md b/proposed/0030-c-scan-api.md new file mode 100644 index 0000000..706deb5 --- /dev/null +++ b/proposed/0030-c-scan-api.md @@ -0,0 +1,393 @@ +- Start Date: 2026-03-13 +- Authors: Mikhail Kot + +# C Scan API + +There is a scan API for Rust-compatible code available at +https://github.com/vortex-data/vortex/tree/develop/vortex-scan. + +The goal of introducing C scan API is to make integration with non-Rust query +engines like Velox easier. In theory, such engines can use cxx.rs, but it +requires a lot of binding code and runtime bridging (see below). + +There exists a partial scan API for C exposed over files [1], but it's limited +to single-file URIs without globs, and it's also not thread-safe. Its main +flaws, however, are: + +- Inability to export to well-known format like ArrowArrayStream, +- Lack of introspection over produced `vx_array`s, and +- Inability to control scan on a level lower than just getting partitions and + `vx_array`s with filters and projections pre-configured. + + Why does Scan API need to expose `vx_array`s? What's the benefit of using + own format over ArrowArrayStream? + + The answer is "compression". Vortex DTypes don't exactly match with Arrow + physical encodings, so if you have i.e. a ConstantArray, you need to + decompress it into something Arrow-compatible. This was a major regression + in Duckdb integration. + +C++ API works it out by allowing to produce an ArrowArrayStream interface out of +ScanBuilder, but it uses Rust code directly via cxx.rs which we want to avoid +while adding C interfaces. 
C++ API future is outside of the scope of this proposal,
but it's expected to wrap the C API directly over time, removing the dependency
on cxx.rs for vortex-cxx.

## Customization points

The main goal of providing customization points is to do as little work as
possible in Vortex code and as much work as possible in the query engine. Some
engines may request control over scan execution, like pruning. Engines like
Duckdb have their own remote storage, caching, and globbing implementations. The
API still needs an ability to fall back to its own implementation.

Still, the Scan API is a relatively high-level concept, and if its level is not
sufficient, engines can resort to using a layout reader plan and executing it
directly.

## Datasource

A Datasource is a reference to multiple possibly remote files. When created, it
opens the first file to determine the schema from its DType; all other
operations are deferred until a scan is requested. You can request multiple file
scans from a Datasource.

```c
// Opaque, generated by bindgen
typedef struct vx_data_source vx_data_source;
typedef struct vx_file_handle vx_file_handle;

typedef void (*vx_list_callback)(void* userdata, const char* name, int is_dir);
typedef void (*vx_glob_callback)(void* userdata, const char* file);

typedef struct vx_data_source_options {
    // (1) Filesystem customization

    bool (*fs_use_vortex)(const char* schema, const char* path);
    void (*fs_set_userdata)(void* userdata);

    // should be called after glob expansion, single-file mode
    vx_file_handle (*fs_open)(void* userdata, const char* path, vx_error** err);
    vx_file_handle (*fs_create)(void* userdata, const char* path, vx_error** err);

    // non-recursive, callback is invoked with each path
    void (*fs_list)(void* userdata, const char* path, vx_list_callback cb, vx_error *err);

    void (*fs_close)(vx_file_handle handle);
    uint64_t (*fs_size)(vx_file_handle handle, vx_error *err);

    // positional read, doesn't change file open cursor
    void (*fs_read)(vx_file_handle handle, uint64_t offset, size_t len,
                    uint8_t *buffer, vx_error *err);

    // not needed for scanning but makes the FS API complete
    void (*fs_write)(vx_file_handle handle, uint64_t offset, size_t len,
                     uint8_t *buffer, vx_error *err);
    void (*fs_sync)(vx_file_handle handle, vx_error *err);

    // (2) Globbing customization

    void (*glob)(const char* glob, vx_glob_callback cb, vx_error* err);

    // (3) Cache customization

    void* (*cache_init)(vx_error* err);
    void (*cache_free)(void* cache, vx_error* err);
    void (*cache_get)(void* cache, const char* key, void** value, vx_error* err);
    void (*cache_put)(void* cache, const char* key, void* value, vx_error* err);
    void (*cache_delete)(void* cache, const char* key, vx_error* err);
} vx_data_source_options;

// Addition to the existing DType API; returns an owned ArrowSchema which needs
// to be freed by the caller using its release callback. If err is populated,
// out is not set.
void vx_dtype_to_arrow_schema(const vx_dtype* dtype, ArrowSchema* out, vx_error* err);

// Create a new owned datasource which must be freed by the caller.
const vx_data_source *
vx_data_source_new(const vx_session *session, const vx_data_source_options *opts, vx_error_out err);

// The datasource is Arc'd inside, so a clone creates another reference rather
// than cloning the state fully.
const vx_data_source *vx_data_source_clone(const vx_data_source *ptr);

// vx_dtype's lifetime is bound to the datasource's lifetime; the caller doesn't
// need to free it.
const vx_dtype *vx_data_source_dtype(const vx_data_source *ds);

typedef enum {
    VX_CARD_UNKNOWN = 0,
    VX_CARD_ESTIMATE = 1,
    VX_CARD_MAXIMUM = 2,
} vx_cardinality;

typedef struct {
    vx_cardinality cardinality;
    uint64_t rows;
} vx_data_source_row_count;

void vx_data_source_get_row_count(const vx_data_source *ds, vx_data_source_row_count *rc);
void vx_data_source_free(const vx_data_source *ptr);
```

1. Open local or remote file.
Allow using vortex's filesystem or the query engine's
   filesystem, e.g. duckdb fs. Allow partial customization, e.g. duckdb fs for
   local reads but vortex fs for remote reads. The remote filesystem
   customization point has the benefit of not duplicating credentials (e.g. the
   S3 access key) between the query engine and vortex. A local implementation
   may be more performant: Vortex rolled back full duckdb fs usage due to
   performance implications.
2. Open a single file or multiple files. Query engines may have their own glob
   expansion [4] which does HTTP IO.
3. Cache intermediate results. The main use case for Vortex is caching the
   schema in memory for the footer cache and conversion results. The benefit is
   that there is no requirement to open the first file in a glob eagerly if
   there's a cache hit. Vortex used to have an integration with the Duckdb
   object cache; it was deleted in favor of its own implementation, which led
   to a performance regression.

When all three customization points are implemented, Vortex offloads all IO
to the query engine.

  Why not expose an API to consume byte ranges or emit byte range requests for
  the query engine to read and populate buffers?

  This approach is indeed easier than using a vtable with a specific
  implementation, and requires slightly more scaffolding on the query engine
  side, but it's significantly easier to implement on the Vortex side and it is
  coherent with the current Rust implementation.
  Similar implementations can be found in Arrow's RandomAccessFile or
  parquet-rs's AsyncFileReader.

  However, as we're thinking of changing our Rust API, we can try to invest
  time into this approach as well.

Coupled with https://github.com/vortex-data/vortex/pull/7012 it also allows
the Duckdb integration to abstract memory allocations to the database.

## Runtime bridging

In the Rust API, a Datasource produces a stream of Partitions. A Partition
produces a stream of Arrays. The API is required to be used in an async
runtime; the current runtime for Vortex is tokio.

Velox uses a non-coroutine but async runtime based on Folly executors. Engines
like CoroBase use a coroutine-based runtime. Duckdb and ClickHouse runtimes are
sync, based on thread pools. The Postgres runtime is sync, based on processes.
Some engines may use OS-specific IO like `io_uring`.

All potential usages of our API may be grouped into 4 cases:

- sync, single-thread runtime, trivial. (1)
- sync, multi-thread runtime. (2)
- async, multi-thread/coroutine. (3)
- async, single-thread. (4)

The Scan/Partition suggestions outlined below play well with (2) but not with
(3) and (4) because Vortex has its own runtime which will block on the current
thread when e.g. getting an Array out of a Partition. An async-friendly API
basically means exposing a coroutine/state machine which hands control over to
the host on IO.

As Joe mentioned, we want to get away from the concept of partitions and emit
chunks of vx_array's directly from the scan. In this case, such a state machine
may be expressed with roughly the following states:

```
         Passed a file handle
START -> NEED_IO (offset, len) -> EXECUTE -> DONE
            ^ When passed a file handle, instructs the host to read the
              following byte range into a buffer and return a handle to
              this buffer.
                                     ^ Decompresses the buffer (executes the
                                       Array) one step into another buffer.
                                               ^ The Array is executed/
                                                 canonicalized to a form the
                                                 host can work with. The host
                                                 now transfers data from the
                                                 buffers to its own output
                                                 format.
```

However, as the future of such an approach is unclear, an async-unfriendly
option is described below.

## Scan

Scan iterators:

```c
```

Scan options:

```c
typedef enum {
    VX_S_INCLUDE_ALL = 0,
    VX_S_INCLUDE_RANGE = 1,
    VX_S_EXCLUDE_RANGE = 2,
} vx_scan_selection_include;

typedef struct {
    uint64_t *idx;
    size_t idx_len;
    // Roaring bitmaps won't be supported for now.
    // If selection is VX_S_INCLUDE_ALL, these are not read.
    // idx is copied by the query engine on scan invocation and can be freed
    // after a scan iterator is requested.
    vx_scan_selection_include include;
} vx_scan_selection;

typedef struct vx_scan_options {
    // May be NULL which means "return all columns"
    const vx_expression *projection;

    // May be NULL which means "no filter"
    const vx_expression *filter;

    // Set both to 0 to indicate no range request
    // Inclusive
    uint64_t row_range_begin;
    // Exclusive
    uint64_t row_range_end;

    vx_scan_selection selection;

    // 0 for no limit
    uint64_t limit;
    int ordered;
} vx_scan_options;
```

Scan interface:

```c
typedef struct vx_scan vx_scan;

/**
 * A partition is a contiguous chunk of memory from which you can iteratively
 * get vx_arrays.
 */
typedef struct vx_partition vx_partition;

typedef enum {
    VX_ESTIMATE_UNKNOWN = 0,
    VX_ESTIMATE_EXACT = 1,
    VX_ESTIMATE_INEXACT = 2,
} vx_estimate_boundary;

typedef struct {
    // If boundary is VX_ESTIMATE_UNKNOWN, the estimate field is not populated
    uint64_t estimate;
    vx_estimate_boundary boundary;
} vx_estimate;

// Users are encouraged to create worker threads based on the vx_estimate to
// distribute work.
// options may be NULL.
// Requesting a scan doesn't do anything unless vx_partition_next is called.
vx_scan *
vx_data_source_scan(const vx_data_source *data_source, const vx_scan_options *options, vx_error_out err);

/**
 * Get the next owned partition out of a scan request.
 * The caller must free this partition using vx_partition_free.
 * This method is thread-safe.
 * If used in a sync multi-thread runtime, users are encouraged to create a
 * worker thread per partition.
 * Returns NULL and doesn't set err on exhaustion.
 * Returns NULL and sets err on error.
+ */ +vx_partition *vx_scan_next(vx_scan *scan, vx_error_out err); + +// Request an array stream in Arrow format from a partition, consuming it +// fully. Not thread-safe, should be called once. +// stream is owned and must be freed using the release callback +void vx_partition_scan_arrow(const vx_partition *partition, FFI_ArrowArrayStream *stream, vx_error_out err); + +// Thread-unsafe. Get an owned vx_array of an iterator. +// Returns NULL if iterator is exhausted. Array is owned and must be +// freed by caller. +const vx_array *vx_partition_next(vx_partition *partition, vx_error_out err); +``` + +There are examples of APIs also exposing batch reads, but I doubt this is a good +option as for every ArrayRef the work that needs to be done to execute it may be +significant, and if you want to parallelize work, you can use this with +partitions, so each thread will be still busy with one ArrayRef at a time. +It can be introduced in the future. + +Scan functions are lazy as they operate on streams and it is +consumer's code responsibility to use parallelism at the desired degree. + +## What to do with `vx_array` + +The main question is how to transform outputs of iteration, vx_array, into +something query engines can operate with. You need to execute the array +iteratively till you recognize data and start exporting it. Duckdb integration +is mostly written in Rust with C++ code calling Rust's vtable functions. Rust +code does all data export. PoC implementation moves Duckdb to use C API but +leaves existing Rust code for exporting `vx_array` into DataChunk. + +However, the goal is not to interface with Rust code, so as a baseline the API +provides a way to scan partitions directly into ArrowArrayStream which should be +good enough for most consumers. + +## Cancellation + +There will be no option to cancel the scan as this isn't possibe on Rust API +either and this is a low priority task. + +## Testing + +C API doesn't have any testing. 
I suggest setting up a Catch2 (v3)
testing target and a CMake library for the C API, using FetchContent to
download Catch. This way, people not working on the Duckdb integration or FFI
wouldn't need CMake and Catch. To integrate C tests with `cargo test`, we can
write a `build.rs` extension which parses C test names and code-generates Rust
test targets calling into Catch.

## Duckdb integration PoC

```
before:
Duckdb side                       Vortex side

C++       C++                     Rust
duckdb -> TableFunction vtable -> ffi wrapping -> vtable implementation

after:

C++       C++                     C
duckdb -> TableFunction vtable -> ffi_wrapping -> vx_scan()*

* - vx_array -> DataChunk reuses existing Rust code
```

https://github.com/vortex-data/vortex/tree/myrrc/scan-api-duckdb has an
implementation of using the C Scan API for the Duckdb scan integration. Duckdb
has a sync multi-threaded runtime, and the table function is called from
multiple threads simultaneously. Users can save a per-thread state.

The integration splits into the following parts:
- DType -> LogicalType integration, done sans the Temporal extension.
- Table function binding (creating a DataSource), done.
- Global state initialization (creating a Scan), done sans filter pushdown.
- Local state initialization (export batch id), done.
- Utility functions like cardinality estimates, done.
- vx_array -> DataChunk export, delegated to existing Rust code.

On filter pushdown: projection pushdown requires exposing only the `select()`
expression. On the other hand, filter pushdown requires a `TableFilter ->
Vortex Expression` conversion, which is significant porting work, so it is
left out.

On DataChunk export: it requires exposing features like array optimization,
validity masks, and other features, so it is left out.

The table function uses the Vortex _partition_ concept as a work-splitting
term only, i.e. one worker thread operating on one or multiple partitions.
Each thread
pulls partitions out of `vx_scan_next` (which is why it must be thread-safe)
and then works on its own partition without synchronization.

[1] `vx_file_scan`

[2] Need to control pruning
    https://spiraldb.slack.com/archives/C0AJS0HDS6R/p1773068549282999

[4] e.g. Duckdb MultiFileReader / MultiFileList