forked from auth0/jupiterone-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathsync_job_workflow.py
More file actions
136 lines (115 loc) · 4.76 KB
/
sync_job_workflow.py
File metadata and controls
136 lines (115 loc) · 4.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
"""
JupiterOne Sync Job Workflow Example
This example demonstrates the core synchronization job workflow:
1. Start a sync job
2. Upload entities batch
3. Finalize the sync job
This is a standalone script that can be run independently to test
the sync job functionality.
"""
import os
import sys
from jupiterone.client import JupiterOneClient
def _sample_entities():
    """Return the hard-coded sample entities uploaded in step 2.

    The payload holds two ``example_server`` hosts and one
    ``example_database`` entity. A new list is built on every call.
    """
    return [
        {
            "_key": "example-server-001",
            "_type": "example_server",
            "_class": "Host",
            "displayName": "Example Server 001",
            "hostname": "server-001.example.com",
            "ipAddress": "192.168.1.100",
            "operatingSystem": "Linux",
            "tag.Environment": "development",
            "tag.Team": "engineering",
            "tag.Purpose": "web_server"
        },
        {
            "_key": "example-server-002",
            "_type": "example_server",
            "_class": "Host",
            "displayName": "Example Server 002",
            "hostname": "server-002.example.com",
            "ipAddress": "192.168.1.101",
            "operatingSystem": "Linux",
            "tag.Environment": "staging",
            "tag.Team": "engineering",
            "tag.Purpose": "database_server"
        },
        {
            "_key": "example-database-001",
            "_type": "example_database",
            "_class": "Database",
            "displayName": "Example Database 001",
            "databaseName": "app_db",
            "engine": "postgresql",
            "version": "13.4",
            "tag.Environment": "development",
            "tag.Team": "data"
        }
    ]


def _report_final_status(job):
    """Print the outcome of the finalize call and report whether it failed.

    Args:
        job: The ``job`` mapping taken from the finalize response. It must
            contain ``status`` and may contain ``error``.

    Returns:
        True when the reported status is ``FAILED``, False otherwise.
    """
    status = job['status']
    if status == 'COMPLETED':
        print("✓ Sync job completed successfully!")
    elif status == 'FAILED':
        error_msg = job.get('error', 'Unknown error')
        print(f"✗ Sync job failed: {error_msg}", file=sys.stderr)
    else:
        print(f"ℹ Sync job status: {status}")
    return status == 'FAILED'


def main():
    """Run the three-step sync job workflow and print progress.

    Reads ``JUPITERONE_API_TOKEN``, ``JUPITERONE_ACCOUNT_ID`` and
    ``JUPITERONE_INSTANCE_ID`` from the environment, starts a ``PATCH`` sync
    job, uploads the sample entities, and finalizes the job.

    Raises:
        SystemExit: With status 1 when a required environment variable is
            missing, when any workflow step raises, or when the finalized
            job reports ``FAILED``.
    """
    # Initialize JupiterOne client
    # You can set these as environment variables or replace with your values
    api_token = os.getenv('JUPITERONE_API_TOKEN')
    account_id = os.getenv('JUPITERONE_ACCOUNT_ID')

    if not api_token or not account_id:
        print("Error: Please set JUPITERONE_API_TOKEN and JUPITERONE_ACCOUNT_ID environment variables", file=sys.stderr)
        print("Example:", file=sys.stderr)
        print(" export JUPITERONE_API_TOKEN='your-api-token'", file=sys.stderr)
        print(" export JUPITERONE_ACCOUNT_ID='your-account-id'", file=sys.stderr)
        sys.exit(1)

    # Create JupiterOne client
    j1 = JupiterOneClient(api_token=api_token, account_id=account_id)

    print("=== JupiterOne Sync Job Workflow Example ===\n")

    # You'll need to replace this with an actual integration instance ID
    instance_id = os.getenv('JUPITERONE_INSTANCE_ID')
    if not instance_id:
        print("Error: Please set JUPITERONE_INSTANCE_ID environment variable", file=sys.stderr)
        print("Example:", file=sys.stderr)
        print(" export JUPITERONE_INSTANCE_ID='your-integration-instance-id'", file=sys.stderr)
        sys.exit(1)

    try:
        # Step 1: Start sync job
        print("1. Starting synchronization job...")
        sync_job = j1.start_sync_job(
            instance_id=instance_id,
            sync_mode="PATCH",
            source="integration-external"
        )

        sync_job_id = sync_job['job'].get('id')
        # Without an id the upload and finalize calls below would target
        # ``None``; stop here and let the handler below report it.
        if not sync_job_id:
            raise RuntimeError("start_sync_job response did not include a job id")
        print(f"✓ Started sync job: {sync_job_id}")
        print(f" Status: {sync_job['job']['status']}")
        print()

        # Step 2: Upload entities batch
        print("2. Uploading entities batch...")
        entities_payload = _sample_entities()

        upload_result = j1.upload_entities_batch_json(
            instance_job_id=sync_job_id,
            entities_list=entities_payload
        )
        print(f"✓ Uploaded {len(entities_payload)} entities successfully")
        print(f" Upload result: {upload_result}")
        print()

        # Step 3: Finalize sync job
        print("3. Finalizing synchronization job...")
        finalize_result = j1.finalize_sync_job(instance_job_id=sync_job_id)

        final_job = finalize_result['job']
        finalize_job_id = final_job.get('id')
        print(f"✓ Finalized sync job: {finalize_job_id}")
        print(f" Status: {final_job['status']}")

        # Check final status
        job_failed = _report_final_status(final_job)

        print("\n=== Sync Job Workflow Complete ===")

    except Exception as e:
        # Top-level boundary of the script: report the error and fail.
        print(f"✗ Error during sync job workflow: {e}", file=sys.stderr)
        sys.exit(1)

    # A FAILED job must not look like success to the calling shell or CI.
    if job_failed:
        sys.exit(1)
# Run the workflow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()