-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
148 lines (118 loc) · 5.13 KB
/
main.py
File metadata and controls
148 lines (118 loc) · 5.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
"""
ChainBreaker - Advanced Multi-Agent Penetration Testing Framework
Main entry point for the penetration testing workflow
"""
import argparse
import json
import logging
from datetime import datetime
from typing import Dict, Any
from agents.supervisor import SupervisorAgent
from utils.graph_builder import AttackGraphBuilder
from utils.session_manager import SessionManager
from config import get_config
# Root logging is configured once at import time: INFO threshold, and every
# record carries timestamp, logger name, level and message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class ChainBreaker:
    """Main orchestrator for the penetration testing framework.

    Holds the configuration, a session manager, an attack-graph builder and
    the supervisor agent, and exposes :meth:`execute` as the entry point that
    runs one test and returns a report dictionary.
    """

    def __init__(self, config_path: str = None):
        """Initialize the framework with configuration.

        Args:
            config_path: Value handed to ``get_config``; ``None`` is passed
                through unchanged.
        """
        self.config = get_config(config_path)
        self.session_manager = SessionManager()
        self.graph_builder = AttackGraphBuilder()
        # The supervisor is given the config object's ``settings`` attribute,
        # not the config object itself.
        self.supervisor = SupervisorAgent(self.config.settings)

    def _load_config(self, path: str) -> Dict[str, Any]:
        """Load configuration from a JSON file.

        Not used by ``__init__``, which obtains its configuration through
        ``get_config``.

        Args:
            path: Location of the JSON configuration file.

        Returns:
            The parsed JSON content, or the result of
            :meth:`_get_default_config` when the file does not exist.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        try:
            # JSON text is UTF-8 by specification, so decode it explicitly
            # instead of relying on the platform's locale encoding.
            with open(path, 'r', encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            logger.warning(f"Config file {path} not found, using defaults")
            return self._get_default_config()

    def _get_default_config(self) -> Dict[str, Any]:
        """Return default configuration.

        The model, iteration limit and timeout are read from ``self.config``
        with literal fallbacks; the target section starts empty.

        NOTE(review): assumes ``self.config`` exposes ``get(key, default)``
        accepting dotted keys such as ``"models.default_model"`` - confirm
        against the ``config`` module.
        """
        return {
            "model": self.config.get("models.default_model", "gpt-4"),
            "max_iterations": self.config.get("execution.max_llm_calls", 50),
            "timeout": self.config.get("network.timeout", 300),
            "target": {
                "ip": None,
                "ports": [],
                "services": [],
            },
        }

    def execute(self, target_ip: str, objective: str) -> Dict[str, Any]:
        """Execute the penetration testing workflow.

        Args:
            target_ip: Target IP address.
            objective: Penetration testing objective.

        Returns:
            The report dictionary built by :meth:`_generate_report` from the
            supervisor's result.
        """
        logger.info(f"Starting ChainBreaker against {target_ip}")
        logger.info(f"Objective: {objective}")

        # Initialize attack graph
        attack_graph = self.graph_builder.create_initial_graph(target_ip, objective)

        # Initial state handed to the supervisor: nothing discovered yet and
        # no privileges on the target.
        state = {
            "target_ip": target_ip,
            "objective": objective,
            "attack_graph": attack_graph,
            "sessions": [],
            "vulnerabilities": [],
            "exploits_attempted": [],
            "current_privileges": "none",
        }

        # Run supervisor agent to coordinate the attack
        result = self.supervisor.run(state)

        report = self._generate_report(result)
        logger.info("ChainBreaker execution completed")
        return report

    def _generate_report(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """Generate the final penetration testing report.

        Args:
            result: Mapping returned by the supervisor run. Missing keys fall
                back to empty defaults; ``attack_graph`` and ``sessions`` may
                also be present with a ``None`` value.

        Returns:
            Dictionary with the keys ``timestamp``, ``target``,
            ``objective_achieved``, ``attack_path``, ``vulnerabilities_found``,
            ``sessions_established`` and ``recommendations``.
        """
        # ``dict.get(key, default)`` only applies the default when the key is
        # absent; a key that is present but None would otherwise raise on the
        # chained ``.get`` / ``len`` below and discard the whole report.
        attack_graph = result.get("attack_graph")
        if attack_graph is None:
            attack_graph = {}
        sessions = result.get("sessions") or []

        return {
            "timestamp": datetime.now().isoformat(),
            "target": result.get("target_ip"),
            "objective_achieved": result.get("objective_achieved", False),
            "attack_path": attack_graph.get("path", []),
            "vulnerabilities_found": result.get("vulnerabilities", []),
            "sessions_established": len(sessions),
            "recommendations": self._generate_recommendations(result),
        }

    def _generate_recommendations(self, result: Dict[str, Any]) -> list:
        """Generate security recommendations based on findings.

        Args:
            result: Mapping returned by the supervisor run.

        Returns:
            List of recommendation strings; empty when nothing matched.
        """
        recommendations = []
        if result.get("vulnerabilities"):
            recommendations.append("Patch identified vulnerabilities immediately")

        # Render the result once and reuse it for both marker checks.
        # NOTE(review): the markers are searched in the text of the entire
        # result, so they match wherever they occur in it - confirm that this
        # broad match is intended.
        result_text = str(result)
        if "weak_credentials" in result_text:
            recommendations.append("Implement strong password policies")
        if "outdated_services" in result_text:
            recommendations.append("Update all services to latest versions")
        return recommendations
def main(argv=None):
    """Parse command-line arguments, run ChainBreaker and save the report.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. The default ``None`` makes ``argparse``
            read ``sys.argv[1:]``, which is the behaviour of a plain
            ``main()`` call.
    """
    parser = argparse.ArgumentParser(description="ChainBreaker - Multi-Agent Penetration Testing")
    parser.add_argument("--target", "-t", required=True, help="Target IP address")
    parser.add_argument("--objective", "-o", default="full_compromise",
                        help="Penetration testing objective")
    parser.add_argument("--config", "-c", default="config.json",
                        help="Configuration file path")
    parser.add_argument("--output", default="report.json",
                        help="Output report file")
    args = parser.parse_args(argv)

    # Initialize and run ChainBreaker
    chainbreaker = ChainBreaker(args.config)
    result = chainbreaker.execute(args.target, args.objective)

    # Serialize before opening the output file: if the report holds a value
    # that json cannot encode, the error is raised here and an existing file
    # at the output path is neither truncated nor left half-written.
    serialized = json.dumps(result, indent=2)
    with open(args.output, 'w', encoding="utf-8") as f:
        f.write(serialized)
    logger.info(f"Report saved to {args.output}")


# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()