Sync 2.x into master: pipeline refactor (PR-A through PR-E2) and changelog #32

Open
ramonski wants to merge 24 commits into master from 2.x

@ramonski
Contributor

Heads-up: This is the last PR that will be merged into master.
After this merge, 2.x will diverge from master to implement new
transport layers and broader architectural changes. Future work will
live on 2.x (and successor branches), not on master.

Summary

Brings master up to date with the pipeline refactor series merged on
2.x:

Test plan

  • bin/test --package senaite.astm passes
  • Replay corpus tests pass against the bundled fixtures
  • Spot-check that downstream consumers of Wrapper.to_dict() still
    work with the typed envelope

ramonski added 15 commits May 8, 2026 22:56
The package declares Python 3.8+ in setup.py, so the legacy compat
layer is dead code. Cleaning it up reduces noise for future readers
and removes a layer of indirection over plain str / int / bytes.

- Delete src/senaite/astm/compat.py (basestring, unicode, long,
  make_string, b, u, buffer)
- Replace compat imports with direct str / int / bytes usage
- Inline make_string into Field._set_value (decode bytes as utf-8,
  str() everything else)
- Replace try/except izip_longest with a direct
  'from itertools import zip_longest' in utils.py and mapping.py
- Replace try/except 'from collections import Iterable' with
  'from collections.abc import Iterable' in codec.py
- Replace deprecated logger.warn with logger.warning in utils.py
  and lims.py
- Drop the u() helper in tests/test_fields.py — Python 3 string
  literals are already unicode
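
To make the clean-up concrete, the Python 3 only style described in the list above might look like the following minimal sketch. The helper names to_text and pad_record are hypothetical and are not part of senaite.astm; the commented-out try/except shows the usual shape of such a shim, not the exact code that was deleted.

```python
# Illustrative only: to_text() and pad_record() are made-up helpers.
#
# Typical Python 2/3 shim of the kind the commit removes:
#
#     try:
#         from itertools import izip_longest as zip_longest
#     except ImportError:
#         from itertools import zip_longest
#
# Python 3 only replacement: import the real names directly.
import logging
from collections.abc import Iterable
from itertools import zip_longest

logger = logging.getLogger(__name__)


def to_text(value):
    """Decode bytes as UTF-8 and pass everything else through str()."""
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return str(value)


def pad_record(names, values):
    """Pair field names with values; missing values become None."""
    if isinstance(values, (str, bytes)) or not isinstance(values, Iterable):
        # logger.warning() replaces the deprecated logger.warn() alias.
        logger.warning("wrapping scalar %r in a list", values)
        values = [values]
    return list(zip_longest(names, values))
```
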
Drop Python 2 compatibility shims
Bumps the package to 2.0.0 — the LIMS push API is intentionally
incompatible with the 1.x line.

The Session class now creates a single requests.Session in __init__
and reuses it across all calls, so the TLS handshake is amortised
across the connection rather than repeated for every request. auth(),
get() and post() raise typed exceptions (SenaiteAuthError,
SenaiteHTTPError, SenaiteUnreachableError) instead of swallowing
every Exception into an empty dict, so the caller can react to the
specific failure mode.
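
The typed-exception pattern can be sketched as follows. This is an illustration under stated assumptions, not the code in senaite.astm.core.lims: the three exception names come from the paragraph above, while the shared SenaiteError base class, the status-code mapping, the http injection parameter, the URL and the stub classes are invented for the example.

```python
# Illustrative sketch, not the senaite.astm.core.lims source.
import requests


class SenaiteError(Exception):
    """Hypothetical shared base class."""


class SenaiteAuthError(SenaiteError):
    """Credentials were rejected."""


class SenaiteHTTPError(SenaiteError):
    """The server answered with an error status."""


class SenaiteUnreachableError(SenaiteError):
    """No HTTP response was received at all."""


class Session:
    def __init__(self, base_url, http=None):
        self.base_url = base_url.rstrip("/")
        # One requests.Session per object: its connection pool keeps the
        # TCP/TLS connection alive between calls.
        self.http = http if http is not None else requests.Session()

    def get(self, path, timeout=10):
        url = "{}/{}".format(self.base_url, path.lstrip("/"))
        try:
            response = self.http.get(url, timeout=timeout)
        except (requests.ConnectionError, requests.Timeout) as exc:
            raise SenaiteUnreachableError(str(exc)) from exc
        # Assumed mapping: 401/403 mean "auth", any other 4xx/5xx "http".
        if response.status_code in (401, 403):
            raise SenaiteAuthError("HTTP {}".format(response.status_code))
        if response.status_code >= 400:
            raise SenaiteHTTPError("HTTP {}".format(response.status_code))
        return response.json()


class StubResponse:
    def __init__(self, status_code, payload=None):
        self.status_code = status_code
        self.payload = payload if payload is not None else {}

    def json(self):
        return self.payload


class StubHTTP:
    """Stands in for requests.Session so the example needs no network."""

    def __init__(self, outcome):
        self.outcome = outcome

    def get(self, url, timeout=None):
        if isinstance(self.outcome, Exception):
            raise self.outcome
        return self.outcome


def classify(outcome):
    """Show how a caller can branch on the specific failure mode."""
    session = Session("https://lims.example.org", http=StubHTTP(outcome))
    try:
        return session.get("/ping")
    except SenaiteUnreachableError:
        return "retry later"
    except SenaiteAuthError:
        return "fix credentials"
    except SenaiteHTTPError:
        return "report server error"
```

With an empty dict as the only failure signal, the three branches in classify() would be indistinguishable.
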

post_to_senaite() now authenticates once per call. Retries on push
failure only re-call session.post(); unlike before, auth is no longer
re-run on every attempt. The function returns a PushResult dataclass
(success, attempts, last_error) so the server can act on the result
instead of fire-and-forget.
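
A minimal sketch of "authenticate once, retry only the POST". The three PushResult field names are taken from the paragraph above; their types, the push_with_retry() helper, its retries parameter and the FlakySession test double are assumptions made for illustration.

```python
# Sketch only: not the post_to_senaite() implementation.
from dataclasses import dataclass
from typing import Optional


@dataclass
class PushResult:
    success: bool
    attempts: int
    last_error: Optional[str] = None


def push_with_retry(session, payload, retries=3):
    """Authenticate once, then retry only the POST."""
    session.auth()
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            session.post(payload)
        except Exception as exc:  # real code would catch the typed errors
            last_error = str(exc)
            continue
        return PushResult(success=True, attempts=attempt)
    return PushResult(success=False, attempts=retries, last_error=last_error)


class FlakySession:
    """Test double: the first `failures` POSTs fail, later ones succeed."""

    def __init__(self, failures):
        self.failures = failures
        self.auth_calls = 0
        self.post_calls = 0

    def auth(self):
        self.auth_calls += 1

    def post(self, payload):
        self.post_calls += 1
        if self.post_calls <= self.failures:
            raise ConnectionError("push failed")


def demo(failures, retries=3):
    session = FlakySession(failures)
    result = push_with_retry(session, {"message": "raw ASTM text"}, retries)
    return result, session.auth_calls, session.post_calls
```

The caller can branch on result.success and log result.last_error, which is what "act on the result" amounts to in this sketch.
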

The top-level senaite.astm.lims module is removed in the same PR;
server.py, sender.py and the tests import from senaite.astm.core.lims
directly. We own all callers, so we don't keep a compat shim.
Lift LIMS push into core/ with typed errors and PushResult
Adds senaite.astm.core.envelope with a pydantic-based Envelope and
Metadata model. The envelope is now a pinned contract:

- ENVELOPE_VERSION = '1.0' is exposed in metadata.envelope_version
  on every output, so consumers can detect schema changes.
- Metadata declares the required keys (envelope_version, astm,
  lis2a) and accepts vendor extras (e.g. Roche c111's parsed
  sender component) via extra='allow'.
- The per-record buckets (H, P, O, R, C, M, L, Q) default to empty
  lists so the top-level shape is stable regardless of which
  record types a given instrument emits.
- Per-record dict shapes are intentionally left loose — that lives
  in the per-instrument record classes.

Wrapper now exposes to_envelope() returning the typed model and
keeps to_dict()/to_json() as JSON-serialisable convenience wrappers
around it. The 11 golden snapshots are regenerated to include the
envelope_version field and the empty-list defaults.

pydantic>=2 is added to install_requires.
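
The contract described above might be modelled roughly like this with pydantic 2. ENVELOPE_VERSION, the metadata keys and the bucket letters are taken from the text; the field types, the default on envelope_version and the sender extra are assumptions, so this is a sketch of the shape and not the model in senaite.astm.core.envelope.

```python
# Sketch of the envelope shape under stated assumptions.
from typing import Any, Dict, List

from pydantic import BaseModel, ConfigDict, Field

ENVELOPE_VERSION = "1.0"

Records = List[Dict[str, Any]]  # per-record dicts are left loose


class Metadata(BaseModel):
    # extra="allow" keeps vendor-specific keys instead of rejecting them.
    model_config = ConfigDict(extra="allow")

    envelope_version: str = ENVELOPE_VERSION
    astm: Any    # required; the real type is not stated in the text
    lis2a: Any   # required; the real type is not stated in the text


class Envelope(BaseModel):
    metadata: Metadata
    # Every bucket defaults to an empty list, so the top-level keys are
    # identical for every instrument.
    H: Records = Field(default_factory=list)
    P: Records = Field(default_factory=list)
    O: Records = Field(default_factory=list)
    R: Records = Field(default_factory=list)
    C: Records = Field(default_factory=list)
    M: Records = Field(default_factory=list)
    L: Records = Field(default_factory=list)
    Q: Records = Field(default_factory=list)


envelope = Envelope(
    metadata=Metadata(astm="E1394", lis2a="LIS2-A2", sender="c111"),
    R=[{"type": "R", "value": "5.4"}],
)
as_dict = envelope.model_dump()
```

A consumer can read as_dict["metadata"]["envelope_version"] first and decide whether it understands the rest of the payload.
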
Define a typed Envelope schema for Wrapper.to_dict()
Three behaviours bit us in production and made the test output
hard to read. This PR fixes the symptoms without rewriting the
descriptor framework.

NotUsedField no longer warns on assignment. The cobas_c311 fixture
alone produced ~78 UserWarning entries per parse, drowning out
real warnings without giving the operator anything actionable. The
field now silently drops the assigned value.
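
As an illustration of the new behaviour, a descriptor that drops assignments without warning could look like this. Only the class name and the "silently drop" rule come from the text; the descriptor body, the Record class and the assign_many() helper are hypothetical.

```python
# Hypothetical stand-in for the real descriptor.
import warnings


class NotUsedField:
    """Placeholder for a record position the instrument never fills."""

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, instance, owner=None):
        return None

    def __set__(self, instance, value):
        # The legacy version warned here; the new one ignores the value.
        return None


class Record:
    reserved = NotUsedField()


def assign_many(count):
    """Assign to the unused field `count` times and collect warnings."""
    record = Record()
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        for index in range(count):
            record.reserved = "value-{}".format(index)
    return record.reserved, len(caught)
```
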

SetField accepts unknown values by default and logs them at debug
level. A device firmware update that introduces a new status code
should not crash parsing of every message that contains it. Pass
strict=True to restore the legacy raise-on-unknown behaviour.
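
The tolerant default and the strict opt-in can be sketched as below. The strict flag and the debug-level log are described above; the constructor signature, the storage in the instance dict, the ValueError and the sample status codes are assumptions.

```python
# Sketch only: not the senaite.astm SetField.
import logging

logger = logging.getLogger("astm.fields.sketch")


class SetField:
    def __init__(self, values, strict=False):
        self.values = frozenset(values)
        self.strict = strict

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return instance.__dict__.get(self.name)

    def __set__(self, instance, value):
        if value not in self.values:
            if self.strict:
                raise ValueError("{!r} is not one of {}".format(
                    value, sorted(self.values)))
            # Tolerant mode: keep the value, leave a trace for debugging.
            logger.debug("unknown value %r for field %s", value, self.name)
        instance.__dict__[self.name] = value


class Result:
    status = SetField({"F", "P", "C"})
    strict_status = SetField({"F", "P", "C"}, strict=True)


def try_assign(attribute, value):
    result = Result()
    try:
        setattr(result, attribute, value)
    except ValueError:
        return "rejected"
    return getattr(result, attribute)
```
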

DateField, TimeField and DateTimeField now accept a tuple of
parse_formats in addition to the canonical format. Subclasses can
extend it to handle vendor-specific date strings without rewriting
parse logic. The canonical format is always tried first and is
still used for serialisation, so existing snapshots are unchanged.
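
A sketch of "canonical format first, extra formats as fallback". The parse_formats attribute is named above; the canonical format string, the method names and the vendor format are assumptions.

```python
# Sketch only: not the senaite.astm DateTimeField.
from datetime import datetime


class DateTimeField:
    format = "%Y%m%d%H%M%S"   # canonical format, also used for serialising
    parse_formats = ()        # extra formats, tried after the canonical one

    def parse(self, text):
        for fmt in (self.format,) + tuple(self.parse_formats):
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        raise ValueError("unsupported datetime value: {!r}".format(text))

    def serialize(self, value):
        return value.strftime(self.format)


class VendorDateTimeField(DateTimeField):
    # Hypothetical vendor quirk: separators and minute precision.
    parse_formats = ("%Y-%m-%d %H:%M",)
```

Because serialize() only ever uses the canonical format, accepting more input formats does not change what is written out.
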
Make field descriptors quiet and tolerant
First step of unifying senaite.astm.instruments and
senaite.astm.adapters into a single mechanism.

- New core.instrument module:
  - Instrument base class (name, header_regex, record_map,
    can_handle, preparse, get_metadata)
  - register_instrument decorator with shape validation
  - find_instrument resolver that raises
    AmbiguousInstrumentError instead of silently picking one
    match when two regexes overlap
- Wrapper.get_mapping() now consults the registry first and
  falls back to today's pkgutil-based discovery. Instrument-
  specific metadata is merged via either path.
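
The registry mechanism in the list above can be sketched as follows. The names Instrument, register_instrument, find_instrument, AmbiguousInstrumentError, header_regex, record_map and can_handle come from the text; the signatures, the module-level list, the validation rules and the two sample analyzers are made up for illustration.

```python
# Sketch only: not the senaite.astm.core.instrument source.
import re

_REGISTRY = []


class AmbiguousInstrumentError(LookupError):
    pass


class Instrument:
    header_regex = None   # compiled bytes pattern matched against the header
    record_map = {}       # record type letter -> record class

    @classmethod
    def can_handle(cls, header):
        return cls.header_regex.search(header) is not None


def register_instrument(cls):
    """Class decorator: validate the shape, then register the class."""
    pattern = getattr(cls, "header_regex", None)
    if not isinstance(pattern, re.Pattern) or not isinstance(pattern.pattern, bytes):
        raise TypeError(cls.__name__ + ".header_regex must be a compiled bytes regex")
    if not isinstance(cls.record_map, dict) or not cls.record_map:
        raise TypeError(cls.__name__ + ".record_map must be a non-empty dict")
    _REGISTRY.append(cls)
    return cls


def find_instrument(header):
    matches = [cls for cls in _REGISTRY if cls.can_handle(header)]
    if len(matches) > 1:
        names = ", ".join(cls.__name__ for cls in matches)
        raise AmbiguousInstrumentError("header matches: " + names)
    return matches[0] if matches else None


@register_instrument
class AnalyzerA(Instrument):
    header_regex = re.compile(rb"\|AnalyzerA\^")
    record_map = {"H": dict, "R": dict}


@register_instrument
class AnalyzerB(Instrument):
    # Deliberately overlaps with AnalyzerA for version 2 headers.
    header_regex = re.compile(rb"\|Analyzer[AB]\^2")
    record_map = {"H": dict}


def resolve(header):
    try:
        match = find_instrument(header)
    except AmbiguousInstrumentError:
        return "ambiguous"
    return match.__name__ if match else None
```

Raising on overlap makes a too-broad regex visible at the first matching message instead of routing it to whichever class happened to register first.
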

No instrument has been migrated yet, so behaviour is unchanged
for all existing analyzers. PR-E2 will migrate them and remove
the legacy discovery path together with senaite.astm.adapters.

269 tests pass (+10 new); flake8 clean.
Introduce the instrument registry (PR-E1)
Second and final step of the instrument unification:

- Every senaite.astm.instruments.* module now declares an
  Instrument subclass at the bottom and registers itself via
  @register_instrument. Module-level HEADER_RX is a compiled
  bytes regex; the old get_mapping()/get_metadata() helpers are
  gone. Each module also exposes INSTRUMENT for direct test
  access.
- The two zope-adapter data handlers (mini_vidas, spotchem se1520)
  have moved onto their corresponding Instrument via a new
  raw_data_regex attribute and a handle_raw_data(protocol, data)
  hook. ASTMProtocol.handle_data now dispatches via
  find_raw_data_handler instead of a Components registry.
- Wrapper.get_mapping resolves entirely through the registry and
  falls back to DEFAULT_MAPPING for unknown headers. The pkgutil
  iter_modules path is gone, along with self.module.
- senaite.astm.adapters, senaite.astm.interfaces, the
  adapter_registry global, and the zope.interface dependency are
  removed.
- instruments/__init__.py imports every submodule so the
  decorators run at import time.
- The Instrument base class also gained get_metadata; the wrapper
  now actually calls it (the old wrapper had latent code that
  never fired for any module). Envelope snapshots regenerated
  accordingly to include version + header_rx in metadata.
- New test_replay_corpus.py walks $ASTM_REPLAY_DIR (~50k CERMEL
  captures) through Wrapper and asserts the parse failure ratio
  stays under 5%. Pre-/post-migration ratios are identical at
  1549/50382 (~3.07%), confirming no real-traffic regression.
- Existing per-instrument tests updated to access
  <module>.INSTRUMENT.record_map.

270 tests pass (269 + replay). Existing pre-migration parser
quirks (mostly truncated c111 captures) are explicitly tolerated
by the replay threshold.
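
The replay threshold boils down to a ratio check. The 5% limit and the 1549/50382 figures are quoted from the list above; the replay() and failure_ratio() helpers are hypothetical and do not reproduce test_replay_corpus.py.

```python
# Sketch of the ratio check only.
MAX_FAILURE_RATIO = 0.05


def replay(captures, parse):
    """Run every capture through `parse` and count the ones that raise."""
    failures = 0
    for raw in captures:
        try:
            parse(raw)
        except Exception:
            failures += 1
    return failures, len(captures)


def failure_ratio(failures, total):
    return failures / total if total else 0.0
```
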
Migrate every instrument to the registry (PR-E2)
ramonski added the labels Enhancement ✨ (Improvement to existing functionality) and Cleanup 🧹 (Code cleanup and refactoring) May 15, 2026
ramonski requested a review from xispa May 15, 2026 10:43
ramonski added 9 commits May 15, 2026 13:43
Lay the groundwork for new transports (HL7, POCT1-A2) by separating
the ASTM transport from the message pipeline.

- transports/astm/framing.py: canonical home of the byte-level
  framing helpers (re-exported from utils).
- transports/astm/protocol.py: slim ASTMProtocol that owns only the
  framing state machine and emits complete frame batches via a
  caller-supplied frame_callback. No Wrapper, no queue, no format
  negotiation, no log_message, no module-level QUEUE singleton.
- core/pipeline.py: Pipeline that runs registered handlers in order,
  exception-isolated. Sync handlers are awaited via asyncio.to_thread.
- core/handlers.py: DiskCaptureHandler, LimsPushHandler, plus the
  shared serialize_envelope helper.
- cli/astm_server.py: senaite-astm-server entry point. Wires the
  listener to the pipeline. CLI surface (args, default ports,
  logfile name) preserved verbatim.
- setup.py: senaite-astm-server now points at cli/astm_server:main.
- Legacy senaite/astm/protocol.py and senaite/astm/server.py removed
  (no BBB shim per refactor-plan §2).
- Existing instrument tests: import path updated from
  senaite.astm.protocol to senaite.astm.transports.astm.protocol.
- test_end_to_end.py: rebuilt around the frame_callback contract via
  a small make_serializing_callback test helper that mirrors the CLI
  wiring.
- New test_pipeline.py: handler ordering, exception isolation, sync
  handler invocation, name resolution, serialize_envelope formats.
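
A minimal sketch of the pipeline behaviour listed above: handlers run in order, a failing handler does not stop the next one, and synchronous handlers are pushed to a worker thread. The class body, the handler call signature, the result dict and the sample handlers are assumptions. Note that asyncio.to_thread requires Python 3.9 or newer.

```python
# Sketch only: not the senaite.astm core/pipeline.py source.
import asyncio
import inspect
import logging

logger = logging.getLogger("astm.pipeline.sketch")


class Pipeline:
    def __init__(self, handlers=()):
        self.handlers = list(handlers)

    async def run(self, envelope):
        results = {}
        for handler in self.handlers:
            name = getattr(handler, "__name__", type(handler).__name__)
            try:
                if inspect.iscoroutinefunction(handler):
                    results[name] = await handler(envelope)
                else:
                    # Keeps blocking I/O off the event loop.
                    results[name] = await asyncio.to_thread(handler, envelope)
            except Exception as exc:
                # One failing handler must not stop the ones after it.
                logger.exception("handler %s failed", name)
                results[name] = exc
        return results


def capture(envelope):
    return "captured {} records".format(len(envelope["R"]))


def broken(envelope):
    raise RuntimeError("disk full")


async def push(envelope):
    return "pushed"


results = asyncio.run(Pipeline([capture, broken, push]).run({"R": [{}, {}]}))
```
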

Tests: 281 passed, 1 skipped. PR-G picks up server hardening (sane
log rotation, asyncio.run, tracked tasks for graceful shutdown).
PR-H makes disk capture first-class and removes the implicit
$CWD/astm_messages/ directory.
Split transport from protocol semantics (PR-F)
Replace the legacy synchronous server entry point with a properly
async one and stop losing in-flight messages on shutdown.

- cli/astm_server.amain: new async entry point. Boots the listener,
  installs SIGINT/SIGTERM handlers via loop.add_signal_handler, and
  blocks on an asyncio.Event until shutdown is requested. Tests can
  inject the stop_event to drive shutdown without OS signals.
- main(): now wraps amain via asyncio.run, replacing the deprecated
  asyncio.get_event_loop() / loop.run_until_complete dance.
- Tracked task set: every pipeline run is dispatched as a tracked
  asyncio.Task. On shutdown amain calls _drain_tasks(task_set,
  args.shutdown_grace_seconds), awaiting in-flight handlers up to
  the grace period and cancelling whatever remains. New CLI flag
  --shutdown-grace-seconds (default 30s).
- Sane log rotation: RotatingFileHandler now uses 10 MB / 5 backups.
  The legacy maxBytes=5 (yes, five bytes) rotated after every record.
- Frame callback now schedules wrap+pipeline as a coroutine task on
  the loop and registers it with the tracked set, so the protocol's
  data_received returns immediately.
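
The lifecycle described in the list above might be sketched like this. The amain name, the injectable stop_event, the tracked task set, the drain step and the 30 second default are taken from the text; the function bodies, the on_ready hook (which stands in for starting the listener) and the demo handlers are assumptions.

```python
# Sketch only: not the senaite.astm cli/astm_server.py source.
import asyncio
import signal


async def drain_tasks(tasks, grace_seconds):
    """Await in-flight tasks up to the grace period, cancel the rest."""
    if not tasks:
        return 0
    _done, pending = await asyncio.wait(tasks, timeout=grace_seconds)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)


async def amain(stop_event=None, grace_seconds=30, on_ready=None):
    loop = asyncio.get_running_loop()
    if stop_event is None:
        stop_event = asyncio.Event()
    tasks = set()

    for signum in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(signum, stop_event.set)
        except (NotImplementedError, RuntimeError):
            pass  # e.g. Windows event loops or a non-main thread

    def track(coro):
        task = loop.create_task(coro)
        tasks.add(task)
        task.add_done_callback(tasks.discard)
        return task

    if on_ready is not None:
        on_ready(track)
    await stop_event.wait()
    return await drain_tasks(set(tasks), grace_seconds)


def main():
    return asyncio.run(amain())


async def demo():
    stop_event = asyncio.Event()
    finished = []

    async def slow_handler():
        await asyncio.sleep(0.05)
        finished.append("slow")

    async def stuck_handler():
        await asyncio.sleep(60)
        finished.append("stuck")

    def on_ready(track):
        track(slow_handler())
        track(stuck_handler())
        stop_event.set()  # request shutdown right away

    cancelled = await amain(stop_event, grace_seconds=0.2, on_ready=on_ready)
    return finished, cancelled
```

In demo() the slow handler finishes inside the grace period while the stuck one is cancelled, which mirrors the "await in-flight, cancel stragglers" rule.
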

Tests: 289 passed, 1 skipped. New test_server_lifecycle.py covers:
- LOGFILE_MAX_BYTES / LOGFILE_BACKUP_COUNT are sane.
- A burst of records does not rotate the file.
- _drain_tasks awaits in-flight tasks, cancels stragglers past grace.
- frame_callback wraps + dispatches + tracks the task synchronously.
- amain awaits an in-flight slow handler before returning on shutdown.
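
The rotation settings can be checked with a few lines of standard-library code. The "10 MB / 5 backups" values and the LOGFILE_* names come from the text; the helper functions, the logger name, the file name and the log format are made up for this sketch.

```python
# Sketch only: not the senaite.astm logging setup.
import logging
import os
import tempfile
from logging.handlers import RotatingFileHandler

LOGFILE_MAX_BYTES = 10 * 1024 * 1024
LOGFILE_BACKUP_COUNT = 5


def make_file_handler(path, max_bytes=LOGFILE_MAX_BYTES):
    handler = RotatingFileHandler(
        path, maxBytes=max_bytes, backupCount=LOGFILE_BACKUP_COUNT)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    return handler


def files_after_burst(max_bytes, records=50):
    """Log a burst of records and list the files left in the directory."""
    with tempfile.TemporaryDirectory() as directory:
        handler = make_file_handler(
            os.path.join(directory, "server.log"), max_bytes)
        logger = logging.getLogger("rotation.sketch.{}".format(max_bytes))
        logger.propagate = False
        logger.setLevel(logging.INFO)
        logger.addHandler(handler)
        try:
            for index in range(records):
                logger.info("record %d", index)
        finally:
            logger.removeHandler(handler)
            handler.close()
        return sorted(os.listdir(directory))
```

With max_bytes=5 the same burst fills every backup slot, which is the legacy behaviour the commit removes.
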
Server hardening: async main, sane log rotation, graceful shutdown (PR-G)
Promote raw-message capture from an implicit ``protocol.log_message``
side effect (rooted at ``$CWD/astm_messages``) to a configured
:class:`Pipeline` handler. The implicit-discovery magic is gone;
``--output <path>`` is now the only way to enable capture.

- core/output.py: new home of DiskCaptureHandler. Behaviour:
  ``path=None`` / "" makes the handler a no-op; otherwise the target
  directory is created on first write if it does not yet exist; one
  timestamped file per envelope.
- core/lims.py: gains LimsPushHandler so the LIMS-side handler lives
  next to Session and post_to_senaite.
- core/envelope.py: gains serialize_envelope(envelope, format) — it
  is a method-on-envelope helper, so it lives with the schema.
- core/handlers.py: deleted. The PR-F "umbrella" module is no
  longer needed once each handler lives with its domain.
- cli/astm_server.build_pipeline: drops the auto-discovery of
  $CWD/astm_messages/. Pipeline is now strictly driven by --output
  and --url.
- .gitignore: ignore stray logfiles produced by manual server runs.
- New test_output.py: noop-when-unconfigured, file-per-envelope,
  directory creation, name attribute. Plus build_pipeline coverage:
  no implicit capture even when the legacy directory exists.
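
A sketch of the capture rules listed above: no-op when unconfigured, directory created on first write, one timestamped file per envelope. The file name pattern, the JSON payload, the value of the name attribute and the capture_twice() helper are assumptions and do not reproduce core/output.py.

```python
# Sketch only: not the senaite.astm DiskCaptureHandler.
import json
import os
import tempfile
from datetime import datetime, timezone


class DiskCaptureHandler:
    name = "disk-capture"  # assumed value

    def __init__(self, path=None):
        self.path = path or None   # None and "" both disable capture
        self.count = 0

    def __call__(self, envelope):
        if self.path is None:
            return None
        os.makedirs(self.path, exist_ok=True)  # created on first write
        self.count += 1
        stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
        target = os.path.join(
            self.path, "{}-{:06d}.json".format(stamp, self.count))
        with open(target, "w", encoding="utf-8") as handle:
            json.dump(envelope, handle)
        return target


def capture_twice(path_for):
    """Run two envelopes through the handler, return the files on disk."""
    with tempfile.TemporaryDirectory() as root:
        handler = DiskCaptureHandler(path_for(root))
        for _ in range(2):
            handler({"metadata": {"envelope_version": "1.0"}, "R": []})
        return sorted(
            os.path.relpath(os.path.join(folder, name), root)
            for folder, _dirs, names in os.walk(root)
            for name in names)
```
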

Tests: 296 passed, 1 skipped. flake8 clean.

This is the last refactor PR before the HL7/POCT1-A2 transport
work. From here, a new transport is a new ``transports/<name>/``
directory and the HemoScreen instrument is a new
``instruments/pixcell_hemoscreen/`` directory — neither requires
core-code changes.
Disk capture is a first-class pipeline handler (PR-H)
First non-ASTM transport in the package. Lands the listener that
HemoScreen (and any HL7 v2 / MLLP device) talks to; parsing and LIMS
push come in PR-7 once we have a real device capture.

- transports/hl7/framing.py — MLLP framing helpers. wrap() produces
  SB + payload + EB + CR; extract_messages() streams complete blocks
  out of a buffered TCP read and returns the unconsumed tail.
- transports/hl7/protocol.py — asyncio.Protocol that buffers bytes
  across data_received, dispatches every complete MLLP block to a
  frame_callback, and writes back an MLLP-wrapped ACK^R01 with
  MSA|AA echoing MSH-10 (Message Control ID). The HemoScreen spec
  §3.2 mandates an ACK before the device sends the next message,
  so even passthrough must respond.
- cli/_runtime.py — shared CLI scaffolding (rotating logfile,
  shutdown grace, signal handlers, tracked task draining) lifted
  out of cli/astm_server so both transports use the same lifecycle.
- cli/hl7_server.py — senaite-hl7-server entry point. Default port
  2575 (IANA-registered for HL7). Captures each received message
  to --output via a new RawCaptureHandler; no parsing, no LIMS.
- cli/hl7_simulator.py — senaite-hl7-simulator entry point.
  Replays HL7 fixtures (with \n -> \r normalisation) against a
  listener, waits for each ACK before sending the next message.
- tests/data/hl7/hemoscreen_*.hl7 — four fixtures lifted verbatim
  from the HemoScreen HL7 Connectivity Protocol §8.1 appendix:
  fresh blood, liquid QC, proficiency, with-flags.
- tests/test_hl7_framing.py — 10 framing tests: wrap, streaming
  reassembly, partial-tail preservation, pre-SB junk drop,
  back-to-back blocks, EB-without-CR rejection.
- tests/test_hl7_protocol.py — 11 protocol tests: build_ack
  contract (echoes MSH-10, MSA|AA, encoding chars, fallback for
  malformed MSH), end-to-end dispatch + ACK over a real socket on
  each appendix fixture, two-messages-one-connection draining,
  RawCaptureHandler write-and-noop behaviour, build_pipeline shape.
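
The framing and ACK rules from the list above can be sketched as follows. The SB + payload + EB + CR layout, the wrap(), extract_messages() and build_ack names and the "MSA|AA echoing MSH-10" rule come from the text. The function bodies, the fixed "|" field separator, the ACK's own MSH fields, the fallback values and the sample message are assumptions and are not taken from the HemoScreen specification.

```python
# Sketch only: not the senaite.astm transports/hl7 source.
SB, EB, CR = b"\x0b", b"\x1c", b"\x0d"

# Made-up sample message, not one of the appendix fixtures.
SAMPLE = (b"MSH|^~\\&|Device|LAB|||20260515104300||ORU^R01|MSG0001|P|2.5\r"
          b"PID|1\r")


def wrap(payload):
    return SB + payload + EB + CR


def extract_messages(buffer):
    """Return (complete payloads, unconsumed tail) for a buffered read."""
    messages = []
    while True:
        start = buffer.find(SB)
        if start < 0:
            return messages, b""          # nothing but junk: drop it
        buffer = buffer[start:]           # drop junk before SB
        end = buffer.find(EB)
        if end < 0 or len(buffer) < end + 2:
            return messages, buffer       # block still incomplete
        if buffer[end + 1:end + 2] != CR:
            raise ValueError("EB not followed by CR")
        messages.append(buffer[1:end])
        buffer = buffer[end + 2:]


def build_ack(message):
    """Build an MLLP-wrapped ACK^R01 that echoes the incoming MSH-10."""
    fields = message.split(CR, 1)[0].split(b"|")
    if fields[0] == b"MSH" and len(fields) > 9 and fields[1]:
        encoding, control_id = fields[1], fields[9]
    else:
        encoding, control_id = b"^~\\&", b""   # malformed MSH fallback
    message_type = b"ACK" + encoding[:1] + b"R01"
    msh = b"|".join(
        [b"MSH", encoding] + [b""] * 6 + [message_type, control_id])
    msa = b"|".join([b"MSA", b"AA", control_id])
    return wrap(msh + CR + msa + CR)
```

Returning the unconsumed tail lets the protocol keep it in its buffer and prepend it to the next data_received chunk.
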

Suite: 317 passed, 1 skipped. flake8 clean.

Next: PR-7 — parse MLLP-captured HL7 v2 messages into the existing
Envelope shape (MSH/PID/OBR/OBX/NTE -> metadata + buckets). PR-8 —
HemoScreen instrument adapter (route only OBR-4=="OBS" to LIMS,
keyword mapping via the existing # -> _ABS / % -> _PERC convention).
HL7-over-MLLP transport, passthrough (PR-6, HemoScreen)
Labels

Cleanup 🧹 (Code cleanup and refactoring)
Enhancement ✨ (Improvement to existing functionality)

1 participant