@@ -153,17 +153,17 @@ impl UffdHandler {
153153 panic ! ( "Could not get UFFD and mappings after 5 retries" ) ;
154154 }
155155
156- pub fn from_unix_stream ( stream : & UnixStream , backing_buffer : * const u8 , size : usize ) -> Self {
157- let ( body , file ) = Self :: get_mappings_and_file ( stream ) ;
158- let mappings =
159- serde_json :: from_str :: < Vec < GuestRegionUffdMapping > > ( & body ) . unwrap_or_else ( |_| {
160- panic ! ( "Cannot deserialize memory mappings. Received body: {body}" )
161- } ) ;
156+ pub fn from_mappings (
157+ mappings : Vec < GuestRegionUffdMapping > ,
158+ uffd : File ,
159+ backing_buffer : * const u8 ,
160+ size : usize ,
161+ ) -> Self {
162162 let memsize: usize = mappings. iter ( ) . map ( |r| r. size ) . sum ( ) ;
163163 // Page size is the same for all memory regions, so just grab the first one
164164 let first_mapping = mappings. first ( ) . unwrap_or_else ( || {
165165 panic ! (
166- "Cannot get the first mapping. Mappings size is {}. Received body: {body} " ,
166+ "Cannot get the first mapping. Mappings size is {}." ,
167167 mappings. len( )
168168 )
169169 } ) ;
@@ -173,7 +173,7 @@ impl UffdHandler {
173173 assert_eq ! ( memsize, size) ;
174174 assert ! ( page_size. is_power_of_two( ) ) ;
175175
176- let uffd = unsafe { Uffd :: from_raw_fd ( file . into_raw_fd ( ) ) } ;
176+ let uffd = unsafe { Uffd :: from_raw_fd ( uffd . into_raw_fd ( ) ) } ;
177177
178178 Self {
179179 mem_regions : mappings,
@@ -375,22 +375,110 @@ impl Runtime {
375375 if pollfds[ i] . revents & libc:: POLLIN != 0 {
376376 nready -= 1 ;
377377 if pollfds[ i] . fd == self . stream . as_raw_fd ( ) {
378- // Handle new uffd from stream
379- let handler = UffdHandler :: from_unix_stream (
380- & self . stream ,
381- self . backing_memory ,
382- self . backing_memory_size ,
383- ) ;
384- pollfds. push ( libc:: pollfd {
385- fd : handler. uffd . as_raw_fd ( ) ,
386- events : libc:: POLLIN ,
387- revents : 0 ,
388- } ) ;
389- self . uffds . insert ( handler. uffd . as_raw_fd ( ) , handler) ;
390-
391- // If connection is closed, we can skip the socket from being polled.
392- if pollfds[ i] . revents & ( libc:: POLLRDHUP | libc:: POLLHUP ) != 0 {
393- skip_stream = 1 ;
378+ const BUFFER_SIZE : usize = 4096 ;
379+
380+ let mut buffer = [ 0u8 ; BUFFER_SIZE ] ;
381+ let mut fds = [ 0 ; 1 ] ;
382+ let mut current_pos = 0 ;
383+ let mut exit_loop = false ;
384+
385+ loop {
386+ // Read more data into the buffer if there's space
387+ let mut iov = [ libc:: iovec {
388+ iov_base : ( buffer[ current_pos..] ) . as_mut_ptr ( ) as * mut libc:: c_void ,
389+ iov_len : buffer. len ( ) - current_pos,
390+ } ] ;
391+
392+ if current_pos < BUFFER_SIZE {
393+ let ret = unsafe { self . stream . recv_with_fds ( & mut iov, & mut fds) } ;
394+ match ret {
395+ Ok ( ( 0 , _) ) => break ,
396+ Ok ( ( n, 1 ) ) => current_pos += n,
397+ Ok ( ( n, 0 ) ) | Ok ( ( _, n) ) => panic ! ( "Wrong number of fds: {}" , n) ,
398+ Err ( e) if e. errno ( ) == libc:: EAGAIN => {
399+ if exit_loop {
400+ break ;
401+ }
402+ continue ;
403+ }
404+ Err ( e) => panic ! ( "Read error: {}" , e) ,
405+ }
406+
407+ exit_loop = false ;
408+ }
409+
410+ let mut parser =
411+ serde_json:: Deserializer :: from_slice ( & buffer[ ..current_pos] )
412+ . into_iter :: < UffdMsgFromFirecracker > ( ) ;
413+ let mut total_consumed = 0 ;
414+ let mut needs_more = false ;
415+
416+ while let Some ( result) = parser. next ( ) {
417+ match result {
418+ Ok ( UffdMsgFromFirecracker :: Mappings ( mappings) ) => {
419+ // Handle new uffd from stream
420+ let handler = UffdHandler :: from_mappings (
421+ mappings,
422+ unsafe { File :: from_raw_fd ( fds[ 0 ] ) } ,
423+ self . backing_memory ,
424+ self . backing_memory_size ,
425+ ) ;
426+
427+ let fd = handler. uffd . as_raw_fd ( ) ;
428+
429+ pollfds. push ( libc:: pollfd {
430+ fd,
431+ events : libc:: POLLIN ,
432+ revents : 0 ,
433+ } ) ;
434+ self . uffds . insert ( fd, handler) ;
435+
436+ // If connection is closed, we can skip the socket from
437+ // being polled.
438+ if pollfds[ i] . revents & ( libc:: POLLRDHUP | libc:: POLLHUP )
439+ != 0
440+ {
441+ skip_stream = 1 ;
442+ }
443+
444+ total_consumed = parser. byte_offset ( ) ;
445+ }
446+ Ok ( UffdMsgFromFirecracker :: FaultReq ( ref _fault_request) ) => {
447+ unimplemented ! (
448+ "Received unsupported message from Firecracker: {:?}" ,
449+ result
450+ )
451+ }
452+ Err ( e) if e. is_eof ( ) => {
453+ needs_more = true ;
454+ break ;
455+ }
456+ Err ( e) => {
457+ println ! (
458+ "Buffer content: {:?}" ,
459+ std:: str :: from_utf8( & buffer[ ..current_pos] )
460+ ) ;
461+ panic ! ( "Invalid JSON: {}" , e) ;
462+ }
463+ }
464+ }
465+
466+ if total_consumed > 0 {
467+ buffer. copy_within ( total_consumed..current_pos, 0 ) ;
468+ current_pos -= total_consumed;
469+ }
470+
471+ if needs_more {
472+ continue ;
473+ }
474+
475+ // We consumed all data in the buffer, but the socket may have remaining
476+ // unread data so we attempt to read from it
477+ // and exit the loop only if we confirm that nothing is in
478+ // there.
479+ if current_pos == 0 {
480+ exit_loop = true ;
481+ }
394482 }
395483 } else {
396484 // Handle one of uffd page faults
0 commit comments