-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpcap_debug.py
More file actions
138 lines (111 loc) · 5.38 KB
/
pcap_debug.py
File metadata and controls
138 lines (111 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#!/usr/bin/env python3
"""
PCAP validation and debugging script
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
import pyshark
from src.capture import read_pcap, extract_meta
def _summarize_capture(pcap_path, max_packets=1000):
    """Tally highest-layer protocols and destination ports in a capture.

    Only the first ``max_packets`` packets are inspected.

    Returns a ``(total_packets, protocols, ports)`` tuple where
    ``protocols`` maps ``pkt.highest_layer`` to a count and ``ports`` maps
    ``"TCP:<dstport>"`` / ``"UDP:<dstport>"`` to a count.
    """
    total_packets = 0
    protocols = {}
    ports = {}

    cap = pyshark.FileCapture(pcap_path)
    try:
        for i, pkt in enumerate(cap):
            if i >= max_packets:
                break
            total_packets += 1

            # Count protocols
            if hasattr(pkt, 'highest_layer'):
                protocol = pkt.highest_layer
                protocols[protocol] = protocols.get(protocol, 0) + 1

            # Count ports (TCP takes precedence when both layers are present)
            if hasattr(pkt, 'tcp'):
                port = f"TCP:{pkt.tcp.dstport}"
                ports[port] = ports.get(port, 0) + 1
            elif hasattr(pkt, 'udp'):
                port = f"UDP:{pkt.udp.dstport}"
                ports[port] = ports.get(port, 0) + 1
    finally:
        # Close even when iteration raises, so the capture is never leaked.
        cap.close()

    return total_packets, protocols, ports


def _voip_indicators(pkt):
    """Return the list of heuristic VoIP indicator tags that match ``pkt``."""
    indicators = []

    # SIP: either endpoint on port 5060, over UDP or TCP.
    if hasattr(pkt, 'udp'):
        if int(pkt.udp.dstport) == 5060 or int(pkt.udp.srcport) == 5060:
            indicators.append("SIP_PORT")
    if hasattr(pkt, 'tcp'):
        if int(pkt.tcp.dstport) == 5060 or int(pkt.tcp.srcport) == 5060:
            indicators.append("SIP_PORT")

    # RTP-like: UDP with an even destination port in the 8000-65000 range.
    if hasattr(pkt, 'udp'):
        port = int(pkt.udp.dstport)
        if 8000 <= port <= 65000 and port % 2 == 0:
            indicators.append("RTP_LIKE_PORT")

    return indicators


def _find_voip_like_packets(pcap_path, max_packets=500):
    """Scan the first ``max_packets`` packets for VoIP-like port patterns.

    Returns a list of dicts with the keys ``packet_num`` (0-based position
    in the capture iteration), ``indicators``, ``src``, ``dst`` and
    ``protocol``.
    """
    found_voip_like = []

    cap = pyshark.FileCapture(pcap_path)
    try:
        for i, pkt in enumerate(cap):
            if i >= max_packets:
                break

            voip_indicators = _voip_indicators(pkt)
            if not voip_indicators:
                continue

            has_ip = hasattr(pkt, 'ip')
            found_voip_like.append({
                'packet_num': i,
                'indicators': voip_indicators,
                'src': f"{pkt.ip.src if has_ip else 'N/A'}",
                'dst': f"{pkt.ip.dst if has_ip else 'N/A'}",
                'protocol': pkt.highest_layer if hasattr(pkt, 'highest_layer') else 'Unknown',
            })
    finally:
        # Close even when a port field fails to parse or iteration raises.
        cap.close()

    return found_voip_like


def analyze_pcap(pcap_path: str) -> None:
    """Analyze a PCAP file to understand its contents.

    Prints, in order:
      1. a protocol / destination-port summary of the first 1000 packets;
      2. the result of ``read_pcap(pcap_path, limit=100)`` (first 5 packets);
      3. if step 2 found nothing, a port-based heuristic scan of the first
         500 packets for SIP / RTP-like traffic.

    Args:
        pcap_path: Path of the capture file to inspect.

    Returns:
        None. All results are written to stdout. Errors are reported on
        stdout as well rather than raised, since this is a diagnostic tool.
    """
    print(f"π Analyzing PCAP file: {pcap_path}")
    print("=" * 60)

    if not os.path.exists(pcap_path):
        print(f"β File not found: {pcap_path}")
        return

    # First, let's see what's actually in the PCAP
    try:
        print("π General packet analysis...")
        total_packets, protocols, ports = _summarize_capture(pcap_path)

        print(f"Total packets analyzed: {total_packets}")
        print("\nTop protocols found:")
        for proto, count in sorted(protocols.items(), key=lambda x: x[1], reverse=True)[:10]:
            print(f" {proto}: {count}")

        print("\nTop destination ports:")
        for port, count in sorted(ports.items(), key=lambda x: x[1], reverse=True)[:10]:
            print(f" {port}: {count}")
    except Exception as e:
        # Broad on purpose: any failure here means the file is unreadable,
        # so the later stages are skipped.
        print(f"β Error analyzing PCAP: {e}")
        return

    # Now test VoIP extraction
    print("\nπ― VoIP packet extraction test...")
    try:
        voip_packets = list(read_pcap(pcap_path, limit=100))
        print(f"VoIP packets found: {len(voip_packets)}")

        if voip_packets:
            print("\nFirst few VoIP packets:")
            for i, pkt in enumerate(voip_packets[:5]):
                print(f" Packet {i+1}: {pkt['proto']} - {pkt['src_ip']}:{pkt.get('src_port', 'N/A')} -> {pkt['dst_ip']}:{pkt.get('dst_port', 'N/A')}")
                if pkt.get('call_id'):
                    print(f" Call ID: {pkt['call_id']}")
        else:
            print("β No VoIP packets detected with current filters")

            # Try alternative detection
            print("\nπ Trying alternative detection...")
            try:
                found_voip_like = _find_voip_like_packets(pcap_path)

                if found_voip_like:
                    print(f"Found {len(found_voip_like)} packets with VoIP-like characteristics:")
                    for pkt in found_voip_like[:10]:
                        print(f" Packet {pkt['packet_num']}: {pkt['src']} -> {pkt['dst']} ({pkt['protocol']}) - {', '.join(pkt['indicators'])}")
                else:
                    print("β No VoIP-like patterns found")
            except Exception as e:
                print(f"β Alternative detection failed: {e}")
    except Exception as e:
        print(f"β VoIP extraction failed: {e}")
if __name__ == "__main__":
    # Script entry point: exactly one argument, the capture path, is expected
    # after the program name.
    if len(sys.argv) == 2:
        pcap_path = sys.argv[1]
        analyze_pcap(pcap_path)
    else:
        print("Usage: python pcap_debug.py <path_to_pcap_file>")
        print("Example: python pcap_debug.py sample.pcap")
        sys.exit(1)