diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..27c1d35 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,28 @@ +name: CI + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + + - name: Install uv + uses: astral-sh/setup-uv@v5 + + - name: Set up Python ${{ matrix.python-version }} + run: uv python install ${{ matrix.python-version }} + + - name: Install dependencies + run: uv sync --dev --python ${{ matrix.python-version }} + + - name: Run tests + run: uv run --python ${{ matrix.python-version }} pytest tests/ -v diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..e4fba21 --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.12 diff --git a/src/starfix/arrow_digester.py b/src/starfix/arrow_digester.py index 1803434..500197e 100644 --- a/src/starfix/arrow_digester.py +++ b/src/starfix/arrow_digester.py @@ -1,6 +1,7 @@ """Pure-Python implementation of the starfix Arrow logical hasher. -Produces identical hashes to the Rust implementation for all supported types. +Implements the byte-layout specification defined in the starfix Rust crate +(``nauticalab/starfix docs/byte-layout-spec.md``). 
""" from __future__ import annotations @@ -54,18 +55,13 @@ def raw_bytes(self) -> bytes: return bytes(self._bytes) -def _is_list_type(dt: pa.DataType) -> bool: - import pyarrow as pa - return pa.types.is_list(dt) or pa.types.is_large_list(dt) - - # --------------------------------------------------------------------------- -# Schema / DataType serialization (matches Rust `serialized_schema`) +# Schema / DataType serialization (spec Section 2) # --------------------------------------------------------------------------- def _data_type_to_value(dt: pa.DataType) -> object: """Convert a pyarrow DataType to the JSON-compatible value that matches - the Rust ``data_type_to_value`` output. + the canonical form described in spec Section 2.1. Types are normalized: Utf8→LargeUtf8, Binary→LargeBinary, List→LargeList, Dictionary→value_type. Struct fields are sorted alphabetically. @@ -202,225 +198,331 @@ def _hash_schema(schema: pa.Schema) -> bytes: # --------------------------------------------------------------------------- -# DigestBufferType: (nullable, BitVec|None, structural_sha256|None, data_sha256) +# DigestBufferType (spec Section 3: null_bits, structural, data) +# +# Each entry is a 3-tuple: (BitVec|None, sha256|None, sha256|None) +# [0] null_bits – present when nullable +# [1] structural – present for list entries +# [2] data – present for leaf and list-leaf entries # --------------------------------------------------------------------------- -def _new_digest_entry(nullable: bool, structured: bool) -> tuple: - """Create a digest entry matching Rust DigestBufferType.""" - return ( - nullable, - _BitVec() if nullable else None, - hashlib.sha256() if structured else None, - hashlib.sha256(), - ) +def _new_data_only(nullable: bool) -> tuple: + """Leaf field entry (spec Section 3 — data-only or validity+data).""" + return (_BitVec() if nullable else None, None, hashlib.sha256()) -# --------------------------------------------------------------------------- -# Field 
extraction (flatten structs into BTreeMap) -# --------------------------------------------------------------------------- +def _new_structural_only(nullable: bool) -> tuple: + """List-level entry whose value is a struct or nested list (spec Section 3).""" + return (_BitVec() if nullable else None, hashlib.sha256(), None) + + +def _new_list_leaf(nullable: bool) -> tuple: + """List-level entry whose value is a leaf type (spec Section 3).""" + return (_BitVec() if nullable else None, hashlib.sha256(), hashlib.sha256()) -def _extract_fields(field: pa.Field, parent: str, out: dict[str, tuple]): - import pyarrow as pa - full_name = f"{parent}{DELIMITER}{field.name}" if parent else field.name - if pa.types.is_struct(field.type): - for i in range(field.type.num_fields): - _extract_fields(field.type.field(i), full_name, out) - else: - out[full_name] = _new_digest_entry(field.nullable, _is_list_type(field.type)) + +def _new_validity_only() -> tuple: + """Nullable parent entry — just null_bits, no structural or data (spec Section 3).""" + return (_BitVec(), None, None) # --------------------------------------------------------------------------- -# Array data hashing +# Type decomposition into BTreeMap entries (spec Sections 3.4, 3.5) # --------------------------------------------------------------------------- -def _handle_null_bits(arr, bit_vec: _BitVec) -> None: - """Push validity bits for *arr* into *bit_vec*.""" - for i in range(len(arr)): - bit_vec.push(arr[i].is_valid) +def _extract_type_entries( + data_type: pa.DataType, + nullable: bool, + path: str, + out: dict[str, tuple], +) -> None: + """Recursively decompose a data type into BTreeMap entries. 
+ + This implements the recursive decomposition described in spec Section 3: + - Structs are transparent — no entry, recurse into sorted children + - Lists create validity-only + structural/data entries + - Leaves create data entries + """ + import pyarrow as pa + canonical = _normalize_data_type(data_type) + + if pa.types.is_struct(canonical): + # Struct is transparent — no entry for the struct itself. + # Recurse into children sorted alphabetically (spec Section 3.5). + children = [canonical.field(i) for i in range(canonical.num_fields)] + children.sort(key=lambda f: f.name) + for child in children: + child_path = f"{path}{DELIMITER}{child.name}" if path else child.name + _extract_type_entries(child.type, child.nullable, child_path, out) + + elif pa.types.is_large_list(canonical) or pa.types.is_list(canonical): + # Nullable list: validity-only entry at `path` (spec Section 3.4) + if nullable: + out[path] = _new_validity_only() + + # List level: entry at path + "/" (spec Section 3.4) + list_path = f"{path}{DELIMITER}" + inner_field = canonical.value_field + inner_canonical = _normalize_data_type(inner_field.type) + + if pa.types.is_struct(inner_canonical): + # List: structural-only at list_path, struct children get own entries + out[list_path] = _new_structural_only(inner_field.nullable) + _extract_type_entries(inner_field.type, inner_field.nullable, list_path, out) + elif pa.types.is_large_list(inner_canonical) or pa.types.is_list(inner_canonical): + # List: structural-only, recurse into inner list + out[list_path] = _new_structural_only(inner_field.nullable) + _extract_type_entries(inner_field.type, inner_field.nullable, list_path, out) + else: + # List: list-leaf entry with both structural + data + out[list_path] = _new_list_leaf(inner_field.nullable) + else: + # Leaf type: data entry + out[path] = _new_data_only(nullable) -def _hash_fixed_size_array(arr, digest_entry, element_size: int) -> None: - """Hash a fixed-width array by reading raw buffers (matching 
Rust behaviour).""" - nullable, bit_vec, _structural, data_digest = digest_entry - bufs = arr.buffers() - data_buf = bufs[1] - offset = arr.offset +def _extract_fields(field: pa.Field, parent: str, out: dict[str, tuple]) -> None: + """Extract BTreeMap entries from a schema field (record-batch path).""" + full_name = f"{parent}{DELIMITER}{field.name}" if parent else field.name + _extract_type_entries(field.type, field.nullable, full_name, out) - raw = data_buf.to_pybytes() - start = offset * element_size - sliced = raw[start:] - if not nullable: - end = start + len(arr) * element_size - data_digest.update(raw[start:end]) - else: - _handle_null_bits(arr, bit_vec) - if arr.null_count > 0: - for i in range(len(arr)): - if arr[i].is_valid: - pos = i * element_size - data_digest.update(sliced[pos:pos + element_size]) - else: - end = len(arr) * element_size - data_digest.update(sliced[:end]) +# --------------------------------------------------------------------------- +# Type normalization (spec Section 2.1 — type canonicalization) +# --------------------------------------------------------------------------- +def _normalize_data_type(dt: pa.DataType) -> pa.DataType: + """Recursively normalize a DataType to its canonical large equivalent.""" + import pyarrow as pa -def _hash_boolean_array(arr, digest_entry) -> None: - nullable, bit_vec, _structural, data_digest = digest_entry + if pa.types.is_dictionary(dt): + return _normalize_data_type(dt.value_type) + if dt == pa.utf8(): + return pa.large_utf8() + if dt == pa.binary(): + return pa.large_binary() + if pa.types.is_list(dt) or pa.types.is_large_list(dt): + inner = _normalize_field(dt.value_field) + return pa.large_list(inner) + if pa.types.is_struct(dt): + fields = [_normalize_field(dt.field(i)) for i in range(dt.num_fields)] + return pa.struct(fields) + if pa.types.is_fixed_size_list(dt): + inner = _normalize_field(dt.value_field) + return pa.list_(inner, dt.list_size) + if pa.types.is_map(dt): + key_field = 
_normalize_field(dt.key_field) + item_field = _normalize_field(dt.item_field) + return pa.map_(key_field.type, item_field.type, keys_sorted=dt.keys_sorted) + return dt - if not nullable: - bv = _BitVec() - for i in range(len(arr)): - bv.push(arr[i].as_py()) - data_digest.update(bv.raw_bytes()) - else: - _handle_null_bits(arr, bit_vec) - bv = _BitVec() - for i in range(len(arr)): - if arr[i].is_valid: - bv.push(arr[i].as_py()) - data_digest.update(bv.raw_bytes()) +def _normalize_field(field: pa.Field) -> pa.Field: + """Normalize a single field: keep name and nullability, normalize the data type.""" + import pyarrow as pa + return pa.field(field.name, _normalize_data_type(field.type), field.nullable) -def _hash_binary_array(arr, digest_entry) -> None: - """Hash Binary / LargeBinary arrays. - Nullable null values are skipped entirely (only valid values are hashed). +# --------------------------------------------------------------------------- +# Array normalization helper +# --------------------------------------------------------------------------- + +def _normalize_array(data_type, array): + """Normalize small Arrow variants to their large canonical equivalents. + + Returns (effective_data_type, effective_array). """ - nullable, bit_vec, _structural, data_digest = digest_entry + import pyarrow as pa - if not nullable: - for i in range(len(arr)): - val = arr[i].as_py() - data_digest.update(struct.pack(" None: - """Hash Utf8 / LargeUtf8 arrays. +# --------------------------------------------------------------------------- +# Recursive traversal — populates BTreeMap entries from array data +# (spec Sections 3.1–3.5) +# --------------------------------------------------------------------------- - Nullable null values are skipped entirely (only valid values are hashed). +def _combine_null_masks(own_valid, ancestor_valid): + """AND-combine two validity lists. 
Returns None if all valid.""" + if own_valid is None and ancestor_valid is None: + return None + if own_valid is None: + return ancestor_valid + if ancestor_valid is None: + return own_valid + return [a and b for a, b in zip(own_valid, ancestor_valid)] + + +def _get_validity_list(array): + """Return a list of bools (True=valid) or None if no nulls.""" + if array.null_count == 0: + return None + return [array[i].is_valid for i in range(len(array))] + + +def _traverse_and_update(data_type, nullable, array, path, ancestor_nulls, fields): + """Top-down recursive traversal that routes data to BTreeMap entries. + + Parameters: + data_type: Arrow data type of the array + nullable: whether this position is nullable + array: the Arrow array to hash + path: current BTreeMap key path + ancestor_nulls: list of bools from ancestor struct nulls, or None + fields: the BTreeMap of entries to populate """ - nullable, bit_vec, _structural, data_digest = digest_entry + import pyarrow as pa - if not nullable: - for i in range(len(arr)): - val = arr[i].as_py().encode("utf-8") - data_digest.update(struct.pack(" None: +def _traverse_list(array, value_field, nullable, path, ancestor_nulls, fields): + """Traverse a list array, populating validity/structural/data entries (spec Section 3.4).""" import pyarrow as pa - nullable, bit_vec, structural, data_digest = digest_entry - if not nullable: - for i in range(len(arr)): - sub_arr = arr.value(i) if hasattr(arr, 'value') else arr[i].values - size_bytes = struct.pack(" None: - """Hash a struct array using composite child hashing. +def _traverse_struct(array, nullable, path, ancestor_nulls, fields): + """Traverse a struct array — struct is transparent (spec Section 3.5). - Each child is independently hashed into its own DigestBufferType, - then finalized into the parent's data stream via _finalize_child_into_data. + Struct-level nulls are AND-propagated to all descendant entries. 
""" import pyarrow as pa - nullable, bit_vec, _structural, data_digest = digest_entry - - # Push struct-level nulls to parent's BitVec + struct_array = array + # Combine struct's own nulls with ancestor nulls (AND propagation) if nullable: - _handle_null_bits(arr, bit_vec) + combined = _combine_null_masks(_get_validity_list(struct_array), ancestor_nulls) + else: + combined = ancestor_nulls # Sort children alphabetically by field name - children = [(i, data_type.field(i)) for i in range(data_type.num_fields)] + children = [(i, struct_array.type.field(i)) for i in range(struct_array.type.num_fields)] children.sort(key=lambda x: x[1].name) - struct_nulls = None - if arr.null_count > 0: - # Build struct-level null buffer from validity bitmap - struct_nulls = [arr[i].is_valid for i in range(len(arr))] - for idx, child_field in children: - child_array = arr.field(idx) + child_array = struct_array.field(idx) + child_path = f"{path}{DELIMITER}{child_field.name}" if path else child_field.name + _traverse_and_update( + child_field.type, + child_field.nullable, + child_array, + child_path, + combined, + fields, + ) - # Child is effectively nullable if the child field is nullable - # OR the struct itself has nulls (struct-level nulls propagate down) - effectively_nullable = child_field.nullable or (struct_nulls is not None) - child_digest = _new_digest_entry( - effectively_nullable, - _is_list_type(child_field.type), - ) +def _traverse_leaf(data_type, array, path, ancestor_nulls, fields): + """Traverse a leaf array — hash data into its BTreeMap entry (spec Sections 3.1–3.3).""" + entry = fields.get(path) + if entry is None: + return - if struct_nulls is not None: - # Propagate struct-level nulls: combined = struct AND child - combined_child = _combine_struct_nulls_with_child( - child_array, struct_nulls - ) - _array_digest_update(child_field.type, combined_child, child_digest) + null_bits, _structural, data_digest = entry + if data_digest is None: + return + + # Compute 
effective validity (own nulls AND ancestor struct nulls) + own_valid = _get_validity_list(array) + effective = _combine_null_masks(own_valid, ancestor_nulls) + + # Push effective validity to null_bits + if null_bits is not None: + if effective is not None: + for v in effective: + null_bits.push(v) else: - _array_digest_update(child_field.type, child_array, child_digest) + null_bits.extend_true(len(array)) - # Finalize child digest into parent's data stream - _finalize_child_into_data(digest_entry, child_digest) + # Hash leaf data, skipping null elements + _hash_leaf_data(data_type, array, data_digest, effective) -def _combine_struct_nulls_with_child(child_array, struct_nulls: list[bool]): - """Combine struct-level nulls with child nulls and return a new array.""" +def _hash_leaf_data(data_type, array, data_digest, effective_nulls): + """Hash leaf-level data bytes into the data digest (spec Sections 3.1–3.3).""" import pyarrow as pa - # Build combined validity: struct_valid AND child_valid - combined_valid = [] - for i in range(len(child_array)): - child_valid = child_array[i].is_valid - combined_valid.append(struct_nulls[i] and child_valid) - - # Reconstruct the child array with combined null mask - # Convert to Python values and rebuild with explicit mask - values = [] - for i in range(len(child_array)): - if combined_valid[i]: - values.append(child_array[i].as_py()) + if pa.types.is_boolean(data_type): + _hash_boolean_data(array, data_digest, effective_nulls) + elif pa.types.is_large_binary(data_type): + _hash_binary_data(array, data_digest, effective_nulls) + elif pa.types.is_large_string(data_type): + _hash_string_data(array, data_digest, effective_nulls) + else: + element_size = _element_size_for_type(data_type) + if element_size is not None: + _hash_fixed_size_data(array, data_digest, element_size, effective_nulls) else: - values.append(None) + raise NotImplementedError(f"Unsupported leaf type: {data_type}") - return pa.array(values, type=child_array.type) +# 
--------------------------------------------------------------------------- +# Leaf data hashing (spec Sections 3.1–3.3) +# --------------------------------------------------------------------------- def _element_size_for_type(dt: pa.DataType) -> int | None: """Return byte width for fixed-size types, or None for variable-length.""" @@ -445,82 +547,96 @@ def _element_size_for_type(dt: pa.DataType) -> int | None: return None -def _array_digest_update(data_type, arr, digest_entry) -> None: - import pyarrow as pa +def _hash_fixed_size_data(arr, data_digest, element_size: int, effective_nulls) -> None: + """Hash a fixed-width array's data bytes (spec Section 3.1).""" + bufs = arr.buffers() + data_buf = bufs[1] + offset = arr.offset - # Normalize small variants to large equivalents - if pa.types.is_string(data_type) and not pa.types.is_large_string(data_type): - arr = arr.cast(pa.large_utf8()) - data_type = pa.large_utf8() - elif pa.types.is_binary(data_type) and not pa.types.is_large_binary(data_type): - arr = arr.cast(pa.large_binary()) - data_type = pa.large_binary() - elif pa.types.is_list(data_type) and not pa.types.is_large_list(data_type): - arr = arr.cast(pa.large_list(data_type.value_field)) - data_type = pa.large_list(data_type.value_field) - elif pa.types.is_dictionary(data_type): - arr = arr.cast(data_type.value_type) - data_type = data_type.value_type - # Re-enter to handle potential further normalization - _array_digest_update(data_type, arr, digest_entry) - return + raw = data_buf.to_pybytes() + start = offset * element_size - if pa.types.is_boolean(data_type): - _hash_boolean_array(arr, digest_entry) - elif pa.types.is_large_binary(data_type): - _hash_binary_array(arr, digest_entry) - elif pa.types.is_large_string(data_type): - _hash_string_array(arr, digest_entry) - elif pa.types.is_large_list(data_type): - _hash_list_array(arr, data_type.value_type, digest_entry) - elif pa.types.is_struct(data_type): - _hash_struct_array(arr, data_type, digest_entry) + 
if effective_nulls is None: + # Non-nullable or all valid: feed entire contiguous buffer + end = start + len(arr) * element_size + data_digest.update(raw[start:end]) else: - element_size = _element_size_for_type(data_type) - if element_size is not None: - _hash_fixed_size_array(arr, digest_entry, element_size) + # Nullable: feed only valid elements + has_nulls = any(not v for v in effective_nulls) + if has_nulls: + for i in range(len(arr)): + if effective_nulls[i]: + pos = start + i * element_size + data_digest.update(raw[pos:pos + element_size]) else: - raise NotImplementedError(f"Unsupported data type: {data_type}") + end = start + len(arr) * element_size + data_digest.update(raw[start:end]) + + +def _hash_boolean_data(arr, data_digest, effective_nulls) -> None: + """Hash boolean array data bits (spec Section 3.2).""" + bv = _BitVec() + if effective_nulls is None: + for i in range(len(arr)): + bv.push(arr[i].as_py()) + else: + for i in range(len(arr)): + if effective_nulls[i]: + bv.push(arr[i].as_py()) + data_digest.update(bv.raw_bytes()) + + +def _hash_binary_data(arr, data_digest, effective_nulls) -> None: + """Hash binary array data (spec Section 3.3).""" + if effective_nulls is None: + for i in range(len(arr)): + val = arr[i].as_py() + data_digest.update(struct.pack(" None: + """Hash string array data (spec Section 3.3).""" + if effective_nulls is None: + for i in range(len(arr)): + val = arr[i].as_py().encode("utf-8") + data_digest.update(struct.pack(" None: - nullable, bit_vec, structural, data_digest = entry - if nullable: - # Validity bitmap length as u64 LE - final_digest.update(struct.pack(" None: + """Finalize a single BTreeMap entry into the final combining digest (spec Section 4).""" + null_bits, structural, data = entry + + # 1. null_bits (if present — nullable entries only) + if null_bits is not None: + final_digest.update(struct.pack(" None: - """Finalize a child's digest and write the resulting bytes into the parent's data stream. 
- - Used for composite types (structs) where each child is independently hashed - and then its finalized representation is fed into the parent digest. - """ - _p_nullable, _p_bit_vec, _p_structural, parent_data = parent_entry - c_nullable, c_bit_vec, c_structural, c_data = child_entry - - # Null bits first (if nullable child) - if c_nullable and c_bit_vec is not None: - parent_data.update(struct.pack(" None: self._schema = schema self._schema_digest = _hash_schema(schema) - # BTreeMap – sorted by key + # BTreeMap — sorted by key self._fields: dict[str, tuple] = {} for i in range(len(schema)): _extract_fields(schema.field(i), "", self._fields) @@ -544,33 +660,24 @@ def __init__(self, schema: pa.Schema) -> None: self._fields = dict(sorted(self._fields.items())) def update(self, record_batch: pa.RecordBatch) -> None: - """Feed a RecordBatch into the running digest.""" - import pyarrow as pa - - for field_path, entry in self._fields.items(): - parts = field_path.split(DELIMITER) - if len(parts) == 1: - col_name = parts[0] - col_idx = record_batch.schema.get_field_index(col_name) - col = record_batch.column(col_idx) - field = record_batch.schema.field(col_idx) - _array_digest_update(field.type, col, entry) - else: - # Nested struct traversal - col_idx = record_batch.schema.get_field_index(parts[0]) - arr = record_batch.column(col_idx) - for level in range(1, len(parts) - 1): - struct_arr = arr - child_idx = struct_arr.type.get_field_index(parts[level]) - arr = struct_arr.field(child_idx) - leaf_name = parts[-1] - child_idx = arr.type.get_field_index(leaf_name) - leaf_arr = arr.field(child_idx) - leaf_dt = arr.type.field(child_idx).type - _array_digest_update(leaf_dt, leaf_arr, entry) + """Feed a RecordBatch into the running digest (spec Sections 3–5).""" + # Build a mapping from top-level column name to (field, array) + schema = record_batch.schema + # Traverse each top-level field using the recursive traversal + for i in range(len(schema)): + field = schema.field(i) + 
col = record_batch.column(i) + _traverse_and_update( + field.type, + field.nullable, + col, + field.name, + None, # no ancestor struct nulls at top level + self._fields, + ) def finalize(self) -> bytes: - """Consume the digester and return the versioned hash.""" + """Consume the digester and return the versioned hash (spec Section 5).""" final_digest = hashlib.sha256() final_digest.update(self._schema_digest) for _path, entry in sorted(self._fields.items()): @@ -599,7 +706,10 @@ def hash_table(table: pa.Table) -> bytes: @staticmethod def hash_array(array: pa.Array) -> bytes: - """Hash a single array (matches Rust ``hash_array``).""" + """Hash a single array (spec Section 6). + + Uses the same recursive BTreeMap decomposition as the record-batch path. + """ import pyarrow as pa # Resolve dictionary arrays to their plain value type @@ -612,55 +722,34 @@ def hash_array(array: pa.Array) -> bytes: # Normalize to canonical large types normalized_type = _normalize_data_type(effective_type) - # Use data_type_to_value for canonical metadata serialization + # Step 1: Type metadata (canonical JSON string) dt_value = _data_type_to_value(normalized_type) + dt_value = _sort_json_value(dt_value) dt_json = json.dumps(dt_value, separators=(",", ":")) final_digest = hashlib.sha256() final_digest.update(dt_json.encode()) - # Match Rust: is_nullable() checks null_count > 0, not buffer existence. - # Arrays with all-valid values but a null bitmap (e.g. from dictionary cast) - # are treated as non-nullable. 
+ # Determine nullability: arrays with null_count > 0 are nullable nullable = effective_array.null_count > 0 - entry = _new_digest_entry(nullable, _is_list_type(normalized_type)) - _array_digest_update(effective_type, effective_array, entry) - _finalize_digest(final_digest, entry) - - return VERSION_BYTES + final_digest.digest() - - -# --------------------------------------------------------------------------- -# Type normalization (matches Rust normalize_data_type / normalize_field) -# --------------------------------------------------------------------------- - -def _normalize_data_type(dt: pa.DataType) -> pa.DataType: - """Recursively normalize a DataType to its canonical large equivalent.""" - import pyarrow as pa - - if pa.types.is_dictionary(dt): - return _normalize_data_type(dt.value_type) - if dt == pa.utf8(): - return pa.large_utf8() - if dt == pa.binary(): - return pa.large_binary() - if pa.types.is_list(dt) or pa.types.is_large_list(dt): - inner = _normalize_field(dt.value_field) - return pa.large_list(inner) - if pa.types.is_struct(dt): - fields = [_normalize_field(dt.field(i)) for i in range(dt.num_fields)] - return pa.struct(fields) - if pa.types.is_fixed_size_list(dt): - inner = _normalize_field(dt.value_field) - return pa.list_(inner, dt.list_size) - if pa.types.is_map(dt): - inner = _normalize_field(dt.key_field) - return pa.map_(inner.type, dt.item_field.type) - return dt + # Step 2: Build BTreeMap entries from the type tree (same as record-batch) + fields: dict[str, tuple] = {} + _extract_type_entries(effective_type, nullable, "", fields) + fields = dict(sorted(fields.items())) + + # Step 3: Traverse and populate entries + _traverse_and_update( + effective_type, + nullable, + effective_array, + "", + None, + fields, + ) + # Step 4: Finalize all entries into the digest + for _path, entry in sorted(fields.items()): + _finalize_digest(final_digest, entry) -def _normalize_field(field: pa.Field) -> pa.Field: - """Normalize a single field: keep name 
and nullability, normalize the data type.""" - import pyarrow as pa - return pa.field(field.name, _normalize_data_type(field.type), field.nullable) + return VERSION_BYTES + final_digest.digest() diff --git a/tests/test_arrow_digester.py b/tests/test_arrow_digester.py index e748d10..9d9e083 100644 --- a/tests/test_arrow_digester.py +++ b/tests/test_arrow_digester.py @@ -131,7 +131,7 @@ def test_list_array(self): type=pa.list_(pa.field("item", pa.int32(), nullable=True)), ) h = ArrowDigester.hash_array(arr).hex() - assert h == "00000190658c2c4e9178f8ae6c686d6fe13262a9fab9cb619542911453abeca8195a9f" + assert h == "000001dc359d563a1ed210eb271b314612ea8343f0a0b0955b9053a9eb47962d27163c" def test_decimal128_array(self): from decimal import Decimal diff --git a/tests/test_golden_parity.py b/tests/test_golden_parity.py new file mode 100644 index 0000000..60924c9 --- /dev/null +++ b/tests/test_golden_parity.py @@ -0,0 +1,609 @@ +"""Golden-hash parity tests: Python must produce byte-for-byte identical hashes to Rust. + +Every expected value below was generated by `cargo run --bin emit_golden` against +the Rust starfix crate *after* PR #11 was merged (commit on 2026-03-20). The Rust +values are treated as the authoritative source of truth; these tests are regression +guards that will fail immediately if the Python implementation diverges. 
+ +Covers 36 distinct Arrow tables / arrays: + - spec examples A, B, C, D, E, F, G, H, I, J, K, N, O + - all primitive scalar types (bool, int8..uint64, float32, float64) + - temporal types (date32, date64, time32(s/ms), time64(us/ns)) + - variable-length (binary, string/utf8) + - decimal128 (precision ranges 1-9, 10-18, 19-38) + - list-of-int32 + - struct column, nested struct (two levels), struct with nullable children + - multi-type record batch, schema-only (empty) hash + - streaming two-batch incremental update +""" + +from __future__ import annotations + +from decimal import Decimal + +import pyarrow as pa +import pytest +from starfix.arrow_digester import ArrowDigester + + +# --------------------------------------------------------------------------- +# Spec examples (byte-exact conformance, letter-for-letter from spec) +# --------------------------------------------------------------------------- + + +class TestSpecExamples: + def test_example_a_two_column_table(self): + """Example A: {age: Int32 non-null, name: LargeUtf8 nullable} — two rows.""" + schema = pa.schema([ + pa.field("age", pa.int32(), nullable=False), + pa.field("name", pa.large_utf8(), nullable=True), + ]) + batch = pa.record_batch( + { + "age": pa.array([25, 30], type=pa.int32()), + "name": pa.array(["Alice", None], type=pa.large_utf8()), + }, + schema=schema, + ) + result = ArrowDigester.hash_record_batch(batch).hex() + assert result == "0000018020e47f4462f26b0bc73ad110ea0f9198c2745c04ce23335093d2b78ef51c88" + + def test_example_b_bool_array_with_nulls(self): + """Example B: BooleanArray [true, NULL, false, true].""" + array = pa.array([True, None, False, True], type=pa.bool_()) + result = ArrowDigester.hash_array(array).hex() + assert result == "00000185a9c99eba7bcfd9b14fd529b9534f2289319779270aa4a072f117cf90a6ac8b" + + def test_example_c_non_nullable_int32(self): + """Example C: Int32Array [1, 2, 3] — non-nullable.""" + array = pa.array([1, 2, 3], type=pa.int32()) + result = 
ArrowDigester.hash_array(array).hex() + assert result == "00000109cb203c0cae27346ea2ce37d1245775ac99c8b26634677de56268ba01dc693c" + + def test_example_d_non_nullable_binary(self): + """Example D: BinaryArray [b'hi', b''] — non-nullable; tests Binary→LargeBinary canonicalization.""" + array = pa.array([b"hi", b""], type=pa.binary()) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000010fbc627fc7ad9d5f2e96fec8fa57b56b364a04a8af7e374015a07b26bd1ac80d" + + def test_example_e_column_order_independence(self): + """Example E: columns [x: Int32, y: Boolean nullable] — order must not matter.""" + schema_xy = pa.schema([ + pa.field("x", pa.int32(), nullable=False), + pa.field("y", pa.bool_(), nullable=True), + ]) + schema_yx = pa.schema([ + pa.field("y", pa.bool_(), nullable=True), + pa.field("x", pa.int32(), nullable=False), + ]) + batch_xy = pa.record_batch( + {"x": pa.array([10], type=pa.int32()), "y": pa.array([True], type=pa.bool_())}, + schema=schema_xy, + ) + batch_yx = pa.record_batch( + {"y": pa.array([True], type=pa.bool_()), "x": pa.array([10], type=pa.int32())}, + schema=schema_yx, + ) + expected = "000001f68bf6319f8ec4aa938e52dd911974348289fbdfb9b5ebed5e14e239a6d8a3a9" + assert ArrowDigester.hash_record_batch(batch_xy).hex() == expected + assert ArrowDigester.hash_record_batch(batch_yx).hex() == expected + + def test_example_f_utf8_equivalence(self): + """Example F: Utf8 and LargeUtf8 arrays with the same data must hash identically.""" + small = pa.array(["ab"], type=pa.utf8()) + large = pa.array(["ab"], type=pa.large_utf8()) + expected = "000001f14d076ffc02533c6560f867b86cde7e15b1a0ac5a892b53697a609191cf307f" + assert ArrowDigester.hash_array(small).hex() == expected + assert ArrowDigester.hash_array(large).hex() == expected + + def test_example_g_nullable_int32(self): + """Example G: Int32Array [Some(42), None, Some(-7), Some(0)].""" + array = pa.array([42, None, -7, 0], type=pa.int32()) + result = ArrowDigester.hash_array(array).hex() + 
assert result == "0000018330f9b8796b9434cbf7bc028c18c58a2a739b980acf9995ce1e5d60b43b0138" + + def test_example_h_nullable_string(self): + """Example H: StringArray [Some('hello'), None, Some('world'), Some('')].""" + array = pa.array(["hello", None, "world", ""], type=pa.utf8()) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000016255bde0141ebf26e08c31c96f6112e5e21d101ab8bb90d77f2c3eec02c62d3c" + + def test_example_i_empty_table(self): + """Example I: schema {a: Int32 non-null, b: Boolean nullable} with zero rows.""" + schema = pa.schema([ + pa.field("a", pa.int32(), nullable=False), + pa.field("b", pa.bool_(), nullable=True), + ]) + digester = ArrowDigester(schema) + result = digester.finalize().hex() + assert result == "000001df8887d2ebb6968ba1dd994b1c663e0539f218f6045996a03dd20209c40d0922" + + def test_example_j_multi_batch_streaming(self): + """Example J: batch-split independence — two batches == one combined batch.""" + schema = pa.schema([pa.field("v", pa.int32(), nullable=False)]) + batch1 = pa.record_batch({"v": pa.array([1, 2], type=pa.int32())}, schema=schema) + batch2 = pa.record_batch({"v": pa.array([3], type=pa.int32())}, schema=schema) + digester = ArrowDigester(schema) + digester.update(batch1) + digester.update(batch2) + result = digester.finalize().hex() + assert result == "000001d4efb209fd319c8b418662caad5922b154003d0425fa722060b44e6d5eb0d857" + + def test_example_k_struct_column(self): + """Example K: {person: Struct}.""" + person_type = pa.struct([ + pa.field("age", pa.int32(), nullable=False), + pa.field("name", pa.large_utf8(), nullable=False), + ]) + schema = pa.schema([pa.field("person", person_type, nullable=False)]) + person_col = pa.array( + [{"age": 25, "name": "Alice"}, {"age": 30, "name": "Bob"}], + type=person_type, + ) + batch = pa.record_batch({"person": person_col}, schema=schema) + result = ArrowDigester.hash_record_batch(batch).hex() + assert result == 
"000001e3516302d68fd1fc4860fa4b7e0d4a6d60d500a9d7ccae79e445c5980021e4ec" + + def test_example_n_list_of_struct(self): + """Example N: {items: LargeList> nullable}. + + Row 0: [{id:1, label:'a'}, {id:2, label:'b'}] + Row 1: [{id:3, label:'c'}] + """ + item_type = pa.struct([ + pa.field("id", pa.int32(), nullable=False), + pa.field("label", pa.large_utf8(), nullable=False), + ]) + schema = pa.schema([ + pa.field("items", pa.large_list(pa.field("item", item_type, nullable=False)), nullable=True) + ]) + items_col = pa.array( + [ + [{"id": 1, "label": "a"}, {"id": 2, "label": "b"}], + [{"id": 3, "label": "c"}], + ], + type=pa.large_list(pa.field("item", item_type, nullable=False)), + ) + batch = pa.record_batch({"items": items_col}, schema=schema) + result = ArrowDigester.hash_record_batch(batch).hex() + assert result == "0000019c9cc6230a42e646a475ec1c8125fe396dcace351d64ff6e63e8f53d329c4895" + + def test_example_o_nested_struct(self): + """Example O: {s: Struct>} — two levels.""" + nested_type = pa.struct([ + pa.field("p", pa.int32(), nullable=False), + pa.field("q", pa.int32(), nullable=False), + ]) + outer_type = pa.struct([ + pa.field("a", pa.int32(), nullable=False), + pa.field("nested", nested_type, nullable=False), + ]) + schema = pa.schema([pa.field("s", outer_type, nullable=False)]) + s_col = pa.array( + [{"a": 10, "nested": {"p": 20, "q": 30}}], + type=outer_type, + ) + batch = pa.record_batch({"s": s_col}, schema=schema) + result = ArrowDigester.hash_record_batch(batch).hex() + assert result == "000001e732b8458782d9a331722698883617eec7cd1492028947b4ce3e97c7d6201550" + + +# --------------------------------------------------------------------------- +# Primitive scalar types (all 11 numeric/bool types) +# --------------------------------------------------------------------------- + + +class TestPrimitiveTypes: + def test_bool(self): + array = pa.array([True, False, None, True], type=pa.bool_()) + result = ArrowDigester.hash_array(array).hex() + assert result == 
"000001e752bea45ed48352e104fb24919ea68b1af64281231aff3a52b51a5fcabe7fb1" + + def test_int8(self): + array = pa.array([1, None, -1, 127], type=pa.int8()) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000019ab12c5c219df0b745b77bb592945dfd5a8d8bf6c20752cb601cc554bde708bc" + + def test_int16(self): + array = pa.array([1000, None, -1000, 32767], type=pa.int16()) + result = ArrowDigester.hash_array(array).hex() + assert result == "00000163dac968f9f62a036126abe3e53404b69a40f2cf02872a1fc970c7428c5869b4" + + def test_int32(self): + array = pa.array([42, None, -7, 0], type=pa.int32()) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000018330f9b8796b9434cbf7bc028c18c58a2a739b980acf9995ce1e5d60b43b0138" + + def test_int64(self): + array = pa.array([1_000_000, None, -1_000_000, 0], type=pa.int64()) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001950cf62cd41a3cfb936bcaeb2af16e4b9010e72e6201af4755c016b7b9466acf" + + def test_uint8(self): + array = pa.array([0, 1, None, 255], type=pa.uint8()) + result = ArrowDigester.hash_array(array).hex() + assert result == "00000130328e5fa11093ea36a10554afdb07fa718453f41a113dbdbdefc5baf06d1ffd" + + def test_uint16(self): + array = pa.array([0, 1000, None, 65535], type=pa.uint16()) + result = ArrowDigester.hash_array(array).hex() + assert result == "00000133e15d7b6d806f8c121f9683e1f6b62674b0f48710f7f6dc3155262e0fc84357" + + def test_uint32(self): + array = pa.array([0, 1_000_000, None, 4_294_967_295], type=pa.uint32()) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001c3d60c88f4842be618c85a09d604cbc0d87d8ea27c164fc2acb387ca50bbfa2e" + + def test_uint64(self): + array = pa.array([0, 1_000_000_000, None, 18_446_744_073_709_551_615], type=pa.uint64()) + result = ArrowDigester.hash_array(array).hex() + assert result == "00000144997caad273e039ba4919a2f1db91f05b6546271121619bb1fc97d8757f180b" + + def test_float32(self): + array = pa.array([1.5, None, 
-3.14, 0.0], type=pa.float32()) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001f5c04a8a43027a04f68de9f71b99b3075459c9c296f68aecdc92bac8ef506baa" + + def test_float64(self): + array = pa.array([1.5, None, -3.14, 0.0], type=pa.float64()) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001b0e2b1cc3668a42dbc98d3b7246f7e8cdf2d5e6a38cc0510e463d36cbeecff0b" + + +# --------------------------------------------------------------------------- +# Temporal types +# --------------------------------------------------------------------------- + + +class TestTemporalTypes: + def test_date32(self): + """Date32: days since Unix epoch.""" + array = pa.array([0, 18993, None, -1], type=pa.date32()) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001103053df817a93502fae8bd8d052c57e48acc76fc65db9316c2425805f2f7ef6" + + def test_date64(self): + """Date64: ms since Unix epoch.""" + array = pa.array([0, 1_640_995_200_000, None, -86_400_000], type=pa.date64()) + result = ArrowDigester.hash_array(array).hex() + assert result == "00000106dde92a47a5a0db64ce2c83c1de96868b8f55868003fb6e894be0751d10beeb" + + def test_time32_seconds(self): + """Time32(s): seconds since midnight.""" + array = pa.array([1000, None, 5000, 0], type=pa.time32("s")) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001aba70469e596c735ec13c3d60a9db2d0e5515eb864f07ad5d24572b35f23eacc" + + def test_time32_milliseconds(self): + """Time32(ms): milliseconds since midnight.""" + array = pa.array([1_000_000, None, 5_000_000, 0], type=pa.time32("ms")) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001abd6f51bd0ca68584a4ae73f4f9351cfe97fe6f3f85ed315f9b3628650a97564" + + def test_time64_microseconds(self): + """Time64(us): microseconds since midnight.""" + array = pa.array([1_000_000, None, 5_000_000, 0], type=pa.time64("us")) + result = ArrowDigester.hash_array(array).hex() + assert result == 
"000001c96d705b1278f9ffe1b31fb307408768f14d961c44028a1d0f778dd61786ee26" + + def test_time64_nanoseconds(self): + """Time64(ns): nanoseconds since midnight.""" + array = pa.array([1_000_000_000, None, 5_000_000_000, 0], type=pa.time64("ns")) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000013236ab3ffd9de453dc2a7613731108b132583249f2d42eb7a102dd430e66175f" + + +# --------------------------------------------------------------------------- +# Variable-length types +# --------------------------------------------------------------------------- + + +class TestVariableLengthTypes: + def test_binary(self): + """Binary array — tests LargeBinary canonicalization.""" + array = pa.array([b"hello", None, b"world", b""], type=pa.binary()) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000018dc3a0e479d1335553546c8f23c36d75335cbd34805a6f96c5d5225b347fbc57" + + def test_large_binary_equals_binary(self): + """LargeBinary and Binary arrays with same data must produce the same hash.""" + small = pa.array([b"hello", None, b"world", b""], type=pa.binary()) + large = pa.array([b"hello", None, b"world", b""], type=pa.large_binary()) + expected = "0000018dc3a0e479d1335553546c8f23c36d75335cbd34805a6f96c5d5225b347fbc57" + assert ArrowDigester.hash_array(small).hex() == expected + assert ArrowDigester.hash_array(large).hex() == expected + + def test_string(self): + """String (Utf8) array — tests LargeUtf8 canonicalization.""" + array = pa.array(["hello", None, "world", ""], type=pa.utf8()) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000016255bde0141ebf26e08c31c96f6112e5e21d101ab8bb90d77f2c3eec02c62d3c" + + def test_large_string_equals_string(self): + """LargeUtf8 and Utf8 arrays with same data must produce the same hash.""" + small = pa.array(["hello", None, "world", ""], type=pa.utf8()) + large = pa.array(["hello", None, "world", ""], type=pa.large_utf8()) + expected = 
"0000016255bde0141ebf26e08c31c96f6112e5e21d101ab8bb90d77f2c3eec02c62d3c" + assert ArrowDigester.hash_array(small).hex() == expected + assert ArrowDigester.hash_array(large).hex() == expected + + +# --------------------------------------------------------------------------- +# Decimal types (precision routing: 1-9, 10-18, 19-38) +# --------------------------------------------------------------------------- + + +class TestDecimalTypes: + def test_decimal_precision_9_scale_2(self): + """Decimal128 precision=9 (maps to decimal32 storage range) scale=2. + + Rust stores raw i128 values [123, -456, 0]. In PyArrow, pa.decimal128(9, 2) + interprets values as scaled decimals, so we use Decimal strings to get + the same raw integer storage: 1.23 * 10^2 = 123, etc. + """ + array = pa.array( + [Decimal("1.23"), None, Decimal("-4.56"), Decimal("0.00")], + type=pa.decimal128(9, 2), + ) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000014f015bd5c4b6ce6e939a8c890333f3e110c2c28ef8014aafd352f8373791e547" + + def test_decimal_precision_15_scale_3(self): + """Decimal128 precision=15 (maps to decimal64 range) scale=3.""" + array = pa.array( + [Decimal("1234567890.123"), None, Decimal("-9876543.210"), Decimal("0.000")], + type=pa.decimal128(15, 3), + ) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001dc08c7b9c583edecec36bc5dee21cd2edec9f402a651014fea5f8834d16ad737" + + def test_decimal_precision_38_scale_5(self): + """Decimal128 precision=38 scale=5 — full 16-byte integer path.""" + array = pa.array( + [ + Decimal("1234567890123456789012.34567"), + None, + Decimal("-9876543210987654321098.76543"), + Decimal("0.00000"), + ], + type=pa.decimal128(38, 5), + ) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000011e3b33d28771b3593fd5dc4b68af8091a1ba9cd493ade374e7368e213bef244e" + + +# --------------------------------------------------------------------------- +# List types +# 
--------------------------------------------------------------------------- + + +class TestListTypes: + def test_list_of_int32(self): + """List — tests LargeList canonicalization and per-element length prefix.""" + array = pa.array( + [ + [1, 2, 3], + None, + [4, 5], + [6], + ], + type=pa.list_(pa.field("item", pa.int32(), nullable=True)), + ) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001dc359d563a1ed210eb271b314612ea8343f0a0b0955b9053a9eb47962d27163c" + + def test_large_list_equals_list(self): + """LargeList with the same data must produce the same hash as List.""" + small = pa.array( + [[1, 2, 3], None, [4, 5], [6]], + type=pa.list_(pa.field("item", pa.int32(), nullable=True)), + ) + large = pa.array( + [[1, 2, 3], None, [4, 5], [6]], + type=pa.large_list(pa.field("item", pa.int32(), nullable=True)), + ) + expected = "000001dc359d563a1ed210eb271b314612ea8343f0a0b0955b9053a9eb47962d27163c" + assert ArrowDigester.hash_array(small).hex() == expected + assert ArrowDigester.hash_array(large).hex() == expected + + +# --------------------------------------------------------------------------- +# Struct types +# --------------------------------------------------------------------------- + + +class TestStructTypes: + def test_struct_with_nullable_children(self): + """Non-nullable struct column {a: Int32 non-null, b: Boolean nullable}.""" + struct_type = pa.struct([ + pa.field("a", pa.int32(), nullable=False), + pa.field("b", pa.bool_(), nullable=True), + ]) + schema = pa.schema([pa.field("s", struct_type, nullable=False)]) + s_col = pa.array( + [{"a": 1, "b": True}, {"a": 2, "b": False}, {"a": 3, "b": None}], + type=struct_type, + ) + batch = pa.record_batch({"s": s_col}, schema=schema) + result = ArrowDigester.hash_record_batch(batch).hex() + assert result == "0000015f1992c5f9130aaa64aa1360e53ddd00b427855ae02e380f534a629353135680" + + def test_struct_field_order_independence(self): + """Struct fields in different order must hash identically 
(order-independence).""" + type_ab = pa.struct([ + pa.field("a", pa.int32(), nullable=False), + pa.field("b", pa.bool_(), nullable=True), + ]) + type_ba = pa.struct([ + pa.field("b", pa.bool_(), nullable=True), + pa.field("a", pa.int32(), nullable=False), + ]) + schema_ab = pa.schema([pa.field("s", type_ab, nullable=False)]) + schema_ba = pa.schema([pa.field("s", type_ba, nullable=False)]) + + data_ab = pa.array( + [{"a": 1, "b": True}, {"a": 2, "b": False}, {"a": 3, "b": None}], + type=type_ab, + ) + data_ba = pa.array( + [{"b": True, "a": 1}, {"b": False, "a": 2}, {"b": None, "a": 3}], + type=type_ba, + ) + batch_ab = pa.record_batch({"s": data_ab}, schema=schema_ab) + batch_ba = pa.record_batch({"s": data_ba}, schema=schema_ba) + assert ( + ArrowDigester.hash_record_batch(batch_ab) + == ArrowDigester.hash_record_batch(batch_ba) + ) + + +# --------------------------------------------------------------------------- +# Multi-type record batch +# --------------------------------------------------------------------------- + + +class TestMultiTypeBatch: + def test_multi_type_record_batch(self): + """One-row record batch covering all scalar and collection types.""" + schema = pa.schema([ + pa.field("bool", pa.bool_(), nullable=True), + pa.field("int8", pa.int8(), nullable=False), + pa.field("uint8", pa.uint8(), nullable=False), + pa.field("int16", pa.int16(), nullable=False), + pa.field("uint16", pa.uint16(), nullable=False), + pa.field("int32", pa.int32(), nullable=False), + pa.field("uint32", pa.uint32(), nullable=False), + pa.field("int64", pa.int64(), nullable=False), + pa.field("uint64", pa.uint64(), nullable=False), + pa.field("float32", pa.float32(), nullable=False), + pa.field("float64", pa.float64(), nullable=False), + pa.field("date32", pa.date32(), nullable=False), + pa.field("date64", pa.date64(), nullable=False), + pa.field("time32_second", pa.time32("s"), nullable=False), + pa.field("time32_millis", pa.time32("ms"), nullable=False), + 
pa.field("time64_micro", pa.time64("us"), nullable=False), + pa.field("time64_nano", pa.time64("ns"), nullable=False), + pa.field("binary", pa.binary(), nullable=True), + pa.field("large_binary", pa.large_binary(), nullable=True), + pa.field("utf8", pa.utf8(), nullable=True), + pa.field("large_utf8", pa.large_utf8(), nullable=True), + pa.field("list", pa.list_(pa.field("item", pa.int32(), nullable=True)), nullable=True), + pa.field("large_list", pa.large_list(pa.field("item", pa.int32(), nullable=True)), nullable=True), + pa.field("decimal128", pa.decimal128(38, 5), nullable=True), + ]) + batch = pa.record_batch( + { + "bool": pa.array([True], type=pa.bool_()), + "int8": pa.array([1], type=pa.int8()), + "uint8": pa.array([1], type=pa.uint8()), + "int16": pa.array([100], type=pa.int16()), + "uint16": pa.array([100], type=pa.uint16()), + "int32": pa.array([1000], type=pa.int32()), + "uint32": pa.array([1000], type=pa.uint32()), + "int64": pa.array([100_000], type=pa.int64()), + "uint64": pa.array([100_000], type=pa.uint64()), + "float32": pa.array([1.5], type=pa.float32()), + "float64": pa.array([1.5], type=pa.float64()), + "date32": pa.array([18993], type=pa.date32()), + "date64": pa.array([1_640_995_200_000], type=pa.date64()), + "time32_second": pa.array([3600], type=pa.time32("s")), + "time32_millis": pa.array([3_600_000], type=pa.time32("ms")), + "time64_micro": pa.array([3_600_000_000], type=pa.time64("us")), + "time64_nano": pa.array([3_600_000_000_000], type=pa.time64("ns")), + "binary": pa.array([b"data1"], type=pa.binary()), + "large_binary": pa.array([b"large1"], type=pa.large_binary()), + "utf8": pa.array(["text1"], type=pa.utf8()), + "large_utf8": pa.array(["large_text1"], type=pa.large_utf8()), + "list": pa.array([[1, 2]], type=pa.list_(pa.field("item", pa.int32(), nullable=True))), + "large_list": pa.array([[5, 6]], type=pa.large_list(pa.field("item", pa.int32(), nullable=True))), + "decimal128": pa.array( + 
[Decimal("1234567890123456789012345.67890")], + type=pa.decimal128(38, 5), + ), + }, + schema=schema, + ) + result = ArrowDigester.hash_record_batch(batch).hex() + assert result == "0000010f8bb471cb34975f20c4b221abff1a90b2d98f24217a2257e3298ee3d4e9f9c4" + + def test_multi_type_schema_only(self): + """Empty (no-rows) digester over the large multi-type schema.""" + schema = pa.schema([ + pa.field("bool", pa.bool_(), nullable=True), + pa.field("int8", pa.int8(), nullable=False), + pa.field("uint8", pa.uint8(), nullable=False), + pa.field("int16", pa.int16(), nullable=False), + pa.field("uint16", pa.uint16(), nullable=False), + pa.field("int32", pa.int32(), nullable=False), + pa.field("uint32", pa.uint32(), nullable=False), + pa.field("int64", pa.int64(), nullable=False), + pa.field("uint64", pa.uint64(), nullable=False), + pa.field("float32", pa.float32(), nullable=False), + pa.field("float64", pa.float64(), nullable=False), + pa.field("date32", pa.date32(), nullable=False), + pa.field("date64", pa.date64(), nullable=False), + pa.field("time32_second", pa.time32("s"), nullable=False), + pa.field("time32_millis", pa.time32("ms"), nullable=False), + pa.field("time64_micro", pa.time64("us"), nullable=False), + pa.field("time64_nano", pa.time64("ns"), nullable=False), + pa.field("binary", pa.binary(), nullable=True), + pa.field("large_binary", pa.large_binary(), nullable=True), + pa.field("utf8", pa.utf8(), nullable=True), + pa.field("large_utf8", pa.large_utf8(), nullable=True), + pa.field("list", pa.list_(pa.field("item", pa.int32(), nullable=True)), nullable=True), + pa.field("large_list", pa.large_list(pa.field("item", pa.int32(), nullable=True)), nullable=True), + pa.field("decimal128", pa.decimal128(38, 5), nullable=True), + ]) + result = ArrowDigester(schema).finalize().hex() + assert result == "0000010e62b3f3317d25413b7a4dc6e7a8a6e7c73a73c513cdbe9f9b1f5e5ec56f0e94" + + +# --------------------------------------------------------------------------- +# Streaming / 
incremental hashing +# --------------------------------------------------------------------------- + + +class TestStreamingHashing: + def test_streaming_two_batches(self): + """Two-batch incremental update must match the known Rust hash.""" + schema = pa.schema([ + pa.field("uids", pa.int32(), nullable=False), + pa.field("flags", pa.bool_(), nullable=True), + ]) + batch1 = pa.record_batch( + { + "uids": pa.array([1, 2, 3, 4], type=pa.int32()), + "flags": pa.array([True, False, None, True], type=pa.bool_()), + }, + schema=schema, + ) + batch2 = pa.record_batch( + { + "uids": pa.array([5, 6, 7, 8], type=pa.int32()), + "flags": pa.array([False, True, True, None], type=pa.bool_()), + }, + schema=schema, + ) + digester = ArrowDigester(schema) + digester.update(batch1) + digester.update(batch2) + result = digester.finalize().hex() + assert result == "0000019f5fa370d315a4b4f2314be7b7284a0549b70ad4e21e584fdebf441ad02f44f0" + + def test_batch_split_independence(self): + """Splitting a batch into two must not change the hash.""" + schema = pa.schema([ + pa.field("v", pa.int32(), nullable=False), + ]) + combined = pa.record_batch({"v": pa.array([1, 2, 3], type=pa.int32())}, schema=schema) + batch1 = pa.record_batch({"v": pa.array([1, 2], type=pa.int32())}, schema=schema) + batch2 = pa.record_batch({"v": pa.array([3], type=pa.int32())}, schema=schema) + + hash_combined = ArrowDigester.hash_record_batch(combined) + + digester = ArrowDigester(schema) + digester.update(batch1) + digester.update(batch2) + hash_split = digester.finalize() + + assert hash_combined == hash_split diff --git a/tests/test_golden_parity_r2.py b/tests/test_golden_parity_r2.py new file mode 100644 index 0000000..971f0cc --- /dev/null +++ b/tests/test_golden_parity_r2.py @@ -0,0 +1,969 @@ +"""Golden-hash parity tests — Round 2. + +Every expected value below was generated by `cargo run --bin emit_golden` against +the Rust starfix crate. 
The Rust values are treated as the authoritative source +of truth; these tests are regression guards that will fail immediately if the +Python implementation diverges. + +Covers 37+ distinct Arrow tables / arrays across categories: + - Empty arrays (0 elements) + - All-null arrays + - Large boolean arrays (multi-byte bitvec) + - Single-element arrays + - Nullable structs (struct itself nullable, hash_array & record batch) + - Three-level nested structs + - Struct with list child + - List, nullable List + - Non-nullable lists + - Mixed-type structs + - Mixed struct+list record batches + - Empty record batches (schema only) + - All-null columns in batches + - Unicode strings + - Binary with varied sizes + - Struct field order independence + - Nested struct with nullable inner struct + - List-of-list (nested lists) + - Large schema hashes + - Multi-batch streaming with structs and lists + - Float special values (NaN, Inf) + - Realistic batch (struct + list + scalar) +""" + +from __future__ import annotations + +import pyarrow as pa +import pytest +from starfix.arrow_digester import ArrowDigester + + +# --------------------------------------------------------------------------- +# Empty arrays +# --------------------------------------------------------------------------- + + +class TestEmptyArrays: + def test_empty_int32_array(self): + """Empty Int32 array (0 elements).""" + array = pa.array([], type=pa.int32()) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000015b41cb9aa8a4c79da8859ae00ce976d8184bdf5426060292b69717c6f28ed38a" + + def test_empty_bool_array(self): + """Empty Boolean array (0 elements).""" + array = pa.array([], type=pa.bool_()) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001b93f053f60f756b3d47a304889daa8011b9d09b4c83efbfe10ca7f073d59d4ea" + + +# --------------------------------------------------------------------------- +# All-null arrays +# 
--------------------------------------------------------------------------- + + +class TestAllNullArrays: + def test_all_null_int32(self): + """Int32 array where every element is null.""" + array = pa.array([None, None, None, None], type=pa.int32()) + result = ArrowDigester.hash_array(array).hex() + assert result == "00000104fd65cf73bc1d8e787a6940b749777eddb28285d7bfc90e832212afb6fdea65" + + def test_all_null_string(self): + """LargeString array where every element is null.""" + array = pa.array([None, None, None], type=pa.large_utf8()) + result = ArrowDigester.hash_array(array).hex() + assert result == "00000115a6af4eb6fdb3419afebadf58a9f184f0a1fffa09ffa56600237f501db71070" + + +# --------------------------------------------------------------------------- +# Boolean arrays +# --------------------------------------------------------------------------- + + +class TestBooleanArrays: + def test_bool_17_elements(self): + """17-element boolean array (multi-byte bitvec).""" + array = pa.array( + [ + True, False, True, True, + None, False, True, False, + True, None, True, False, + True, True, False, None, + True, + ], + type=pa.bool_(), + ) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001f8ad377be621fd278f5590c7cab06ca42e2b4ed6867d363f9fbf28f6f3d7582a" + + +# --------------------------------------------------------------------------- +# Single-element arrays +# --------------------------------------------------------------------------- + + +class TestSingleElementArrays: + def test_single_int64(self): + """Single Int64 value: 9_999_999_999.""" + array = pa.array([9_999_999_999], type=pa.int64()) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001e1cb4b7821c4238f0c53befd313280ff45a74b301e7044957b01a701108c5b7f" + + def test_single_string(self): + """Single LargeString value: 'hello world'.""" + array = pa.array(["hello world"], type=pa.large_utf8()) + result = ArrowDigester.hash_array(array).hex() + assert result == 
"0000014db35cf6860dda66df959cd2b4882a4cddaa6d17e74cf389e71c5553e4cceadb" + + def test_single_bool_false(self): + """Single Boolean value: false.""" + array = pa.array([False], type=pa.bool_()) + result = ArrowDigester.hash_array(array).hex() + assert result == "00000117655fca3764c8082b61b9de7186594bdcc58b86934176bea4b12d95becc54df" + + +# --------------------------------------------------------------------------- +# Nullable struct via hash_array +# --------------------------------------------------------------------------- + + +class TestNullableStructHashArray: + def test_nullable_struct_hash_array(self): + """Struct with null buffer via hash_array — row 1 is null struct. + + Fields: a: Int32 non-null, b: LargeUtf8 non-null + Data: [{a:10, b:'x'}, NULL, {a:30, b:'z'}] + """ + a_arr = pa.array([10, 20, 30], type=pa.int32()) + b_arr = pa.array(["x", "y", "z"], type=pa.large_utf8()) + mask = pa.array([False, True, False], type=pa.bool_()) + fields = [ + pa.field("a", pa.int32(), nullable=False), + pa.field("b", pa.large_utf8(), nullable=False), + ] + struct_array = pa.StructArray.from_arrays( + [a_arr, b_arr], + fields=fields, + mask=mask, + ) + result = ArrowDigester.hash_array(struct_array).hex() + assert result == "0000011ea07bf87d5b03b132e46a169df25352cb2e3c5a5f8b067138e0bd2a2023e6d5" + + +# --------------------------------------------------------------------------- +# Three-level nested struct +# --------------------------------------------------------------------------- + + +class TestThreeLevelStruct: + def test_three_level_struct_batch(self): + """Three-level nested struct as record batch. 
+
+        top: Struct<x: Int32, mid: Struct<y: Int32, z: Int32>>
+        Data: [{x:100, mid:{y:200, z:300}}]
+        """
+        inner_type = pa.struct([
+            pa.field("y", pa.int32(), nullable=False),
+            pa.field("z", pa.int32(), nullable=False),
+        ])
+        outer_type = pa.struct([
+            pa.field("x", pa.int32(), nullable=False),
+            pa.field("mid", inner_type, nullable=False),
+        ])
+        schema = pa.schema([pa.field("top", outer_type, nullable=False)])
+        col = pa.array(
+            [{"x": 100, "mid": {"y": 200, "z": 300}}],
+            type=outer_type,
+        )
+        batch = pa.record_batch({"top": col}, schema=schema)
+        result = ArrowDigester.hash_record_batch(batch).hex()
+        assert result == "000001fdd123a2cceb977b1448eaba26039de58f3a782244bf1cff70a6742e2f6f220a"
+
+    def test_three_level_struct_schema(self):
+        """Schema hash for three-level nested struct."""
+        inner_type = pa.struct([
+            pa.field("y", pa.int32(), nullable=False),
+            pa.field("z", pa.int32(), nullable=False),
+        ])
+        outer_type = pa.struct([
+            pa.field("x", pa.int32(), nullable=False),
+            pa.field("mid", inner_type, nullable=False),
+        ])
+        schema = pa.schema([pa.field("top", outer_type, nullable=False)])
+        result = ArrowDigester.hash_schema(schema).hex()
+        assert result == "000001f5e018be18625769644d2f10aa06d52a50df75ade8872248f7dc4b296ce997f7"
+
+    def test_three_level_struct_hash_array(self):
+        """Three-level struct via hash_array.
+
+        Struct<inner: Struct<y: Int32, z: LargeUtf8>, x: Int32>
+        Data: [{inner:{y:7,z:'a'}, x:5}, {inner:{y:8,z:'b'}, x:6}]
+        """
+        y_arr = pa.array([7, 8], type=pa.int32())
+        z_arr = pa.array(["a", "b"], type=pa.large_utf8())
+        inner_fields = [
+            pa.field("y", pa.int32(), nullable=False),
+            pa.field("z", pa.large_utf8(), nullable=False),
+        ]
+        inner = pa.StructArray.from_arrays(
+            [y_arr, z_arr],
+            fields=inner_fields,
+        )
+        x_arr = pa.array([5, 6], type=pa.int32())
+        inner_type = pa.struct(inner_fields)
+        outer_fields = [
+            pa.field("inner", inner_type, nullable=False),
+            pa.field("x", pa.int32(), nullable=False),
+        ]
+        outer = pa.StructArray.from_arrays(
+            [inner, x_arr],
+            fields=outer_fields,
+        )
+        result = ArrowDigester.hash_array(outer).hex()
+        assert result == "0000011dc18344fc9e0c026561c3a38ca128e5afa90d7b66df2cffac51a18bf106d0d0"
+
+
+# ---------------------------------------------------------------------------
+# Struct with list child
+# ---------------------------------------------------------------------------
+
+
+class TestStructWithListChild:
+    def test_struct_with_list_child_batch(self):
+        """Record batch: struct with list child.
+
+        record: Struct<id: Int32, tags: List<Int32>>
+        Data: [{id:1, tags:[10,20]}, {id:2, tags:[30]}, {id:3, tags:null}]
+        """
+        ids = pa.array([1, 2, 3], type=pa.int32())
+        tags = pa.array(
+            [[10, 20], [30], None],
+            type=pa.list_(pa.field("item", pa.int32(), nullable=True)),
+        )
+        struct_type = pa.struct([
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field("tags", pa.list_(pa.field("item", pa.int32(), nullable=True)), nullable=True),
+        ])
+        struct_arr = pa.StructArray.from_arrays(
+            [ids, tags],
+            names=["id", "tags"],
+        )
+        schema = pa.schema([pa.field("record", struct_type, nullable=False)])
+        batch = pa.record_batch([struct_arr], schema=schema)
+        result = ArrowDigester.hash_record_batch(batch).hex()
+        assert result == "000001e4173182e5596f68ac3a1013caea1fc4aa3003ebc5c434bdab8db83c9173635d"
+
+    def test_struct_with_list_child_schema(self):
+        """Schema hash for struct with list child."""
+        struct_type = pa.struct([
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field("tags", pa.list_(pa.field("item", pa.int32(), nullable=True)), nullable=True),
+        ])
+        schema = pa.schema([pa.field("record", struct_type, nullable=False)])
+        result = ArrowDigester.hash_schema(schema).hex()
+        assert result == "000001310788c527cc1f9b997d616ada011d9c3790a22d17dcc4e4cf0f043539f4c0ed"
+
+
+# ---------------------------------------------------------------------------
+# Nullable struct in record batch
+# ---------------------------------------------------------------------------
+
+
+class TestNullableStructBatch:
+    def test_nullable_struct_batch(self):
+        """Nullable struct in record batch — row 1 is null.
+
+        s: Struct<a: Int32, b: Boolean> nullable
+        Data: [{a:1,b:true}, NULL, {a:3,b:true}]
+        """
+        a_arr = pa.array([1, 2, 3], type=pa.int32())
+        b_arr = pa.array([True, False, True], type=pa.bool_())
+        mask = pa.array([False, True, False], type=pa.bool_())
+        struct_array = pa.StructArray.from_arrays(
+            [a_arr, b_arr],
+            names=["a", "b"],
+            mask=mask,
+        )
+        struct_type = pa.struct([
+            pa.field("a", pa.int32(), nullable=False),
+            pa.field("b", pa.bool_(), nullable=False),
+        ])
+        schema = pa.schema([pa.field("s", struct_type, nullable=True)])
+        batch = pa.record_batch([struct_array], schema=schema)
+        result = ArrowDigester.hash_record_batch(batch).hex()
+        assert result == "000001374a7e10ae3d1b9b46aa33f43988d5015290286e7236a3acbcd028462016e048"
+
+
+# ---------------------------------------------------------------------------
+# List via hash_array
+# ---------------------------------------------------------------------------
+
+
+class TestListOfStruct:
+    def test_list_of_struct_hash_array(self):
+        """LargeList<Struct<id: Int32, name: LargeUtf8>> via hash_array.
+
+        Row 0: [{id:1, name:'a'}, {id:2, name:'b'}]
+        Row 1: [{id:3, name:'c'}, {id:4, name:'d'}]
+        """
+        ids = pa.array([1, 2, 3, 4], type=pa.int32())
+        names = pa.array(["a", "b", "c", "d"], type=pa.large_utf8())
+        struct_fields = [
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field("name", pa.large_utf8(), nullable=False),
+        ]
+        struct_arr = pa.StructArray.from_arrays(
+            [ids, names],
+            fields=struct_fields,
+        )
+        inner_type = pa.struct([
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field("name", pa.large_utf8(), nullable=False),
+        ])
+        inner_field = pa.field("item", inner_type, nullable=False)
+        offsets = pa.array([0, 2, 4], type=pa.int64())
+        list_array = pa.LargeListArray.from_arrays(offsets, struct_arr, type=pa.large_list(inner_field))
+        result = ArrowDigester.hash_array(list_array).hex()
+        assert result == "0000010d5f9a58c3122935886c0de076b8a90400dc9a1b3885154c734ab4616c52b626"
+
+    def test_nullable_list_of_struct_hash_array(self):
+        """Nullable LargeList<Struct<id: Int32, val: LargeUtf8>> via hash_array — row 1 is null.
+ + Row 0: [{id:10, val:'p'}] + Row 1: NULL + Row 2: [{id:20, val:'q'}, {id:30, val:'r'}] + """ + ids = pa.array([10, 20, 30], type=pa.int32()) + vals = pa.array(["p", "q", "r"], type=pa.large_utf8()) + struct_fields = [ + pa.field("id", pa.int32(), nullable=False), + pa.field("val", pa.large_utf8(), nullable=False), + ] + struct_arr = pa.StructArray.from_arrays( + [ids, vals], + fields=struct_fields, + ) + inner_type = pa.struct(struct_fields) + inner_field = pa.field("item", inner_type, nullable=False) + offsets = pa.array([0, 1, 1, 3], type=pa.int64()) + mask = pa.array([False, True, False], type=pa.bool_()) + list_array = pa.LargeListArray.from_arrays(offsets, struct_arr, type=pa.large_list(inner_field), mask=mask) + result = ArrowDigester.hash_array(list_array).hex() + assert result == "000001bb51997ac0fe67d4b08c5ced13b763c0ff5a532c9f9c8932fa2a4dd96fe57ea8" + + +# --------------------------------------------------------------------------- +# Non-nullable list +# --------------------------------------------------------------------------- + + +class TestNonNullableList: + def test_list_non_nullable(self): + """List where all elements are present (no nulls at list or item level). + + Row 0: [1] + Row 1: [2, 3] + Row 2: [4, 5, 6] + """ + array = pa.array( + [[1], [2, 3], [4, 5, 6]], + type=pa.list_(pa.field("item", pa.int32(), nullable=True)), + ) + result = ArrowDigester.hash_array(array).hex() + assert result == "00000150f18fb7ba1d34afe3f1b721a1ce10d1466dd55c228119449703ccf5ab771a5b" + + +# --------------------------------------------------------------------------- +# Struct with mixed types +# --------------------------------------------------------------------------- + + +class TestStructMixedTypes: + def test_struct_mixed_types_hash_array(self): + """Struct with int, string, bool, and list children via hash_array. 
+
+        Data: [{id:1, name:'Alice', active:true, scores:[90,85]},
+               {id:2, name:'Bob', active:false, scores:[70]}]
+        """
+        id_arr = pa.array([1, 2], type=pa.int32())
+        name_arr = pa.array(["Alice", "Bob"], type=pa.large_utf8())
+        active_arr = pa.array([True, False], type=pa.bool_())
+        scores_arr = pa.array(
+            [[90, 85], [70]],
+            type=pa.list_(pa.field("item", pa.int32(), nullable=True)),
+        )
+        struct_fields = [
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field("name", pa.large_utf8(), nullable=False),
+            pa.field("active", pa.bool_(), nullable=False),
+            pa.field("scores", pa.list_(pa.field("item", pa.int32(), nullable=True)), nullable=False),
+        ]
+        struct_arr = pa.StructArray.from_arrays(
+            [id_arr, name_arr, active_arr, scores_arr],
+            fields=struct_fields,
+        )
+        result = ArrowDigester.hash_array(struct_arr).hex()
+        assert result == "0000019f925992104e467b68e68e7dfa64ac7d39c82b002cb8c06f1c5bec19156107a3"
+
+
+# ---------------------------------------------------------------------------
+# Mixed struct + list record batch
+# ---------------------------------------------------------------------------
+
+
+class TestMixedStructListBatch:
+    def test_mixed_struct_list_batch(self):
+        """Record batch with id: Int64, meta: Struct<key: LargeUtf8, value: Int32 nullable>, tags: LargeList<LargeUtf8> nullable.
+ + Row 0: {id:100, meta:{key:'k1', value:10}, tags:['t1','t2']} + Row 1: {id:200, meta:{key:'k2', value:null}, tags:['t3']} + """ + schema = pa.schema([ + pa.field("id", pa.int64(), nullable=False), + pa.field("meta", pa.struct([ + pa.field("key", pa.large_utf8(), nullable=False), + pa.field("value", pa.int32(), nullable=True), + ]), nullable=False), + pa.field("tags", pa.large_list( + pa.field("item", pa.large_utf8(), nullable=True)), nullable=True), + ]) + + id_arr = pa.array([100, 200], type=pa.int64()) + key_arr = pa.array(["k1", "k2"], type=pa.large_utf8()) + val_arr = pa.array([10, None], type=pa.int32()) + meta_arr = pa.StructArray.from_arrays( + [key_arr, val_arr], + names=["key", "value"], + ) + + tag_strings = pa.array(["t1", "t2", "t3"], type=pa.large_utf8()) + offsets = pa.array([0, 2, 3], type=pa.int64()) + tags_arr = pa.LargeListArray.from_arrays( + offsets, tag_strings, + type=pa.large_list(pa.field("item", pa.large_utf8(), nullable=True)), + ) + + batch = pa.record_batch([id_arr, meta_arr, tags_arr], schema=schema) + result = ArrowDigester.hash_record_batch(batch).hex() + assert result == "00000128fd142d16920be401eb33abf71143977164eb8952a7da20ae6673a8d1d3707a" + + def test_mixed_struct_list_schema(self): + """Schema hash for mixed struct + list batch.""" + schema = pa.schema([ + pa.field("id", pa.int64(), nullable=False), + pa.field("meta", pa.struct([ + pa.field("key", pa.large_utf8(), nullable=False), + pa.field("value", pa.int32(), nullable=True), + ]), nullable=False), + pa.field("tags", pa.large_list( + pa.field("item", pa.large_utf8(), nullable=True)), nullable=True), + ]) + result = ArrowDigester.hash_schema(schema).hex() + assert result == "0000011eb0482c51f8fb95dfae3c5ae83253a1e695dcf546fe5a3b6d75df172c5c2a45" + + +# --------------------------------------------------------------------------- +# Empty record batch (schema with 0 rows) +# --------------------------------------------------------------------------- + + +class 
TestEmptyBatchComplexSchema: + def test_empty_batch_complex_schema(self): + """Empty table hash (ArrowDigester(schema).finalize()) with complex schema.""" + schema = pa.schema([ + pa.field("x", pa.int32(), nullable=False), + pa.field("y", pa.large_utf8(), nullable=True), + pa.field("z", pa.struct([ + pa.field("a", pa.bool_(), nullable=False), + pa.field("b", pa.float64(), nullable=True), + ]), nullable=False), + ]) + digester = ArrowDigester(schema) + result = digester.finalize().hex() + assert result == "000001837b0cc525983aca21f8979b3c2ed7c77d22728fb3b9a56df6ed4aee540dfa83" + + def test_empty_batch_complex_schema_hash(self): + """Schema hash (hash_schema) for same complex schema.""" + schema = pa.schema([ + pa.field("x", pa.int32(), nullable=False), + pa.field("y", pa.large_utf8(), nullable=True), + pa.field("z", pa.struct([ + pa.field("a", pa.bool_(), nullable=False), + pa.field("b", pa.float64(), nullable=True), + ]), nullable=False), + ]) + result = ArrowDigester.hash_schema(schema).hex() + assert result == "00000194e104ebfea3ba38467c883d05a71884f8a1d0eace1c0dcfb00ab2d1203aa654" + + +# --------------------------------------------------------------------------- +# Batch with all-null column +# --------------------------------------------------------------------------- + + +class TestBatchAllNullColumn: + def test_batch_all_null_column(self): + """Record batch where one column is entirely null. 
+ + id: [1, 2, 3], value: [null, null, null] + """ + schema = pa.schema([ + pa.field("id", pa.int32(), nullable=False), + pa.field("value", pa.large_utf8(), nullable=True), + ]) + batch = pa.record_batch( + { + "id": pa.array([1, 2, 3], type=pa.int32()), + "value": pa.array([None, None, None], type=pa.large_utf8()), + }, + schema=schema, + ) + result = ArrowDigester.hash_record_batch(batch).hex() + assert result == "000001266d1fd64c44cf1f18ccc395bd2c712459e43807095d78115ff78497cd7ec494" + + +# --------------------------------------------------------------------------- +# Unicode strings +# --------------------------------------------------------------------------- + + +class TestUnicodeStrings: + def test_unicode_string_array(self): + """LargeString array with unicode characters.""" + array = pa.array( + ["hello", "\u4e16\u754c", "\U0001f30d\U0001f30e\U0001f30f", None, "caf\u00e9"], + type=pa.large_utf8(), + ) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001d3f64ee0c7dac33e00445c2dd3edbb4b14cf61608682e4aae20d034dcaa3af40" + + +# --------------------------------------------------------------------------- +# Binary with varied sizes +# --------------------------------------------------------------------------- + + +class TestBinaryVariedSizes: + def test_binary_varied_sizes(self): + """Binary array with varied sizes including empty, null, and special bytes.""" + array = pa.array( + [ + b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f", + b"", + None, + b"\xff", + b"\x00", + ], + type=pa.binary(), + ) + result = ArrowDigester.hash_array(array).hex() + assert result == "0000010213a24d1014bb90e20f339fccab30ca699f2ca916d190f9a7f50077f5734b05" + + +# --------------------------------------------------------------------------- +# Struct field order independence +# --------------------------------------------------------------------------- + + +class TestStructFieldOrderIndependence: + def 
test_struct_3field_order_independence(self): + """Three-field struct: field order should not affect hash. + + Fields: a: Int32, b: LargeUtf8, c: Boolean + Data: [{a:1,b:'x',c:true}, {a:2,b:'y',c:false}] + """ + a = pa.array([1, 2], type=pa.int32()) + b = pa.array(["x", "y"], type=pa.large_utf8()) + c = pa.array([True, False], type=pa.bool_()) + + fa = pa.field("a", pa.int32(), nullable=False) + fb = pa.field("b", pa.large_utf8(), nullable=False) + fc = pa.field("c", pa.bool_(), nullable=False) + + # Order: a, b, c + s1 = pa.StructArray.from_arrays([a, b, c], fields=[fa, fb, fc]) + # Order: c, a, b + s2 = pa.StructArray.from_arrays([c, a, b], fields=[fc, fa, fb]) + + h1 = ArrowDigester.hash_array(s1).hex() + h2 = ArrowDigester.hash_array(s2).hex() + assert h1 == h2, "struct field order should not affect hash_array" + assert h1 == "000001c3c2f4d6414b062973fa8277701f8e0689f7656405d03cd7fef1e1ef7c05ac25" + + +# --------------------------------------------------------------------------- +# Nested struct with nullable inner struct +# --------------------------------------------------------------------------- + + +class TestNestedNullableInnerStruct: + def test_nested_nullable_inner_struct(self): + """Outer struct with nullable inner struct — inner row 1 is null. 
+
+        Outer: Struct<a: Int32, inner: Struct<p: Int32, q: Int32> nullable>
+        Data: [{a:10, inner:{p:1,q:4}}, {a:20, inner:NULL}, {a:30, inner:{p:3,q:6}}]
+        """
+        p = pa.array([1, 2, 3], type=pa.int32())
+        q = pa.array([4, 5, 6], type=pa.int32())
+        inner_mask = pa.array([False, True, False], type=pa.bool_())
+        inner_fields = [
+            pa.field("p", pa.int32(), nullable=False),
+            pa.field("q", pa.int32(), nullable=False),
+        ]
+        inner = pa.StructArray.from_arrays(
+            [p, q],
+            fields=inner_fields,
+            mask=inner_mask,
+        )
+
+        a = pa.array([10, 20, 30], type=pa.int32())
+        inner_type = pa.struct(inner_fields)
+        outer_fields = [
+            pa.field("a", pa.int32(), nullable=False),
+            pa.field("inner", inner_type, nullable=True),
+        ]
+        outer = pa.StructArray.from_arrays(
+            [a, inner],
+            fields=outer_fields,
+        )
+        result = ArrowDigester.hash_array(outer).hex()
+        assert result == "000001d43e8fb43a96546aa158c8defdcf186e1f15e42cf12fb2439902374d320f503c"
+
+
+# ---------------------------------------------------------------------------
+# List of list (nested lists)
+# ---------------------------------------------------------------------------
+
+
+class TestListOfList:
+    def test_list_of_list_hash_array(self):
+        """LargeList<LargeList<Int32>> via hash_array.
+
+        Row 0: [[1,2], [3]]
+        Row 1: [[4,5], [6]]
+        """
+        inner_values = pa.array([1, 2, 3, 4, 5, 6], type=pa.int32())
+        inner_offsets = pa.array([0, 2, 3, 5, 6], type=pa.int64())
+        inner_field = pa.field("item", pa.int32(), nullable=False)
+        inner_list = pa.LargeListArray.from_arrays(
+            inner_offsets, inner_values,
+            type=pa.large_list(inner_field),
+        )
+        outer_offsets = pa.array([0, 2, 4], type=pa.int64())
+        outer_field = pa.field("item", pa.large_list(inner_field), nullable=False)
+        outer_list = pa.LargeListArray.from_arrays(
+            outer_offsets, inner_list,
+            type=pa.large_list(outer_field),
+        )
+        result = ArrowDigester.hash_array(outer_list).hex()
+        assert result == "0000010843e52522d6435df07140b616e2cce64d55fc0dd4ee798a1f8dab86edf7bedc"
+
+    def test_list_of_list_batch(self):
+        """Record batch with LargeList<LargeList<Int32>> column."""
+        inner_values = pa.array([1, 2, 3, 4, 5, 6], type=pa.int32())
+        inner_offsets = pa.array([0, 2, 3, 5, 6], type=pa.int64())
+        inner_field = pa.field("item", pa.int32(), nullable=False)
+        inner_list = pa.LargeListArray.from_arrays(
+            inner_offsets, inner_values,
+            type=pa.large_list(inner_field),
+        )
+        outer_offsets = pa.array([0, 2, 4], type=pa.int64())
+        outer_field = pa.field("item", pa.large_list(inner_field), nullable=False)
+        outer_list = pa.LargeListArray.from_arrays(
+            outer_offsets, inner_list,
+            type=pa.large_list(outer_field),
+        )
+        schema = pa.schema([pa.field("col", pa.large_list(outer_field), nullable=False)])
+        batch = pa.record_batch([outer_list], schema=schema)
+        result = ArrowDigester.hash_record_batch(batch).hex()
+        assert result == "000001644acff58777778402bb96ee8aa5e8fac1b3ffe7c58fe3a27860d0e93581ed10"
+
+    def test_list_of_list_schema(self):
+        """Schema hash for list-of-list column."""
+        inner_field = pa.field("item", pa.int32(), nullable=False)
+        outer_field = pa.field("item", pa.large_list(inner_field), nullable=False)
+        schema = pa.schema([pa.field("col", pa.large_list(outer_field), nullable=False)])
+        result = ArrowDigester.hash_schema(schema).hex()
+        assert result == "00000141016034ea375b62e02305da3756a08f63bdfa572a285858a15b1d6d0bb66330"
+
+
+# ---------------------------------------------------------------------------
+# Large schema (12 fields)
+# ---------------------------------------------------------------------------
+
+
+class TestLargeSchema:
+    def test_large_schema_12_fields(self):
+        """Schema hash for 12-field schema with mixed types."""
+        schema = pa.schema([
+            pa.field("f01", pa.int8(), nullable=False),
+            pa.field("f02", pa.int16(), nullable=True),
+            pa.field("f03", pa.int32(), nullable=False),
+            pa.field("f04", pa.int64(), nullable=True),
+            pa.field("f05", pa.float32(), nullable=False),
+            pa.field("f06", pa.float64(), nullable=True),
+            pa.field("f07", pa.bool_(), nullable=False),
+            pa.field("f08", pa.large_utf8(), nullable=True),
+            pa.field("f09", pa.large_binary(), nullable=False),
+            pa.field("f10", pa.date32(), nullable=True),
+            pa.field("f11", pa.struct([
+                pa.field("nested_a", pa.int32(), nullable=False),
+                pa.field("nested_b", pa.large_utf8(), nullable=True),
+            ]), nullable=False),
+            pa.field("f12", pa.large_list(
+                pa.field("item", pa.int32(), nullable=True)), nullable=True),
+        ])
+        result = ArrowDigester.hash_schema(schema).hex()
+        assert result == "000001d4b6edc1304b0e981460aefa31813af93631c46558569a5ba8d5613cdbd722bd"
+
+
+# ---------------------------------------------------------------------------
+# Schema with nested list-of-struct
+# ---------------------------------------------------------------------------
+
+
+class TestSchemaListOfStructWithList:
+    def test_schema_list_of_struct_with_list(self):
+        """Schema hash for LargeList<Struct<ts: Int64, msg: LargeUtf8, tags: LargeList<LargeUtf8>>>."""
+        schema = pa.schema([
+            pa.field("events", pa.large_list(
+                pa.field("item", pa.struct([
+                    pa.field("ts", pa.int64(), nullable=False),
+                    pa.field("msg", pa.large_utf8(), nullable=True),
+                    pa.field("tags", pa.large_list(
+                        pa.field("item", pa.large_utf8(), nullable=False)),
+                        nullable=True),
+                ]), nullable=False)), nullable=True),
+        ])
+        result = ArrowDigester.hash_schema(schema).hex()
+        assert result == "0000017f5258ff48e85b2a63cd1a29590ff3bfca39bf718365200ad74a8232bb654ec6"
+
+
+# ---------------------------------------------------------------------------
+# Deeply nested schema (4 levels)
+# ---------------------------------------------------------------------------
+
+
+class TestSchema4LevelNestedStruct:
+    def test_schema_4_level_nested_struct(self):
+        """Schema hash for 4-level nested struct: root.l1.l2.l3.leaf."""
+        schema = pa.schema([
+            pa.field("root", pa.struct([
+                pa.field("l1", pa.struct([
+                    pa.field("l2", pa.struct([
+                        pa.field("l3", pa.struct([
+                            pa.field("leaf", pa.int32(), nullable=False),
+                        ]), nullable=False),
+                    ]), nullable=False),
+                ]), nullable=False),
+            ]), nullable=False),
+        ])
+        result = ArrowDigester.hash_schema(schema).hex()
+        assert result == "0000017e33cfc98b4338141e1d5dae0eef992e2c22ff89fbb9be054f7fae96f1b8577e"
+
+
+# ---------------------------------------------------------------------------
+# Nullable list-of-struct with nullable children
+# ---------------------------------------------------------------------------
+
+
+class TestNullableListStructNullableChildren:
+    def test_nullable_list_struct_nullable_children_batch(self):
+        """Record batch: nullable LargeList with nullable children.
+
+        items: LargeList<Struct<id: Int32, label: LargeUtf8 nullable>> nullable
+        Row 0: [{id:1, label:'a'}, {id:2, label:null}]
+        Row 1: NULL
+        """
+        ids = pa.array([1, 2, 3], type=pa.int32())
+        labels = pa.array(["a", None, "c"], type=pa.large_utf8())
+        struct_fields = [
+            pa.field("id", pa.int32(), nullable=False),
+            pa.field("label", pa.large_utf8(), nullable=True),
+        ]
+        struct_arr = pa.StructArray.from_arrays(
+            [ids, labels],
+            fields=struct_fields,
+        )
+        inner_type = pa.struct(struct_fields)
+        inner_field = pa.field("item", inner_type, nullable=False)
+        offsets = pa.array([0, 2, 3], type=pa.int64())
+        mask = pa.array([False, True], type=pa.bool_())
+        list_array = pa.LargeListArray.from_arrays(
+            offsets, struct_arr,
+            type=pa.large_list(inner_field),
+            mask=mask,
+        )
+        schema = pa.schema([
+            pa.field("items", pa.large_list(inner_field), nullable=True),
+        ])
+        batch = pa.record_batch([list_array], schema=schema)
+        result = ArrowDigester.hash_record_batch(batch).hex()
+        assert result == "0000018a03ac4fd279a17240cb084cef23b1dbda814a60dba70f2147cb468f9c44af96"
+
+
+# ---------------------------------------------------------------------------
+# Struct empty batch (schema only, struct fields)
+# ---------------------------------------------------------------------------
+
+
+class TestStructEmptyBatch:
+    def test_struct_empty_batch(self):
+        """Empty table hash with struct schema (0 rows)."""
+        schema = pa.schema([
+            pa.field("person", pa.struct([
+                pa.field("name", pa.large_utf8(), nullable=False),
+                pa.field("age", pa.int32(), nullable=False),
+                pa.field("score", pa.float64(), nullable=False),
+            ]), nullable=False),
+        ])
+        d = ArrowDigester(schema)
+        result = d.finalize().hex()
+        assert result == "00000120a2341ffaf6b2413ebdb36aa062aec7c9468e946cbbba353b1ab5c5f6d1a773"
+
+
+# ---------------------------------------------------------------------------
+# Multi-batch with struct column
+# ---------------------------------------------------------------------------
+
+
+class TestMultiBatchStruct:
+    def test_multi_batch_struct(self):
+        """Multi-batch streaming with struct column.
+
+        Schema: s: Struct<x: Int32, y: Int32 nullable>
+        Batch 1: [{x:1,y:10}, {x:2,y:null}]
+        Batch 2: [{x:3,y:30}]
+        """
+        struct_type = pa.struct([
+            pa.field("x", pa.int32(), nullable=False),
+            pa.field("y", pa.int32(), nullable=True),
+        ])
+        schema = pa.schema([pa.field("s", struct_type, nullable=False)])
+
+        s1 = pa.StructArray.from_arrays(
+            [pa.array([1, 2], type=pa.int32()), pa.array([10, None], type=pa.int32())],
+            names=["x", "y"],
+        )
+        batch1 = pa.record_batch([s1], schema=schema)
+
+        s2 = pa.StructArray.from_arrays(
+            [pa.array([3], type=pa.int32()), pa.array([30], type=pa.int32())],
+            names=["x", "y"],
+        )
+        batch2 = pa.record_batch([s2], schema=schema)
+
+        d = ArrowDigester(schema)
+        d.update(batch1)
+        d.update(batch2)
+        result = d.finalize().hex()
+        assert result == "000001d6b36c3858d63ed06e99de401b83e6255070da784f2d88c376b15e53bacbe86c"
+
+
+# ---------------------------------------------------------------------------
+# Multi-batch with list column
+# ---------------------------------------------------------------------------
+
+
+class TestMultiBatchList:
+    def test_multi_batch_list(self):
+        """Multi-batch streaming with LargeList column.
+ + Batch 1: [[1,2], null] + Batch 2: [[3,4,5]] + """ + list_type = pa.large_list(pa.field("item", pa.int32(), nullable=True)) + schema = pa.schema([pa.field("items", list_type, nullable=True)]) + + arr1 = pa.array( + [[1, 2], None], + type=list_type, + ) + batch1 = pa.record_batch([arr1], schema=schema) + + arr2 = pa.array( + [[3, 4, 5]], + type=list_type, + ) + batch2 = pa.record_batch([arr2], schema=schema) + + d = ArrowDigester(schema) + d.update(batch1) + d.update(batch2) + result = d.finalize().hex() + assert result == "000001fcf529d339593cb8537af4c1ed7d02aae69cfbbd2608b02e811035edb6ba4ec3" + + +# --------------------------------------------------------------------------- +# Float special values +# --------------------------------------------------------------------------- + + +class TestFloatSpecialValues: + def test_float64_special_values(self): + """Float64 array with NaN, Inf, -Inf, 0.0, -0.0.""" + array = pa.array( + [float("nan"), float("inf"), float("-inf"), 0.0, -0.0], + type=pa.float64(), + ) + result = ArrowDigester.hash_array(array).hex() + assert result == "000001947acf002083724864f155cf2857b1c9107749f6c8bfec13ea84c20fa9f20b5a" + + +# --------------------------------------------------------------------------- +# Realistic batch (struct + list + scalar) +# --------------------------------------------------------------------------- + + +class TestRealisticBatch: + def test_realistic_batch(self): + """Realistic record batch: name, coords struct, readings list. 
+ + name: ['A', null, 'C'] + coords: [{lat:40.7,lon:-74.0}, {lat:34.0,lon:-118.2}, {lat:51.5,lon:-0.1}] + readings: [[1.0,2.0], null, [3.0,4.0,5.0]] + """ + schema = pa.schema([ + pa.field("name", pa.large_utf8(), nullable=True), + pa.field("coords", pa.struct([ + pa.field("lat", pa.float64(), nullable=False), + pa.field("lon", pa.float64(), nullable=False), + ]), nullable=False), + pa.field("readings", pa.large_list( + pa.field("item", pa.float32(), nullable=False)), nullable=True), + ]) + + name_arr = pa.array(["A", None, "C"], type=pa.large_utf8()) + lat_arr = pa.array([40.7, 34.0, 51.5], type=pa.float64()) + lon_arr = pa.array([-74.0, -118.2, -0.1], type=pa.float64()) + coords_arr = pa.StructArray.from_arrays( + [lat_arr, lon_arr], + names=["lat", "lon"], + ) + + reading_vals = pa.array([1.0, 2.0, 3.0, 4.0, 5.0], type=pa.float32()) + offsets = pa.array([0, 2, 2, 5], type=pa.int64()) + reading_mask = pa.array([False, True, False], type=pa.bool_()) + readings_arr = pa.LargeListArray.from_arrays( + offsets, reading_vals, + type=pa.large_list(pa.field("item", pa.float32(), nullable=False)), + mask=reading_mask, + ) + + batch = pa.record_batch([name_arr, coords_arr, readings_arr], schema=schema) + result = ArrowDigester.hash_record_batch(batch).hex() + assert result == "0000010d6dda2bffa3477ea97d880b38e018f8ae083e3059db77876b48e9a14a8f17ac" + + def test_realistic_schema(self): + """Schema hash for realistic batch.""" + schema = pa.schema([ + pa.field("name", pa.large_utf8(), nullable=True), + pa.field("coords", pa.struct([ + pa.field("lat", pa.float64(), nullable=False), + pa.field("lon", pa.float64(), nullable=False), + ]), nullable=False), + pa.field("readings", pa.large_list( + pa.field("item", pa.float32(), nullable=False)), nullable=True), + ]) + result = ArrowDigester.hash_schema(schema).hex() + assert result == "0000011c85d47bb95c90c6aa05b8ab930cfbb369136fffd8e120cee5d94dd84ed3c356"