-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
112 lines (96 loc) · 3.92 KB
/
main.py
File metadata and controls
112 lines (96 loc) · 3.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from crowdstrike.foundry.function import Function, Request, Response, APIError
from falconpy import APIIntegrations
from logging import Logger
from typing import Dict
func = Function.instance()
@func.handler(method='POST', path='/ticket')
def on_post(request: Request, config: Dict[str, object] | None, logger: Logger) -> Response:
"""
Create an incident ticket in ServiceNow using the Table API.
Required fields in request body:
- title: Short description of the incident
- description: Detailed description of the incident
Optional fields:
- impact: Impact level (1-High, 2-Medium, 3-Low)
- urgency: Urgency level (1-High, 2-Medium, 3-Low)
- category: Category of the incident
- subcategory: Subcategory of the incident
- assignment_group: Group to assign the incident to
- caller_id: User ID of the caller
"""
# Validate required fields
if 'title' not in request.body or 'description' not in request.body:
return Response(
code=400,
errors=[APIError(code=400, message='Missing required fields: title and description')]
)
# Prepare payload for ServiceNow incident creation
payload = {
"short_description": request.body["title"],
"description": request.body["description"],
}
# Add optional fields if provided
optional_fields = {
"impact": request.body.get("impact", "2"),
"urgency": request.body.get("urgency", "2"),
"category": request.body.get("category"),
"subcategory": request.body.get("subcategory"),
"assignment_group": request.body.get("assignment_group"),
"caller_id": request.body.get("caller_id")
}
# Only include fields with non-None values
for field, value in optional_fields.items():
if value is not None:
payload[field] = value
logger.info(f"Creating ServiceNow incident with payload: {payload}")
try:
# Use the APIIntegrations client to call ServiceNow Table API to create incident
api = APIIntegrations()
response = api.execute_command_proxy(
definition_id="ServiceNow",
operation_id="POST__api_now_table_tablename",
params={
"path": {"tableName": "incident"}
},
request={
"json": payload,
"headers": {
"Accept": "application/json",
"Content-Type": "application/json"
}
}
)
# Log the raw response for troubleshooting
logger.info(f"ServiceNow API response: {response}")
if response["status_code"] >= 400:
error_message = response.get('error', {}).get('message', 'Unknown error')
return Response(
code=response["status_code"],
errors=[APIError(
code=response["status_code"],
message=f"ServiceNow integration error: {error_message}"
)]
)
# Extract relevant information from the response
result = response["body"]["result"]
# Return formatted response with incident details
return Response(
body={
"incident_id": result["sys_id"],
"incident_number": result["number"],
"state": result["state"],
"priority": result["priority"],
"created_at": result["sys_created_on"],
"assigned_to": result["assigned_to"],
"url": response["headers"]["Location"]
},
code=201 if response["status_code"] == 200 else response["status_code"]
)
except Exception as e:
logger.error(f"Error creating ServiceNow incident: {str(e)}", exc_info=True)
return Response(
code=500,
errors=[APIError(code=500, message=f"Error creating incident: {str(e)}")]
)
if __name__ == '__main__':
func.run()