Commit 8e3d5a9

midgemacf and MacFarland, Midgie authored
release/1.1.5 (#13)
## [1.1.5] - 2025-09-22

### Fixed
- 🩹 More durable timestamp extrapolation in time data insertion
- 🩹 Using shutil.move instead of pathlib.Path.rename to allow for differing file systems
- 🩹 Broken path in docs

### Changed
- ⚡ Added additional safeguards in TWST Collection to prevent zombie processes
- ⚡ How random data uses the load/send when used as a class method for more durability

### Added
- 🔥 Better probe identification in logs
- 🔥 Filter on files being ingested by ADVA probes to only attempt files which match expected naming convention from input directory

### Removed
- 🗡️ static version of openSAMPL in docs instructions

---------

Co-authored-by: MacFarland, Midgie <macfarlandmj@ornl.gov>
1 parent 32109fc commit 8e3d5a9

9 files changed

Lines changed: 175 additions & 37 deletions

CHANGELOG.md

Lines changed: 17 additions & 0 deletions
@@ -37,6 +37,23 @@ This project adheres to [Semantic Versioning](https://semver.org/).
 *Unreleased* versions radiate potential—-and dread. Once you merge an infernal PR, move its bullet under a new version heading with the actual release date.*

 -->
+## [1.1.5] - 2025-09-22
+### Fixed
+- 🩹 More durable timestamp extrapolation in time data insertion
+- 🩹 Using shutil.move instead of pathlib.Path.rename to allow for differing file systems
+- 🩹 Broken path in docs
+
+### Changed
+- ⚡ Added additional safeguards in TWST Collection to prevent zombie processes
+- ⚡ How random data uses the load/send when used as a class method for more durability
+
+### Added
+- 🔥 Better probe identification in logs
+- 🔥 Filter on files being ingested by ADVA probes to only attempt files which match expected naming convention from input directory
+
+### Removed
+- 🗡️ static version of openSAMPL in docs instructions
+
 ## [1.1.4] - 2025-08-22
 ### Added
 - 🔥 Random data generation for all supported probes
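
The `shutil.move` entry under "Fixed" can be illustrated in isolation. `pathlib.Path.rename` relies on an OS-level rename, which fails with `OSError` when source and destination are on different file systems, whereas `shutil.move` falls back to copying the file and then removing the original. The following is a minimal sketch, not the project's code: `move_file`, the directory names, and the file contents are hypothetical, and because both temporary directories share one file system here, it only demonstrates the call, not the cross-device fallback.

```python
import shutil
import tempfile
from pathlib import Path


def move_file(src: Path, dest_dir: Path) -> Path:
    """Move src into dest_dir and return the new path (hypothetical helper)."""
    dest_dir.mkdir(parents=True, exist_ok=True)
    target = dest_dir / src.name
    # shutil.move uses a plain rename when it can and otherwise copies the
    # file and deletes the source, so it also works across file systems.
    shutil.move(str(src), str(target))
    return target


with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "incoming" / "data.txt"
    source.parent.mkdir()
    source.write_text("reading")

    moved = move_file(source, Path(tmp) / "archive")

    moved_ok = moved.exists() and not source.exists()
    moved_text = moved.read_text()
```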

docs/getting-started/installation.md

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
 1. Ensure you have Python 3.9 or higher installed
 2. Pip install the latest version of openSAMPL:
 ```bash
-pip install opensampl==0.2.0
+pip install opensampl
 ```

 ## Installation for Developers

docs/getting-started/quickstart.md

Lines changed: 1 addition & 1 deletion
@@ -13,4 +13,4 @@ This creates:
 In order to ingest data, you simply have to run `opensampl load` with appropriate arguments, and it will go straight into your new TimescaleDB
 instance and become visible on Grafana.

-See the [Configuration](../guides/configuration#opensampl-server) page on configuring your server instance.
+See the [Configuration](../guides/configuration.md#opensampl-server) page on configuring your server instance.

docs/guides/opensampl-server.md

Lines changed: 15 additions & 2 deletions
@@ -16,9 +16,9 @@ The openSAMPL Server CLI provides commands for managing a Docker Compose deploym
 - Docker and Docker Compose installed
 - The openSAMPL Python package installed

-Install with the server extra:
+Install openSAMPL as normal:
 ```
-pip install "opensampl==0.2.0[server]"
+pip install opensampl
 ```

 ## Basic Commands
@@ -131,6 +131,17 @@ opensampl-server up --env-file ./my-custom-env.env
 - `BACKEND_URL=http://localhost:8015`
 - `ROUTE_TO_BACKEND=true`

+## Power users
+
+For those who are more familiar with docker, there is a `opensampl-server2` which corresponds to the following, more directly
+exposing the docker to users.
+`OPENSAMPL_SERVER__COMPOSE_FILE` is set in your .env file or environment.
+
+```bash
+opensampl-server2 --env-file ENV_FILE args
+docker compose --env-file ${ENV_FILE} -f ${OPENSAMPL_SERVER__COMPOSE_FILE} $@
+```
+
 ## Troubleshooting

 If you encounter issues:
@@ -172,3 +183,5 @@ nano ./dev.env
 # Start the server with custom environment
 opensampl-server up --env-file ./dev.env
 ```
+
+

opensampl/collect/microchip/twst/readings.py

Lines changed: 28 additions & 8 deletions
@@ -8,8 +8,12 @@
 import asyncio
 from typing import Optional

+from loguru import logger
+
 from opensampl.collect.modem import ModemReader, require_conn

+SENTINEL = b"--openSAMPL stop reading--"  # type: ignore[assignment]
+

 class ModemStatusReader(ModemReader):
     """
@@ -34,6 +38,7 @@ def __init__(self, host: str, duration: int = 60, keys: Optional[list[str]] = No
         self.keys = keys
         self.queue = asyncio.Queue()
         self.readings = []
+        self.continue_reading = False
         super().__init__(host=host, port=port)

     @require_conn
@@ -43,10 +48,16 @@ async def reader_task(self):

         Reads lines from the telnet connection and queues them for processing.
         """
-        while True:
-            line = await self.reader.readline()
-            if line:
+        try:
+            while self.continue_reading:
+                line = await asyncio.wait_for(self.reader.readline(), timeout=5.0)
+                if not line:
+                    break  # EOF
                 await self.queue.put(line)
+        except asyncio.TimeoutError:
+            logger.debug(f"Timeout waiting for data from {self.host}:{self.port}")
+        finally:
+            await self.queue.put(SENTINEL)

     def parse_line(self, line: str):
         """
@@ -90,11 +101,16 @@ async def processor_task(self):
         """
         while True:
             line = await self.queue.get()
-            parsed = self.parse_line(line)
-            if parsed:
-                timestamp, definition, value = parsed
-                if self.should_keep(definition):
-                    self.readings.append(parsed)
+            try:
+                if line == SENTINEL:
+                    break
+                parsed = self.parse_line(line)
+                if parsed:
+                    timestamp, definition, value = parsed
+                    if self.should_keep(definition):
+                        self.readings.append(parsed)
+            finally:
+                self.queue.task_done()

     async def collect_readings(self):
         """
@@ -104,11 +120,15 @@ async def collect_readings(self):
         duration, then cancels the tasks.
         """
         async with self.connect():
+            self.continue_reading = True
             read_coroutine = asyncio.create_task(self.reader_task())
             process_coroutine = asyncio.create_task(self.processor_task())

             try:
                 await asyncio.sleep(self.duration)
+                self.continue_reading = False
+                await self.queue.join()
+
             finally:
                 # Cancel tasks and wait for them to complete
                 read_coroutine.cancel()
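
The sentinel hand-off introduced in `readings.py` above can be shown on its own. In the sketch below, the producer always enqueues a sentinel in a `finally` block, the consumer exits when it sees it, and every `get()` is paired with `task_done()` so that `queue.join()` can return. This is an illustration under stated assumptions, not the project's code: `producer`, `consumer`, and `run` are hypothetical names, the items are plain strings rather than telnet lines, and the sentinel is a unique object instead of the bytes literal used in the commit.

```python
import asyncio

SENTINEL = object()  # hypothetical marker; the commit uses a bytes literal instead


async def producer(queue: asyncio.Queue, items: list[str]) -> None:
    """Queue each item, then always signal completion with the sentinel."""
    try:
        for item in items:
            await queue.put(item)
    finally:
        # Runs on normal exit, on error, and on cancellation.
        await queue.put(SENTINEL)


async def consumer(queue: asyncio.Queue, results: list[str]) -> None:
    """Drain the queue until the sentinel arrives, marking every item done."""
    while True:
        item = await queue.get()
        try:
            if item is SENTINEL:
                break
            results.append(item.upper())
        finally:
            # Called for the sentinel too, so join() is not left waiting.
            queue.task_done()


async def run(items: list[str]) -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    results: list[str] = []
    tasks = [
        asyncio.create_task(producer(queue, items)),
        asyncio.create_task(consumer(queue, results)),
    ]
    await asyncio.gather(*tasks)
    await queue.join()  # every put() has been matched by a task_done()
    return results


collected = asyncio.run(run(["a", "b", "c"]))
```

Because the consumer stops by itself once the sentinel arrives, neither task has to be cancelled to shut down cleanly.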

opensampl/load_data.py

Lines changed: 13 additions & 5 deletions
@@ -113,6 +113,7 @@ def load_time_data(
     if not isinstance(session, Session):
         raise TypeError("Session must be a SQLAlchemy session")

+    probe_readable = str(probe_key)
     try:
         from opensampl.load.data import DataFactory

@@ -124,6 +125,10 @@ def load_time_data(
             strict=strict,
             session=session,
         )
+        probe_readable = (
+            data_definition.probe.name  # ty: ignore[possibly-unbound-attribute]
+            or f"{data_definition.probe.ip_address} ({data_definition.probe.probe_id})"  # ty: ignore[possibly-unbound-attribute]
+        )

         if any(x is None for x in [data_definition.probe, data_definition.metric, data_definition.reference]):
             raise RuntimeError(f"Not all required definition fields filled: {data_definition.dump_factory()}")  # noqa: TRY301
@@ -134,9 +139,8 @@ def load_time_data(
         df["metric_type_uuid"] = data_definition.metric.uuid  # ty: ignore[possibly-unbound-attribute]
         logger.debug(df.head())
         # Ensure correct dtypes
-        df["time"] = pd.to_datetime(df["time"], utc=True, errors="raise")
+        df["time"] = pd.to_datetime(df["time"], format="mixed", utc=True, errors="raise")
         df["value"] = df["value"].apply(json.dumps)
-
         records = df.to_dict(orient="records")
         insert_stmt = text(f"""
             INSERT INTO {ProbeData.__table__.schema}.{ProbeData.__tablename__}
@@ -152,16 +156,20 @@ def load_time_data(
             total_rows = len(records)
             inserted = result.rowcount  # ty: ignore[unresolved-attribute]
             excluded = total_rows - inserted
-            logger.warning(f"Inserted {inserted}/{total_rows} rows; {excluded}/{total_rows} rejected due to conflicts")
+
+            logger.warning(
+                f"Inserted {inserted}/{total_rows} rows for {probe_readable}; "
+                f"{excluded}/{total_rows} rejected due to conflicts"
+            )

         except Exception as e:
             # In case of an error, roll back the session
             session.rollback()
-            logger.error(f"Error inserting rows: {e}")
+            logger.error(f"Error inserting rows for {probe_readable}: {e}")
             raise

     except Exception as e:
-        logger.exception(f"Error writing time data: {e}")
+        logger.exception(f"Error writing time data for {probe_readable}: {e}")
         session.rollback()
         raise
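
The `format="mixed"` argument added to `pd.to_datetime` in the diff above asks pandas to infer the format of each timestamp string individually instead of applying one inferred format to the whole column. The following is a minimal sketch, assuming pandas 2.0 or later (where `format="mixed"` is available); the sample values are made up for illustration and are not taken from the project.

```python
import pandas as pd

# Hypothetical sample: one column mixes timestamps with and without
# fractional seconds.
df = pd.DataFrame(
    {
        "time": ["2025-09-22 10:00:00", "2025-09-22 10:00:00.500000"],
        "value": [1.0, 2.0],
    }
)

# format="mixed" infers the format element by element;
# utc=True yields timezone-aware timestamps in UTC;
# errors="raise" still fails loudly on strings that cannot be parsed.
df["time"] = pd.to_datetime(df["time"], format="mixed", utc=True, errors="raise")

second_timestamp = df["time"].iloc[1]
```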

opensampl/vendors/adva.py

Lines changed: 14 additions & 8 deletions
@@ -5,7 +5,7 @@
 import re
 from datetime import datetime, timezone
 from pathlib import Path
-from typing import TextIO, Union
+from typing import ClassVar, TextIO, Union

 import click
 import pandas as pd
@@ -25,6 +25,13 @@ class AdvaProbe(BaseProbe):
     start_time: datetime
     vendor = VENDORS.ADVA

+    file_pattern: ClassVar = re.compile(
+        r"(?P<ip>\d+\.\d+\.\d+\.\d+)(?P<type>CLOCK_PROBE|PTP_CLOCK_PROBE)"
+        r"-(?P<identifier>\d+-\d+)-"
+        r"(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-"
+        r"(?P<hour>\d+)-(?P<minute>\d+)-(?P<second>\d+)\.txt(?:\.gz)?"
+    )
+
     class RandomDataConfig(BaseProbe.RandomDataConfig):
         """Model for storing random data generation configurations as provided by CLI or YAML"""

@@ -60,20 +67,19 @@ def __init__(self, input_file: Union[str, Path]):
         super().__init__(input_file=input_file)
         self.probe_key, self.timestamp = self.parse_file_name(self.input_file)

+    @classmethod
+    def filter_files(cls, files: list[Path]) -> list[Path]:
+        """Filter the files found in the input directory when loading to those which match the regex"""
+        return [f for f in files if cls.file_pattern.fullmatch(f.name)]
+
     @classmethod
     def parse_file_name(cls, file_name: Path) -> tuple[ProbeKey, datetime]:
         """
         Parse file name into identifying parts

         Expected format: <ip_address>CLOCK_PROBE-<probe_id>-YYYY-MM-DD-HH-MM-SS.txt.gz
         """
-        pattern = (
-            r"(?P<ip>\d+\.\d+\.\d+\.\d+)(?P<type>CLOCK_PROBE|PTP_CLOCK_PROBE)"
-            r"-(?P<identifier>\d+-\d+)-"
-            r"(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-"
-            r"(?P<hour>\d+)-(?P<minute>\d+)-(?P<second>\d+)\.txt(?:\.gz)?"
-        )
-        match = re.match(pattern, file_name.name)
+        match = re.match(cls.file_pattern, file_name.name)
         if match:
             ip_address = match.group("ip")
             probe_id = match.group("identifier")
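
The new `filter_files` method uses `fullmatch`, while `parse_file_name` still uses `re.match`, which only anchors at the start of the name. The sketch below shows how the two differ on a file with a trailing suffix. The pattern is copied from the diff; the standalone `filter_files` function and the file names are hypothetical stand-ins for the class method and a real input directory.

```python
import re
from pathlib import Path

# Pattern copied from the diff above.
FILE_PATTERN = re.compile(
    r"(?P<ip>\d+\.\d+\.\d+\.\d+)(?P<type>CLOCK_PROBE|PTP_CLOCK_PROBE)"
    r"-(?P<identifier>\d+-\d+)-"
    r"(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-"
    r"(?P<hour>\d+)-(?P<minute>\d+)-(?P<second>\d+)\.txt(?:\.gz)?"
)


def filter_files(files: list[Path]) -> list[Path]:
    """Keep only files whose entire name matches the pattern."""
    return [f for f in files if FILE_PATTERN.fullmatch(f.name)]


# Hypothetical directory listing.
candidates = [
    Path("/data/10.0.0.1CLOCK_PROBE-1-1-2025-09-22-10-00-00.txt.gz"),
    Path("/data/10.0.0.1PTP_CLOCK_PROBE-2-1-2025-09-22-10-00-00.txt"),
    Path("/data/10.0.0.1CLOCK_PROBE-1-1-2025-09-22-10-00-00.txt.gz.part"),
    Path("/data/README.md"),
]

kept = filter_files(candidates)

# Names that a prefix-only match would accept but fullmatch rejects.
prefix_only = [f for f in candidates if FILE_PATTERN.match(f.name) and f not in kept]
```

Here the `.part` file passes a prefix match but is rejected by `fullmatch`, so it would be filtered out before parsing.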
