@@ -46,3 +46,189 @@ async fn run_writer<W: AsyncWrite>(mut actor: WriterActor<W>) {
4646
4747 debug ! ( "writer loop is shutting down" ) ;
4848}
49+
50+ #[ cfg( test) ]
51+ mod tests {
52+ use super :: * ;
53+ use crate :: message:: parser:: RawFixMessage ;
54+ use tokio:: io:: { AsyncReadExt , duplex} ;
55+
56+ /// Test that a single message is successfully written to the socket
57+ #[ tokio:: test]
58+ async fn test_send_single_message ( ) {
59+ let ( reader, writer) = duplex ( 1024 ) ;
60+ let ( _reader_half, writer_half) = tokio:: io:: split ( writer) ;
61+ let writer_ref = spawn_socket_writer ( writer_half) ;
62+
63+ let fix_message = b"8=FIX.4.4\x01 9=77\x01 35=A\x01 34=1\x01 49=sender\x01 52=20230908-08:24:56.574\x01 56=target\x01 98=0\x01 108=30\x01 141=Y\x01 10=037\x01 " ;
64+ let raw_message = RawFixMessage :: new ( fix_message. to_vec ( ) ) ;
65+
66+ writer_ref. send_raw_message ( raw_message) . await ;
67+
68+ // read from the other end of the duplex stream
69+ let mut reader = reader;
70+ let mut buffer = vec ! [ 0u8 ; 1024 ] ;
71+ let n = tokio:: time:: timeout (
72+ tokio:: time:: Duration :: from_millis ( 100 ) ,
73+ reader. read ( & mut buffer) ,
74+ )
75+ . await
76+ . expect ( "timeout waiting for message" )
77+ . expect ( "read failed" ) ;
78+
79+ assert_eq ! ( & buffer[ ..n] , fix_message) ;
80+ }
81+
82+ /// Test that multiple messages are written in order
83+ #[ tokio:: test]
84+ async fn test_send_multiple_messages ( ) {
85+ let ( reader, writer) = duplex ( 2048 ) ;
86+ let ( _reader_half, writer_half) = tokio:: io:: split ( writer) ;
87+ let writer_ref = spawn_socket_writer ( writer_half) ;
88+
89+ let msg1 = b"8=FIX.4.4\x01 9=77\x01 35=A\x01 34=1\x01 49=sender\x01 52=20230908-08:24:56.574\x01 56=target\x01 98=0\x01 108=30\x01 141=Y\x01 10=037\x01 " ;
90+ let msg2 = b"8=FIX.4.4\x01 9=77\x01 35=A\x01 34=2\x01 49=sender\x01 52=20230908-08:24:58.574\x01 56=target\x01 98=0\x01 108=30\x01 141=Y\x01 10=040\x01 " ;
91+ let msg3 = b"8=FIX.4.4\x01 9=77\x01 35=A\x01 34=3\x01 49=sender\x01 52=20230908-08:24:59.574\x01 56=target\x01 98=0\x01 108=30\x01 141=Y\x01 10=043\x01 " ;
92+
93+ writer_ref
94+ . send_raw_message ( RawFixMessage :: new ( msg1. to_vec ( ) ) )
95+ . await ;
96+ writer_ref
97+ . send_raw_message ( RawFixMessage :: new ( msg2. to_vec ( ) ) )
98+ . await ;
99+ writer_ref
100+ . send_raw_message ( RawFixMessage :: new ( msg3. to_vec ( ) ) )
101+ . await ;
102+
103+ // read all messages from the other end
104+ let mut reader = reader;
105+ let mut buffer = vec ! [ 0u8 ; 2048 ] ;
106+ let n = tokio:: time:: timeout (
107+ tokio:: time:: Duration :: from_millis ( 100 ) ,
108+ reader. read ( & mut buffer) ,
109+ )
110+ . await
111+ . expect ( "timeout waiting for messages" )
112+ . expect ( "read failed" ) ;
113+
114+ // verify all three messages were written in order
115+ let mut expected = Vec :: new ( ) ;
116+ expected. extend_from_slice ( msg1) ;
117+ expected. extend_from_slice ( msg2) ;
118+ expected. extend_from_slice ( msg3) ;
119+
120+ assert_eq ! ( & buffer[ ..n] , & expected[ ..] ) ;
121+ }
122+
123+ /// Test that disconnect message properly shuts down the writer loop
124+ #[ tokio:: test]
125+ async fn test_disconnect ( ) {
126+ let ( reader, writer) = duplex ( 1024 ) ;
127+ let ( _reader_half, writer_half) = tokio:: io:: split ( writer) ;
128+ let writer_ref = spawn_socket_writer ( writer_half) ;
129+
130+ // send a message first
131+ let fix_message = b"8=FIX.4.4\x01 9=77\x01 35=A\x01 34=1\x01 49=sender\x01 52=20230908-08:24:56.574\x01 56=target\x01 98=0\x01 108=30\x01 141=Y\x01 10=037\x01 " ;
132+ writer_ref
133+ . send_raw_message ( RawFixMessage :: new ( fix_message. to_vec ( ) ) )
134+ . await ;
135+
136+ // disconnect the writer
137+ writer_ref. disconnect ( ) . await ;
138+
139+ // verify the message was sent before disconnect
140+ let mut reader = reader;
141+ let mut buffer = vec ! [ 0u8 ; 1024 ] ;
142+ let n = tokio:: time:: timeout (
143+ tokio:: time:: Duration :: from_millis ( 100 ) ,
144+ reader. read ( & mut buffer) ,
145+ )
146+ . await
147+ . expect ( "timeout waiting for message" )
148+ . expect ( "read failed" ) ;
149+
150+ assert_eq ! ( & buffer[ ..n] , fix_message) ;
151+ }
152+
153+ /// Test that the writer can handle an empty message
154+ #[ tokio:: test]
155+ async fn test_send_empty_message ( ) {
156+ let ( reader, writer) = duplex ( 1024 ) ;
157+ let ( _reader_half, writer_half) = tokio:: io:: split ( writer) ;
158+ let writer_ref = spawn_socket_writer ( writer_half) ;
159+
160+ let empty_message = RawFixMessage :: new ( vec ! [ ] ) ;
161+ writer_ref. send_raw_message ( empty_message) . await ;
162+
163+ // read from the other end - should complete immediately with 0 bytes
164+ let mut reader = reader;
165+ let mut buffer = vec ! [ 0u8 ; 1024 ] ;
166+
167+ // give it a moment to process
168+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 10 ) ) . await ;
169+
170+ // try to read, but with a short timeout since we expect nothing
171+ match tokio:: time:: timeout (
172+ tokio:: time:: Duration :: from_millis ( 50 ) ,
173+ reader. read ( & mut buffer) ,
174+ )
175+ . await
176+ {
177+ Ok ( Ok ( n) ) => assert_eq ! ( n, 0 , "Expected 0 bytes for empty message" ) ,
178+ Err ( _) => {
179+ // Timeout is also acceptable - no data to read
180+ }
181+ Ok ( Err ( e) ) => panic ! ( "Read failed: {}" , e) ,
182+ }
183+ }
184+
185+ /// Test that the writer loop properly shuts down when the mailbox is closed
186+ #[ tokio:: test]
187+ async fn test_writer_shutdown_on_mailbox_close ( ) {
188+ let ( _reader, writer) = duplex ( 1024 ) ;
189+ let ( _reader_half, writer_half) = tokio:: io:: split ( writer) ;
190+ let writer_ref = spawn_socket_writer ( writer_half) ;
191+
192+ // send a message to ensure the writer is running
193+ let fix_message = b"8=FIX.4.4\x01 9=77\x01 35=A\x01 34=1\x01 49=sender\x01 52=20230908-08:24:56.574\x01 56=target\x01 98=0\x01 108=30\x01 141=Y\x01 10=037\x01 " ;
194+ writer_ref
195+ . send_raw_message ( RawFixMessage :: new ( fix_message. to_vec ( ) ) )
196+ . await ;
197+
198+ // drop the writer_ref, which closes the channel
199+ drop ( writer_ref) ;
200+
201+ // give the writer loop time to shut down
202+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( 50 ) ) . await ;
203+
204+ // If the test completes without hanging, the writer loop properly shut down
205+ // when the mailbox was closed
206+ }
207+
208+ /// Test writer behaviour with a closed socket (write error handling)
209+ #[ tokio:: test]
210+ async fn test_write_error_handling ( ) {
211+ let ( reader, writer) = duplex ( 1024 ) ;
212+ let ( _reader_half, writer_half) = tokio:: io:: split ( writer) ;
213+ let writer_ref = spawn_socket_writer ( writer_half) ;
214+
215+ // close the reader end, which should cause write errors
216+ drop ( reader) ;
217+
218+ // try to send a message - should not panic, but will log a warning
219+ let fix_message = b"8=FIX.4.4\x01 9=77\x01 35=A\x01 34=1\x01 49=sender\x01 52=20230908-08:24:56.574\x01 56=target\x01 98=0\x01 108=30\x01 141=Y\x01 10=037\x01 " ;
220+ writer_ref
221+ . send_raw_message ( RawFixMessage :: new ( fix_message. to_vec ( ) ) )
222+ . await ;
223+
224+ // writer should still be running (not shut down due to write error)
225+ // send another message to verify
226+ writer_ref
227+ . send_raw_message ( RawFixMessage :: new ( fix_message. to_vec ( ) ) )
228+ . await ;
229+
230+ // if we reach here without panic, the writer correctly handled the error
231+ // and continued running (as per the code comment that it only shuts down
232+ // when explicitly requested)
233+ }
234+ }
0 commit comments