30 changes: 25 additions & 5 deletions .github/workflows/pytest.yml
@@ -3,23 +3,43 @@ name: Run pytest
on:
push:
branches: [main, dev]
pull_request:
branches: [main, dev]
types: ["opened", "reopened", "synchronize", "ready_for_review", "draft"]
pull_request:
branches: [main, dev]
workflow_dispatch:

jobs:
build:
name: Run pytest
runs-on: ubuntu-latest
env:
PROJECT_ROOT: ${{ github.workspace }}/waveform-controller

steps:
- uses: actions/checkout@v5
with:
path: waveform-controller
- name: Checkout PIXL dependency
uses: actions/checkout@v5
with:
repository: SAFEHR-data/PIXL
ref: main
path: PIXL

- name: Install uv
uses: astral-sh/setup-uv@v7
with:
version: "0.9.26"
python-version: "3.13"
enable-cache: true

- name: Make a venv
working-directory: waveform-controller
run: uv venv

- name: Install dependencies
run: uv sync --locked --all-extras --dev
- name: Install with dependencies
working-directory: waveform-controller
run: uv pip install '.[dev]'

- name: Run the tests
working-directory: waveform-controller
run: uv run pytest tests
19 changes: 11 additions & 8 deletions .pre-commit-config.yaml
@@ -4,6 +4,17 @@ repos:
hooks:
- id: detect-secrets
args: [--exclude-secrets=(^my_pw$)|(^inform$)]
# Formats docstrings to comply with PEP257.
# Use --black to keep output compatible with ruff-format.
- repo: https://github.com/PyCQA/docformatter
rev: 06907d0
hooks:
- id: docformatter
additional_dependencies: [tomli]
args:
- --in-place
- --black
- --config=pyproject.toml
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.2
hooks:
@@ -25,14 +36,6 @@ repos:
"types-pika"
]
files: src/
# ----------
# Formats docstrings to comply with PEP257
- repo: https://github.com/PyCQA/docformatter
rev: 06907d0
hooks:
- id: docformatter
additional_dependencies: [tomli]
args: [--config=pyproject.toml]
# a collection of sanity checks: check for merge conflicts, check the end of
# lines, check for DOS vs unix stuff, and a newline at the end of files.
- repo: https://github.com/pre-commit/pre-commit-hooks
4 changes: 4 additions & 0 deletions docker-compose.yml
@@ -30,6 +30,10 @@ services:
- ../config/exporter.env
volumes:
- ../waveform-export:/waveform-export
# because we're launching through cron in the container, which starts
# processes with a clean environment, also mount in the config file so
# it can be read in by snakemake later
- ../config/exporter.env:/config/exporter.env:ro
restart: unless-stopped
waveform-hasher:
build:
128 changes: 128 additions & 0 deletions docs/debugging.md
@@ -0,0 +1,128 @@
# About
This is a brief guide to inspecting the state of the waveform data pipeline.
For example, looking for intermediate data or error messages when something
is not coming through as expected.

# Start here

It is helpful to refer to the pipeline diagram at
https://github.com/SAFEHR-data/emap/blob/develop/docs/technical_overview/waveforms/pipeline.md
to get an overview and find the right place to look.

First, let's see what data is present in the `waveform-export` directory.

Are there recent files in `ftps-logs`?

> [!NOTE]
> The timestamps that are part of the file names are based on when the data is from, not when it was processed by us!

> [!TIP]
> Use `ls -latr` to see the latest files in a directory.

If the relevant `*.uploaded.json` marker file is present, this means the FTPS
upload to the DSH happened without an error.
The file contains upload statistics in JSON format.
Uploads will also generate email notifications from the DSH.

If the marker file is not present, let's check the
other end of our pipeline: are there recent files in `original-csv`?
If not, you need to look at the `waveform-controller` logs,
or, failing that, further upstream at the RabbitMQ server (see later section).

If files in `original-csv` are present, then the error is somewhere inside our pipeline,
and you should check the logs in `snakemake-logs` (see later section).

Parquet files that are a direct translation of the CSVs are found in `original-parquet`.

Parquet files that have been pseudonymised are found in `pseudonymised`.
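
A minimal sketch for seeing the most recent files in every stage at once (assuming you are in the directory containing `waveform-export` on the docker host):
```
# print the newest entries in each pipeline stage directory
for d in original-csv original-parquet pseudonymised snakemake-logs ftps-logs; do
    echo "== $d =="
    ls -latr waveform-export/"$d" | tail -n 3
done
```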


# Logging summary

Logs are found in:
* Docker container logs
* Snakemake top-level logs
* Snakemake job-level logs

> [!CAUTION]
> Always be aware that logs may contain sensitive information. The only
> files considered safe for upload to the DSH are those in the `pseudonymised`
> directory.

## Docker logs

### `waveform-controller` container
```docker compose logs -t waveform-controller```
Shows the `waveform-controller` service logs. Useful for:
- Emap connectivity
- RabbitMQ connectivity
- patient correlation query errors (search for "unmatched")
- CSV output failures

This log is not very chatty if everything is going well.
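
For example, a quick way to look for patient correlation problems (a sketch; assumes you are in the directory containing the compose file):
```
# search the controller logs for unmatched-patient warnings
docker compose logs -t waveform-controller | grep -i "unmatched"
```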

### `waveform-exporter` container
```docker compose logs -t waveform-exporter```
Shows the output from the cron-triggered script `scheduled-script.sh`.
Useful for high-level pipeline failures before Snakemake starts, or for
Snakemake startup failures (e.g. when Snakemake is already running).

## Snakemake logs

Written to the mounted volume under `waveform-export/snakemake-logs/`.
These logs describe pipeline orchestration and per-rule execution.

### `snakemake-outer-log*.log`
Top-level Snakemake run logs, including:
- recently written CSVs that were temporarily excluded from processing (search for "File too new")
- job summaries and Snakemake DAG resolution
- more detailed errors when Snakemake itself fails

Unlike the data files, the timestamps in these file names reflect when the Snakemake
pipeline was invoked.
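
For example, to see which recent runs skipped files and to find the latest outer log (a sketch; adjust the path to your mount point):
```
# runs that temporarily excluded recently written CSVs
grep -l "File too new" waveform-export/snakemake-logs/snakemake-outer-log*.log
# the most recent outer log
ls -t waveform-export/snakemake-logs/snakemake-outer-log*.log | head -n 1
```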

### `{date}.{hashed_csn}.{stream_id}.{units}.log`
Job-level log for the `csv_to_parquet` rule. Contains:
- details of the CSV to Parquet conversion
- pseudonymisation steps

## FTPS logs and marker files

Produced under `waveform-export/ftps-logs/`.

### `{date}.{hashed_csn}.{stream_id}.{units}.ftps.log`
Job-level FTPS upload logs. Useful for:
- connection/authentication errors
- transfer failures

### `{date}.{hashed_csn}.{stream_id}.{units}.ftps.uploaded.json`
Upload marker file (aka sentinel) written after a successful transfer.
It contains, in JSON format:
- `uploaded_file` (the uploaded file path)
- `upload_time_secs` (time taken for the upload in seconds, measured with a monotonic clock)
- `start_timestamp` and `end_timestamp` (wall-clock UTC start and end timestamps)

Example paths:
- `waveform-export/snakemake-logs/snakemake-outer-log20260122T173201.log`
- `waveform-export/snakemake-logs/2025-06-04.acbc4701.52912.mL.log`
- `waveform-export/ftps-logs/2025-06-04.8bea0824.52912.mL.ftps.log`
- `waveform-export/ftps-logs/2025-06-04.8bea0824.52912.mL.ftps.uploaded.json`
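
To pretty-print one of the upload markers listed above (a sketch; any `*.uploaded.json` path will do):
```
python3 -m json.tool waveform-export/ftps-logs/2025-06-04.8bea0824.52912.mL.ftps.uploaded.json
```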

# RabbitMQ (part of Emap)

If the `waveform-controller` service appears to be up and running but is
not generating data, you could check the `waveform_export` queue
on the RabbitMQ server, which is part of Emap.
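
One way to check queue depths from the docker host is via `rabbitmqctl` inside the RabbitMQ container (the container name below is a placeholder; adjust it to your Emap deployment):
```
# message counts per queue, including waveform_export
docker exec <emap-rabbitmq-container> rabbitmqctl list_queues name messages
```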

If there are no messages present, it's possible that the waveform reader (also
part of Emap) is not generating them.

# Waveform reader (part of Emap)
This receives HL7 data on a TCP port from the Capsule server.
It writes received messages
to the docker host directory `waveform-saved-messages`, so look
there for recent messages.

Useful commands, to be run from the emap venv (see the Emap repo for more details):
* `emap docker ps` — check that the `waveform-reader` container is up
* `emap docker logs waveform-reader` — see whether HL7 messages are being received and check for errors
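
To see whether messages have arrived recently (the exact location of `waveform-saved-messages` on the docker host depends on your Emap configuration):
```
# newest saved HL7 messages
ls -latr waveform-saved-messages | tail
```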
15 changes: 14 additions & 1 deletion docs/develop.md
@@ -73,7 +73,20 @@ git push

## Testing

No tests yet.
Even though we are largely running in docker, you may wish to give your IDE access to a venv for running tests.

Create the venv in the parent of the repo root and activate it.
```
uv venv --python=3.13
source .venv/bin/activate
```

From the repo root, install the software and its dependencies:
```
uv pip install -e '.[dev]'
```
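
The tests can then be run from the repo root, mirroring what CI does:
```
pytest tests
```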


### API tests

from PROJECT_ROOT directory
9 changes: 6 additions & 3 deletions exporter-scripts/entrypoint.sh
@@ -3,6 +3,8 @@
# (can't use -u because need to check for potentially unset var)
set -eo pipefail

CRON_OUTPUT_FILE="/var/log/cron_output.log"
touch "$CRON_OUTPUT_FILE"
# Set up cron schedule according to the environment variable
if [ -z "$EXPORTER_CRON_SCHEDULE" ]; then
echo "You must set EXPORTER_CRON_SCHEDULE when running this container"
@@ -12,8 +14,9 @@ set -x
cat <<EOF | crontab -
PATH=/usr/local/bin:/usr/bin:/bin
SHELL=/usr/bin/bash
$EXPORTER_CRON_SCHEDULE /app/exporter-scripts/scheduled-script.sh
$EXPORTER_CRON_SCHEDULE /app/exporter-scripts/scheduled-script.sh >> "$CRON_OUTPUT_FILE" 2>&1
EOF

# cron scheduler is PID 1 in this container
exec cron -f
# run cron in background, and ensure that output goes to docker logs
cron
tail -f "$CRON_OUTPUT_FILE"
34 changes: 30 additions & 4 deletions exporter-scripts/scheduled-script.sh
@@ -1,6 +1,32 @@
#!/bin/bash

# Run by the cron scheduler
# Probably want snakemake instead...
emap-csv-pseudon --help
emap-send-ftps --help
set -euo pipefail

# This script is to be run by the cron scheduler, and its
# output goes to the docker logs.
# The snakemake output goes to its own log file as defined here.
# These files will end up on Windows so be careful about disallowed characters in the names.
date_str=$(date --utc +"%Y%m%dT%H%M%S")
SNAKEMAKE_CORES="${SNAKEMAKE_CORES:-1}"
# for temporarily making the pipeline not go all the way
SNAKEMAKE_RULE_UNTIL="${SNAKEMAKE_RULE_UNTIL:-all}"

# log file for the overall snakemake run (as opposed to per-job logs,
# which are defined in the snakefile)
outer_log_file="/waveform-export/snakemake-logs/snakemake-outer-log${date_str}.log"
# snakemake has not run yet so will not create the log dir; do it manually
mkdir -p "$(dirname "$outer_log_file")"
echo "$0: invoking snakemake, cores=$SNAKEMAKE_CORES, logging to $outer_log_file"
touch "$outer_log_file"
# bring in envs from file because cron gives us a clean environment
set -a
source /config/exporter.env
set +a
set +e
snakemake --snakefile /app/src/pipeline/Snakefile \
--cores "$SNAKEMAKE_CORES" \
--until "$SNAKEMAKE_RULE_UNTIL" \
>> "$outer_log_file" 2>&1
ret_code=$?
set -e
echo "$0: snakemake exited with return code $ret_code"
8 changes: 5 additions & 3 deletions pyproject.toml
@@ -5,12 +5,14 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"pandas",
"pandas==2.2.3",
#"pyarrow>=22.0",
"pika>=1.3.2",
"pre-commit>=4.5.0",
"snakemake==9.14.5",
# need to be compatible with PIXL, which currently pins 2.9.10 (arguably it shouldn't)
"psycopg2-binary>=2.9.10",
"stablehash==0.3.0",
# trick for making a "relative" path, works inside or outside container image
"core @ file:///${PROJECT_ROOT}/../PIXL/pixl_core",
]
@@ -20,8 +22,8 @@ dev = ["pytest>=9.0.2"]

[project.scripts]
emap-extract-waveform = "controller:receiver"
emap-csv-pseudon = "pseudon.pseudon:main"
emap-send-ftps = "exporter.ftps:do_upload"
emap-csv-pseudon = "pseudon.pseudon:psudon_cli"
emap-send-ftps = "exporter.ftps:do_upload_cli"

[tool.mypy]
ignore_missing_imports = true
5 changes: 2 additions & 3 deletions src/controller.py
@@ -14,7 +14,6 @@
logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s")
logger = logging.getLogger(__name__)


emap_db = db.starDB()
emap_db.init_query()
emap_db.connect()
@@ -28,8 +27,8 @@ def __init__(self, ch, delivery_tag, body):


def ack_message(ch, delivery_tag):
"""Note that `ch` must be the same pika channel instance via which the
message being ACKed was retrieved (AMQP protocol constraint)."""
"""Note that `ch` must be the same pika channel instance via which the message being
ACKed was retrieved (AMQP protocol constraint)."""
if ch.is_open:
ch.basic_ack(delivery_tag)
else:
7 changes: 3 additions & 4 deletions src/csv_writer.py
@@ -8,8 +8,8 @@
def create_file_name(
source_stream_id: str, observation_time: datetime, csn: str, units: str
) -> str:
"""Create a unique file name based on the patient contact serial number
(csn) the date, and the source system."""
"""Create a unique file name based on the patient contact serial number (csn) the
date, and the source system."""
datestring = observation_time.strftime("%Y-%m-%d")
units = units.replace("/", "p")
units = units.replace("%", "percent")
@@ -26,8 +26,7 @@ def write_frame(
csn: str,
mrn: str,
) -> bool:
"""Appends a frame of waveform data to a csv file (creates file if it
doesn't exist.
"""Appends a frame of waveform data to a csv file (creates file if it doesn't exist.

:return: True if write was successful.
"""