Commit 13b12d5

dist with GCS support
1 parent bdc1faa commit 13b12d5

3 files changed

Lines changed: 147 additions & 98 deletions

conf/default/distributed.conf.default

Lines changed: 3 additions & 0 deletions
@@ -42,3 +42,6 @@ token = ""
 autodiscovery = 600
 # Instances should start with following name pattern
 instance_name = cape-server
+
+[gcs]
+enabled = no
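
This flag only gates the fetch-side upload in utils/dist.py; the uploader's own settings live in the [gcs] section of reporting.conf. As a hedged sketch (not part of this commit, all values hypothetical), a matching reporting.conf section using the option names GCSUploader actually reads might look like:

[gcs]
enabled = yes
# Destination bucket (hypothetical name).
bucket_name = cape-reports
# "vm" uses the instance's attached service account; any other value requires
# the key file below.
auth_by = vm
# Service-account key path, relative to CUCKOO_ROOT; only used when auth_by != "vm".
credentials_path = conf/gcs-service-account.json
# Comma-separated names skipped while walking the analysis directory.
exclude_dirs = memory
exclude_files = binary
# "zip" uploads a single archive per analysis; anything else uploads per-file.
mode = file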

modules/reporting/gcs.py

Lines changed: 122 additions & 98 deletions
@@ -4,6 +4,7 @@
 import zipfile
 from lib.cuckoo.common.constants import CUCKOO_ROOT
 from lib.cuckoo.common.abstracts import Report
+from lib.cuckoo.common.config import Config
 from lib.cuckoo.common.exceptions import CuckooReportError

 # Set up a logger for this module
@@ -19,6 +20,112 @@
 HAVE_GCS = False


+class GCSUploader:
+    """Helper class to upload files to GCS."""
+
+    def __init__(self, bucket_name=None, auth_by=None, credentials_path=None, exclude_dirs=None, exclude_files=None, mode=None):
+        if not HAVE_GCS:
+            raise ImportError("google-cloud-storage library is missing")
+
+        # Load from reporting.conf if parameters are missing
+        if not bucket_name:
+            cfg = Config("reporting")
+            if not cfg.gcs.enabled:
+                # Explicit parameters may still be passed when the config section
+                # is disabled; if both are missing, the bucket_name check below raises.
+                pass
+
+            bucket_name = cfg.gcs.bucket_name
+            auth_by = cfg.gcs.auth_by
+            credentials_path_str = cfg.gcs.credentials_path
+
+            if credentials_path_str:
+                credentials_path = os.path.join(CUCKOO_ROOT, credentials_path_str)
+
+            exclude_dirs_str = cfg.gcs.get("exclude_dirs", "")
+            exclude_files_str = cfg.gcs.get("exclude_files", "")
+            mode = cfg.gcs.get("mode", "file")
+
+            # Parse exclusion sets
+            self.exclude_dirs = {item.strip() for item in exclude_dirs_str.split(",") if item.strip()}
+            self.exclude_files = {item.strip() for item in exclude_files_str.split(",") if item.strip()}
+        else:
+            self.exclude_dirs = exclude_dirs if exclude_dirs else set()
+            self.exclude_files = exclude_files if exclude_files else set()
+
+        self.mode = mode
+
+        if not bucket_name:
+            raise ValueError("GCS bucket_name is not configured.")
+
+        if auth_by == "vm":
+            self.storage_client = storage.Client()
+        else:
+            if not credentials_path or not os.path.exists(credentials_path):
+                raise ValueError(f"Invalid credentials path: {credentials_path}")
+            credentials = service_account.Credentials.from_service_account_file(credentials_path)
+            self.storage_client = storage.Client(credentials=credentials)
+
+        self.bucket = self.storage_client.bucket(bucket_name)
+        # Bucket existence is deliberately not validated here: bucket.exists()
+        # requires extra permissions and a network round-trip, and dist.py
+        # should not crash at startup over a flaky connection.
+        # if not self.bucket.exists():
+        #     raise ValueError(f"GCS Bucket '{bucket_name}' does not exist or is inaccessible.")
+
+    def _iter_files_to_upload(self, source_directory):
+        """Generator that yields files to be uploaded, skipping excluded ones."""
+        for root, dirs, files in os.walk(source_directory):
+            # Exclude specified directories
+            dirs[:] = [d for d in dirs if d not in self.exclude_dirs]
+            for filename in files:
+                # Exclude specified files
+                if filename in self.exclude_files:
+                    continue
+
+                local_path = os.path.join(root, filename)
+                if not os.path.exists(local_path):
+                    continue
+                relative_path = os.path.relpath(local_path, source_directory)
+                yield local_path, relative_path
+
+    def upload(self, source_directory, analysis_id, tlp=None):
+        if self.mode == "zip":
+            self.upload_zip_archive(analysis_id, source_directory, tlp=tlp)
+        else:
+            self.upload_files_individually(analysis_id, source_directory, tlp=tlp)
+
+    def upload_zip_archive(self, analysis_id, source_directory, tlp=None):
+        log.debug("Compressing and uploading files for analysis ID %s to GCS", analysis_id)
+        blob_name = f"{analysis_id}_tlp_{tlp}.zip" if tlp else f"{analysis_id}.zip"
+
+        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_zip_file:
+            tmp_zip_file_name = tmp_zip_file.name
+            with zipfile.ZipFile(tmp_zip_file, "w", zipfile.ZIP_DEFLATED) as archive:
+                for local_path, relative_path in self._iter_files_to_upload(source_directory):
+                    archive.write(local_path, relative_path)
+        try:
+            log.debug("Uploading '%s' to '%s'", tmp_zip_file_name, blob_name)
+            blob = self.bucket.blob(blob_name)
+            blob.upload_from_filename(tmp_zip_file_name)
+        finally:
+            os.unlink(tmp_zip_file_name)
+        log.info("Successfully uploaded archive for analysis %s to GCS.", analysis_id)
+
+    def upload_files_individually(self, analysis_id, source_directory, tlp=None):
+        log.debug("Uploading files for analysis ID %s to GCS", analysis_id)
+        folder_name = f"{analysis_id}_tlp_{tlp}" if tlp else str(analysis_id)
+
+        for local_path, relative_path in self._iter_files_to_upload(source_directory):
+            blob_name = f"{folder_name}/{relative_path}"
+            # log.debug("Uploading '%s' to '%s'", local_path, blob_name)
+            blob = self.bucket.blob(blob_name)
+            blob.upload_from_filename(local_path)
+
+        log.info("Successfully uploaded files for analysis %s to GCS.", analysis_id)
+
+
 class GCS(Report):
     """
     Uploads all analysis files to a Google Cloud Storage (GCS) bucket.
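
For orientation, a minimal usage sketch of the new helper, not part of the commit (the path, analysis ID, and bucket values are hypothetical). Called with no arguments, GCSUploader loads everything from the [gcs] section of reporting.conf, which is exactly how utils/dist.py initializes it below; explicit arguments bypass the config:

from modules.reporting.gcs import GCSUploader

# Config-driven: settings come from reporting.conf -> [gcs].
uploader = GCSUploader()  # raises ImportError/ValueError when misconfigured
uploader.upload("/opt/CAPEv2/storage/analyses/123", 123, tlp="amber")

# Explicit: arguments bypass reporting.conf entirely (values are hypothetical).
uploader = GCSUploader(
    bucket_name="cape-reports",
    auth_by="vm",                 # use the VM's attached service account
    exclude_dirs={"memory"},
    exclude_files={"binary"},
    mode="zip",                   # one archive instead of one object per file
)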
@@ -43,120 +150,37 @@ def run(self, results):
             return

         tlp = results.get("info", {}).get("tlp")
+        analysis_id = results.get("info", {}).get("id")

-        # Read configuration options from gcs.conf and validate them
-        bucket_name = self.options.get("bucket_name")
-        if not bucket_name:
-            raise CuckooReportError("GCS bucket_name is not configured in reporting.conf -> gcs")
-        auth_by = self.options.get("auth_by")
-        if auth_by == "vm":
-            storage_client = storage.Client()
-        else:
-            credentials_path_str = self.options.get("credentials_path")
-            if not credentials_path_str:
-                raise CuckooReportError("GCS credentials_path is not configured in reporting.conf -> gcs")
-
-            credentials_path = os.path.join(CUCKOO_ROOT, credentials_path_str)
-            if not os.path.isfile(credentials_path):
-                raise CuckooReportError(
-                    "GCS credentials_path '%s' is invalid or file does not exist in reporting.conf -> gcs", credentials_path
-                )
+        # Delegate to GCSUploader, passing options from self.options explicitly
+        # so per-module overrides still take precedence over the defaults read
+        # from reporting.conf.

-            credentials = service_account.Credentials.from_service_account_file(credentials_path)
-            storage_client = storage.Client(credentials=credentials)
-
-        # Read the exclusion lists, defaulting to empty strings
+        # Parse exclusion lists from self.options to respect local module config
         exclude_dirs_str = self.options.get("exclude_dirs", "")
         exclude_files_str = self.options.get("exclude_files", "")
-
-        # Parse the exclusion strings into sets for efficient lookups
         exclude_dirs = {item.strip() for item in exclude_dirs_str.split(",") if item.strip()}
         exclude_files = {item.strip() for item in exclude_files_str.split(",") if item.strip()}

-        if exclude_dirs:
-            log.debug("GCS reporting will exclude directories: %s", exclude_dirs)
-        if exclude_files:
-            log.debug("GCS reporting will exclude files: %s", exclude_files)
-
-        # Get the upload mode, defaulting to 'file' for backward compatibility
+        # Construct the uploader manually so self.options wins over reporting.conf
+        bucket_name = self.options.get("bucket_name")
+        auth_by = self.options.get("auth_by")
+        credentials_path_str = self.options.get("credentials_path")
+        credentials_path = None
+        if credentials_path_str:
+            credentials_path = os.path.join(CUCKOO_ROOT, credentials_path_str)
         mode = self.options.get("mode", "file")

         try:
-            # --- Authentication ---
-            log.debug("Authenticating with Google Cloud Storage...")
-            bucket = storage_client.bucket(bucket_name)
+            uploader = GCSUploader(bucket_name, auth_by, credentials_path, exclude_dirs, exclude_files, mode)

-            # Check if the bucket exists and is accessible
-            if not bucket.exists():
-                raise CuckooReportError(
-                    "The specified GCS bucket '%s' does not exist or you don't have permission to access it.", bucket_name
-                )
-
-            analysis_id = results.get("info", {}).get("id")
             if not analysis_id:
                 raise CuckooReportError("Could not get analysis ID from results.")

             source_directory = self.analysis_path

-            if mode == "zip":
-                self.upload_zip_archive(bucket, analysis_id, source_directory, exclude_dirs, exclude_files, tlp=tlp)
-            elif mode == "file":
-                self.upload_files_individually(bucket, analysis_id, source_directory, exclude_dirs, exclude_files, tlp=tlp)
-            else:
-                raise CuckooReportError("Invalid GCS upload mode specified: %s. Must be 'file' or 'zip'.", mode)
+            uploader.upload(source_directory, analysis_id, tlp)

         except Exception as e:
             raise CuckooReportError(f"Failed to upload report to GCS: {e}") from e
-
-    def _iter_files_to_upload(self, source_directory, exclude_dirs, exclude_files):
-        """Generator that yields files to be uploaded, skipping excluded ones."""
-        for root, dirs, files in os.walk(source_directory):
-            # Exclude specified directories
-            dirs[:] = [d for d in dirs if d not in exclude_dirs]
-            for filename in files:
-                # Exclude specified files
-                if filename in exclude_files:
-                    log.debug("Skipping excluded file: %s", os.path.join(root, filename))
-                    continue
-
-                local_path = os.path.join(root, filename)
-                if not os.path.exists(local_path):
-                    continue
-                relative_path = os.path.relpath(local_path, source_directory)
-                yield local_path, relative_path
-
-    def upload_zip_archive(self, bucket, analysis_id, source_directory, exclude_dirs, exclude_files, tlp=None):
-        """Compresses and uploads the analysis directory as a single zip file."""
-        log.debug("Compressing and uploading files for analysis ID %d to GCS bucket '%s'", analysis_id, bucket.name)
-        if tlp:
-            zip_name = "%s_tlp_%s.zip" % analysis_id, tlp
-        else:
-            zip_name = "%s.zip" % analysis_id
-        blob_name = zip_name
-
-        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_zip_file:
-            tmp_zip_file_name = tmp_zip_file.name
-            with zipfile.ZipFile(tmp_zip_file, "w", zipfile.ZIP_DEFLATED) as archive:
-                for local_path, relative_path in self._iter_files_to_upload(source_directory, exclude_dirs, exclude_files):
-                    archive.write(local_path, relative_path)
-        try:
-            log.debug("Uploading '%s' to '%s'", tmp_zip_file_name, blob_name)
-            blob = bucket.blob(blob_name)
-            blob.upload_from_filename(tmp_zip_file_name)
-        finally:
-            os.unlink(tmp_zip_file_name)
-        log.info("Successfully uploaded archive for analysis %d to GCS.", analysis_id)
-
-    def upload_files_individually(self, bucket, analysis_id, source_directory, exclude_dirs, exclude_files, tlp=None):
-        """Uploads analysis files individually to the GCS bucket."""
-        log.debug("Uploading files for analysis ID %d to GCS bucket '%s'", analysis_id, bucket.name)
-        folder_name = analysis_id
-        if tlp:
-            folder_name = "%s_tlp_%s" % analysis_id, tlp
-        for local_path, relative_path in self._iter_files_to_upload(source_directory, exclude_dirs, exclude_files):
-            blob_name = f"{folder_name}/{relative_path}"
-            log.debug("Uploading '%s' to '%s'", local_path, blob_name)
-            blob = bucket.blob(blob_name)
-            blob.upload_from_filename(local_path)
-
-        log.info("Successfully uploaded files for analysis %d to GCS.", analysis_id)

utils/dist.py

Lines changed: 22 additions & 0 deletions
@@ -95,6 +95,18 @@
 NFS_FETCH = dist_conf.distributed.get("nfs")
 RESTAPI_FETCH = dist_conf.distributed.get("restapi")

+# GCS Configuration
+GCS_ENABLED = dist_conf.gcs.enabled
+
+if GCS_ENABLED:
+    from modules.reporting.gcs import GCSUploader
+    try:
+        # Initialize without args to load settings from reporting.conf
+        gcs_uploader = GCSUploader()
+    except Exception as e:
+        log.error("Failed to initialize GCS Uploader: %s", e)
+        GCS_ENABLED = False
+
 INTERVAL = 10

 # controller of dead nodes
@@ -994,6 +1006,16 @@ def fetch_latest_reports_nfs(self):
             except Exception as e:
                 log.exception("Failed to save iocs for parent sample: %s", str(e))

+            if GCS_ENABLED:
+                try:
+                    # report_path is the analysis folder root; the TLP marking
+                    # is read from the task object so the upload gets the same
+                    # _tlp_ suffix as reporting-side uploads.
+                    tlp = t.tlp
+                    gcs_uploader.upload(report_path, t.main_task_id, tlp=tlp)
+                except Exception as e:
+                    log.error("Failed to upload report to GCS for task %d: %s", t.main_task_id, e)
+
             t.retrieved = True
             t.finished = True
             db.commit()
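
To verify what the dist worker pushed, a short sketch using the google-cloud-storage client, not part of the commit (the bucket name and task ID are hypothetical):

from google.cloud import storage

client = storage.Client()  # Application Default Credentials, i.e. the "vm" path
task_id = 123              # hypothetical main_task_id

# Both upload modes prefix object names with the analysis/task ID, so a
# prefix listing shows either the single zip or the per-file objects.
for blob in client.list_blobs("cape-reports", prefix=str(task_id)):
    print(blob.name, blob.size)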
