forked from auth0/jupiterone-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy path09_custom_file_transfer_example.py
More file actions
315 lines (250 loc) · 10.6 KB
/
09_custom_file_transfer_example.py
File metadata and controls
315 lines (250 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
#!/usr/bin/env python3
"""
Custom File Transfer (CFT) Integration Example
This script demonstrates how to use the JupiterOne Python client to:
1. Get an upload URL for Custom File Transfer integration
2. Upload a CSV file to the integration
3. Invoke the integration to process the uploaded file
Prerequisites:
- JupiterOne account with API access
- Custom File Transfer integration instance configured
- CSV file to upload
Usage:
python 09_custom_file_transfer_example.py
"""
import os
import sys
# Add the parent directory to the path so we can import the jupiterone client
try:
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
except NameError:
# Handle case when __file__ is not available (e.g., when exec'd)
sys.path.append('..')
from jupiterone.client import JupiterOneClient
def setup_client():
"""Set up the JupiterOne client with credentials."""
# You can set these as environment variables or replace with your actual values
account = os.getenv('JUPITERONE_ACCOUNT')
token = os.getenv('JUPITERONE_API_TOKEN')
if not account or not token:
print("Error: Please set JUPITERONE_ACCOUNT and JUPITERONE_API_TOKEN environment variables")
print("Example:")
print("export JUPITERONE_ACCOUNT='your-account-id'")
print("export JUPITERONE_API_TOKEN='your-api-token'")
sys.exit(1)
try:
client = JupiterOneClient(account=account, token=token)
print(f"✅ Successfully connected to JupiterOne account: {account}")
return client
except Exception as e:
print(f"❌ Failed to connect to JupiterOne: {e}")
sys.exit(1)
def get_cft_upload_url_example(client, integration_instance_id, filename, dataset_id):
"""
Example of getting an upload URL for Custom File Transfer integration.
Args:
client: JupiterOne client instance
integration_instance_id: ID of the CFT integration instance
filename: Name of the file to upload
dataset_id: Dataset ID for the upload
"""
print("\n" + "="*60)
print("1. GETTING CFT UPLOAD URL")
print("="*60)
try:
print(f"Requesting upload URL for:")
print(f" - Integration Instance ID: {integration_instance_id}")
print(f" - Filename: {filename}")
print(f" - Dataset ID: {dataset_id}")
upload_info = client.get_cft_upload_url(
integration_instance_id=integration_instance_id,
filename=filename,
dataset_id=dataset_id
)
print("✅ Upload URL obtained successfully!")
print(f" - Upload URL: {upload_info['uploadUrl']}")
print(f" - Expires In: {upload_info['expiresIn']} seconds")
return upload_info
except Exception as e:
print(f"❌ Failed to get upload URL: {e}")
return None
def upload_cft_file_example(client, upload_url, file_path):
"""
Example of uploading a CSV file to Custom File Transfer integration.
Args:
client: JupiterOne client instance
upload_url: Upload URL obtained from get_cft_upload_url()
file_path: Local path to the CSV file
"""
print("\n" + "="*60)
print("2. UPLOADING CSV FILE")
print("="*60)
try:
# Check if file exists
if not os.path.exists(file_path):
print(f"❌ File not found: {file_path}")
return None
# Check if file is CSV
if not file_path.lower().endswith('.csv'):
print(f"❌ File must be a CSV file. Got: {file_path}")
return None
print(f"Uploading file:")
print(f" - File path: {file_path}")
print(f" - File size: {os.path.getsize(file_path)} bytes")
print(f" - Upload URL: {upload_url}")
# Upload the file
result = client.upload_cft_file(
upload_url=upload_url,
file_path=file_path
)
print("✅ File upload completed!")
print(f" - Status code: {result['status_code']}")
print(f" - Success: {result['success']}")
print(f" - Response headers: {result['headers']}")
if result['success']:
print(" - File uploaded successfully to JupiterOne")
else:
print(f" - Upload failed. Response data: {result['response_data']}")
return result
except Exception as e:
print(f"❌ Failed to upload file: {e}")
return None
def invoke_cft_integration_example(client, integration_instance_id):
"""
Example of invoking a Custom File Transfer integration instance.
Args:
client: JupiterOne client instance
integration_instance_id: ID of the CFT integration instance
"""
print("\n" + "="*60)
print("3. INVOKING CFT INTEGRATION")
print("="*60)
try:
print(f"Invoicing integration instance:")
print(f" - Integration Instance ID: {integration_instance_id}")
# Invoke the integration
result = client.invoke_cft_integration(
integration_instance_id=integration_instance_id
)
if result == True:
print("✅ Integration invoked successfully!")
print(" - The integration is now processing the uploaded file")
elif result == 'ALREADY_RUNNING':
print("⚠️ Integration is already running")
print(" - The integration instance is currently executing")
else:
print("❌ Integration invocation failed")
print(" - Check the integration instance configuration")
return result
except Exception as e:
print(f"❌ Failed to invoke integration: {e}")
return None
def complete_workflow_example(client, integration_instance_id, file_path, dataset_id):
"""
Complete workflow example combining all three methods.
Args:
client: JupiterOne client instance
integration_instance_id: ID of the CFT integration instance
file_path: Local path to the CSV file
dataset_id: Dataset ID for the upload
"""
print("\n" + "="*60)
print("COMPLETE CFT WORKFLOW")
print("="*60)
print("Starting complete Custom File Transfer workflow...")
# Step 1: Get upload URL
upload_info = get_cft_upload_url_example(
client, integration_instance_id, os.path.basename(file_path), dataset_id
)
if not upload_info:
print("❌ Workflow failed at step 1: Getting upload URL")
return False
# Step 2: Upload file
upload_result = upload_cft_file_example(client, upload_info['uploadUrl'], file_path)
if not upload_result or not upload_result['success']:
print("❌ Workflow failed at step 2: Uploading file")
return False
# Step 3: Invoke integration
invoke_result = invoke_cft_integration_example(client, integration_instance_id)
if invoke_result == True:
print("\n🎉 Complete workflow successful!")
print(" - File uploaded successfully")
print(" - Integration invoked successfully")
print(" - Data processing has begun")
return True
elif invoke_result == 'ALREADY_RUNNING':
print("\n⚠️ Workflow partially successful")
print(" - File uploaded successfully")
print(" - Integration is already running")
return True
else:
print("\n❌ Workflow failed at step 3: Invoking integration")
return False
def main():
"""Main function demonstrating the CFT methods."""
print("🚀 JupiterOne Custom File Transfer Integration Examples")
print("="*60)
# Configuration - Replace these with your actual values
INTEGRATION_INSTANCE_ID = os.getenv('J1_CFT_INSTANCE_ID', 'your-integration-instance-id')
DATASET_ID = os.getenv('J1_CFT_DATASET_ID', 'your-dataset-id')
CSV_FILE_PATH = os.getenv('J1_CSV_FILE_PATH', 'examples/scanned_hosts.csv')
# Check if we have the required configuration
if INTEGRATION_INSTANCE_ID == 'your-integration-instance-id':
print("⚠️ Configuration required:")
print("Set the following environment variables:")
print(" - J1_CFT_INSTANCE_ID: Your CFT integration instance ID")
print(" - J1_CFT_DATASET_ID: Your dataset ID")
print(" - J1_CSV_FILE_PATH: Path to your CSV file (optional, defaults to examples/scanned_hosts.csv)")
print("\nExample:")
print("export J1_CFT_INSTANCE_ID='123e4567-e89b-12d3-a456-426614174000'")
print("export J1_CFT_DATASET_ID='dataset-123'")
print("export J1_CSV_FILE_PATH='/path/to/your/file.csv'")
print("\nOr update the variables in this script directly.")
return
# Set up the client
client = setup_client()
# Individual method examples
print("\n📚 INDIVIDUAL METHOD EXAMPLES")
# Example 1: Get upload URL
upload_info = get_cft_upload_url_example(
client, INTEGRATION_INSTANCE_ID, os.path.basename(CSV_FILE_PATH), DATASET_ID
)
if upload_info:
# Example 2: Upload file
upload_result = upload_cft_file_example(
client, upload_info['uploadUrl'], CSV_FILE_PATH
)
if upload_result and upload_result['success']:
# Example 3: Invoke integration
invoke_cft_integration_example(client, INTEGRATION_INSTANCE_ID)
# Complete workflow example
print("\n🔄 COMPLETE WORKFLOW EXAMPLE")
complete_workflow_example(client, INTEGRATION_INSTANCE_ID, CSV_FILE_PATH, DATASET_ID)
print("\n" + "="*60)
print("📖 USAGE PATTERNS")
print("="*60)
print("""
Common usage patterns:
1. Single file upload and processing:
upload_info = client.get_cft_upload_url(instance_id, filename, dataset_id)
upload_result = client.upload_cft_file(upload_info['uploadUrl'], file_path)
if upload_result['success']:
client.invoke_cft_integration(instance_id)
2. Batch processing multiple files:
for file_path in csv_files:
upload_info = client.get_cft_upload_url(instance_id, filename, dataset_id)
client.upload_cft_file(upload_info['uploadUrl'], file_path)
# Invoke integration once after all files are uploaded
client.invoke_cft_integration(instance_id)
3. Error handling and retries:
try:
result = client.upload_cft_file(upload_url, file_path)
if result['success']:
print("Upload successful")
else:
print(f"Upload failed: {result['response_data']}")
except Exception as e:
print(f"Upload error: {e}")
""")
if __name__ == "__main__":
main()