Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies = [
"deltalake>=1.0.2",
"graphviz>=0.21",
"gitpython>=3.1.45",
"starfix>=0.1.3",
"starfix @ git+https://github.com/nauticalab/starfix-python.git@344617bc6f7fcabab5c011d5774ed47de33f21de",
"pygraphviz>=1.14",
"tzdata>=2024.1",
"uuid-utils>=0.11.1",
Comment on lines 19 to 25
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dependencies now lists "starfix" with no version constraint, and the git source override is uv-specific. pip install -e . (documented in README) will ignore [tool.uv.sources] and likely install the PyPI starfix, which may not match the git API/behavior (e.g., availability of ArrowDigester). Consider using a PEP 508 direct URL requirement (or pinning to a published version that contains ArrowDigester) so non-uv installs are deterministic and compatible.

Copilot uses AI. Check for mistakes.
Expand Down
8 changes: 3 additions & 5 deletions src/orcapod/contexts/data/v0.1.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,9 @@
}
},
"arrow_hasher": {
"_class": "orcapod.hashing.arrow_hashers.SemanticArrowHasher",
"_class": "orcapod.hashing.arrow_hashers.StarfixArrowHasher",
"_config": {
"hasher_id": "arrow_v0.1",
"hash_algorithm": "sha256",
"chunk_size": 8192,
"serialization_method": "logical",
"semantic_registry": {
"_ref": "semantic_registry"
}
Comment on lines +25 to 30
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The context config still sets hasher_id to "arrow_v0.1" even though the Arrow hashing implementation has been swapped to starfix. Reusing the same hasher_id for a different digest algorithm makes stored hashes ambiguous; either bump the id here (and in the versioned factory/tests) or keep the old implementation for arrow_v0.1 and introduce starfix under a new id.

Copilot uses AI. Check for mistakes.
Expand Down Expand Up @@ -83,7 +80,8 @@
"changelog": [
"Initial release with Path semantic type support",
"Basic SHA-256 hashing for files and objects",
"Arrow logical serialization method"
"Arrow logical serialization method",
"Introduced arrow_v0.1 StarfixArrowHasher using starfix ArrowDigester for cross-language-compatible Arrow hashing"
]
}
}
144 changes: 143 additions & 1 deletion src/orcapod/hashing/arrow_hashers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any

import pyarrow as pa
from starfix import ArrowDigester

from orcapod.hashing import arrow_serialization
from orcapod.hashing.visitors import SemanticHashingVisitor
Expand Down Expand Up @@ -197,7 +198,7 @@ def hash_table(self, table: pa.Table | pa.RecordBatch) -> ContentHash:

return ContentHash(method=self.hasher_id, digest=hasher.digest())

def hash_table_with_metadata(self, table: pa.Table) -> dict[str, Any]:
def hash_table_with_metadata(self, table: pa.Table) -> dict[str, Any]: # noqa: C901
"""
Compute hash with additional metadata about the process.

Expand Down Expand Up @@ -233,3 +234,144 @@ def hash_table_with_metadata(self, table: pa.Table) -> dict[str, Any]:
"processed_columns": processed_columns,
"column_order": [field.name for field in table.schema],
}


class StarfixArrowHasher:
    """
    Arrow table hasher backed by the starfix-python ``ArrowDigester``.

    Produces cross-language-compatible, deterministic content addresses for
    Arrow tables and schemas by delegating to the canonical StarFix
    specification (``starfix-python``).

    Pipeline
    --------
    1. **Semantic pre-processing** — a ``SemanticHashingVisitor`` walks every
       column and substitutes recognised semantic types (e.g. ``Path``
       structs) with their content-addressed hash strings. Because this runs
       before starfix ever sees the Arrow bytes, path-typed columns
       contribute their *file content* hash rather than the raw path string.
    2. **Starfix hashing** — ``ArrowDigester.hash_table`` (or
       ``ArrowDigester.hash_schema``) is invoked on the pre-processed table /
       schema. The digester is column-order-independent and normalises
       ``Utf8`` → ``LargeUtf8``, ``Binary`` → ``LargeBinary``, etc.,
       yielding a 35-byte versioned SHA-256 digest byte-for-byte identical
       to the Rust ``starfix`` crate output.

    Parameters
    ----------
    semantic_registry:
        Registry of semantic type converters used during pre-processing.
    hasher_id:
        String identifier embedded in every ``ContentHash`` produced by this
        hasher. Bump this value whenever the hash algorithm changes so that
        stored hashes remain distinguishable.
    """

    def __init__(
        self,
        semantic_registry: SemanticTypeRegistry,
        hasher_id: str,
    ) -> None:
        self._hasher_id = hasher_id
        self.semantic_registry = semantic_registry

    @property
    def hasher_id(self) -> str:
        return self._hasher_id

    @staticmethod
    def _may_contain_semantics(dtype: pa.DataType) -> bool:
        """True if *dtype* is a nested type that could hold semantic values."""
        nested_predicates = (
            pa.types.is_struct,
            pa.types.is_list,
            pa.types.is_large_list,
            pa.types.is_fixed_size_list,
            pa.types.is_map,
        )
        return any(predicate(dtype) for predicate in nested_predicates)

    def _process_table_columns(self, table: pa.Table | pa.RecordBatch) -> pa.Table:
        """Replace semantic-typed columns with their content-hash strings."""
        out_columns: list[Any] = []
        out_fields: list[pa.Field] = []

        for index, field in enumerate(table.schema):
            # Fast path: primitive columns cannot contain semantic types, so
            # skip the costly Python round-trip and keep the Arrow data as-is.
            if not self._may_contain_semantics(field.type):
                out_columns.append(table.column(index))
                out_fields.append(field)
                continue

            rows = table.column(index).to_pylist()
            visitor = SemanticHashingVisitor(self.semantic_registry)

            try:
                inferred_type: pa.DataType | None = None
                converted_rows: list[Any] = []
                for row in rows:
                    row_type, row_value = visitor.visit(field.type, row)
                    # Infer the output type from the first non-null processed
                    # value. When the first row is null, visit_struct returns
                    # the original struct type rather than the converted type
                    # (e.g. large_string), which would make pa.array() fail
                    # on later non-null rows.
                    if inferred_type is None and row_value is not None:
                        inferred_type = row_type
                    converted_rows.append(row_value)

                # Empty / all-null columns give us no value to infer a type
                # from; fall back to the field's declared type.
                out_columns.append(
                    pa.array(converted_rows, type=inferred_type or field.type)
                )
                # Keep the original field attributes (nullable, metadata) and
                # change only the type, so the schema fed to starfix stays
                # faithful to the input.
                out_fields.append(field.with_type(inferred_type or field.type))

            except Exception as exc:
                raise RuntimeError(
                    f"Failed to process column '{field.name}': {exc}"
                ) from exc

        # Carry over schema-level metadata while using the updated fields.
        rebuilt_schema = pa.schema(out_fields, metadata=table.schema.metadata)
        return pa.table(out_columns, schema=rebuilt_schema)

    def hash_schema(self, schema: pa.Schema) -> ContentHash:
        """Hash an Arrow schema using the starfix canonical algorithm.

        Parameters
        ----------
        schema:
            The ``pyarrow.Schema`` to hash.

        Returns
        -------
        ContentHash
            A ``ContentHash`` whose ``digest`` is the 35-byte versioned
            SHA-256 produced by ``ArrowDigester.hash_schema``.
        """
        return ContentHash(
            method=self._hasher_id,
            digest=ArrowDigester.hash_schema(schema),
        )

    def hash_table(self, table: pa.Table | pa.RecordBatch) -> ContentHash:
        """Hash an Arrow table (or ``RecordBatch``) using starfix.

        Semantic types are resolved to their content-hash strings before the
        table is handed to ``ArrowDigester.hash_table``, so path-typed
        columns contribute their *file content* hash rather than the literal
        path string.

        Parameters
        ----------
        table:
            The ``pa.Table`` or ``pa.RecordBatch`` to hash.

        Returns
        -------
        ContentHash
            A ``ContentHash`` whose ``digest`` is the 35-byte versioned
            SHA-256 produced by ``ArrowDigester.hash_table``.
        """
        if isinstance(table, pa.RecordBatch):
            table = pa.Table.from_batches([table])

        prepared = self._process_table_columns(table)
        return ContentHash(
            method=self._hasher_id,
            digest=ArrowDigester.hash_table(prepared),
        )
6 changes: 3 additions & 3 deletions src/orcapod/hashing/versioned_hashers.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def get_versioned_semantic_arrow_hasher(
ArrowHasherProtocol
A fully configured SemanticArrowHasher instance.
"""
from orcapod.hashing.arrow_hashers import SemanticArrowHasher
from orcapod.hashing.arrow_hashers import StarfixArrowHasher
from orcapod.hashing.file_hashers import BasicFileHasher
from orcapod.semantic_types.semantic_registry import SemanticTypeRegistry
from orcapod.semantic_types.semantic_struct_converters import PathStructConverter
Expand All @@ -138,11 +138,11 @@ def get_versioned_semantic_arrow_hasher(
registry.register_converter("path", path_converter)

logger.debug(
"get_versioned_semantic_arrow_hasher: creating SemanticArrowHasher "
"get_versioned_semantic_arrow_hasher: creating StarfixArrowHasher "
"(hasher_id=%r)",
hasher_id,
)
hasher: Any = SemanticArrowHasher(
hasher: Any = StarfixArrowHasher(
hasher_id=hasher_id,
semantic_registry=registry,
)
Comment on lines 140 to 148
Copy link

Copilot AI Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StarfixArrowHasher is now the factory output, but the factory still takes the default _CURRENT_ARROW_HASHER_ID (currently "arrow_v0.1"). Since the underlying hashing algorithm has changed, keeping the same hasher_id risks treating old (IPC+hashlib) and new (starfix) digests as comparable. Update the current Arrow hasher_id constant to a new version and make sure contexts/tests align with it.

Copilot uses AI. Check for mistakes.
Expand Down
9 changes: 5 additions & 4 deletions test-objective/unit/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,17 @@ def test_normal_construction(self):
source = ArrowTableSource(_simple_table(), tag_columns=["name"])
assert source is not None

def test_empty_table_constructs_successfully(self):
    """An empty table constructs successfully and produces zero output rows.

    Regression guard for the previous behavior where empty tables raised at
    construction; ArrowTableSource must now accept a zero-row table and
    yield an empty result from as_table().
    """
    # Zero-row table with the expected schema (large_string name, int64 age).
    empty = pa.table(
        {
            "name": pa.array([], type=pa.large_string()),
            "age": pa.array([], type=pa.int64()),
        }
    )
    source = ArrowTableSource(empty, tag_columns=["name"])
    assert source is not None
    assert source.as_table().num_rows == 0

def test_missing_tag_columns_raises_value_error(self):
"""Specifying tag columns not in the table raises ValueError."""
Expand Down
Loading
Loading