From b631b0c2e885c9741141b2e0429c2107171b8cf8 Mon Sep 17 00:00:00 2001 From: Polina Binder Date: Sun, 18 Jan 2026 16:06:17 -0800 Subject: [PATCH 1/7] partitioning SCDL --- sub-packages/bionemo-scdl/pyproject.toml | 1 + .../bionemo/scdl/scripts/partition_scdl.py | 303 ++++++++++++++++++ .../tests/bionemo/scdl/scripts/__init__.py | 14 + .../scdl/scripts/test_partition_scdl.py | 99 ++++++ 4 files changed, 417 insertions(+) create mode 100644 sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py create mode 100644 sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/__init__.py create mode 100644 sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py diff --git a/sub-packages/bionemo-scdl/pyproject.toml b/sub-packages/bionemo-scdl/pyproject.toml index 242534a5cd..dbbf856101 100644 --- a/sub-packages/bionemo-scdl/pyproject.toml +++ b/sub-packages/bionemo-scdl/pyproject.toml @@ -33,6 +33,7 @@ test = [ [project.scripts] convert_h5ad_to_scdl = "bionemo.scdl.scripts.convert_h5ad_to_scdl:main" +partition-scdl = "bionemo.scdl.scripts.partition_scdl:main" [tool.setuptools.packages.find] where = ["src"] diff --git a/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py b/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py new file mode 100644 index 0000000000..59fde799e8 --- /dev/null +++ b/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py @@ -0,0 +1,303 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: LicenseRef-Apache2 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Partition a monolithic SCDL dataset into chunks for efficient remote access. + +This script takes an existing SCDL dataset and splits it into smaller chunks, +each containing a subset of rows. The chunked format enables: +- Efficient remote storage access (fetch only needed chunks) +- Local caching with LRU eviction +- Parallel prefetching during training + +The script handles large files (5TB+) efficiently by: +- Using streaming binary I/O to avoid loading entire arrays into memory +- Processing data in configurable buffer sizes +- Creating raw memmap files consistent with SCDL format + +Usage: + partition-scdl --input /path/to/scdl --output /path/to/chunked --chunk-size 100000 +""" + +import argparse +import json +import logging +import shutil +from dataclasses import asdict, dataclass +from pathlib import Path + +import numpy as np + +from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset +from bionemo.scdl.util.scdl_constants import FileNames + + +logger = logging.getLogger(__name__) + + +@dataclass +class ChunkedSCDLMetadata: + """ChunkedSCDLMetadata describing the chunked dataset structure. 
+ + With uniform chunk sizes, finding a chunk is O(1): + chunk_id = global_idx // chunk_size + local_idx = global_idx % chunk_size + """ + + version: str + total_rows: int + chunk_size: int + num_chunks: int + dtypes: dict # Maps array name to dtype string (e.g., {"data": "float32", "rowptr": "uint64"}) + + def save(self, path: Path) -> None: + """Save manifest to JSON file.""" + with open(path, "w") as f: + json.dump(asdict(self), f, indent=2) + + @classmethod + def load(cls, path: Path) -> "ChunkedSCDLMetadata": + """Load manifest from JSON file.""" + with open(path) as f: + data = json.load(f) + return cls(**data) + + def get_chunk_for_row(self, global_idx: int) -> tuple[int, int]: + """Return (chunk_id, local_idx) for a global row index.""" + if global_idx < 0 or global_idx >= self.total_rows: + raise IndexError(f"Row index {global_idx} out of range [0, {self.total_rows})") + chunk_id = global_idx // self.chunk_size + local_idx = global_idx % self.chunk_size + return chunk_id, local_idx + + +def partition_scdl( + input_path: Path, + output_path: Path, + chunk_size: int = 100_000, + buffer_elements: int = 10 * 1024 * 1024, # ~10M elements per buffer + delete_original: bool = False, +) -> ChunkedSCDLMetadata: + """Partition an SCDL dataset into chunks. + + Uses streaming binary I/O to handle large files (5TB+) efficiently without + loading entire arrays into memory. + + Args: + input_path: Path to existing monolithic SCDL dataset. + output_path: Path where chunked dataset will be created. + chunk_size: Number of rows per chunk (default: 100,000). + buffer_elements: Number of elements to read/write per buffer (default: ~10M). + delete_original: If True, delete the original dataset after successful partitioning. + + Returns: + ChunkedSCDLMetadata describing the chunked dataset. + + Raises: + FileNotFoundError: If input_path doesn't exist or is missing required files. + FileExistsError: If output_path already exists. 
+ """ + input_path = Path(input_path) + output_path = Path(output_path) + + # Validate input + if not input_path.exists(): + raise FileNotFoundError(f"Input path does not exist: {input_path}") + + required_files = [FileNames.DATA.value, FileNames.ROWPTR.value, FileNames.COLPTR.value] + for fname in required_files: + if not (input_path / fname).exists(): + raise FileNotFoundError(f"Missing required file: {input_path / fname}") + + if output_path.exists(): + raise FileExistsError(f"Output path already exists: {output_path}") + + output_path.mkdir(parents=True) + + # Load the source SCDL dataset (uses memmaps, doesn't load data into RAM) + source_ds = SingleCellMemMapDataset(str(input_path)) + + # Extract dtypes from loaded dataset + dtypes = { + "data": source_ds.dtypes[FileNames.DATA.value], + "rowptr": source_ds.dtypes[FileNames.ROWPTR.value], + "colptr": source_ds.dtypes[FileNames.COLPTR.value], + } + + # Get dimensions from the loaded dataset + total_rows = len(source_ds) + rowptr = source_ds.row_index # Already loaded as memmap + + num_chunks = (total_rows + chunk_size - 1) // chunk_size # Ceiling division + + rowptr_dtype = np.dtype(dtypes["rowptr"]) + data_dtype = np.dtype(dtypes["data"]) + colptr_dtype = np.dtype(dtypes["colptr"]) + + # Create chunks using streaming binary I/O + for chunk_id in range(num_chunks): + row_start = chunk_id * chunk_size + row_end = min(row_start + chunk_size, total_rows) + chunk_dir = output_path / f"chunk_{chunk_id:05d}" + chunk_dir.mkdir() + + # Calculate data slice for this chunk + data_start = int(rowptr[row_start]) + data_end = int(rowptr[row_end]) + num_elements = data_end - data_start + + # Create adjusted row pointers (relative to chunk start) + chunk_rowptr = (rowptr[row_start : row_end + 1] - data_start).astype(rowptr_dtype) + + # Write rowptr as raw binary (small enough to do in one go) + with open(chunk_dir / FileNames.ROWPTR.value, "wb") as f: + f.write(chunk_rowptr.tobytes()) + + # Stream data and colptr using buffered binary I/O + _stream_copy_slice( + input_path / FileNames.DATA.value, + chunk_dir / FileNames.DATA.value, + data_start, + num_elements, + data_dtype, + buffer_elements, + ) + + _stream_copy_slice( + input_path / FileNames.COLPTR.value, + chunk_dir / FileNames.COLPTR.value, + data_start, + num_elements, + colptr_dtype, + buffer_elements, + ) + + logger.info(f"Created chunk {chunk_id}: rows {row_start}-{row_end} ({num_elements} elements)") + + # Copy feature indices + _copy_global_features(input_path, output_path) + + # Copy other metadata files if they exist + for fname in [FileNames.VERSION.value, FileNames.HEADER.value, FileNames.METADATA.value]: + src = input_path / fname + if src.exists(): + shutil.copy(src, output_path / fname) + + # Create and save metadata + metadata = ChunkedSCDLMetadata( + version="1.0", + total_rows=total_rows, + chunk_size=chunk_size, + num_chunks=num_chunks, + dtypes=dtypes, + ) + metadata.save(output_path / "metadata.json") + + logger.info(f"Created {num_chunks} chunks from {total_rows} rows") + logger.info(f"ChunkedSCDLMetadata saved to: {output_path / 'metadata.json'}") + + # Delete original dataset if requested + if delete_original: + # Close memmap references before deleting + del source_ds + shutil.rmtree(input_path) + logger.info(f"Deleted original dataset: {input_path}") + + return metadata + + +def _stream_copy_slice( + src_path: Path, + dst_path: Path, + start_element: int, + num_elements: int, + dtype: np.dtype, + buffer_elements: int, +) -> None: + """Copy a slice of a binary array file 
using streaming I/O. + + Avoids loading the entire slice into memory by reading/writing in buffers. + + Args: + src_path: Source file path. + dst_path: Destination file path. + start_element: Starting element index in source. + num_elements: Number of elements to copy. + dtype: Numpy dtype of elements. + buffer_elements: Number of elements per buffer. + """ + itemsize = dtype.itemsize + start_byte = start_element * itemsize + + with open(src_path, "rb") as src, open(dst_path, "wb") as dst: + src.seek(start_byte) + elements_remaining = num_elements + + while elements_remaining > 0: + to_read = min(buffer_elements, elements_remaining) * itemsize + chunk = src.read(to_read) + if not chunk: + raise IOError(f"Unexpected EOF reading {src_path}") + dst.write(chunk) + elements_remaining -= len(chunk) // itemsize + + +def _copy_global_features(input_path: Path, output_path: Path) -> None: + """Copy global features (var_features and obs_features) across all chunks.""" + for name in [FileNames.VAR_FEATURES.value, FileNames.OBS_FEATURES.value]: + src = input_path / name + if src.exists(): + shutil.copytree(src, output_path / name) + logger.info(f"Copied {name} to output") + + +def main(): + """CLI entry point for partitioning SCDL datasets.""" + parser = argparse.ArgumentParser(description="Partition a monolithic SCDL dataset into chunks for remote access.") + parser.add_argument( + "--input", + type=str, + required=True, + help="Path to the input SCDL dataset.", + ) + parser.add_argument( + "--output", + type=str, + required=True, + help="Path where the chunked dataset will be created.", + ) + parser.add_argument( + "--chunk-size", + type=int, + default=100_000, + help="Number of rows per chunk (default: 100,000).", + ) + parser.add_argument( + "--delete-original", + action="store_true", + help="Delete the original dataset after successful partitioning.", + ) + + args = parser.parse_args() + + partition_scdl( + input_path=Path(args.input), + output_path=Path(args.output), + chunk_size=args.chunk_size, + delete_original=args.delete_original, + ) + + +if __name__ == "__main__": + main() diff --git a/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/__init__.py b/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/__init__.py new file mode 100644 index 0000000000..25e6abfbc5 --- /dev/null +++ b/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/__init__.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: LicenseRef-Apache2 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py b/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py new file mode 100644 index 0000000000..a31a736086 --- /dev/null +++ b/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py @@ -0,0 +1,99 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: LicenseRef-Apache2 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for the SCDL partition script.""" + +import os + +import numpy as np +import pytest + +from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset +from bionemo.scdl.scripts.partition_scdl import ChunkedSCDLMetadata, partition_scdl +from bionemo.scdl.util.scdl_constants import FileNames + + +def _load_chunk_memmap(chunk_dir, fname, dtype): + """Load a raw binary memmap file from a chunk directory.""" + path = chunk_dir / fname + itemsize = np.dtype(dtype).itemsize + num_elements = os.path.getsize(path) // itemsize + return np.memmap(path, dtype=dtype, mode="r", shape=(num_elements,)) + + +@pytest.fixture +def partitioned_scdl(tmp_path, make_h5ad_with_raw): + """Create an SCDL dataset and partition it into chunks. + + Uses a fixed CHUNK_SIZE for clarity between tests and metadata calculation. + """ + CHUNK_SIZE = 50 + h5ad_path = make_h5ad_with_raw(tmp_path) + scdl_path = tmp_path / "scdl" + original = SingleCellMemMapDataset(scdl_path, h5ad_path=h5ad_path) + + chunked_path = tmp_path / "chunked" + metadata = partition_scdl(scdl_path, chunked_path, chunk_size=CHUNK_SIZE) + + return original, metadata, chunked_path, CHUNK_SIZE + + +def test_metadata_total_rows_matches_original(partitioned_scdl): + """Metadata total_rows matches number of rows in original dataset.""" + original, metadata, *_ = partitioned_scdl + assert metadata.total_rows == len(original) + + +def test_metadata_num_chunks_formula(partitioned_scdl): + """Metadata reports correct number of chunks using ceiling division. + + Ensures the calculation matches both the test and the script. 
+ """ + original, metadata, _, CHUNK_SIZE = partitioned_scdl + expected_chunks = (len(original) + (CHUNK_SIZE - 1)) // CHUNK_SIZE + assert metadata.num_chunks == expected_chunks + + +def test_partition_row_data_correctness(partitioned_scdl): + """Partitioned chunk's row data matches original for all rows.""" + original, metadata, chunked_path, _ = partitioned_scdl + + for global_idx in range(metadata.total_rows): + chunk_id, local_idx = metadata.get_chunk_for_row(global_idx) + chunk_dir = chunked_path / f"chunk_{chunk_id:05d}" + + rowptr = _load_chunk_memmap(chunk_dir, FileNames.ROWPTR.value, metadata.dtypes["rowptr"]) + data = _load_chunk_memmap(chunk_dir, FileNames.DATA.value, metadata.dtypes["data"]) + colptr = _load_chunk_memmap(chunk_dir, FileNames.COLPTR.value, metadata.dtypes["colptr"]) + + (orig_vals, orig_cols), _, _ = original.get_row(global_idx) + chunk_vals = data[rowptr[local_idx] : rowptr[local_idx + 1]] + chunk_cols = colptr[rowptr[local_idx] : rowptr[local_idx + 1]] + + np.testing.assert_array_equal(orig_vals, chunk_vals) + np.testing.assert_array_equal(orig_cols, chunk_cols) + + +def test_metadata_save_load_roundtrip(partitioned_scdl, tmp_path): + """Metadata saves and loads correctly from disk.""" + _, metadata, _, _ = partitioned_scdl + metadata_path = tmp_path / "metadata.json" + metadata.save(metadata_path) + loaded = ChunkedSCDLMetadata.load(metadata_path) + assert loaded.total_rows == metadata.total_rows + assert loaded.chunk_size == metadata.chunk_size + assert loaded.num_chunks == metadata.num_chunks + assert loaded.dtypes == metadata.dtypes + assert loaded.version == metadata.version From 5a8950c7573fe4e5feb8f70d9c685cc8b11ae2da Mon Sep 17 00:00:00 2001 From: Polina Binder Date: Sun, 18 Jan 2026 16:53:06 -0800 Subject: [PATCH 2/7] some more code clean up --- .../src/bionemo/scdl/schema/header.py | 59 ++++++++++++- .../bionemo/scdl/scripts/partition_scdl.py | 88 ++++--------------- .../src/bionemo/scdl/util/scdl_constants.py | 1 + .../scdl/scripts/test_partition_scdl.py | 71 ++++++++------- 4 files changed, 117 insertions(+), 102 deletions(-) diff --git a/sub-packages/bionemo-scdl/src/bionemo/scdl/schema/header.py b/sub-packages/bionemo-scdl/src/bionemo/scdl/schema/header.py index 7ee31684bb..430dd32e8d 100644 --- a/sub-packages/bionemo-scdl/src/bionemo/scdl/schema/header.py +++ b/sub-packages/bionemo-scdl/src/bionemo/scdl/schema/header.py @@ -407,6 +407,38 @@ def __repr__(self) -> str: return self.__str__() +class ChunkedInfo: + """Chunking metadata for CHUNKED_MEMMAP backend.""" + + def __init__(self, chunk_size: int, num_chunks: int, total_rows: int): + """Initialize chunked info.""" + self.chunk_size = chunk_size + self.num_chunks = num_chunks + self.total_rows = total_rows + + def get_chunk_for_row(self, global_idx: int) -> Tuple[int, int]: + """Return (chunk_id, local_idx) for a global row index.""" + if global_idx < 0 or global_idx >= self.total_rows: + raise IndexError(f"Row index {global_idx} out of range [0, {self.total_rows})") + return global_idx // self.chunk_size, global_idx % self.chunk_size + + def serialize(self, codec: BinaryHeaderCodec) -> bytes: + """Serialize to binary format.""" + return ( + codec.pack_uint32(self.chunk_size) + + codec.pack_uint32(self.num_chunks) + + codec.pack_uint64(self.total_rows) + ) + + @classmethod + def deserialize(cls, codec: BinaryHeaderCodec, data: bytes, offset: int = 0) -> Tuple["ChunkedInfo", int]: + """Deserialize from binary data. 
Returns (ChunkedInfo, bytes_consumed).""" + chunk_size = codec.unpack_uint32(data[offset : offset + 4]) + num_chunks = codec.unpack_uint32(data[offset + 4 : offset + 8]) + total_rows = codec.unpack_uint64(data[offset + 8 : offset + 16]) + return cls(chunk_size=chunk_size, num_chunks=num_chunks, total_rows=total_rows), 16 + + class SCDLHeader: """Header for a SCDL archive following the official schema specification. @@ -423,6 +455,7 @@ def __init__( backend: Backend = Backend.MEMMAP_V0, arrays: Optional[List[ArrayInfo]] = None, feature_indices: Optional[List[FeatureIndexInfo]] = None, + chunked_info: Optional["ChunkedInfo"] = None, ): """Initialize SCDL header. @@ -431,12 +464,14 @@ def __init__( backend: Storage backend type arrays: List of arrays in the archive feature_indices: Optional list of feature indices in the archive + chunked_info: Optional chunking metadata for CHUNKED_MEMMAP backend """ self.version = version or CurrentSCDLVersion() self.endianness = Endianness.NETWORK # Always network byte order per spec self.backend = backend self.arrays = arrays or [] self.feature_indices = feature_indices or [] + self.chunked_info = chunked_info # Create codec with network byte order self._codec = BinaryHeaderCodec(self.endianness) @@ -525,6 +560,13 @@ def serialize(self) -> bytes: for feature_index in self.feature_indices: data += feature_index.serialize(self._codec) + # Chunked info (optional, for CHUNKED_MEMMAP backend) + if self.chunked_info is not None: + data += self._codec.pack_uint8(1) # has_chunked_info = true + data += self.chunked_info.serialize(self._codec) + else: + data += self._codec.pack_uint8(0) # has_chunked_info = false + return data except Exception as e: @@ -617,7 +659,22 @@ def deserialize(cls, data: bytes) -> "SCDLHeader": feature_indices.append(feature_index) offset += bytes_consumed - header = cls(version=version, backend=backend, arrays=arrays, feature_indices=feature_indices) + # Read chunked info (optional, for backwards compatibility) + chunked_info = None + if offset < len(data): + has_chunked_info = codec.unpack_uint8(data[offset : offset + 1]) + offset += 1 + if has_chunked_info: + chunked_info, bytes_consumed = ChunkedInfo.deserialize(codec, data, offset) + offset += bytes_consumed + + header = cls( + version=version, + backend=backend, + arrays=arrays, + feature_indices=feature_indices, + chunked_info=chunked_info, + ) return header except HeaderSerializationError: diff --git a/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py b/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py index 59fde799e8..1b19da5e59 100644 --- a/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py +++ b/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py @@ -31,64 +31,27 @@ """ import argparse -import json import logging import shutil -from dataclasses import asdict, dataclass from pathlib import Path import numpy as np from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset -from bionemo.scdl.util.scdl_constants import FileNames +from bionemo.scdl.schema.header import ChunkedInfo, SCDLHeader +from bionemo.scdl.util.scdl_constants import Backend, FileNames logger = logging.getLogger(__name__) -@dataclass -class ChunkedSCDLMetadata: - """ChunkedSCDLMetadata describing the chunked dataset structure. 
- - With uniform chunk sizes, finding a chunk is O(1): - chunk_id = global_idx // chunk_size - local_idx = global_idx % chunk_size - """ - - version: str - total_rows: int - chunk_size: int - num_chunks: int - dtypes: dict # Maps array name to dtype string (e.g., {"data": "float32", "rowptr": "uint64"}) - - def save(self, path: Path) -> None: - """Save manifest to JSON file.""" - with open(path, "w") as f: - json.dump(asdict(self), f, indent=2) - - @classmethod - def load(cls, path: Path) -> "ChunkedSCDLMetadata": - """Load manifest from JSON file.""" - with open(path) as f: - data = json.load(f) - return cls(**data) - - def get_chunk_for_row(self, global_idx: int) -> tuple[int, int]: - """Return (chunk_id, local_idx) for a global row index.""" - if global_idx < 0 or global_idx >= self.total_rows: - raise IndexError(f"Row index {global_idx} out of range [0, {self.total_rows})") - chunk_id = global_idx // self.chunk_size - local_idx = global_idx % self.chunk_size - return chunk_id, local_idx - - def partition_scdl( input_path: Path, output_path: Path, chunk_size: int = 100_000, buffer_elements: int = 10 * 1024 * 1024, # ~10M elements per buffer delete_original: bool = False, -) -> ChunkedSCDLMetadata: +) -> SCDLHeader: """Partition an SCDL dataset into chunks. Uses streaming binary I/O to handle large files (5TB+) efficiently without @@ -102,7 +65,7 @@ def partition_scdl( delete_original: If True, delete the original dataset after successful partitioning. Returns: - ChunkedSCDLMetadata describing the chunked dataset. + SCDLHeader with Backend.CHUNKED_MEMMAP_V0 and ChunkedInfo. Raises: FileNotFoundError: If input_path doesn't exist or is missing required files. @@ -127,23 +90,14 @@ def partition_scdl( # Load the source SCDL dataset (uses memmaps, doesn't load data into RAM) source_ds = SingleCellMemMapDataset(str(input_path)) - - # Extract dtypes from loaded dataset - dtypes = { - "data": source_ds.dtypes[FileNames.DATA.value], - "rowptr": source_ds.dtypes[FileNames.ROWPTR.value], - "colptr": source_ds.dtypes[FileNames.COLPTR.value], - } - - # Get dimensions from the loaded dataset total_rows = len(source_ds) - rowptr = source_ds.row_index # Already loaded as memmap + rowptr = source_ds.row_index + num_chunks = (total_rows + chunk_size - 1) // chunk_size - num_chunks = (total_rows + chunk_size - 1) // chunk_size # Ceiling division - - rowptr_dtype = np.dtype(dtypes["rowptr"]) - data_dtype = np.dtype(dtypes["data"]) - colptr_dtype = np.dtype(dtypes["colptr"]) + # Get dtypes from source + rowptr_dtype = np.dtype(source_ds.dtypes[FileNames.ROWPTR.value]) + data_dtype = np.dtype(source_ds.dtypes[FileNames.DATA.value]) + colptr_dtype = np.dtype(source_ds.dtypes[FileNames.COLPTR.value]) # Create chunks using streaming binary I/O for chunk_id in range(num_chunks): @@ -188,24 +142,20 @@ def partition_scdl( # Copy feature indices _copy_global_features(input_path, output_path) - # Copy other metadata files if they exist - for fname in [FileNames.VERSION.value, FileNames.HEADER.value, FileNames.METADATA.value]: + # Copy metadata files + for fname in [FileNames.VERSION.value, FileNames.METADATA.value]: src = input_path / fname if src.exists(): shutil.copy(src, output_path / fname) - # Create and save metadata - metadata = ChunkedSCDLMetadata( - version="1.0", - total_rows=total_rows, - chunk_size=chunk_size, - num_chunks=num_chunks, - dtypes=dtypes, - ) - metadata.save(output_path / "metadata.json") + # Copy original header and add chunked info + header = source_ds.header if source_ds.header else 
SCDLHeader() + header.backend = Backend.CHUNKED_MEMMAP_V0 + header.chunked_info = ChunkedInfo(chunk_size=chunk_size, num_chunks=num_chunks, total_rows=total_rows) + header.save(str(output_path / FileNames.HEADER.value)) logger.info(f"Created {num_chunks} chunks from {total_rows} rows") - logger.info(f"ChunkedSCDLMetadata saved to: {output_path / 'metadata.json'}") + logger.info(f"SCDLHeader saved to: {output_path / FileNames.HEADER.value}") # Delete original dataset if requested if delete_original: @@ -214,7 +164,7 @@ def partition_scdl( shutil.rmtree(input_path) logger.info(f"Deleted original dataset: {input_path}") - return metadata + return header def _stream_copy_slice( diff --git a/sub-packages/bionemo-scdl/src/bionemo/scdl/util/scdl_constants.py b/sub-packages/bionemo-scdl/src/bionemo/scdl/util/scdl_constants.py index 7e276326de..3cd5f0483b 100644 --- a/sub-packages/bionemo-scdl/src/bionemo/scdl/util/scdl_constants.py +++ b/sub-packages/bionemo-scdl/src/bionemo/scdl/util/scdl_constants.py @@ -163,6 +163,7 @@ class Backend(IntEnum): """ MEMMAP_V0 = 1 + CHUNKED_MEMMAP_V0 = 2 # Chunked memmap for large datasets with remote access support class Mode(str, Enum): diff --git a/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py b/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py index a31a736086..7f9d339fdc 100644 --- a/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py +++ b/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py @@ -21,8 +21,9 @@ import pytest from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset -from bionemo.scdl.scripts.partition_scdl import ChunkedSCDLMetadata, partition_scdl -from bionemo.scdl.util.scdl_constants import FileNames +from bionemo.scdl.schema.header import SCDLHeader +from bionemo.scdl.scripts.partition_scdl import partition_scdl +from bionemo.scdl.util.scdl_constants import Backend, FileNames def _load_chunk_memmap(chunk_dir, fname, dtype): @@ -45,38 +46,42 @@ def partitioned_scdl(tmp_path, make_h5ad_with_raw): original = SingleCellMemMapDataset(scdl_path, h5ad_path=h5ad_path) chunked_path = tmp_path / "chunked" - metadata = partition_scdl(scdl_path, chunked_path, chunk_size=CHUNK_SIZE) + header = partition_scdl(scdl_path, chunked_path, chunk_size=CHUNK_SIZE) - return original, metadata, chunked_path, CHUNK_SIZE + return original, header, chunked_path, CHUNK_SIZE -def test_metadata_total_rows_matches_original(partitioned_scdl): - """Metadata total_rows matches number of rows in original dataset.""" - original, metadata, *_ = partitioned_scdl - assert metadata.total_rows == len(original) +def test_header_backend_is_chunked(partitioned_scdl): + """Header has CHUNKED_MEMMAP_V0 backend type.""" + _, header, *_ = partitioned_scdl + assert header.backend == Backend.CHUNKED_MEMMAP_V0 -def test_metadata_num_chunks_formula(partitioned_scdl): - """Metadata reports correct number of chunks using ceiling division. - - Ensures the calculation matches both the test and the script. 
- """ - original, metadata, _, CHUNK_SIZE = partitioned_scdl +def test_header_has_chunked_info(partitioned_scdl): + """Header has chunked_info with correct values.""" + original, header, _, CHUNK_SIZE = partitioned_scdl + assert header.chunked_info is not None + assert header.chunked_info.total_rows == len(original) + assert header.chunked_info.chunk_size == CHUNK_SIZE expected_chunks = (len(original) + (CHUNK_SIZE - 1)) // CHUNK_SIZE - assert metadata.num_chunks == expected_chunks + assert header.chunked_info.num_chunks == expected_chunks def test_partition_row_data_correctness(partitioned_scdl): """Partitioned chunk's row data matches original for all rows.""" - original, metadata, chunked_path, _ = partitioned_scdl + original, header, chunked_path, _ = partitioned_scdl + chunked_info = header.chunked_info + + # Get dtypes from header arrays (keys are enum names like "ROWPTR", not values like "row_ptr.npy") + dtype_map = {arr.name: arr.dtype.numpy_dtype_string for arr in header.arrays} - for global_idx in range(metadata.total_rows): - chunk_id, local_idx = metadata.get_chunk_for_row(global_idx) + for global_idx in range(chunked_info.total_rows): + chunk_id, local_idx = chunked_info.get_chunk_for_row(global_idx) chunk_dir = chunked_path / f"chunk_{chunk_id:05d}" - rowptr = _load_chunk_memmap(chunk_dir, FileNames.ROWPTR.value, metadata.dtypes["rowptr"]) - data = _load_chunk_memmap(chunk_dir, FileNames.DATA.value, metadata.dtypes["data"]) - colptr = _load_chunk_memmap(chunk_dir, FileNames.COLPTR.value, metadata.dtypes["colptr"]) + rowptr = _load_chunk_memmap(chunk_dir, FileNames.ROWPTR.value, dtype_map["ROWPTR"]) + data = _load_chunk_memmap(chunk_dir, FileNames.DATA.value, dtype_map["DATA"]) + colptr = _load_chunk_memmap(chunk_dir, FileNames.COLPTR.value, dtype_map["COLPTR"]) (orig_vals, orig_cols), _, _ = original.get_row(global_idx) chunk_vals = data[rowptr[local_idx] : rowptr[local_idx + 1]] @@ -86,14 +91,16 @@ def test_partition_row_data_correctness(partitioned_scdl): np.testing.assert_array_equal(orig_cols, chunk_cols) -def test_metadata_save_load_roundtrip(partitioned_scdl, tmp_path): - """Metadata saves and loads correctly from disk.""" - _, metadata, _, _ = partitioned_scdl - metadata_path = tmp_path / "metadata.json" - metadata.save(metadata_path) - loaded = ChunkedSCDLMetadata.load(metadata_path) - assert loaded.total_rows == metadata.total_rows - assert loaded.chunk_size == metadata.chunk_size - assert loaded.num_chunks == metadata.num_chunks - assert loaded.dtypes == metadata.dtypes - assert loaded.version == metadata.version +def test_header_save_load_roundtrip(partitioned_scdl): + """Header with ChunkedInfo saves and loads correctly from disk.""" + _, header, chunked_path, _ = partitioned_scdl + + # Load the header from the saved file + header_path = chunked_path / FileNames.HEADER.value + loaded = SCDLHeader.load(str(header_path)) + + assert loaded.backend == header.backend + assert loaded.chunked_info is not None + assert loaded.chunked_info.total_rows == header.chunked_info.total_rows + assert loaded.chunked_info.chunk_size == header.chunked_info.chunk_size + assert loaded.chunked_info.num_chunks == header.chunked_info.num_chunks From 6a9a73806610f0f8a37493f841b3ee6eaea895dd Mon Sep 17 00:00:00 2001 From: Polina Binder Date: Sun, 18 Jan 2026 17:02:49 -0800 Subject: [PATCH 3/7] simplifying the PR Signed-off-by: Polina Binder --- sub-packages/bionemo-scdl/pyproject.toml | 1 - .../bionemo/scdl/scripts/partition_scdl.py | 214 ++---------------- 
.../scdl/scripts/test_partition_scdl.py | 33 +-- 3 files changed, 27 insertions(+), 221 deletions(-) diff --git a/sub-packages/bionemo-scdl/pyproject.toml b/sub-packages/bionemo-scdl/pyproject.toml index dbbf856101..242534a5cd 100644 --- a/sub-packages/bionemo-scdl/pyproject.toml +++ b/sub-packages/bionemo-scdl/pyproject.toml @@ -33,7 +33,6 @@ test = [ [project.scripts] convert_h5ad_to_scdl = "bionemo.scdl.scripts.convert_h5ad_to_scdl:main" -partition-scdl = "bionemo.scdl.scripts.partition_scdl:main" [tool.setuptools.packages.find] where = ["src"] diff --git a/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py b/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py index 1b19da5e59..1affa2a596 100644 --- a/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py +++ b/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py @@ -13,25 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Partition a monolithic SCDL dataset into chunks for efficient remote access. +"""Partition a monolithic SCDL dataset into chunks.""" -This script takes an existing SCDL dataset and splits it into smaller chunks, -each containing a subset of rows. The chunked format enables: -- Efficient remote storage access (fetch only needed chunks) -- Local caching with LRU eviction -- Parallel prefetching during training - -The script handles large files (5TB+) efficiently by: -- Using streaming binary I/O to avoid loading entire arrays into memory -- Processing data in configurable buffer sizes -- Creating raw memmap files consistent with SCDL format - -Usage: - partition-scdl --input /path/to/scdl --output /path/to/chunked --chunk-size 100000 -""" - -import argparse -import logging import shutil from pathlib import Path @@ -42,212 +25,57 @@ from bionemo.scdl.util.scdl_constants import Backend, FileNames -logger = logging.getLogger(__name__) - - def partition_scdl( input_path: Path, output_path: Path, chunk_size: int = 100_000, - buffer_elements: int = 10 * 1024 * 1024, # ~10M elements per buffer - delete_original: bool = False, ) -> SCDLHeader: - """Partition an SCDL dataset into chunks. - - Uses streaming binary I/O to handle large files (5TB+) efficiently without - loading entire arrays into memory. - - Args: - input_path: Path to existing monolithic SCDL dataset. - output_path: Path where chunked dataset will be created. - chunk_size: Number of rows per chunk (default: 100,000). - buffer_elements: Number of elements to read/write per buffer (default: ~10M). - delete_original: If True, delete the original dataset after successful partitioning. - - Returns: - SCDLHeader with Backend.CHUNKED_MEMMAP_V0 and ChunkedInfo. - - Raises: - FileNotFoundError: If input_path doesn't exist or is missing required files. - FileExistsError: If output_path already exists. 
- """ - input_path = Path(input_path) - output_path = Path(output_path) + """Partition an SCDL dataset into chunks.""" + input_path, output_path = Path(input_path), Path(output_path) - # Validate input if not input_path.exists(): raise FileNotFoundError(f"Input path does not exist: {input_path}") - - required_files = [FileNames.DATA.value, FileNames.ROWPTR.value, FileNames.COLPTR.value] - for fname in required_files: - if not (input_path / fname).exists(): - raise FileNotFoundError(f"Missing required file: {input_path / fname}") - if output_path.exists(): raise FileExistsError(f"Output path already exists: {output_path}") output_path.mkdir(parents=True) - # Load the source SCDL dataset (uses memmaps, doesn't load data into RAM) + # Load source dataset source_ds = SingleCellMemMapDataset(str(input_path)) total_rows = len(source_ds) rowptr = source_ds.row_index num_chunks = (total_rows + chunk_size - 1) // chunk_size - # Get dtypes from source - rowptr_dtype = np.dtype(source_ds.dtypes[FileNames.ROWPTR.value]) - data_dtype = np.dtype(source_ds.dtypes[FileNames.DATA.value]) - colptr_dtype = np.dtype(source_ds.dtypes[FileNames.COLPTR.value]) - - # Create chunks using streaming binary I/O + # Create chunks for chunk_id in range(num_chunks): row_start = chunk_id * chunk_size row_end = min(row_start + chunk_size, total_rows) chunk_dir = output_path / f"chunk_{chunk_id:05d}" chunk_dir.mkdir() - # Calculate data slice for this chunk - data_start = int(rowptr[row_start]) - data_end = int(rowptr[row_end]) - num_elements = data_end - data_start + data_start, data_end = int(rowptr[row_start]), int(rowptr[row_end]) - # Create adjusted row pointers (relative to chunk start) - chunk_rowptr = (rowptr[row_start : row_end + 1] - data_start).astype(rowptr_dtype) - - # Write rowptr as raw binary (small enough to do in one go) + # Write chunk files using memmap slicing + chunk_rowptr = rowptr[row_start : row_end + 1] - data_start with open(chunk_dir / FileNames.ROWPTR.value, "wb") as f: - f.write(chunk_rowptr.tobytes()) - - # Stream data and colptr using buffered binary I/O - _stream_copy_slice( - input_path / FileNames.DATA.value, - chunk_dir / FileNames.DATA.value, - data_start, - num_elements, - data_dtype, - buffer_elements, - ) + f.write(chunk_rowptr.astype(source_ds.dtypes[FileNames.ROWPTR.value]).tobytes()) + with open(chunk_dir / FileNames.DATA.value, "wb") as f: + f.write(np.array(source_ds.data[data_start:data_end]).tobytes()) + with open(chunk_dir / FileNames.COLPTR.value, "wb") as f: + f.write(np.array(source_ds.col_index[data_start:data_end]).tobytes()) - _stream_copy_slice( - input_path / FileNames.COLPTR.value, - chunk_dir / FileNames.COLPTR.value, - data_start, - num_elements, - colptr_dtype, - buffer_elements, - ) - - logger.info(f"Created chunk {chunk_id}: rows {row_start}-{row_end} ({num_elements} elements)") - - # Copy feature indices - _copy_global_features(input_path, output_path) - - # Copy metadata files - for fname in [FileNames.VERSION.value, FileNames.METADATA.value]: - src = input_path / fname - if src.exists(): - shutil.copy(src, output_path / fname) + # Copy features and metadata + for name in [FileNames.VAR_FEATURES.value, FileNames.OBS_FEATURES.value]: + if (input_path / name).exists(): + shutil.copytree(input_path / name, output_path / name) + for name in [FileNames.VERSION.value, FileNames.METADATA.value]: + if (input_path / name).exists(): + shutil.copy(input_path / name, output_path / name) - # Copy original header and add chunked info + # Update header with chunked info 
header = source_ds.header if source_ds.header else SCDLHeader() header.backend = Backend.CHUNKED_MEMMAP_V0 header.chunked_info = ChunkedInfo(chunk_size=chunk_size, num_chunks=num_chunks, total_rows=total_rows) header.save(str(output_path / FileNames.HEADER.value)) - logger.info(f"Created {num_chunks} chunks from {total_rows} rows") - logger.info(f"SCDLHeader saved to: {output_path / FileNames.HEADER.value}") - - # Delete original dataset if requested - if delete_original: - # Close memmap references before deleting - del source_ds - shutil.rmtree(input_path) - logger.info(f"Deleted original dataset: {input_path}") - return header - - -def _stream_copy_slice( - src_path: Path, - dst_path: Path, - start_element: int, - num_elements: int, - dtype: np.dtype, - buffer_elements: int, -) -> None: - """Copy a slice of a binary array file using streaming I/O. - - Avoids loading the entire slice into memory by reading/writing in buffers. - - Args: - src_path: Source file path. - dst_path: Destination file path. - start_element: Starting element index in source. - num_elements: Number of elements to copy. - dtype: Numpy dtype of elements. - buffer_elements: Number of elements per buffer. - """ - itemsize = dtype.itemsize - start_byte = start_element * itemsize - - with open(src_path, "rb") as src, open(dst_path, "wb") as dst: - src.seek(start_byte) - elements_remaining = num_elements - - while elements_remaining > 0: - to_read = min(buffer_elements, elements_remaining) * itemsize - chunk = src.read(to_read) - if not chunk: - raise IOError(f"Unexpected EOF reading {src_path}") - dst.write(chunk) - elements_remaining -= len(chunk) // itemsize - - -def _copy_global_features(input_path: Path, output_path: Path) -> None: - """Copy global features (var_features and obs_features) across all chunks.""" - for name in [FileNames.VAR_FEATURES.value, FileNames.OBS_FEATURES.value]: - src = input_path / name - if src.exists(): - shutil.copytree(src, output_path / name) - logger.info(f"Copied {name} to output") - - -def main(): - """CLI entry point for partitioning SCDL datasets.""" - parser = argparse.ArgumentParser(description="Partition a monolithic SCDL dataset into chunks for remote access.") - parser.add_argument( - "--input", - type=str, - required=True, - help="Path to the input SCDL dataset.", - ) - parser.add_argument( - "--output", - type=str, - required=True, - help="Path where the chunked dataset will be created.", - ) - parser.add_argument( - "--chunk-size", - type=int, - default=100_000, - help="Number of rows per chunk (default: 100,000).", - ) - parser.add_argument( - "--delete-original", - action="store_true", - help="Delete the original dataset after successful partitioning.", - ) - - args = parser.parse_args() - - partition_scdl( - input_path=Path(args.input), - output_path=Path(args.output), - chunk_size=args.chunk_size, - delete_original=args.delete_original, - ) - - -if __name__ == "__main__": - main() diff --git a/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py b/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py index 7f9d339fdc..aed28b4aa0 100644 --- a/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py +++ b/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py @@ -15,8 +15,6 @@ """Tests for the SCDL partition script.""" -import os - import numpy as np import pytest @@ -26,28 +24,15 @@ from bionemo.scdl.util.scdl_constants import Backend, FileNames -def _load_chunk_memmap(chunk_dir, fname, 
dtype): - """Load a raw binary memmap file from a chunk directory.""" - path = chunk_dir / fname - itemsize = np.dtype(dtype).itemsize - num_elements = os.path.getsize(path) // itemsize - return np.memmap(path, dtype=dtype, mode="r", shape=(num_elements,)) - - @pytest.fixture def partitioned_scdl(tmp_path, make_h5ad_with_raw): - """Create an SCDL dataset and partition it into chunks. - - Uses a fixed CHUNK_SIZE for clarity between tests and metadata calculation. - """ + """Create an SCDL dataset and partition it into chunks.""" CHUNK_SIZE = 50 h5ad_path = make_h5ad_with_raw(tmp_path) scdl_path = tmp_path / "scdl" original = SingleCellMemMapDataset(scdl_path, h5ad_path=h5ad_path) - chunked_path = tmp_path / "chunked" header = partition_scdl(scdl_path, chunked_path, chunk_size=CHUNK_SIZE) - return original, header, chunked_path, CHUNK_SIZE @@ -63,25 +48,22 @@ def test_header_has_chunked_info(partitioned_scdl): assert header.chunked_info is not None assert header.chunked_info.total_rows == len(original) assert header.chunked_info.chunk_size == CHUNK_SIZE - expected_chunks = (len(original) + (CHUNK_SIZE - 1)) // CHUNK_SIZE - assert header.chunked_info.num_chunks == expected_chunks + assert header.chunked_info.num_chunks == (len(original) + CHUNK_SIZE - 1) // CHUNK_SIZE def test_partition_row_data_correctness(partitioned_scdl): """Partitioned chunk's row data matches original for all rows.""" original, header, chunked_path, _ = partitioned_scdl chunked_info = header.chunked_info - - # Get dtypes from header arrays (keys are enum names like "ROWPTR", not values like "row_ptr.npy") dtype_map = {arr.name: arr.dtype.numpy_dtype_string for arr in header.arrays} for global_idx in range(chunked_info.total_rows): chunk_id, local_idx = chunked_info.get_chunk_for_row(global_idx) chunk_dir = chunked_path / f"chunk_{chunk_id:05d}" - rowptr = _load_chunk_memmap(chunk_dir, FileNames.ROWPTR.value, dtype_map["ROWPTR"]) - data = _load_chunk_memmap(chunk_dir, FileNames.DATA.value, dtype_map["DATA"]) - colptr = _load_chunk_memmap(chunk_dir, FileNames.COLPTR.value, dtype_map["COLPTR"]) + rowptr = np.memmap(chunk_dir / FileNames.ROWPTR.value, dtype=dtype_map["ROWPTR"], mode="r") + data = np.memmap(chunk_dir / FileNames.DATA.value, dtype=dtype_map["DATA"], mode="r") + colptr = np.memmap(chunk_dir / FileNames.COLPTR.value, dtype=dtype_map["COLPTR"], mode="r") (orig_vals, orig_cols), _, _ = original.get_row(global_idx) chunk_vals = data[rowptr[local_idx] : rowptr[local_idx + 1]] @@ -94,10 +76,7 @@ def test_partition_row_data_correctness(partitioned_scdl): def test_header_save_load_roundtrip(partitioned_scdl): """Header with ChunkedInfo saves and loads correctly from disk.""" _, header, chunked_path, _ = partitioned_scdl - - # Load the header from the saved file - header_path = chunked_path / FileNames.HEADER.value - loaded = SCDLHeader.load(str(header_path)) + loaded = SCDLHeader.load(str(chunked_path / FileNames.HEADER.value)) assert loaded.backend == header.backend assert loaded.chunked_info is not None From cb53d6aad2c04b74bd06513e9e749a8b053630ae Mon Sep 17 00:00:00 2001 From: Polina Binder Date: Sun, 18 Jan 2026 17:08:01 -0800 Subject: [PATCH 4/7] moving the files around --- .../src/bionemo/scdl/{scripts => util}/partition_scdl.py | 0 .../tests/bionemo/scdl/{scripts => util}/test_partition_scdl.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename sub-packages/bionemo-scdl/src/bionemo/scdl/{scripts => util}/partition_scdl.py (100%) rename sub-packages/bionemo-scdl/tests/bionemo/scdl/{scripts => 
util}/test_partition_scdl.py (98%) diff --git a/sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py b/sub-packages/bionemo-scdl/src/bionemo/scdl/util/partition_scdl.py similarity index 100% rename from sub-packages/bionemo-scdl/src/bionemo/scdl/scripts/partition_scdl.py rename to sub-packages/bionemo-scdl/src/bionemo/scdl/util/partition_scdl.py diff --git a/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py b/sub-packages/bionemo-scdl/tests/bionemo/scdl/util/test_partition_scdl.py similarity index 98% rename from sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py rename to sub-packages/bionemo-scdl/tests/bionemo/scdl/util/test_partition_scdl.py index aed28b4aa0..1a3ee346f9 100644 --- a/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/test_partition_scdl.py +++ b/sub-packages/bionemo-scdl/tests/bionemo/scdl/util/test_partition_scdl.py @@ -20,7 +20,7 @@ from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset from bionemo.scdl.schema.header import SCDLHeader -from bionemo.scdl.scripts.partition_scdl import partition_scdl +from bionemo.scdl.util.partition_scdl import partition_scdl from bionemo.scdl.util.scdl_constants import Backend, FileNames From 397b89322f86ac5f0fb73faaffe2c10db5f89440 Mon Sep 17 00:00:00 2001 From: polinabinder1 Date: Sun, 18 Jan 2026 17:09:12 -0800 Subject: [PATCH 5/7] Delete sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/__init__.py Signed-off-by: polinabinder1 --- .../tests/bionemo/scdl/scripts/__init__.py | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/__init__.py diff --git a/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/__init__.py b/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/__init__.py deleted file mode 100644 index 25e6abfbc5..0000000000 --- a/sub-packages/bionemo-scdl/tests/bionemo/scdl/scripts/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -# SPDX-License-Identifier: LicenseRef-Apache2 -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
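
At this point in the series the partitioning utility lives in bionemo.scdl.util.partition_scdl and returns an SCDLHeader describing the chunk layout. A minimal usage sketch, assuming a monolithic SCDL dataset already exists on disk; the paths and chunk size below are illustrative, not part of the patch:

from pathlib import Path

from bionemo.scdl.util.partition_scdl import partition_scdl

# Split an existing monolithic SCDL dataset into fixed-size row chunks.
# The output directory must not exist yet; chunk_size defaults to 100_000.
header = partition_scdl(
    input_path=Path("/data/scdl"),           # hypothetical existing dataset
    output_path=Path("/data/scdl_chunked"),  # created by the call
    chunk_size=100_000,
)

# The returned header records the chunked backend and layout, and
# ChunkedInfo maps a global row index to (chunk_id, local_idx) in O(1).
print(header.backend)                         # Backend.CHUNKED_MEMMAP_V0
print(header.chunked_info.num_chunks)
chunk_id, local_idx = header.chunked_info.get_chunk_for_row(1234)
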
From 51fd0e3fdcaa3614d6b9c7092e3661df0e2fa8ea Mon Sep 17 00:00:00 2001 From: Polina Binder Date: Sun, 18 Jan 2026 17:35:04 -0800 Subject: [PATCH 6/7] more chunked implementation --- .../scdl/io/single_cell_memmap_dataset.py | 112 +++++++++++++----- .../src/bionemo/scdl/util/partition_scdl.py | 14 ++- .../tests/bionemo/scdl/conftest.py | 19 ++- 3 files changed, 111 insertions(+), 34 deletions(-) diff --git a/sub-packages/bionemo-scdl/src/bionemo/scdl/io/single_cell_memmap_dataset.py b/sub-packages/bionemo-scdl/src/bionemo/scdl/io/single_cell_memmap_dataset.py index 89fa3f9d61..8f9a181f5f 100644 --- a/sub-packages/bionemo-scdl/src/bionemo/scdl/io/single_cell_memmap_dataset.py +++ b/sub-packages/bionemo-scdl/src/bionemo/scdl/io/single_cell_memmap_dataset.py @@ -18,6 +18,7 @@ import logging import os import shutil +import tempfile import warnings from pathlib import Path from typing import Dict, List, Optional, Tuple, Union @@ -41,6 +42,7 @@ determine_dtype, smallest_uint_dtype, ) +from bionemo.scdl.util.partition_scdl import partition_scdl from bionemo.scdl.util.scdl_constants import FLOAT_ORDER, INT_ORDER, FileNames, Mode, NeighborSamplingStrategy @@ -128,6 +130,8 @@ def __init__( self.data_path: str = data_path self.header: SCDLHeader = None self.mode: Mode = mode + self._is_chunked: bool = False + self._chunks: List[Tuple[np.ndarray, np.ndarray, np.ndarray]] = [] self.paginated_load_cutoff = paginated_load_cutoff self.load_block_row_size = load_block_row_size self.var_feature_index_name = var_feature_index_name @@ -436,10 +440,16 @@ def get_row( List[np.ndarray]: optional, corresponding variable (column) features. List[np.ndarray]: optional, corresponding observed (row) features. """ - start = self.row_index[index] - end = self.row_index[index + 1] - values = self.data[start:end] - columns = self.col_index[start:end] + if self._is_chunked: + chunk_id, local_idx = self.header.chunked_info.get_chunk_for_row(index) + data, rowptr, colptr = self._chunks[chunk_id] + start, end = rowptr[local_idx], rowptr[local_idx + 1] + values, columns = data[start:end], colptr[start:end] + else: + start = self.row_index[index] + end = self.row_index[index + 1] + values = self.data[start:end] + columns = self.col_index[start:end] ret = (values, columns) var_features = ( self._var_feature_index.lookup(index, select_features=var_feature_names)[0] @@ -685,37 +695,52 @@ def load(self, stored_path: str) -> None: raise ValueError(f"Array name {FileNames[array_info.name].value} not found in dtypes") self.dtypes[FileNames[array_info.name].value] = array_info.dtype.numpy_dtype_string - # Metadata is required, so we must check if it exists and fail if not. - if not os.path.exists(f"{self.data_path}/{FileNames.METADATA.value}"): - raise FileNotFoundError( - f"Error: the metadata file {self.data_path}/{FileNames.METADATA.value} does not exist." 
- ) - - with open(f"{self.data_path}/{FileNames.METADATA.value}", Mode.READ_APPEND.value) as mfi: - self.metadata = json.load(mfi) + # Load metadata if exists + metadata_path = f"{self.data_path}/{FileNames.METADATA.value}" + if os.path.exists(metadata_path): + with open(metadata_path, Mode.READ_APPEND.value) as mfi: + self.metadata = json.load(mfi) + # Load feature indices if os.path.exists(f"{self.data_path}/{FileNames.VAR_FEATURES.value}"): self._var_feature_index = VariableFeatureIndex.load(f"{self.data_path}/{FileNames.VAR_FEATURES.value}") - elif os.path.exists( - f"{self.data_path}/{FileNames.FEATURES.value}" - ): # Backward compatibility with old features file + elif os.path.exists(f"{self.data_path}/{FileNames.FEATURES.value}"): self._var_feature_index = VariableFeatureIndex.load(f"{self.data_path}/{FileNames.FEATURES.value}") if os.path.exists(f"{self.data_path}/{FileNames.OBS_FEATURES.value}"): self._obs_feature_index = ObservedFeatureIndex.load(f"{self.data_path}/{FileNames.OBS_FEATURES.value}") - # mmap the existing arrays - self.data = self._load_mmap_file_if_exists( - f"{self.data_path}/{FileNames.DATA.value}", self.dtypes[f"{FileNames.DATA.value}"] - ) - self.row_index = self._load_mmap_file_if_exists( - f"{self.data_path}/{FileNames.ROWPTR.value}", dtype=self.dtypes[f"{FileNames.ROWPTR.value}"] - ) - self.col_index = self._load_mmap_file_if_exists( - f"{self.data_path}/{FileNames.COLPTR.value}", dtype=self.dtypes[f"{FileNames.COLPTR.value}"] - ) - # Load neighbor data - if self.load_neighbors: - self._load_neighbor_memmaps() + # Load data arrays - chunked vs monolithic + if self.header is not None and self.header.backend == Backend.CHUNKED_MEMMAP_V0: + self._is_chunked = True + self._load_chunk_memmaps() + else: + self.data = self._load_mmap_file_if_exists( + f"{self.data_path}/{FileNames.DATA.value}", self.dtypes[f"{FileNames.DATA.value}"] + ) + self.row_index = self._load_mmap_file_if_exists( + f"{self.data_path}/{FileNames.ROWPTR.value}", dtype=self.dtypes[f"{FileNames.ROWPTR.value}"] + ) + self.col_index = self._load_mmap_file_if_exists( + f"{self.data_path}/{FileNames.COLPTR.value}", dtype=self.dtypes[f"{FileNames.COLPTR.value}"] + ) + if self.load_neighbors: + self._load_neighbor_memmaps() + + def _load_chunk_memmaps(self) -> None: + """Preload all chunk memmaps (lazy - just file handles, no RAM).""" + for chunk_id in range(self.header.chunked_info.num_chunks): + chunk_path = Path(self.data_path) / f"chunk_{chunk_id:05d}" + self._chunks.append( + ( + np.memmap(chunk_path / FileNames.DATA.value, dtype=self.dtypes[FileNames.DATA.value], mode="r"), + np.memmap( + chunk_path / FileNames.ROWPTR.value, dtype=self.dtypes[FileNames.ROWPTR.value], mode="r" + ), + np.memmap( + chunk_path / FileNames.COLPTR.value, dtype=self.dtypes[FileNames.COLPTR.value], mode="r" + ), + ) + ) def _write_metadata(self) -> None: with open(f"{self.data_path}/{FileNames.METADATA.value}", f"{Mode.CREATE.value}") as mfi: @@ -1218,6 +1243,8 @@ def number_of_rows(self) -> int: ValueError if the length of the number of rows in the feature index does not correspond to the number of stored rows. 
""" + if self._is_chunked: + return self.header.chunked_info.total_rows if len(self._var_feature_index) > 0 and self._var_feature_index.number_of_rows() != self.row_index.size - 1: raise ValueError( f"""The number of rows in the feature index {self._var_feature_index.number_of_rows()} @@ -1445,3 +1472,32 @@ def concat( mode=Mode.READ_APPEND.value, ) self.save() + + def to_chunked( + self, output_path: Optional[str] = None, chunk_size: int = 100_000, delete_original: bool = False + ) -> "SingleCellMemMapDataset": + """Convert this dataset to a chunked format for efficient remote access. + + Args: + output_path: Path where the chunked dataset will be created. If None, replaces in-place. + chunk_size: Number of rows per chunk (default: 100,000). + delete_original: If True and output_path is set, delete the original after conversion. + + Returns: + A new SingleCellMemMapDataset instance pointing to the chunked data. + """ + if self._is_chunked: + raise ValueError("Dataset is already chunked") + + src = Path(self.data_path) + if output_path is None: + # In-place: partition to temp, then swap + with tempfile.TemporaryDirectory() as tmp_dir: + tmp_path = Path(tmp_dir) / "chunked" + partition_scdl(src, tmp_path, chunk_size=chunk_size) + shutil.rmtree(src) + shutil.move(str(tmp_path), str(src)) + return SingleCellMemMapDataset(str(src)) + + partition_scdl(src, Path(output_path), chunk_size=chunk_size, delete_original=delete_original) + return SingleCellMemMapDataset(output_path) diff --git a/sub-packages/bionemo-scdl/src/bionemo/scdl/util/partition_scdl.py b/sub-packages/bionemo-scdl/src/bionemo/scdl/util/partition_scdl.py index 1affa2a596..e1f1578858 100644 --- a/sub-packages/bionemo-scdl/src/bionemo/scdl/util/partition_scdl.py +++ b/sub-packages/bionemo-scdl/src/bionemo/scdl/util/partition_scdl.py @@ -20,7 +20,6 @@ import numpy as np -from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset from bionemo.scdl.schema.header import ChunkedInfo, SCDLHeader from bionemo.scdl.util.scdl_constants import Backend, FileNames @@ -29,8 +28,11 @@ def partition_scdl( input_path: Path, output_path: Path, chunk_size: int = 100_000, + delete_original: bool = False, ) -> SCDLHeader: """Partition an SCDL dataset into chunks.""" + from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset + input_path, output_path = Path(input_path), Path(output_path) if not input_path.exists(): @@ -44,7 +46,11 @@ def partition_scdl( source_ds = SingleCellMemMapDataset(str(input_path)) total_rows = len(source_ds) rowptr = source_ds.row_index - num_chunks = (total_rows + chunk_size - 1) // chunk_size + if chunk_size <= 0: + raise ValueError(f"Chunk size must be greater than 0, got {chunk_size}") + if total_rows <= 0: + raise ValueError(f"Total rows must be greater than 0, got {total_rows}") + num_chunks = max(1, (total_rows + chunk_size - 1) // chunk_size) # Create chunks for chunk_id in range(num_chunks): @@ -78,4 +84,8 @@ def partition_scdl( header.chunked_info = ChunkedInfo(chunk_size=chunk_size, num_chunks=num_chunks, total_rows=total_rows) header.save(str(output_path / FileNames.HEADER.value)) + if delete_original: + del source_ds # Release memmap handles + shutil.rmtree(input_path) + return header diff --git a/sub-packages/bionemo-scdl/tests/bionemo/scdl/conftest.py b/sub-packages/bionemo-scdl/tests/bionemo/scdl/conftest.py index 3b8e934471..7152d7e53f 100644 --- a/sub-packages/bionemo-scdl/tests/bionemo/scdl/conftest.py +++ b/sub-packages/bionemo-scdl/tests/bionemo/scdl/conftest.py @@ 
-199,13 +199,24 @@ def _make(tmp_path): @pytest.fixture def make_h5ad_with_raw(make_random_csr): - """Factory to create an h5ad with uniquely randomized data for the fields .raw.X and .X""" + """Factory to create an h5ad with uniquely randomized data for .raw.X, .X, obs, and var.""" def _make(tmp_path): - X = make_random_csr(total_nnz=100, n_cols=50, seed=42) - X_raw = make_random_csr(total_nnz=100, n_cols=50, seed=43) + n_rows, n_cols = 100, 50 + X = make_random_csr(total_nnz=n_rows, n_cols=n_cols, seed=42) + X_raw = make_random_csr(total_nnz=n_rows, n_cols=n_cols, seed=43) + + obs = pd.DataFrame( + {"cell_type": [f"type_{i % 3}" for i in range(n_rows)]}, + index=[f"cell_{i}" for i in range(n_rows)], + ) + var = pd.DataFrame( + {"gene_name": [f"gene_{i}" for i in range(n_cols)]}, + index=[f"ENSG{i:08d}" for i in range(n_cols)], + ) + h = tmp_path / "var.h5ad" - ad.AnnData(X=X, var=pd.DataFrame(index=np.arange(X.shape[1])), raw={"X": X_raw}).write_h5ad(h) + ad.AnnData(X=X, obs=obs, var=var, raw={"X": X_raw}).write_h5ad(h) return h return _make From 2919a918e1ff0b1a224c90a455fe14c86544a6df Mon Sep 17 00:00:00 2001 From: Polina Binder Date: Sun, 18 Jan 2026 18:12:41 -0800 Subject: [PATCH 7/7] adding test file --- .../bionemo/scdl/io/test_chunked_dataset.py | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 sub-packages/bionemo-scdl/tests/bionemo/scdl/io/test_chunked_dataset.py diff --git a/sub-packages/bionemo-scdl/tests/bionemo/scdl/io/test_chunked_dataset.py b/sub-packages/bionemo-scdl/tests/bionemo/scdl/io/test_chunked_dataset.py new file mode 100644 index 0000000000..c4594e1544 --- /dev/null +++ b/sub-packages/bionemo-scdl/tests/bionemo/scdl/io/test_chunked_dataset.py @@ -0,0 +1,67 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: LicenseRef-Apache2 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Tests for chunked SingleCellMemMapDataset functionality.""" + +import numpy as np +import pytest + +from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset +from bionemo.scdl.util.scdl_constants import Backend + + +def test_to_chunked(tmp_path, make_h5ad_with_raw): + """Convert to chunked, verify data and features match.""" + h5ad_path = make_h5ad_with_raw(tmp_path) + original = SingleCellMemMapDataset(tmp_path / "orig", h5ad_path=h5ad_path) + chunked = original.to_chunked(str(tmp_path / "chunked"), chunk_size=30) + + # Basic properties + assert chunked._is_chunked + assert chunked.header.backend == Backend.CHUNKED_MEMMAP_V0 + assert len(chunked) == len(original) + + # Data matches + for idx in range(len(original)): + (orig_vals, orig_cols), _, _ = original.get_row(idx) + (chunk_vals, chunk_cols), _, _ = chunked.get_row(idx) + np.testing.assert_array_equal(orig_vals, chunk_vals) + np.testing.assert_array_equal(orig_cols, chunk_cols) + + # Features preserved + assert len(chunked._var_feature_index) == len(original._var_feature_index) + assert chunked._obs_feature_index.number_of_rows() == original._obs_feature_index.number_of_rows() + + +def test_to_chunked_inplace(tmp_path, make_h5ad_with_raw): + """In-place conversion replaces original with chunked.""" + h5ad_path = make_h5ad_with_raw(tmp_path) + scdl_path = tmp_path / "scdl" + SingleCellMemMapDataset(scdl_path, h5ad_path=h5ad_path) + + chunked = SingleCellMemMapDataset(scdl_path).to_chunked(chunk_size=30) + + assert chunked._is_chunked + assert chunked.data_path == str(scdl_path) + + +def test_to_chunked_already_chunked_raises(tmp_path, make_h5ad_with_raw): + """Cannot chunk an already chunked dataset.""" + h5ad_path = make_h5ad_with_raw(tmp_path) + original = SingleCellMemMapDataset(tmp_path / "orig", h5ad_path=h5ad_path) + chunked = original.to_chunked(str(tmp_path / "chunked"), chunk_size=30) + + with pytest.raises(ValueError, match="already chunked"): + chunked.to_chunked()
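
Taken together, the series ends with a to_chunked convenience method on SingleCellMemMapDataset. A short sketch of how it might be used, following the tests above; the dataset and h5ad paths are hypothetical:

from bionemo.scdl.io.single_cell_memmap_dataset import SingleCellMemMapDataset

# Build a monolithic SCDL dataset from an AnnData file (hypothetical paths),
# then write a chunked copy alongside it.
ds = SingleCellMemMapDataset("/data/scdl", h5ad_path="/data/cells.h5ad")
chunked = ds.to_chunked("/data/scdl_chunked", chunk_size=100_000)

# Row access works the same way against the chunked layout; get_row routes
# through ChunkedInfo.get_chunk_for_row and the per-chunk memmaps.
(values, columns), var_features, obs_features = chunked.get_row(0)

# Or convert in place: the original directory is swapped for the chunked
# layout and a fresh dataset instance pointing at it is returned.
chunked_inplace = SingleCellMemMapDataset("/data/other_scdl").to_chunked(chunk_size=100_000)
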