2323package com .twitter .clientlib .stream ;
2424
2525
26+ import java .io .IOException ;
2627import java .util .ArrayList ;
2728import java .util .List ;
2829import java .util .concurrent .BlockingQueue ;
3637import com .fasterxml .jackson .databind .JsonMappingException ;
3738import com .fasterxml .jackson .databind .ObjectMapper ;
3839
40+ import com .twitter .clientlib .exceptions .EmptyStreamTimeoutException ;
3941import com .twitter .clientlib .model .StreamingTweet ;
4042import okio .BufferedSource ;
4143
4244public class TweetsStreamExecutor {
45+
46+ private static final long EMPTY_STREAM_TIMEOUT = 20000 ;
47+ private static final int POLL_WAIT = 5 ;
4348 private volatile BlockingQueue <String > rawTweets ;
4449 private volatile BlockingQueue <StreamingTweet > tweets ;
4550 private volatile boolean isRunning = true ;
@@ -90,9 +95,12 @@ public synchronized void shutdown() {
9095 shutDownServices ();
9196 try {
9297 terminateServices ();
98+ stream .close ();
9399 } catch (InterruptedException ie ) {
94100 shutDownServices ();
95101 Thread .currentThread ().interrupt ();
102+ } catch (IOException e ) {
103+
96104 }
97105 System .out .println ("TweetsStreamListenersExecutor is shutting down." );
98106 }
@@ -109,9 +117,9 @@ private void terminateServices() throws InterruptedException {
109117 terminateService (listenersService );
110118 }
111119 private void terminateService (ExecutorService executorService ) throws InterruptedException {
112- if (!executorService .awaitTermination (3 , TimeUnit .SECONDS )) {
120+ if (!executorService .awaitTermination (1500 , TimeUnit .MILLISECONDS )) {
113121 executorService .shutdownNow ();
114- if (!executorService .awaitTermination (3 , TimeUnit .SECONDS ))
122+ if (!executorService .awaitTermination (1500 , TimeUnit .MILLISECONDS ))
115123 System .err .println ("Pool did not terminate" );
116124 }
117125 }
@@ -126,19 +134,32 @@ public void run() {
126134 public void queueTweets () {
127135 String line = null ;
128136 try {
137+ boolean emptyResponse = false ;
138+ long firstEmptyResponseMillis = 0 ;
139+ long lastEmptyReponseMillis ;
129140 while (isRunning ) {
130141 line = stream .readUtf8Line ();
131142 if (line == null || line .isEmpty ()) {
143+ if (!emptyResponse ) {
144+ firstEmptyResponseMillis = System .currentTimeMillis ();
145+ emptyResponse = true ;
146+ } else {
147+ lastEmptyReponseMillis = System .currentTimeMillis ();
148+ if (lastEmptyReponseMillis - firstEmptyResponseMillis > EMPTY_STREAM_TIMEOUT ) {
149+ throw new EmptyStreamTimeoutException (String .format ("Stream was empty for %d seconds consecutively" , EMPTY_STREAM_TIMEOUT ));
150+ }
151+ }
132152 continue ;
133153 }
154+ emptyResponse = false ;
134155 try {
135156 rawTweets .put (line );
136- } catch (Exception interExcep ) {
137- interExcep . printStackTrace ();
157+ } catch (Exception ignore ) {
158+
138159 }
139160 }
140161 } catch (Exception e ) {
141- e . printStackTrace ( );
162+ System . out . println ( "Something went wrong. Closing stream... " + e . getMessage () );
142163 shutdown ();
143164 }
144165 }
@@ -156,15 +177,14 @@ private DeserializeTweetsTask() {
156177 public void run () {
157178 while (isRunning ) {
158179 try {
159- String rawTweet = rawTweets .take ();
180+ String rawTweet = rawTweets .poll (POLL_WAIT , TimeUnit .MILLISECONDS );
181+ if (rawTweet == null ) continue ;
160182 StreamingTweet tweet = objectMapper .readValue (rawTweet , StreamingTweet .class );
161183 tweets .put (tweet );
162184 } catch (InterruptedException e ) {
163- System .out .println ("Fail 1" );
164- } catch (JsonMappingException e ) {
165- System .out .println ("Fail 2" );
185+
166186 } catch (JsonProcessingException e ) {
167- System .out .println ("Fail 3 " );
187+ System .out .println ("debug log here " );
168188 }
169189 }
170190 }
@@ -181,9 +201,10 @@ private void processTweets() {
181201
182202 while (isRunning ) {
183203 try {
184- streamingTweet = tweets .take ();
204+ streamingTweet = tweets .poll (POLL_WAIT , TimeUnit .MILLISECONDS );
205+ if (streamingTweet == null ) continue ;
185206 for (TweetsStreamListener listener : listeners ) {
186- listener .actionOnTweetsStream (streamingTweet );
207+ listener .onTweetArrival (streamingTweet );
187208 }
188209 tweetsCount ++;
189210 if (tweetsCount == tweetsLimit ) {
@@ -194,7 +215,7 @@ private void processTweets() {
194215 shutdown ();
195216 }
196217 } catch (InterruptedException e ) {
197- System . out . println ( "processTweets: Fail 1" );
218+
198219 }
199220
200221 }
0 commit comments