Skip to content

Commit 261133f

Browse files
authored
Merge pull request #519 from lsst/tickets/SP-3139
tickets/SP-3139: Add support for reading visits from hdf5 as well as sqlite3
2 parents a394bb7 + b968ce1 commit 261133f

2 files changed

Lines changed: 234 additions & 56 deletions

File tree

rubin_sim/maf/utils/opsim_utils.py

Lines changed: 197 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
__all__ = (
22
"get_sim_data",
3+
"get_visit_data",
34
"scale_benchmarks",
45
"calc_coadded_depth",
56
)
@@ -15,64 +16,56 @@
1516
from sqlalchemy.engine import make_url
1617

1718

18-
def get_sim_data(
19+
def _local_get_sim_data(
1920
db_con,
2021
sqlconstraint=None,
2122
dbcols=None,
2223
stackers=None,
2324
table_name=None,
2425
full_sql_query=None,
26+
*,
27+
return_class=np.recarray,
2528
):
26-
"""Query an opsim database for the needed data columns
27-
and run any required stackers.
28-
29-
Parameters
30-
----------
31-
db_con : `str` or SQLAlchemy connectable, or sqlite3 connection
32-
Filename to a sqlite3 file, or a connection object that
33-
can be used by pandas.read_sql
34-
sqlconstraint : `str` or None
35-
SQL constraint to apply to query for observations.
36-
Ignored if full_sql_query is set.
37-
dbcols : `list` [`str`]
38-
Columns required from the database. Ignored if full_sql_query is set.
39-
stackers : `list` [`rubin_sim.maf.stackers`], optional
40-
Stackers to be used to generate additional columns. Default None.
41-
table_name : `str` (None)
42-
Name of the table to query.
43-
Default None will try "observations".
44-
Ignored if full_sql_query is set.
45-
full_sql_query : `str`
46-
The full SQL query to use. Overrides sqlconstraint, dbcols, tablename.
47-
48-
Returns
49-
-------
50-
sim_data : `np.ndarray`
51-
A numpy structured array with columns resulting from dbcols + stackers,
52-
for observations matching the SQLconstraint.
53-
"""
5429
if sqlconstraint is None:
5530
sqlconstraint = ""
5631

32+
need_sql: bool = full_sql_query is not None or len(sqlconstraint) > 0
33+
5734
# Check that file exists
5835
if isinstance(db_con, str):
5936
if os.path.isfile(db_con) is False:
6037
raise FileNotFoundError("No file %s" % db_con)
6138

39+
# Check if this is an HDF5 file
40+
is_hdf5 = isinstance(db_con, str) and db_con.lower().endswith((".h5", ".hdf5"))
41+
6242
# Check if table is "observations" or "SummaryAllProps"
6343
if (table_name is None) & (full_sql_query is None) & (isinstance(db_con, str)):
64-
url = make_url("sqlite:///" + db_con)
65-
eng = create_engine(url)
66-
inspector = inspect(eng)
67-
tables = [inspector.get_table_names(schema=schema) for schema in inspector.get_schema_names()]
68-
if "observations" in tables[0]:
69-
table_name = "observations"
70-
elif "SummaryAllProps" in tables[0]:
71-
table_name = "SummaryAllProps"
72-
elif "summary" in tables[0]:
73-
table_name = "summary"
44+
if is_hdf5:
45+
# For HDF5, detect table names by reading the store keys
46+
with pd.HDFStore(db_con, mode="r") as store:
47+
table_keys = [key.lstrip("/") for key in store.keys()]
48+
if "observations" in table_keys:
49+
table_name = "observations"
50+
elif "SummaryAllProps" in table_keys:
51+
table_name = "SummaryAllProps"
52+
elif "summary" in table_keys:
53+
table_name = "summary"
54+
else:
55+
raise ValueError("Could not guess table_name, set with table_name or full_sql_query kwargs")
7456
else:
75-
raise ValueError("Could not guess table_name, set with table_name or full_sql_query kwargs")
57+
url = make_url("sqlite:///" + db_con)
58+
eng = create_engine(url)
59+
inspector = inspect(eng)
60+
tables = [inspector.get_table_names(schema=schema) for schema in inspector.get_schema_names()]
61+
if "observations" in tables[0]:
62+
table_name = "observations"
63+
elif "SummaryAllProps" in tables[0]:
64+
table_name = "SummaryAllProps"
65+
elif "summary" in tables[0]:
66+
table_name = "summary"
67+
else:
68+
raise ValueError("Could not guess table_name, set with table_name or full_sql_query kwargs")
7669
elif (table_name is None) & (full_sql_query is None):
7770
# If someone passes in a connection object with an old table_name
7871
# things will fail
@@ -89,35 +82,183 @@ def get_sim_data(
8982
else:
9083
query = full_sql_query
9184

92-
if isinstance(db_con, sqlite3.Connection):
93-
sim_data = pd.read_sql(query, db_con).to_records(index=False)
85+
sim_data: np.recarray | pd.DataFrame | None = None
86+
if is_hdf5 and not need_sql:
87+
# Pure HDF5 path - no SQL filtering needed
88+
sim_data = pd.read_hdf(db_con, key=table_name)
89+
elif isinstance(db_con, sqlite3.Connection):
90+
sim_data = pd.read_sql(query, db_con)
9491
elif isinstance(db_con, str) and os.path.isfile(db_con):
95-
with closing(sqlite3.connect(db_con)) as con:
96-
sim_data = pd.read_sql(query, con).to_records(index=False)
97-
elif (not isinstance(db_con, str)) or urllib.parse.urlparse(db_con).scheme != "":
92+
# Handle HDF5 files with SQL constraints by loading into
93+
# in-memory SQLite
94+
if is_hdf5:
95+
with closing(sqlite3.connect(":memory:")) as con:
96+
with pd.HDFStore(db_con, mode="r") as store:
97+
for key in store.keys():
98+
tbl_name = key.lstrip("/")
99+
df = pd.read_hdf(db_con, key=key)
100+
df.to_sql(tbl_name, con, index=False)
101+
sim_data = pd.read_sql(query, con)
102+
else:
103+
with closing(sqlite3.connect(db_con)) as con:
104+
sim_data = pd.read_sql(query, con)
105+
else:
106+
raise RuntimeError(f"Cannot find {db_con}.")
107+
108+
if len(sim_data) == 0:
109+
raise UserWarning("No data found matching sqlconstraint %s" % (sqlconstraint))
110+
111+
# Now add the stacker columns.
112+
# This fails for pandas.DataFrames, so convert to recarray
113+
# if necessary.
114+
if stackers is not None:
115+
if not isinstance(sim_data, np.recarray):
116+
assert isinstance(sim_data, pd.DataFrame)
117+
sim_data = sim_data.to_records(index=False)
118+
for s in stackers:
119+
sim_data = s.run(sim_data).view(np.recarray)
120+
121+
if return_class is np.recarray:
122+
if isinstance(sim_data, pd.DataFrame):
123+
sim_data = sim_data.to_records(index=False)
124+
if return_class is pd.DataFrame:
125+
if isinstance(sim_data, np.recarray):
126+
sim_data = pd.DataFrame(sim_data)
127+
128+
assert isinstance(sim_data, return_class)
129+
130+
return sim_data
131+
132+
133+
def get_sim_data(
134+
db_con,
135+
sqlconstraint=None,
136+
dbcols=None,
137+
stackers=None,
138+
table_name=None,
139+
full_sql_query=None,
140+
*,
141+
return_class=np.recarray,
142+
):
143+
"""Query an opsim database for the needed data columns
144+
and run any required stackers.
145+
146+
Parameters
147+
----------
148+
db_con : `str` or SQLAlchemy connectable, or sqlite3 connection
149+
Filename or lsst.resources path to an hdf5 or sqlite3 file,
150+
or a connection object that can be used by pandas.read_sql
151+
sqlconstraint : `str` or None
152+
SQL constraint to apply to query for observations.
153+
Ignored if full_sql_query is set.
154+
dbcols : `list` [`str`]
155+
Columns required from the database. Ignored if full_sql_query is set.
156+
stackers : `list` [`rubin_sim.maf.stackers`], optional
157+
Stackers to be used to generate additional columns. Default None.
158+
table_name : `str` (None)
159+
Name of the table to query.
160+
Default None will try "observations".
161+
Ignored if full_sql_query is set.
162+
full_sql_query : `str`
163+
The full SQL query to use. Overrides sqlconstraint, dbcols, tablename.
164+
165+
Returns
166+
-------
167+
sim_data : `np.ndarray`
168+
A numpy structured array with columns resulting from dbcols + stackers,
169+
for observations matching the SQLconstraint.
170+
"""
171+
if (not isinstance(db_con, str)) or urllib.parse.urlparse(db_con).scheme == "":
172+
# Already have a local copy
173+
sim_data = _local_get_sim_data(
174+
db_con, sqlconstraint, dbcols, stackers, table_name, full_sql_query, return_class=return_class
175+
)
176+
else:
98177
try:
99178
from lsst.resources import ResourcePath
100179

101180
with ResourcePath(db_con).as_local() as local_db_path:
102-
with closing(sqlite3.connect(local_db_path.ospath)) as con:
103-
sim_data = pd.read_sql(query, con).to_records(index=False)
181+
sim_data = _local_get_sim_data(
182+
local_db_path,
183+
sqlconstraint,
184+
dbcols,
185+
stackers,
186+
table_name,
187+
full_sql_query,
188+
return_class=return_class,
189+
)
104190
except ModuleNotFoundError:
105191
raise RuntimeError(
106192
f"Cannot read visits from {db_con}."
107193
"Maybe it does not exist, or maybe you need to install lsst.resources."
108194
)
109-
else:
110-
raise RuntimeError("Cannot find {db_con}.")
111-
112-
if len(sim_data) == 0:
113-
raise UserWarning("No data found matching sqlconstraint %s" % (sqlconstraint))
114-
# Now add the stacker columns.
115-
if stackers is not None:
116-
for s in stackers:
117-
sim_data = s.run(sim_data)
118195
return sim_data
119196

120197

198+
# This almost-alias to get_sim_data is named to be less misleading,
199+
# in that the same get_*_data function can be used for real visits
200+
# from consdb as well. Return a DF instead of a recarray by default
201+
# because much of the code designed for reading consdb output
202+
# wants that.
203+
def get_visit_data(
204+
db_con,
205+
sqlconstraint=None,
206+
dbcols=None,
207+
stackers=None,
208+
table_name=None,
209+
full_sql_query=None,
210+
*,
211+
return_class=pd.DataFrame,
212+
):
213+
"""Query an opsim database, returning a `pandas.DataFrame` by default.
214+
215+
Thin wrapper around `get_sim_data` with ``return_class`` defaulting to
216+
`pandas.DataFrame` instead of `numpy.recarray`. All parameters are
217+
forwarded unchanged; see `get_sim_data` for full documentation.
218+
219+
Parameters
220+
----------
221+
db_con : `str` or SQLAlchemy connectable, or sqlite3 connection
222+
Filename or lsst.resources path to an hdf5 or sqlite3 file,
223+
or a connection object that can be used by pandas.read_sql
224+
sqlconstraint : `str` or None
225+
SQL constraint to apply to query for observations.
226+
Ignored if full_sql_query is set.
227+
dbcols : `list` [`str`]
228+
Columns required from the database. Ignored if full_sql_query is set.
229+
stackers : `list` [`rubin_sim.maf.stackers`], optional
230+
Stackers to be used to generate additional columns. Default None.
231+
table_name : `str` (None)
232+
Name of the table to query.
233+
Default None will try "observations".
234+
Ignored if full_sql_query is set.
235+
full_sql_query : `str`
236+
The full SQL query to use. Overrides sqlconstraint, dbcols, tablename.
237+
return_class : `type`, optional
238+
Class of the returned data. Default `pandas.DataFrame`.
239+
240+
Returns
241+
-------
242+
sim_data : `pandas.DataFrame`
243+
A `pandas.DataFrame` with columns resulting from dbcols + stackers,
244+
for observations matching the sqlconstraint.
245+
246+
See Also
247+
--------
248+
get_sim_data : Equivalent function with ``return_class`` defaulting to
249+
`numpy.recarray`.
250+
"""
251+
return get_sim_data(
252+
db_con,
253+
sqlconstraint=sqlconstraint,
254+
dbcols=dbcols,
255+
stackers=stackers,
256+
table_name=table_name,
257+
full_sql_query=full_sql_query,
258+
return_class=return_class,
259+
)
260+
261+
121262
def scale_benchmarks(run_length, benchmark="design"):
122263
"""Set design and stretch values of the number of visits or
123264
area of the footprint or seeing/Fwhmeff/skybrightness and single visit

tests/maf/test_opsimutils.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import os
22
import sqlite3
33
import unittest
4+
from tempfile import TemporaryDirectory
45

56
import numpy as np
67
from rubin_scheduler.data import get_data_dir
78

89
import rubin_sim.maf.utils.opsim_utils as opsimUtils
10+
from rubin_sim.sim_archive.util import opsimdb_to_hdf5
911

1012
TEST_DB = "example_v3.4_0yrs.db"
1113

@@ -75,6 +77,41 @@ def test_get_sim_data(self):
7577
with self.assertRaises(FileNotFoundError):
7678
opsimUtils.get_sim_data("not_a_file.db", sql, ["nocol"])
7779

80+
def test_get_sim_data_hdf5(self):
81+
"""Test that we can get simulation data from HDF5 files."""
82+
database_file = os.path.join(get_data_dir(), "tests", TEST_DB)
83+
84+
# Create HDF5 file from database
85+
with TemporaryDirectory() as tmpdir:
86+
hdf5_file = os.path.join(tmpdir, "test_visits.h5")
87+
opsimdb_to_hdf5(database_file, hdf5_file)
88+
89+
# Test basic HDF5 reading without constraints
90+
data = opsimUtils.get_sim_data(hdf5_file)
91+
assert np.size(data) > 0
92+
# Check that we got reasonable columns
93+
assert "observationId" in data.dtype.names
94+
95+
# Test HDF5 with sqlconstraint
96+
data_filtered = opsimUtils.get_sim_data(hdf5_file, sqlconstraint="observationId < 10")
97+
assert np.size(data_filtered) > 0
98+
# Verify all rows satisfy the constraint
99+
assert np.all(data_filtered["observationId"] < 10)
100+
101+
# Test HDF5 with full_sql_query
102+
full_sql = "SELECT observationId, fieldRA FROM observations WHERE observationId < 5;"
103+
data_query = opsimUtils.get_sim_data(hdf5_file, full_sql_query=full_sql)
104+
assert np.size(data_query) > 0
105+
assert "observationId" in data_query.dtype.names
106+
assert "fieldRA" in data_query.dtype.names
107+
assert np.all(data_query["observationId"] < 5)
108+
109+
# Verify HDF5 results match SQLite results
110+
data_sqlite = opsimUtils.get_sim_data(database_file, sqlconstraint="observationId < 10")
111+
112+
data_hdf5 = opsimUtils.get_sim_data(hdf5_file, sqlconstraint="observationId < 10")
113+
assert np.allclose(data_sqlite["fieldRA"], data_hdf5["fieldRA"])
114+
78115

79116
if __name__ == "__main__":
80117
unittest.main()

0 commit comments

Comments
 (0)