3434
3535import com .fasterxml .jackson .core .JsonProcessingException ;
3636import com .fasterxml .jackson .databind .DeserializationFeature ;
37- import com .fasterxml .jackson .databind .JsonMappingException ;
3837import com .fasterxml .jackson .databind .ObjectMapper ;
3938
4039import com .twitter .clientlib .exceptions .EmptyStreamTimeoutException ;
4140import com .twitter .clientlib .model .StreamingTweet ;
4241import okio .BufferedSource ;
42+ import org .slf4j .Logger ;
43+ import org .slf4j .LoggerFactory ;
4344
4445public class TweetsStreamExecutor {
4546
47+ private static final Logger logger = LoggerFactory .getLogger (TweetsStreamExecutor .class );
48+
4649 private static final long EMPTY_STREAM_TIMEOUT = 20000 ;
4750 private static final int POLL_WAIT = 5 ;
4851 private volatile BlockingQueue <String > rawTweets ;
@@ -75,7 +78,7 @@ public void removeListener(TweetsStreamListener toRemove) {
7578
7679 public void start () {
7780 if (stream == null ) {
78- System . out . println ( "Error: stream is null." );
81+ logger . error ( "Stream is null. Exiting.. ." );
7982 return ;
8083 }
8184 startTime = System .currentTimeMillis ();
@@ -91,6 +94,7 @@ public void start() {
9194 }
9295
9396 public synchronized void shutdown () {
97+ logger .info ("TweetsStreamListenersExecutor is shutting down." );
9498 isRunning = false ;
9599 shutDownServices ();
96100 try {
@@ -102,7 +106,6 @@ public synchronized void shutdown() {
102106 } catch (IOException e ) {
103107
104108 }
105- System .out .println ("TweetsStreamListenersExecutor is shutting down." );
106109 }
107110
108111 private void shutDownServices () {
@@ -120,12 +123,14 @@ private void terminateService(ExecutorService executorService) throws Interrupte
120123 if (!executorService .awaitTermination (1500 , TimeUnit .MILLISECONDS )) {
121124 executorService .shutdownNow ();
122125 if (!executorService .awaitTermination (1500 , TimeUnit .MILLISECONDS ))
123- System . err . println ( "Pool did not terminate" );
126+ logger . error ( "Thread pool did not terminate" );
124127 }
125128 }
126129
127130 private class RawTweetsQueuer implements Runnable {
128131
132+ private final Logger logger = LoggerFactory .getLogger (RawTweetsQueuer .class );
133+
129134 @ Override
130135 public void run () {
131136 queueTweets ();
@@ -159,13 +164,15 @@ public void queueTweets() {
159164 }
160165 }
161166 } catch (Exception e ) {
162- System . out . println ("Something went wrong. Closing stream... " + e .getMessage ());
167+ logger . error ("Something went wrong. Closing stream... {}" , e .getMessage ());
163168 shutdown ();
164169 }
165170 }
166171 }
167172
168173 private class DeserializeTweetsTask implements Runnable {
174+
175+ private final Logger logger = LoggerFactory .getLogger (DeserializeTweetsTask .class );
169176 private final ObjectMapper objectMapper ;
170177
171178 private DeserializeTweetsTask () {
@@ -181,16 +188,18 @@ public void run() {
181188 if (rawTweet == null ) continue ;
182189 StreamingTweet tweet = objectMapper .readValue (rawTweet , StreamingTweet .class );
183190 tweets .put (tweet );
184- } catch (InterruptedException e ) {
191+ } catch (InterruptedException ignore ) {
185192
186193 } catch (JsonProcessingException e ) {
187- System . out . println ( "debug log here " );
194+ logger . debug ( "Json could not be parsed " );
188195 }
189196 }
190197 }
191198 }
192199
193200 private class TweetsListenersTask implements Runnable {
201+
202+ private final Logger logger = LoggerFactory .getLogger (TweetsListenersTask .class );
194203 @ Override
195204 public void run () {
196205 processTweets ();
@@ -211,7 +220,7 @@ private void processTweets() {
211220 long stopTime = System .currentTimeMillis ();
212221 long durationInMillis = stopTime - startTime ;
213222 double seconds = durationInMillis / 1000.0 ;
214- System . out . println ("Total duration in seconds: " + seconds );
223+ logger . info ("Total duration in seconds: {}" , seconds );
215224 shutdown ();
216225 }
217226 } catch (InterruptedException e ) {
0 commit comments