diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0304133..6ee4177 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,9 +16,12 @@ jobs: test: strategy: matrix: - os: [ubuntu-latest, windows-latest] # 'macos-latest' failing on github-ci with memory error + os: [ubuntu-latest, windows-latest, macos-latest] # failing on github-ci with memory error backend: ['torch','jax'] # 'tensorflow' is excluded since on github-ci tests get stuck for a yet unknown reason - python-version: ['3.10','3.11','3.12'] + # macos-latest: + # - misses support for ratarmount > index_zstd + # - for >=3.13 indexed_zstd/libzstd-seek/zstd-seek.h:20:10: fatal error: zstd.h: No such file or directory + python-version: ['3.10','3.11','3.12','3.13','3.14'] runs-on: ${{ matrix.os }} steps: - name: Remove unnecessary tools (Linux) @@ -34,11 +37,19 @@ jobs: with: python-version: ${{ matrix.python-version }} + - name: Install dependencies ${{ matrix.os }} + if: runner.os == 'Linux' + run: | + # ensure zstd.h is available + sudo apt update && sudo apt install -y libzstd-dev + - name: Install pip run: python -m pip install -U pip - name: Install package - run: python -m pip install -e .[ml,dev,test] + run: | + python -m pip install -e .[dev,test] + python -m pip install ${{ matrix.backend }} # https://docs.github.com/en/actions/writing-workflows/choosing-what-your-workflow-does/accessing-contextual-information-about-workflow-runs#runner-context - name: Set Env (Windows) @@ -57,6 +68,13 @@ jobs: run: | echo "KERAS_BACKEND=${{ matrix.backend }}" >> $GITHUB_ENV echo "PYTORCH_MPS_HIGH_WATERMARK_RATIO=0.0" >> $GITHUB_ENV + if [ -d "/opt/homebrew/bin" ]; then + echo "/opt/homebrew/bin" >> $GITHUB_PATH + elif [ -d "/usr/local/bin" ]; then + echo "/usr/local/bin" >> $GITHUB_PATH + fi + brew install zstd + # Run keras backend specific tests (for keras>=3) # Due to space issues avoid creating too many venvs (via tox) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c11a17f..a6602e4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,14 +7,14 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.14.3 + rev: v0.15.1 hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix] # https://pycqa.github.io/isort/docs/configuration/black_compatibility.html - repo: https://github.com/pycqa/isort - rev: 5.12.0 + rev: 8.0.1 hooks: - id: isort args: ["--profile", "black", "--filter-files"] diff --git a/docs/examples/damast-process-pipeline.py b/docs/examples/damast-process-pipeline.py index f6a74ed..64bc881 100644 --- a/docs/examples/damast-process-pipeline.py +++ b/docs/examples/damast-process-pipeline.py @@ -1,3 +1,5 @@ +from pathlib import Path + from damast.core import DataProcessingPipeline from damast.data_handling.transformers import AddDeltaTime from damast.domains.maritime.transformers.features import DeltaDistance, Speed diff --git a/pyproject.toml b/pyproject.toml index 96a0b7b..5c2aa52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,8 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "License :: OSI Approved :: BSD License", ] @@ -44,12 +46,14 @@ dependencies = [ "networkx", "numba", "numpy>=2", + "pandas", "polars>=1.36.1", "psutil", "pyais", "pyarrow", "pydantic>=2.0", - "ratarmount>=1.1", + "pydot", + "ratarmount>=1.2.1; platform_system != 'Darwin'", "scikit-learn", "tables", "tqdm", @@ -81,7 +85,7 @@ ml = [ # BEGIN keras backends "torch", "jax[cpu]", - "tensorflow" + "tensorflow; python_version <= '3.13'" # END backends ] diff --git a/src/damast/core/__init__.py b/src/damast/core/__init__.py index fab709a..20654c6 100644 --- a/src/damast/core/__init__.py +++ b/src/damast/core/__init__.py @@ -15,14 +15,14 @@ describe, input, output, - ) +) from .metadata import ( ArtifactSpecification, DataSpecification, History, MetaData, ValidationMode, - ) +) __all__ = [ "AnnotatedDataFrame", diff --git a/src/damast/core/dataframe.py b/src/damast/core/dataframe.py index 87e1fc3..845843b 100644 --- a/src/damast/core/dataframe.py +++ b/src/damast/core/dataframe.py @@ -20,7 +20,7 @@ DAMAST_CSV_DEFAULT_ARGS, DAMAST_SPEC_SUFFIX, DAMAST_SUPPORTED_FILE_FORMATS, - ) +) from .data_description import ListOfValues, MinMax from .metadata import DataSpecification, MetaData, ValidationMode from .types import DataFrame, XDataFrame diff --git a/src/damast/core/decorators.py b/src/damast/core/decorators.py index b2984dc..e1ad8c9 100644 --- a/src/damast/core/decorators.py +++ b/src/damast/core/decorators.py @@ -10,7 +10,7 @@ DECORATED_DESCRIPTION, DECORATED_INPUT_SPECS, DECORATED_OUTPUT_SPECS, - ) +) from .metadata import ArtifactSpecification, DataSpecification from .transformations import PipelineElement diff --git a/src/damast/core/polars_dataframe.py b/src/damast/core/polars_dataframe.py index a409738..58f7057 100644 --- a/src/damast/core/polars_dataframe.py +++ b/src/damast/core/polars_dataframe.py @@ -289,7 +289,7 @@ def open(cls, path: str | Path, sep = ',') -> polars.LazyFrame: raise ValueError(f"{cls.__name__}.load_data: Unsupported input file format {path.suffix}") @classmethod - def from_vaex_hdf5(cls, path: str | Path) -> tuple[polars.LazyFrame, 'MetaData']: + def from_vaex_hdf5(cls, path: str | Path) -> tuple[polars.LazyFrame, 'MetaData']: # noqa """ Load hdf5 file and (damast) metadata if found in the file. """ @@ -337,7 +337,7 @@ def from_vaex_hdf5(cls, path: str | Path) -> tuple[polars.LazyFrame, 'MetaData'] return polars.LazyFrame(data), metadata @classmethod - def import_netcdf(cls, path: list[str|Path]) -> tuple[polars.LazyFrame, dict[str, 'MetaData']]: + def import_netcdf(cls, path: list[str|Path]) -> tuple[polars.LazyFrame, dict[str, 'MetaData']]: #noqa ensure_packages(pkgs=["dask", "xarray", "pandas"], required_for="Loading netcdf files", @@ -356,7 +356,7 @@ def import_netcdf(cls, path: list[str|Path]) -> tuple[polars.LazyFrame, dict[str return df.lazy(), {} @classmethod - def import_hdf5(cls, files: str | Path | list[str|Path]) -> tuple[polars.LazyFrame, dict[str, 'MetaData']]: + def import_hdf5(cls, files: str | Path | list[str|Path]) -> tuple[polars.LazyFrame, dict[str, 'MetaData']]: # noqa """ Import a dataframe stored as HDF5. diff --git a/src/damast/core/transformations.py b/src/damast/core/transformations.py index e747209..2b13324 100644 --- a/src/damast/core/transformations.py +++ b/src/damast/core/transformations.py @@ -17,7 +17,7 @@ DECORATED_DESCRIPTION, DECORATED_INPUT_SPECS, DECORATED_OUTPUT_SPECS, - ) +) from .formatting import DEFAULT_INDENT @@ -43,13 +43,13 @@ def fit_transform(self, df: AnnotatedDataFrame, other: AnnotatedDataFrame | None class PipelineElement(Transformer): #: Pipeline in which context this processor will be run - parent_pipeline: 'DataProcessingPipeline' + parent_pipeline: 'DataProcessingPipeline' #noqa #: Map names of input and outputs for a particular pipeline _name_mappings: dict[str, dict[str, str]] #: Map names of datasource (arguments) to a specific (extra) transformer arguments - def set_parent(self, pipeline: 'DataProcessingPipeline'): + def set_parent(self, pipeline: 'DataProcessingPipeline'): #noqa """ Sets the parent pipeline for this pipeline element diff --git a/src/damast/data_handling/accessors.py b/src/damast/data_handling/accessors.py index dcc0559..05dac38 100644 --- a/src/damast/data_handling/accessors.py +++ b/src/damast/data_handling/accessors.py @@ -4,6 +4,7 @@ import logging import random +import sys import time from typing import Any, List, Optional, Union @@ -13,6 +14,7 @@ import polars as pl from damast.core.types import DataFrame, XDataFrame +from damast.ml import keras __all__ = [ "GroupSequenceAccessor", @@ -21,6 +23,16 @@ logger = logging.getLogger("damast") +if sys.platform == "darwin": + # Handle "Cannot convert a MPS Tensor to float64 dtype as the MPS framework doesn't support float64. Please use float32 instead" + def _mps_precision(data): + if data.dtype == np.float64: + return data.astype(np.float32) + return data +else: + def _mps_precision(data): + return data + # https://www.tensorflow.org/tutorials/structured_data/time_series class GroupSequenceAccessor: """ @@ -60,7 +72,7 @@ def __init__(self, self.groups = df.unique(group_column) if sort_columns is not None: - self.sort_columns = sort_columns if type(sort_columns) == list else [sort_columns] + self.sort_columns = sort_columns if type(sort_columns) is list else [sort_columns] else: self.sort_columns = sort_columns @@ -178,7 +190,7 @@ def to_keras_generator(self, features: List[str], f" got {datatypes}") if use_target: - target = target if type(target) == list else [target] + target = target if type(target) is list else [target] if sequence_forecast < 0: raise ValueError(f"{self.__class__.__name__}: Sequence forecast cannot be negative") @@ -219,7 +231,7 @@ def _generator(features: List[str], target: Optional[List[str]], all_columns += self.sort_columns use_target = target is not None if use_target: - target = target if type(target) == list else target + target = target if type(target) is list else target all_columns += target else: # If no target, we are not forecasting @@ -288,12 +300,12 @@ def _generator(features: List[str], target: Optional[List[str]], # target it the last step in the timeline, so the last target_chunk.append(target_window.to_numpy()) - X = np.array(chunk) + X = _mps_precision(np.array(chunk)) if use_target: if np.lib.NumpyVersion(np.__version__) >= '2.0.0': - y = np.array(target_chunk) + y = _mps_precision(np.array(target_chunk)) else: - y = np.array(target_chunk, copy=False) + y = _mps_precision(np.array(target_chunk, copy=False)) yield (X, y) else: yield (X,) @@ -408,7 +420,7 @@ def to_keras_generator(self, features, raise ValueError(f"{self.__class__.__name__}: Sequence forecast cannot be negative") if use_target: - target = target if type(target) == list else target + target = target if type(target) is list else target if sequence_forecast == 0: raise ValueError(f"{self.__class__.__name__}: Cannot do extract targets with no sequence forecast") else: diff --git a/src/damast/data_handling/transformers/__init__.py b/src/damast/data_handling/transformers/__init__.py index 5ec62f5..054662e 100644 --- a/src/damast/data_handling/transformers/__init__.py +++ b/src/damast/data_handling/transformers/__init__.py @@ -9,7 +9,7 @@ ChangeTypeColumn, JoinDataFrameByColumn, MultiplyValue, - ) +) from .filters import DropMissingOrNan, FilterWithin, RemoveValueRows from .normalizers import normalize diff --git a/src/damast/data_handling/transformers/augmenters.py b/src/damast/data_handling/transformers/augmenters.py index 04a21cd..e8e5f13 100644 --- a/src/damast/data_handling/transformers/augmenters.py +++ b/src/damast/data_handling/transformers/augmenters.py @@ -363,7 +363,7 @@ def __init__(self, new_type: Any): """Constructor""" if isinstance(new_type, pl.datatypes.classes.DataTypeClass): self.new_type = new_type - elif type(new_type) == str: + elif type(new_type) is str: polar_type = new_type.capitalize() if not hasattr(pl.datatypes.classes, polar_type): raise TypeError(f"Type {new_type} has not correspondence in 'polars'") diff --git a/src/damast/domains/maritime/ais/data_generator.py b/src/damast/domains/maritime/ais/data_generator.py index bffdc31..c7695dc 100644 --- a/src/damast/domains/maritime/ais/data_generator.py +++ b/src/damast/domains/maritime/ais/data_generator.py @@ -17,7 +17,7 @@ ColumnName, CourseOverGround, SpeedOverGround, - ) +) _log: Logger = getLogger(__name__) _log.setLevel(INFO) diff --git a/src/damast/domains/maritime/data_processing.py b/src/damast/domains/maritime/data_processing.py index 6d34432..40cadad 100644 --- a/src/damast/domains/maritime/data_processing.py +++ b/src/damast/domains/maritime/data_processing.py @@ -18,7 +18,7 @@ DropMissingOrNan, FilterWithin, RemoveValueRows, - ) +) from damast.data_handling.transformers.augmenters import AddLocalIndex from damast.domains.maritime.ais import vessel_types from damast.domains.maritime.transformers import AddVesselType, ComputeClosestAnchorage diff --git a/src/damast/domains/maritime/transformers/augmenters.py b/src/damast/domains/maritime/transformers/augmenters.py index 9bfdf1b..af11572 100644 --- a/src/damast/domains/maritime/transformers/augmenters.py +++ b/src/damast/domains/maritime/transformers/augmenters.py @@ -73,7 +73,7 @@ def load_data(cls, try: return XDataFrame.open(path=filename, sep=sep) except FileNotFoundError: - raise RuntimeError(f"{cls}: Vessel type information not accessible. File {vessel_type_csv} not found") + raise RuntimeError(f"{cls}: Vessel type information not accessible. File {filename} not found") @damast.core.describe("Compute distance from dataset to closest anchorage") @damast.core.input({"x": {"representation_type": float, "unit": damast.core.units.units.deg}, diff --git a/src/damast/ml/__init__.py b/src/damast/ml/__init__.py index e69de29..25d0f82 100644 --- a/src/damast/ml/__init__.py +++ b/src/damast/ml/__init__.py @@ -0,0 +1,35 @@ +import importlib +import logging +import os + +logger = logging.getLogger(__name__) + +# To ensure that autoloading works as expected +# from damast.ml import keras +def backend_available(backend: str) -> bool: + try: + importlib.import_module(backend) + return True + except ImportError: + return False + +def autodiscover_backend(priority: list[str] = ["torch", "tensorflow", "jax"]): + for backend in priority: + if backend_available(backend): + logger.warning(f"Autoloading backend: {backend} - to explicity select backend set env variable KERAS_BACKEND") + os.environ['KERAS_BACKEND'] = backend + break + else: + logger.info(f"Ignoring backend: {backend}, library is not available") + +if "KERAS_BACKEND" in os.environ: + backend = os.environ['KERAS_BACKEND'] + if not backend_available(backend): + raise RuntimeError( + "Keras backend: select backend '{backend}' is not available. " + "Please install the required package(s)." + ) +else: + autodiscover_backend() + +import keras # noqa diff --git a/src/damast/ml/experiments.py b/src/damast/ml/experiments.py index f9cb83d..060f4c8 100644 --- a/src/damast/ml/experiments.py +++ b/src/damast/ml/experiments.py @@ -12,7 +12,6 @@ from pathlib import Path from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Union -import keras import numpy as np import polars as pl import yaml @@ -21,6 +20,7 @@ from damast.core.dataprocessing import DataProcessingPipeline from damast.core.types import DataFrame from damast.data_handling.accessors import GroupSequenceAccessor +from damast.ml import keras from damast.ml.models.base import BaseModel, ModelInstanceDescription basicConfig() @@ -142,7 +142,7 @@ def __iter__(self): yield "training_parameters", self.training_parameters._asdict() # type: ignore def __eq__(self, other) -> bool: - if type(self) != type(other): + if type(self) is not type(other): return False for attr in ["pipeline", "features", "targets", "models"]: @@ -179,7 +179,7 @@ def __init__(self, *, self.forecast_length = forecast_length def __eq__(self, other): - if type(self) != type(other): + if type(self) is not type(other): return False for attr in ["pipeline", "features", "targets", "models", "group_column", "sequence_length", @@ -249,7 +249,7 @@ def __init__(self, raise ValueError(f"{self.__class__.__name__}.__init__: learning_task must be either" f"dict or LearningTask object") - if type(input_data) == list: + if type(input_data) is list: self.input_data = input_data else: self.input_data = [ Path(input_data) ] diff --git a/src/damast/ml/models/base.py b/src/damast/ml/models/base.py index 9f929a2..b90e8d4 100644 --- a/src/damast/ml/models/base.py +++ b/src/damast/ml/models/base.py @@ -6,15 +6,15 @@ from abc import ABC, abstractmethod from pathlib import Path from tempfile import gettempdir -from typing import ClassVar, NamedTuple, OrderedDict +from typing import ClassVar, Generator, NamedTuple, OrderedDict -import keras import keras.callbacks import keras.utils import pandas as pd from damast.core import DataSpecification from damast.core.types import DataFrame, XDataFrame +from damast.ml import keras __all__ = [ "BaseModel", @@ -190,8 +190,8 @@ def evaluation_file(self) -> Path: return self.model_dir / f'evaluation-{CHECKPOINT_BEST}.csv' def train(self, - training_data: tf.data.Dataset, - validation_data: tf.data.Dataset, + training_data: Generator, + validation_data: Generator, monitor: str = "val_loss", mode: str = "min", epochs: int = 1, @@ -270,7 +270,7 @@ def save(self) -> Path: def evaluate(self, label: str, - evaluation_data: tf.data.Dataset, + evaluation_data: Generator, **kwargs) -> dict[str, any]: """ Evaluate this model. diff --git a/src/damast/ml/worker.py b/src/damast/ml/worker.py index b0f4fcb..6749ba9 100644 --- a/src/damast/ml/worker.py +++ b/src/damast/ml/worker.py @@ -70,7 +70,7 @@ def predict(self, msg = connection.recvmsg(4) # Get the length field data = msg[0].decode() - if type(data) == str and data == ControlCommand.STOP.value: + if type(data) is str and data == ControlCommand.STOP.value: break data = next(gen_predict) diff --git a/src/damast/utils/__init__.py b/src/damast/utils/__init__.py index e81cb25..e11b9ef 100644 --- a/src/damast/utils/__init__.py +++ b/src/damast/utils/__init__.py @@ -1,5 +1,5 @@ -import sys import importlib +import sys if sys.version_info < (3,11): import iso8601 diff --git a/src/damast/utils/io.py b/src/damast/utils/io.py index 732ea52..e2e5fd6 100644 --- a/src/damast/utils/io.py +++ b/src/damast/utils/io.py @@ -1,23 +1,25 @@ +import io import logging import shutil import subprocess import tempfile import time import warnings +import zipfile +from enum import Enum from pathlib import Path -from typing import Callable, ClassVar +from typing import Callable from damast.core.constants import DAMAST_MOUNT_PREFIX -DAMAST_ARCHIVE_SUPPORT_AVAILABLE = False -try: - from ratarmountcore.compressions import ARCHIVE_FORMATS, COMPRESSION_FORMATS - DAMAST_ARCHIVE_SUPPORT_AVAILABLE = True -except Exception: - warnings.warn("ratarmount could not be loaded: archive support is not available") - logger = logging.getLogger(__name__) + +class ArchiveBackend(str, Enum): + ZIPFILE = 'zipfile' + RATARMOUNT = 'ratarmount' + + class Archive: """ Class to wrap and extract archive objects using ratarmount @@ -28,25 +30,54 @@ class Archive: _extracted_files: list[str] _mounted_dirs: list[Path] - _supported_suffixes: ClassVar[list[str]] = None - - @classmethod - def supported_suffixes(cls): + _supported_suffixes: list[str] = None + _backend: ArchiveBackend = None + + def autoload_backend(self): + try: + from ratarmountcore.compressions import ( # noqa + ARCHIVE_FORMATS, + COMPRESSION_FORMATS, + ) + self._backend = ArchiveBackend.RATARMOUNT + except Exception: + warnings.warn("ratarmount could not be loaded: falling back to zipfile-based support") + self._backend = ArchiveBackend.ZIPFILE + + def supported_suffixes(self): """ Get the list of suffixes for archives and compressed files which are supported """ - if not DAMAST_ARCHIVE_SUPPORT_AVAILABLE: - return [] + if not self._backend: + raise RuntimeError("Archive.supported_suffixes: ensure that backend is set, e.g., call 'autoload_backend' first") + + fn_name = f"supported_suffixes_{self._backend.value}" + if not hasattr(self, fn_name): + raise RuntimeError(f"Missing implementation for {fn_name}") + + return getattr(self, fn_name)() + + def supported_suffixes_zipfile(self): + if self._supported_suffixes is not None: + return self._supported_suffixes + + self._supported_suffixes = ["zip"] + return self._supported_suffixes - if cls._supported_suffixes is not None: - return cls._supported_suffixes + def supported_suffixes_ratarmount(self): + if self._supported_suffixes is not None: + return self._supported_suffixes - cls._supported_suffixes = [] + from ratarmountcore.compressions import ( # noqa + ARCHIVE_FORMATS, + COMPRESSION_FORMATS, + ) + self._supported_suffixes = [] for k, v in COMPRESSION_FORMATS.items(): - cls._supported_suffixes += v.extensions + self._supported_suffixes += v.extensions for k, v in ARCHIVE_FORMATS.items(): - cls._supported_suffixes += v.extensions - return cls._supported_suffixes + self._supported_suffixes += v.extensions + return self._supported_suffixes def __enter__(self) -> list[str]: """ @@ -65,7 +96,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): def __init__(self, filenames: list[str], - filter_fn: Callable[[str], bool] | None = None): + filter_fn: Callable[[str], bool] | None = None, + backend: ArchiveBackend = None + ): + if backend: + self._backend = backend + elif self._backend is None: + self.autoload_backend() + self.filenames = sorted(filenames) if filter_fn is None: @@ -76,11 +114,11 @@ def __init__(self, self._mounted_dirs = [] self._extracted_files = [] - def ratarmount(self, file, target): + def mount_ratarmount(self, file, target): """ Call ratarmount to mount and archive """ - if not DAMAST_ARCHIVE_SUPPORT_AVAILABLE: + if not self._backend == ArchiveBackend.RATARMOUNT: raise RuntimeError("damast.utils.io.Archive: " "cannot load archive." " 'ratarmount' support is not available." @@ -92,19 +130,43 @@ def ratarmount(self, file, target): self._mounted_dirs.append(target) + def mount_zipfile(self, file, target): + """ + Use zipfile to mount and archive + """ + with zipfile.ZipFile(file, "r") as f: + for file_in_zip in f.namelist(): + if Path(file_in_zip).suffix != ".zip": + f.extract(file_in_zip, target) + continue + + dirname = Path(file_in_zip).parent + extract_dir = target / dirname + extract_dir.mkdir(parents=True, exist_ok=True) + + # read inner zip file into bytes buffer + zip_content = io.BytesIO(f.read(file_in_zip)) + inner_zip_file = zipfile.ZipFile(zip_content) + for i in inner_zip_file.namelist(): + inner_zip_file.extract(i, extract_dir) + + self._mounted_dirs.append(target) def umount(self): """ Umount the archive """ - for mounted_dir in list(reversed(self._mounted_dirs)): - for count in range(0,5): - time.sleep(0.5) - response = subprocess.run(["fusermount", "-u", mounted_dir]) - if response.returncode == 0: - break - else: - logger.debug(f"Retrying to unmount {mounted_dir}") + if self._backend == ArchiveBackend.RATARMOUNT: + for mounted_dir in list(reversed(self._mounted_dirs)): + for count in range(0,5): + time.sleep(0.5) + + response = subprocess.run(["ratarmount", "-u", mounted_dir]) + + if response.returncode == 0: + break + else: + logger.debug(f"Retrying to unmount {mounted_dir}") for mounted_dir in self._mounted_dirs: if Path(mounted_dir).exists(): @@ -121,12 +183,13 @@ def mount(self) -> list[str]: local_mount = tempfile.mkdtemp(prefix=DAMAST_MOUNT_PREFIX) for file in self.filenames: - if Path(file).suffix[1:] in Archive.supported_suffixes(): + if Path(file).suffix[1:] in self.supported_suffixes(): logger.info(f"Archive.mount: found archive: {file}") target_mount = Path(local_mount) / Path(file).name target_mount.mkdir(parents=True, exist_ok=True) - self.ratarmount(file, target_mount) + fn = getattr(self, f"mount_{self._backend.value}") + fn(file, target_mount) decompressed_files = [x for x in Path(target_mount).glob("**/*") if Path(x).is_file()] for idx, x in enumerate(decompressed_files): diff --git a/test_ais.csv.zip b/test_ais.csv.zip new file mode 100644 index 0000000..05c22ce Binary files /dev/null and b/test_ais.csv.zip differ diff --git a/tests/damast/cli/test_cli.py b/tests/damast/cli/test_cli.py index 24d4d06..5da6c96 100644 --- a/tests/damast/cli/test_cli.py +++ b/tests/damast/cli/test_cli.py @@ -1,3 +1,4 @@ + import re import sys from argparse import ArgumentParser @@ -49,7 +50,7 @@ def test_subparser(name, klass, script_runner): assert result.returncode == 0 test_parser = ArgumentParser() - subparser = klass(parser=test_parser) + klass(parser=test_parser) for a in test_parser._actions: if a.help == "==SUPPRESS==": @@ -85,7 +86,24 @@ def test_annotate(data_path, filename, spec_filename, tmp_path, script_runner): with open(data_path / spec_filename, "r") as f: expected_spec = yaml.load(f, Loader=yaml.SafeLoader) expected_spec["annotations"]["source"] = [str(data_path / filename)] - assert written_spec == expected_spec + + assert written_spec['annotations'] == expected_spec['annotations'] + + written_columns = written_spec['columns'] + expected_columns = expected_spec['columns'] + + for idx, column in enumerate(written_columns): + for field in column: + if type(field) is float: + column.assert_approx(expected_columns[idx][field]) + elif type(field) is dict: + for subfield in field: + expected_subfield_value = expected_columns[idx][field][subfield] + written_subfield_value = field[subfield] + expected_subfield_value.assert_approx(written_subfield_value) + else: + column == expected_columns[idx][field] + @pytest.mark.parametrize("filename, spec_filename", [ ["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] @@ -130,24 +148,24 @@ def test_convert_zip(data_path, filename, spec_filename, tmp_path, script_runner assert result.returncode == 0 assert output_file.exists() -@pytest.mark.parametrize("filename, spec_filename", [ - ["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] -]) -def test_fail_convert_zip(data_path, filename, spec_filename, tmp_path, script_runner, monkeypatch): - import damast - monkeypatch.setattr(damast.utils.io, "DAMAST_ARCHIVE_SUPPORT_AVAILABLE", False) - - output_zip = tmp_path / f"{Path(filename)}.zip" - with ZipFile(output_zip, 'w') as f: - f.write(str(data_path / filename), arcname=filename) - f.write(str(data_path / spec_filename), arcname=spec_filename) - - assert Path(output_zip).exists() - output_file = Path(tmp_path) / (Path(filename).stem + ".parquet") - - result = script_runner.run(['damast', 'convert', '-f', output_zip, '--output-dir', tmp_path]) - assert result.returncode == 1 - assert not output_file.exists() +#@pytest.mark.parametrize("filename, spec_filename", [ +# ["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] +#]) +#def test_fail_convert_zip(data_path, filename, spec_filename, tmp_path, script_runner, monkeypatch): +# import damast +# monkeypatch.setattr(damast.utils.io, "DAMAST_ARCHIVE_SUPPORT_AVAILABLE", False) +# +# output_zip = tmp_path / f"{Path(filename)}.zip" +# with ZipFile(output_zip, 'w') as f: +# f.write(str(data_path / filename), arcname=filename) +# f.write(str(data_path / spec_filename), arcname=spec_filename) +# +# assert Path(output_zip).exists() +# output_file = Path(tmp_path) / (Path(filename).stem + ".parquet") +# +# result = script_runner.run(['damast', 'convert', '-f', output_zip, '--output-dir', tmp_path]) +# assert result.returncode == 1 +# assert not output_file.exists() @pytest.mark.skipif(sys.platform.startswith("win"), reason="ratarmount does not (easily) run on windows") diff --git a/tests/damast/core/test_dataframe.py b/tests/damast/core/test_dataframe.py index 2a36c78..91ecfdf 100644 --- a/tests/damast/core/test_dataframe.py +++ b/tests/damast/core/test_dataframe.py @@ -16,7 +16,7 @@ DataSpecification, MetaData, ValidationMode, - ) +) from damast.core.types import XDataFrame diff --git a/tests/damast/core/test_dataprocessing.py b/tests/damast/core/test_dataprocessing.py index 6dcf989..103ec3c 100644 --- a/tests/damast/core/test_dataprocessing.py +++ b/tests/damast/core/test_dataprocessing.py @@ -11,7 +11,7 @@ DAMAST_DEFAULT_DATASOURCE, DECORATED_INPUT_SPECS, DECORATED_OUTPUT_SPECS, - ) +) from damast.core.metadata import DataCategory, DataSpecification, MetaData from damast.core.transformations import MultiCycleTransformer from damast.core.types import XDataFrame @@ -487,7 +487,7 @@ def transform(self, df: AnnotatedDataFrame) -> AnnotatedDataFrame: assert getattr(TransformX.transform, DECORATED_OUTPUT_SPECS)[0].name == "{{" + varname + "}}_suffix" for node in pipeline.processing_graph.nodes(): - name = node.name + assert node.name transformer = node.transformer assert transformer.input_specs[DAMAST_DEFAULT_DATASOURCE][0].name == "status" @@ -588,7 +588,7 @@ def test_join_operation(data_path, tmp_path): osint_df = AnnotatedDataFrame.from_file(osint_csv, metadata_required=False) with pytest.raises(TypeError, match="expected an annotated dataframe"): - new_adf = pipeline.transform(ais_csv, osint=osint_df) + pipeline.transform(ais_csv, osint=osint_df) joined_df = pipeline.transform(ais_df, osint=osint_df) @@ -626,7 +626,7 @@ def test_join_pipeline(data_path, tmp_path): osint_df = AnnotatedDataFrame.from_file(osint_csv, metadata_required=False) with pytest.raises(TypeError, match="expected an annotated dataframe"): - new_adf = pipeline.transform(ais_csv, osint=osint_df) + pipeline.transform(ais_csv, osint=osint_df) joined_df = pipeline.transform(ais_df, osint=osint_df) diff --git a/tests/damast/core/test_metadata.py b/tests/damast/core/test_metadata.py index caab6da..c729c72 100644 --- a/tests/damast/core/test_metadata.py +++ b/tests/damast/core/test_metadata.py @@ -14,7 +14,7 @@ MetaData, Status, ValidationMode, - ) +) from damast.core.types import XDataFrame diff --git a/tests/damast/core/test_processing_graph.py b/tests/damast/core/test_processing_graph.py index 19fdaa5..f098718 100644 --- a/tests/damast/core/test_processing_graph.py +++ b/tests/damast/core/test_processing_graph.py @@ -15,6 +15,8 @@ def test_processing_graph(): graph.add(node=n0) graph.add(node=n1) graph.add(node=n2) + graph.add(node=n3) + graph.add(node=n4) for n in graph.nodes(): node = graph[n.uuid] diff --git a/tests/damast/domains/maritime/ais/test_augmenters.py b/tests/damast/domains/maritime/ais/test_augmenters.py index 8f6e947..ff5c95a 100644 --- a/tests/damast/domains/maritime/ais/test_augmenters.py +++ b/tests/damast/domains/maritime/ais/test_augmenters.py @@ -13,7 +13,7 @@ from damast.data_handling.transformers.augmenters import ( AddLocalIndex, AddUndefinedValue, - ) +) from damast.domains.maritime.ais import AISNavigationalStatus from damast.domains.maritime.ais.vessel_types import VesselType from damast.domains.maritime.data_specification import ColumnName @@ -23,7 +23,7 @@ AddVesselType, ComputeClosestAnchorage, DeltaDistance, - ) +) def test_add_missing_ais_status(tmp_path): @@ -296,7 +296,6 @@ def test_message_index(tmp_path): data = [] num_messages_A = 90 num_messages_B = num_messages_A - 2 - num_messages_C = 1 # Insert last message first to validate time-based sorting data.append([mmsi_a, timestamp + dt.timedelta(seconds=num_messages_A), lat, lon, num_messages_A-1, 0]) diff --git a/tests/damast/domains/maritime/ais/test_vessel_types.py b/tests/damast/domains/maritime/ais/test_vessel_types.py index 0a64b37..fd66d58 100644 --- a/tests/damast/domains/maritime/ais/test_vessel_types.py +++ b/tests/damast/domains/maritime/ais/test_vessel_types.py @@ -5,7 +5,7 @@ Fishing, PotsAndTraps, VesselType, - ) +) def test_vessel_types(): diff --git a/tests/damast/domains/maritime/test_data_processing.py b/tests/damast/domains/maritime/test_data_processing.py index 71279cd..d14869f 100644 --- a/tests/damast/domains/maritime/test_data_processing.py +++ b/tests/damast/domains/maritime/test_data_processing.py @@ -192,7 +192,10 @@ def test_aggregate(data_path, tmp_path): pipeline = MyPipeline(workdir=tmp_path) df = pipeline.transform(adf, verbose=True) - path = data_path / "test_ais.csv" + assert df.metadata['lat'].unit == 'deg' + assert df.metadata['lon'].unit == 'deg' + + assert "delta_distance" in df.column_names def test_aggregate_with_name_mapping(data_path, tmp_path): path = data_path / "test_ais.csv" diff --git a/tests/damast/ml/test_experiments.py b/tests/damast/ml/test_experiments.py index 9406a79..669af05 100644 --- a/tests/damast/ml/test_experiments.py +++ b/tests/damast/ml/test_experiments.py @@ -2,7 +2,6 @@ from collections import OrderedDict from pathlib import Path -import keras import pytest import damast @@ -13,23 +12,19 @@ from damast.core.transformations import MultiCycleTransformer from damast.core.units import units from damast.domains.maritime.ais.data_generator import AISTestData, AISTestDataSpec +from damast.ml import keras from damast.ml.experiments import ( Experiment, ForecastTask, LearningTask, ModelInstanceDescription, TrainingParameters, - ) +) from damast.ml.models.base import BaseModel os.environ["COLUMNS"] = '120' -class ModelA(BaseModel): - def __init_model(self): - pass - - class TransformerA(PipelineElement): @damast.input({"x": {}}) @damast.output({"x": {}}) @@ -201,7 +196,7 @@ def test_validate_experiment_dir(tmp_path): with pytest.raises(NotADirectoryError): filename = tmp_path / "dummy-file" - with open(filename, "w") as f: + with open(filename, "w"): pass Experiment.validate_experiment_dir(dir=filename) @@ -214,7 +209,7 @@ def test_validate_experiment_dir(tmp_path): Experiment.touch_marker(dir=f"{tmp_path}-does-not-exist") with pytest.raises(NotADirectoryError): file_in_dir = tmp_path / "test_file" - with open(file_in_dir, "a") as f: + with open(file_in_dir, "a"): pass Experiment.touch_marker(dir=file_in_dir) diff --git a/tests/damast/ml/test_scheduler_worker.py b/tests/damast/ml/test_scheduler_worker.py index 67aad1e..9cca632 100644 --- a/tests/damast/ml/test_scheduler_worker.py +++ b/tests/damast/ml/test_scheduler_worker.py @@ -42,8 +42,11 @@ def job(tmp_path): def worker_process(): # Ensure that no previous worker job is running for proc in psutil.process_iter(): - if "run_test_worker" in proc.name(): - proc.kill() + try: + if "run_test_worker" in proc.name(): + proc.kill() + except psutil.NoSuchProcess: + pass run_test_worker = Path(__file__).parent / "run_test_worker.py" assert run_test_worker.exists() diff --git a/tests/damast/utils/test_io.py b/tests/damast/utils/test_io.py index d9e5f0c..681a475 100644 --- a/tests/damast/utils/test_io.py +++ b/tests/damast/utils/test_io.py @@ -5,14 +5,15 @@ import pytest from damast.core.dataframe import DAMAST_SPEC_SUFFIX, AnnotatedDataFrame -from damast.utils.io import Archive +from damast.utils.io import Archive, ArchiveBackend -@pytest.mark.skipif(sys.platform.startswith("win"), reason="ratarmount does not (easily) run on windows") +@pytest.mark.skipif(sys.platform.startswith("win"), reason="ratarmount does not (easily) run on windows - zipfile backend should be used") +@pytest.mark.skipif(sys.platform.startswith("darwin"), reason="ratarmount does not run on macos - zipfile backend should be used") @pytest.mark.parametrize("filename, spec_filename", [ ["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] ]) -def test_archive(data_path, filename, spec_filename, tmp_path): +def test_archive_ratarmount(data_path, filename, spec_filename, tmp_path): output_zip = tmp_path / f"{Path(filename)}.zip" with ZipFile(output_zip, 'w') as f: f.write(str(data_path / filename), arcname=filename) @@ -22,7 +23,7 @@ def test_archive(data_path, filename, spec_filename, tmp_path): assert Path(output_zip).exists() # default no filter - with Archive(filenames=[output_zip]) as input_files: + with Archive(filenames=[output_zip], backend=ArchiveBackend.RATARMOUNT) as input_files: assert len(input_files) == 2 filenames = [x.name for x in input_files] @@ -31,9 +32,35 @@ def test_archive(data_path, filename, spec_filename, tmp_path): assert spec_filename in filenames # permit only supported files - with Archive(filenames=[output_zip], filter_fn = lambda x : AnnotatedDataFrame.get_supported_format(Path(x).suffix) is None) as input_files: + with Archive(filenames=[output_zip], filter_fn = lambda x : AnnotatedDataFrame.get_supported_format(Path(x).suffix) is None, backend=ArchiveBackend.RATARMOUNT) as input_files: assert len(input_files) == 1 assert filename in [x.name for x in input_files] +@pytest.mark.parametrize("filename, spec_filename", [ + ["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] +]) +def test_archive_zipfile(data_path, filename, spec_filename, tmp_path): + output_zip = tmp_path / f"{Path(filename)}.zip" + with ZipFile(output_zip, 'w') as f: + f.write(str(data_path / filename), arcname=filename) + f.write(str(data_path / spec_filename), arcname=spec_filename) + + + assert Path(output_zip).exists() + + # default no filter + with Archive(filenames=[output_zip], backend=ArchiveBackend.ZIPFILE) as input_files: + assert len(input_files) == 2 + + filenames = [x.name for x in input_files] + + assert filename in filenames + assert spec_filename in filenames + + # permit only supported files + with Archive(filenames=[output_zip], filter_fn = lambda x : AnnotatedDataFrame.get_supported_format(Path(x).suffix) is None, backend=ArchiveBackend.ZIPFILE) as input_files: + assert len(input_files) == 1 + + assert filename in [x.name for x in input_files] diff --git a/tests/examples/03-ais-trajectory-prediction/trp.py b/tests/examples/03-ais-trajectory-prediction/trp.py new file mode 100644 index 0000000..c5284cc --- /dev/null +++ b/tests/examples/03-ais-trajectory-prediction/trp.py @@ -0,0 +1,156 @@ +import os +import tempfile +from collections import OrderedDict +from pathlib import Path + +import numpy as np + +import damast +from damast.core.dataframe import AnnotatedDataFrame +from damast.core.dataprocessing import DataProcessingPipeline, PipelineElement +from damast.core.datarange import MinMax +from damast.core.metadata import MetaData +from damast.core.transformations import MultiCycleTransformer +from damast.core.units import units +from damast.domains.maritime.ais.data_generator import AISTestData, AISTestDataSpec +from damast.ml import keras +from damast.ml.experiments import ( + Experiment, + ForecastTask, + ModelInstanceDescription, + TrainingParameters, +) +from damast.ml.models.base import BaseModel + +os.environ["COLUMNS"] = '120' + +tmp_path = Path(tempfile.gettempdir()) / "damast-example-03" +tmp_path.mkdir(parents=True, exist_ok=True) + + +class TransformerA(PipelineElement): + @damast.input({"x": {}}) + @damast.output({"x": {}}) + def transform(self, df: AnnotatedDataFrame) -> AnnotatedDataFrame: + return df + +class LatLonTransformer(PipelineElement): + @damast.core.describe("Lat/Lon cyclic transformation") + @damast.core.input({ + "lat": {"unit": units.deg}, + "lon": {"unit": units.deg} + }) + @damast.core.output({ + "lat_x": {"value_range": MinMax(-1.0, 1.0)}, + "lat_y": {"value_range": MinMax(-1.0, 1.0)}, + "lon_x": {"value_range": MinMax(-1.0, 1.0)}, + "lon_y": {"value_range": MinMax(-1.0, 1.0)} + }) + def transform(self, df: AnnotatedDataFrame) -> AnnotatedDataFrame: + lat_cyclic_transformer = MultiCycleTransformer(features=["lat"], n=180.0) + lon_cyclic_transformer = MultiCycleTransformer(features=["lon"], n=360.0) + + _df = lat_cyclic_transformer.fit_transform(df=df) + _df = lon_cyclic_transformer.fit_transform(df=_df) + return _df + + +class Baseline(BaseModel): + input_specs = OrderedDict({ + "lat_x": {"length": 1}, + "lat_y": {"length": 1}, + "lon_x": {"length": 1}, + "lon_y": {"length": 1} + }) + + output_specs = OrderedDict({ + "lat_x": {"length": 1}, + "lat_y": {"length": 1}, + "lon_x": {"length": 1}, + "lon_y": {"length": 1} + }) + + def __init__(self, + features: list[str], + timeline_length: int, + output_dir: Path, + name: str = "Baseline", + targets: list[str] | None = None): + self.timeline_length = timeline_length + + super().__init__(name=name, + output_dir=output_dir, + features=features, + targets=targets) + + def _init_model(self): + features_width = len(self.features) + targets_width = len(self.targets) + + self.model = keras.models.Sequential([ + keras.layers.Flatten(input_shape=[self.timeline_length, features_width]), + keras.layers.Dense(targets_width) + ]) + +class BaselineA(Baseline): + pass + +class BaselineB(Baseline): + pass + + +pipeline = DataProcessingPipeline(name="ais_preparation", + base_dir=tmp_path) \ + .add("cyclic", LatLonTransformer()) + +features = ["lat_x", "lat_y", "lon_x", "lon_y"] + +data = AISTestData(1000) +adf = AnnotatedDataFrame(dataframe=data.dataframe, + metadata=MetaData.from_dict(data=AISTestDataSpec.copy())) + +dataset_filename = tmp_path / "test.parquet" +adf.save(filename=dataset_filename) + + +# TRAINING (including data preprocessing) +forecast_task = ForecastTask( + label="forecast-ais-short-sequence", + pipeline=pipeline, features=features, + models=[ModelInstanceDescription(BaselineA, {})], + group_column="mmsi", + sequence_length=5, + forecast_length=1, + training_parameters=TrainingParameters(epochs=1, + validation_steps=1) +) + +experiment = Experiment(learning_task=forecast_task, + input_data=dataset_filename, + output_directory=tmp_path) +report = experiment.run() +print(report) + + +# RELOAD MODEL (one of the trained ones) +models = Experiment.from_directory(report.parent) +current_model = models["BaselineA-forecast-ais-short-sequence"] + +# FORECAST +# .. reusing initial dataset here for simplicity +processed_data = AnnotatedDataFrame.from_file(dataset_filename) +input_adf = pipeline.transform(processed_data) + +# creating input features +from damast.data_handling.accessors import SequenceIterator # noqa + +sta = SequenceIterator(df=input_adf) +gen_predict = sta.to_keras_generator(features=features, target=features, sequence_length=5) + +data = next(gen_predict) +X, y = data +input_data = X[np.newaxis, :, :] +predicted_sequence = current_model.predict(input_data, steps=1, verbose=0) + +print(f"Input Sequence: {input_data}") +print(f"Predicted Sequence: {predicted_sequence}")