forked from auth0/jupiterone-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathbulk_upload.py
More file actions
75 lines (65 loc) · 2.03 KB
/
bulk_upload.py
File metadata and controls
75 lines (65 loc) · 2.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from jupiterone.client import JupiterOneClient
import os
# Credentials come from the environment; .get() returns None for an unset
# variable, which the guard below treats the same as an empty string.
account = os.environ.get("JUPITERONE_ACCOUNT")
token = os.environ.get("JUPITERONE_TOKEN")
# API endpoint handed to the client constructor.
url = "https://graphql.us.jupiterone.io"

# Check if credentials are available
if not account or not token:
    print("Error: JUPITERONE_ACCOUNT and JUPITERONE_TOKEN environment variables must be set")
    print("This example script requires valid JupiterOne credentials to run")
    # Raise SystemExit directly instead of calling exit(): the exit() helper
    # is added by the `site` module for interactive sessions and is not
    # available when the interpreter runs without it (e.g. `python -S`).
    raise SystemExit(1)

# Client used by every subsequent call in this script.
j1 = JupiterOneClient(account=account, token=token, url=url)
# Placeholder value: replace with the ID of the integration instance that
# should receive the uploaded entities.
instance_id = "<Integration Instance ID>"

# Open a sync job for that instance and show the raw response.
sync_job = j1.start_sync_job(
    instance_id=instance_id,
    sync_mode="PATCH",
    source="integration-external",
)
print(sync_job)

# The upload and finalize calls further down both need this id. Stop here if
# the response does not carry one rather than passing None along to them.
sync_job_id = sync_job["job"].get("id")
if sync_job_id is None:
    raise RuntimeError("start_sync_job response did not include a job id")
# Prepare entities payload: sample records (two `aws_ec2_instance` hosts and
# one `aws_rds_instance` database) sent in a single batch upload.
entities_payload = [
    # First web server
    {
        "_key": "server-001",
        "_type": "aws_ec2_instance",
        "_class": "Host",
        "displayName": "web-server-001",
        "instanceId": "i-1234567890abcdef0",
        "instanceType": "t3.micro",
        "state": "running",
        "tag.Environment": "production",
        "tag.Team": "engineering",
    },
    # Second web server
    {
        "_key": "server-002",
        "_type": "aws_ec2_instance",
        "_class": "Host",
        "displayName": "web-server-002",
        "instanceId": "i-0987654321fedcba0",
        "instanceType": "t3.small",
        "state": "running",
        "tag.Environment": "staging",
        "tag.Team": "engineering",
    },
    # Database
    {
        "_key": "database-001",
        "_type": "aws_rds_instance",
        "_class": "Database",
        "displayName": "prod-database",
        "dbInstanceIdentifier": "prod-db",
        "engine": "postgres",
        "dbInstanceClass": "db.t3.micro",
        "tag.Environment": "production",
        "tag.Team": "data",
    },
]
# Send the whole entity list to the open sync job in one batch call, then
# report how many entities were submitted and show the raw response.
result = j1.upload_entities_batch_json(
    entities_list=entities_payload,
    instance_job_id=sync_job_id,
)
print(f"Uploaded {len(entities_payload)} entities")
print(result)

# Close out the sync job; `result` is rebound to the finalize response and
# the job id it reports is printed.
result = j1.finalize_sync_job(
    instance_job_id=sync_job_id,
)
print(f"Finalized sync job: {result['job']['id']}")