Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion encodings/fastlanes/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ pub fn vortex_fastlanes::DeltaArray::len(&self) -> usize

pub fn vortex_fastlanes::DeltaArray::offset(&self) -> usize

pub fn vortex_fastlanes::DeltaArray::try_from_delta_compress_parts(bases: vortex_array::array::ArrayRef, deltas: vortex_array::array::ArrayRef) -> vortex_error::VortexResult<Self>
pub fn vortex_fastlanes::DeltaArray::try_from_delta_compress_parts(bases: vortex_array::array::ArrayRef, deltas: vortex_array::array::ArrayRef, logical_len: usize) -> vortex_error::VortexResult<Self>

pub fn vortex_fastlanes::DeltaArray::try_from_primitive_array(array: &vortex_array::arrays::primitive::array::PrimitiveArray, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<Self>

Expand Down
129 changes: 111 additions & 18 deletions encodings/fastlanes/src/bit_transpose/validity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,23 @@ pub fn untranspose_validity(validity: &Validity, ctx: &mut ExecutionCtx) -> Vort

#[inline]
pub fn untranspose_bitbuffer(bits: BitBuffer) -> BitBuffer {
assert!(
bits.inner().len().is_multiple_of(128),
"Transpose BitBuffer must be 128-byte aligned"
);
let (offset, len, bytes) = bits.into_inner();
match bytes.try_into_mut() {
Ok(mut bytes_mut) => {
let (chunks, _) = bytes_mut.as_chunks_mut::<128>();
let mut tmp = [0u8; 128];
for chunk in chunks {
untranspose_bits(chunk, &mut tmp);
chunk.copy_from_slice(&tmp);

if bytes.len().is_multiple_of(128) {
match bytes.try_into_mut() {
Ok(mut bytes_mut) => {
let (chunks, _) = bytes_mut.as_chunks_mut::<128>();
let mut tmp = [0u8; 128];
for chunk in chunks {
untranspose_bits(chunk, &mut tmp);
chunk.copy_from_slice(&tmp);
}
BitBuffer::new_with_offset(bytes_mut.freeze().into_byte_buffer(), len, offset)
}
BitBuffer::new_with_offset(bytes_mut.freeze().into_byte_buffer(), len, offset)
Err(bytes) => bits_op_with_copy(bytes, len, offset, untranspose_bits),
}
Err(bytes) => bits_op_with_copy(bytes, len, offset, untranspose_bits),
} else {
bits_op_with_copy(bytes, len, offset, untranspose_bits)
}
}

Expand Down Expand Up @@ -131,9 +132,101 @@ fn bits_op_with_copy<F: Fn(&[u8; 128], &mut [u8; 128])>(
}

unsafe { output.set_len(output_len) };
BitBuffer::new_with_offset(
output.freeze().into_byte_buffer(),
len.next_multiple_of(1024),
offset,
)
BitBuffer::new_with_offset(output.freeze().into_byte_buffer(), len, offset)
}

// Tests for the bit-transpose validity round trip. They cover both execution
// paths of transpose/untranspose: the in-place path (buffer uniquely owned,
// `try_into_mut` succeeds) and the copy path (buffer shared, so the operation
// must allocate a fresh output buffer). The regression being pinned here is
// that the logical bit length is preserved exactly — not padded up to a
// multiple of 1024 — on every path.
#[cfg(test)]
mod tests {
use vortex_array::LEGACY_SESSION;
use vortex_array::VortexSessionExecute;
use vortex_array::validity::Validity;
use vortex_buffer::BitBuffer;
use vortex_buffer::BitBufferMut;
use vortex_buffer::ByteBuffer;

use super::*;

// Builds a `BitBuffer` of `num_bits` bits with a non-trivial pattern:
// every index divisible by 3 is false, all others true. The mixed pattern
// makes corruption from a bad transpose visible in equality checks.
fn make_validity_bits(num_bits: usize) -> BitBuffer {
let mut builder = BitBufferMut::with_capacity(num_bits);
for i in 0..num_bits {
builder.append(i % 3 != 0);
}
builder.freeze()
}

// Returns the same logical BitBuffer plus an extra handle to its backing
// bytes. Holding the extra reference means the buffer is no longer uniquely
// owned, so `try_into_mut` inside transpose/untranspose cannot succeed
// (presumably refcount-based — the Err arm in `untranspose_bitbuffer`),
// forcing the copy path. The caller must keep the second value alive for
// the duration of the call under test.
fn force_copy_path(bits: BitBuffer) -> (BitBuffer, ByteBuffer) {
let (offset, len, bytes) = bits.into_inner();
let extra_ref = bytes.clone();
(BitBuffer::new_with_offset(bytes, len, offset), extra_ref)
}

// 1024 bits = 128 bytes, exactly one transpose block: exercises the
// aligned, uniquely-owned (in-place) path end to end.
#[test]
fn transpose_roundtrip_preserves_len_inplace_path() {
let bits = make_validity_bits(1024);
assert_eq!(bits.len(), 1024);

let transposed = transpose_bitbuffer(bits.clone());
assert_eq!(transposed.len(), 1024);

let roundtripped = untranspose_bitbuffer(transposed);
assert_eq!(roundtripped.len(), 1024);
assert_eq!(roundtripped, bits);
}

// Same round trip as above, but with the buffer shared so the copy path
// runs; `_hold` keeps the extra reference alive across the call.
#[test]
fn transpose_roundtrip_preserves_len_copy_path() {
let bits = make_validity_bits(1024);
let (bits_shared, _hold) = force_copy_path(bits.clone());

let transposed = transpose_bitbuffer(bits_shared);
assert_eq!(transposed.len(), 1024);

let roundtripped = untranspose_bitbuffer(transposed);
assert_eq!(roundtripped.len(), 1024);
assert_eq!(roundtripped, bits);
}

// 500 bits is not a multiple of 1024 (nor 128 bytes of storage), so the
// non-aligned branch must run; the logical length must stay 500 rather
// than being rounded up to the next block boundary.
#[test]
fn transpose_preserves_len_non_aligned_copy_path() {
let bits = make_validity_bits(500);
assert_eq!(bits.len(), 500);

let transposed = transpose_bitbuffer(bits);
assert_eq!(transposed.len(), 500);
}

// The in-place and copy paths are alternative implementations of the same
// operation: for identical input they must produce bit-identical output.
#[test]
fn transpose_inplace_and_copy_produce_same_bits() {
let bits = make_validity_bits(2048);

let inplace_result = transpose_bitbuffer(bits.clone());

let (bits_shared, _hold) = force_copy_path(bits);
let copy_result = transpose_bitbuffer(bits_shared);

assert_eq!(inplace_result.len(), copy_result.len());
assert_eq!(inplace_result, copy_result);
}

// Higher-level round trip through the Validity wrapper with a non-aligned
// length (1500): both the transposed and untransposed Validity::Array must
// report the original logical length.
#[test]
fn transpose_validity_roundtrip_non_aligned() {
let bits = make_validity_bits(1500);
let validity = Validity::Array(BoolArray::new(bits, Validity::NonNullable).into_array());

let mut ctx = LEGACY_SESSION.create_execution_ctx();
let transposed = transpose_validity(&validity, &mut ctx).unwrap();

if let Validity::Array(arr) = &transposed {
assert_eq!(arr.len(), 1500);
} else {
panic!("expected Validity::Array");
}

let roundtripped = untranspose_validity(&transposed, &mut ctx).unwrap();
if let Validity::Array(arr) = &roundtripped {
assert_eq!(arr.len(), 1500);
} else {
panic!("expected Validity::Array");
}
}
}
48 changes: 18 additions & 30 deletions encodings/fastlanes/src/bitpacking/array/bitpack_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use fastlanes::BitPacking;
use itertools::Itertools;
use num_traits::PrimInt;
use vortex_array::IntoArray;
use vortex_array::arrays::PrimitiveArray;
Expand Down Expand Up @@ -205,22 +204,16 @@ pub fn gather_patches(
bit_width: u8,
num_exceptions_hint: usize,
) -> VortexResult<Option<Patches>> {
let patch_validity = match parray.validity() {
Validity::NonNullable => Validity::NonNullable,
_ => Validity::AllValid,
};

let array_len = parray.len();
let validity_mask = parray.validity_mask()?;
let validity = parray.validity_mask()?;

let patches = if array_len < u8::MAX as usize {
match_each_integer_ptype!(parray.ptype(), |T| {
gather_patches_impl::<T, u8>(
parray.as_slice::<T>(),
bit_width,
num_exceptions_hint,
patch_validity,
validity_mask,
&validity,
)?
})
} else if array_len < u16::MAX as usize {
Expand All @@ -229,8 +222,7 @@ pub fn gather_patches(
parray.as_slice::<T>(),
bit_width,
num_exceptions_hint,
patch_validity,
validity_mask,
&validity,
)?
})
} else if array_len < u32::MAX as usize {
Expand All @@ -239,8 +231,7 @@ pub fn gather_patches(
parray.as_slice::<T>(),
bit_width,
num_exceptions_hint,
patch_validity,
validity_mask,
&validity,
)?
})
} else {
Expand All @@ -249,8 +240,7 @@ pub fn gather_patches(
parray.as_slice::<T>(),
bit_width,
num_exceptions_hint,
patch_validity,
validity_mask,
&validity,
)?
})
};
Expand All @@ -262,15 +252,15 @@ fn gather_patches_impl<T, P>(
data: &[T],
bit_width: u8,
num_exceptions_hint: usize,
patch_validity: Validity,
validity_mask: Mask,
validity: &Mask,
) -> VortexResult<Option<Patches>>
where
T: PrimInt + NativePType,
P: IntegerPType,
{
let mut indices: BufferMut<P> = BufferMut::with_capacity(num_exceptions_hint);
let mut values: BufferMut<T> = BufferMut::with_capacity(num_exceptions_hint);
let mut patch_validity = Vec::with_capacity(num_exceptions_hint);

let total_chunks = data.len().div_ceil(1024);
let mut chunk_offsets: BufferMut<u64> = BufferMut::with_capacity(total_chunks);
Expand All @@ -281,11 +271,10 @@ where
chunk_offsets.push(values.len() as u64);
}

if (value.leading_zeros() as usize) < T::PTYPE.bit_width() - bit_width as usize
&& validity_mask.value(idx)
{
if (value.leading_zeros() as usize) < T::PTYPE.bit_width() - bit_width as usize {
indices.push(P::from(idx).vortex_expect("cast index from usize"));
values.push(*value);
patch_validity.push(validity.value(idx));
}
}

Expand All @@ -296,7 +285,7 @@ where
data.len(),
0,
indices.into_array(),
PrimitiveArray::new(values, patch_validity).into_array(),
PrimitiveArray::new(values, Validity::from_iter(patch_validity)).into_array(),
Some(chunk_offsets.into_array()),
)?))
}
Expand Down Expand Up @@ -324,14 +313,12 @@ fn bit_width_histogram_typed<T: NativePType + PrimInt>(
// All values are invalid
bit_widths[0] = array.len();
}
AllOr::Some(buffer) => {
// Some values are valid
for (is_valid, v) in buffer.iter().zip_eq(array.as_slice::<T>()) {
if is_valid {
bit_widths[bit_width(*v)] += 1;
} else {
bit_widths[0] += 1;
}
AllOr::Some(_) => {
// Count actual bit widths for all values regardless of validity.
// This ensures patches are created for values at null positions that
// exceed the chosen bit width, preserving byte-level data integrity.
for v in array.as_slice::<T>() {
bit_widths[bit_width(*v)] += 1;
}
}
}
Expand Down Expand Up @@ -462,7 +449,8 @@ mod test {
);
assert!(values.ptype().is_unsigned_int());
let compressed = BitPackedArray::encode(&values.into_array(), 4).unwrap();
assert!(compressed.patches().is_none());
// Values 16-23 at null positions still get patches to preserve byte-level integrity.
assert!(compressed.patches().is_some());
assert_eq!(
(0..(1 << 4)).collect::<Vec<_>>(),
compressed
Expand Down
Loading
Loading