Skip to content

Commit 95e6411

Browse files
Add CDCWonder_NNDSS_Infectious_Weekly scripts and schema mappings (#1973)
* Add NNDSS Infectious Weekly scripts and schema mappings * Add NNDSS Infectious Weekly scripts and schema mappings v1 * Fix: Remove 'python' prefix from manifest script execution and implement Pandas chunking * Fix: Remove 'python' prefix from manifest script execution and implement Pandas chunking v1 * Address PR comments: add bounds check, replace print with logging, add docstrings, and robust column logic
1 parent b02716a commit 95e6411

8 files changed

Lines changed: 814 additions & 0 deletions

File tree

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# CDCWonder_NNDSS_Infectious_Weekly
2+
3+
## Overview
4+
Notifiable Infectious Diseases Data: weekly tables from CDC WONDER, which contain the incident counts of different infectious diseases, by week over the previous 52 weeks, as reported by the 50 states, New York City, the District of Columbia, and the U.S. territories.
5+
6+
## Data Source
7+
**Source URL:**
8+
`https://data.cdc.gov/api/views/x9gk-5huc/rows.csv?accessType=DOWNLOAD&api_foundry=true`
9+
10+
## How To Download Input Data
11+
To download and process the data, run the provided preprocess script, `preprocess.py`. The script automatically creates an `input_files` folder, downloads the source file into it, and saves the processed file there as `NNDSS_Weekly_Data.csv`. The script also adds one more column, `observationDate`, to the input file.
12+
13+
statvars: Infectious Diseases
14+
15+
## Download the data:
16+
To download and preprocess the source data, run:
17+
```python3 preprocess.py```
18+
19+
## Processing Instructions
20+
To process data and generate statistical variables, use the following command from the "data" directory:
21+
22+
**For Test Data Run**
23+
```
24+
python3 tools/statvar_importer/stat_var_processor.py \
25+
--input_data=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/testdata/NNDSS_Weekly_Data.csv \
26+
--pv_map=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/nndss_weekly_pvmap.csv \
27+
--config_file=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/nndss_weekly_metadata.csv \
28+
--output_path=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/testdata/nndss_weekly_output
29+
```
30+
31+
**For Main data run**
32+
```bash
33+
python3 tools/statvar_importer/stat_var_processor.py \
34+
--input_data=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/input_files/NNDSS_Weekly_Data.csv \
35+
--pv_map=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/nndss_weekly_pvmap.csv \
36+
--config_file=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/nndss_weekly_metadata.csv \
37+
--existing_statvar_mcf=gs://unresolved_mcf/scripts/statvar/stat_vars.mcf \
38+
--output_path=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/output/nndss_weekly_output
39+
```
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
{
2+
"import_specifications": [
3+
{
4+
"import_name": "CDCWonder_NNDSS_Infectious_Weekly",
5+
"curator_emails": [
6+
"support@datacommons.org"
7+
],
8+
"provenance_url": "https://data.cdc.gov/api/views/x9gk-5huc/rows.csv?accessType=DOWNLOAD&api_foundry=true",
9+
"provenance_description": "Notifiable Infectious Diseases Data: Weekly tables from CDC WONDER which has the incident counts of different infectious diseases per week that are reported by the 50 states, New York City, the District of Columbia, and the U.S. territories.",
10+
"scripts": [
11+
"preprocess.py",
12+
"../../../tools/statvar_importer/stat_var_processor.py --input_data=input_files/NNDSS_Weekly_Data.csv --pv_map='nndss_weekly_pvmap.csv' --config_file=nndss_weekly_metadata.csv --output_path=output/nndss_weekly_output"
13+
],
14+
"import_inputs": [
15+
{
16+
"template_mcf": "output/nndss_weekly_output.tmcf",
17+
"cleaned_csv": "output/nndss_weekly_output.csv",
18+
"node_mcf": "output/*.mcf"
19+
}
20+
],
21+
"source_files": [
22+
"input_files/NNDSS_Weekly_Data.csv"
23+
],
24+
"cron_schedule": "00 11 1,15 * *",
25+
"resource_limits": {"cpu": 8, "memory": 32, "disk": 100}
26+
}
27+
],
28+
"config_override": {
29+
"invoke_import_validation": true,
30+
"invoke_import_tool": true,
31+
"invoke_differ_tool": true,
32+
"skip_input_upload": false,
33+
"skip_gcs_upload": false,
34+
"cleanup_gcs_volume_mount": false
35+
}
36+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
parameter,val
2+
mapped_rows,1
3+
mapped_columns,5
4+
header_rows,1
5+
#places_resolved_csv,
6+
input_columns,8
7+
#input_rows,1000

statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/nndss_weekly_pvmap.csv

Lines changed: 367 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
# Copyright 2025 Google LLC
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
4+
import os, sys
5+
import pandas as pd
6+
from absl import app, logging
7+
from pathlib import Path
8+
import datetime
9+
import importlib.util
10+
import shutil
11+
12+
# Directory containing this script. Every other path below is resolved
# against it, so the script does not depend on the current working directory.
script_dir = os.path.dirname(os.path.abspath(__file__))

# The shared download helper is loaded directly from its file path (three
# directories up, under util/) rather than through a package import.
util_script_path = os.path.abspath(
    os.path.join(script_dir, '../../../util/download_util_script.py'))
spec = importlib.util.spec_from_file_location(
    'download_util_script',
    util_script_path,
)
if not (spec is not None and spec.loader is not None):
    raise ImportError(
        f'Could not load download_util_script from {util_script_path}')
download_util_script = importlib.util.module_from_spec(spec)
spec.loader.exec_module(download_util_script)
download_file = download_util_script.download_file

# Working folder for the raw download and the processed CSV; it is created
# at import time if it does not already exist.
INPUT_DIR = os.path.join(script_dir, "input_files")
os.makedirs(INPUT_DIR, exist_ok=True)

# File name the download lands under, and the name it is given once
# preprocessing has finished.
INPUT_FILE = os.path.join(INPUT_DIR, "rows.csv")
NEW_FILE = os.path.join(INPUT_DIR, "NNDSS_Weekly_Data.csv")

SOURCE_URL = "https://data.cdc.gov/api/views/x9gk-5huc/rows.csv?accessType=DOWNLOAD&api_foundry=true"
25+
26+
def _start_date_of_year(year: int) -> datetime.date:
27+
"""Return the first day of the first MMWR week for a given year.
28+
29+
The first MMWR week starts on the Sunday of the week containing Jan 4.
30+
"""
31+
jan_one = datetime.date(year, 1, 1)
32+
diff = 7 * (jan_one.isoweekday() > 3) - jan_one.isoweekday()
33+
return jan_one + datetime.timedelta(days=diff)
34+
35+
def get_mmwr_week_start_date(year, week) -> "datetime.date | None":
    """Compute the start date for a given MMWR year and week.

    The start date is the first day of MMWR week 1 of ``year`` plus
    ``week - 1`` whole weeks. Any input that cannot be turned into a valid
    date yields None instead of an exception, so one bad cell cannot abort
    processing of a whole file.

    Args:
        year: The MMWR year value from the CDC dataset (int, float or str).
        week: The MMWR week value from the CDC dataset (int, float or str).

    Returns:
        A datetime.date for the first day of the specified week, or None if
        either value is non-numeric/missing, the week is outside 1..53, or
        the year is outside the range supported by datetime.date.
    """
    try:
        year = int(year)
        week = int(week)
    except (ValueError, TypeError, OverflowError):
        # ValueError: non-numeric text or NaN; TypeError: None;
        # OverflowError: +/- infinity.
        return None

    if not (1 <= week <= 53):
        logging.warning(f"Invalid MMWR WEEK found: {week}. Skipping date calculation.")
        return None

    # datetime.date() raises for years outside MINYEAR..MAXYEAR; reject them
    # here so garbage year values do not escape as exceptions.
    if not (datetime.MINYEAR <= year <= datetime.MAXYEAR):
        logging.warning(f"Invalid MMWR year found: {year}. Skipping date calculation.")
        return None

    try:
        day_one = _start_date_of_year(year)
        return day_one + datetime.timedelta(days=7 * (week - 1))
    except OverflowError:
        # Date arithmetic at the very edge of the supported range (e.g. week 1
        # of year 1 starts before datetime.date.min).
        return None
58+
59+
def preprocess_data(filepath: str):
    """Read a CDC CSV in chunks, add observation dates, and save safely.

    Each chunk gets an 'observationDate' column, placed immediately after
    'MMWR WEEK', holding the start date of the row's MMWR week (empty when
    the year/week cannot be converted). Output is written to
    ``filepath + ".tmp"`` and only moved over ``filepath`` once every chunk
    has been written, so a failure never leaves a half-written input file.

    Args:
        filepath: Path to the downloaded CDC CSV file. Rewritten in place.

    Raises:
        RuntimeError: If reading, validating or writing fails for any reason
            (including missing required columns). The original exception is
            attached as ``__cause__`` and the temp file is removed.
    """
    temp_filepath = filepath + ".tmp"
    chunk_size = 100000
    required_cols = ['Current MMWR Year', 'MMWR WEEK']
    chunk_count = 0

    try:
        logging.info(f"Opening pandas reader on {filepath}...")

        # on_bad_lines='skip' drops malformed rows instead of failing the
        # run. The context manager closes the underlying file handle even
        # when a chunk fails part-way through.
        with pd.read_csv(filepath,
                         chunksize=chunk_size,
                         low_memory=False,
                         on_bad_lines='skip') as reader:
            for chunk_count, chunk in enumerate(reader, start=1):
                logging.info(f"Processing chunk {chunk_count}...")
                first_chunk = chunk_count == 1

                # All chunks share one header, so validating once is enough.
                if first_chunk and not all(
                        col in chunk.columns for col in required_cols):
                    raise KeyError(f"The file must contain the columns: {required_cols}.")

                # Iterate the two columns directly rather than using a
                # row-wise DataFrame.apply: same values, far less overhead,
                # and well-defined for an empty chunk.
                chunk['observationDate'] = [
                    get_mmwr_week_start_date(year, week)
                    for year, week in zip(chunk['Current MMWR Year'],
                                          chunk['MMWR WEEK'])
                ]

                # Move the new column so it sits right after 'MMWR WEEK'.
                cols = [c for c in chunk.columns if c != 'observationDate']
                cols.insert(cols.index('MMWR WEEK') + 1, 'observationDate')
                chunk = chunk[cols]

                # First chunk truncates any stale temp file and writes the
                # header; later chunks append rows only.
                chunk.to_csv(temp_filepath,
                             mode='w' if first_chunk else 'a',
                             header=first_chunk,
                             index=False)

        if chunk_count == 0:
            raise ValueError(f"No data could be read from '{filepath}'.")

        logging.info("All chunks processed. Moving temp file...")
        shutil.move(temp_filepath, filepath)
        logging.info(f"Success: File '{filepath}' updated safely.")

    except Exception as e:
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
        # Deliberately not logging.fatal: absl aborts the process on FATAL,
        # which would make the raise below unreachable and kill the run with
        # SIGABRT instead of a catchable exception.
        logging.error(f"Error during Pandas processing: {e}")
        raise RuntimeError(f"Import job failed An unexpected error occurred: {e}") from e
110+
111+
def main(argv):
    """Download CDC data, validate it, preprocess it, and rename the output.

    Steps:
      1. Download SOURCE_URL into INPUT_DIR (expected to land as INPUT_FILE).
      2. Verify the file exists and is not suspiciously small.
      3. Add the 'observationDate' column via preprocess_data().
      4. Move the processed file to NEW_FILE, replacing any previous copy.

    Args:
        argv: Command-line arguments passed by absl.app.run; unused.

    Raises:
        RuntimeError: If the download helper raises, or preprocessing fails.
        SystemExit: With status 1 if the download is missing or too small,
            or the final rename fails.
    """
    del argv  # Unused; required by the absl.app.run signature.

    logging.info("Starting download phase...")
    try:
        download_file(url=SOURCE_URL,
                      output_folder=INPUT_DIR,
                      unzip=False,
                      headers=None,
                      tries=3,
                      delay=5,
                      backoff=2)
        logging.info("Download function completed.")
    except Exception as e:
        # logging.error rather than logging.fatal: absl aborts the process on
        # FATAL, which would make this raise unreachable.
        logging.error(f"Failed during download: {e}")
        raise RuntimeError(f"Failed to download NNDSS weekly data file,{e}") from e

    # Check if file actually downloaded and check its size
    if not os.path.exists(INPUT_FILE):
        logging.error("The file 'rows.csv' was never downloaded.")
        sys.exit(1)

    file_size_mb = os.path.getsize(INPUT_FILE) / (1024 * 1024)
    logging.info(f"Downloaded file size is {file_size_mb:.2f} MB.")

    # Prevent Pandas from processing tiny error files
    if file_size_mb < 0.1:
        logging.error("File is suspiciously small! CDC likely returned an HTML error page.")
        # errors='replace' so an undecodable payload cannot raise here and
        # hide the real problem.
        with open(INPUT_FILE, 'r', encoding='utf-8', errors='replace') as f:
            logging.error(f"Preview of bad file:\n{f.read(500)}")
        sys.exit(1)

    logging.info("Handing off to Pandas chunker...")
    preprocess_data(INPUT_FILE)

    logging.info("Renaming final file...")
    try:
        # os.replace overwrites an existing destination in one step, so there
        # is no window in which NEW_FILE has been removed but not replaced.
        os.replace(INPUT_FILE, NEW_FILE)
        logging.info("Successfully renamed file.")
    except OSError as e:
        logging.error(f"Failed to rename file: {e}")
        sys.exit(1)


if __name__ == "__main__":
    app.run(main)

0 commit comments

Comments
 (0)