-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
731 lines (625 loc) · 28.6 KB
/
main.py
File metadata and controls
731 lines (625 loc) · 28.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
#!/usr/bin/env python3
"""
OhMyMalware Security Data Ingestion Script for Elasticsearch
This script follows the specific instructions from the ohmymalware repository:
1. Creates separate indices for events (logs-endpoint.events.imported) and alerts (logs-endpoint.alerts-imported)
2. Uses mapping.total_fields.limit of 6000 as specified in README
3. Downloads and uses the official mapping.json from the repository
4. Creates ingest pipeline with event.ingested field for Elastic Security rule compatibility
5. Processes all Event Data including Episode directories
6. Handles data stream creation if required by Elasticsearch templates
7. Routes documents to correct indices based on content type
Note: Modern Elasticsearch versions may automatically create data streams for logs-* indices.
The script handles this automatically and creates either a regular index or data stream as needed.
This implementation matches the exact requirements from the ohmymalware README for triggering
Elastic Security detections and using the correct index patterns.
Downloads data from jamesspi/ohmymalware GitHub repository and indexes it properly.
"""
import csv
import json
import logging
from datetime import datetime
from typing import Any, Dict, List
import pandas as pd
import requests
from elasticsearch import Elasticsearch, helpers
# Module-wide logging: INFO and above, each record prefixed with a timestamp
# and its level name.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
class SecurityDataIngester:
def __init__(
    self,
    es_host: str = "localhost",
    es_port: int = 9200,
    es_user: str = None,
    es_password: str = None,
):
    """Build the Elasticsearch client and set the repository/index names.

    Args:
        es_host: Host name of the Elasticsearch node.
        es_port: HTTP port of the Elasticsearch node.
        es_user: User name for basic auth; ignored unless ``es_password``
            is also given.
        es_password: Password for basic auth; ignored unless ``es_user``
            is also given.
    """
    endpoint = f"http://{es_host}:{es_port}"

    # Credentials are only passed when both halves are present; otherwise
    # the client is created without authentication.
    client_options = {}
    if es_user and es_password:
        client_options["basic_auth"] = (es_user, es_password)
    self.es = Elasticsearch([endpoint], **client_options)

    # Raw-content root of the "Event Data" directory in the repository.
    self.base_url = (
        "https://raw.githubusercontent.com/jamesspi/ohmymalware/main/Event%20Data"
    )
    self.pipeline_name = "ohmymalware-security-pipeline"
    # NOTE(review): the events name uses ".imported" while the alerts name
    # uses "-imported" — confirm this asymmetry is intended.
    self.events_index = "logs-endpoint.events.imported"
    self.alerts_index = "logs-endpoint.alerts-imported"
def create_ingest_pipeline(self):
    """Register the ingest pipeline named ``self.pipeline_name``.

    The pipeline has two processors: a ``date`` processor that parses
    ``@timestamp`` as ISO8601 and rewrites it in a nanosecond-precision
    format, and a ``set`` processor that stamps ``event.ingested`` with the
    ingest timestamp.

    Raises:
        Exception: Whatever the client raises; the error is logged first.
    """
    normalize_timestamp = {
        "date": {
            "field": "@timestamp",
            "formats": ["ISO8601"],
            "output_format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSXXX",
        }
    }
    stamp_ingested = {
        "set": {"field": "event.ingested", "value": "{{_ingest.timestamp}}"}
    }
    pipeline_config = {
        "description": "Ingest pipeline for ohmymalware data to trigger Elastic Security rules",
        "processors": [normalize_timestamp, stamp_ingested],
    }

    try:
        self.es.ingest.put_pipeline(id=self.pipeline_name, body=pipeline_config)
        logger.info(f"Created ingest pipeline: {self.pipeline_name}")
    except Exception as e:
        logger.error(f"Error creating ingest pipeline: {e}")
        raise
def create_indices_and_pipeline(self):
    """Create the ingest pipeline, then the events and alerts indices.

    For each target name the method first tries to create a regular index.
    If that fails with an error mentioning "data streams only", it installs
    an index template and creates a data stream of the same name instead.
    If the data stream cannot be created either, a regular index is created
    under a "security-..." fallback name and ``self.events_index`` or
    ``self.alerts_index`` is rebound to that name.

    Raises:
        Exception: Errors from pipeline creation, and any index-creation
            error not absorbed by the fallbacks above, are logged and
            re-raised.
    """
    # The pipeline must exist first: every index created below names it as
    # its default pipeline.
    self.create_ingest_pipeline()
    repo_mapping = self._load_repo_mapping()

    # The list is a snapshot, so rebinding self.events_index/alerts_index in
    # a fallback does not disturb the iteration.
    for index_name in [self.events_index, self.alerts_index]:
        try:
            if self.es.indices.exists(index=index_name):
                logger.info(f"Index/data stream {index_name} already exists")
                continue

            try:
                self.es.indices.create(
                    index=index_name, body=self._build_index_config(repo_mapping)
                )
                logger.info(f"Created index: {index_name}")
            except Exception as index_error:
                if "data streams only" not in str(index_error):
                    raise
                logger.info(
                    f"Index template requires data stream for {index_name}. Creating data stream..."
                )
                self._create_data_stream_or_fallback(index_name, repo_mapping)
        except Exception as e:
            logger.error(f"Error creating index/data stream {index_name}: {e}")
            raise


def _load_repo_mapping(self):
    """Return the repository's mapping.json, or a minimal built-in mapping if it cannot be fetched."""
    mapping_url = "https://raw.githubusercontent.com/jamesspi/ohmymalware/main/Event%20Data/mapping.json"
    try:
        # A timeout keeps a stalled connection from blocking the run forever.
        response = requests.get(mapping_url, timeout=30)
        response.raise_for_status()
        repo_mapping = json.loads(response.text)
        logger.info("Successfully downloaded mapping.json from repository")
        return repo_mapping
    except Exception as e:
        logger.warning(
            f"Could not download mapping.json: {e}. Using fallback mapping."
        )
        return {
            "properties": {
                "@timestamp": {"type": "date"},
                "event_type": {"type": "keyword"},
                "source_file": {"type": "keyword"},
                "episode": {"type": "keyword"},
                "raw_data": {"type": "text"},
                "parsed_data": {"type": "object", "enabled": True},
                "ingestion_timestamp": {"type": "date"},
                "event": {"type": "object", "enabled": True},
            }
        }


def _build_index_config(self, repo_mapping):
    """Return the create-index body shared by the regular and fallback indices."""
    return {
        "mappings": repo_mapping,
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0,
            "mapping.total_fields.limit": "6000",
            "index.default_pipeline": self.pipeline_name,
        },
    }


def _create_data_stream_or_fallback(self, index_name, repo_mapping):
    """Create a template-backed data stream, falling back to a renamed regular index on failure."""
    try:
        template_name = f"{index_name}-template"
        template_config = {
            "index_patterns": [index_name],
            "data_stream": {},
            "template": {
                "mappings": repo_mapping,
                "settings": {
                    "mapping.total_fields.limit": "6000",
                    "index.default_pipeline": self.pipeline_name,
                },
            },
        }
        self.es.indices.put_index_template(name=template_name, body=template_config)
        logger.info(f"Created index template: {template_name}")
        self.es.indices.create_data_stream(name=index_name)
        logger.info(f"Created data stream: {index_name}")
    except AttributeError:
        # The client object lacks the data-stream methods.
        logger.warning("Data streams not supported in this Elasticsearch version")
        self._create_fallback_index(
            index_name,
            repo_mapping,
            "Trying with alternative index name",
            "Created index with alternative name",
        )
    except Exception as ds_error:
        logger.error(f"Could not create data stream for {index_name}: {ds_error}")
        self._create_fallback_index(
            index_name,
            repo_mapping,
            "Final fallback with index name",
            "Created index with fallback name",
        )


def _create_fallback_index(self, index_name, repo_mapping, attempt_msg, success_msg):
    """Create a regular index under a "security-" name and point the matching attribute at it."""
    fallback_index = f"security-{index_name.replace('logs-', '').replace('.', '-')}"
    logger.info(f"{attempt_msg}: {fallback_index}")
    self.es.indices.create(
        index=fallback_index, body=self._build_index_config(repo_mapping)
    )
    logger.info(f"{success_msg}: {fallback_index}")
    # Later indexing must target the index that actually exists.
    if index_name == self.events_index:
        self.events_index = fallback_index
    else:
        self.alerts_index = fallback_index
def get_github_files(self) -> List[Dict[str, str]]:
    """List data files in the repository's "Event Data" directory.

    Top-level files are listed directly; directories whose name contains
    "Episode" are listed one level deep. README.md, mapping.json and
    .gitignore are skipped.

    Returns:
        One dict per file with keys ``name`` (file name), ``path`` (path
        relative to "Event Data") and ``episode`` (directory name, or None
        for top-level files). An empty list is returned if the top-level
        listing fails; an episode that cannot be listed is skipped with a
        warning.
    """
    api_url = (
        "https://api.github.com/repos/jamesspi/ohmymalware/contents/Event%20Data"
    )
    # Files to skip (not actual data)
    skip_files = {"README.md", "mapping.json", ".gitignore"}

    try:
        # Timeouts keep a stalled connection from blocking the run forever.
        response = requests.get(api_url, timeout=30)
        response.raise_for_status()

        file_list = []
        for item in response.json():
            if item["type"] == "file" and item["name"] not in skip_files:
                file_list.append(
                    {"name": item["name"], "path": item["name"], "episode": None}
                )
            elif item["type"] == "dir" and "Episode" in item["name"]:
                episode_name = item["name"]
                episode_api_url = f"{api_url}/{episode_name}"
                try:
                    episode_response = requests.get(episode_api_url, timeout=30)
                    episode_response.raise_for_status()
                    for episode_file in episode_response.json():
                        if (
                            episode_file["type"] == "file"
                            and episode_file["name"] not in skip_files
                        ):
                            file_list.append(
                                {
                                    "name": episode_file["name"],
                                    "path": f"{episode_name}/{episode_file['name']}",
                                    "episode": episode_name,
                                }
                            )
                except Exception as e:
                    # One unreadable episode must not abort the whole listing.
                    logger.warning(f"Could not access episode {episode_name}: {e}")

        logger.info(
            f"Found {len(file_list)} data files across episodes and main directory"
        )
        return file_list
    except Exception as e:
        logger.error(f"Error fetching file list: {e}")
        return []
def download_file(self, file_path: str) -> str:
    """Download one file from the repository and return its text.

    Args:
        file_path: Path relative to ``self.base_url``.

    Returns:
        The file content as text; ``.gz`` files are decompressed and
        decoded as UTF-8. Returns None if the download, decompression or
        decoding fails (the error is logged).
    """
    url = f"{self.base_url}/{file_path}"
    try:
        # A timeout keeps a stalled connection from blocking the run forever.
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        # Handle compressed files
        if file_path.endswith(".gz"):
            import gzip

            content = gzip.decompress(response.content).decode("utf-8")
            logger.info(f"Downloaded and decompressed: {file_path}")
        else:
            content = response.text
            logger.info(f"Downloaded: {file_path}")
        return content
    except Exception as e:
        logger.error(f"Error downloading {file_path}: {e}")
        return None
def parse_file_content(
    self, content: str, filename: str, episode: str = None
) -> List[Dict[str, Any]]:
    """Turn the text of one file into a list of documents.

    The format is chosen from the file extension (the one before ``.gz``
    for compressed names): ``json`` yields one document per list item or a
    single document, ``ndjson`` one per valid non-blank line, ``csv`` one
    per row, ``txt``/``log`` one per non-blank line, and anything else a
    single "raw" document holding the whole content. Every document is
    built by ``self.create_document``.

    If parsing raises, the error is logged and one extra "raw" document
    carrying the whole content and the error text is appended.
    """
    name_parts = filename.lower().split(".")
    # For compressed files the real format is the extension before ".gz".
    file_ext = name_parts[-2] if filename.endswith(".gz") else name_parts[-1]

    documents = []
    try:
        if file_ext == "json":
            payload = json.loads(content)
            records = payload if isinstance(payload, list) else [payload]
            for record in records:
                documents.append(
                    self.create_document(record, filename, "json", episode)
                )
        elif file_ext == "ndjson":
            # Newline-delimited JSON: a bad line is reported and skipped.
            ndjson_lines = content.strip().split("\n")
            for line_num, raw_line in enumerate(ndjson_lines, start=1):
                stripped = raw_line.strip()
                if not stripped:
                    continue
                try:
                    record = json.loads(stripped)
                    documents.append(
                        self.create_document(record, filename, "ndjson", episode)
                    )
                except json.JSONDecodeError as e:
                    logger.warning(
                        f"Invalid JSON on line {line_num} in {filename}: {e}"
                    )
        elif file_ext == "csv":
            from io import StringIO

            for row in csv.DictReader(StringIO(content)):
                documents.append(
                    self.create_document(dict(row), filename, "csv", episode)
                )
        elif file_ext in ["txt", "log"]:
            text_lines = content.strip().split("\n")
            for position, text_line in enumerate(text_lines, start=1):
                if not text_line.strip():
                    continue  # blank lines produce no document
                documents.append(
                    self.create_document(
                        {"line_number": position, "content": text_line},
                        filename,
                        "text",
                        episode,
                    )
                )
        else:
            # Unknown extension: keep the whole content as one raw document.
            documents.append(
                self.create_document({"content": content}, filename, "raw", episode)
            )
    except Exception as e:
        logger.error(f"Error parsing {filename}: {e}")
        # Fallback: keep the content as raw text together with the error.
        documents.append(
            self.create_document(
                {"content": content, "parse_error": str(e)},
                filename,
                "raw",
                episode,
            )
        )
    return documents
def create_document(
    self, data: Dict[str, Any], filename: str, data_type: str, episode: str = None
) -> Dict[str, Any]:
    """Wrap one parsed record in the standard document envelope.

    Args:
        data: The parsed record. Normally a dict; other JSON values (for
            example items of a JSON array of scalars) are accepted and
            stored without field merging.
        filename: Name of the source file, stored as ``source_file``.
        data_type: Format label stored as ``event_type``.
        episode: Episode directory name; stored as ``episode`` when truthy.

    Returns:
        A dict with ``@timestamp``, ``event_type``, ``source_file``,
        ``parsed_data``, ``raw_data`` and ``ingestion_timestamp``. For dict
        records, ``@timestamp`` is taken from the first parseable string
        timestamp field, and every record key that is not a reserved
        envelope field (or ``data_stream``) is copied to the top level.
    """
    current_time = datetime.now().isoformat()
    truncation_note = "... [truncated due to size]"

    # Create raw_data with a size limit to prevent immense term errors
    # (Elasticsearch limit is 32766 bytes).
    try:
        raw_data_str = json.dumps(data) if isinstance(data, dict) else str(data)
        encoded = raw_data_str.encode("utf-8")
        if len(encoded) > 32000:
            # Cut on bytes, not characters, so multi-byte text cannot exceed
            # the limit; errors="ignore" drops a character split by the cut.
            raw_data_str = (
                encoded[:16000].decode("utf-8", errors="ignore") + truncation_note
            )
    except Exception as e:
        raw_data_str = f"Error serializing data: {str(e)}"

    doc = {
        "@timestamp": current_time,
        "event_type": data_type,
        "source_file": filename,
        "parsed_data": data,
        "raw_data": raw_data_str,
        "ingestion_timestamp": current_time,
    }
    if episode:
        doc["episode"] = episode

    # Non-dict records have no named fields to inspect or merge; probing
    # them with "field in data" / data[field] would raise TypeError.
    if not isinstance(data, dict):
        return doc

    # Use the first timestamp-like field that holds a parseable string.
    timestamp_fields = [
        "timestamp",
        "time",
        "datetime",
        "date",
        "@timestamp",
        "event_time",
        "TimeCreated",
    ]
    for field in timestamp_fields:
        value = data.get(field)
        if not value or not isinstance(value, str):
            continue
        try:
            parsed_time = pd.to_datetime(value)
        except Exception as e:
            logger.debug(f"Could not parse timestamp field {field}: {e}")
            continue
        if pd.isna(parsed_time):
            # NaT would serialise as the string "NaT", which is not a date.
            continue
        doc["@timestamp"] = parsed_time.isoformat()
        break

    # Don't overwrite our standardized fields and skip the conflicting
    # data stream field.
    reserved_fields = {
        "@timestamp",
        "event_type",
        "source_file",
        "parsed_data",
        "raw_data",
        "ingestion_timestamp",
        "episode",
        "data_stream",
    }
    for key, value in data.items():
        if key not in reserved_fields:
            doc[key] = value
    return doc
def determine_target_index(self, filename: str, data: Dict[str, Any] = None) -> str:
    """Pick the alerts or events index for a document.

    Returns ``self.alerts_index`` when the file name contains "alert"
    (case-insensitive), or when ``data`` is a non-empty dict with a
    top-level key that equals or contains one of the alert indicator
    strings. Otherwise returns ``self.events_index``.
    """
    # A file name mentioning "alert" decides the matter on its own.
    if "alert" in filename.lower():
        return self.alerts_index

    # Without a non-empty dict there are no keys to inspect.
    if not (data and isinstance(data, dict)):
        return self.events_index

    alert_indicators = (
        "kibana.alert",
        "signal",
        "alert",
        "detection",
        "rule.name",
        "rule.id",
        "rule.type",
    )
    key_names = [str(key) for key in data.keys()]
    for indicator in alert_indicators:
        if indicator in data:
            return self.alerts_index
        if any(indicator in name for name in key_names):
            return self.alerts_index

    # Default to events index
    return self.events_index
def bulk_index_documents(
    self, documents: List[Dict[str, Any]], filename: str
) -> int:
    """Split documents between the events and alerts indices and index them.

    Each document is routed by ``self.determine_target_index``; the events
    batch is indexed first, then the alerts batch, each through
    ``self._bulk_index_to_target``.

    Returns:
        The total number of documents reported as indexed (0 for empty
        input).
    """
    if not documents:
        return 0

    # Group documents by target index
    events_docs, alerts_docs = [], []
    for doc in documents:
        goes_to_alerts = (
            self.determine_target_index(filename, doc) == self.alerts_index
        )
        (alerts_docs if goes_to_alerts else events_docs).append(doc)

    total_success = 0
    batches = (
        (events_docs, self.events_index, "events"),
        (alerts_docs, self.alerts_index, "alerts"),
    )
    for batch, target, label in batches:
        if not batch:
            continue
        indexed = self._bulk_index_to_target(batch, target)
        total_success += indexed
        logger.info(f"Indexed {indexed} documents to {label} index")
    return total_success
def _bulk_index_to_target(
    self, documents: List[Dict[str, Any]], target_index: str
) -> int:
    """Bulk-index documents into one target and report failures.

    Documents are sent with the "create" operation in chunks of 500. Up to
    three failed items are logged in detail. If the bulk call itself
    raises and there are at most five documents, each one is retried
    individually so that the per-document error is logged.

    Returns:
        The number of documents indexed successfully; 0 if the bulk call
        raised and the batch was too large for individual retries.
    """
    actions = [
        {
            "_op_type": "create",  # Required for data streams
            "_index": target_index,
            "_source": doc,
        }
        for doc in documents
    ]

    try:
        # raise_on_error=False makes bulk() return the failed items instead
        # of raising on the first one.
        success_count, failed_items = helpers.bulk(
            self.es,
            actions,
            chunk_size=500,
            request_timeout=60,
            raise_on_error=False,
            raise_on_exception=False,
        )

        if failed_items:
            logger.error(
                f"Failed to index {len(failed_items)} documents to {target_index}. First 3 errors:"
            )
            for i, failed_item in enumerate(failed_items[:3]):
                # Extract the actual error from the response
                if "create" in failed_item:
                    error_info = failed_item["create"].get("error", {})
                    error_type = error_info.get("type", "unknown")
                    error_reason = error_info.get("reason", "no reason provided")
                    logger.error(
                        f"  Error {i+1}: Type: {error_type}, Reason: {error_reason}"
                    )
                    if "caused_by" in error_info:
                        caused_by = error_info["caused_by"]
                        logger.error(
                            f"    Caused by: {caused_by.get('type', 'unknown')} - {caused_by.get('reason', 'no details')}"
                        )
                else:
                    logger.error(f"  Error {i+1}: {failed_item}")
            if len(failed_items) > 3:
                logger.error(f"  ... and {len(failed_items) - 3} more errors")
        return success_count
    except Exception as e:
        logger.error(f"Error during bulk indexing to {target_index}: {e}")

        # Individual retries are only worthwhile for small batches.
        if len(documents) > 5:
            return 0

        logger.info(
            "Trying individual document indexing for better error messages..."
        )
        success_count = 0
        for i, doc in enumerate(documents):
            try:
                # es.create() requires an explicit document id, so calling it
                # without one always fails; index() with op_type="create"
                # lets Elasticsearch generate the id and still works for
                # data streams.
                self.es.index(index=target_index, document=doc, op_type="create")
                success_count += 1
            except Exception as doc_error:
                logger.error(f"Document {i+1} failed: {doc_error}")
        return success_count
def run_ingestion(self):
    """Run the whole ingestion: set up, download, parse, index, refresh.

    Creates the pipeline and indices, lists the repository files, then for
    each file downloads it, parses it into documents and bulk-indexes
    them. Files whose download yields no content are skipped. Finally both
    indices are refreshed; a refresh failure is only logged as a warning.
    Returns early (after logging an error) when no files are found.
    """
    logger.info("Starting ohmymalware security data ingestion...")

    # Pipeline and indices must exist before any document is sent.
    self.create_indices_and_pipeline()

    files = self.get_github_files()
    if not files:
        logger.error("No files found to process")
        return

    total_documents = 0
    for file_info in files:
        filename, file_path, episode = (
            file_info[key] for key in ("name", "path", "episode")
        )
        episode_note = f" (Episode: {episode})" if episode else ""
        logger.info(f"Processing: {file_path}{episode_note}")

        content = self.download_file(file_path)
        if not content:
            # Failed or empty download: nothing to parse for this file.
            continue

        documents = self.parse_file_content(content, filename, episode)
        indexed_count = self.bulk_index_documents(documents, filename)
        total_documents += indexed_count
        logger.info(f"Processed {file_path}: {indexed_count} documents indexed")

    logger.info(f"Ingestion complete! Total documents indexed: {total_documents}")
    logger.info(
        f"Data ingested into: Events: {self.events_index}, Alerts: {self.alerts_index}"
    )
    logger.info(
        "Note: Conflicting 'data_stream' fields were filtered out to avoid mapping conflicts"
    )

    # Refresh both targets; a failure here is reported but not fatal.
    for index in (self.events_index, self.alerts_index):
        try:
            self.es.indices.refresh(index=index)
            logger.info(f"Index/Data stream refreshed: {index}")
        except Exception as e:
            logger.warning(f"Could not refresh index {index}: {e}")
    logger.info("Indices refreshed and ready for use")
def main():
    """Connect to Elasticsearch, run the ingestion and log a summary.

    Connection settings are read from the environment variables ES_HOST,
    ES_PORT, ES_USER and ES_PASSWORD, so credentials need not be edited
    into this file. Unset variables fall back to the previous built-in
    values (localhost, 9200, elastic, changeme). Setting ES_USER or
    ES_PASSWORD to an empty string connects without authentication.

    Any exception is logged with its traceback; nothing is re-raised.
    """
    import os  # local import so this block brings its own dependency

    try:
        es_host = os.environ.get("ES_HOST", "localhost")
        es_port = int(os.environ.get("ES_PORT", "9200"))
        es_user = os.environ.get("ES_USER", "elastic")
        es_password = os.environ.get("ES_PASSWORD", "changeme")

        ingester = SecurityDataIngester(
            es_host=es_host, es_port=es_port, es_user=es_user, es_password=es_password
        )

        # Test connection
        if not ingester.es.ping():
            logger.error("Cannot connect to Elasticsearch")
            return
        logger.info("Connected to Elasticsearch successfully")

        # Report index templates whose patterns mention "logs"; this is
        # informational only and never blocks the run.
        try:
            templates = ingester.es.indices.get_index_template()
            logs_templates = [
                t
                for t in templates.get("index_templates", [])
                if any(
                    "logs" in pattern
                    for pattern in t.get("index_template", {}).get("index_patterns", [])
                )
            ]
            if logs_templates:
                logger.info(
                    f"Found {len(logs_templates)} index templates that might affect logs-* indices"
                )
        except Exception as e:
            logger.warning(f"Could not check index templates: {e}")

        # Run ingestion
        ingester.run_ingestion()

        # Final summary
        logger.info("=" * 60)
        logger.info("INGESTION COMPLETE - READY FOR ELASTIC SECURITY")
        logger.info("=" * 60)
        logger.info(f"Events Index: {ingester.events_index}")
        logger.info(f"Alerts Index: {ingester.alerts_index}")
        logger.info(f"Pipeline: {ingester.pipeline_name}")
        logger.info("The data is now compatible with Elastic Security rules!")
        logger.info('In Kibana, filter data with: _index:"logs-endpoint.*"')
        logger.info("=" * 60)
    except Exception as e:
        # logger.exception keeps the traceback, which the plain message lost.
        logger.exception(f"Fatal error: {e}")


if __name__ == "__main__":
    main()