Conversation
Pull request overview
This PR introduces a new three-layer database abstraction (DBConnectorProtocol → ConnectorArrowDatabase / DBTableSource) to reduce duplicated DB-specific logic across ArrowDatabase backends and Source ingestion.
Changes:
- Adds `DBConnectorProtocol` (+ `ColumnInfo`) as the single per-DB-technology interface for schema introspection, Arrow-typed reads, and upserts.
- Implements `ConnectorArrowDatabase` (generic `ArrowDatabaseProtocol`) and `DBTableSource` (generic read-only `RootSource`) on top of any connector.
- Adds comprehensive test coverage for connector/database/source behavior and protocol conformance, plus design/spec documentation.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `src/orcapod/protocols/db_connector_protocol.py` | Defines `ColumnInfo` + `DBConnectorProtocol` contract for DB technologies. |
| `src/orcapod/protocols/database_protocols.py` | Re-exports connector abstractions and defines `__all__`. |
| `src/orcapod/databases/connector_arrow_database.py` | Implements generic `ArrowDatabase` wrapper around a connector (pending batches + flush). |
| `src/orcapod/databases/__init__.py` | Exports `ConnectorArrowDatabase` and updates module comments. |
| `src/orcapod/core/sources/db_table_source.py` | Implements DB-backed `RootSource` that defaults PK columns to tag columns. |
| `src/orcapod/core/sources/__init__.py` | Exports `DBTableSource`. |
| `tests/test_databases/test_connector_arrow_database.py` | New end-to-end tests for connector protocol + `ConnectorArrowDatabase`. |
| `tests/test_core/sources/test_db_table_source.py` | New tests for `DBTableSource` construction, behavior, hashing, config. |
| `superpowers/specs/2026-03-21-db-connector-design.md` | Design spec for the abstraction and intended semantics. |
| `superpowers/plans/2026-03-21-db-connector-plan.md` | Implementation plan for the spike and follow-ups. |
```python
committed = self._get_committed_table(record_path)
committed_ids: set[str] = set()
if committed is not None:
    committed_ids = set(
        cast(list[str], committed[self.RECORD_ID_COLUMN].to_pylist())
    )
```
skip_duplicates=True currently triggers _get_committed_table() and materializes committed_ids by reading the full committed table (SELECT *). For large tables this is O(n) per write and can be a major bottleneck. If possible, avoid full-table reads here (e.g., query only the id column, cache known ids, or defer duplicate handling to upsert_records(skip_existing=True) at flush time).
Suggested change:

```diff
-committed = self._get_committed_table(record_path)
-committed_ids: set[str] = set()
-if committed is not None:
-    committed_ids = set(
-        cast(list[str], committed[self.RECORD_ID_COLUMN].to_pylist())
-    )
+# Lazily initialize cache for committed record IDs to avoid
+# repeatedly scanning the full committed table for each write.
+if not hasattr(self, "_committed_record_ids_cache"):
+    # Mapping: record_key -> set of committed record IDs
+    self._committed_record_ids_cache = {}
+committed_ids = self._committed_record_ids_cache.get(record_key)
+if committed_ids is None:
+    committed = self._get_committed_table(record_path)
+    committed_ids = set()
+    if committed is not None:
+        committed_ids = set(
+            cast(list[str], committed[self.RECORD_ID_COLUMN].to_pylist())
+        )
+    self._committed_record_ids_cache[record_key] = committed_ids
```
Fixed in commit ecf08c1 (Copilot review). The O(n) committed-table scan was removed entirely for skip_duplicates=True. Instead, we now only check for conflicts in the in-flight pending batch (which is already O(k) where k = pending row count), and we track a per-batch _pending_skip_existing flag. At flush() time that flag is passed straight through to connector.upsert_records(skip_existing=True), so each connector can use its native INSERT-OR-IGNORE (or equivalent) without any Python-side full-table read.
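The pending-batch-only duplicate check described above can be sketched roughly like this (class and attribute names are illustrative, not the actual orcapod implementation):

```python
# Illustrative sketch: duplicate handling without reading the committed
# table. Only the in-flight pending batch is checked (O(k) in pending
# rows); committed-table duplicates are deferred to flush time via a
# per-batch skip_existing flag.
class PendingBatchStore:
    def __init__(self) -> None:
        self._pending: dict[str, list[dict]] = {}
        self._pending_skip_existing: dict[str, bool] = {}

    def add_records(self, record_key: str, rows: list[dict],
                    skip_duplicates: bool = False) -> int:
        batch = self._pending.setdefault(record_key, [])
        if skip_duplicates:
            # Check only rows already buffered in this batch.
            seen = {r["record_id"] for r in batch}
            rows = [r for r in rows if r["record_id"] not in seen]
            # Remember to ask the connector for INSERT-OR-IGNORE at flush.
            self._pending_skip_existing[record_key] = True
        batch.extend(rows)
        return len(rows)
```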
```python
self._connector.upsert_records(
    table_name,
    pending,
    id_column=self.RECORD_ID_COLUMN,
    skip_existing=False,
)
```
flush() always calls self._connector.upsert_records(..., skip_existing=False), even for batches added with skip_duplicates=True. This prevents connector implementations from using native "INSERT ... DO NOTHING" semantics and forces duplicate handling into Python-side prefiltering. Consider plumbing a per-batch skip_existing flag through pending state and passing it to upsert_records so connectors can handle duplicates efficiently.
Fixed in commit ecf08c1 (Copilot review). Added _pending_skip_existing: dict[str, bool] to __init__. When add_records(skip_duplicates=True) is called, the flag is set to True for that batch's record key. flush() then pops the flag and passes it as skip_existing to upsert_records, so connectors can use native INSERT-OR-IGNORE semantics. Defaults to False (replace) for all batches added without skip_duplicates.
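The flush-side plumbing of that flag can be sketched as follows (`FakeConnector`, `BatchedWriter`, and the exact method shapes are assumptions for illustration only):

```python
class FakeConnector:
    """Stand-in connector that records upsert calls."""
    def __init__(self) -> None:
        self.calls: list[tuple[str, int, bool]] = []

    def upsert_records(self, table_name: str, rows: list[dict],
                       id_column: str, skip_existing: bool) -> None:
        self.calls.append((table_name, len(rows), skip_existing))


class BatchedWriter:
    def __init__(self, connector: FakeConnector) -> None:
        self._connector = connector
        self._pending: dict[str, list[dict]] = {}
        self._pending_skip_existing: dict[str, bool] = {}

    def add_records(self, key: str, rows: list[dict],
                    skip_duplicates: bool = False) -> None:
        self._pending.setdefault(key, []).extend(rows)
        if skip_duplicates:
            self._pending_skip_existing[key] = True

    def flush(self) -> None:
        for key in list(self._pending):
            rows = self._pending.pop(key)
            # Pop the per-batch flag; default False means plain upsert,
            # True lets the connector use native INSERT-OR-IGNORE.
            skip = self._pending_skip_existing.pop(key, False)
            self._connector.upsert_records(key, rows,
                                           id_column="record_id",
                                           skip_existing=skip)
```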
```python
columns = _arrow_schema_to_column_infos(pending.schema)
self._connector.create_table_if_not_exists(
    table_name, columns, pk_column=self.RECORD_ID_COLUMN
)
self._connector.upsert_records(
```
flush() creates the table if missing, but it doesn't validate that pending.schema matches the existing table schema before calling upsert_records. The PR description/spec says schema evolution is out of scope and a schema mismatch should raise ValueError; without an explicit check, the connector may fail later with a DB-specific error (or partially write). Consider comparing pending.schema against connector.get_column_info(table_name) (or similar) and raising a clear ValueError on mismatch.
Fixed in commit ecf08c1 (Copilot review). flush() now calls connector.get_table_names() before create_table_if_not_exists. If the table already exists, it compares column name→Arrow-type mappings from connector.get_column_info(table_name) against the pending schema. A clear ValueError is raised on any mismatch, with both column lists in the message. This fires before any write, so no partial data is written.
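The core of that pre-write check can be sketched like this (a simplified stand-in: plain type strings replace `pa.DataType` values, and the mappings would come from `connector.get_column_info` and the pending Arrow schema):

```python
def check_schema_match(existing: dict[str, str],
                       pending: dict[str, str]) -> None:
    """Raise ValueError if column name -> type mappings differ.

    Sketch only: in the real code, `existing` would be derived from the
    connector's column info and `pending` from pending.schema, with
    Arrow DataType objects instead of strings.
    """
    if existing != pending:
        raise ValueError(
            f"Schema mismatch: existing columns {sorted(existing.items())} "
            f"vs pending columns {sorted(pending.items())}"
        )
```

Running the check before any write keeps a mismatch from producing a partially written table.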
```python
for record_key in list(self._pending_batches.keys()):
    record_path = tuple(record_key.split("/"))
    table_name = self._path_to_table_name(record_path)
```
flush() reconstructs record_path via tuple(record_key.split("/")), but record_key is built using "/".join(record_path). If any path component contains "/" (or if different paths join to the same string), flush will write to the wrong table and/or merge unrelated pending batches. Use record_path (the tuple) as the dict key instead of a joined string, or mirror InMemoryArrowDatabase._validate_record_path by rejecting unsafe characters like "/" and "\0" in path components.
Fixed in commit ecf08c1 (Copilot review). Added '/' in component or '\\0' in component checks to _validate_record_path, mirroring InMemoryArrowDatabase. This ensures that _get_record_key's "/".join(record_path) and flush()'s record_key.split("/") always round-trip correctly. A new test test_path_component_with_slash_raises and test_path_component_with_null_byte_raises verify this.
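The round-trip guarantee described above can be sketched in isolation (function names here are illustrative):

```python
def validate_record_path(record_path: tuple[str, ...]) -> None:
    # Reject characters that would break the "/".join round-trip:
    # a "/" inside a component makes join/split ambiguous.
    for component in record_path:
        if "/" in component or "\0" in component:
            raise ValueError(f"Invalid path component: {component!r}")


def record_key(record_path: tuple[str, ...]) -> str:
    validate_record_path(record_path)
    return "/".join(record_path)
```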
```python
def __init__(
    self,
    connector: "DBConnectorProtocol",
```
use from __future__ import annotations so that you can drop quotes around type hints everywhere
Fixed in commit c23eaa4. db_table_source.py already had from __future__ import annotations but still used quoted hints like "DBConnectorProtocol" and "DBTableSource". All string quotes have been removed from type annotations throughout the file.
```python
    system_tag_columns: Collection[str] = (),
    record_id_column: str | None = None,
    source_id: str | None = None,
    **kwargs: Any,
```
favor explicitly listing out parameters that'd be passed to superclass rather than resorting to untyped kwargs
Fixed in commit c23eaa4. Replaced **kwargs: Any with the three explicit parameters from RootSource.__init__: label: str | None = None, data_context: str | contexts.DataContext | None = None, and config: Config | None = None. The super().__init__() call now passes each one by name. The contexts and Config types are imported under TYPE_CHECKING so there is no runtime overhead. The docstring is updated accordingly.
```python
pc = LazyModule("pyarrow.compute")


def _arrow_schema_to_column_infos(schema: "pa.Schema") -> list[ColumnInfo]:
```
drop quotes around type hint as we already import annotations from future
Fixed in commit c23eaa4. All quoted pa.* hints (e.g. "pa.Schema", "pa.Table", "pa.Table | None", "pa.RecordBatch") and the "ConnectorArrowDatabase" return annotation have been unquoted. The file already imported from __future__ import annotations so no quotes are needed anywhere.
```python
# ── Record-ID column helpers ──────────────────────────────────────────────


def _ensure_record_id_column(
    self, arrow_data: "pa.Table", record_id: str
```
drop quotes around type hint. Make sure to consistently apply this rule everywhere
Fixed in commit c23eaa4 — same as above. All quoted pa.* type hints throughout connector_arrow_database.py have been stripped of their quotes.
```python
# Re-export connector abstractions so callers can import everything DB-related
# from one place: ``from orcapod.protocols.database_protocols import ...``
from orcapod.protocols.db_connector_protocol import ColumnInfo, DBConnectorProtocol  # noqa: E402
```
why is noqa: E402 necessary?
Fixed in commit c23eaa4. The noqa: E402 suppression was needed because we added the re-export import at the bottom of the file (after class definitions), which triggers the E402 "module-level import not at top of file" lint rule. The fix was to move the import to the top of the file and add from __future__ import annotations while we were there. The suppression comment is gone.
```python
import pyarrow as pa


@dataclass(frozen=True)
```
Move concrete type definitions like this to orcapod.types for consistency
Fixed in commit c23eaa4. ColumnInfo has been moved to orcapod/types.py alongside the other concrete type definitions (Schema, ContentHash, ColumnConfig, etc.). db_connector_protocol.py now imports and re-exports it from orcapod.types so that existing from orcapod.protocols.db_connector_protocol import ColumnInfo imports keep working, but the canonical definition is in types.py. Since types.py already does import pyarrow as pa unconditionally, the pa.DataType field annotation needs no quotes there.
Actually that's a design issue -- we don't want to import pyarrow unconditionally to minimize import speed. Conditionally import pyarrow in types.py
Fixed in commit d3195e9. The unconditional `import pyarrow as pa` in types.py has been replaced with the LazyModule pattern used across the codebase:

```python
if TYPE_CHECKING:
    import pyarrow as pa
else:
    pa = LazyModule("pyarrow")
```

The `ColumnInfo.arrow_type: pa.DataType` annotation is already a no-op string at runtime thanks to `from __future__ import annotations`.
The two module-level dicts (`_PYTHON_TO_ARROW`, `_ARROW_TO_PYTHON`) also had to change: they called `pa.int64()`, `pa.float64()` etc. at import time, which would immediately trigger the real import. They've been replaced with `@functools.cache` functions (`_python_to_arrow()` / `_arrow_to_python()`) that build the mappings lazily on first access.
- Move `ColumnInfo` to `orcapod.types` (consistent with other concrete type definitions); `db_connector_protocol` re-exports it for backward compat
- Add `from __future__ import annotations` to `database_protocols.py` and drop all quoted type hints throughout new files
- Move re-export import to top of `database_protocols.py`, eliminating the `noqa: E402` suppression
- Replace `**kwargs` in `DBTableSource.__init__` with explicit `label`, `data_context`, and `config` parameters matching the `RootSource` signature
- Update test imports to use `ColumnInfo` from `orcapod.types`
eywalker review round — changes summary (commit c23eaa4)

All six review comments have been addressed. Here's a summary of every change made in this round:
Replace the unconditional `import pyarrow as pa` with the LazyModule
pattern already used across the codebase (`LazyModule("pyarrow")` at
runtime, real import under `TYPE_CHECKING`). Since `from __future__
import annotations` is in place, the `pa.DataType` annotation on
`ColumnInfo` is a no-op string at import time.
The two module-level dicts (`_PYTHON_TO_ARROW`, `_ARROW_TO_PYTHON`)
called `pa.*()` at module load time, which would defeat laziness. They
are replaced by `@functools.cache` functions (`_python_to_arrow()` /
`_arrow_to_python()`) that build the mappings on first access.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Review round 3 — changes summary (commit d3195e9)

Conditional pyarrow import in
These module-level dicts were unused (no references anywhere in the codebase) and were the only reason pyarrow needed to be called at module load time. Removing them simplifies the lazy-import change.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Closes PLT-1078 — design spike: clean up Database/Source interface.
Introduces a three-layer abstraction so each DB technology (SQLite, PostgreSQL, SpiralDB) only needs to implement one class (`DBConnectorProtocol`) to power both the `ArrowDatabaseProtocol` layer (read+write memoisation) and the `Source` layer (read-only pipeline ingestion).

New files
- `src/orcapod/protocols/db_connector_protocol.py` — `ColumnInfo` dataclass + `@runtime_checkable DBConnectorProtocol`
- `src/orcapod/databases/connector_arrow_database.py` — `ConnectorArrowDatabase` with pending-batch/flush semantics, `record_path` → `table_name` mapping, within-batch deduplication
- `src/orcapod/core/sources/db_table_source.py` — `DBTableSource`, defaults PK columns → tag columns

Updated files
- `src/orcapod/protocols/database_protocols.py` — re-exports `ColumnInfo`, `DBConnectorProtocol`; adds `__all__`
- `src/orcapod/databases/__init__.py` — exports `ConnectorArrowDatabase`; inline comments pointing to PLT-1074/1075/1076
- `src/orcapod/core/sources/__init__.py` — exports `DBTableSource`

Key design decisions
- `typing.Protocol` — structural subtyping, no import coupling
- `DBTableSource(connector, table_name)`
- `upsert_records(skip_existing)` hides SQL dialect differences
- `ConnectorArrowDatabase` (mirrors existing impls)
- `ValueError` on mismatch

Unblocks
- `DBTableSource(connector, table_name)`

Test plan
- `ConnectorArrowDatabase` satisfies `ArrowDatabaseProtocol` (`isinstance` check)
- `MockDBConnector` satisfies `DBConnectorProtocol` (`isinstance` check)
- `DBTableSource` satisfies `SourceProtocol`, `StreamProtocol`, `PipelineElementProtocol`
- `to_config` round-trip; `from_config` raises `NotImplementedError`
- `content_hash` and `pipeline_hash`

🤖 Generated with Claude Code
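The `isinstance`-based protocol conformance checks in the test plan rely on `typing.runtime_checkable`; a simplified stand-in for `DBConnectorProtocol` (names and method set are illustrative, not the real contract):

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ConnectorLike(Protocol):
    """Simplified stand-in for DBConnectorProtocol."""
    def get_table_names(self) -> list[str]: ...
    def upsert_records(self, table_name: str, data: object,
                       id_column: str, skip_existing: bool) -> None: ...


class MockConnector:
    # No inheritance needed: structural subtyping means matching method
    # names suffices. Note that runtime_checkable isinstance checks only
    # verify the methods exist, not their signatures.
    def get_table_names(self) -> list[str]:
        return []

    def upsert_records(self, table_name, data, id_column, skip_existing):
        pass
```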