`proposed/0030-c-scan-api.md`

- Start Date: 2026-03-13
- Authors: Mikhail Kot

# C Scan API

There is a scan API for Rust-compatible code available at
https://github.com/vortex-data/vortex/tree/develop/vortex-scan.

The goal of introducing a C scan API is to make integration with non-Rust query
engines like Velox easier. In theory, such engines can use cxx.rs, but that
requires a lot of binding code and runtime bridging (see below).

There exists a partial C scan API exposed over files [1], but it's limited
to single-file URIs without globs, and it's also not thread-safe. Its main
flaws, however, are:

- Inability to export to a well-known format like ArrowArrayStream,
- Lack of introspection over produced `vx_array`s, and
- Inability to control the scan at a level lower than just getting partitions
  and `vx_array`s with filters and projections pre-configured.

Why does the Scan API need to expose `vx_array`s at all? What's the benefit of
using our own format over ArrowArrayStream?

The answer is compression. Vortex DTypes don't map exactly onto Arrow
physical encodings, so given e.g. a ConstantArray, you need to decompress it
into something Arrow-compatible. This was a major regression in the Duckdb
integration.

The C++ API works around this by allowing callers to produce an
ArrowArrayStream interface from a ScanBuilder, but it calls Rust code directly
via cxx.rs, which we want to avoid while adding C interfaces. The future of the
C++ API is outside the scope of this proposal, but it's expected to wrap the C
API directly over time, removing vortex-cxx's dependency on cxx.rs.

## Customization points

The main goal of providing customization points is to do as little work as
possible in Vortex code and as much work as possible in the query engine. Some
engines may request control over scan execution, such as pruning. Engines like
Duckdb have their own remote storage, caching, and globbing implementations.
The API still needs the ability to fall back to Vortex's own implementations.

Still, the Scan API is a relatively high-level concept, and if its level is not
sufficient, engines can resort to building a layout reader plan and executing
it directly.

## Datasource

A Datasource is a reference to multiple, possibly remote, files. When created,
it opens the first file to determine the schema from its DType; all other
operations are deferred until a scan is requested. You can request multiple
file scans from a single Datasource.

```c
// Opaque, generated by bindgen
typedef struct vx_data_source vx_data_source;
typedef struct vx_file_handle vx_file_handle;

typedef void (*vx_list_callback)(void* userdata, const char* name, int is_dir);
typedef void (*vx_glob_callback)(void* userdata, const char* file);

typedef struct vx_data_source_options {
// (1) Filesystem customization

bool (*fs_use_vortex)(const char* scheme, const char* path);
void (*fs_set_userdata)(void* userdata);

// called after glob expansion, in single-file mode
vx_file_handle* (*fs_open)(void* userdata, const char* path, vx_error** err);
vx_file_handle* (*fs_create)(void* userdata, const char* path, vx_error** err);

// non-recursive; the callback is invoked once per entry
void (*fs_list)(void* userdata, const char* path, vx_list_callback cb, vx_error** err);

void (*fs_close)(vx_file_handle* handle);
uint64_t (*fs_size)(vx_file_handle* handle, vx_error** err);

// positional read, doesn't move the file's open cursor
void (*fs_read)(vx_file_handle* handle, uint64_t offset, size_t len, uint8_t* buffer,
                vx_error** err);

// not needed for scanning but makes the FS API complete
void (*fs_write)(vx_file_handle* handle, uint64_t offset, size_t len, const uint8_t* buffer,
                 vx_error** err);
void (*fs_sync)(vx_file_handle* handle, vx_error** err);

// (2) Globbing customization

void (*glob)(const char* glob, vx_glob_callback cb, vx_error* err);

/// (3) Cache customization

void* (*cache_init)(vx_error* err);
void (*cache_free)(void* cache, vx_error* err);
void (*cache_get)(void* cache, const char* key, void** value, vx_error* err);
void (*cache_put)(void* cache, const char* key, void* value, vx_error* err);
void (*cache_delete)(void* cache, const char* key, vx_error* err);
} vx_data_source_options;

// Addition to existing DType API, returns owned ArrowSchema which needs to
// be freed by the caller using release callback. If err is populated, out
// is not set.
void vx_dtype_to_arrow_schema(const vx_dtype* dtype, ArrowSchema* out, vx_error* err);

/// Create a new owned datasource which must be freed by the caller.
const vx_data_source *
vx_data_source_new(const vx_session *session, const vx_data_source_options *opts, vx_error_out err);

// datasource is Arc'd inside so a clone creates another reference rather
// than cloning the state fully.
const vx_data_source *vx_data_source_clone(const vx_data_source *ptr);

// vx_dtype's lifetime is bound to datasource's lifetime, caller doesn't need
// to free it
const vx_dtype *vx_data_source_dtype(const vx_data_source *ds);

typedef enum {
VX_CARD_UNKNOWN = 0,
VX_CARD_ESTIMATE = 1,
VX_CARD_MAXIMUM = 2,
} vx_cardinality;
typedef struct {
vx_cardinality cardinality;
uint64_t rows;
} vx_data_source_row_count;

void vx_data_source_get_row_count(const vx_data_source *ds, vx_data_source_row_count *rc);
void vx_data_source_free(const vx_data_source *ptr);
```
1. Open a local or remote file. Allow using Vortex's filesystem or the query
   engine's filesystem, e.g. Duckdb's. Allow partial customization, e.g.
   Duckdb's fs for local reads but Vortex's fs for remote reads. The remote
   filesystem customization point has the benefit of not duplicating
   credentials (e.g. an S3 access key) between the query engine and Vortex,
   while a local implementation may be more performant. Vortex rolled back
   full Duckdb fs usage due to performance implications.
2. Open a single file or multiple files. Query engines may have their own glob
   expansion [4] which does HTTP IO.
3. Cache intermediate results. The main use case for Vortex is caching schemas
   in memory (the footer cache) and conversion results. The benefit is that
   the first file in a glob need not be opened eagerly if there's a cache hit.
   Vortex previously had an integration with the Duckdb object cache which was
   deleted in favor of its own implementation, leading to a performance
   regression.

When all three customization points are implemented, Vortex offloads all IO
to the query engine.
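To illustrate customization point (1), here is a sketch of how a host engine might back the `fs_open`/`fs_size`/`fs_read`/`fs_close` callbacks with plain stdio. All `demo_*` names and the simplified handle/error types are stand-ins for the opaque `vx_file_handle`/`vx_error` types, which aren't published yet:

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in types: the real vx_file_handle and vx_error are opaque types
 * from the proposed header; mocked here so the sketch is self-contained. */
typedef struct { FILE *fp; } demo_handle;
typedef struct { char msg[128]; int set; } demo_error;

/* Hypothetical host-side implementation of fs_open. */
static demo_handle *demo_open(void *userdata, const char *path, demo_error *err) {
    (void)userdata;
    FILE *fp = fopen(path, "rb");
    if (!fp) {
        snprintf(err->msg, sizeof err->msg, "open failed: %s", path);
        err->set = 1;
        return NULL;
    }
    demo_handle *h = malloc(sizeof *h);
    h->fp = fp;
    return h;
}

static uint64_t demo_size(demo_handle *h, demo_error *err) {
    (void)err;
    fseek(h->fp, 0, SEEK_END);
    return (uint64_t)ftell(h->fp);
}

/* Positional read: seeks on every call, never relies on a shared cursor,
 * matching the "doesn't change file open cursor" contract of fs_read. */
static void demo_read(demo_handle *h, uint64_t offset, size_t len,
                      uint8_t *buffer, demo_error *err) {
    fseek(h->fp, (long)offset, SEEK_SET);
    if (fread(buffer, 1, len, h->fp) != len) err->set = 1;
}

static void demo_close(demo_handle *h) {
    fclose(h->fp);
    free(h);
}
```

An engine would place these functions into `vx_data_source_options` before calling `vx_data_source_new`.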
Why not expose an API that consumes byte ranges, or emits byte range requests
for the query engine to read and populate buffers? That approach requires
slightly more scaffolding on the query engine side, but the vtable approach
with a specific implementation is significantly easier to implement on the
Vortex side and is coherent with the current Rust implementation. Similar
designs can be found in Arrow's RandomAccessFile or parquet-rs's
AsyncFileReader. However, as we're thinking of changing our Rust API, we can
try to invest time into the byte-range approach as well. Coupled with
https://github.com/vortex-data/vortex/pull/7012, it would also allow the
Duckdb integration to delegate memory allocations to the database.
## Runtime bridging

In the Rust API, a Datasource produces a stream of Partitions, and a Partition
produces a stream of Arrays. The API must be used from an async runtime; the
current runtime for Vortex is tokio.

Velox uses a non-coroutine but async runtime based on Folly executors. Engines
like CoroBase use a coroutine-based runtime. Duckdb and ClickHouse runtimes are
sync, based on thread pools. The Postgres runtime is sync, based on processes.
Some engines may use OS-specific IO like `io_uring`.
All potential usages of our API fall into four cases:

- sync, single-thread runtime, trivial (1)
- sync, multi-thread runtime (2)
- async, multi-thread/coroutine (3)
- async, single-thread (4)

The Scan/Partition suggestions outlined below play well with (2) but not with
(3) and (4), because Vortex has its own runtime which will block the current
thread when, e.g., getting an Array out of a Partition. An async-friendly API
basically means exposing a coroutine/state machine which hands control over to
the host on IO.

As Joe mentioned, we want to get away from the concept of partitions and emit
chunks of `vx_array`s directly from the scan. In that case, such a state
machine may be expressed with roughly the following states:

```
Passed a file handle
START -> NEED_IO (offset, len) -> EXECUTE -> DONE
^ When passed a file handle, instructs host to read following byte
range into buffer and return a handle to this buffer.
^ Decompresses the buffer (executes the Array)
one step into other buffer
^
Array is executed/canonicalized to the form host can work with.
Host now transfers data from buffers to its own output format.
```
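For illustration, the handshake above could be driven by a loop like the following. Everything here is hypothetical (the `demo_*` names, the states, the fixed buffer size): no async C API exists yet, and this only shows how control bounces between the state machine and the host on each NEED_IO:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical states mirroring the diagram above. */
typedef enum { DEMO_START, DEMO_NEED_IO, DEMO_EXECUTE, DEMO_DONE } demo_state;

typedef struct {
    demo_state state;
    uint64_t offset;   /* byte range requested from the host */
    size_t len;
    uint8_t buf[64];   /* host fills this when NEED_IO is reported */
    size_t produced;   /* bytes "decompressed" so far (mock) */
} demo_scan;

/* One step of the mock state machine: request IO once, then execute. */
static demo_state demo_step(demo_scan *s) {
    switch (s->state) {
    case DEMO_START:
        s->offset = 0;
        s->len = 16;               /* ask the host for a byte range */
        s->state = DEMO_NEED_IO;
        break;
    case DEMO_NEED_IO:
        /* the host has filled s->buf; move on to decompression */
        s->state = DEMO_EXECUTE;
        break;
    case DEMO_EXECUTE:
        s->produced = s->len;      /* pretend we decoded the buffer */
        s->state = DEMO_DONE;
        break;
    case DEMO_DONE:
        break;
    }
    return s->state;
}

/* Host-side driver: control is handed back on every NEED_IO, and the host
 * performs the IO with whatever runtime it has (sync, coroutine, io_uring). */
static size_t demo_drive(demo_scan *s, const uint8_t *file, size_t file_len) {
    s->state = DEMO_START;
    while (demo_step(s) != DEMO_DONE) {
        if (s->state == DEMO_NEED_IO) {
            size_t n = s->len < file_len - s->offset ? s->len : file_len - s->offset;
            memcpy(s->buf, file + s->offset, n);  /* the host performs the IO */
        }
    }
    return s->produced;
}
```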
However, as the future of such an approach is unclear, an async-unfriendly
option is described below.
## Scan

Scan iterators:

```c
```

Scan options:

```c
typedef enum {
VX_S_INCLUDE_ALL = 0,
VX_S_INCLUDE_RANGE = 1,
VX_S_EXCLUDE_RANGE = 2,
} vx_scan_selection_include;

typedef struct {
uint64_t *idx;
size_t idx_len;
// Roaring bitmaps won't be supported for now.
// If include is VX_S_INCLUDE_ALL, idx and idx_len are not read. idx is copied
// on scan invocation and can be freed once a scan iterator has been requested.
vx_scan_selection_include include;
} vx_scan_selection;


typedef struct vx_scan_options {
// May be NULL which means "return all columns"
const vx_expression *projection;

// May be NULL which means "no filter"
const vx_expression *filter;

// Set both to 0 to indicate no range request
// Inclusive
uint64_t row_range_begin;
// Exclusive
uint64_t row_range_end;

vx_scan_selection selection;

// 0 for no limit
uint64_t limit;
int ordered;
} vx_scan_options;
```
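Per the comments in the struct, a zero-initialized `vx_scan_options` already means "all columns, no filter, no row range, include-all selection, no limit". A sketch of the resulting initialization pattern, using local stand-in copies of the structs since the real header isn't published yet (the `demo_*` helpers are hypothetical):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in copies of the proposed structs so this sketch is self-contained. */
typedef struct vx_expression vx_expression;

typedef enum {
    VX_S_INCLUDE_ALL = 0,
    VX_S_INCLUDE_RANGE = 1,
    VX_S_EXCLUDE_RANGE = 2,
} vx_scan_selection_include;

typedef struct {
    uint64_t *idx;
    size_t idx_len;
    vx_scan_selection_include include;
} vx_scan_selection;

typedef struct vx_scan_options {
    const vx_expression *projection;  /* NULL: all columns */
    const vx_expression *filter;      /* NULL: no filter */
    uint64_t row_range_begin;         /* inclusive; both 0: no range request */
    uint64_t row_range_end;           /* exclusive */
    vx_scan_selection selection;
    uint64_t limit;                   /* 0: no limit */
    int ordered;
} vx_scan_options;

/* "Scan everything": zero-initialization is the documented default. */
static vx_scan_options demo_default_options(void) {
    vx_scan_options opts = {0};
    return opts;
}

/* Restrict a scan to rows [begin, end) with an optional limit. */
static vx_scan_options demo_range_options(uint64_t begin, uint64_t end,
                                          uint64_t limit) {
    vx_scan_options opts = {0};
    opts.row_range_begin = begin;
    opts.row_range_end = end;
    opts.limit = limit;
    return opts;
}
```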

Scan interface:

```c
typedef struct vx_scan vx_scan;

/**
 * A partition is a contiguous chunk of memory from which you can iteratively
 * get vx_arrays.
 */
typedef struct vx_partition vx_partition;

typedef enum {
VX_ESTIMATE_UNKNOWN = 0,
VX_ESTIMATE_EXACT = 1,
VX_ESTIMATE_INEXACT = 2,
} vx_estimate_boundary;

typedef struct {
// If boundary is VX_ESTIMATE_UNKNOWN, the estimate field is not populated
uint64_t estimate;
vx_estimate_boundary boundary;
} vx_estimate;

// Users are encouraged to create worker threads based on the partition count
// estimate (vx_estimate) to distribute work.
// options may be NULL.
// Requesting a scan doesn't do anything until vx_partition_next is called.
vx_scan *
vx_data_source_scan(const vx_data_source *data_source, const vx_scan_options *options, vx_error_out err);

/**
* Get next owned partition out of a scan request.
* Caller must free this partition using vx_partition_free.
* This method is thread-safe.
* If using in a sync multi-thread runtime, users are encouraged to create a
* worker thread per partition.
* Returns NULL and doesn't set err on exhaustion.
* Returns NULL and sets err on error.
*/
vx_partition *vx_scan_next(vx_scan *scan, vx_error_out err);

// Request an array stream in Arrow format from a partition, consuming it
// fully. Not thread-safe; should be called at most once per partition.
// The stream is owned and must be freed using its release callback.
void vx_partition_scan_arrow(const vx_partition *partition, FFI_ArrowArrayStream *stream, vx_error_out err);

// Thread-unsafe. Get the next owned vx_array from the partition iterator.
// Returns NULL if the iterator is exhausted. The array is owned and must be
// freed by the caller.
const vx_array *vx_partition_next(vx_partition *partition, vx_error_out err);
```
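The threading contract above (thread-safe `vx_scan_next`, thread-unsafe `vx_partition_next`) suggests the consumption pattern below: one synchronized pull of whole partitions, then lock-free draining per worker. The `demo_*` scan and partition types are mocks standing in for the real opaque handles:

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Mock stand-ins for the opaque vx_scan / vx_partition handles. The mock
 * scan hands out a fixed number of partitions, each "containing" 4 arrays. */
typedef struct {
    pthread_mutex_t mu;
    int remaining;              /* partitions left to hand out */
} demo_scan;

typedef struct { int arrays_left; } demo_partition;

/* Thread-safe, like the proposed vx_scan_next; returns 0 on exhaustion. */
static int demo_scan_next(demo_scan *s, demo_partition *out) {
    int got = 0;
    pthread_mutex_lock(&s->mu);
    if (s->remaining > 0) { s->remaining--; got = 1; }
    pthread_mutex_unlock(&s->mu);
    if (got) out->arrays_left = 4;
    return got;
}

/* Thread-unsafe per-partition iteration, like vx_partition_next. */
static int demo_partition_next(demo_partition *p) {
    if (p->arrays_left == 0) return 0;
    p->arrays_left--;
    return 1;
}

typedef struct { demo_scan *scan; int consumed; } demo_worker;

/* Each worker pulls whole partitions (synchronized inside demo_scan_next)
 * and then drains its own partition without any locking. */
static void *demo_worker_run(void *arg) {
    demo_worker *w = arg;
    demo_partition p;
    while (demo_scan_next(w->scan, &p))
        while (demo_partition_next(&p))
            w->consumed++;
    return NULL;
}

/* Spawn up to 8 workers over a scan; returns the total arrays consumed. */
static int demo_run(int n_workers, int n_partitions) {
    pthread_t tids[8];
    demo_worker workers[8];
    demo_scan scan;
    if (n_workers > 8) n_workers = 8;
    pthread_mutex_init(&scan.mu, NULL);
    scan.remaining = n_partitions;
    for (int i = 0; i < n_workers; i++) {
        workers[i] = (demo_worker){ &scan, 0 };
        pthread_create(&tids[i], NULL, demo_worker_run, &workers[i]);
    }
    int total = 0;
    for (int i = 0; i < n_workers; i++) {
        pthread_join(tids[i], NULL);
        total += workers[i].consumed;
    }
    pthread_mutex_destroy(&scan.mu);
    return total;
}
```

Every array is consumed exactly once regardless of how partitions are distributed across workers.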

There are examples of APIs that also expose batch reads, but I doubt this is a
good option: for every ArrayRef, the work needed to execute it may be
significant, and if you want to parallelize work you can use partitions, so
each thread will still be busy with one ArrayRef at a time. Batch reads can be
introduced in the future.

Scan functions are lazy, as they operate on streams, and it is the consumer's
responsibility to apply parallelism to the desired degree.
## What to do with `vx_array`

The main question is how to transform the outputs of iteration, `vx_array`s,
into something query engines can operate on. You need to execute the array
iteratively until you recognize the data and can start exporting it. The
Duckdb integration is mostly written in Rust, with C++ code calling Rust's
vtable functions; the Rust code does all data export. The PoC implementation
moves Duckdb to the C API but keeps the existing Rust code for exporting a
`vx_array` into a DataChunk.

However, the goal is not to interface with Rust code, so as a baseline the API
provides a way to scan partitions directly into an ArrowArrayStream, which
should be good enough for most consumers.
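As a sketch of that baseline path, this is roughly what consuming the stream produced by `vx_partition_scan_arrow` looks like on the engine side. The `ArrowArray`/`ArrowArrayStream` struct layouts follow the stable Arrow C data/stream interface; the `demo_*` producer is a mock standing in for Vortex:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

struct ArrowSchema;  /* full definition elided; not needed for this sketch */

/* Struct layouts per the stable Arrow C data/stream interface. */
struct ArrowArray {
    int64_t length;
    int64_t null_count;
    int64_t offset;
    int64_t n_buffers;
    int64_t n_children;
    const void **buffers;
    struct ArrowArray **children;
    struct ArrowArray *dictionary;
    void (*release)(struct ArrowArray *);
    void *private_data;
};

struct ArrowArrayStream {
    int (*get_schema)(struct ArrowArrayStream *, struct ArrowSchema *);
    int (*get_next)(struct ArrowArrayStream *, struct ArrowArray *);
    const char *(*get_last_error)(struct ArrowArrayStream *);
    void (*release)(struct ArrowArrayStream *);
    void *private_data;
};

/* --- Mock producer standing in for vx_partition_scan_arrow's output --- */

static void demo_array_release(struct ArrowArray *a) { a->release = NULL; }

static int demo_get_next(struct ArrowArrayStream *s, struct ArrowArray *out) {
    int *remaining = s->private_data;
    memset(out, 0, sizeof *out);
    if (*remaining == 0) return 0;  /* out->release == NULL: end of stream */
    (*remaining)--;
    out->length = 100;              /* pretend: a 100-row batch */
    out->release = demo_array_release;
    return 0;
}

static void demo_stream_release(struct ArrowArrayStream *s) { s->release = NULL; }

/* --- Engine-side consumer loop --- */
static int64_t demo_consume(struct ArrowArrayStream *stream) {
    int64_t rows = 0;
    struct ArrowArray batch;
    for (;;) {
        if (stream->get_next(stream, &batch) != 0) break;  /* error code */
        if (batch.release == NULL) break;                  /* exhausted */
        rows += batch.length;
        batch.release(&batch);     /* hand batch memory back to producer */
    }
    stream->release(stream);
    return rows;
}
```

The two termination conditions (non-zero return for errors, NULL `release` for end-of-stream) are part of the Arrow stream protocol, so any Arrow-aware engine can consume Vortex scans this way.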
## Cancellation

There will be no option to cancel a scan, as this isn't possible in the Rust
API either, and it is a low-priority task.
## Testing

The C API currently has no tests. I suggest setting up a Catch2 (v3) testing
target and a CMake library for the C API, using FetchContent to download
Catch, so that people not working on the Duckdb integration or FFI don't need
CMake and Catch. To integrate the C tests with `cargo test`, we can write a
`build.rs` extension which parses C test names and generates Rust test targets
that call into Catch.
## Duckdb integration PoC

```
before:
Duckdb side Vortex side

C++ C++ Rust
duckdb -> TableFunction vtable -> ffi wrapping -> vtable implementation

after:

C++ C++ C
duckdb -> TableFunction vtable -> ffi_wrapping -> vx_scan()*

* - vx_array -> DataChunk reuses existing Rust code
```

https://github.com/vortex-data/vortex/tree/myrrc/scan-api-duckdb has an
implementation using the C Scan API for the Duckdb scan integration. Duckdb
has a sync multi-threaded runtime, and the table function is called from
multiple threads simultaneously. Users can save per-thread state.

The integration splits into the following parts:

- DType -> LogicalType conversion: done, sans the Temporal extension.
- Table function binding (creating a DataSource): done.
- Global state initialization (creating a Scan): done, sans filter pushdown.
- Local state initialization (export batch id): done.
- Utility functions like cardinality estimates: done.
- vx_array -> DataChunk export: delegated to existing Rust code.

On filter pushdown: projection pushdown only requires exposing a `select()`
expression. Filter pushdown, on the other hand, requires a `TableFilter ->
Vortex Expression` conversion, which is a significant porting effort, so it
was left out.

On DataChunk export: it requires exposing features like array optimization and
validity masks, so it was also left out.

The table function uses the Vortex _partition_ concept only as a
work-splitting term, i.e. one worker thread operates on one or more
partitions. Each thread pulls partitions from `vx_scan_next` (which is why it
must be thread-safe) and then works on its own partition without
synchronization.

[1] `vx_file_scan`
[2] Need to control pruning:
    https://spiraldb.slack.com/archives/C0AJS0HDS6R/p1773068549282999
[4] e.g. Duckdb MultiFileReader / MultiFileList