@@ -86,35 +86,45 @@ def connect_thread(latch)
8686 def connect_stream ( latch )
8787 return Constants ::PUSH_NONRETRYABLE_ERROR unless socket_write ( latch )
8888 while connected? || @first_event . value
89- begin
90- partial_data = ""
91- Timeout ::timeout @read_timeout do
89+ if IO . select ( [ @socket ] , nil , nil , @read_timeout )
90+ begin
9291 partial_data = @socket . readpartial ( 10_000 )
93- end
94- read_first_event ( partial_data , latch )
95-
96- raise 'eof exception' if partial_data == :eof
97- rescue Timeout ::Error => e
98- log_if_debug ( "SSE read operation timed out!: #{ e . inspect } " , 3 )
99- return Constants ::PUSH_RETRYABLE_ERROR
100- rescue EOFError
101- raise 'eof exception'
102- rescue Errno ::EAGAIN => e
103- log_if_debug ( "SSE client transient error: #{ e . inspect } " , 1 )
104- IO . select ( [ tcp_socket ] )
105- retry
106- rescue Errno ::EBADF , IOError => e
107- log_if_debug ( e . inspect , 3 )
108- return nil
109- rescue StandardError => e
110- return nil if ENV [ 'SPLITCLIENT_ENV' ] == 'test'
92+ read_first_event ( partial_data , latch )
11193
112- log_if_debug ( "Error reading partial data: #{ e . inspect } " , 3 )
94+ raise 'eof exception' if partial_data == :eof
95+ rescue IO ::WaitReadable => e
96+ log_if_debug ( "SSE client transient error: #{ e . inspect } " , 1 )
97+ IO . select ( [ @socket ] , nil , nil , @read_timeout )
98+ retry
99+ rescue Errno ::ETIMEDOUT => e
100+ log_if_debug ( "SSE read operation timed out!: #{ e . inspect } " , 3 )
101+ return Constants ::PUSH_RETRYABLE_ERROR
102+ rescue EOFError => e
103+ log_if_debug ( "SSE read operation EOF Exception!: #{ e . inspect } " , 3 )
104+ raise 'eof exception'
105+ rescue Errno ::EAGAIN => e
106+ log_if_debug ( "SSE client transient error: #{ e . inspect } " , 1 )
107+ IO . select ( [ @socket ] , nil , nil , @read_timeout )
108+ retry
109+ rescue Errno ::EBADF , IOError => e
110+ log_if_debug ( "SSE read operation EBADF or IOError: #{ e . inspect } " , 3 )
111+ return nil
112+ rescue StandardError => e
113+ log_if_debug ( "SSE read operation StandardError: #{ e . inspect } " , 3 )
114+ return nil if ENV [ 'SPLITCLIENT_ENV' ] == 'test'
115+
116+ log_if_debug ( "Error reading partial data: #{ e . inspect } " , 3 )
117+ return Constants ::PUSH_RETRYABLE_ERROR
118+ end
119+ else
120+ @config . logger . debug ( "SSE read operation timed out, no data available." )
113121 return Constants ::PUSH_RETRYABLE_ERROR
114122 end
115123
116124 process_data ( partial_data )
117125 end
126+ log_if_debug ( "SSE read operation exited: #{ connected? } " , 1 )
127+
118128 nil
119129 end
120130
@@ -142,6 +152,7 @@ def read_first_event(data, latch)
142152
143153 if response_code == OK_CODE && !error_event
144154 @connected . make_true
155+ @config . logger . debug ( "SSE client first event Connected is true" )
145156 @telemetry_runtime_producer . record_streaming_event ( Telemetry ::Domain ::Constants ::SSE_CONNECTION_ESTABLISHED , nil )
146157 push_status ( Constants ::PUSH_CONNECTED )
147158 end
@@ -166,9 +177,8 @@ def socket_connect
166177 IO . select ( nil , [ ssl_socket ] )
167178 retry
168179 end
169-
170180 return ssl_socket
171- # return ssl_socket.connect
181+
172182 rescue Exception => e
173183 @config . logger . error ( "socket connect error: #{ e . inspect } " )
174184 return nil
@@ -179,9 +189,9 @@ def socket_connect
179189 end
180190
181191 def process_data ( partial_data )
192+ log_if_debug ( "Event partial data: #{ partial_data } " , 1 )
182193 return if partial_data . nil? || partial_data == KEEP_ALIVE_RESPONSE
183194
184- log_if_debug ( "Event partial data: #{ partial_data } " , 1 )
185195 events = @event_parser . parse ( partial_data )
186196 events . each { |event | process_event ( event ) }
187197 rescue StandardError => e
0 commit comments