@@ -32,7 +32,9 @@ use axum::Json;
3232use axum:: extract:: Path ;
3333use axum:: extract:: Query ;
3434use axum:: extract:: State ;
35+ use axum:: http:: HeaderMap ;
3536use axum:: response:: IntoResponse ;
37+ use axum:: response:: Response ;
3638use duckdb:: Connection ;
3739
3840pub ( crate ) use self :: charts:: chart_payload;
@@ -62,53 +64,180 @@ pub use self::window::CommitWindow;
6264use crate :: app:: AppState ;
6365use crate :: db;
6466use crate :: error:: ApiError ;
67+ use crate :: read_model:: ArtifactCachePolicy ;
6568use crate :: slug:: ChartKey ;
6669use crate :: slug:: GroupKey ;
6770
71+ pub ( crate ) fn read_transaction < T > (
72+ conn : & mut Connection ,
73+ f : impl FnOnce ( & Connection ) -> Result < T > ,
74+ ) -> Result < T > {
75+ conn. execute_batch ( "BEGIN TRANSACTION" ) ?;
76+ let result = f ( conn) ;
77+ match result {
78+ Ok ( value) => {
79+ conn. execute_batch ( "COMMIT" ) ?;
80+ Ok ( value)
81+ }
82+ Err ( err) => {
83+ let _ = conn. execute_batch ( "ROLLBACK" ) ;
84+ Err ( err)
85+ }
86+ }
87+ }
88+
6889/// Handler for `GET /api/groups`.
69- pub async fn groups ( State ( state) : State < AppState > ) -> Result < impl IntoResponse , ApiError > {
70- let groups = db:: run_blocking ( & state. db , |conn| collect_groups ( conn) ) . await ?;
71- Ok ( Json ( GroupsResponse { groups } ) )
90+ pub async fn groups (
91+ State ( state) : State < AppState > ,
92+ headers : HeaderMap ,
93+ ) -> Result < Response , ApiError > {
94+ let generation = state. read_store . active ( ) ;
95+ Ok ( generation
96+ . groups_artifact ( )
97+ . response ( & headers, ArtifactCachePolicy :: Revalidate ) )
7298}
7399
74100/// Handler for `GET /api/chart/{slug}`.
75101pub async fn chart (
76102 State ( state) : State < AppState > ,
77103 Path ( slug) : Path < String > ,
78104 Query ( q) : Query < ChartQuery > ,
79- ) -> Result < impl IntoResponse , ApiError > {
105+ headers : HeaderMap ,
106+ ) -> Result < Response , ApiError > {
80107 let key = ChartKey :: from_slug ( & slug)
81108 . map_err ( |e| ApiError :: BadRequest ( format ! ( "invalid slug: {e}" ) ) ) ?;
82109 let window = q. window ( ) ;
83- let response =
84- db:: run_blocking ( & state. db , move |conn| chart_payload ( conn, & key, & window) ) . await ?;
110+ if is_materialized_window ( & window) {
111+ let generation = state. read_store . active ( ) ;
112+ let response = generation
113+ . chart_artifact ( & slug)
114+ . ok_or_else ( || ApiError :: NotFound ( format ! ( "no data for slug {slug:?}" ) ) ) ?;
115+ return Ok ( response. response ( & headers, ArtifactCachePolicy :: Revalidate ) ) ;
116+ }
117+ let response = cached_chart_payload ( & state, & slug, & key, & window) . await ?;
85118 let response =
86119 response. ok_or_else ( || ApiError :: NotFound ( format ! ( "no data for slug {slug:?}" ) ) ) ?;
87- Ok ( Json ( response) )
120+ Ok ( Json ( response) . into_response ( ) )
88121}
89122
90123/// Handler for `GET /api/group/{slug}`.
91124pub async fn group (
92125 State ( state) : State < AppState > ,
93126 Path ( slug) : Path < String > ,
94127 Query ( q) : Query < ChartQuery > ,
95- ) -> Result < impl IntoResponse , ApiError > {
128+ headers : HeaderMap ,
129+ ) -> Result < Response , ApiError > {
96130 let key = GroupKey :: from_slug ( & slug)
97131 . map_err ( |e| ApiError :: BadRequest ( format ! ( "invalid group slug: {e}" ) ) ) ?;
98132 let window = q. window ( ) ;
99- let response = db:: run_blocking ( & state. db , move |conn| {
100- collect_group_charts ( conn, & key, & window)
101- } )
102- . await ?;
133+ if is_materialized_window ( & window) {
134+ let generation = state. read_store . active ( ) ;
135+ let response = generation
136+ . group_artifact ( & slug)
137+ . ok_or_else ( || ApiError :: NotFound ( format ! ( "no data for group slug {slug:?}" ) ) ) ?;
138+ return Ok ( response. response ( & headers, ArtifactCachePolicy :: Revalidate ) ) ;
139+ }
140+ let response = cached_group_charts ( & state, & slug, & key, & window) . await ?;
103141 let response =
104142 response. ok_or_else ( || ApiError :: NotFound ( format ! ( "no data for group slug {slug:?}" ) ) ) ?;
105- Ok ( Json ( response) )
143+ Ok ( Json ( response) . into_response ( ) )
144+ }
145+
146+ /// Handler for versioned latest-100 group shard artifacts.
147+ pub async fn group_shard_artifact (
148+ State ( state) : State < AppState > ,
149+ Path ( ( generation_id, group_slug, index) ) : Path < ( String , String , usize ) > ,
150+ headers : HeaderMap ,
151+ ) -> Result < Response , ApiError > {
152+ let generation = state
153+ . read_store
154+ . generation ( & generation_id)
155+ . ok_or_else ( || ApiError :: NotFound ( format ! ( "unknown generation {generation_id:?}" ) ) ) ?;
156+ let artifact = generation
157+ . group_shard_artifact ( & group_slug, index)
158+ . ok_or_else ( || ApiError :: NotFound ( format ! ( "unknown group shard {group_slug:?}#{index}" ) ) ) ?;
159+ Ok ( artifact. response ( & headers, ArtifactCachePolicy :: Immutable ) )
160+ }
161+
162+ fn is_materialized_window ( window : & CommitWindow ) -> bool {
163+ matches ! ( window, CommitWindow :: Last ( n) if n. get( ) == DEFAULT_COMMIT_WINDOW )
164+ }
165+
166+ /// Cache-aware wrapper around `collect_groups`.
167+ pub async fn cached_groups ( state : & AppState ) -> Result < std:: sync:: Arc < Vec < Group > > > {
168+ let db = state. db . clone ( ) ;
169+ state
170+ . cache
171+ . groups ( move || async move {
172+ db:: run_read_blocking ( & db, |conn| read_transaction ( conn, collect_groups) ) . await
173+ } )
174+ . await
175+ }
176+
177+ /// Cache-aware wrapper around [`collect_filter_universe`].
178+ pub async fn cached_filter_universe ( state : & AppState ) -> Result < std:: sync:: Arc < FilterUniverse > > {
179+ let db = state. db . clone ( ) ;
180+ state
181+ . cache
182+ . filter_universe ( move || async move {
183+ db:: run_read_blocking ( & db, |conn| read_transaction ( conn, collect_filter_universe) ) . await
184+ } )
185+ . await
186+ }
187+
188+ /// Cache-aware wrapper around `chart_payload`.
189+ pub async fn cached_chart_payload (
190+ state : & AppState ,
191+ slug : & str ,
192+ key : & ChartKey ,
193+ window : & CommitWindow ,
194+ ) -> Result < Option < std:: sync:: Arc < ChartResponse > > > {
195+ let db = state. db . clone ( ) ;
196+ let key_for_compute = key. clone ( ) ;
197+ let window_for_compute = * window;
198+ state
199+ . cache
200+ . chart_payload ( slug, window, move || async move {
201+ db:: run_read_blocking ( & db, move |conn| {
202+ read_transaction ( conn, |conn| {
203+ chart_payload ( conn, & key_for_compute, & window_for_compute)
204+ } )
205+ } )
206+ . await
207+ } )
208+ . await
209+ }
210+
211+ /// Cache-aware wrapper around `collect_group_charts`.
212+ pub async fn cached_group_charts (
213+ state : & AppState ,
214+ slug : & str ,
215+ key : & GroupKey ,
216+ window : & CommitWindow ,
217+ ) -> Result < Option < std:: sync:: Arc < GroupChartsResponse > > > {
218+ let db = state. db . clone ( ) ;
219+ let key_for_compute = key. clone ( ) ;
220+ let window_for_compute = * window;
221+ state
222+ . cache
223+ . group_charts ( slug, window, move || async move {
224+ db:: run_read_blocking ( & db, move |conn| {
225+ read_transaction ( conn, |conn| {
226+ collect_group_charts ( conn, & key_for_compute, & window_for_compute)
227+ } )
228+ } )
229+ . await
230+ } )
231+ . await
106232}
107233
108234/// Handler for `GET /health`.
109235pub async fn health ( State ( state) : State < AppState > ) -> Result < impl IntoResponse , ApiError > {
110236 let path = state. db_path . display ( ) . to_string ( ) ;
111- let response = db:: run_blocking ( & state. db , move |conn| collect_health ( conn, path) ) . await ?;
237+ let response = db:: run_read_blocking ( & state. db , move |conn| {
238+ read_transaction ( conn, |conn| collect_health ( conn, path) )
239+ } )
240+ . await ?;
112241 Ok ( Json ( response) )
113242}
114243
0 commit comments