Skip to content

Commit 4d8eb48

Browse files
jpvelez and cursoragent committed
fix(census): resolve type errors and missing import in check_csv_parquet_match
- Inline _parse_pums_partition_path (remove dependency on missing convert script) - Use cast() for polars collect() return type (InProcessQuery | DataFrame) - Replace .item() with .row(0)[0] for row count extraction Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 8f8dca4 commit 4d8eb48

1 file changed

Lines changed: 186 additions & 0 deletions

File tree

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
#!/usr/bin/env python3
2+
"""Temporary script: verify CSV row counts and optionally values match Parquet.
3+
4+
Compares the CSV tree (after unzip) to the parquet tree produced by
5+
convert_pums_csv_to_parquet. Reports per-partition row count mismatches and
6+
optionally does a value-level check (--values).
7+
8+
Usage:
9+
uv run python data/census/pums/check_csv_parquet_match.py --csv-dir csv --parquet-dir parquet
10+
uv run python data/census/pums/check_csv_parquet_match.py --csv-dir csv --parquet-dir parquet --values
11+
"""
12+
13+
from __future__ import annotations
14+
15+
import argparse
16+
import sys
17+
from pathlib import Path
18+
from typing import cast
19+
20+
import polars as pl
21+
22+
# Partition pattern: survey/year/record_type/state=XX (same as convert script)
23+
24+
25+
def _parse_pums_partition_path(part_dir: Path) -> tuple[str, int, str, str] | None:
26+
"""Parse survey/year/record_type/state=XX from a partition directory path."""
27+
parts = part_dir.resolve().parts
28+
if len(parts) < 4:
29+
return None
30+
state_part = parts[-1]
31+
if not state_part.startswith("state="):
32+
return None
33+
state = state_part[6:]
34+
record_type = parts[-2]
35+
if record_type not in ("person", "housing"):
36+
return None
37+
try:
38+
end_year = int(parts[-3])
39+
except ValueError:
40+
return None
41+
survey = parts[-4]
42+
if survey not in ("acs1", "acs5"):
43+
return None
44+
return (survey, end_year, record_type, state)
45+
46+
47+
def _row_count_csv(part_dir: Path) -> int:
    """Return the combined row count of every *.csv file in *part_dir*.

    Mirrors how the convert script concatenates all CSVs in a partition;
    yields 0 when the directory holds no CSV files.
    """
    if not any(part_dir.glob("*.csv")):
        return 0
    pattern = str(part_dir / "*.csv")
    counted = cast(pl.DataFrame, pl.scan_csv(pattern).select(pl.len()).collect())
    return int(counted.row(0)[0])
55+
56+
57+
def _row_count_parquet(parquet_file: Path) -> int:
    """Row count of a single ``data.parquet``; -1 signals a missing file."""
    if parquet_file.exists():
        lazy = pl.scan_parquet(parquet_file).select(pl.len())
        return int(cast(pl.DataFrame, lazy.collect()).row(0)[0])
    return -1
63+
64+
65+
def check_row_counts(csv_dir: Path, parquet_dir: Path) -> tuple[bool, int, int]:
    """Compare per-partition row counts between the CSV and Parquet trees.

    Prints a line for each missing parquet file or count mismatch and
    returns (all_ok, total_csv_rows, total_parquet_rows).
    """
    csv_root = csv_dir.resolve()
    parquet_root = parquet_dir.resolve()
    total_csv_rows = 0
    total_parquet_rows = 0
    all_ok = True
    for candidate in csv_root.rglob("*"):
        if not candidate.is_dir():
            continue
        parsed = _parse_pums_partition_path(candidate)
        if parsed is None:
            continue
        survey, end_year, record_type, state = parsed
        csv_rows = _row_count_csv(candidate)
        total_csv_rows += csv_rows
        out_part = parquet_root / survey / str(end_year) / record_type / f"state={state}"
        # _row_count_parquet returns -1 for a missing data.parquet.
        parquet_rows = _row_count_parquet(out_part / "data.parquet")
        if parquet_rows < 0:
            all_ok = False
            print(
                f"MISSING parquet: {out_part.relative_to(parquet_root)} (CSV rows={csv_rows})"
            )
            continue
        total_parquet_rows += parquet_rows
        if parquet_rows != csv_rows:
            all_ok = False
            print(
                f"COUNT MISMATCH {candidate.relative_to(csv_root)}: CSV={csv_rows} Parquet={parquet_rows}"
            )
    return all_ok, total_csv_rows, total_parquet_rows
97+
98+
99+
def check_partition_values(part_dir: Path, pq_file: Path) -> bool:
    """Value-level comparison of one partition's CSV files against its parquet.

    Both sides are normalized before comparing: column names lowercased,
    rows sorted by every column, and all values cast to Utf8 so that CSV
    type inference differences do not cause false mismatches.

    Returns True when the partition's data match.
    """
    csv_glob = str(part_dir / "*.csv")
    lf_csv = pl.scan_csv(csv_glob)
    # Collect the schema once (the original called collect_schema() twice,
    # redoing the same inference work just to build the rename mapping).
    original_names = lf_csv.collect_schema().names()
    cols = [c.lower() for c in original_names]
    lf_csv = lf_csv.rename({c: c.lower() for c in original_names})
    df_csv = cast(pl.DataFrame, lf_csv.collect())
    df_csv = df_csv.sort(cols)
    df_pq = pl.read_parquet(pq_file)
    # Cheap structural checks before the expensive value comparison.
    if set(df_csv.columns) != set(df_pq.columns) or df_csv.height != df_pq.height:
        return False
    df_pq = df_pq.select(df_csv.columns).sort(cols)
    # Cast to string so CSV-inferred types vs parquet data-dict types still match
    a = df_csv.select(pl.all().cast(pl.Utf8))
    b = df_pq.select(pl.all().cast(pl.Utf8))
    return a.equals(b)
115+
116+
117+
def check_values(csv_dir: Path, parquet_dir: Path) -> bool:
    """Run the value-level check for every partition; True if all match."""
    csv_root = csv_dir.resolve()
    parquet_root = parquet_dir.resolve()
    all_ok = True
    for candidate in csv_root.rglob("*"):
        if not candidate.is_dir():
            continue
        parsed = _parse_pums_partition_path(candidate)
        if parsed is None:
            continue
        survey, end_year, record_type, state = parsed
        pq_file = (
            parquet_root
            / survey
            / str(end_year)
            / record_type
            / f"state={state}"
            / "data.parquet"
        )
        # A missing parquet file is already reported by the row-count pass.
        if not pq_file.exists():
            continue
        if not check_partition_values(candidate, pq_file):
            all_ok = False
            print(f"VALUE MISMATCH: {candidate.relative_to(csv_root)}")
    return all_ok
137+
138+
139+
def main() -> int:
    """CLI entry point: parse args, validate directories, run the checks.

    Returns 0 on success, 1 on a missing directory or any failed check.
    """
    parser = argparse.ArgumentParser(
        description="Verify CSV and Parquet row counts (and optionally values) match."
    )
    parser.add_argument(
        "--csv-dir", type=Path, required=True, help="Root of CSV tree (e.g. csv/)"
    )
    parser.add_argument(
        "--parquet-dir",
        type=Path,
        required=True,
        help="Root of Parquet tree (e.g. parquet/)",
    )
    parser.add_argument(
        "--values",
        action="store_true",
        help="Also compare values partition-by-partition (slower).",
    )
    args = parser.parse_args()

    csv_dir = args.csv_dir.resolve()
    parquet_dir = args.parquet_dir.resolve()
    # Validate both roots up front; CSV first, matching the reported order.
    for label, directory in (("CSV", csv_dir), ("Parquet", parquet_dir)):
        if not directory.is_dir():
            print(f"Error: {label} dir not found: {directory}", file=sys.stderr)
            return 1

    ok, total_csv, total_pq = check_row_counts(csv_dir, parquet_dir)
    print(f"Total CSV rows: {total_csv}")
    print(f"Total Parquet rows: {total_pq}")
    if not ok:
        print("Row count check: FAILED")
        return 1
    print("Row count check: OK")

    if args.values:
        if not check_values(csv_dir, parquet_dir):
            print("Value check: FAILED")
            return 1
        print("Value check: OK")

    return 0
183+
184+
185+
if __name__ == "__main__":
    # Propagate main()'s status code to the shell.
    raise SystemExit(main())

0 commit comments

Comments
 (0)