-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
94 lines (73 loc) · 2.92 KB
/
main.py
File metadata and controls
94 lines (73 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import uuid
import os
from googleapiclient import discovery
from google.oauth2 import service_account
import json
import base64
from google.cloud import pubsub_v1
project_id = "project-id" # Enter project id
vpc_network = "projects/{project-id}/global/networks/{vpc-name}" # Enter the vpc
subscription_id = "Subscription-to-setup" # Make sure to set this to your actual Pub/Sub subscription ID
# Path to your service account key file
key_path = ""
def block_ip_from_vpc(source_ip):
# Use google.oauth2 service account credentials for Python 3
credentials = service_account.Credentials.from_service_account_file(
key_path,
scopes=['https://www.googleapis.com/auth/cloud-platform']
)
service = discovery.build('compute', 'v1', credentials=credentials)
# Generate a unique identifier for the firewall rule
unique_id = str(uuid.uuid4())
rule_name = f"block-ip-{unique_id}"
firewall_rule = {
"name": rule_name,
"network": vpc_network,
"direction": "INGRESS",
"priority": 1000,
"sourceRanges": [source_ip],
"denied": [
{
"IPProtocol": "all"
}
],
}
# Insert the firewall rule
request = service.firewalls().insert(project=project_id, body=firewall_rule)
response = request.execute()
print(f"Firewall rule '{rule_name}' created to block IP: {source_ip}")
def process_pubsub_message(message):
try:
# Print the entire Pub/Sub message for debugging
print(f"Received message: {message.data.decode('utf-8')}")
# Parse the Pub/Sub message
data = json.loads(message.data.decode("utf-8"))
# Print the parsed JSON data to understand the structure
print(f"Parsed data: {json.dumps(data, indent=4)}")
# Extract the source IP address from the jsonPayload
source_ip = data.get("jsonPayload", {}).get("source_ip_address")
if source_ip:
print(f"Blocking IP: {source_ip}")
block_ip_from_vpc(source_ip)
else:
print("Source IP not found in the message.")
except json.JSONDecodeError as e:
print(f"Error decoding JSON: {e}")
except Exception as e:
print(f"Error processing message: {e}")
# Acknowledge the Pub/Sub message
message.ack()
def listen_for_threats():
# Initialize the Pub/Sub subscriber client
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
# Listen to the Pub/Sub subscription
streaming_pull_future = subscriber.subscribe(subscription_path, callback=process_pubsub_message)
print(f"Listening for messages on {subscription_path}...")
# Keep the main thread alive to keep receiving messages
try:
streaming_pull_future.result()
except KeyboardInterrupt:
streaming_pull_future.cancel()
if __name__ == "__main__":
listen_for_threats()