Skip to content

Commit 66d85ce

Browse files
committed
feat(adcp): add AD2CP velocity data reader and processor
- Add Ad2cpAvgPing dataclass for average-mode velocity records - Add _parse_avg_data_record() for Nortek 0x16 Average Data packets - Add read_ad2cp_velocity() — reads velocity, amplitude, correlation per (time, cell, beam) into xarray Dataset - Add process_ad2cp_velocity_file() — vectorized flattening to DataFrame - Parses GETAVG config line for n_cells, n_beams, cell_size, blanking - Tested with OceanLab Munkholmen Signature100 data (serial 103124)
1 parent 590b880 commit 66d85ce

2 files changed

Lines changed: 297 additions & 1 deletion

File tree

oceanstream/adcp/ad2cp_reader.py

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,22 @@ class Ad2cpBottomTrack:
7373
beam_tilt_deg: float # beam angle from vertical (°)
7474

7575

76+
@dataclass
77+
class Ad2cpAvgPing:
78+
"""A single average-mode velocity ping."""
79+
80+
time: np.datetime64
81+
sound_speed: float
82+
temperature: float
83+
pressure: float
84+
heading: float
85+
pitch: float
86+
roll: float
87+
velocity: np.ndarray # int16, shape (n_cells, n_beams), mm/s
88+
amplitude: np.ndarray # uint8, shape (n_cells, n_beams), counts
89+
correlation: np.ndarray # uint8, shape (n_cells, n_beams), %
90+
91+
7692
def _checksum(data: bytes) -> int:
7793
"""Nortek AD2CP checksum: 0xB58C + sum of 16-bit LE words."""
7894
cs = 0xB58C
@@ -219,6 +235,216 @@ def _parse_bt_data_record(
219235
beam_tilt_deg=beam_tilt_deg,
220236
)
221237

238+
def _parse_avg_data_record(data: bytes, n_cells: int, n_beams: int) -> Ad2cpAvgPing:
239+
"""Parse a single average-mode velocity data record payload.
240+
241+
Layout after the common header (``offset_of_data`` bytes):
242+
- velocity: ``n_cells × n_beams`` int16 (mm/s)
243+
- amplitude: ``n_cells × n_beams`` uint8 (counts × 0.5 dB)
244+
- correlation: ``n_cells × n_beams`` uint8 (%)
245+
"""
246+
offset_of_data = data[1]
247+
248+
year = data[8] + 1900
249+
month = data[9] + 1
250+
day = data[10]
251+
hour = data[11]
252+
minute = data[12]
253+
sec = data[13]
254+
microsec100 = struct.unpack_from("<H", data, 14)[0]
255+
us = microsec100 * 100
256+
time = np.datetime64(
257+
f"{year}-{month:02d}-{day:02d}T{hour:02d}:{minute:02d}:{sec:02d}.{us:06d}"
258+
)
259+
260+
sound_speed = struct.unpack_from("<H", data, 16)[0] * 0.1
261+
temperature = struct.unpack_from("<h", data, 18)[0] * 0.01
262+
pressure = struct.unpack_from("<I", data, 20)[0] * 0.001
263+
heading = struct.unpack_from("<H", data, 24)[0] * 0.01
264+
pitch = struct.unpack_from("<h", data, 26)[0] * 0.01
265+
roll = struct.unpack_from("<h", data, 28)[0] * 0.01
266+
267+
payload = data[offset_of_data:]
268+
vel_bytes = n_cells * n_beams * 2
269+
amp_bytes = n_cells * n_beams
270+
271+
velocity = np.frombuffer(payload[:vel_bytes], dtype="<i2").reshape(n_cells, n_beams).copy()
272+
amplitude = np.frombuffer(
273+
payload[vel_bytes : vel_bytes + amp_bytes], dtype=np.uint8
274+
).reshape(n_cells, n_beams).copy()
275+
correlation = np.frombuffer(
276+
payload[vel_bytes + amp_bytes : vel_bytes + 2 * amp_bytes], dtype=np.uint8
277+
).reshape(n_cells, n_beams).copy()
278+
279+
return Ad2cpAvgPing(
280+
time=time,
281+
sound_speed=sound_speed,
282+
temperature=temperature,
283+
pressure=pressure,
284+
heading=heading,
285+
pitch=pitch,
286+
roll=roll,
287+
velocity=velocity,
288+
amplitude=amplitude,
289+
correlation=correlation,
290+
)
291+
292+
293+
def read_ad2cp_velocity(path: Path | str) -> "xr.Dataset":
294+
"""Read average-mode velocity data from a Nortek ``.ad2cp`` file.
295+
296+
Parameters
297+
----------
298+
path : Path or str
299+
Path to the ``.ad2cp`` binary file.
300+
301+
Returns
302+
-------
303+
xr.Dataset
304+
Dataset with dimensions ``(time, cell, beam)`` and variables:
305+
306+
- ``velocity``: int16 (mm/s)
307+
- ``amplitude``: uint8 (counts × 0.5 dB)
308+
- ``correlation``: uint8 (%)
309+
- ``sound_speed``, ``temperature``, ``pressure``: per-ping
310+
- ``heading``, ``pitch``, ``roll``: per-ping orientation
311+
312+
Coordinates include ``depth`` (m) computed from blanking + cell_size.
313+
"""
314+
import xarray as xr
315+
316+
path = Path(path)
317+
if not path.exists():
318+
raise FileNotFoundError(f"AD2CP file not found: {path}")
319+
320+
config: Ad2cpConfig | None = None
321+
avg_pings: list[Ad2cpAvgPing] = []
322+
bt_records: list[Ad2cpBottomTrack] = []
323+
324+
# First pass: extract config to get n_cells, n_beams
325+
n_cells = 0
326+
n_beams = 4
327+
cell_size = 0.0
328+
blanking = 0.0
329+
330+
with open(path, "rb") as f:
331+
file_size = f.seek(0, 2)
332+
f.seek(0)
333+
334+
while f.tell() < file_size - 10:
335+
pos = f.tell()
336+
hdr = f.read(10)
337+
if len(hdr) < 10 or hdr[0] != _SYNC_BYTE:
338+
break
339+
340+
hdr_size = hdr[1]
341+
id_byte = hdr[2]
342+
343+
if id_byte in (0x23, 0x24):
344+
f.seek(pos)
345+
hdr = f.read(12)
346+
if len(hdr) < 12:
347+
break
348+
data_size = struct.unpack_from("<I", hdr, 4)[0]
349+
else:
350+
data_size = struct.unpack_from("<H", hdr, 4)[0]
351+
352+
data = f.read(data_size)
353+
if len(data) < data_size:
354+
break
355+
356+
if id_byte == _ID_STRING:
357+
text = data.decode("ascii", errors="replace")
358+
config = _parse_config(text)
359+
# Parse GETAVG line for n_cells, n_beams, cell_size, blanking
360+
for line in text.split("\n"):
361+
line = line.strip("\r\n\x00 ")
362+
if line.startswith("GETAVG"):
363+
for token in line.split(","):
364+
if "=" not in token:
365+
continue
366+
k, v = token.split("=", 1)
367+
k = k.strip()
368+
v = v.strip().strip('"')
369+
if k == "NC":
370+
n_cells = int(v)
371+
elif k == "NB":
372+
n_beams = int(v)
373+
elif k == "CS":
374+
cell_size = float(v)
375+
elif k == "BD":
376+
blanking = float(v)
377+
378+
elif id_byte == _ID_AVERAGE and n_cells > 0:
379+
avg_pings.append(_parse_avg_data_record(data, n_cells, n_beams))
380+
381+
elif id_byte == _ID_BOTTOM_TRACK:
382+
bt_records.append(_parse_bt_data_record(data))
383+
384+
if config is None:
385+
config = Ad2cpConfig()
386+
387+
if not avg_pings:
388+
raise ValueError(
389+
f"No average velocity data found in {path.name}. "
390+
"File may contain only echosounder data."
391+
)
392+
393+
n_pings = len(avg_pings)
394+
395+
# Build depth coordinate
396+
depth = blanking + np.arange(n_cells) * cell_size
397+
398+
# Assemble arrays
399+
times = np.array([p.time for p in avg_pings])
400+
velocity = np.stack([p.velocity for p in avg_pings]) # (n_pings, n_cells, n_beams)
401+
amp = np.stack([p.amplitude for p in avg_pings])
402+
corr = np.stack([p.correlation for p in avg_pings])
403+
ss = np.array([p.sound_speed for p in avg_pings], dtype=np.float32)
404+
temp = np.array([p.temperature for p in avg_pings], dtype=np.float32)
405+
pres = np.array([p.pressure for p in avg_pings], dtype=np.float32)
406+
hdg = np.array([p.heading for p in avg_pings], dtype=np.float32)
407+
pit = np.array([p.pitch for p in avg_pings], dtype=np.float32)
408+
rol = np.array([p.roll for p in avg_pings], dtype=np.float32)
409+
410+
ds = xr.Dataset(
411+
{
412+
"velocity": (["time", "cell", "beam"], velocity.astype(np.float32) / 1000.0,
413+
{"units": "m/s", "long_name": "Current velocity"}),
414+
"amplitude": (["time", "cell", "beam"], amp,
415+
{"units": "counts", "long_name": "Echo amplitude (×0.5 dB)"}),
416+
"correlation": (["time", "cell", "beam"], corr,
417+
{"units": "%", "long_name": "Signal correlation"}),
418+
"sound_speed": ("time", ss, {"units": "m/s"}),
419+
"temperature": ("time", temp, {"units": "°C"}),
420+
"pressure": ("time", pres, {"units": "dbar"}),
421+
"heading": ("time", hdg, {"units": "degrees"}),
422+
"pitch": ("time", pit, {"units": "degrees"}),
423+
"roll": ("time", rol, {"units": "degrees"}),
424+
},
425+
coords={
426+
"time": times,
427+
"depth": ("cell", depth),
428+
"beam": np.arange(n_beams),
429+
},
430+
attrs={
431+
"source_file": path.name,
432+
"instrument_type": config.instrument_type,
433+
"serial_number": config.serial_number,
434+
"n_cells": n_cells,
435+
"n_beams": n_beams,
436+
"cell_size_m": cell_size,
437+
"blanking_m": blanking,
438+
"coord_sys": "beam",
439+
},
440+
)
441+
442+
logger.info(
443+
"AD2CP velocity read: %s — %d pings, %d cells, %d beams",
444+
path.name, n_pings, n_cells, n_beams,
445+
)
446+
447+
return ds
222448

223449
def read_ad2cp(path: Path | str) -> xr.Dataset:
224450
"""Read a Nortek Signature ``.ad2cp`` file.

oceanstream/adcp/processor.py

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
import logging
55
from pathlib import Path
66
from time import perf_counter
7-
from typing import TYPE_CHECKING
7+
from typing import TYPE_CHECKING, Any
8+
9+
import numpy as np
810

911
if TYPE_CHECKING:
1012
import pandas as pd
@@ -166,6 +168,74 @@ def process_ad2cp_file(
166168
return df
167169

168170

171+
def process_ad2cp_velocity_file(
172+
raw_path: Path | str,
173+
) -> pd.DataFrame:
174+
"""Process a Nortek AD2CP ``.ad2cp`` file with velocity data.
175+
176+
For files containing average-mode current velocity measurements
177+
(no echosounder data).
178+
179+
Pipeline: read_ad2cp_velocity → flatten to DataFrame.
180+
181+
Parameters
182+
----------
183+
raw_path : Path or str
184+
Path to the ``.ad2cp`` binary file.
185+
186+
Returns
187+
-------
188+
pd.DataFrame
189+
One row per (time, depth) with columns: ``time``, ``depth``,
190+
``velocity_beam0..N`` (m/s), ``amplitude_beam0..N``,
191+
``temperature``, ``pressure``, ``sound_speed``, ``heading``,
192+
``pitch``, ``roll``.
193+
"""
194+
from .ad2cp_reader import read_ad2cp_velocity
195+
196+
import pandas as pd_
197+
198+
raw_path = Path(raw_path)
199+
ds = read_ad2cp_velocity(raw_path)
200+
201+
n_times = ds.sizes["time"]
202+
n_cells = ds.sizes["cell"]
203+
n_beams = ds.sizes["beam"]
204+
205+
# Build time × cell grid using vectorized operations
206+
times_rep = np.repeat(ds.time.values, n_cells)
207+
depths_rep = np.tile(ds["depth"].values, n_times)
208+
209+
data: dict[str, Any] = {
210+
"time": times_rep,
211+
"depth": depths_rep,
212+
}
213+
214+
# Per-ping scalars — repeat for each cell
215+
for var in ("temperature", "pressure", "sound_speed", "heading", "pitch", "roll"):
216+
data[var] = np.repeat(ds[var].values, n_cells)
217+
218+
# Velocity and amplitude per beam — reshape (time, cell, beam) → (time*cell, beam)
219+
vel_flat = ds["velocity"].values.reshape(-1, n_beams)
220+
amp_flat = ds["amplitude"].values.reshape(-1, n_beams)
221+
for b in range(n_beams):
222+
data[f"velocity_beam{b}"] = vel_flat[:, b]
223+
data[f"amplitude_beam{b}"] = amp_flat[:, b]
224+
225+
df = pd_.DataFrame(data)
226+
if "time" in df.columns:
227+
df["time"] = pd_.to_datetime(df["time"], utc=True)
228+
229+
logger.info(
230+
"AD2CP velocity processed: %s — %d rows (%d pings × %d cells)",
231+
raw_path.name,
232+
len(df),
233+
ds.sizes["time"],
234+
ds.sizes["cell"],
235+
)
236+
return df
237+
238+
169239
def process_file(
170240
raw_path: Path | str,
171241
transducer_depth: float = 7.0,

0 commit comments

Comments
 (0)