Skip to content

Commit 80a95d8

Browse files
dwr-psandhu and Copilot committed
feat: update CSV to DSS conversion to use '15min' resampling and improve path handling
Co-authored-by: Copilot <copilot@github.com>
1 parent 551c691 commit 80a95d8

4 files changed

Lines changed: 206 additions & 18 deletions

File tree

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,10 @@ pydsm csv-to-dss CSV_FILE DSS_FILE [OPTIONS]
138138
| `--unit` | `UNK` | Physical unit string |
139139
| `--period_type` | `INST-VAL` | Period type (e.g. `INST-VAL`, `PER-AVER`) |
140140
| `--multiplier` | `1.0` | Scale factor applied to all values |
141-
| `--resample_to` | `15T` | Pandas resample frequency (e.g. `15T`, `1H`, `1D`) |
141+
| `--resample_to` | `15min` | Pandas resample frequency (e.g. `15min`, `1h`, `1D`) |
142142

143143
```bash
144-
pydsm csv-to-dss observed_ec.csv observed_ec.dss --bpart RSAC075 --unit uS/cm --resample_to 15T
144+
pydsm csv-to-dss observed_ec.csv observed_ec.dss --cpart EC --unit uS/cm --resample_to 15min
145145
```
146146

147147
---

pydsm/analysis/dssutils.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -347,20 +347,32 @@ def csv_to_dss(
347347
unit="UNK",
348348
period_type="INST-VAL",
349349
multiplier=1.0,
350-
resample_to="15T",
350+
resample_to="15min",
351351
):
352+
"""Convert a CSV file to a DSS file.
353+
354+
Column names in the CSV header are used as the B-part of each DSS path.
355+
The index column (default 0) is parsed as the datetime index.
356+
D and E parts are inferred from the time series.
357+
"""
352358
df = pd.read_csv(csv_file, index_col=index_col, parse_dates=True)
353359
df = df * multiplier
354360
df = df.resample(resample_to).mean()
355-
for c in df.columns:
356-
with pyhecdss.DSSFile(dss_file, create_new=True) as f:
357-
for c in tqdm.tqdm(df.columns):
358-
bpart = c
359-
ts = df[c]
360-
epart = ts.index.freqstr
361-
pathname = f"/{apart}/{bpart}/{cpart}///{fpart}/"
362-
print("Writing to ", pathname)
363-
f.write_rts(pathname, ts, unit, period_type)
361+
with pyhecdss.DSSFile(dss_file, create_new=True) as f:
362+
for c in tqdm.tqdm(df.columns):
363+
ts = df[c].dropna()
364+
if ts.empty:
365+
print(f"Skipping {c!r} — all values are NaN")
366+
continue
367+
# dropna() drops the freq attribute; restore it so write_rts can
368+
# determine the E-part from the index frequency.
369+
if ts.index.freq is None and len(ts) > 1:
370+
inferred = pd.infer_freq(ts.index)
371+
if inferred is not None:
372+
ts.index.freq = pd.tseries.frequencies.to_offset(inferred)
373+
pathname = f"/{apart}/{c}/{cpart}///{fpart}/"
374+
print("Writing to ", pathname)
375+
f.write_rts(pathname, ts, unit, period_type)
364376
print("Done")
365377

366378

pydsm/cli.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ def pretty_print_input(input_file, output_file=None):
282282
@click.argument("dss_file", type=click.Path(exists=False))
283283
@click.option("--index_col", default=0, help="Column to use as index")
284284
@click.option("--apart", default="A", help="A part of the DSS path")
285-
@click.option("--bpart", default="F", help="B part of the DSS path")
285+
@click.option("--cpart", default="FLOW", help="C part (variable/parameter) of the DSS path. B parts are read from the CSV column headers.")
286286
@click.option("--fpart", default="F", help="F part of the DSS path")
287287
@click.option("--unit", default="UNK", help="Unit of the data")
288288
@click.option(
@@ -296,30 +296,32 @@ def pretty_print_input(input_file, output_file=None):
296296
)
297297
@click.option(
298298
"--resample_to",
299-
default="15T",
300-
help="Resample frequency for the time series (e.g., '15T' for 15 minutes)",
299+
default="15min",
300+
help="Resample frequency for the time series (e.g., '15min' for 15 minutes)",
301301
)
302302
def csv_to_dss(
303303
csv_file,
304304
dss_file,
305305
index_col=0,
306306
apart="A",
307-
bpart="F",
307+
cpart="FLOW",
308308
fpart="F",
309309
unit="UNK",
310310
period_type="INST-VAL",
311311
multiplier=1.0,
312-
resample_to="15T",
312+
resample_to="15min",
313313
):
314314
"""
315315
Convert a CSV file to a DSS file.
316+
317+
Column headers in the CSV are used as the B part of each DSS path.
316318
"""
317319
dssutils.csv_to_dss(
318320
csv_file,
319321
dss_file,
320322
index_col,
321323
apart,
322-
bpart,
324+
cpart,
323325
fpart,
324326
unit,
325327
period_type,

tests/test_csv_to_dss.py

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
"""Tests for dssutils.csv_to_dss — CSV header → DSS B-part conversion."""
2+
3+
import textwrap
4+
from pathlib import Path
5+
6+
import pandas as pd
7+
import pytest
8+
9+
import pyhecdss
10+
from pydsm.analysis.dssutils import csv_to_dss
11+
12+
DATA_DIR = Path(__file__).parent / "data"
13+
14+
15+
# ---------------------------------------------------------------------------
16+
# Helpers
17+
# ---------------------------------------------------------------------------
18+
19+
def _read_dss_paths(dss_file):
20+
"""Return the list of DSS pathnames present in *dss_file*."""
21+
with pyhecdss.DSSFile(str(dss_file)) as f:
22+
return f.get_pathnames(f.read_catalog())
23+
24+
25+
def _read_dss_ts(dss_file, pathname):
26+
"""Read a single time series from *dss_file* and return a Series."""
27+
with pyhecdss.DSSFile(str(dss_file)) as f:
28+
ts, _unit, _period_type = f.read_rts(pathname)
29+
if isinstance(ts, pd.DataFrame):
30+
ts = ts.iloc[:, 0]
31+
if isinstance(ts.index, pd.PeriodIndex):
32+
ts.index = ts.index.to_timestamp()
33+
return ts
34+
35+
36+
# ---------------------------------------------------------------------------
37+
# Fixtures
38+
# ---------------------------------------------------------------------------
39+
40+
@pytest.fixture
41+
def daily_csv(tmp_path):
42+
"""A minimal CSV with three columns and a daily DatetimeIndex."""
43+
content = textwrap.dedent("""\
44+
,flow_sac,flow_sjr,ec_bdl
45+
2020-01-01,100.0,50.0,200.0
46+
2020-01-02,110.0,55.0,210.0
47+
2020-01-03,120.0,60.0,220.0
48+
2020-01-04,130.0,65.0,230.0
49+
2020-01-05,140.0,70.0,240.0
50+
""")
51+
p = tmp_path / "test_input.csv"
52+
p.write_text(content)
53+
return p
54+
55+
56+
@pytest.fixture
57+
def csv_with_nans(tmp_path):
58+
"""CSV where one column has a missing (NaN) value in its first record; only that record should be dropped, not the whole column."""
59+
content = textwrap.dedent("""\
60+
,flow_a,flow_b
61+
2020-01-01,100.0,
62+
2020-01-02,110.0,55.0
63+
2020-01-03,120.0,60.0
64+
""")
65+
p = tmp_path / "test_nans.csv"
66+
p.write_text(content)
67+
return p
68+
69+
70+
# ---------------------------------------------------------------------------
71+
# Tests
72+
# ---------------------------------------------------------------------------
73+
74+
class TestCsvToDssColumnHeaders:
75+
"""Column names become B-parts in the DSS path."""
76+
77+
def test_paths_match_column_names(self, daily_csv, tmp_path):
78+
dss = tmp_path / "out.dss"
79+
csv_to_dss(str(daily_csv), str(dss), resample_to="1D")
80+
paths = _read_dss_paths(dss)
81+
bparts = [p.split("/")[2].lower() for p in paths]
82+
assert "flow_sac" in bparts
83+
assert "flow_sjr" in bparts
84+
assert "ec_bdl" in bparts
85+
86+
def test_number_of_paths(self, daily_csv, tmp_path):
87+
dss = tmp_path / "out.dss"
88+
csv_to_dss(str(daily_csv), str(dss), resample_to="1D")
89+
paths = _read_dss_paths(dss)
90+
assert len(paths) == 3
91+
92+
def test_apart_used(self, daily_csv, tmp_path):
93+
dss = tmp_path / "out.dss"
94+
csv_to_dss(str(daily_csv), str(dss), apart="MYAPART", resample_to="1D")
95+
paths = _read_dss_paths(dss)
96+
assert all(p.split("/")[1].upper() == "MYAPART" for p in paths)
97+
98+
def test_cpart_used(self, daily_csv, tmp_path):
99+
dss = tmp_path / "out.dss"
100+
csv_to_dss(str(daily_csv), str(dss), cpart="FLOW", resample_to="1D")
101+
paths = _read_dss_paths(dss)
102+
assert all(p.split("/")[3].upper() == "FLOW" for p in paths)
103+
104+
def test_fpart_used(self, daily_csv, tmp_path):
105+
dss = tmp_path / "out.dss"
106+
csv_to_dss(str(daily_csv), str(dss), fpart="VER1", resample_to="1D")
107+
paths = _read_dss_paths(dss)
108+
assert all(p.split("/")[6].upper() == "VER1" for p in paths)
109+
110+
111+
class TestCsvToDssValues:
112+
"""Data values are written correctly."""
113+
114+
def test_values_match_input(self, daily_csv, tmp_path):
115+
dss = tmp_path / "out.dss"
116+
csv_to_dss(str(daily_csv), str(dss), resample_to="1D")
117+
paths = _read_dss_paths(dss)
118+
sac_path = next(p for p in paths if "/FLOW_SAC/" in p)
119+
ts = _read_dss_ts(dss, sac_path)
120+
assert pytest.approx(ts.iloc[0], rel=1e-4) == 100.0
121+
assert pytest.approx(ts.iloc[-1], rel=1e-4) == 140.0
122+
123+
def test_multiplier_applied(self, daily_csv, tmp_path):
124+
dss = tmp_path / "out.dss"
125+
csv_to_dss(str(daily_csv), str(dss), multiplier=2.0, resample_to="1D")
126+
paths = _read_dss_paths(dss)
127+
sac_path = next(p for p in paths if "/FLOW_SAC/" in p)
128+
ts = _read_dss_ts(dss, sac_path)
129+
assert pytest.approx(ts.iloc[0], rel=1e-4) == 200.0
130+
131+
def test_nan_rows_dropped(self, csv_with_nans, tmp_path):
132+
"""NaN values must not be written (dropna() before write_rts)."""
133+
dss = tmp_path / "out.dss"
134+
csv_to_dss(str(csv_with_nans), str(dss), resample_to="1D")
135+
paths = _read_dss_paths(dss)
136+
flow_b_path = next((p for p in paths if "/FLOW_B/" in p), None)
137+
assert flow_b_path is not None
138+
ts = _read_dss_ts(dss, flow_b_path)
139+
assert ts.isna().sum() == 0
140+
141+
def test_all_nan_column_skipped(self, tmp_path):
142+
"""A column that is entirely NaN must be skipped, so no DSS path is written for it."""
143+
content = textwrap.dedent("""\
144+
,flow_a,all_nan
145+
2020-01-01,100.0,
146+
2020-01-02,110.0,
147+
2020-01-03,120.0,
148+
""")
149+
csv_file = tmp_path / "all_nan.csv"
150+
csv_file.write_text(content)
151+
dss = tmp_path / "all_nan.dss"
152+
csv_to_dss(str(csv_file), str(dss), resample_to="1D")
153+
paths = _read_dss_paths(dss)
154+
bparts = [p.split("/")[2].lower() for p in paths]
155+
assert "flow_a" in bparts
156+
assert "all_nan" not in bparts
157+
158+
159+
class TestCsvToDssSingleColumn:
160+
"""A single-column CSV writes one path whose B-part equals the column name."""
161+
162+
def test_single_column(self, tmp_path):
163+
content = textwrap.dedent("""\
164+
datetime,my_station
165+
2021-06-01,500.0
166+
2021-06-02,510.0
167+
""")
168+
csv_file = tmp_path / "single.csv"
169+
csv_file.write_text(content)
170+
dss = tmp_path / "single.dss"
171+
csv_to_dss(str(csv_file), str(dss), resample_to="1D")
172+
paths = _read_dss_paths(dss)
173+
assert len(paths) == 1
174+
assert paths[0].split("/")[2].lower() == "my_station"

0 commit comments

Comments
 (0)