@@ -21,6 +21,7 @@ use libsql_hrana::proto::{Batch, BatchResult, Col, Stmt, StmtResult};
2121use std:: collections:: VecDeque ;
2222use std:: future:: Future ;
2323use std:: pin:: Pin ;
24+ use std:: sync:: atomic:: Ordering ;
2425use std:: sync:: Arc ;
2526use std:: task:: { Context , Poll } ;
2627
@@ -122,7 +123,7 @@ where
122123
123124impl < T > Statement < T >
124125where
125- T : HttpSend ,
126+ T : HttpSend + Send + Sync + ' static ,
126127{
127128 pub ( crate ) fn new ( stream : HranaStream < T > , sql : String , want_rows : bool ) -> crate :: Result < Self > {
128129 // in SQLite when a multiple statements are glued together into one string, only the first one is
@@ -170,20 +171,20 @@ where
170171 pub ( crate ) async fn query_raw (
171172 & mut self ,
172173 params : & Params ,
173- ) -> crate :: Result < HranaRows < T :: Stream > > {
174+ ) -> crate :: Result < HranaRows < T :: Stream , T > > {
174175 let mut stmt = self . inner . clone ( ) ;
175176 bind_params ( params. clone ( ) , & mut stmt) ;
176177
177178 let cursor = self . stream . cursor ( Batch :: single ( stmt) ) . await ?;
178- let rows = HranaRows :: from_cursor ( cursor) . await ?;
179+ let rows = HranaRows :: from_cursor ( cursor, self . stream . clone ( ) ) . await ?;
179180
180181 Ok ( rows)
181182 }
182183}
183184
184185impl < T > Statement < T >
185186where
186- T : HttpSend ,
187+ T : HttpSend + Send + Sync + ' static ,
187188 <T as HttpSend >:: Stream : Send + Sync + ' static ,
188189{
189190 pub async fn query ( & mut self , params : & Params ) -> crate :: Result < super :: Rows > {
@@ -192,28 +193,37 @@ where
192193 }
193194}
194195
195- pub struct HranaRows < S > {
196+ pub struct HranaRows < S , T : HttpSend > {
196197 cursor_step : OwnedCursorStep < S > ,
197198 column_types : Option < Vec < ValueType > > ,
199+ stream : HranaStream < T > ,
198200}
199201
200- impl < S > HranaRows < S >
202+ impl < S , T > HranaRows < S , T >
201203where
204+ T : HttpSend + Send + Sync + ' static ,
202205 S : Stream < Item = std:: io:: Result < Bytes > > + Unpin ,
203206{
204- async fn from_cursor ( cursor : Cursor < S > ) -> Result < Self > {
207+ async fn from_cursor ( cursor : Cursor < S > , stream : HranaStream < T > ) -> Result < Self > {
205208 let cursor_step = cursor. next_step_owned ( ) . await ?;
206209 Ok ( HranaRows {
207210 cursor_step,
208211 column_types : None ,
212+ stream,
209213 } )
210214 }
211215
212216 pub async fn next ( & mut self ) -> crate :: Result < Option < super :: Row > > {
213217 let row = match self . cursor_step . next ( ) . await {
214218 Some ( Ok ( row) ) => row,
215219 Some ( Err ( e) ) => return Err ( crate :: Error :: Hrana ( Box :: new ( e) ) ) ,
216- None => return Ok ( None ) ,
220+ None => {
221+ self . stream
222+ . inner
223+ . affected_row_count
224+ . store ( self . cursor_step . affected_rows ( ) . into ( ) , Ordering :: SeqCst ) ;
225+ return Ok ( None ) ;
226+ }
217227 } ;
218228
219229 if self . column_types . is_none ( ) {
@@ -254,17 +264,19 @@ where
254264}
255265
256266#[ async_trait:: async_trait]
257- impl < S > RowsInner for HranaRows < S >
267+ impl < S , T > RowsInner for HranaRows < S , T >
258268where
269+ T : HttpSend + Send + Sync + ' static ,
259270 S : Stream < Item = std:: io:: Result < Bytes > > + Send + Sync + Unpin ,
260271{
261272 async fn next ( & mut self ) -> crate :: Result < Option < super :: Row > > {
262273 self . next ( ) . await
263274 }
264275}
265276
266- impl < S > ColumnsInner for HranaRows < S >
277+ impl < S , T > ColumnsInner for HranaRows < S , T >
267278where
279+ T : HttpSend + Send + Sync + ' static ,
268280 S : Stream < Item = std:: io:: Result < Bytes > > + Send + Sync + Unpin ,
269281{
270282 fn column_count ( & self ) -> i32 {
0 commit comments