
Commit 1332fa0

Eli authored and committed
Eliminated rogue time series reader and fixed a typing bug for the case where all flags are missing (NaN). Also eliminated a guard against new files that is useful in update_data but not in update_flagged_data.
1 parent 2662e31 commit 1332fa0

2 files changed

Lines changed: 14 additions & 74 deletions


dms_datastore/reconcile_data.py

Lines changed: 13 additions & 74 deletions
@@ -47,6 +47,7 @@
 from dms_datastore.inventory import to_wildcard
 from dms_datastore.read_ts import original_header
 from dms_datastore import dstore_config
+from dms_datastore.read_ts import read_ts, read_flagged
 import logging
 logger = logging.getLogger(__name__)

@@ -268,66 +269,6 @@ def _stable_u01(key: str) -> float:
     return u / float(2**64)


-def _read_csv_timeseries(path: str) -> pd.DataFrame:
-    """ Read a dms-datastore CSV file into a DataFrame.
-
-    Parameters
-    ----------
-    path : str
-        Path to a CSV file with a commented YAML-like header and a ``datetime``
-        column.
-
-    Returns
-    -------
-    df : pandas.DataFrame
-        DataFrame indexed by a ``DatetimeIndex``.
-
-    Raises
-    ------
-    ValueError
-        If the file does not produce a DatetimeIndex or contains duplicate
-        timestamps.
-
-    Notes
-    -----
-    This reader forces ``dtype={'user_flag': str}`` so that screened flag columns do
-    not become floats due to NA inference. Normalization to nullable Int64 is
-    performed separately by :func:`_normalize_flag`.
-    """
-
-
-    df = pd.read_csv(
-        path,
-        comment="#",
-        parse_dates=["datetime"],
-        index_col="datetime",
-        dtype={"user_flag": str},
-    )
-    if not isinstance(df.index, pd.DatetimeIndex):
-        raise ValueError(f"Expected datetime index in {path}")
-    if df.index.has_duplicates:
-
-        # Provide actionable detail while remaining fail-fast.
-        # Report up to N duplicate timestamp values with their multiplicities.
-        N = 25
-        vc = df.index.value_counts()
-        dups = vc[vc > 1].sort_index()
-
-        # Build a compact message; avoid dumping huge lists.
-        total_dup_rows = int((dups - 1).sum())  # extra rows beyond the first per timestamp
-        n_dup_keys = int(dups.shape[0])
-
-        preview = dups.iloc[:N]
-        preview_txt = ", ".join([f"{ts.isoformat()}×{int(cnt)}" for ts, cnt in preview.items()])
-        more = "" if n_dup_keys <= N else f" (+{n_dup_keys - N} more)"
-
-        raise ValueError(
-            "Duplicate timestamps in "
-            f"{path}: {n_dup_keys} duplicated timestamps, {total_dup_rows} extra rows. "
-            f"Examples: {preview_txt}{more}"
-        )
-    return df
-

 def _values_equal(
     a: pd.DataFrame, b: pd.DataFrame, *, atol: float, rtol: float
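
The deleted _read_csv_timeseries duplicated parsing that the package's own readers already handle, which is why the commit message calls it a rogue reader; the later hunks swap in read_ts and read_flagged. A minimal sketch of the two replacement call patterns, using only the keyword arguments visible in this diff (the path is a hypothetical placeholder):

    from dms_datastore.read_ts import read_ts, read_flagged

    spath = "staged/example.csv"  # hypothetical path for illustration

    # Value-comparison path (update_repo): force a regular time index so
    # harmless numeric/formatting differences don't register as data changes.
    sdf = read_ts(spath, force_regular=True)

    # Flag-merge path (update_flagged_data): load values and flag columns
    # together, without applying (masking by) the flags.
    sdf_flagged = read_flagged(spath, apply_flags=False, return_flags=True)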
@@ -617,7 +558,9 @@ def _merge_screened_flags(

     # For equal-value timestamps: keep repo value to reduce churn, and merge flags.
     if equal.any():
-        out.loc[equal, "value"] = rv.loc[equal, "value"]
+        vals = rv.loc[equal, "value"]
+        if not vals.isna().all():
+            out.loc[equal, "value"] = vals

     rf = rflag.loc[equal]
     sf = sflag.loc[equal]
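
The three-line guard above is presumably the typing fix named in the commit message: when every repo value at the equal timestamps is NaN, the assignment is skipped entirely. Assigning an all-NaN slice into an integer column is a classic pandas dtype pitfall; depending on the version, pandas silently upcasts the column, upcasts with a FutureWarning, or (in newer releases) raises. A self-contained sketch of the effect, independent of the repo's code:

    import numpy as np
    import pandas as pd

    out = pd.DataFrame({"value": [1, 2, 3]})    # int64 column
    vals = pd.Series([np.nan, np.nan, np.nan])  # e.g. repo values all missing

    out.loc[[0, 1, 2], "value"] = vals          # unguarded: column upcasts to float64
    print(out["value"].dtype)

    out2 = pd.DataFrame({"value": [1, 2, 3]})
    if not vals.isna().all():                   # the committed guard skips the no-op
        out2.loc[[0, 1, 2], "value"] = vals
    print(out2["value"].dtype)                  # int64 preserved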
@@ -823,8 +766,8 @@ def update_repo(
                 continue

             # Parsed compare to ignore harmless numeric/formatting differences
-            sdf = _read_csv_timeseries(spath)
-            rdf = _read_csv_timeseries(rpath)
+            sdf = read_ts(spath, force_regular=True)
+            rdf = read_ts(rpath, force_regular=True)
             if list(sdf.columns) != list(rdf.columns):
                 raise ValueError(
                     f"Column mismatch for {series_id} shard {shard}: "
@@ -928,7 +871,7 @@ def update_repo(
         if a.repo_path is None or (not os.path.exists(a.repo_path)):
             dest = os.path.join(repo_dir, os.path.basename(a.staged_path))
             head = original_header(a.staged_path)
-            df = _read_csv_timeseries(a.staged_path)
+            df = read_ts(a.staged_path)
             _write_preserving_header(df=df, dest_path=dest, header_text=head)
             continue
         head = original_header(a.repo_path)
@@ -995,11 +938,7 @@ def update_flagged_data(
     for series_id, staged_shards in staged_map.items():
         repo_shards = repo_map.get(series_id, {})

-        if not repo_shards and not allow_new_series:
-            raise ValueError(
-                f"No namesake series found in repo for series_id={series_id!r}. "
-                f"Set allow_new_series=True to initialize new series."
-            )
+
         for shard, spath in staged_shards.items():
             rpath = repo_shards.get(shard)
             dest = (
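
Removing this guard means a series that exists only in the staging area no longer requires allow_new_series: repo_map.get(series_id, {}) yields an empty dict, every repo_shards.get(shard) returns None, and the staged file falls through to the missing_in_repo branch shown in the final hunk. A toy illustration of that lookup flow (shard-map shapes assumed from the loop above; names are hypothetical):

    staged_shards = {"2024": "staged/new_series_2024.csv"}  # hypothetical new series
    repo_shards = {}  # no namesake series in the repo

    for shard, spath in staged_shards.items():
        rpath = repo_shards.get(shard)  # None -> treated as missing_in_repo
        print(shard, "->", "initialize from staged" if rpath is None else rpath)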
@@ -1025,8 +964,8 @@
             if _hash_data_section(spath) == _hash_data_section(rpath):
                 continue

-            sdf = _read_csv_timeseries(spath)
-            rdf = _read_csv_timeseries(rpath)
+            sdf = read_flagged(spath, apply_flags=False, return_flags=True)
+            rdf = read_flagged(rpath, apply_flags=False, return_flags=True)
             merged = _merge_screened_flags(
                 rdf,
                 sdf,
@@ -1073,11 +1012,11 @@
             )

             if a.reason == "missing_in_repo":
-                df = _read_csv_timeseries(a.staged_path)
+                df = read_flagged(a.staged_path, apply_flags=False, return_flags=True)
             else:
-                sdf = _read_csv_timeseries(a.staged_path)
+                sdf = read_flagged(a.staged_path, apply_flags=False, return_flags=True)
                 rdf = (
-                    _read_csv_timeseries(a.repo_path)
+                    read_flagged(a.repo_path, apply_flags=False, return_flags=True)
                     if os.path.exists(a.repo_path)
                     else sdf.iloc[0:0]
                 )

dms_datastore/update_flagged_data.py

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@
 )


+
 @click.command(context_settings={"help_option_names": ["-h", "--help"]})
 @click.argument("staged_dir", type=click.Path(exists=True, file_okay=False, dir_okay=True))
 @click.argument("repo_dir", type=click.Path(exists=True, file_okay=False, dir_okay=True))
