-
Notifications
You must be signed in to change notification settings - Fork 98
Expand file tree
/
Copy pathclient.py
More file actions
80 lines (56 loc) · 2.93 KB
/
client.py
File metadata and controls
80 lines (56 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""OData Client Implementation"""
import logging
import warnings
import pyodata.v2.model
import pyodata.v2.service
from pyodata.exceptions import PyODataException, HttpError
def _fetch_metadata(connection, url, logger):
# download metadata
logger.info('Fetching metadata')
resp = connection.get(url + '$metadata')
logger.debug('Retrieved the response:\n%s\n%s',
'\n'.join((f'H: {key}: {value}' for key, value in resp.headers.items())),
resp.content)
if resp.status_code != 200:
# ANNOT_EX: fatal runtime communication initialization error on service side
raise HttpError(
'Metadata request failed, status code: {}, body:\n{}'.format(resp.status_code, resp.content), resp)
mime_type = resp.headers['content-type']
if not any((typ in ['application/xml', 'text/xml'] for typ in mime_type.split(';'))):
# ANNOT_EX: fatal runtime communication initialization error on service side
raise HttpError(
'Metadata request did not return XML, MIME type: {}, body:\n{}'.format(mime_type, resp.content),
resp)
return resp.content
class Client:
    """Entry point for connecting to an OData service.

    Calling ``Client(...)`` does not produce a ``Client`` instance:
    ``__new__`` builds and returns a ``pyodata.v2.service.Service``.
    """

    # pylint: disable=too-few-public-methods

    ODATA_VERSION_2 = 2

    def __new__(cls, url, connection, odata_version=ODATA_VERSION_2, namespaces=None,
                config: pyodata.v2.model.Config = None, metadata: str = None):
        """Build the service proxy for the OData service at *url*.

        Args:
            url: service root URL; trailing slashes are normalised to one.
            connection: passed to the metadata download and to the service.
            odata_version: protocol version; only ``ODATA_VERSION_2`` is
                implemented.
            namespaces: deprecated; stored on a freshly created ``Config``.
                Cannot be combined with ``config``.
            config: model configuration; a default ``Config`` is created
                when omitted.
            metadata: metadata document to use instead of downloading it.

        Returns:
            The ``pyodata.v2.service.Service`` built from the parsed schema.

        Raises:
            PyODataException: if ``odata_version`` is not implemented, or if
                both ``namespaces`` and ``config`` are given.
        """
        log = logging.getLogger('pyodata.client')

        if odata_version != Client.ODATA_VERSION_2:
            # ANNOT_EX: fatal runtime initialization error on pyodata side
            raise PyODataException('No implementation for selected odata version {}'.format(odata_version))

        # sanitize url: exactly one trailing slash
        service_root = url.rstrip('/') + '/'

        if metadata is None:
            metadata = _fetch_metadata(connection, service_root, log)
        else:
            log.info('Using static metadata')

        if config is None:
            config = pyodata.v2.model.Config()
            if namespaces is not None:
                warnings.warn("Passing namespaces directly is deprecated. Use class Config instead",
                              DeprecationWarning)
                config.namespaces = namespaces
        elif namespaces is not None:
            # ANNOT_EX: fatal invalid usage error
            raise PyODataException('You cannot pass namespaces and config at the same time')

        # create model instance from received metadata
        log.info('Creating OData Schema (version: %d)', odata_version)
        builder = pyodata.v2.model.MetadataBuilder(metadata, config=config)
        model = builder.build()

        # create service instance based on model we have
        log.info('Creating OData Service (version: %d)', odata_version)
        return pyodata.v2.service.Service(service_root, model, connection)