From b34866c0472aeb4f1a78a5d3bbf6b2ef20e4e7f2 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Thu, 19 Feb 2026 14:10:10 +0100 Subject: [PATCH 01/18] test: add trajectory prediction example --- .../03-ais-trajectory-prediction/trp.py | 193 ++++++++++++++++++ 1 file changed, 193 insertions(+) create mode 100644 tests/examples/03-ais-trajectory-prediction/trp.py diff --git a/tests/examples/03-ais-trajectory-prediction/trp.py b/tests/examples/03-ais-trajectory-prediction/trp.py new file mode 100644 index 0000000..04b1579 --- /dev/null +++ b/tests/examples/03-ais-trajectory-prediction/trp.py @@ -0,0 +1,193 @@ +import os +from collections import OrderedDict +from pathlib import Path +import numpy as np + +import keras +import pytest + +import damast +from damast.core.dataframe import AnnotatedDataFrame +from damast.core.dataprocessing import DataProcessingPipeline, PipelineElement +from damast.core.datarange import MinMax +from damast.core.metadata import MetaData +from damast.core.transformations import MultiCycleTransformer +from damast.core.units import units +from damast.domains.maritime.ais.data_generator import AISTestData, AISTestDataSpec +from damast.ml.experiments import ( + Experiment, + ForecastTask, + LearningTask, + ModelInstanceDescription, + TrainingParameters, + ) +from damast.ml.models.base import BaseModel +import tempfile + +os.environ["COLUMNS"] = '120' + + +tmp_path = Path(tempfile.gettempdir()) / "damast-example-03" +tmp_path.mkdir(parents=True, exist_ok=True) + + +class ModelA(BaseModel): + def __init_model(self): + pass + + +class TransformerA(PipelineElement): + @damast.input({"x": {}}) + @damast.output({"x": {}}) + def transform(self, df: AnnotatedDataFrame) -> AnnotatedDataFrame: + return df + +class LatLonTransformer(PipelineElement): + @damast.core.describe("Lat/Lon cyclic transformation") + @damast.core.input({ + "lat": {"unit": units.deg}, + "lon": {"unit": units.deg} + }) + @damast.core.output({ + "lat_x": {"value_range": MinMax(-1.0, 
1.0)}, + "lat_y": {"value_range": MinMax(-1.0, 1.0)}, + "lon_x": {"value_range": MinMax(-1.0, 1.0)}, + "lon_y": {"value_range": MinMax(-1.0, 1.0)} + }) + def transform(self, df: AnnotatedDataFrame) -> AnnotatedDataFrame: + lat_cyclic_transformer = MultiCycleTransformer(features=["lat"], n=180.0) + lon_cyclic_transformer = MultiCycleTransformer(features=["lon"], n=360.0) + + _df = lat_cyclic_transformer.fit_transform(df=df) + _df = lon_cyclic_transformer.fit_transform(df=_df) + return _df + + +class SimpleModel(BaseModel): + input_specs = OrderedDict({ + "a": {"length": 1}, + "b": {"length": 1}, + "c": {"length": 1}, + "d": {"length": 1} + }) + + output_specs = OrderedDict({ + "a": {"length": 1}, + "b": {"length": 1}, + "c": {"length": 1}, + "d": {"length": 1} + }) + + def __init__(self, output_dir: str | Path): + super().__init__(features=["a", "b", "c", "d"], + targets=["a", "b", "c", "d"], + output_dir=output_dir) + + def _init_model(self): + inputs = keras.Input(shape=(4,), + name="input", + dtype=float) + outputs = keras.layers.Dense(4)(inputs) + + self.model = keras.Model(inputs=inputs, + outputs=outputs, + name=self.__class__.__name__) + +class Baseline(BaseModel): + input_specs = OrderedDict({ + "lat_x": {"length": 1}, + "lat_y": {"length": 1}, + "lon_x": {"length": 1}, + "lon_y": {"length": 1} + }) + + output_specs = OrderedDict({ + "lat_x": {"length": 1}, + "lat_y": {"length": 1}, + "lon_x": {"length": 1}, + "lon_y": {"length": 1} + }) + + def __init__(self, + features: list[str], + timeline_length: int, + output_dir: Path, + name: str = "Baseline", + targets: list[str] | None = None): + self.timeline_length = timeline_length + + super().__init__(name=name, + output_dir=output_dir, + features=features, + targets=targets) + + def _init_model(self): + features_width = len(self.features) + targets_width = len(self.targets) + + self.model = keras.models.Sequential([ + keras.layers.Flatten(input_shape=[self.timeline_length, features_width]), + 
keras.layers.Dense(targets_width) + ]) + +class BaselineA(Baseline): + pass + +class BaselineB(Baseline): + pass + + +pipeline = DataProcessingPipeline(name="ais_preparation", + base_dir=tmp_path) \ + .add("cyclic", LatLonTransformer()) + +features = ["lat_x", "lat_y", "lon_x", "lon_y"] + +data = AISTestData(1000) +adf = AnnotatedDataFrame(dataframe=data.dataframe, + metadata=MetaData.from_dict(data=AISTestDataSpec.copy())) + +dataset_filename = tmp_path / "test.parquet" +adf.save(filename=dataset_filename) + + +# TRAINING (including data preprocessing) +forecast_task = ForecastTask( + label="forecast-ais-short-sequence", + pipeline=pipeline, features=features, + models=[ModelInstanceDescription(BaselineA, {})], + group_column="mmsi", + sequence_length=5, + forecast_length=1, + training_parameters=TrainingParameters(epochs=1, + validation_steps=1) +) + +experiment = Experiment(learning_task=forecast_task, + input_data=dataset_filename, + output_directory=tmp_path) +report = experiment.run() +print(report) + + +# RELOAD MODEL (one of the trained ones) +models = Experiment.from_directory(report.parent) +current_model = models["BaselineA-forecast-ais-short-sequence"] + +# FORECAST +# .. 
reusing initial dataset here for simplicity +processed_data = AnnotatedDataFrame.from_file(dataset_filename) +input_adf = pipeline.transform(processed_data) + +# creating input features +from damast.data_handling.accessors import SequenceIterator +sta = SequenceIterator(df=input_adf) +gen_predict = sta.to_keras_generator(features=features, target=features, sequence_length=5) + +data = next(gen_predict) +X, y = data +input_data = X[np.newaxis, :, :] +predicted_sequence = current_model.predict(input_data, steps=1, verbose=0) + +print(f"Input Sequence: {input_data}") +print(f"Predicted Sequence: {predicted_sequence}") From 3db7eb11eb4bb1a06d2d9ce69e2154d1cc322c61 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Mon, 23 Feb 2026 15:00:11 +0100 Subject: [PATCH 02/18] Add missing dependencies --- pyproject.toml | 2 ++ tests/examples/03-ais-trajectory-prediction/trp.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 96a0b7b..83cd2da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,11 +44,13 @@ dependencies = [ "networkx", "numba", "numpy>=2", + "pandas", "polars>=1.36.1", "psutil", "pyais", "pyarrow", "pydantic>=2.0", + "pydot", "ratarmount>=1.1", "scikit-learn", "tables", diff --git a/tests/examples/03-ais-trajectory-prediction/trp.py b/tests/examples/03-ais-trajectory-prediction/trp.py index 04b1579..852ca6a 100644 --- a/tests/examples/03-ais-trajectory-prediction/trp.py +++ b/tests/examples/03-ais-trajectory-prediction/trp.py @@ -4,7 +4,6 @@ import numpy as np import keras -import pytest import damast from damast.core.dataframe import AnnotatedDataFrame From 1aa0a7fa4ecebf6bfbc64cd7278a674bc06579ff Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Mon, 23 Feb 2026 15:44:07 +0100 Subject: [PATCH 03/18] ci: enable macos and python versions up to 3.14 --- .github/workflows/test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml 
index 0304133..f8c2f5e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,9 +16,9 @@ jobs: test: strategy: matrix: - os: [ubuntu-latest, windows-latest] # 'macos-latest' failing on github-ci with memory error + os: [ubuntu-latest, windows-latest, macos-latest] # failing on github-ci with memory error backend: ['torch','jax'] # 'tensorflow' is excluded since on github-ci tests get stuck for a yet unknown reason - python-version: ['3.10','3.11','3.12'] + python-version: ['3.10','3.11','3.12', '3.13', '3.14'] runs-on: ${{ matrix.os }} steps: - name: Remove unnecessary tools (Linux) From b52a8a3f1eb84372c420ffe931017dc4ed1bc0a1 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Mon, 23 Feb 2026 15:45:37 +0100 Subject: [PATCH 04/18] tests: example 03 - set backend to torch --- .../03-ais-trajectory-prediction/trp.py | 37 +------------------ 1 file changed, 2 insertions(+), 35 deletions(-) diff --git a/tests/examples/03-ais-trajectory-prediction/trp.py b/tests/examples/03-ais-trajectory-prediction/trp.py index 852ca6a..6d72e2b 100644 --- a/tests/examples/03-ais-trajectory-prediction/trp.py +++ b/tests/examples/03-ais-trajectory-prediction/trp.py @@ -25,16 +25,13 @@ os.environ["COLUMNS"] = '120' +# Explicitly setting backend to torch +keras.config.set_backend('torch') tmp_path = Path(tempfile.gettempdir()) / "damast-example-03" tmp_path.mkdir(parents=True, exist_ok=True) -class ModelA(BaseModel): - def __init_model(self): - pass - - class TransformerA(PipelineElement): @damast.input({"x": {}}) @damast.output({"x": {}}) @@ -62,36 +59,6 @@ def transform(self, df: AnnotatedDataFrame) -> AnnotatedDataFrame: return _df -class SimpleModel(BaseModel): - input_specs = OrderedDict({ - "a": {"length": 1}, - "b": {"length": 1}, - "c": {"length": 1}, - "d": {"length": 1} - }) - - output_specs = OrderedDict({ - "a": {"length": 1}, - "b": {"length": 1}, - "c": {"length": 1}, - "d": {"length": 1} - }) - - def __init__(self, output_dir: str | Path): - 
super().__init__(features=["a", "b", "c", "d"], - targets=["a", "b", "c", "d"], - output_dir=output_dir) - - def _init_model(self): - inputs = keras.Input(shape=(4,), - name="input", - dtype=float) - outputs = keras.layers.Dense(4)(inputs) - - self.model = keras.Model(inputs=inputs, - outputs=outputs, - name=self.__class__.__name__) - class Baseline(BaseModel): input_specs = OrderedDict({ "lat_x": {"length": 1}, From 7c8e9e75f889f6715fed4fe517116cfd71e4acb4 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Mon, 23 Feb 2026 15:53:08 +0100 Subject: [PATCH 05/18] ci: install only required backend --- .github/workflows/test.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f8c2f5e..b9228a4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -38,7 +38,9 @@ jobs: run: python -m pip install -U pip - name: Install package - run: python -m pip install -e .[ml,dev,test] + run: | + python -m pip install -e .[dev,test] + python -m pip install ${{ matrix.backend }} # https://docs.github.com/en/actions/writing-workflows/choosing-what-your-workflow-does/accessing-contextual-information-about-workflow-runs#runner-context - name: Set Env (Windows) From 5c1f0926d489ca833695a6851987b205a77be887 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Mon, 23 Feb 2026 16:39:22 +0100 Subject: [PATCH 06/18] ci: macos: setup brew and install zstd --- .github/workflows/test.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b9228a4..2a77ff5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -59,6 +59,13 @@ jobs: run: | echo "KERAS_BACKEND=${{ matrix.backend }}" >> $GITHUB_ENV echo "PYTORCH_MPS_HIGH_WATERMARK_RATIO=0.0" >> $GITHUB_ENV + if [ -d "/opt/homebrew/bin" ]; then + echo "/opt/homebrew/bin" >> $GITHUB_PATH + elif [ -d "/usr/local/bin" ]; then + echo "/usr/local/bin" >> $GITHUB_PATH + fi 
+ brew install zstd + # Run keras backend specific tests (for keras>=3) # Due to space issues avoid creating too many venvs (via tox) From 3eea91e67ecbefc293fea47fed52cdd16e172c01 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Mon, 23 Feb 2026 16:55:56 +0100 Subject: [PATCH 07/18] Install tensorflow only when available --- pyproject.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 83cd2da..602511e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,8 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "License :: OSI Approved :: BSD License", ] @@ -83,7 +85,7 @@ ml = [ # BEGIN keras backends "torch", "jax[cpu]", - "tensorflow" + "tensorflow; python_version <= '3.13'" # END backends ] From 3f64934a5185b8801f572d2035c665431703b512 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Mon, 23 Feb 2026 17:25:01 +0100 Subject: [PATCH 08/18] Update dependencies to latest version of ratarmount --- pyproject.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 602511e..e9e15f1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ classifiers = [ dependencies = [ "astropy", "cloudpickle", + "indexed_zstd>=1.6.1", "iso8601;python_version<'3.11'", "matplotlib", "networkx", @@ -53,7 +54,7 @@ dependencies = [ "pyarrow", "pydantic>=2.0", "pydot", - "ratarmount>=1.1", + "ratarmount>=1.2.1", "scikit-learn", "tables", "tqdm", From 685ae6366d035d23f152f767aafd7ea1805abd4a Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Mon, 23 Feb 2026 23:08:21 +0100 Subject: [PATCH 09/18] ml: enable autoloading of keras backends --- src/damast/data_handling/accessors.py | 1 + src/damast/ml/__init__.py | 34 +++++++++++++++++++ src/damast/ml/experiments.py | 2 +- 
src/damast/ml/models/base.py | 2 +- tests/damast/ml/test_experiments.py | 3 +- .../03-ais-trajectory-prediction/trp.py | 8 ++--- 6 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/damast/data_handling/accessors.py b/src/damast/data_handling/accessors.py index dcc0559..83cbfa7 100644 --- a/src/damast/data_handling/accessors.py +++ b/src/damast/data_handling/accessors.py @@ -7,6 +7,7 @@ import time from typing import Any, List, Optional, Union +from damast.ml import keras import keras.utils import numpy as np import pandas as pd diff --git a/src/damast/ml/__init__.py b/src/damast/ml/__init__.py index e69de29..f5be018 100644 --- a/src/damast/ml/__init__.py +++ b/src/damast/ml/__init__.py @@ -0,0 +1,34 @@ +import importlib +import os + +import logging + +logger = logging.getLogger(__name__) + +# To ensure that autoloading works as expected +# from damast.ml import keras + +if "KERAS_BACKEND" in os.environ: + backend = os.environ['KERAS_BACKEND'] + if not backend_available(backend): + raise RuntimeError("Keras backend: {backend} is not available") + +def backend_available(backend: str) -> bool: + try: + importlib.import_module(backend) + return True + except ImportError as e: + return False + +def autodiscover_backend(priority: list[str] = ["torch", "tensorflow", "jax"]): + for backend in priority: + if backend_available(backend): + logger.warning(f"Autoloading backend: {backend} - to explicity select backend set env variable KERAS_BACKEND") + os.environ['KERAS_BACKEND'] = backend + break + else: + logger.info(f"Ignoring backend: {backend}, library is not available") + +autodiscover_backend() + +import keras diff --git a/src/damast/ml/experiments.py b/src/damast/ml/experiments.py index f9cb83d..ebb6afb 100644 --- a/src/damast/ml/experiments.py +++ b/src/damast/ml/experiments.py @@ -12,7 +12,7 @@ from pathlib import Path from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Union -import keras +from damast.ml import keras import numpy as np 
import polars as pl import yaml diff --git a/src/damast/ml/models/base.py b/src/damast/ml/models/base.py index 9f929a2..5c54a18 100644 --- a/src/damast/ml/models/base.py +++ b/src/damast/ml/models/base.py @@ -8,7 +8,7 @@ from tempfile import gettempdir from typing import ClassVar, NamedTuple, OrderedDict -import keras +from damast.ml import keras import keras.callbacks import keras.utils import pandas as pd diff --git a/tests/damast/ml/test_experiments.py b/tests/damast/ml/test_experiments.py index 9406a79..f837d28 100644 --- a/tests/damast/ml/test_experiments.py +++ b/tests/damast/ml/test_experiments.py @@ -2,7 +2,6 @@ from collections import OrderedDict from pathlib import Path -import keras import pytest import damast @@ -13,6 +12,8 @@ from damast.core.transformations import MultiCycleTransformer from damast.core.units import units from damast.domains.maritime.ais.data_generator import AISTestData, AISTestDataSpec + +from damast.ml import keras from damast.ml.experiments import ( Experiment, ForecastTask, diff --git a/tests/examples/03-ais-trajectory-prediction/trp.py b/tests/examples/03-ais-trajectory-prediction/trp.py index 6d72e2b..77c5b83 100644 --- a/tests/examples/03-ais-trajectory-prediction/trp.py +++ b/tests/examples/03-ais-trajectory-prediction/trp.py @@ -3,8 +3,6 @@ from pathlib import Path import numpy as np -import keras - import damast from damast.core.dataframe import AnnotatedDataFrame from damast.core.dataprocessing import DataProcessingPipeline, PipelineElement @@ -13,6 +11,9 @@ from damast.core.transformations import MultiCycleTransformer from damast.core.units import units from damast.domains.maritime.ais.data_generator import AISTestData, AISTestDataSpec + +from damast.ml import keras +import keras from damast.ml.experiments import ( Experiment, ForecastTask, @@ -25,9 +26,6 @@ os.environ["COLUMNS"] = '120' -# Explicitly setting backend to torch -keras.config.set_backend('torch') - tmp_path = Path(tempfile.gettempdir()) / "damast-example-03" 
tmp_path.mkdir(parents=True, exist_ok=True) From 7fb3d39cd16c5276b0af4a060318e4b9b0deb140 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Tue, 24 Feb 2026 23:28:00 +0100 Subject: [PATCH 10/18] ml: update backend autodiscovery --- src/damast/ml/__init__.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/damast/ml/__init__.py b/src/damast/ml/__init__.py index f5be018..e0d9c94 100644 --- a/src/damast/ml/__init__.py +++ b/src/damast/ml/__init__.py @@ -7,12 +7,6 @@ # To ensure that autoloading works as expected # from damast.ml import keras - -if "KERAS_BACKEND" in os.environ: - backend = os.environ['KERAS_BACKEND'] - if not backend_available(backend): - raise RuntimeError("Keras backend: {backend} is not available") - def backend_available(backend: str) -> bool: try: importlib.import_module(backend) @@ -29,6 +23,14 @@ def autodiscover_backend(priority: list[str] = ["torch", "tensorflow", "jax"]): else: logger.info(f"Ignoring backend: {backend}, library is not available") -autodiscover_backend() +if "KERAS_BACKEND" in os.environ: + backend = os.environ['KERAS_BACKEND'] + if not backend_available(backend): + raise RuntimeError( + f"Keras backend: select backend '{backend}' is not available. " + "Please install the required package(s)." 
+ ) +else: + autodiscover_backend() import keras From e17a0bb444642ab889775ecedc4ddd6216b8ef5f Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Thu, 26 Feb 2026 10:24:56 +0100 Subject: [PATCH 11/18] test: use assert_approx for annotation comparison --- tests/damast/cli/test_cli.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/tests/damast/cli/test_cli.py b/tests/damast/cli/test_cli.py index 24d4d06..1e32474 100644 --- a/tests/damast/cli/test_cli.py +++ b/tests/damast/cli/test_cli.py @@ -85,7 +85,24 @@ def test_annotate(data_path, filename, spec_filename, tmp_path, script_runner): with open(data_path / spec_filename, "r") as f: expected_spec = yaml.load(f, Loader=yaml.SafeLoader) expected_spec["annotations"]["source"] = [str(data_path / filename)] - assert written_spec == expected_spec + + assert written_spec['annotations'] == expected_spec['annotations'] + # Compare column specs recursively, floats via pytest.approx + def assert_approx(written, expected): + if isinstance(expected, dict): + assert written.keys() == expected.keys() + for key, value in expected.items(): + assert_approx(written[key], value) + elif isinstance(expected, float): + assert written == pytest.approx(expected) + else: + assert written == expected + written_columns = written_spec['columns'] + expected_columns = expected_spec['columns'] + assert len(written_columns) == len(expected_columns) + for written_column, expected_column in zip(written_columns, expected_columns): + assert_approx(written_column, expected_column) + @pytest.mark.parametrize("filename, spec_filename", [ ["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] From 4e7e6a60a08e8aaf7ab3a4ae9f5554d298a40b46 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Fri, 27 Feb 2026 21:42:30 +0100 Subject: [PATCH 12/18] io: archive: do not use ratarmount on macos, but fallback to zipfile usage --- pyproject.toml | 3 +- src/damast/utils/io.py | 102 ++++++++++++++++++++++++++-------- tests/damast/cli/test_cli.py | 36 ++++++------ tests/damast/utils/test_io.py | 36 ++++++++++-- 4 files changed, 129 insertions(+), 48 deletions(-) 
diff --git a/pyproject.toml b/pyproject.toml index e9e15f1..5c2aa52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,6 @@ classifiers = [ dependencies = [ "astropy", "cloudpickle", - "indexed_zstd>=1.6.1", "iso8601;python_version<'3.11'", "matplotlib", "networkx", @@ -54,7 +53,7 @@ dependencies = [ "pyarrow", "pydantic>=2.0", "pydot", - "ratarmount>=1.2.1", + "ratarmount>=1.2.1; platform_system != 'Darwin'", "scikit-learn", "tables", "tqdm", diff --git a/src/damast/utils/io.py b/src/damast/utils/io.py index 732ea52..c23698c 100644 --- a/src/damast/utils/io.py +++ b/src/damast/utils/io.py @@ -1,4 +1,6 @@ +from enum import Enum import logging +import io import shutil import subprocess import tempfile @@ -6,18 +8,18 @@ import warnings from pathlib import Path from typing import Callable, ClassVar +import zipfile from damast.core.constants import DAMAST_MOUNT_PREFIX -DAMAST_ARCHIVE_SUPPORT_AVAILABLE = False -try: - from ratarmountcore.compressions import ARCHIVE_FORMATS, COMPRESSION_FORMATS - DAMAST_ARCHIVE_SUPPORT_AVAILABLE = True -except Exception: - warnings.warn("ratarmount could not be loaded: archive support is not available") - logger = logging.getLogger(__name__) + +class ArchiveBackend(str, Enum): + ZIPFILE = 'zipfile' + RATARMOUNT = 'ratarmount' + + class Archive: """ Class to wrap and extract archive objects using ratarmount @@ -28,25 +30,48 @@ class Archive: _extracted_files: list[str] _mounted_dirs: list[Path] - _supported_suffixes: ClassVar[list[str]] = None + _supported_suffixes: list[str] = None + _backend: ArchiveBackend = None - @classmethod - def supported_suffixes(cls): + def autoload_backend(self): + try: + from ratarmountcore.compressions import ARCHIVE_FORMATS, COMPRESSION_FORMATS #noqa + self._backend = ArchiveBackend.RATARMOUNT + except Exception: + warnings.warn("ratarmount could not be loaded: falling back to zipfile-based support") + self._backend = ArchiveBackend.ZIPFILE + + def supported_suffixes(self): """ Get the list of 
suffixes for archives and compressed files which are supported """ - if not DAMAST_ARCHIVE_SUPPORT_AVAILABLE: - return [] + if not self._backend: + raise RuntimeError("Archive.supported_suffixes: ensure that backend is set, e.g., call 'autoload_backend' first") + + fn_name = f"supported_suffixes_{self._backend.value}" + if not hasattr(self, fn_name): + raise RuntimeError(f"Missing implementation for {fn_name}") + + return getattr(self, fn_name)() + + def supported_suffixes_zipfile(self): + if self._supported_suffixes is not None: + return self._supported_suffixes - if cls._supported_suffixes is not None: - return cls._supported_suffixes + self._supported_suffixes = ["zip"] + return self._supported_suffixes - cls._supported_suffixes = [] + def supported_suffixes_ratarmount(self): + if self._supported_suffixes is not None: + return self._supported_suffixes + + from ratarmountcore.compressions import ARCHIVE_FORMATS, COMPRESSION_FORMATS # noqa + self._supported_suffixes = [] for k, v in COMPRESSION_FORMATS.items(): - cls._supported_suffixes += v.extensions + self._supported_suffixes += v.extensions for k, v in ARCHIVE_FORMATS.items(): - cls._supported_suffixes += v.extensions - return cls._supported_suffixes + self._supported_suffixes += v.extensions + return self._supported_suffixes def __enter__(self) -> list[str]: """ @@ -65,7 +90,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): def __init__(self, filenames: list[str], - filter_fn: Callable[[str], bool] | None = None): + filter_fn: Callable[[str], bool] | None = None, + backend: ArchiveBackend = None + ): + if backend: + self._backend = backend + elif self._backend is None: + self.autoload_backend() + self.filenames = sorted(filenames) if filter_fn is None: @@ -76,11 +108,11 @@ def __init__(self, self._mounted_dirs = [] self._extracted_files = [] - def ratarmount(self, file, target): + def mount_ratarmount(self, file, target): """ Call ratarmount to mount and archive """ - if not 
DAMAST_ARCHIVE_SUPPORT_AVAILABLE: + if not self._backend == ArchiveBackend.RATARMOUNT: raise RuntimeError("damast.utils.io.Archive: " "cannot load archive." " 'ratarmount' support is not available." @@ -92,6 +124,27 @@ def ratarmount(self, file, target): self._mounted_dirs.append(target) + def mount_zipfile(self, file, target): + """ + Use zipfile to mount and archive + """ + with zipfile.ZipFile(file, "r") as f: + for file_in_zip in f.namelist(): + if Path(file_in_zip).suffix != ".zip": + f.extract(file_in_zip, target) + continue + + dirname = Path(file_in_zip).parent + extract_dir = target / dirname + extract_dir.mkdir(parents=True, exist_ok=True) + + # read inner zip file into bytes buffer + zip_content = io.BytesIO(f.read(file_in_zip)) + inner_zip_file = zipfile.ZipFile(zip_content) + for i in inner_zip_file.namelist(): + inner_zip_file.extract(i, extract_dir) + + self._mounted_dirs.append(target) def umount(self): """ @@ -121,12 +174,13 @@ def mount(self) -> list[str]: local_mount = tempfile.mkdtemp(prefix=DAMAST_MOUNT_PREFIX) for file in self.filenames: - if Path(file).suffix[1:] in Archive.supported_suffixes(): + if Path(file).suffix[1:] in self.supported_suffixes(): logger.info(f"Archive.mount: found archive: {file}") target_mount = Path(local_mount) / Path(file).name target_mount.mkdir(parents=True, exist_ok=True) - - self.ratarmount(file, target_mount) + + fn = getattr(self, f"mount_{self._backend.value}") + fn(file, target_mount) decompressed_files = [x for x in Path(target_mount).glob("**/*") if Path(x).is_file()] for idx, x in enumerate(decompressed_files): diff --git a/tests/damast/cli/test_cli.py b/tests/damast/cli/test_cli.py index 1e32474..d3d414a 100644 --- a/tests/damast/cli/test_cli.py +++ b/tests/damast/cli/test_cli.py @@ -147,24 +147,24 @@ def test_convert_zip(data_path, filename, spec_filename, tmp_path, script_runner assert result.returncode == 0 assert output_file.exists() -@pytest.mark.parametrize("filename, spec_filename", [ - 
["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] -]) -def test_fail_convert_zip(data_path, filename, spec_filename, tmp_path, script_runner, monkeypatch): - import damast - monkeypatch.setattr(damast.utils.io, "DAMAST_ARCHIVE_SUPPORT_AVAILABLE", False) - - output_zip = tmp_path / f"{Path(filename)}.zip" - with ZipFile(output_zip, 'w') as f: - f.write(str(data_path / filename), arcname=filename) - f.write(str(data_path / spec_filename), arcname=spec_filename) - - assert Path(output_zip).exists() - output_file = Path(tmp_path) / (Path(filename).stem + ".parquet") - - result = script_runner.run(['damast', 'convert', '-f', output_zip, '--output-dir', tmp_path]) - assert result.returncode == 1 - assert not output_file.exists() +#@pytest.mark.parametrize("filename, spec_filename", [ +# ["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] +#]) +#def test_fail_convert_zip(data_path, filename, spec_filename, tmp_path, script_runner, monkeypatch): +# import damast +# monkeypatch.setattr(damast.utils.io, "DAMAST_ARCHIVE_SUPPORT_AVAILABLE", False) +# +# output_zip = tmp_path / f"{Path(filename)}.zip" +# with ZipFile(output_zip, 'w') as f: +# f.write(str(data_path / filename), arcname=filename) +# f.write(str(data_path / spec_filename), arcname=spec_filename) +# +# assert Path(output_zip).exists() +# output_file = Path(tmp_path) / (Path(filename).stem + ".parquet") +# +# result = script_runner.run(['damast', 'convert', '-f', output_zip, '--output-dir', tmp_path]) +# assert result.returncode == 1 +# assert not output_file.exists() @pytest.mark.skipif(sys.platform.startswith("win"), reason="ratarmount does not (easily) run on windows") diff --git a/tests/damast/utils/test_io.py b/tests/damast/utils/test_io.py index d9e5f0c..6b91c87 100644 --- a/tests/damast/utils/test_io.py +++ b/tests/damast/utils/test_io.py @@ -6,13 +6,15 @@ from damast.core.dataframe import DAMAST_SPEC_SUFFIX, AnnotatedDataFrame from damast.utils.io import Archive +from damast.utils.io import ArchiveBackend 
-@pytest.mark.skipif(sys.platform.startswith("win"), reason="ratarmount does not (easily) run on windows") +@pytest.mark.skipif(sys.platform.startswith("win"), reason="ratarmount does not (easily) run on windows - zipfile backend should be used") +@pytest.mark.skipif(sys.platform.startswith("dwarwin"), reason="ratarmount does not run on macos - zipfile backend should be used") @pytest.mark.parametrize("filename, spec_filename", [ ["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] ]) -def test_archive(data_path, filename, spec_filename, tmp_path): +def test_archive_ratarmount(data_path, filename, spec_filename, tmp_path): output_zip = tmp_path / f"{Path(filename)}.zip" with ZipFile(output_zip, 'w') as f: f.write(str(data_path / filename), arcname=filename) @@ -22,7 +24,7 @@ def test_archive(data_path, filename, spec_filename, tmp_path): assert Path(output_zip).exists() # default no filter - with Archive(filenames=[output_zip]) as input_files: + with Archive(filenames=[output_zip], backend=ArchiveBackend.RATARMOUNT) as input_files: assert len(input_files) == 2 filenames = [x.name for x in input_files] @@ -31,9 +33,35 @@ def test_archive(data_path, filename, spec_filename, tmp_path): assert spec_filename in filenames # permit only supported files - with Archive(filenames=[output_zip], filter_fn = lambda x : AnnotatedDataFrame.get_supported_format(Path(x).suffix) is None) as input_files: + with Archive(filenames=[output_zip], filter_fn = lambda x : AnnotatedDataFrame.get_supported_format(Path(x).suffix) is None, backend=ArchiveBackend.RATARMOUNT) as input_files: assert len(input_files) == 1 assert filename in [x.name for x in input_files] +@pytest.mark.parametrize("filename, spec_filename", [ + ["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] +]) +def test_archive_zipfile(data_path, filename, spec_filename, tmp_path): + output_zip = tmp_path / f"{Path(filename)}.zip" + with ZipFile(output_zip, 'w') as f: + f.write(str(data_path / filename), arcname=filename) + 
f.write(str(data_path / spec_filename), arcname=spec_filename) + + + assert Path(output_zip).exists() + + # default no filter + with Archive(filenames=[output_zip], backend=ArchiveBackend.ZIPFILE) as input_files: + assert len(input_files) == 2 + + filenames = [x.name for x in input_files] + + assert filename in filenames + assert spec_filename in filenames + + # permit only supported files + with Archive(filenames=[output_zip], filter_fn = lambda x : AnnotatedDataFrame.get_supported_format(Path(x).suffix) is None, backend=ArchiveBackend.ZIPFILE) as input_files: + assert len(input_files) == 1 + + assert filename in [x.name for x in input_files] From 11a774d4ccfe44f6962d0e26746d54bf2dad2d7f Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Fri, 27 Feb 2026 21:44:58 +0100 Subject: [PATCH 13/18] ci: disable mac ratarmount specific elements --- src/damast/utils/io.py | 19 +++++++++++-------- tests/damast/utils/test_io.py | 2 +- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/damast/utils/io.py b/src/damast/utils/io.py index c23698c..232b555 100644 --- a/src/damast/utils/io.py +++ b/src/damast/utils/io.py @@ -150,14 +150,17 @@ def umount(self): """ Umount the archive """ - for mounted_dir in list(reversed(self._mounted_dirs)): - for count in range(0,5): - time.sleep(0.5) - response = subprocess.run(["fusermount", "-u", mounted_dir]) - if response.returncode == 0: - break - else: - logger.debug(f"Retrying to unmount {mounted_dir}") + if self._backend == ArchiveBackend.RATARMOUNT: + for mounted_dir in list(reversed(self._mounted_dirs)): + for count in range(0,5): + time.sleep(0.5) + + response = subprocess.run(["ratarmount", "-u", mounted_dir]) + + if response.returncode == 0: + break + else: + logger.debug(f"Retrying to unmount {mounted_dir}") for mounted_dir in self._mounted_dirs: if Path(mounted_dir).exists(): diff --git a/tests/damast/utils/test_io.py b/tests/damast/utils/test_io.py index 6b91c87..42084b8 100644 --- a/tests/damast/utils/test_io.py 
+++ b/tests/damast/utils/test_io.py @@ -10,7 +10,7 @@ @pytest.mark.skipif(sys.platform.startswith("win"), reason="ratarmount does not (easily) run on windows - zipfile backend should be used") -@pytest.mark.skipif(sys.platform.startswith("dwarwin"), reason="ratarmount does not run on macos - zipfile backend should be used") +@pytest.mark.skipif(sys.platform.startswith("darwin"), reason="ratarmount does not run on macos - zipfile backend should be used") @pytest.mark.parametrize("filename, spec_filename", [ ["test_ais.csv", f"test_ais{DAMAST_SPEC_SUFFIX}"] ]) From 623257963d7952cf8792913de060f25aab250d5b Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Thu, 26 Feb 2026 16:31:34 +0100 Subject: [PATCH 14/18] darwin: account for MPS limitation and cast float64 down to float32 --- src/damast/data_handling/accessors.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/damast/data_handling/accessors.py b/src/damast/data_handling/accessors.py index 83cbfa7..aed8357 100644 --- a/src/damast/data_handling/accessors.py +++ b/src/damast/data_handling/accessors.py @@ -4,6 +4,7 @@ import logging import random +import sys import time from typing import Any, List, Optional, Union @@ -22,6 +23,16 @@ logger = logging.getLogger("damast") +if sys.platform == "darwin": + # Handle "Cannot convert a MPS Tensor to float64 dtype as the MPS framework doesn't support float64. 
Please use float32 instead" + def _mps_precision(data): + if data.dtype == np.float64: + return data.astype(np.float32) + return data +else: + def _mps_precision(data): + return data + # https://www.tensorflow.org/tutorials/structured_data/time_series class GroupSequenceAccessor: """ @@ -289,12 +300,12 @@ def _generator(features: List[str], target: Optional[List[str]], # target it the last step in the timeline, so the last target_chunk.append(target_window.to_numpy()) - X = np.array(chunk) + X = _mps_precision(np.array(chunk)) if use_target: if np.lib.NumpyVersion(np.__version__) >= '2.0.0': - y = np.array(target_chunk) + y = _mps_precision(np.array(target_chunk)) else: - y = np.array(target_chunk, copy=False) + y = _mps_precision(np.array(target_chunk, copy=False)) yield (X, y) else: yield (X,) From 6b60453e9cdc1298db2c368f5b3874889f04dc6f Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Fri, 27 Feb 2026 23:52:59 +0100 Subject: [PATCH 15/18] ci: check full matrix job --- .github/workflows/test.yml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2a77ff5..74091a7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,7 +18,10 @@ jobs: matrix: os: [ubuntu-latest, windows-latest, macos-latest] # failing on github-ci with memory error backend: ['torch','jax'] # 'tensorflow' is excluded since on github-ci tests get stuck for a yet unknown reason - python-version: ['3.10','3.11','3.12', '3.13', '3.14'] + # macos-latest: + # - misses support for ratarmount > index_zstd + # - for >=3.13 indexed_zstd/libzstd-seek/zstd-seek.h:20:10: fatal error: zstd.h: No such file or directory + python-version: ['3.10','3.11','3.12','3.13','3.14'] runs-on: ${{ matrix.os }} steps: - name: Remove unnecessary tools (Linux) From 87e10f1f3ed51011499839bd28a8b4c6f491efc4 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Sat, 28 Feb 2026 21:28:31 +0100 Subject: [PATCH 16/18] Update 
pre-commit --- .pre-commit-config.yaml | 4 +-- docs/examples/damast-process-pipeline.py | 2 ++ src/damast/core/__init__.py | 4 +-- src/damast/core/dataframe.py | 2 +- src/damast/core/decorators.py | 2 +- src/damast/core/polars_dataframe.py | 6 ++--- src/damast/core/transformations.py | 6 ++--- src/damast/data_handling/accessors.py | 10 ++++---- .../data_handling/transformers/__init__.py | 2 +- .../data_handling/transformers/augmenters.py | 2 +- .../domains/maritime/ais/data_generator.py | 2 +- .../domains/maritime/data_processing.py | 2 +- .../maritime/transformers/augmenters.py | 2 +- src/damast/ml/__init__.py | 7 +++-- src/damast/ml/experiments.py | 8 +++--- src/damast/ml/models/base.py | 10 ++++---- src/damast/ml/worker.py | 2 +- src/damast/utils/__init__.py | 2 +- src/damast/utils/io.py | 24 +++++++++++------- test_ais.csv.zip | Bin 0 -> 572 bytes tests/damast/cli/test_cli.py | 3 ++- tests/damast/core/test_dataframe.py | 2 +- tests/damast/core/test_dataprocessing.py | 8 +++--- tests/damast/core/test_metadata.py | 2 +- tests/damast/core/test_processing_graph.py | 2 ++ .../domains/maritime/ais/test_augmenters.py | 5 ++-- .../domains/maritime/ais/test_vessel_types.py | 2 +- .../domains/maritime/test_data_processing.py | 5 +++- tests/damast/ml/test_experiments.py | 12 +++------ tests/damast/utils/test_io.py | 3 +-- .../03-ais-trajectory-prediction/trp.py | 11 ++++---- 31 files changed, 79 insertions(+), 75 deletions(-) create mode 100644 test_ais.csv.zip diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c11a17f..a6602e4 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,14 +7,14 @@ repos: - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. 
- rev: v0.14.3 + rev: v0.15.1 hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix] # https://pycqa.github.io/isort/docs/configuration/black_compatibility.html - repo: https://github.com/pycqa/isort - rev: 5.12.0 + rev: 8.0.1 hooks: - id: isort args: ["--profile", "black", "--filter-files"] diff --git a/docs/examples/damast-process-pipeline.py b/docs/examples/damast-process-pipeline.py index f6a74ed..64bc881 100644 --- a/docs/examples/damast-process-pipeline.py +++ b/docs/examples/damast-process-pipeline.py @@ -1,3 +1,5 @@ +from pathlib import Path + from damast.core import DataProcessingPipeline from damast.data_handling.transformers import AddDeltaTime from damast.domains.maritime.transformers.features import DeltaDistance, Speed diff --git a/src/damast/core/__init__.py b/src/damast/core/__init__.py index fab709a..20654c6 100644 --- a/src/damast/core/__init__.py +++ b/src/damast/core/__init__.py @@ -15,14 +15,14 @@ describe, input, output, - ) +) from .metadata import ( ArtifactSpecification, DataSpecification, History, MetaData, ValidationMode, - ) +) __all__ = [ "AnnotatedDataFrame", diff --git a/src/damast/core/dataframe.py b/src/damast/core/dataframe.py index 87e1fc3..845843b 100644 --- a/src/damast/core/dataframe.py +++ b/src/damast/core/dataframe.py @@ -20,7 +20,7 @@ DAMAST_CSV_DEFAULT_ARGS, DAMAST_SPEC_SUFFIX, DAMAST_SUPPORTED_FILE_FORMATS, - ) +) from .data_description import ListOfValues, MinMax from .metadata import DataSpecification, MetaData, ValidationMode from .types import DataFrame, XDataFrame diff --git a/src/damast/core/decorators.py b/src/damast/core/decorators.py index b2984dc..e1ad8c9 100644 --- a/src/damast/core/decorators.py +++ b/src/damast/core/decorators.py @@ -10,7 +10,7 @@ DECORATED_DESCRIPTION, DECORATED_INPUT_SPECS, DECORATED_OUTPUT_SPECS, - ) +) from .metadata import ArtifactSpecification, DataSpecification from .transformations import PipelineElement diff --git a/src/damast/core/polars_dataframe.py 
b/src/damast/core/polars_dataframe.py index a409738..58f7057 100644 --- a/src/damast/core/polars_dataframe.py +++ b/src/damast/core/polars_dataframe.py @@ -289,7 +289,7 @@ def open(cls, path: str | Path, sep = ',') -> polars.LazyFrame: raise ValueError(f"{cls.__name__}.load_data: Unsupported input file format {path.suffix}") @classmethod - def from_vaex_hdf5(cls, path: str | Path) -> tuple[polars.LazyFrame, 'MetaData']: + def from_vaex_hdf5(cls, path: str | Path) -> tuple[polars.LazyFrame, 'MetaData']: # noqa """ Load hdf5 file and (damast) metadata if found in the file. """ @@ -337,7 +337,7 @@ def from_vaex_hdf5(cls, path: str | Path) -> tuple[polars.LazyFrame, 'MetaData'] return polars.LazyFrame(data), metadata @classmethod - def import_netcdf(cls, path: list[str|Path]) -> tuple[polars.LazyFrame, dict[str, 'MetaData']]: + def import_netcdf(cls, path: list[str|Path]) -> tuple[polars.LazyFrame, dict[str, 'MetaData']]: #noqa ensure_packages(pkgs=["dask", "xarray", "pandas"], required_for="Loading netcdf files", @@ -356,7 +356,7 @@ def import_netcdf(cls, path: list[str|Path]) -> tuple[polars.LazyFrame, dict[str return df.lazy(), {} @classmethod - def import_hdf5(cls, files: str | Path | list[str|Path]) -> tuple[polars.LazyFrame, dict[str, 'MetaData']]: + def import_hdf5(cls, files: str | Path | list[str|Path]) -> tuple[polars.LazyFrame, dict[str, 'MetaData']]: # noqa """ Import a dataframe stored as HDF5. 
diff --git a/src/damast/core/transformations.py b/src/damast/core/transformations.py index e747209..2b13324 100644 --- a/src/damast/core/transformations.py +++ b/src/damast/core/transformations.py @@ -17,7 +17,7 @@ DECORATED_DESCRIPTION, DECORATED_INPUT_SPECS, DECORATED_OUTPUT_SPECS, - ) +) from .formatting import DEFAULT_INDENT @@ -43,13 +43,13 @@ def fit_transform(self, df: AnnotatedDataFrame, other: AnnotatedDataFrame | None class PipelineElement(Transformer): #: Pipeline in which context this processor will be run - parent_pipeline: 'DataProcessingPipeline' + parent_pipeline: 'DataProcessingPipeline' #noqa #: Map names of input and outputs for a particular pipeline _name_mappings: dict[str, dict[str, str]] #: Map names of datasource (arguments) to a specific (extra) transformer arguments - def set_parent(self, pipeline: 'DataProcessingPipeline'): + def set_parent(self, pipeline: 'DataProcessingPipeline'): #noqa """ Sets the parent pipeline for this pipeline element diff --git a/src/damast/data_handling/accessors.py b/src/damast/data_handling/accessors.py index aed8357..05dac38 100644 --- a/src/damast/data_handling/accessors.py +++ b/src/damast/data_handling/accessors.py @@ -8,13 +8,13 @@ import time from typing import Any, List, Optional, Union -from damast.ml import keras import keras.utils import numpy as np import pandas as pd import polars as pl from damast.core.types import DataFrame, XDataFrame +from damast.ml import keras __all__ = [ "GroupSequenceAccessor", @@ -72,7 +72,7 @@ def __init__(self, self.groups = df.unique(group_column) if sort_columns is not None: - self.sort_columns = sort_columns if type(sort_columns) == list else [sort_columns] + self.sort_columns = sort_columns if type(sort_columns) is list else [sort_columns] else: self.sort_columns = sort_columns @@ -190,7 +190,7 @@ def to_keras_generator(self, features: List[str], f" got {datatypes}") if use_target: - target = target if type(target) == list else [target] + target = target if 
type(target) is list else [target] if sequence_forecast < 0: raise ValueError(f"{self.__class__.__name__}: Sequence forecast cannot be negative") @@ -231,7 +231,7 @@ def _generator(features: List[str], target: Optional[List[str]], all_columns += self.sort_columns use_target = target is not None if use_target: - target = target if type(target) == list else target + target = target if type(target) is list else target all_columns += target else: # If no target, we are not forecasting @@ -420,7 +420,7 @@ def to_keras_generator(self, features, raise ValueError(f"{self.__class__.__name__}: Sequence forecast cannot be negative") if use_target: - target = target if type(target) == list else target + target = target if type(target) is list else target if sequence_forecast == 0: raise ValueError(f"{self.__class__.__name__}: Cannot do extract targets with no sequence forecast") else: diff --git a/src/damast/data_handling/transformers/__init__.py b/src/damast/data_handling/transformers/__init__.py index 5ec62f5..054662e 100644 --- a/src/damast/data_handling/transformers/__init__.py +++ b/src/damast/data_handling/transformers/__init__.py @@ -9,7 +9,7 @@ ChangeTypeColumn, JoinDataFrameByColumn, MultiplyValue, - ) +) from .filters import DropMissingOrNan, FilterWithin, RemoveValueRows from .normalizers import normalize diff --git a/src/damast/data_handling/transformers/augmenters.py b/src/damast/data_handling/transformers/augmenters.py index 04a21cd..e8e5f13 100644 --- a/src/damast/data_handling/transformers/augmenters.py +++ b/src/damast/data_handling/transformers/augmenters.py @@ -363,7 +363,7 @@ def __init__(self, new_type: Any): """Constructor""" if isinstance(new_type, pl.datatypes.classes.DataTypeClass): self.new_type = new_type - elif type(new_type) == str: + elif type(new_type) is str: polar_type = new_type.capitalize() if not hasattr(pl.datatypes.classes, polar_type): raise TypeError(f"Type {new_type} has not correspondence in 'polars'") diff --git 
a/src/damast/domains/maritime/ais/data_generator.py b/src/damast/domains/maritime/ais/data_generator.py index bffdc31..c7695dc 100644 --- a/src/damast/domains/maritime/ais/data_generator.py +++ b/src/damast/domains/maritime/ais/data_generator.py @@ -17,7 +17,7 @@ ColumnName, CourseOverGround, SpeedOverGround, - ) +) _log: Logger = getLogger(__name__) _log.setLevel(INFO) diff --git a/src/damast/domains/maritime/data_processing.py b/src/damast/domains/maritime/data_processing.py index 6d34432..40cadad 100644 --- a/src/damast/domains/maritime/data_processing.py +++ b/src/damast/domains/maritime/data_processing.py @@ -18,7 +18,7 @@ DropMissingOrNan, FilterWithin, RemoveValueRows, - ) +) from damast.data_handling.transformers.augmenters import AddLocalIndex from damast.domains.maritime.ais import vessel_types from damast.domains.maritime.transformers import AddVesselType, ComputeClosestAnchorage diff --git a/src/damast/domains/maritime/transformers/augmenters.py b/src/damast/domains/maritime/transformers/augmenters.py index 9bfdf1b..af11572 100644 --- a/src/damast/domains/maritime/transformers/augmenters.py +++ b/src/damast/domains/maritime/transformers/augmenters.py @@ -73,7 +73,7 @@ def load_data(cls, try: return XDataFrame.open(path=filename, sep=sep) except FileNotFoundError: - raise RuntimeError(f"{cls}: Vessel type information not accessible. File {vessel_type_csv} not found") + raise RuntimeError(f"{cls}: Vessel type information not accessible. 
File {filename} not found") @damast.core.describe("Compute distance from dataset to closest anchorage") @damast.core.input({"x": {"representation_type": float, "unit": damast.core.units.units.deg}, diff --git a/src/damast/ml/__init__.py b/src/damast/ml/__init__.py index e0d9c94..25d0f82 100644 --- a/src/damast/ml/__init__.py +++ b/src/damast/ml/__init__.py @@ -1,7 +1,6 @@ import importlib -import os - import logging +import os logger = logging.getLogger(__name__) @@ -11,7 +10,7 @@ def backend_available(backend: str) -> bool: try: importlib.import_module(backend) return True - except ImportError as e: + except ImportError: return False def autodiscover_backend(priority: list[str] = ["torch", "tensorflow", "jax"]): @@ -33,4 +32,4 @@ def autodiscover_backend(priority: list[str] = ["torch", "tensorflow", "jax"]): else: autodiscover_backend() -import keras +import keras # noqa diff --git a/src/damast/ml/experiments.py b/src/damast/ml/experiments.py index ebb6afb..060f4c8 100644 --- a/src/damast/ml/experiments.py +++ b/src/damast/ml/experiments.py @@ -12,7 +12,6 @@ from pathlib import Path from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Union -from damast.ml import keras import numpy as np import polars as pl import yaml @@ -21,6 +20,7 @@ from damast.core.dataprocessing import DataProcessingPipeline from damast.core.types import DataFrame from damast.data_handling.accessors import GroupSequenceAccessor +from damast.ml import keras from damast.ml.models.base import BaseModel, ModelInstanceDescription basicConfig() @@ -142,7 +142,7 @@ def __iter__(self): yield "training_parameters", self.training_parameters._asdict() # type: ignore def __eq__(self, other) -> bool: - if type(self) != type(other): + if type(self) is not type(other): return False for attr in ["pipeline", "features", "targets", "models"]: @@ -179,7 +179,7 @@ def __init__(self, *, self.forecast_length = forecast_length def __eq__(self, other): - if type(self) != type(other): + if type(self) 
is not type(other): return False for attr in ["pipeline", "features", "targets", "models", "group_column", "sequence_length", @@ -249,7 +249,7 @@ def __init__(self, raise ValueError(f"{self.__class__.__name__}.__init__: learning_task must be either" f"dict or LearningTask object") - if type(input_data) == list: + if type(input_data) is list: self.input_data = input_data else: self.input_data = [ Path(input_data) ] diff --git a/src/damast/ml/models/base.py b/src/damast/ml/models/base.py index 5c54a18..b90e8d4 100644 --- a/src/damast/ml/models/base.py +++ b/src/damast/ml/models/base.py @@ -6,15 +6,15 @@ from abc import ABC, abstractmethod from pathlib import Path from tempfile import gettempdir -from typing import ClassVar, NamedTuple, OrderedDict +from typing import ClassVar, Generator, NamedTuple, OrderedDict -from damast.ml import keras import keras.callbacks import keras.utils import pandas as pd from damast.core import DataSpecification from damast.core.types import DataFrame, XDataFrame +from damast.ml import keras __all__ = [ "BaseModel", @@ -190,8 +190,8 @@ def evaluation_file(self) -> Path: return self.model_dir / f'evaluation-{CHECKPOINT_BEST}.csv' def train(self, - training_data: tf.data.Dataset, - validation_data: tf.data.Dataset, + training_data: Generator, + validation_data: Generator, monitor: str = "val_loss", mode: str = "min", epochs: int = 1, @@ -270,7 +270,7 @@ def save(self) -> Path: def evaluate(self, label: str, - evaluation_data: tf.data.Dataset, + evaluation_data: Generator, **kwargs) -> dict[str, any]: """ Evaluate this model. 
diff --git a/src/damast/ml/worker.py b/src/damast/ml/worker.py index b0f4fcb..6749ba9 100644 --- a/src/damast/ml/worker.py +++ b/src/damast/ml/worker.py @@ -70,7 +70,7 @@ def predict(self, msg = connection.recvmsg(4) # Get the length field data = msg[0].decode() - if type(data) == str and data == ControlCommand.STOP.value: + if type(data) is str and data == ControlCommand.STOP.value: break data = next(gen_predict) diff --git a/src/damast/utils/__init__.py b/src/damast/utils/__init__.py index e81cb25..e11b9ef 100644 --- a/src/damast/utils/__init__.py +++ b/src/damast/utils/__init__.py @@ -1,5 +1,5 @@ -import sys import importlib +import sys if sys.version_info < (3,11): import iso8601 diff --git a/src/damast/utils/io.py b/src/damast/utils/io.py index 232b555..e2e5fd6 100644 --- a/src/damast/utils/io.py +++ b/src/damast/utils/io.py @@ -1,14 +1,14 @@ -from enum import Enum -import logging import io +import logging import shutil import subprocess import tempfile import time import warnings -from pathlib import Path -from typing import Callable, ClassVar import zipfile +from enum import Enum +from pathlib import Path +from typing import Callable from damast.core.constants import DAMAST_MOUNT_PREFIX @@ -35,7 +35,10 @@ class Archive: def autoload_backend(self): try: - from ratarmountcore.compressions import ARCHIVE_FORMATS, COMPRESSION_FORMATS #noqa + from ratarmountcore.compressions import ( # noqa + ARCHIVE_FORMATS, + COMPRESSION_FORMATS, + ) self._backend = ArchiveBackend.RATARMOUNT except Exception: warnings.warn("ratarmount could not be loaded: falling back to zipfile-based support") @@ -47,7 +50,7 @@ def supported_suffixes(self): """ if not self._backend: raise RuntimeError("Archive.supported_suffixes: ensure that backend is set, e.g., call 'autoload_backend' first") - + fn_name = f"supported_suffixes_{self._backend.value}" if not hasattr(self, fn_name): raise RuntimeError(f"Missing implementation for {fn_name}") @@ -65,7 +68,10 @@ def 
supported_suffixes_ratarmount(self): if self._supported_suffixes is not None: return self._supported_suffixes - from ratarmountcore.compressions import ARCHIVE_FORMATS, COMPRESSION_FORMATS # noqa + from ratarmountcore.compressions import ( # noqa + ARCHIVE_FORMATS, + COMPRESSION_FORMATS, + ) self._supported_suffixes = [] for k, v in COMPRESSION_FORMATS.items(): self._supported_suffixes += v.extensions @@ -135,7 +141,7 @@ def mount_zipfile(self, file, target): continue dirname = Path(file_in_zip).parent - extract_dir = target / dirname + extract_dir = target / dirname extract_dir.mkdir(parents=True, exist_ok=True) # read inner zip file into bytes buffer @@ -181,7 +187,7 @@ def mount(self) -> list[str]: logger.info(f"Archive.mount: found archive: {file}") target_mount = Path(local_mount) / Path(file).name target_mount.mkdir(parents=True, exist_ok=True) - + fn = getattr(self, f"mount_{self._backend.value}") fn(file, target_mount) diff --git a/test_ais.csv.zip b/test_ais.csv.zip new file mode 100644 index 0000000000000000000000000000000000000000..05c22ce845c86dc610900eeeb38fa9c109df70bb GIT binary patch literal 572 zcmWIWW@Zs#U|`^2xKbJ(<5c*6VHP6;gE|WXgB*hlLrH3JNwI!PVs2t_36LsD)CY;g zCuSDwB^Q^4hHx@4+egjGyeP0Bv$TSnfsy3}GXn#dn0v`PZ?S_w!^2uXLvfGQ_wKsN zvMpq>oYteuYV&-4n_8jG>O`>vhn`)Izv^?jrtfpf>-F;sCD$*1s(i0K_09CpCf~k4 z)z#ac9n+@z?6&5;S6gfD9sidsbMO51;F$LNfA{d5uI7s(% zDLhPz!asx`i1(PAbzExJj5(@TgcnySsPcsQR_KPM0Qt#MvI ztt*{d_vF@yib)HehhDK5ubC+kBw{-&Z@hJXsvWHN__!nDk0Myn9FBlMu62 z7D?;YFE|h%*dzP2)S>)C?JzBH(@Qkv2fTLY_h=Cdz)Zxbhzb=t9};+zUe${4;9Ue|05sZ w&B!Fjj4KW$fN{&f0E|+GEsY=+p%`U_#3)*926(fwfpjwhVG58w1Tv5T05q58Bme*a literal 0 HcmV?d00001 diff --git a/tests/damast/cli/test_cli.py b/tests/damast/cli/test_cli.py index d3d414a..5da6c96 100644 --- a/tests/damast/cli/test_cli.py +++ b/tests/damast/cli/test_cli.py @@ -1,3 +1,4 @@ + import re import sys from argparse import ArgumentParser @@ -49,7 +50,7 @@ def test_subparser(name, klass, script_runner): assert result.returncode == 0 
test_parser = ArgumentParser() - subparser = klass(parser=test_parser) + klass(parser=test_parser) for a in test_parser._actions: if a.help == "==SUPPRESS==": diff --git a/tests/damast/core/test_dataframe.py b/tests/damast/core/test_dataframe.py index 2a36c78..91ecfdf 100644 --- a/tests/damast/core/test_dataframe.py +++ b/tests/damast/core/test_dataframe.py @@ -16,7 +16,7 @@ DataSpecification, MetaData, ValidationMode, - ) +) from damast.core.types import XDataFrame diff --git a/tests/damast/core/test_dataprocessing.py b/tests/damast/core/test_dataprocessing.py index 6dcf989..103ec3c 100644 --- a/tests/damast/core/test_dataprocessing.py +++ b/tests/damast/core/test_dataprocessing.py @@ -11,7 +11,7 @@ DAMAST_DEFAULT_DATASOURCE, DECORATED_INPUT_SPECS, DECORATED_OUTPUT_SPECS, - ) +) from damast.core.metadata import DataCategory, DataSpecification, MetaData from damast.core.transformations import MultiCycleTransformer from damast.core.types import XDataFrame @@ -487,7 +487,7 @@ def transform(self, df: AnnotatedDataFrame) -> AnnotatedDataFrame: assert getattr(TransformX.transform, DECORATED_OUTPUT_SPECS)[0].name == "{{" + varname + "}}_suffix" for node in pipeline.processing_graph.nodes(): - name = node.name + assert node.name transformer = node.transformer assert transformer.input_specs[DAMAST_DEFAULT_DATASOURCE][0].name == "status" @@ -588,7 +588,7 @@ def test_join_operation(data_path, tmp_path): osint_df = AnnotatedDataFrame.from_file(osint_csv, metadata_required=False) with pytest.raises(TypeError, match="expected an annotated dataframe"): - new_adf = pipeline.transform(ais_csv, osint=osint_df) + pipeline.transform(ais_csv, osint=osint_df) joined_df = pipeline.transform(ais_df, osint=osint_df) @@ -626,7 +626,7 @@ def test_join_pipeline(data_path, tmp_path): osint_df = AnnotatedDataFrame.from_file(osint_csv, metadata_required=False) with pytest.raises(TypeError, match="expected an annotated dataframe"): - new_adf = pipeline.transform(ais_csv, osint=osint_df) + 
pipeline.transform(ais_csv, osint=osint_df) joined_df = pipeline.transform(ais_df, osint=osint_df) diff --git a/tests/damast/core/test_metadata.py b/tests/damast/core/test_metadata.py index caab6da..c729c72 100644 --- a/tests/damast/core/test_metadata.py +++ b/tests/damast/core/test_metadata.py @@ -14,7 +14,7 @@ MetaData, Status, ValidationMode, - ) +) from damast.core.types import XDataFrame diff --git a/tests/damast/core/test_processing_graph.py b/tests/damast/core/test_processing_graph.py index 19fdaa5..f098718 100644 --- a/tests/damast/core/test_processing_graph.py +++ b/tests/damast/core/test_processing_graph.py @@ -15,6 +15,8 @@ def test_processing_graph(): graph.add(node=n0) graph.add(node=n1) graph.add(node=n2) + graph.add(node=n3) + graph.add(node=n4) for n in graph.nodes(): node = graph[n.uuid] diff --git a/tests/damast/domains/maritime/ais/test_augmenters.py b/tests/damast/domains/maritime/ais/test_augmenters.py index 8f6e947..ff5c95a 100644 --- a/tests/damast/domains/maritime/ais/test_augmenters.py +++ b/tests/damast/domains/maritime/ais/test_augmenters.py @@ -13,7 +13,7 @@ from damast.data_handling.transformers.augmenters import ( AddLocalIndex, AddUndefinedValue, - ) +) from damast.domains.maritime.ais import AISNavigationalStatus from damast.domains.maritime.ais.vessel_types import VesselType from damast.domains.maritime.data_specification import ColumnName @@ -23,7 +23,7 @@ AddVesselType, ComputeClosestAnchorage, DeltaDistance, - ) +) def test_add_missing_ais_status(tmp_path): @@ -296,7 +296,6 @@ def test_message_index(tmp_path): data = [] num_messages_A = 90 num_messages_B = num_messages_A - 2 - num_messages_C = 1 # Insert last message first to validate time-based sorting data.append([mmsi_a, timestamp + dt.timedelta(seconds=num_messages_A), lat, lon, num_messages_A-1, 0]) diff --git a/tests/damast/domains/maritime/ais/test_vessel_types.py b/tests/damast/domains/maritime/ais/test_vessel_types.py index 0a64b37..fd66d58 100644 --- 
a/tests/damast/domains/maritime/ais/test_vessel_types.py +++ b/tests/damast/domains/maritime/ais/test_vessel_types.py @@ -5,7 +5,7 @@ Fishing, PotsAndTraps, VesselType, - ) +) def test_vessel_types(): diff --git a/tests/damast/domains/maritime/test_data_processing.py b/tests/damast/domains/maritime/test_data_processing.py index 71279cd..d14869f 100644 --- a/tests/damast/domains/maritime/test_data_processing.py +++ b/tests/damast/domains/maritime/test_data_processing.py @@ -192,7 +192,10 @@ def test_aggregate(data_path, tmp_path): pipeline = MyPipeline(workdir=tmp_path) df = pipeline.transform(adf, verbose=True) - path = data_path / "test_ais.csv" + assert df.metadata['lat'].unit == 'deg' + assert df.metadata['lon'].unit == 'deg' + + assert "delta_distance" in df.column_names def test_aggregate_with_name_mapping(data_path, tmp_path): path = data_path / "test_ais.csv" diff --git a/tests/damast/ml/test_experiments.py b/tests/damast/ml/test_experiments.py index f837d28..669af05 100644 --- a/tests/damast/ml/test_experiments.py +++ b/tests/damast/ml/test_experiments.py @@ -12,7 +12,6 @@ from damast.core.transformations import MultiCycleTransformer from damast.core.units import units from damast.domains.maritime.ais.data_generator import AISTestData, AISTestDataSpec - from damast.ml import keras from damast.ml.experiments import ( Experiment, @@ -20,17 +19,12 @@ LearningTask, ModelInstanceDescription, TrainingParameters, - ) +) from damast.ml.models.base import BaseModel os.environ["COLUMNS"] = '120' -class ModelA(BaseModel): - def __init_model(self): - pass - - class TransformerA(PipelineElement): @damast.input({"x": {}}) @damast.output({"x": {}}) @@ -202,7 +196,7 @@ def test_validate_experiment_dir(tmp_path): with pytest.raises(NotADirectoryError): filename = tmp_path / "dummy-file" - with open(filename, "w") as f: + with open(filename, "w"): pass Experiment.validate_experiment_dir(dir=filename) @@ -215,7 +209,7 @@ def test_validate_experiment_dir(tmp_path): 
Experiment.touch_marker(dir=f"{tmp_path}-does-not-exist") with pytest.raises(NotADirectoryError): file_in_dir = tmp_path / "test_file" - with open(file_in_dir, "a") as f: + with open(file_in_dir, "a"): pass Experiment.touch_marker(dir=file_in_dir) diff --git a/tests/damast/utils/test_io.py b/tests/damast/utils/test_io.py index 42084b8..681a475 100644 --- a/tests/damast/utils/test_io.py +++ b/tests/damast/utils/test_io.py @@ -5,8 +5,7 @@ import pytest from damast.core.dataframe import DAMAST_SPEC_SUFFIX, AnnotatedDataFrame -from damast.utils.io import Archive -from damast.utils.io import ArchiveBackend +from damast.utils.io import Archive, ArchiveBackend @pytest.mark.skipif(sys.platform.startswith("win"), reason="ratarmount does not (easily) run on windows - zipfile backend should be used") diff --git a/tests/examples/03-ais-trajectory-prediction/trp.py b/tests/examples/03-ais-trajectory-prediction/trp.py index 77c5b83..c5284cc 100644 --- a/tests/examples/03-ais-trajectory-prediction/trp.py +++ b/tests/examples/03-ais-trajectory-prediction/trp.py @@ -1,6 +1,8 @@ import os +import tempfile from collections import OrderedDict from pathlib import Path + import numpy as np import damast @@ -11,18 +13,14 @@ from damast.core.transformations import MultiCycleTransformer from damast.core.units import units from damast.domains.maritime.ais.data_generator import AISTestData, AISTestDataSpec - from damast.ml import keras -import keras from damast.ml.experiments import ( Experiment, ForecastTask, - LearningTask, ModelInstanceDescription, TrainingParameters, - ) +) from damast.ml.models.base import BaseModel -import tempfile os.environ["COLUMNS"] = '120' @@ -144,7 +142,8 @@ class BaselineB(Baseline): input_adf = pipeline.transform(processed_data) # creating input features -from damast.data_handling.accessors import SequenceIterator +from damast.data_handling.accessors import SequenceIterator # noqa + sta = SequenceIterator(df=input_adf) gen_predict = 
sta.to_keras_generator(features=features, target=features, sequence_length=5) From 45ecc5c3c920559d143ab3d27c5faa1c70302220 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Sat, 28 Feb 2026 21:33:15 +0100 Subject: [PATCH 17/18] ci: ensure libzstd-dev is installed --- .github/workflows/test.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 74091a7..6ee4177 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -37,6 +37,12 @@ jobs: with: python-version: ${{ matrix.python-version }} + - name: Install dependencies ${{ matrix.os }} + if: runner.os == 'Linux' + run: | + # ensure zstd.h is available + sudo apt update && sudo apt install -y libzstd-dev + - name: Install pip run: python -m pip install -U pip From 50153711edac70d2a37b2cc4e45d0e57dadb72f7 Mon Sep 17 00:00:00 2001 From: Thomas Roehr Date: Sun, 1 Mar 2026 11:05:03 +0100 Subject: [PATCH 18/18] test: improve fixture setup --- tests/damast/ml/test_scheduler_worker.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/damast/ml/test_scheduler_worker.py b/tests/damast/ml/test_scheduler_worker.py index 67aad1e..9cca632 100644 --- a/tests/damast/ml/test_scheduler_worker.py +++ b/tests/damast/ml/test_scheduler_worker.py @@ -42,8 +42,11 @@ def job(tmp_path): def worker_process(): # Ensure that no previous worker job is running for proc in psutil.process_iter(): - if "run_test_worker" in proc.name(): - proc.kill() + try: + if "run_test_worker" in proc.name(): + proc.kill() + except psutil.NoSuchProcess: + pass run_test_worker = Path(__file__).parent / "run_test_worker.py" assert run_test_worker.exists()