-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmain.py
More file actions
93 lines (76 loc) · 2.93 KB
/
main.py
File metadata and controls
93 lines (76 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
"""Main module for the log-event function handler."""
import os
import time
import uuid
from crowdstrike.foundry.function import Function, Request, Response, APIError
from falconpy import APIHarnessV2
FUNC = Function.instance()
@FUNC.handler(method="POST", path="/log-event")
def on_post(request: Request) -> Response:
"""
Handle POST requests to /log-event endpoint.
Args:
request: The incoming request object containing the request body.
Returns:
Response: JSON response with event storage result or error message.
"""
# Validate request
if "event_data" not in request.body:
return Response(
code=400,
errors=[APIError(code=400, message="missing event_data")]
)
event_data = request.body["event_data"]
try:
# Store data in a collection
# This assumes you've already created a collection named "event_logs"
event_id = str(uuid.uuid4())
json_data = {
"event_id": event_id,
"data": event_data,
"timestamp": int(time.time())
}
# Allow setting APP_ID as an env variable for local testing
headers = {}
if os.environ.get("APP_ID"):
headers = {
"X-CS-APP-ID": os.environ.get("APP_ID")
}
api_client = APIHarnessV2()
collection_name = "event_logs"
response = api_client.command("PutObject",
body=json_data,
collection_name=collection_name,
object_key=event_id,
headers=headers
)
if response["status_code"] != 200:
error_message = response.get("error", {}).get("message", "Unknown error")
return Response(
code=response["status_code"],
errors=[APIError(
code=response["status_code"],
message=f"Failed to store event: {error_message}"
)]
)
# Query the collection to retrieve the event by id
query_response = api_client.command("SearchObjects",
filter=f"event_id:'{event_id}'",
collection_name=collection_name,
limit=5,
headers=headers
)
return Response(
body={
"stored": True,
"metadata": query_response.get("body").get("resources", [])
},
code=200
)
except (ConnectionError, ValueError, KeyError) as e:
return Response(
code=500,
errors=[APIError(code=500, message=f"Error saving collection: {str(e)}")]
)
if __name__ == "__main__":
FUNC.run()