Skip to content

Commit 82e21c4

Browse files
committed
env.py: rewrite client cluster handling
Now that the regular client handles clusters, this profile-saving client can be a bit simpler.
1 parent 9a9f9f3 commit 82e21c4

File tree

2 files changed

+71
-123
lines changed

2 files changed

+71
-123
lines changed

cloudify_cli/commands/init.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,6 @@ def init_local_profile(reset_context=False,
126126
if reset_context:
127127
if hard:
128128
os.remove(config.CLOUDIFY_CONFIG_PATH)
129-
# else:
130-
# TODO: Is this check necessary?
131-
# _raise_initialized_error('local')
132129

133130
_create_profiles_dir_and_config(hard, enable_colors)
134131
logger.info('Initialization completed successfully')

cloudify_cli/env.py

Lines changed: 71 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,20 @@
1111
import yaml
1212
import requests
1313

14-
from cloudify_rest_client import CloudifyClient
15-
from cloudify_rest_client.client import HTTPClient
14+
from cloudify_rest_client.client import CloudifyClient, HTTPClient
1615
from cloudify.cluster_status import CloudifyNodeType
1716
from cloudify_rest_client.exceptions import CloudifyClientError
1817
from cloudify.utils import ipv6_url_compat
1918

2019
from cloudify_cli import constants
2120
from cloudify_cli.exceptions import CloudifyCliError
21+
try:
22+
from cloudify_async_client.client import AsyncCloudifyClient
23+
except ImportError as e:
24+
AsyncCloudifyClient = None
25+
async_import_error = e
26+
else:
27+
async_import_error = None
2228

2329

2430
_ENV_NAME = 'manager'
@@ -300,18 +306,21 @@ def is_cluster(client_profile=None):
300306
client_profile.cluster.get(CloudifyNodeType.MANAGER))
301307

302308

303-
def get_rest_client(client_profile=None,
304-
rest_host=None,
305-
rest_port=None,
306-
rest_protocol=None,
307-
rest_cert=None,
308-
username=None,
309-
password=None,
310-
tenant_name=None,
311-
trust_all=False,
312-
cluster=None,
313-
kerberos_env=None,
314-
token=None):
309+
def get_rest_client(
310+
client_profile=None,
311+
rest_host=None,
312+
rest_port=None,
313+
rest_protocol=None,
314+
rest_cert=None,
315+
username=None,
316+
password=None,
317+
tenant_name=None,
318+
trust_all=False,
319+
cluster=None,
320+
kerberos_env=None,
321+
token=None,
322+
async_client=False,
323+
):
315324
if client_profile is None:
316325
client_profile = profile
317326
assert_credentials_set(client_profile)
@@ -323,8 +332,17 @@ def get_rest_client(client_profile=None,
323332
kerberos_env = kerberos_env \
324333
if kerberos_env is not None else client_profile.kerberos_env
325334

335+
if get_target_manager():
336+
rest_host = get_target_manager()
337+
elif is_cluster(client_profile):
338+
rest_host = [
339+
node.get('host_ip') or node.get('hostname')
340+
for node in client_profile.cluster.get(CloudifyNodeType.MANAGER)
341+
]
342+
rest_host = rest_host or client_profile.manager_ip
343+
326344
kwargs = {
327-
'host': rest_host or client_profile.manager_ip,
345+
'host': rest_host,
328346
'port': rest_port or client_profile.rest_port,
329347
'protocol': rest_protocol or client_profile.rest_protocol,
330348
'cert': rest_cert or get_ssl_cert(client_profile),
@@ -341,12 +359,14 @@ def get_rest_client(client_profile=None,
341359
kwargs['password'] = password
342360
kwargs['headers'].update(get_auth_header(username, password))
343361

344-
if cluster:
345-
kwargs['profile'] = client_profile
346-
client = CloudifyClusterClient(**kwargs)
347-
else:
348-
client = CloudifyClient(**kwargs)
349-
return client
362+
client_cls = ProfileSavingClusterClient
363+
if async_client:
364+
if AsyncCloudifyClient is None:
365+
raise RuntimeError(
366+
f'Async client not available: {async_import_error}')
367+
client_cls = AsyncCloudifyClient
368+
369+
return client_cls(**kwargs)
350370

351371

352372
def build_manager_host_string(ssh_user='', ip=''):
@@ -539,6 +559,7 @@ def get_auth_header(username, password):
539559
return header
540560

541561

562+
542563
# attributes that can differ for each node in a cluster. Those will be updated
543564
# in the profile when we switch to a new master.
544565
# Dicts with these keys live in profile.cluster, and are added there during
@@ -553,118 +574,48 @@ def get_auth_header(username, password):
553574
_TRY_NEXT_NODE = object()
554575

555576

556-
class ClusterHTTPClient(HTTPClient):
557-
577+
class ProfileSavingHTTPClient(HTTPClient):
558578
def __init__(self, *args, **kwargs):
559-
profile = kwargs.pop('profile')
560-
super(ClusterHTTPClient, self).__init__(*args, **kwargs)
561-
if not profile.cluster:
562-
raise ValueError('Cluster client invoked for an empty cluster!')
563-
self._cluster = list(profile.cluster.get(CloudifyNodeType.MANAGER))
564-
self._profile = profile
565-
first_node = self._cluster[0]
566-
self.cert = first_node.get('cert') or self.cert
567-
self.trust_all = first_node.get('trust_all') or self.trust_all
568-
self.default_timeout_sec = self.default_timeout_sec or (5, None)
569-
570-
def do_request(self, *args, **kwargs):
571-
# this request can be retried for each manager - if the data is
572-
# a generator, we need to copy it, so we can send it more than once
573-
copied_data = None
574-
if isinstance(kwargs.get('data'), types.GeneratorType):
575-
copied_data = itertools.tee(kwargs.pop('data'),
576-
len(self._cluster) + 1)
577-
578-
if kwargs.get('timeout') is None:
579-
kwargs['timeout'] = self.default_timeout_sec
580-
581-
if copied_data is not None:
582-
kwargs['data'] = copied_data[-1]
583-
584-
manager_host = get_target_manager()
585-
if manager_host:
586-
self.host = ipv6_url_compat(manager_host)
587-
return super(ClusterHTTPClient, self).do_request(*args, **kwargs)
588-
589-
# First try with the main manager ip given when creating the profile
590-
# with `cfy profiles use`
591-
self.host = ipv6_url_compat(self._profile.manager_ip)
592-
response = self._try_do_request(*args, **kwargs)
593-
if response is not _TRY_NEXT_NODE:
594-
return response
595-
596-
for node_index, node in list(enumerate(
597-
self._profile.cluster[CloudifyNodeType.MANAGER])):
598-
if self._profile.manager_ip in [node['host_ip'], node['hostname']]:
599-
continue
600-
self._use_node(node)
601-
if copied_data is not None:
602-
kwargs['data'] = copied_data[node_index]
603-
604-
response = self._try_do_request(*args, **kwargs)
605-
if response is _TRY_NEXT_NODE:
606-
continue
607-
return response
608-
609-
raise CloudifyClientError('All cluster nodes are offline')
610-
611-
def _try_do_request(self, *args, **kwargs):
612-
try:
613-
return super(ClusterHTTPClient, self).do_request(*args,
614-
**kwargs)
615-
except (requests.exceptions.ConnectionError,
616-
CloudifyClientError) as e:
617-
if isinstance(e, CloudifyClientError) and e.status_code != 502:
618-
raise
619-
self.logger.warning('Could not connect to manager %s on port %s',
620-
self.host, self.port)
621-
self.logger.debug(str(e))
622-
return _TRY_NEXT_NODE
579+
super().__init__(*args, **kwargs)
580+
self._last_tried_host = None
623581

624-
def _use_node(self, node):
625-
if ipv6_url_compat(node['host_ip']) == self.host:
626-
return
627-
self.host = ipv6_url_compat(node['host_ip'])
628-
for attr in ['rest_port', 'rest_protocol', 'trust_all', 'cert']:
629-
new_value = node.get(attr)
630-
if new_value:
631-
setattr(self, attr, new_value)
632-
self._update_profile(node)
633-
634-
def _update_profile(self, node):
582+
def get_host(self):
583+
host = super().get_host()
584+
self._last_tried_host = host
585+
return host
586+
587+
def process_response(self, *args, **kwargs):
588+
if self._last_tried_host is not None:
589+
self._update_profile(self._last_tried_host)
590+
self._last_tried_host = None
591+
return super().process_response(*args, **kwargs)
592+
593+
def _update_profile(self, target_ip):
635594
"""
636595
Put the node at the start of the cluster list in profile.
637-
638596
The client tries nodes in the order of the cluster list, so putting
639597
the node first will make the client try it first next time. This makes
640598
the client always try the last-known-active-manager first.
641599
"""
642-
self._profile.cluster[CloudifyNodeType.MANAGER].remove(node)
643-
self._profile.cluster[CloudifyNodeType.MANAGER] = (
644-
[node] + self._profile.cluster[CloudifyNodeType.MANAGER])
600+
node = None
601+
for cluster_member in profile.cluster[CloudifyNodeType.MANAGER]:
602+
if cluster_member['host_ip'] == target_ip:
603+
node = cluster_member
604+
break
605+
if node is None:
606+
return
607+
profile.cluster[CloudifyNodeType.MANAGER].remove(node)
608+
profile.cluster[CloudifyNodeType.MANAGER] = (
609+
[node] + profile.cluster[CloudifyNodeType.MANAGER])
645610
for node_attr in CLUSTER_NODE_ATTRS:
646611
if node_attr in node:
647-
setattr(self._profile, node_attr, node[node_attr])
648-
self._profile.save()
649-
612+
setattr(profile, node_attr, node[node_attr])
613+
profile.save()
650614

651-
class CloudifyClusterClient(CloudifyClient):
652-
"""
653-
A CloudifyClient that will retry the queries with the current manager.
654-
655-
When a request fails with a connection error, this will keep trying with
656-
every node in the cluster, until it finds an active manager.
657-
658-
When an active manager is found, the profile will be updated with its
659-
address.
660-
"""
661-
def __init__(self, profile, *args, **kwargs):
662-
self._profile = profile
663-
super(CloudifyClusterClient, self).__init__(*args, **kwargs)
664615

616+
class ProfileSavingClusterClient(CloudifyClient):
665617
def client_class(self, *args, **kwargs):
666-
kwargs.setdefault('profile', self._profile)
667-
return ClusterHTTPClient(*args, **kwargs)
618+
return ProfileSavingHTTPClient(*args, **kwargs)
668619

669620

670621
profile = get_profile_context(suppress_error=True)

0 commit comments

Comments (0)