Skip to content

Commit 1d828a9

Browse files
committed
Update client.py
1 parent c319507 commit 1d828a9

File tree

1 file changed

+24
-41
lines changed

1 file changed

+24
-41
lines changed

jupiterone/client.py

Lines changed: 24 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,6 @@
5858
UPSERT_PARAMETER,
5959
)
6060

61-
def retry_on_429(exc):
62-
"""Used to trigger retry on rate limit"""
63-
return isinstance(exc, JupiterOneApiRetryError)
64-
65-
6661
class JupiterOneClient:
6762
"""Python client class for the JupiterOne GraphQL API"""
6863

@@ -87,7 +82,7 @@ def __init__(
8782
"JupiterOne-Account": self.account,
8883
"Content-Type": "application/json",
8984
}
90-
85+
9186
# Initialize session with retry logic
9287
self.session = requests.Session()
9388
retries = Retry(
@@ -178,14 +173,14 @@ def _execute_query(self, query: str, variables: Dict = None) -> Dict:
178173
raise JupiterOneApiError("{}:{}".format(response.status_code, content))
179174

180175
def _cursor_query(
181-
self,
182-
query: str,
183-
cursor: str = None,
176+
self,
177+
query: str,
178+
cursor: str = None,
184179
include_deleted: bool = False,
185180
max_workers: Optional[int] = None
186181
) -> Dict:
187182
"""Performs a V1 graph query using cursor pagination with optional parallel processing
188-
183+
189184
args:
190185
query (str): Query text
191186
cursor (str): A pagination cursor for the initial query
@@ -203,7 +198,7 @@ def _cursor_query(
203198
result_limit = False
204199

205200
results: List = []
206-
201+
207202
def fetch_page(cursor: Optional[str] = None) -> Dict:
208203
variables = {"query": query, "includeDeleted": include_deleted}
209204
if cursor is not None:
@@ -219,7 +214,7 @@ def fetch_page(cursor: Optional[str] = None) -> Dict:
219214
return data
220215

221216
results.extend(data)
222-
217+
223218
# If no cursor or we've hit the limit, return early
224219
if not response["data"]["queryV1"].get("cursor") or (result_limit and len(results) >= result_limit):
225220
return {"data": results[:result_limit] if result_limit else results}
@@ -228,24 +223,24 @@ def fetch_page(cursor: Optional[str] = None) -> Dict:
228223
if max_workers and max_workers > 1:
229224
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
230225
future_to_cursor = {
231-
executor.submit(fetch_page, response["data"]["queryV1"]["cursor"]):
226+
executor.submit(fetch_page, response["data"]["queryV1"]["cursor"]):
232227
response["data"]["queryV1"]["cursor"]
233228
}
234-
229+
235230
while future_to_cursor:
236231
# Wait for the next future to complete
237232
done, _ = concurrent.futures.wait(
238233
future_to_cursor,
239234
return_when=concurrent.futures.FIRST_COMPLETED
240235
)
241-
236+
242237
for future in done:
243238
cursor = future_to_cursor.pop(future)
244239
try:
245240
response = future.result()
246241
page_data = response["data"]["queryV1"]["data"]
247242
results.extend(page_data)
248-
243+
249244
# Check if we need to fetch more pages
250245
if (result_limit and len(results) >= result_limit) or \
251246
not response["data"]["queryV1"].get("cursor"):
@@ -254,11 +249,11 @@ def fetch_page(cursor: Optional[str] = None) -> Dict:
254249
f.cancel()
255250
future_to_cursor.clear()
256251
break
257-
252+
258253
# Schedule next page fetch
259254
next_cursor = response["data"]["queryV1"]["cursor"]
260255
future_to_cursor[executor.submit(fetch_page, next_cursor)] = next_cursor
261-
256+
262257
except Exception as e:
263258
# Log error but continue with other pages
264259
print(f"Error fetching page with cursor {cursor}: {str(e)}")
@@ -269,7 +264,7 @@ def fetch_page(cursor: Optional[str] = None) -> Dict:
269264
response = fetch_page(cursor)
270265
data = response["data"]["queryV1"]["data"]
271266
results.extend(data)
272-
267+
273268
if result_limit and len(results) >= result_limit:
274269
break
275270
elif not response["data"]["queryV1"].get("cursor"):
@@ -313,15 +308,15 @@ def _limit_and_skip_query(
313308
page += 1
314309

315310
return {"data": results}
316-
311+
317312
def query_with_deferred_response(self, query, cursor=None):
318313
"""
319314
Execute a J1QL query that returns a deferred response for handling large result sets.
320-
315+
321316
Args:
322317
query (str): The J1QL query to execute
323318
cursor (str, optional): Pagination cursor for subsequent requests
324-
319+
325320
Returns:
326321
list: Combined results from all paginated responses
327322
"""
@@ -347,12 +342,8 @@ def query_with_deferred_response(self, query, cursor=None):
347342

348343
for attempt in range(1, max_retries + 1):
349344

350-
session = requests.Session()
351-
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504, 429])
352-
session.mount('https://', HTTPAdapter(max_retries=retries))
353-
354345
# Get the download URL
355-
url_response = session.post(
346+
url_response = self.session.post(
356347
self.graphql_url,
357348
headers=self.headers,
358349
json=payload,
@@ -372,7 +363,7 @@ def query_with_deferred_response(self, query, cursor=None):
372363

373364
# Poll the download URL until results are ready
374365
while True:
375-
download_response = session.get(download_url, timeout=60).json()
366+
download_response = self.session.get(download_url, timeout=60).json()
376367
status = download_response['status']
377368

378369
if status != 'IN_PROGRESS':
@@ -392,17 +383,13 @@ def query_with_deferred_response(self, query, cursor=None):
392383
else:
393384
print(f"Request failed after {max_retries} attempts. Status: {url_response.status_code}")
394385

395-
return all_query_results
386+
return all_query_results
396387

397388
def _execute_syncapi_request(self, endpoint: str, payload: Dict = None) -> Dict:
398389
"""Executes POST request to SyncAPI endpoints"""
399390

400-
# initiate requests session and implement retry logic of 5 request retries with 1 second between
401-
s = requests.Session()
402-
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
403-
s.mount("https://", HTTPAdapter(max_retries=retries))
404-
405-
response = s.post(
391+
# initiate requests session and implement retry logic of 5 request retries with 1 second between retries
392+
response = self.session.post(
406393
self.sync_url + endpoint, headers=self.headers, json=payload, timeout=60
407394
)
408395

@@ -819,7 +806,7 @@ def update_integration_instance_config_value(
819806
):
820807
"""Update a single config k:v pair existing on a configured Integration Instance."""
821808

822-
# fetch existing instnace configuration
809+
# fetch existing instance configuration
823810
instance_config = self.get_integration_instance_details(instance_id=instance_id)
824811
config_dict = instance_config["data"]["integrationInstance"]["config"]
825812

@@ -1239,12 +1226,8 @@ def fetch_evaluation_result_download_url(self, raw_data_key: str = None):
12391226
def fetch_downloaded_evaluation_results(self, download_url: str = None):
12401227
"""Return full Alert Rule J1QL results from Download URL"""
12411228
# initiate requests session and implement retry logic of 5 request retries with 1 second between
1242-
s = requests.Session()
1243-
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
1244-
s.mount("https://", HTTPAdapter(max_retries=retries))
1245-
12461229
try:
1247-
response = s.get(download_url, timeout=60)
1230+
response = self.session.get(download_url, timeout=60)
12481231

12491232
return response.json()
12501233

0 commit comments

Comments
 (0)