Skip to content

Commit 2b84164

Browse files
authored
change Parquet for DuckDB in EpiAutoGP (#1061)
* change Parquet for DuckDB * add method for DuckDB * duckdb test julia -> julia * rm R date conversion step * add local test for interop date types * pin to julia 1.11 + move interop test to end * add project instantiation for interop test
1 parent 43ba11b commit 2b84164

10 files changed

Lines changed: 270 additions & 95 deletions

File tree

.github/workflows/test.yaml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ jobs:
2929
XLA_FLAGS: --xla_force_host_platform_device_count=2
3030
run: |
3131
uv run pytest \
32+
--ignore=pipelines/tests/test_epiautogp_parquet_interop.py \
3233
--cov=pipelines --cov-report=term --cov-report=xml:pipelines-coverage.xml .
3334
3435
- name: Upload results to Codecov
@@ -95,6 +96,8 @@ jobs:
9596

9697
- name: Set up Julia
9798
uses: julia-actions/setup-julia@v3
99+
with:
100+
version: "1.11"
98101

99102
- name: Run tests
100103
uses: julia-actions/julia-runtest@v1
@@ -116,3 +119,17 @@ jobs:
116119
files: lcov.info
117120
token: ${{ secrets.CODECOV_TOKEN }}
118121
verbose: true
122+
123+
- name: Set up R with hewr
124+
uses: ./.github/actions/setup-hewr
125+
126+
- name: Set up Pyrenew-HEW via UV
127+
uses: ./.github/actions/setup-pyrenew-hew
128+
129+
- name: Instantiate EpiAutoGP Julia project
130+
run: |
131+
julia --project=EpiAutoGP -e 'using Pkg; Pkg.instantiate()'
132+
133+
- name: Run EpiAutoGP parquet interop test
134+
run: |
135+
uv run pytest pipelines/tests/test_epiautogp_parquet_interop.py -q

EpiAutoGP/Project.toml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,36 @@
11
name = "EpiAutoGP"
22
uuid = "c2940010-6b35-4be1-8bbf-9fa0d9979e50"
3-
version = "0.1.0"
43
authors = ["Sam Brand (USI1) <usi1@cdc.gov>"]
4+
version = "0.1.0"
55

66
[deps]
77
ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63"
88
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
9+
DBInterface = "a10d1c49-ce27-4219-8d33-6db1a4562965"
910
DataFramesMeta = "1313f7d8-7da2-5740-9ea0-a2ca25f37964"
1011
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
12+
DuckDB = "d2f5444f-75bc-4fdf-ac35-56f514c445e1"
1113
JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
1214
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
1315
NowcastAutoGP = "7e9f7f4b-f590-4c14-8324-de4fcbed18f7"
14-
Parquet = "626c502c-15b0-58ad-a749-f091afb673ae"
1516
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
1617
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"
1718
StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4"
1819
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
1920

2021
[sources]
21-
NowcastAutoGP = {rev = "v0.3.0", url = "https://github.com/CDCgov/NowcastAutoGP"}
22+
NowcastAutoGP = {rev = "v0.3.0", url = "https://github.com/CDCgov/NowcastAutoGP.git"}
2223

2324
[compat]
2425
ArgParse = "1.2.0"
2526
CSV = "0.10.15"
27+
DBInterface = "2.6.1"
2628
DataFramesMeta = "0.15.4"
2729
Dates = "1.11.0"
30+
DuckDB = "1.5.2"
2831
JSON3 = "1.14.3"
2932
Logging = "1.11.0"
3033
NowcastAutoGP = "0.3.0"
31-
Parquet = "0.8.6"
3234
Random = "1.11.0"
3335
Statistics = "1.11.1"
3436
StructTypes = "1.11.0"

EpiAutoGP/src/EpiAutoGP.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module EpiAutoGP
22
using NowcastAutoGP # Core modeling package
3-
using CSV, DataFramesMeta, Dates, JSON3, StructTypes, Parquet # Data handling packages
3+
using CSV, DataFramesMeta, Dates, DBInterface, DuckDB, JSON3, StructTypes # Data handling packages
44
using ArgParse # Command-line argument parsing
55
using Statistics # For modeling functions
66

EpiAutoGP/src/output.jl

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,24 @@ function create_forecast_df(results::NamedTuple, output_type::PipelineOutput)
117117
return forecast_df
118118
end
119119

120+
function _quote_duckdb_string(value::AbstractString)
121+
return "'" * replace(value, "'" => "''") * "'"
122+
end
123+
124+
function _write_parquet_with_duckdb(path::AbstractString, table)
125+
con = DBInterface.connect(DuckDB.DB, ":memory:")
126+
return try
127+
DuckDB.register_data_frame(con, table, "forecast_samples")
128+
DBInterface.execute(
129+
con,
130+
"COPY forecast_samples TO $(_quote_duckdb_string(path)) (FORMAT parquet)"
131+
)
132+
finally
133+
DBInterface.close(con)
134+
end
135+
end
136+
137+
120138
"""
121139
create_forecast_output(input, results, output_dir, output_type; kwargs...) -> DataFrame
122140
@@ -219,7 +237,7 @@ function create_forecast_output(
219237
Dict(
220238
"observed" => "observed_ed_visits",
221239
"other" => "other_ed_visits",
222-
"pct" => "prop_disease_ed_visits",
240+
"pct" => "prop_disease_ed_visits"
223241
)[input.ed_visit_type]
224242
end
225243

@@ -236,21 +254,11 @@ function create_forecast_output(
236254
forecast_df[!, :geo_value] .= input.location
237255
forecast_df[!, :disease] .= input.pathogen
238256

239-
# Add metadata columns for hubverse compatibility
240-
forecast_df[!, :geo_value] .= input.location
241-
forecast_df[!, :disease] .= input.pathogen
242-
243-
# Convert date column to string for parquet compatibility
244-
# Julia parquet does not support writing Dates
245-
forecast_df[!, :date] = string.(forecast_df[!, :date])
246-
247257
# Save as parquet if requested
248258
if save_output
249-
# Use model-specific naming with frequency prefix
250-
# This matches the convention: {frequency}_{model}_samples_{target_letter}.parquet
251-
parquet_path = joinpath(output_dir, "samples_raw.parquet")
259+
parquet_path = joinpath(output_dir, "samples.parquet")
252260
mkpath(dirname(parquet_path))
253-
Parquet.write_parquet(parquet_path, forecast_df)
261+
_write_parquet_with_duckdb(parquet_path, forecast_df)
254262

255263
@info "Saved pipeline forecast samples to $parquet_path"
256264
end

EpiAutoGP/test/runtests.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ using Test, EpiAutoGP
22
using JSON3
33
using ArgParse
44
using Dates
5+
using DBInterface
6+
using DuckDB
57
using CSV
68
using DataFramesMeta
79
using Random

EpiAutoGP/test/test_output.jl

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,67 @@ end
7878
rm(tmpdir, recursive = true)
7979
end
8080
end
81+
82+
@testset "PipelineOutput writes samples parquet with Date column" begin
83+
input = EpiAutoGPInput(
84+
[Date("2024-01-01")],
85+
[100.0],
86+
"COVID-19",
87+
"CA",
88+
"nhsn",
89+
"epiweekly",
90+
"observed",
91+
Date("2024-01-01"),
92+
Date[],
93+
Vector{Real}[]
94+
)
95+
96+
forecast_dates = [Date("2024-01-08"), Date("2024-01-15")]
97+
forecasts = reshape([10.0, 20.0, 30.0, 40.0], 2, 2)
98+
results = (forecast_dates = forecast_dates, forecasts = forecasts)
99+
100+
tmpdir = mktempdir()
101+
try
102+
result_df = create_forecast_output(
103+
input, results, tmpdir, PipelineOutput();
104+
save_output = true
105+
)
106+
107+
parquet_path = joinpath(tmpdir, "samples.parquet")
108+
@test isfile(parquet_path)
109+
@test eltype(result_df.date) == Date
110+
@test propertynames(result_df) == [
111+
:date,
112+
Symbol(".value"),
113+
Symbol(".draw"),
114+
Symbol(".variable"),
115+
:resolution,
116+
:geo_value,
117+
:disease
118+
]
119+
120+
con = DBInterface.connect(DuckDB.DB, ":memory:")
121+
try
122+
read_df = DataFrame(DBInterface.execute(
123+
con,
124+
"SELECT * FROM read_parquet($(EpiAutoGP._quote_duckdb_string(parquet_path)))"
125+
))
126+
127+
@test eltype(read_df.date) == Date
128+
@test propertynames(read_df) == propertynames(result_df)
129+
@test read_df.date == forecast_dates[[1, 2, 1, 2]]
130+
@test read_df[!, Symbol(".draw")] == [1, 1, 2, 2]
131+
@test read_df[!, Symbol(".value")] == [10.0, 20.0, 30.0, 40.0]
132+
@test all(read_df[!, Symbol(".variable")] .==
133+
"observed_hospital_admissions")
134+
@test all(read_df.resolution .== "epiweekly")
135+
@test all(read_df.geo_value .== "CA")
136+
@test all(read_df.disease .== "COVID-19")
137+
finally
138+
DBInterface.close(con)
139+
end
140+
finally
141+
rm(tmpdir, recursive = true)
142+
end
143+
end
81144
end

pipelines/epiautogp/create_samples_from_epiautogp_fit_dir.R

Lines changed: 0 additions & 48 deletions
This file was deleted.

pipelines/epiautogp/epiautogp_forecast_utils.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,9 @@
2424
load_credentials,
2525
make_figures_from_model_fit_dir,
2626
model_fit_dir_to_hub_tbl,
27-
run_r_script,
2827
)
2928

3029

31-
def create_samples_from_epiautogp_fit_dir(model_fit_dir: Path) -> None:
32-
"""Create samples.parquet from an EpiAutoGP model fit directory using R."""
33-
run_r_script(
34-
"pipelines/epiautogp/create_samples_from_epiautogp_fit_dir.R",
35-
[str(model_fit_dir)],
36-
function_name="create_samples_from_epiautogp_fit_dir",
37-
)
38-
return None
39-
40-
4130
@dataclass
4231
class ModelPaths:
4332
"""
@@ -150,8 +139,6 @@ def post_process_forecast(self) -> None:
150139
self.logger.info("Processing forecast and generating plots...")
151140
model_fit_dir = Path(self.model_run_dir, self.model_name)
152141

153-
create_samples_from_epiautogp_fit_dir(model_fit_dir=model_fit_dir)
154-
155142
make_figures_from_model_fit_dir(
156143
model_fit_dir=model_fit_dir,
157144
save_figs=True,

0 commit comments

Comments
 (0)