-
Notifications
You must be signed in to change notification settings - Fork 21
Expand file tree
/
Copy pathgraphql_client.py
More file actions
118 lines (101 loc) · 3.56 KB
/
graphql_client.py
File metadata and controls
118 lines (101 loc) · 3.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""Module containing graphQL client."""
import json
import logging
from typing import Any, Callable
import aiohttp
import requests
import websockets
from requests import Session
class GraphqlClient:
"""Class which represents the interface to make graphQL requests through."""
def __init__(self, endpoint: str, headers: dict = {}, **kwargs: Any):
"""Insantiate the client."""
self.logger = logging.getLogger(__name__)
self.endpoint = endpoint
self.headers = headers
self.options = kwargs
def __request_body(
self, query: str, variables: dict = None, operation_name: str = None
) -> dict:
json = {"query": query}
if variables:
json["variables"] = variables
if operation_name:
json["operationName"] = operation_name
return json
def execute(
self,
query: str,
variables: dict = None,
operation_name: str = None,
headers: dict = {},
session: Session = None,
**kwargs: Any,
):
"""Make synchronous request to graphQL server."""
request_body = self.__request_body(
query=query, variables=variables, operation_name=operation_name
)
post_method = requests.post
if session:
post_method = session.post
result = post_method(
self.endpoint,
json=request_body,
headers={**self.headers, **headers},
**{**self.options, **kwargs},
)
result.raise_for_status()
return result.json()
async def execute_async(
self,
query: str,
variables: dict = None,
operation_name: str = None,
headers: dict = {},
):
"""Make asynchronous request to graphQL server."""
request_body = self.__request_body(
query=query, variables=variables, operation_name=operation_name
)
async with aiohttp.ClientSession() as session:
async with session.post(
self.endpoint,
json=request_body,
headers={**self.headers, **headers},
) as response:
return await response.json()
async def subscribe(
self,
query: str,
handle: Callable,
variables: dict = None,
operation_name: str = None,
headers: dict = {},
init_payload: dict = {},
):
"""Make asynchronous request for GraphQL subscription."""
connection_init_message = json.dumps(
{"type": "connection_init", "payload": init_payload}
)
request_body = self.__request_body(
query=query, variables=variables, operation_name=operation_name
)
request_message = json.dumps(
{"type": "start", "id": "1", "payload": request_body}
)
async with websockets.connect(
self.endpoint,
subprotocols=["graphql-ws"],
extra_headers={**self.headers, **headers},
) as websocket:
await websocket.send(connection_init_message)
await websocket.send(request_message)
async for response_message in websocket:
response_body = json.loads(response_message)
if response_body["type"] == "connection_ack":
self.logger.info("the server accepted the connection")
elif response_body["type"] == "ka":
self.logger.info("the server sent a keep alive message")
else:
handle(response_body["payload"])