2222"""
2323
2424
25+ def get_storage_content (conn , instance_name , storage_key ):
26+ if instance_name == "etcd" :
27+ content , _ = conn .get (storage_key )
28+ return content .decode ("utf-8" ) if content else None
29+ content = conn .call ("config.storage.get" , storage_key )
30+ if len (content ) > 0 :
31+ return content [0 ]["data" ][0 ]["value" ]
32+ return None
33+
34+
35+ def build_worker_url (
36+ instance ,
37+ instance_name ,
38+ prefix = "prefix" ,
39+ host = "host1" ,
40+ worker = "worker1" ,
41+ timeout = 5 ,
42+ ):
43+ creds = (
44+ f"{ instance .connection_username } :{ instance .connection_password } @"
45+ if instance_name == "tcs"
46+ else ""
47+ )
48+ return (
49+ f"http://{ creds } { instance .host } :{ instance .port } /{ prefix } /{ host } /{ worker } ?timeout={ timeout } "
50+ )
51+
52+
53+ def write_worker_cfg (tmpdir , cfg = worker_cfg , filename = "worker.yaml" ):
54+ path = os .path .join (tmpdir , filename )
55+ with open (path , "w" ) as f :
56+ f .write (cfg )
57+ return path
58+
59+
60+ def run_tt_cmd (tt_cmd , args , cwd , env = None ):
61+ proc = subprocess .Popen (
62+ [tt_cmd , * args ],
63+ cwd = cwd ,
64+ env = env ,
65+ stderr = subprocess .STDOUT ,
66+ stdout = subprocess .PIPE ,
67+ text = True ,
68+ )
69+ return proc .stdout .read ()
70+
71+
2572def test_cluster_worker_help (tt_cmd , tmp_path ):
2673 help_cmd = [tt_cmd , "cluster" , "worker" , "--help" ]
2774 instance_process = subprocess .Popen (
@@ -291,44 +338,16 @@ def test_cluster_worker_publish_connection_failed(tt_cmd, tmpdir_with_cfg):
291338def test_cluster_worker_publish (tt_cmd , tmpdir_with_cfg , instance_name , request ):
292339 instance = request .getfixturevalue (instance_name )
293340 tmpdir = tmpdir_with_cfg
294- worker_cfg_path = os .path .join (tmpdir , "worker.yaml" )
295- with open (worker_cfg_path , "w" ) as f :
296- f .write (worker_cfg )
341+ write_worker_cfg (tmpdir )
297342
298343 conn = instance .conn ()
299- creds = (
300- f"{ instance .connection_username } :{ instance .connection_password } @"
301- if instance_name == "tcs"
302- else ""
303- )
304- publish_cmd = [
305- tt_cmd ,
306- "cluster" ,
307- "worker" ,
308- "publish" ,
309- "http://" + creds + f"{ instance .host } :{ instance .port } /prefix/host1/worker1?timeout=5" ,
310- "worker.yaml" ,
311- ]
312- instance_process = subprocess .Popen (
313- publish_cmd ,
314- cwd = tmpdir ,
315- stderr = subprocess .STDOUT ,
316- stdout = subprocess .PIPE ,
317- text = True ,
318- )
319- publish_output = instance_process .stdout .read ()
344+ url = build_worker_url (instance , instance_name )
345+ output = run_tt_cmd (tt_cmd , ["cluster" , "worker" , "publish" , url , "worker.yaml" ], tmpdir )
320346
321- assert "" == publish_output
347+ assert "" == output
322348
323- content = ""
324349 storage_key = "/prefix/instances/host1/worker1"
325- if instance_name == "etcd" :
326- content , _ = conn .get (storage_key )
327- content = content .decode ("utf-8" )
328- else :
329- content = conn .call ("config.storage.get" , storage_key )
330- if len (content ) > 0 :
331- content = content [0 ]["data" ][0 ]["value" ]
350+ content = get_storage_content (conn , instance_name , storage_key )
332351
333352 assert worker_cfg == content
334353
@@ -337,46 +356,16 @@ def test_cluster_worker_publish(tt_cmd, tmpdir_with_cfg, instance_name, request)
337356def test_cluster_worker_publish_nested_prefix (tt_cmd , tmpdir_with_cfg , instance_name , request ):
338357 instance = request .getfixturevalue (instance_name )
339358 tmpdir = tmpdir_with_cfg
340- worker_cfg_path = os .path .join (tmpdir , "worker.yaml" )
341- with open (worker_cfg_path , "w" ) as f :
342- f .write (worker_cfg )
359+ write_worker_cfg (tmpdir )
343360
344361 conn = instance .conn ()
345- creds = (
346- f"{ instance .connection_username } :{ instance .connection_password } @"
347- if instance_name == "tcs"
348- else ""
349- )
350- publish_cmd = [
351- tt_cmd ,
352- "cluster" ,
353- "worker" ,
354- "publish" ,
355- "http://"
356- + creds
357- + f"{ instance .host } :{ instance .port } /tdb-workers/cluster1/host1/worker1?timeout=5" ,
358- "worker.yaml" ,
359- ]
360- instance_process = subprocess .Popen (
361- publish_cmd ,
362- cwd = tmpdir ,
363- stderr = subprocess .STDOUT ,
364- stdout = subprocess .PIPE ,
365- text = True ,
366- )
367- publish_output = instance_process .stdout .read ()
362+ url = build_worker_url (instance , instance_name , prefix = "tdb-workers/cluster1" )
363+ output = run_tt_cmd (tt_cmd , ["cluster" , "worker" , "publish" , url , "worker.yaml" ], tmpdir )
368364
369- assert "" == publish_output
365+ assert "" == output
370366
371- content = ""
372367 storage_key = "/tdb-workers/cluster1/instances/host1/worker1"
373- if instance_name == "etcd" :
374- content , _ = conn .get (storage_key )
375- content = content .decode ("utf-8" )
376- else :
377- content = conn .call ("config.storage.get" , storage_key )
378- if len (content ) > 0 :
379- content = content [0 ]["data" ][0 ]["value" ]
368+ content = get_storage_content (conn , instance_name , storage_key )
380369
381370 assert worker_cfg == content
382371
@@ -385,58 +374,27 @@ def test_cluster_worker_publish_nested_prefix(tt_cmd, tmpdir_with_cfg, instance_
385374def test_cluster_worker_publish_exists_no_force (tt_cmd , tmpdir_with_cfg , instance_name , request ):
386375 instance = request .getfixturevalue (instance_name )
387376 tmpdir = tmpdir_with_cfg
388- worker_cfg_path = os .path .join (tmpdir , "worker.yaml" )
389- with open (worker_cfg_path , "w" ) as f :
390- f .write (worker_cfg )
377+ write_worker_cfg (tmpdir )
391378
392379 conn = instance .conn ()
393- creds = (
394- f"{ instance .connection_username } :{ instance .connection_password } @"
395- if instance_name == "tcs"
396- else ""
397- )
398- url = "http://" + creds + f"{ instance .host } :{ instance .port } /prefix/host1/worker1?timeout=5"
380+ url = build_worker_url (instance , instance_name )
399381
400- publish_cmd = [tt_cmd , "cluster" , "worker" , "publish" , url , "worker.yaml" ]
401- instance_process = subprocess .Popen (
402- publish_cmd ,
403- cwd = tmpdir ,
404- stderr = subprocess .STDOUT ,
405- stdout = subprocess .PIPE ,
406- text = True ,
407- )
408- publish_output = instance_process .stdout .read ()
409- assert "" == publish_output
382+ output = run_tt_cmd (tt_cmd , ["cluster" , "worker" , "publish" , url , "worker.yaml" ], tmpdir )
383+ assert "" == output
410384
411- with open (worker_cfg_path , "w" ) as f :
412- f .write (worker_cfg_updated )
385+ write_worker_cfg (tmpdir , worker_cfg_updated )
413386
414- publish_cmd = [tt_cmd , "cluster" , "worker" , "publish" , url , "worker.yaml" ]
415- instance_process = subprocess .Popen (
416- publish_cmd ,
417- cwd = tmpdir ,
418- stderr = subprocess .STDOUT ,
419- stdout = subprocess .PIPE ,
420- text = True ,
421- )
422- publish_output = instance_process .stdout .read ()
387+ output = run_tt_cmd (tt_cmd , ["cluster" , "worker" , "publish" , url , "worker.yaml" ], tmpdir )
423388
424- assert publish_output == (
389+ assert output == (
425390 " ⨯ failed to publish worker configuration:"
426391 " worker configuration already exists at"
427392 ' "/prefix/instances/host1/worker1",'
428393 " use --force to overwrite\n "
429394 )
430395
431- content = ""
432396 storage_key = "/prefix/instances/host1/worker1"
433- if instance_name == "etcd" :
434- content , _ = conn .get (storage_key )
435- content = content .decode ("utf-8" )
436- else :
437- content = conn .call ("config.storage.get" , storage_key )
438- if len (content ) > 0 :
439- content = content [0 ]["data" ][0 ]["value" ]
397+ content = get_storage_content (conn , instance_name , storage_key )
440398
441399 assert worker_cfg == content
442400
@@ -445,52 +403,25 @@ def test_cluster_worker_publish_exists_no_force(tt_cmd, tmpdir_with_cfg, instanc
445403def test_cluster_worker_publish_force_overwrite (tt_cmd , tmpdir_with_cfg , instance_name , request ):
446404 instance = request .getfixturevalue (instance_name )
447405 tmpdir = tmpdir_with_cfg
448- worker_cfg_path = os .path .join (tmpdir , "worker.yaml" )
449- with open (worker_cfg_path , "w" ) as f :
450- f .write (worker_cfg )
406+ write_worker_cfg (tmpdir )
451407
452408 conn = instance .conn ()
453- creds = (
454- f"{ instance .connection_username } :{ instance .connection_password } @"
455- if instance_name == "tcs"
456- else ""
457- )
458- url = "http://" + creds + f"{ instance .host } :{ instance .port } /prefix/host1/worker1?timeout=5"
409+ url = build_worker_url (instance , instance_name )
459410
460- publish_cmd = [tt_cmd , "cluster" , "worker" , "publish" , url , "worker.yaml" ]
461- instance_process = subprocess .Popen (
462- publish_cmd ,
463- cwd = tmpdir ,
464- stderr = subprocess .STDOUT ,
465- stdout = subprocess .PIPE ,
466- text = True ,
467- )
468- publish_output = instance_process .stdout .read ()
469- assert "" == publish_output
411+ output = run_tt_cmd (tt_cmd , ["cluster" , "worker" , "publish" , url , "worker.yaml" ], tmpdir )
412+ assert "" == output
470413
471- with open (worker_cfg_path , "w" ) as f :
472- f .write (worker_cfg_updated )
414+ write_worker_cfg (tmpdir , cfg = worker_cfg_updated )
473415
474- publish_cmd = [tt_cmd , "cluster" , "worker" , "publish" , "--force" , url , "worker.yaml" ]
475- instance_process = subprocess .Popen (
476- publish_cmd ,
477- cwd = tmpdir ,
478- stderr = subprocess .STDOUT ,
479- stdout = subprocess .PIPE ,
480- text = True ,
416+ output = run_tt_cmd (
417+ tt_cmd ,
418+ ["cluster" , "worker" , "publish" , "--force" , url , "worker.yaml" ],
419+ tmpdir ,
481420 )
482- publish_output = instance_process .stdout .read ()
483- assert "" == publish_output
421+ assert "" == output
484422
485- content = ""
486423 storage_key = "/prefix/instances/host1/worker1"
487- if instance_name == "etcd" :
488- content , _ = conn .get (storage_key )
489- content = content .decode ("utf-8" )
490- else :
491- content = conn .call ("config.storage.get" , storage_key )
492- if len (content ) > 0 :
493- content = content [0 ]["data" ][0 ]["value" ]
424+ content = get_storage_content (conn , instance_name , storage_key )
494425
495426 assert worker_cfg_updated == content
496427
@@ -499,38 +430,20 @@ def test_cluster_worker_publish_force_overwrite(tt_cmd, tmpdir_with_cfg, instanc
499430def test_cluster_worker_publish_force_new_key (tt_cmd , tmpdir_with_cfg , instance_name , request ):
500431 instance = request .getfixturevalue (instance_name )
501432 tmpdir = tmpdir_with_cfg
502- worker_cfg_path = os .path .join (tmpdir , "worker.yaml" )
503- with open (worker_cfg_path , "w" ) as f :
504- f .write (worker_cfg )
433+ write_worker_cfg (tmpdir )
505434
506435 conn = instance .conn ()
507- creds = (
508- f"{ instance .connection_username } :{ instance .connection_password } @"
509- if instance_name == "tcs"
510- else ""
511- )
512- url = "http://" + creds + f"{ instance .host } :{ instance .port } /prefix/host1/worker1?timeout=5"
436+ url = build_worker_url (instance , instance_name )
513437
514- publish_cmd = [tt_cmd , "cluster" , "worker" , "publish" , "--force" , url , "worker.yaml" ]
515- instance_process = subprocess .Popen (
516- publish_cmd ,
517- cwd = tmpdir ,
518- stderr = subprocess .STDOUT ,
519- stdout = subprocess .PIPE ,
520- text = True ,
438+ output = run_tt_cmd (
439+ tt_cmd ,
440+ ["cluster" , "worker" , "publish" , "--force" , url , "worker.yaml" ],
441+ tmpdir ,
521442 )
522- publish_output = instance_process .stdout .read ()
523- assert "" == publish_output
443+ assert "" == output
524444
525- content = ""
526445 storage_key = "/prefix/instances/host1/worker1"
527- if instance_name == "etcd" :
528- content , _ = conn .get (storage_key )
529- content = content .decode ("utf-8" )
530- else :
531- content = conn .call ("config.storage.get" , storage_key )
532- if len (content ) > 0 :
533- content = content [0 ]["data" ][0 ]["value" ]
446+ content = get_storage_content (conn , instance_name , storage_key )
534447
535448 assert worker_cfg == content
536449
0 commit comments