@@ -34,7 +34,9 @@ use tokio::io::AsyncReadExt;
3434use vibeio:: net:: { PollTcpStream as VibePollTcpStream , TcpListener as VibeTcpListener } ;
3535
3636use crate :: async_event:: AsyncEvent ;
37- use crate :: context:: { ensure_running_loop, run_in_context} ;
37+ use crate :: context:: {
38+ ensure_running_loop, run_in_context, run_in_context_noargs, run_in_context_onearg,
39+ } ;
3840use crate :: fast_streams:: { PyFastStreamProtocol , PyFastStreamReader } ;
3941use crate :: fd_ops;
4042use crate :: loop_core:: { LoopCommand , LoopCore } ;
@@ -59,6 +61,7 @@ enum PendingReadEvent {
5961
6062const DEFAULT_WRITE_BUFFER_HIGH_WATER : usize = 64 * 1024 ;
6163const DEFAULT_WRITE_BUFFER_LOW_WATER : usize = DEFAULT_WRITE_BUFFER_HIGH_WATER / 4 ;
64+ const MAX_PENDING_READ_COALESCE_BYTES : usize = 256 * 1024 ;
6265
6366struct OwnedWriteBuffer {
6467 bytes : Box < [ u8 ] > ,
@@ -772,18 +775,6 @@ impl StreamTransportCore {
772775 spawn_writer_worker ( Arc :: clone ( self ) , target, writer_rx) ;
773776 }
774777
775- fn call_protocol_with_tuple (
776- & self ,
777- py : Python < ' _ > ,
778- callback : & Py < PyAny > ,
779- context : & Py < PyAny > ,
780- context_needs_run : bool ,
781- args : & Bound < ' _ , PyTuple > ,
782- ) -> PyResult < Py < PyAny > > {
783- let tuple = args. clone ( ) . unbind ( ) ;
784- run_in_context ( py, context, context_needs_run, callback, & tuple)
785- }
786-
787778 fn server_ref ( & self ) -> Option < Weak < ServerCore > > {
788779 self . state
789780 . lock ( )
@@ -840,14 +831,39 @@ impl StreamTransportCore {
840831 drained
841832 } ;
842833
834+ let mut pending_data: Option < Vec < u8 > > = None ;
835+
843836 while let Some ( event) = pending. pop_front ( ) {
844837 match event {
845838 PendingReadEvent :: Data ( data) => {
846839 profiling:: scope!( "stream.pending.data" ) ;
847- if self . is_closing_or_lost ( ) {
848- continue ;
840+ match pending_data. as_mut ( ) {
841+ Some ( buffer)
842+ if buffer. len ( ) + data. len ( ) <= MAX_PENDING_READ_COALESCE_BYTES =>
843+ {
844+ buffer. extend_from_slice ( & data) ;
845+ }
846+ Some ( _) => {
847+ if let Err ( err) =
848+ self . flush_pending_data_with_py ( py, & mut pending_data)
849+ {
850+ let _ = self . report_error_with_py (
851+ py,
852+ err,
853+ "stream data_received callback failed" ,
854+ ) ;
855+ let _ = self . connection_lost_with_py ( py, None ) ;
856+ self . read_events_scheduled . store ( false , Ordering :: Release ) ;
857+ return Ok ( ( ) ) ;
858+ }
859+ pending_data = Some ( data. into_vec ( ) ) ;
860+ }
861+ None => pending_data = Some ( data. into_vec ( ) ) ,
849862 }
850- if let Err ( err) = self . data_received_with_py ( py, & data) {
863+ }
864+ PendingReadEvent :: Eof => {
865+ profiling:: scope!( "stream.pending.eof" ) ;
866+ if let Err ( err) = self . flush_pending_data_with_py ( py, & mut pending_data) {
851867 let _ = self . report_error_with_py (
852868 py,
853869 err,
@@ -857,9 +873,6 @@ impl StreamTransportCore {
857873 self . read_events_scheduled . store ( false , Ordering :: Release ) ;
858874 return Ok ( ( ) ) ;
859875 }
860- }
861- PendingReadEvent :: Eof => {
862- profiling:: scope!( "stream.pending.eof" ) ;
863876 match self . eof_received_with_py ( py) {
864877 Ok ( true ) => {
865878 self . read_events_scheduled . store ( false , Ordering :: Release ) ;
@@ -885,21 +898,58 @@ impl StreamTransportCore {
885898 }
886899 PendingReadEvent :: ConnectionLost ( message) => {
887900 profiling:: scope!( "stream.pending.connection_lost" ) ;
901+ if let Err ( err) = self . flush_pending_data_with_py ( py, & mut pending_data) {
902+ let _ = self . report_error_with_py (
903+ py,
904+ err,
905+ "stream data_received callback failed" ,
906+ ) ;
907+ let _ = self . connection_lost_with_py ( py, None ) ;
908+ self . read_events_scheduled . store ( false , Ordering :: Release ) ;
909+ return Ok ( ( ) ) ;
910+ }
888911 let err = message. map ( PyRuntimeError :: new_err) ;
889912 let _ = self . connection_lost_with_py ( py, err) ;
890913 self . read_events_scheduled . store ( false , Ordering :: Release ) ;
891914 return Ok ( ( ) ) ;
892915 }
893916 PendingReadEvent :: PauseWriting => {
894917 profiling:: scope!( "stream.pending.pause_writing" ) ;
918+ if let Err ( err) = self . flush_pending_data_with_py ( py, & mut pending_data) {
919+ let _ = self . report_error_with_py (
920+ py,
921+ err,
922+ "stream data_received callback failed" ,
923+ ) ;
924+ let _ = self . connection_lost_with_py ( py, None ) ;
925+ self . read_events_scheduled . store ( false , Ordering :: Release ) ;
926+ return Ok ( ( ) ) ;
927+ }
895928 self . pause_writing_with_py ( py) ?;
896929 }
897930 PendingReadEvent :: ResumeWriting => {
898931 profiling:: scope!( "stream.pending.resume_writing" ) ;
932+ if let Err ( err) = self . flush_pending_data_with_py ( py, & mut pending_data) {
933+ let _ = self . report_error_with_py (
934+ py,
935+ err,
936+ "stream data_received callback failed" ,
937+ ) ;
938+ let _ = self . connection_lost_with_py ( py, None ) ;
939+ self . read_events_scheduled . store ( false , Ordering :: Release ) ;
940+ return Ok ( ( ) ) ;
941+ }
899942 self . resume_writing_with_py ( py) ?;
900943 }
901944 }
902945 }
946+
947+ if let Err ( err) = self . flush_pending_data_with_py ( py, & mut pending_data) {
948+ let _ = self . report_error_with_py ( py, err, "stream data_received callback failed" ) ;
949+ let _ = self . connection_lost_with_py ( py, None ) ;
950+ self . read_events_scheduled . store ( false , Ordering :: Release ) ;
951+ return Ok ( ( ) ) ;
952+ }
903953 }
904954 }
905955
@@ -910,8 +960,7 @@ impl StreamTransportCore {
910960 context : & Py < PyAny > ,
911961 context_needs_run : bool ,
912962 ) -> PyResult < Py < PyAny > > {
913- let args = PyTuple :: empty ( py) ;
914- self . call_protocol_with_tuple ( py, callback, context, context_needs_run, & args)
963+ run_in_context_noargs ( py, context, context_needs_run, callback)
915964 }
916965
917966 fn call_protocol_method1 (
@@ -922,8 +971,23 @@ impl StreamTransportCore {
922971 context_needs_run : bool ,
923972 arg : Py < PyAny > ,
924973 ) -> PyResult < Py < PyAny > > {
925- let args = PyTuple :: new ( py, [ arg] ) ?;
926- self . call_protocol_with_tuple ( py, callback, context, context_needs_run, & args)
974+ run_in_context_onearg ( py, context, context_needs_run, callback, arg. bind ( py) )
975+ }
976+
977+ fn flush_pending_data_with_py (
978+ & self ,
979+ py : Python < ' _ > ,
980+ pending_data : & mut Option < Vec < u8 > > ,
981+ ) -> PyResult < ( ) > {
982+ let Some ( data) = pending_data. take ( ) else {
983+ return Ok ( ( ) ) ;
984+ } ;
985+
986+ if self . is_closing_or_lost ( ) {
987+ return Ok ( ( ) ) ;
988+ }
989+
990+ self . data_received_with_py ( py, & data)
927991 }
928992
929993 fn report_error_with_py ( & self , py : Python < ' _ > , err : PyErr , message : & str ) -> PyResult < ( ) > {
@@ -1024,10 +1088,12 @@ impl StreamTransportCore {
10241088 {
10251089 let args = PyTuple :: new ( py, [ data. len ( ) ] ) ?. unbind ( ) ;
10261090 let buffer_obj = run_in_context ( py, & context, context_needs_run, get_buffer, & args) ?;
1027- let memoryview = py
1028- . import ( "builtins" ) ?
1029- . getattr ( "memoryview" ) ?
1030- . call1 ( ( buffer_obj. bind ( py) , ) ) ?;
1091+ let memoryview = unsafe {
1092+ Bound :: from_owned_ptr_or_err (
1093+ py,
1094+ pyo3:: ffi:: PyMemoryView_FromObject ( buffer_obj. bind ( py) . as_ptr ( ) ) ,
1095+ )
1096+ } ?;
10311097 memoryview. set_item (
10321098 PySlice :: new ( py, 0 , data. len ( ) as isize , 1 ) ,
10331099 PyBytes :: new ( py, data) ,
0 commit comments