-
Notifications
You must be signed in to change notification settings - Fork 5
feat(hashing): integrate starfix-python for Arrow schema and data hashing #90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6718dd3
5c42ec5
385d7ec
6c8666a
6c1abd6
a292930
03cdc0c
ab01703
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,12 +22,9 @@ | |
| } | ||
| }, | ||
| "arrow_hasher": { | ||
| "_class": "orcapod.hashing.arrow_hashers.SemanticArrowHasher", | ||
| "_class": "orcapod.hashing.arrow_hashers.StarfixArrowHasher", | ||
| "_config": { | ||
| "hasher_id": "arrow_v0.1", | ||
| "hash_algorithm": "sha256", | ||
| "chunk_size": 8192, | ||
| "serialization_method": "logical", | ||
| "semantic_registry": { | ||
| "_ref": "semantic_registry" | ||
| } | ||
|
Comment on lines
+25
to
30
|
||
|
|
@@ -83,7 +80,8 @@ | |
| "changelog": [ | ||
| "Initial release with Path semantic type support", | ||
| "Basic SHA-256 hashing for files and objects", | ||
| "Arrow logical serialization method" | ||
| "Arrow logical serialization method", | ||
| "Introduced arrow_v0.1 StarfixArrowHasher using starfix ArrowDigester for cross-language-compatible Arrow hashing" | ||
| ] | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ | |
| from typing import Any | ||
|
|
||
| import pyarrow as pa | ||
| from starfix import ArrowDigester | ||
|
|
||
| from orcapod.hashing import arrow_serialization | ||
| from orcapod.hashing.visitors import SemanticHashingVisitor | ||
|
|
@@ -197,7 +198,7 @@ def hash_table(self, table: pa.Table | pa.RecordBatch) -> ContentHash: | |
|
|
||
| return ContentHash(method=self.hasher_id, digest=hasher.digest()) | ||
|
|
||
| def hash_table_with_metadata(self, table: pa.Table) -> dict[str, Any]: | ||
| def hash_table_with_metadata(self, table: pa.Table) -> dict[str, Any]: # noqa: C901 | ||
| """ | ||
| Compute hash with additional metadata about the process. | ||
|
|
||
|
|
@@ -233,3 +234,144 @@ def hash_table_with_metadata(self, table: pa.Table) -> dict[str, Any]: | |
| "processed_columns": processed_columns, | ||
| "column_order": [field.name for field in table.schema], | ||
| } | ||
|
|
||
|
|
||
| class StarfixArrowHasher: | ||
| """ | ||
| Arrow table hasher backed by the starfix-python ``ArrowDigester``. | ||
|
|
||
| This hasher produces cross-language-compatible, deterministic content | ||
| addresses for Arrow tables and schemas by delegating to the canonical | ||
| StarFix specification (``starfix-python``). | ||
|
|
||
| Pipeline | ||
| -------- | ||
| 1. **Semantic pre-processing** — the ``SemanticHashingVisitor`` traverses | ||
| every column and replaces recognised semantic types (e.g. ``Path`` | ||
| structs) with their content-addressed hash strings. This step runs | ||
| before the Arrow bytes are ever touched by starfix, so the final hash | ||
| captures *file content* for path-typed columns rather than the raw | ||
| path string. | ||
| 2. **Starfix hashing** — ``ArrowDigester.hash_table`` (or | ||
| ``ArrowDigester.hash_schema``) is called on the pre-processed table / | ||
| schema. The digester is column-order-independent and normalises | ||
| ``Utf8`` → ``LargeUtf8``, ``Binary`` → ``LargeBinary``, etc., | ||
| producing a 35-byte versioned SHA-256 digest that is byte-for-byte | ||
| identical to the Rust ``starfix`` crate output. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| semantic_registry: | ||
| Registry of semantic type converters used during pre-processing. | ||
| hasher_id: | ||
| String identifier embedded in every ``ContentHash`` produced by | ||
| this hasher. Bump this value whenever the hash algorithm changes | ||
| so that stored hashes remain distinguishable. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| semantic_registry: SemanticTypeRegistry, | ||
| hasher_id: str, | ||
| ) -> None: | ||
| self._hasher_id = hasher_id | ||
| self.semantic_registry = semantic_registry | ||
|
|
||
| @property | ||
| def hasher_id(self) -> str: | ||
| return self._hasher_id | ||
|
|
||
| def _process_table_columns(self, table: pa.Table | pa.RecordBatch) -> pa.Table: | ||
| """Replace semantic-typed columns with their content-hash strings.""" | ||
| new_columns: list[pa.Array] = [] | ||
| new_fields: list[pa.Field] = [] | ||
|
|
||
| for i, field in enumerate(table.schema): | ||
| # Short-circuit: primitive columns cannot contain semantic types, so skip | ||
| # the costly Python round-trip and reuse the original Arrow array directly. | ||
| if not ( | ||
| pa.types.is_struct(field.type) | ||
| or pa.types.is_list(field.type) | ||
| or pa.types.is_large_list(field.type) | ||
| or pa.types.is_fixed_size_list(field.type) | ||
| or pa.types.is_map(field.type) | ||
| ): | ||
| new_columns.append(table.column(i)) | ||
| new_fields.append(field) | ||
| continue | ||
|
|
||
| column_data = table.column(i).to_pylist() | ||
| visitor = SemanticHashingVisitor(self.semantic_registry) | ||
|
|
||
| try: | ||
| new_type: pa.DataType | None = None | ||
| processed_data: list[Any] = [] | ||
|
Comment on lines
+302
to
+308
|
||
| for value in column_data: | ||
| processed_type, processed_value = visitor.visit(field.type, value) | ||
| # Infer the output type from the first non-null processed value. | ||
| # When the first row is null, visit_struct returns the original | ||
| # struct type rather than the converted type (e.g. large_string), | ||
| # which would cause pa.array() to fail for subsequent non-null rows. | ||
| if new_type is None and processed_value is not None: | ||
| new_type = processed_type | ||
| processed_data.append(processed_value) | ||
|
|
||
| # For empty or all-null columns there are no non-null values to infer | ||
| # the type from; fall back to the field's declared type. | ||
| if new_type is None: | ||
| new_type = field.type | ||
| new_columns.append(pa.array(processed_data, type=new_type)) | ||
| # Preserve original field attributes (nullable, metadata) while | ||
| # updating only the type, so the schema fed to starfix remains faithful. | ||
| new_fields.append(field.with_type(new_type)) | ||
|
|
||
| except Exception as exc: | ||
| raise RuntimeError( | ||
| f"Failed to process column '{field.name}': {exc}" | ||
| ) from exc | ||
|
|
||
| # Preserve the original schema-level metadata while using updated fields. | ||
| return pa.table(new_columns, schema=pa.schema(new_fields, metadata=table.schema.metadata)) | ||
|
|
||
| def hash_schema(self, schema: pa.Schema) -> ContentHash: | ||
| """Hash an Arrow schema using the starfix canonical algorithm. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| schema: | ||
| The ``pyarrow.Schema`` to hash. | ||
|
|
||
| Returns | ||
| ------- | ||
| ContentHash | ||
| A ``ContentHash`` whose ``digest`` is the 35-byte versioned | ||
| SHA-256 produced by ``ArrowDigester.hash_schema``. | ||
| """ | ||
| digest = ArrowDigester.hash_schema(schema) | ||
| return ContentHash(method=self._hasher_id, digest=digest) | ||
|
|
||
| def hash_table(self, table: pa.Table | pa.RecordBatch) -> ContentHash: | ||
| """Hash an Arrow table (or ``RecordBatch``) using starfix. | ||
|
|
||
| Semantic types are resolved to their content-hash strings before | ||
| the table is passed to ``ArrowDigester.hash_table``, ensuring that | ||
| path-typed columns contribute their *file content* hash rather than | ||
| the literal path string. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| table: | ||
| The ``pa.Table`` or ``pa.RecordBatch`` to hash. | ||
|
|
||
| Returns | ||
| ------- | ||
| ContentHash | ||
| A ``ContentHash`` whose ``digest`` is the 35-byte versioned | ||
| SHA-256 produced by ``ArrowDigester.hash_table``. | ||
| """ | ||
| if isinstance(table, pa.RecordBatch): | ||
| table = pa.Table.from_batches([table]) | ||
|
|
||
| processed_table = self._process_table_columns(table) | ||
| digest = ArrowDigester.hash_table(processed_table) | ||
| return ContentHash(method=self._hasher_id, digest=digest) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -123,7 +123,7 @@ def get_versioned_semantic_arrow_hasher( | |
| ArrowHasherProtocol | ||
| A fully configured SemanticArrowHasher instance. | ||
| """ | ||
| from orcapod.hashing.arrow_hashers import SemanticArrowHasher | ||
| from orcapod.hashing.arrow_hashers import StarfixArrowHasher | ||
| from orcapod.hashing.file_hashers import BasicFileHasher | ||
| from orcapod.semantic_types.semantic_registry import SemanticTypeRegistry | ||
| from orcapod.semantic_types.semantic_struct_converters import PathStructConverter | ||
|
|
@@ -138,11 +138,11 @@ def get_versioned_semantic_arrow_hasher( | |
| registry.register_converter("path", path_converter) | ||
|
|
||
| logger.debug( | ||
| "get_versioned_semantic_arrow_hasher: creating SemanticArrowHasher " | ||
| "get_versioned_semantic_arrow_hasher: creating StarfixArrowHasher " | ||
| "(hasher_id=%r)", | ||
| hasher_id, | ||
| ) | ||
| hasher: Any = SemanticArrowHasher( | ||
| hasher: Any = StarfixArrowHasher( | ||
| hasher_id=hasher_id, | ||
| semantic_registry=registry, | ||
| ) | ||
|
Comment on lines
140
to
148
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`dependencies` now lists `"starfix"` with no version constraint, and the git source override is uv-specific. `pip install -e .` (documented in README) will ignore `[tool.uv.sources]` and likely install the PyPI `starfix`, which may not match the git API/behavior (e.g., availability of `ArrowDigester`). Consider using a PEP 508 direct URL requirement (or pinning to a published version that contains `ArrowDigester`) so non-uv installs are deterministic and compatible.