
feat: DBConnectorProtocol + ConnectorArrowDatabase + DBTableSource (PLT-1078)#91

Merged
eywalker merged 8 commits into dev from
eywalker/plt-1078-design-spike-clean-up-databasesource-interface-to-streamline
Mar 21, 2026

Conversation


@agent-kurouto agent-kurouto bot commented Mar 21, 2026

Summary

Closes PLT-1078 — design spike: clean up Database/Source interface.

Introduces a three-layer abstraction so each DB technology (SQLite, PostgreSQL, SpiralDB) only needs to implement one class (DBConnectorProtocol) to power both the ArrowDatabaseProtocol layer (read+write memoisation) and the Source layer (read-only pipeline ingestion).

DBConnectorProtocol           ← one class per DB technology
       │
       ├── ConnectorArrowDatabase  ← generic ArrowDatabaseProtocol (read+write)
       └── DBTableSource           ← generic read-only RootSource
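The one-class-per-technology contract in the diagram can be sketched as a `@runtime_checkable` `typing.Protocol`. This is a hypothetical simplification: method names follow the PR summary (`get_table_names`, `get_column_info`, `create_table_if_not_exists`, `upsert_records`), but the exact signatures and the `ColumnInfo` fields are assumptions, not the real interface.

```python
from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable


@dataclass(frozen=True)
class ColumnInfo:
    """Name/type description of one table column (sketch)."""
    name: str
    arrow_type: Any  # pa.DataType in the real code
    is_primary_key: bool = False


@runtime_checkable
class DBConnectorProtocol(Protocol):
    """One implementation per DB technology (SQLite, PostgreSQL, SpiralDB)."""

    def get_table_names(self) -> list[str]: ...
    def get_column_info(self, table_name: str) -> list[ColumnInfo]: ...
    def create_table_if_not_exists(
        self, table_name: str, columns: list[ColumnInfo], pk_column: str
    ) -> None: ...
    def upsert_records(
        self, table_name: str, records: Any, id_column: str, skip_existing: bool
    ) -> None: ...


class FakeConnector:
    """Structural subtyping: conformance needs no inheritance or import."""

    def get_table_names(self):
        return []

    def get_column_info(self, table_name):
        return []

    def create_table_if_not_exists(self, table_name, columns, pk_column):
        return None

    def upsert_records(self, table_name, records, id_column, skip_existing):
        return None


# runtime_checkable protocols verify method presence, not signatures.
print(isinstance(FakeConnector(), DBConnectorProtocol))  # True
```

This is why the test plan below can verify conformance with plain `isinstance` checks against a mock connector.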

New files

  • src/orcapod/protocols/db_connector_protocol.py — ColumnInfo dataclass + @runtime_checkable DBConnectorProtocol
  • src/orcapod/databases/connector_arrow_database.py — ConnectorArrowDatabase with pending-batch/flush semantics, record_path→table_name mapping, within-batch deduplication
  • src/orcapod/core/sources/db_table_source.py — DBTableSource, defaults PK columns → tag columns

Updated files

  • src/orcapod/protocols/database_protocols.py — re-exports ColumnInfo, DBConnectorProtocol; adds __all__
  • src/orcapod/databases/__init__.py — exports ConnectorArrowDatabase; inline comments pointing to PLT-1074/1075/1076
  • src/orcapod/core/sources/__init__.py — exports DBTableSource

Key design decisions

Question                               Decision
Protocol vs ABC?                       typing.Protocol — structural subtyping, no import coupling
Generic source vs per-DB subclasses?   Single DBTableSource(connector, table_name)
Type mapping ownership?                Connector — callers always see Arrow types
Upsert abstraction?                    upsert_records(skip_existing) hides SQL dialect differences
Pending-batch location?                Python-side in ConnectorArrowDatabase (mirrors existing impls)
Schema evolution?                      Out of scope — ValueError on mismatch

Unblocks

  • PLT-1074 (SpiralDBConnector)
  • PLT-1075 (PostgreSQLConnector)
  • PLT-1076 (SQLiteConnector)
  • PLT-1072, PLT-1073, PLT-1077 (DB-backed Sources — just DBTableSource(connector, table_name))

Test plan

  • ConnectorArrowDatabase satisfies ArrowDatabaseProtocol (isinstance check)
  • MockDBConnector satisfies DBConnectorProtocol (isinstance check)
  • Full add/get/flush/skip-duplicates/dedup behaviour via mock connector
  • DBTableSource satisfies SourceProtocol, StreamProtocol, PipelineElementProtocol
  • PK columns used as default tag columns; explicit override works
  • Missing table / empty table / no-PK errors raised with distinct messages
  • to_config round-trip; from_config raises NotImplementedError
  • Deterministic content_hash and pipeline_hash
  • 123 new tests all green; 2384-test full suite zero regressions

🤖 Generated with Claude Code


codecov bot commented Mar 21, 2026

Codecov Report

❌ Patch coverage is 90.35714% with 27 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
src/orcapod/protocols/db_connector_protocol.py 62.50% 12 Missing ⚠️
src/orcapod/databases/connector_arrow_database.py 95.23% 9 Missing ⚠️
src/orcapod/core/sources/db_table_source.py 88.37% 5 Missing ⚠️
src/orcapod/types.py 87.50% 1 Missing ⚠️


Contributor

Copilot AI left a comment


Pull request overview

This PR introduces a new three-layer database abstraction (DBConnectorProtocol → ConnectorArrowDatabase / DBTableSource) to reduce duplicated DB-specific logic across ArrowDatabase backends and Source ingestion.

Changes:

  • Adds DBConnectorProtocol (+ ColumnInfo) as the single per-DB-technology interface for schema introspection, Arrow-typed reads, and upserts.
  • Implements ConnectorArrowDatabase (generic ArrowDatabaseProtocol) and DBTableSource (generic read-only RootSource) on top of any connector.
  • Adds comprehensive test coverage for connector/database/source behavior and protocol conformance, plus design/spec documentation.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

Show a summary per file
  • src/orcapod/protocols/db_connector_protocol.py — Defines ColumnInfo + DBConnectorProtocol contract for DB technologies.
  • src/orcapod/protocols/database_protocols.py — Re-exports connector abstractions and defines __all__.
  • src/orcapod/databases/connector_arrow_database.py — Implements generic ArrowDatabase wrapper around a connector (pending batches + flush).
  • src/orcapod/databases/__init__.py — Exports ConnectorArrowDatabase and updates module comments.
  • src/orcapod/core/sources/db_table_source.py — Implements DB-backed RootSource that defaults PK columns to tag columns.
  • src/orcapod/core/sources/__init__.py — Exports DBTableSource.
  • tests/test_databases/test_connector_arrow_database.py — New end-to-end tests for connector protocol + ConnectorArrowDatabase.
  • tests/test_core/sources/test_db_table_source.py — New tests for DBTableSource construction, behavior, hashing, config.
  • superpowers/specs/2026-03-21-db-connector-design.md — Design spec for the abstraction and intended semantics.
  • superpowers/plans/2026-03-21-db-connector-plan.md — Implementation plan for the spike and follow-ups.


Comment on lines +219 to +224
committed = self._get_committed_table(record_path)
committed_ids: set[str] = set()
if committed is not None:
    committed_ids = set(
        cast(list[str], committed[self.RECORD_ID_COLUMN].to_pylist())
    )
Copilot AI Mar 21, 2026

skip_duplicates=True currently triggers _get_committed_table() and materializes committed_ids by reading the full committed table (SELECT *). For large tables this is O(n) per write and can be a major bottleneck. If possible, avoid full-table reads here (e.g., query only the id column, cache known ids, or defer duplicate handling to upsert_records(skip_existing=True) at flush time).

Suggested change

# before
committed = self._get_committed_table(record_path)
committed_ids: set[str] = set()
if committed is not None:
    committed_ids = set(
        cast(list[str], committed[self.RECORD_ID_COLUMN].to_pylist())
    )

# after
# Lazily initialize cache for committed record IDs to avoid
# repeatedly scanning the full committed table for each write.
if not hasattr(self, "_committed_record_ids_cache"):
    # Mapping: record_key -> set of committed record IDs
    self._committed_record_ids_cache = {}
committed_ids = self._committed_record_ids_cache.get(record_key)
if committed_ids is None:
    committed = self._get_committed_table(record_path)
    committed_ids = set()
    if committed is not None:
        committed_ids = set(
            cast(list[str], committed[self.RECORD_ID_COLUMN].to_pylist())
        )
    self._committed_record_ids_cache[record_key] = committed_ids

Author

Fixed in commit ecf08c1 (Copilot review). The O(n) committed-table scan was removed entirely for skip_duplicates=True. Instead, we now only check for conflicts in the in-flight pending batch (which is already O(k) where k = pending row count), and we track a per-batch _pending_skip_existing flag. At flush() time that flag is passed straight through to connector.upsert_records(skip_existing=True), so each connector can use its native INSERT-OR-IGNORE (or equivalent) without any Python-side full-table read.

Comment on lines +274 to +279
self._connector.upsert_records(
    table_name,
    pending,
    id_column=self.RECORD_ID_COLUMN,
    skip_existing=False,
)
Copilot AI Mar 21, 2026

flush() always calls self._connector.upsert_records(..., skip_existing=False), even for batches added with skip_duplicates=True. This prevents connector implementations from using native "INSERT ... DO NOTHING" semantics and forces duplicate handling into Python-side prefiltering. Consider plumbing a per-batch skip_existing flag through pending state and passing it to upsert_records so connectors can handle duplicates efficiently.

Author

Fixed in commit ecf08c1 (Copilot review). Added _pending_skip_existing: dict[str, bool] to __init__. When add_records(skip_duplicates=True) is called, the flag is set to True for that batch's record key. flush() then pops the flag and passes it as skip_existing to upsert_records, so connectors can use native INSERT-OR-IGNORE semantics. Defaults to False (replace) for all batches added without skip_duplicates.
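The per-batch flag pattern described in this fix can be illustrated without any real database. This is a hypothetical sketch: `PendingBatchBuffer` and `RecordingConnector` are invented stand-ins, and rows are plain dicts rather than Arrow tables.

```python
class PendingBatchBuffer:
    """Tracks pending rows plus a per-batch skip_existing flag (sketch)."""

    def __init__(self, connector):
        self._connector = connector
        self._pending_batches: dict[str, list[dict]] = {}
        self._pending_skip_existing: dict[str, bool] = {}

    def add_records(self, record_key: str, rows: list[dict],
                    skip_duplicates: bool = False) -> None:
        self._pending_batches.setdefault(record_key, []).extend(rows)
        if skip_duplicates:
            # Remember the request; duplicate handling happens at flush time.
            self._pending_skip_existing[record_key] = True

    def flush(self) -> None:
        for record_key, pending in list(self._pending_batches.items()):
            # Pop the flag: defaults to False (replace) when never set.
            skip = self._pending_skip_existing.pop(record_key, False)
            # Delegated to the connector's native INSERT-OR-IGNORE
            # (or equivalent) instead of a Python-side full-table scan.
            self._connector.upsert_records(record_key, pending,
                                           skip_existing=skip)
        self._pending_batches.clear()


class RecordingConnector:
    """Records upsert calls so we can observe the flag plumbing."""

    def __init__(self):
        self.calls = []

    def upsert_records(self, table, rows, skip_existing):
        self.calls.append((table, len(rows), skip_existing))


conn = RecordingConnector()
buf = PendingBatchBuffer(conn)
buf.add_records("runs", [{"id": "a"}], skip_duplicates=True)
buf.add_records("logs", [{"id": "b"}, {"id": "c"}])
buf.flush()
print(conn.calls)  # [('runs', 1, True), ('logs', 2, False)]
```

The design point is that the flag travels with the batch, so mixed workloads (some keys with `skip_duplicates=True`, some without) each reach the connector with the right semantics.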

Comment on lines +270 to +274
columns = _arrow_schema_to_column_infos(pending.schema)
self._connector.create_table_if_not_exists(
    table_name, columns, pk_column=self.RECORD_ID_COLUMN
)
self._connector.upsert_records(
Copilot AI Mar 21, 2026

flush() creates the table if missing, but it doesn't validate that pending.schema matches the existing table schema before calling upsert_records. The PR description/spec says schema evolution is out of scope and a schema mismatch should raise ValueError; without an explicit check, the connector may fail later with a DB-specific error (or partially write). Consider comparing pending.schema against connector.get_column_info(table_name) (or similar) and raising a clear ValueError on mismatch.

Author

Fixed in commit ecf08c1 (Copilot review). flush() now calls connector.get_table_names() before create_table_if_not_exists. If the table already exists, it compares column name→Arrow-type mappings from connector.get_column_info(table_name) against the pending schema. A clear ValueError is raised on any mismatch, with both column lists in the message. This fires before any write, so no partial data is written.
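The fail-before-write check can be sketched as a comparison of column-name→type mappings. This is a simplified stand-in: `check_schema_matches` is a hypothetical helper, and types are shown as strings where the real code compares Arrow types from `get_column_info`.

```python
def check_schema_matches(pending_columns: dict[str, str],
                         existing_columns: dict[str, str]) -> None:
    """Raise ValueError, with both column lists in the message, before
    any row is written (sketch of the flush()-time validation)."""
    if pending_columns != existing_columns:
        raise ValueError(
            "Schema mismatch for existing table: "
            f"pending={sorted(pending_columns.items())}, "
            f"existing={sorted(existing_columns.items())}"
        )


# Matching schemas pass silently.
check_schema_matches({"id": "string", "n": "int64"},
                     {"id": "string", "n": "int64"})

# A missing column is rejected before upsert_records is ever called.
try:
    check_schema_matches({"id": "string"},
                         {"id": "string", "n": "int64"})
except ValueError as e:
    print("rejected:", e)
```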

Comment on lines +264 to +266
for record_key in list(self._pending_batches.keys()):
    record_path = tuple(record_key.split("/"))
    table_name = self._path_to_table_name(record_path)
Copilot AI Mar 21, 2026

flush() reconstructs record_path via tuple(record_key.split("/")), but record_key is built using "/".join(record_path). If any path component contains "/" (or if different paths join to the same string), flush will write to the wrong table and/or merge unrelated pending batches. Use record_path (the tuple) as the dict key instead of a joined string, or mirror InMemoryArrowDatabase._validate_record_path by rejecting unsafe characters like "/" and "\0" in path components.

Author

Fixed in commit ecf08c1 (Copilot review). Added '/' in component or '\\0' in component checks to _validate_record_path, mirroring InMemoryArrowDatabase. This ensures that _get_record_key's "/".join(record_path) and flush()'s record_key.split("/") always round-trip correctly. A new test test_path_component_with_slash_raises and test_path_component_with_null_byte_raises verify this.
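The round-trip property being protected can be shown in a few lines. This is a hypothetical sketch of the validation (standalone functions rather than the real `_validate_record_path` / `_get_record_key` methods):

```python
def validate_record_path(record_path: tuple[str, ...]) -> None:
    """Reject components that would break the '/'-join round-trip (sketch)."""
    for component in record_path:
        if "/" in component or "\0" in component:
            raise ValueError(f"Invalid path component: {component!r}")


def record_key(record_path: tuple[str, ...]) -> str:
    validate_record_path(record_path)
    return "/".join(record_path)


# With slashes rejected, join and split are exact inverses.
path = ("pods", "run_42")
assert tuple(record_key(path).split("/")) == path

# A component containing '/' would have joined to the same key as
# ("pods", "a", "b") and merged unrelated batches — now it raises.
try:
    record_key(("pods", "a/b"))
except ValueError as e:
    print(e)
```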


def __init__(
    self,
    connector: "DBConnectorProtocol",
Contributor

use from __future__ import annotations so that you can drop quotes around type hints everywhere

Author

Fixed in commit c23eaa4. db_table_source.py already had from __future__ import annotations but still used quoted hints like "DBConnectorProtocol" and "DBTableSource". All string quotes have been removed from type annotations throughout the file.

    system_tag_columns: Collection[str] = (),
    record_id_column: str | None = None,
    source_id: str | None = None,
    **kwargs: Any,
Contributor

favor explicitly listing out parameters that'd be passed to superclass rather than resorting to untyped kwargs

Author

Fixed in commit c23eaa4. Replaced **kwargs: Any with the three explicit parameters from RootSource.__init__: label: str | None = None, data_context: str | contexts.DataContext | None = None, and config: Config | None = None. The super().__init__() call now passes each one by name. The contexts and Config types are imported under TYPE_CHECKING so there is no runtime overhead. The docstring is updated accordingly.

pc = LazyModule("pyarrow.compute")


def _arrow_schema_to_column_infos(schema: "pa.Schema") -> list[ColumnInfo]:
Contributor

drop quotes around type hint as we already import annotations from future

Author

Fixed in commit c23eaa4. All quoted pa.* hints (e.g. "pa.Schema", "pa.Table", "pa.Table | None", "pa.RecordBatch") and the "ConnectorArrowDatabase" return annotation have been unquoted. The file already imported from __future__ import annotations so no quotes are needed anywhere.

# ── Record-ID column helpers ──────────────────────────────────────────────

def _ensure_record_id_column(
    self, arrow_data: "pa.Table", record_id: str
Contributor

drop quotes around type hint. Make sure to consistently apply this rule everywhere

Author

Fixed in commit c23eaa4 — same as above. All quoted pa.* type hints throughout connector_arrow_database.py have been stripped of their quotes.


# Re-export connector abstractions so callers can import everything DB-related
# from one place: ``from orcapod.protocols.database_protocols import ...``
from orcapod.protocols.db_connector_protocol import ColumnInfo, DBConnectorProtocol # noqa: E402
Contributor

why is noqa: E402 necessary?

Author

Fixed in commit c23eaa4. The noqa: E402 suppression was needed because we added the re-export import at the bottom of the file (after class definitions), which triggers the E402 "module-level import not at top of file" lint rule. The fix was to move the import to the top of the file and add from __future__ import annotations while we were there. The suppression comment is gone.

import pyarrow as pa


@dataclass(frozen=True)
Contributor

Move concrete type definitions like this to orcapod.types for consistency

Author

Fixed in commit c23eaa4. ColumnInfo has been moved to orcapod/types.py alongside the other concrete type definitions (Schema, ContentHash, ColumnConfig, etc.). db_connector_protocol.py now imports and re-exports it from orcapod.types so that existing from orcapod.protocols.db_connector_protocol import ColumnInfo imports keep working, but the canonical definition is in types.py. Since types.py already does import pyarrow as pa unconditionally, the pa.DataType field annotation needs no quotes there.

Contributor

Actually that's a design issue -- we don't want to import pyarrow unconditionally to minimize import speed. Conditionally import pyarrow in types.py

Author

Fixed in commit d3195e9. The unconditional import pyarrow as pa in types.py has been replaced with the LazyModule pattern used across the codebase:

if TYPE_CHECKING:
    import pyarrow as pa
else:
    pa = LazyModule("pyarrow")

The ColumnInfo.arrow_type: pa.DataType annotation is already a no-op string at runtime thanks to from __future__ import annotations.

The two module-level dicts (_PYTHON_TO_ARROW, _ARROW_TO_PYTHON) also had to change — they called pa.int64(), pa.float64() etc. at import time, which would immediately trigger the real import. They've been replaced with @functools.cache functions (_python_to_arrow() / _arrow_to_python()) that build the mappings lazily on first access.

agent-kurouto bot added 2 commits March 21, 2026 09:20
- Move ColumnInfo to orcapod.types (consistent with other concrete type
  definitions); db_connector_protocol re-exports it for backward compat
- Add from __future__ import annotations to database_protocols.py and
  drop all quoted type hints throughout new files
- Move re-export import to top of database_protocols.py, eliminating
  the noqa: E402 suppression
- Replace **kwargs in DBTableSource.__init__ with explicit label,
  data_context, and config parameters matching RootSource signature
- Update test imports to use ColumnInfo from orcapod.types

agent-kurouto bot commented Mar 21, 2026

eywalker review round — changes summary (commit c23eaa4)

All six review comments have been addressed. Here's a summary of every change made in this round:

1. ColumnInfo moved to orcapod/types.py

ColumnInfo now lives alongside the other concrete type definitions (ContentHash, etc.) in orcapod/types.py. The original location (orcapod/protocols/db_connector_protocol.py) re-exports it for full backward compatibility:

from orcapod.types import ColumnInfo  # re-exported for backward compat

Test imports in both test_connector_arrow_database.py and test_db_table_source.py updated to from orcapod.types import ColumnInfo.

2. from __future__ import annotations + quoted hint removal

Added from __future__ import annotations to all new/modified files (db_connector_protocol.py, database_protocols.py, connector_arrow_database.py, db_table_source.py). All "pa.RecordBatch", "pa.Table", "ConnectorArrowDatabase", and other quoted type hints have been removed — they're now unquoted throughout.

3. Re-export moved to top of database_protocols.py

# before (bottom of file, with noqa comment)
from orcapod.protocols.db_connector_protocol import ColumnInfo, DBConnectorProtocol  # noqa: E402

# after (top of file, alongside other imports)
from orcapod.protocols.db_connector_protocol import ColumnInfo, DBConnectorProtocol

The # noqa: E402 comment is gone.

4. DBTableSource.__init__ — explicit params replacing **kwargs

# before
def __init__(self, connector, table_name, tag_columns=None, ..., **kwargs): ...

# after
def __init__(
    self,
    connector: DBConnectorProtocol,
    table_name: str,
    tag_columns: Collection[str] | None = None,
    system_tag_columns: Collection[str] = (),
    record_id_column: str | None = None,
    source_id: str | None = None,
    label: str | None = None,
    data_context: str | contexts.DataContext | None = None,
    config: Config | None = None,
) -> None: ...

contexts and Config are imported under TYPE_CHECKING to keep the lazy-import pattern consistent with the rest of the codebase.

5. __all__ added to database_protocols.py

__all__ = [
    "ArrowDatabaseProtocol",
    "ArrowDatabaseWithMetadataProtocol",
    "ColumnInfo",
    "DBConnectorProtocol",
    "MetadataCapableProtocol",
]

All 2388 tests pass (including 127 ConnectorArrowDatabase tests and 47 DBTableSource tests). No behaviour changes — this was a pure style/structure refactor.

Replace the unconditional `import pyarrow as pa` with the LazyModule
pattern already used across the codebase (`LazyModule("pyarrow")` at
runtime, real import under `TYPE_CHECKING`). Since `from __future__
import annotations` is in place, the `pa.DataType` annotation on
`ColumnInfo` is a no-op string at import time.

The two module-level dicts (`_PYTHON_TO_ARROW`, `_ARROW_TO_PYTHON`)
called `pa.*()` at module load time, which would defeat laziness. They
are replaced by `@functools.cache` functions (`_python_to_arrow()` /
`_arrow_to_python()`) that build the mappings on first access.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

agent-kurouto bot commented Mar 21, 2026

Review round 3 — changes summary (commit d3195e9)

Conditional pyarrow import in orcapod/types.py

Problem: import pyarrow as pa was unconditional at module level, slowing import time for any code that touches orcapod.types.

Fix:

  1. Replaced the unconditional import with the LazyModule pattern (LazyModule("pyarrow") at runtime, real import under TYPE_CHECKING)
  2. The ColumnInfo.arrow_type: pa.DataType annotation needs no runtime import — from __future__ import annotations makes it a string
  3. The two module-level dicts (_PYTHON_TO_ARROW, _ARROW_TO_PYTHON) called pa.int64() etc. at module load time, which would trigger the import immediately. Replaced them with @functools.cache functions (_python_to_arrow() / _arrow_to_python()) that build the mappings lazily on first access

All 2425 tests pass.

These module-level dicts were unused (no references anywhere in the
codebase) and were the only reason pyarrow needed to be called at
module load time. Removing them simplifies the lazy-import change.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@eywalker eywalker merged commit 1ac63cc into dev Mar 21, 2026
5 checks passed
@eywalker eywalker deleted the eywalker/plt-1078-design-spike-clean-up-databasesource-interface-to-streamline branch March 21, 2026 10:51