3131import tarfile
3232import io
3333import threading
34+ import logging
3435
3536try :
3637 from urllib .request import urlopen
3738except ImportError :
3839 from urllib import urlopen
39- from urllib3 .exceptions import MaxRetryError
40+
41+
42+ log = logging .getLogger (__file__ )
4043
4144
4245CRATE_CONFIG_ERROR = 'crate_config must point to a folder or to a file named "crate.yml"'
@@ -86,6 +89,38 @@ def wait_for_http_url(log, timeout=30, verbose=False):
8689 elif elapsed > timeout :
8790 return None
8891
92+
93+ class OutputMonitor :
94+
95+ def __init__ (self ):
96+ self .consumers = []
97+
98+ def consume (self , iterable ):
99+ for line in iterable :
100+ for consumer in self .consumers :
101+ consumer .send (line )
102+
103+ def start (self , proc ):
104+ self ._stop_out_thread = threading .Event ()
105+ self ._out_thread = threading .Thread (target = self .consume , args = (proc .stdout ,))
106+ self ._out_thread .daemon = True
107+ self ._out_thread .start ()
108+
109+ def stop (self ):
110+ if self ._out_thread is not None :
111+ self ._stop_out_thread .set ()
112+ self ._out_thread .join ()
113+
114+
115+ class LineBuffer :
116+
117+ def __init__ (self ):
118+ self .lines = []
119+
120+ def send (self , line ):
121+ self .lines .append (line .strip ())
122+
123+
89124class CrateLayer (object ):
90125 """
91126 This layer starts a Crate server.
@@ -145,6 +180,7 @@ def new_teardown(*args, **kws):
145180 layer .tearDown = new_teardown
146181 return layer
147182
183+
148184 def __init__ (self ,
149185 name ,
150186 crate_home ,
@@ -188,7 +224,6 @@ def __init__(self,
188224 self .env = env or {}
189225 self .env .setdefault ('CRATE_USE_IPV4' , 'true' )
190226 self ._stdout_consumers = []
191- self ._stop_out_thread = threading .Event ()
192227
193228 crate_home = os .path .abspath (crate_home )
194229 if crate_exec is None :
@@ -272,11 +307,8 @@ def start(self):
272307 # this is necessary if no static port is assigned
273308 self .http_url = wait_for_http_url (self .process .stdout , verbose = self .verbose )
274309
275- # start a process stdout consumer to prevent the stdout buffer from
276- # filling up, which would cause the process to block
277- self ._out_thread = threading .Thread (target = self ._consume , args = (self .process .stdout ,))
278- self ._out_thread .daemon = True
279- self ._out_thread .start ()
310+ self .monitor = OutputMonitor ()
311+ self .monitor .start (self .process )
280312
281313 if not self .http_url :
282314 self .stop ()
@@ -286,19 +318,12 @@ def start(self):
286318 self ._wait_for_master ()
287319 sys .stderr .write ('\n Crate instance ready.\n ' )
288320
289- def _consume (self , iterable ):
290- for line in iterable :
291- if not self ._stop_out_thread .isSet ():
292- for consumer in self ._stdout_consumers :
293- consumer .send (line )
294321
295322 def stop (self ):
296323 if self .process :
297324 self .process .terminate ()
298325 self .process = None
299- if self ._out_thread is not None :
300- self ._stop_out_thread .set ()
301- self ._out_thread .join ()
326+ self .monitor .stop ()
302327 self ._clean ()
303328
304329 def tearDown (self ):
@@ -307,6 +332,9 @@ def tearDown(self):
307332 def _wait_for (self , validator ):
308333 start = time .time ()
309334
335+ line_buf = LineBuffer ()
336+ self .monitor .consumers .append (line_buf )
337+
310338 while True :
311339 wait_time = time .time () - start
312340 try :
@@ -317,11 +345,16 @@ def _wait_for(self, validator):
317345 raise e
318346
319347 if wait_time > 30 :
348+ for line in line_buf .lines :
349+ log .error (line )
320350 raise SystemError ('Failed to start Crate instance in time.' )
321351 else :
322352 sys .stderr .write ('.' )
323353 time .sleep (self .wait_interval )
324354
355+ self .monitor .consumers .remove (line_buf )
356+ line_buf = None
357+
325358 def _wait_for_start (self ):
326359 """Wait for instance to be started"""
327360
0 commit comments