1- use std:: { collections:: HashMap , sync:: Arc } ;
2-
31use futures_util:: stream:: FuturesOrdered ;
42
5- use grafana_plugin_sdk:: backend;
6- use tokio:: sync:: RwLock ;
7- use tokio_postgres:: Client ;
3+ use grafana_plugin_sdk:: backend:: { self , DataSourceInstanceSettings } ;
84
95use crate :: {
10- path:: { self , PathDisplay , QueryId } ,
11- queries:: { Query , SelectStatement , TailTarget } ,
6+ path:: { PathDisplay , QueryId } ,
7+ queries:: { Query , TailTarget } ,
128 rows_to_frame, Error , MaterializePlugin ,
139} ;
1410
@@ -31,32 +27,40 @@ impl backend::DataQueryError for QueryError {
3127// Unfortunately this has to take all of its arguments by value until we have
3228// GATs, since the `DataService::Stream` associated type can't contain references.
3329// Ideally we'd just borrow the query/uid etc but it's really not a big deal.
34- async fn query_data_single (
35- client : & Client ,
36- uid : & str ,
37- query : backend:: DataQuery < Query > ,
38- queries : Arc < RwLock < HashMap < path:: QueryId , SelectStatement > > > ,
39- ) -> Result < backend:: DataResponse , Error > {
40- let q = query. query ;
41- let target = q. as_tail ( ) ?;
42- let rows = target. select_all ( client) . await ?;
43- let mut frame = rows_to_frame ( & rows) ;
30+ impl MaterializePlugin {
31+ async fn query_data_single (
32+ & self ,
33+ datasource_instance_settings : & DataSourceInstanceSettings ,
34+ query : & backend:: DataQuery < Query > ,
35+ ) -> Result < backend:: DataResponse , Error > {
36+ let q = & query. query ;
37+ let client = self . get_client ( datasource_instance_settings) . await ?;
38+ let target = q. as_tail ( ) ?;
39+ let rows = target. select_all ( & client) . await ?;
40+ let mut frame = rows_to_frame ( & rows) ;
4441
45- if let TailTarget :: Select { statement } = target {
46- let query_id = QueryId :: from_statement ( statement) ;
47- queries. write ( ) . await . insert ( query_id, statement. clone ( ) ) ;
48- }
42+ if let TailTarget :: Select { statement } = target {
43+ let query_id = QueryId :: from_statement ( statement) ;
44+ self . sql_queries
45+ . write ( )
46+ . await
47+ . insert ( query_id, statement. clone ( ) ) ;
48+ }
4949
50- let path = q. to_path ( ) ;
51- // Set the channel of the frame, indicating to Grafana that it should switch to
52- // streaming.
53- let channel = format ! ( "ds/{}/{}" , uid, path)
54- . parse ( )
55- . map_err ( Error :: CreatingChannel ) ?;
56- frame. set_channel ( channel) ;
57- let frame = frame. check ( ) ?;
50+ let path = q. to_path ( ) ;
51+ // Set the channel of the frame, indicating to Grafana that it should switch to
52+ // streaming.
53+ let channel = format ! ( "ds/{}/{}" , datasource_instance_settings . uid, path)
54+ . parse ( )
55+ . map_err ( Error :: CreatingChannel ) ?;
56+ frame. set_channel ( channel) ;
57+ let frame = frame. check ( ) ?;
5858
59- Ok ( backend:: DataResponse :: new ( query. ref_id , vec ! [ frame] ) )
59+ Ok ( backend:: DataResponse :: new (
60+ query. ref_id . clone ( ) ,
61+ vec ! [ frame] ,
62+ ) )
63+ }
6064}
6165
6266#[ backend:: async_trait]
@@ -65,43 +69,27 @@ impl backend::DataService for MaterializePlugin {
6569 type QueryError = QueryError ;
6670 type Stream < ' a > = backend:: BoxDataResponseStream < ' a , Self :: QueryError > ;
6771
68- async fn query_data (
69- & self ,
70- request : backend:: QueryDataRequest < Self :: Query > ,
71- ) -> Self :: Stream < ' _ > {
72+ async fn query_data < ' stream , ' req : ' stream , ' slf : ' req > (
73+ & ' slf self ,
74+ request : & ' req backend:: QueryDataRequest < Self :: Query > ,
75+ ) -> Self :: Stream < ' stream > {
7276 let datasource_settings = request
7377 . plugin_context
7478 . datasource_instance_settings
75- . clone ( )
79+ . as_ref ( )
7680 . ok_or ( Error :: MissingDatasource )
7781 . unwrap ( ) ;
78- // let clients: Vec<_> = request
79- // .queries
80- // .iter()
81- // .map(|_| self.get_client(&datasource_settings))
82- // .collect::<FuturesUnordered<_>>()
83- // .collect()
84- // .await;
85- let client = Arc :: new ( self . get_client ( & datasource_settings) . await . unwrap ( ) ) ;
86- let queries = self . sql_queries . clone ( ) ;
8782 Box :: pin (
8883 request
8984 . queries
90- . into_iter ( )
91- . map ( |x| {
92- let client = client. clone ( ) ;
93- let queries = queries. clone ( ) ;
94- let ref_id = x. ref_id . clone ( ) ;
95- let uid = datasource_settings. uid . clone ( ) ;
96- async move {
97- // let client = client.map_err(|source| QueryError {
98- // ref_id: ref_id.clone(),
99- // source,
100- // })?;
101- query_data_single ( & client, & uid, x, queries)
102- . await
103- . map_err ( |source| QueryError { ref_id, source } )
104- }
85+ . iter ( )
86+ . map ( |x| async move {
87+ self . query_data_single ( datasource_settings, x)
88+ . await
89+ . map_err ( |source| QueryError {
90+ ref_id : x. ref_id . clone ( ) ,
91+ source,
92+ } )
10593 } )
10694 . collect :: < FuturesOrdered < _ > > ( ) ,
10795 )
0 commit comments