-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparse_trace.py
More file actions
356 lines (291 loc) · 13.7 KB
/
Copy pathparse_trace.py
File metadata and controls
356 lines (291 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
import argparse
import struct
from dataclasses import dataclass
from typing import Optional
from pdbparse.symlookup import Lookup
import bisect
import download_pdb
import json
import os
# Address index over the function map: `_func_map` holds the function entries
# sorted by 'function_start_addr' and `_func_starts` holds the matching start
# addresses in the same order. Both are (re)built by build_func_index() and
# searched by get_function_containing().
_func_map = []
_func_starts = []
def get_mod_containing(address):
    """
    Return the id (the key in the global `modules` mapping) of the module
    whose address range [base, base + size) contains `address`.

    Returns None when no module contains the address, or when no module
    list has been loaded (`modules` is still None).
    """
    if modules is None:
        return None
    for mid, mod in modules.items():
        # The range is half-open: the byte at base + size belongs to whatever follows.
        if mod['base'] <= address < mod['base'] + mod['size']:
            return mid
    return None
def get_function_containing(address):
    """
    Look up the function-map entry whose address range contains `address`.

    Uses a binary search over the sorted start addresses, so
    `build_func_index()` must have been called after the function map was
    loaded. Returns the entry dict, or None when `address` lies before the
    first function or at/after the end of the closest preceding one.
    """
    # Number of functions that start at or before `address`.
    preceding = bisect.bisect_right(_func_starts, address)
    if preceding == 0:
        return None
    candidate = _func_map[preceding - 1]
    # 'function_end_addr' is exclusive.
    return candidate if address < candidate['function_end_addr'] else None
def build_func_index(func_map):
    """
    Build the sorted address index used by get_function_containing().

    Stores a copy of `func_map` ordered by 'function_start_addr' in the
    module-level `_func_map`, and the matching start addresses in
    `_func_starts`. The list passed in is not reordered.
    """
    global _func_map, _func_starts
    ordered = list(func_map)
    ordered.sort(key=lambda entry: entry['function_start_addr'])
    _func_map = ordered
    _func_starts = [entry['function_start_addr'] for entry in ordered]
def get_name_of_function(address):
    """
    Resolve `address` to a human-readable name.

    Resolution order:
      1. An exact function start in `functions`: returns its 'function_id'.
         An 'unlisted' entry with exactly one thunk jump is followed
         recursively and reported as "<id> continues to (<target>) <name>".
      2. An address inside a mapped function: "call to inside of: <id>".
      3. An address inside a loaded module: the module's symbols are loaded
         from its PDB on first use and cached in `module_lookup`; a hit
         returns "<module>!<undecorated symbol>".

    Returns "unknown module" when no module contains the address and
    "unknown function in: <module>" when the module is known but no symbol
    matches exactly.
    """
    if address in functions:
        func = functions[address]
        if func['unlisted'] and len(func['thunk_jumps']) == 1:
            thunk_jump = func['thunk_jumps'][0]
            return func['function_id'] + f" continues to ({thunk_jump}) " + get_name_of_function(thunk_jump)
        return func['function_id']

    in_func = get_function_containing(address)
    if in_func is not None:
        return "call to inside of: " + in_func['function_id']

    mod_name = get_mod_containing(address)
    if mod_name is None:
        return "unknown module"

    if mod_name in module_lookup:
        # A tuple entry means "symbols not loaded yet"; it is swapped for a
        # dict once the PDB has been parsed (or found to be unparsable).
        if isinstance(module_lookup[mod_name], tuple):
            mod = modules[mod_name]
            pdb_file = module_lookup[mod_name][0]
            if ":\\windows\\" in mod['path'].lower():
                # Presumably fetches the PDB from the Microsoft symbol server
                # when it is missing locally -- confirm in download_pdb.
                download_pdb.get_pdb_from_microsoft(mod['path'])
            if os.path.exists(pdb_file) and os.path.getsize(pdb_file) != 0:
                try:
                    addrs_names = Lookup([module_lookup[mod_name]])
                    module_lookup[mod_name] = dict(next(iter(addrs_names.addrs.values()))['addrs'])
                except Exception as err:
                    # Was a bare `except:`, which also swallowed
                    # KeyboardInterrupt/SystemExit and hid the actual error.
                    print("could not parse pdb for: ", mod_name, f"({err})")
                    module_lookup[mod_name] = {}

        mod_lookup = module_lookup[mod_name]
        if not isinstance(mod_lookup, tuple) and address in mod_lookup:
            return mod_name + "!" + undecorate_nice(mod_lookup[address])

    return "unknown function in: " + mod_name
import ctypes
# Load the dbghelp.dll
# NOTE: ctypes.WinDLL only exists on Windows, so this module can only be
# imported there.
dbghelp = ctypes.WinDLL("Dbghelp.dll")
# Bind UnDecorateSymbolName and declare its argument/return types so ctypes
# converts the Python values passed by undecorate_symbol() correctly.
UnDecorateSymbolName = dbghelp.UnDecorateSymbolName
UnDecorateSymbolName.argtypes = [
ctypes.c_char_p, # Pointer to the decorated symbol (input)
ctypes.c_char_p, # Pointer to the buffer for the undecorated name (output)
ctypes.c_uint, # Size of the output buffer
ctypes.c_uint # Flags controlling the undecoration (0 for default behavior)
]
# undecorate_symbol() treats a zero result as failure.
UnDecorateSymbolName.restype = ctypes.c_uint
def undecorate_symbol(mangled_name, flags=0):
    """
    Undecorate a symbol name through dbghelp's UnDecorateSymbolName.

    `mangled_name` is the decorated symbol as a str and `flags` is passed
    straight through to the API (0 selects the default behavior).
    Returns the undecorated name, or None when the call reports failure
    (a zero result).
    """
    # Fixed-size output buffer; enlarge it if names get cut off.
    out_buf = ctypes.create_string_buffer(2024)
    written = UnDecorateSymbolName(
        mangled_name.encode('utf-8'),
        out_buf,
        ctypes.sizeof(out_buf),
        flags,
    )
    if not written:
        return None
    return out_buf.value.decode('utf-8')
def undecorate_nice(decorated, remove_arguments = True):
    """
    Turn a decorated symbol into a short, readable function name.

    Anything from the first '+' onward (an offset suffix) is dropped before
    undecorating. From the undecorated text only the part after the last
    " __cdecl " is kept, which strips prefixes such as
    "public: virtual long __cdecl ". With `remove_arguments` set, the
    argument list (from the first "(") and anything from the first "_<"
    are removed as well.

    If the symbol cannot be undecorated, the raw symbol (without the '+'
    suffix) is used instead of failing.
    """
    # Sometimes there is a '+' with extra data at the end that must be removed.
    func_name = decorated.split('+', 1)[0]
    undeced = undecorate_symbol(func_name)
    if undeced is None:
        # undecorate_symbol() returns None on failure; previously this
        # crashed with AttributeError on the .split() below.
        undeced = func_name
    ret = undeced.split(" __cdecl ")[-1]
    if remove_arguments:
        ret = ret.split("(")[0].split("_<")[0]
    return ret
@dataclass
class TracePoint:
    """
    One record read from the binary trace file.

    The first five fields come from the fixed record header; the remaining
    fields are optional and are filled in depending on the record type.
    """

    # --- fixed header ---
    i: int                 # running index of the record within the file
    trace_type: int        # 1 = enter, 2 = exit, 3 = call, 4 = called
    thread_id: int
    function_ordinal: int
    timestamp: int

    # --- type-specific payload ---
    return_address_pointer: Optional[int] = None  # read for enter and exit records
    return_address: Optional[int] = None          # read for enter records
    call_num: Optional[int] = None                # read for call and called records
    target_address: Optional[int] = None          # read for call records

    # --- derived while parsing ---
    enter_type: Optional[str] = None      # 'enter' once an enter has a matching exit
    type_str: Optional[str] = None        # "enter", "exit", "call" or "called"
    matching_enter: Optional[int] = None  # on an exit: index `i` of its matching enter
def print_traces(traces, stack_offset, contains_exits):
    """
    Print every trace on its own line, indented by call depth.

    `stack_offset` is the depth the first line starts at. Depth drops by one
    before printing an 'exit' or 'called' record and rises by one after
    printing a 'call' record or an enter that was matched with an exit
    (enter_type == "enter").

    When `contains_exits` is true, an enter without a matching exit is
    labelled 'jump' and an exit without a matching enter is labelled
    'un_matched_exit'.
    """
    depth = stack_offset
    for trace in traces:
        kind = trace.type_str

        # Address used for the name lookup, if this record has one.
        if kind == "enter":
            func_addr = ordinal2addr.get(trace.function_ordinal)
        elif kind == "call":
            func_addr = trace.target_address
        else:
            func_addr = None

        if kind in ('exit', 'called'):
            depth -= 1

        label = kind
        if contains_exits:
            if kind == "enter" and trace.enter_type is None:
                label = 'jump'
            elif kind == "exit" and trace.matching_enter is None:
                label = 'un_matched_exit'

        name = None
        if module_lookup is not None and func_addr is not None:
            name = get_name_of_function(func_addr)

        print(f"{trace.i:03d}", " "*depth, label, "func:", trace.function_ordinal, "tid:", trace.thread_id, 'ptr:', trace.return_address_pointer, 'callnum:', trace.call_num, trace.enter_type, trace.matching_enter, trace.return_address, trace.target_address, name)

        if trace.enter_type == "enter" or kind == 'call':
            depth += 1
# Module-level lookup state. Everything starts empty and is filled in by the
# command-line entry point from the --modules and --map files.
modules = None  # module id -> module info from the --modules JSON ('path', 'base' and 'size' are read)
module_lookup = {}  # module id -> (pdb_file, base) until symbols are loaded, then {address: symbol name}
func_map = []  # function entries loaded from the --map JSON
functions = {}  # function_start_addr -> function entry
ordinal2addr = {}  # function ordinal -> function_start_addr
def show_top_counts(counts):
    """
    Print the 30 entries of `counts` with the highest values, one
    "key value" pair per line, highest first. Entries with equal counts
    keep their original relative order.
    """
    ranked = list(counts.items())
    ranked.sort(key=lambda item: item[1], reverse=True)
    for ordinal, hits in ranked[:30]:
        print(ordinal, hits)
def add_names_to_calls():
    """
    Fill in 'target_name' on every call in the global `func_map` that has a
    'target' address but no name yet, using get_name_of_function().

    Returns True when at least one call was updated, otherwise False.
    """
    touched = False
    for entry in func_map:
        for call in entry['calls']:
            # Skip calls that cannot be named or are already named.
            if 'target' not in call or 'target_name' in call:
                continue
            call['target_name'] = get_name_of_function(call['target'])
            touched = True
    return touched
def _read_exact(stream, size):
    """Read exactly `size` bytes from `stream`; return None on a short read."""
    data = stream.read(size)
    if len(data) != size:
        return None
    return data


def _write_counts(counts):
    """Write `counts`, sorted by descending count, to output/count.json."""
    ordered = dict(sorted(counts.items(), key=lambda x: x[1], reverse=True))
    os.makedirs("output", exist_ok=True)
    # os.path.join avoids the invalid "\c" escape of the literal "output\count.json".
    with open(os.path.join("output", "count.json"), "w") as wf:
        json.dump(ordered, wf, indent=2)


# Command-line entry point.
#
# Loads the optional module list (--modules) and function map (--map), then
# either resolves a single address (--lookup) or reads the binary trace file
# (--file). Each trace record is a 17-byte little-endian header
# (type: u8, thread id: u32, function ordinal: u32, timestamp: u64) followed
# by a type-specific payload:
#   1 enter  : return address pointer (u64), return address (u64)
#   2 exit   : return address pointer (u64)
#   3 call   : call number (u32), target address (u64)
#   4 called : call number (u32)
# Without --count every record is printed as it is read; with --count the
# enter records are tallied per function ordinal instead.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='shows binary call trace files')
    parser.add_argument('--file', type=str, required=False, help='Binary trace file')
    parser.add_argument('--modules', type=str, required=False, help='File with modules')
    parser.add_argument('--map', type=str, required=False, help='File with function map')
    parser.add_argument('--lookup', type=int, required=False, help='Address to lookup')
    parser.add_argument("--count", action="store_true", help="Count traces")
    parser.add_argument("--supplement_map", action="store_true", help="Adds info to map")
    args = parser.parse_args()

    if args.modules is not None:
        with open(args.modules, "r") as f:
            modules = json.load(f)
        for mid, mod in modules.items():
            pdb_file = "pdbs\\" + download_pdb.get_pdb_name(mod['path'])
            # A tuple marks "symbols not loaded yet"; get_name_of_function()
            # replaces it with a dict on first use.
            module_lookup[mid] = (pdb_file, mod['base'])

    if args.map is not None:
        with open(args.map, "r") as f:
            func_map = json.load(f)
        for func in func_map:
            functions[func['function_start_addr']] = func
            ordinal2addr[func['ordinal']] = func['function_start_addr']
        build_func_index(func_map)
        if args.supplement_map:
            if add_names_to_calls():
                with open(args.map, "w") as f:
                    json.dump(func_map, f, indent=2)
    else:
        args.supplement_map = False  # you need a map to supplement

    if args.lookup is not None:
        print(get_name_of_function(args.lookup))
        raise SystemExit(0)

    if args.file is None:
        raise SystemExit(0)

    header_fmt = struct.Struct("<BIIQ")
    payload_fmts = {
        1: struct.Struct("<QQ"),
        2: struct.Struct("<Q"),
        3: struct.Struct("<IQ"),
        4: struct.Struct("<I"),
    }

    i = 0
    contains_exits = False
    call_stacks = {}
    counts = {}
    un_matched_exits = []
    traces = []
    # (thread id, function ordinal, return address pointer) -> enter records
    # still waiting for their exit, most recent last. This replaces a backwards
    # scan over all earlier traces for every exit record.
    pending_enters = {}

    with open(args.file, "rb") as f:
        while header := f.read(header_fmt.size):
            if len(header) < header_fmt.size:
                print(f"trace file is truncated inside record {i}; stopping")
                break
            trace_type, thread_id, function_ordinal, timestamp = header_fmt.unpack(header)
            trace = TracePoint(i, trace_type, thread_id, function_ordinal, timestamp)
            stack = call_stacks.setdefault(thread_id, [])

            # Unknown record types carry no payload and are kept as-is.
            payload = ()
            payload_fmt = payload_fmts.get(trace_type)
            if payload_fmt is not None:
                raw = _read_exact(f, payload_fmt.size)
                if raw is None:
                    print(f"trace file is truncated inside record {i}; stopping")
                    break
                payload = payload_fmt.unpack(raw)

            func_addr = None
            lift_stack = False

            if trace_type == 1:  # function enter
                trace.type_str = "enter"
                trace.return_address_pointer, trace.return_address = payload
                func_addr = ordinal2addr.get(function_ordinal)
                enter_key = (thread_id, function_ordinal, trace.return_address_pointer)
                pending_enters.setdefault(enter_key, []).append(trace)
            elif trace_type == 2:  # function exit
                trace.type_str = "exit"
                # Was `trace.enter_type == "exit"`, a comparison with no effect.
                trace.enter_type = "exit"
                contains_exits = True
                trace.return_address_pointer, = payload
                enter_key = (thread_id, function_ordinal, trace.return_address_pointer)
                waiting = pending_enters.get(enter_key)
                if waiting:
                    # Pair with the most recent enter that has no exit yet.
                    enter = waiting.pop()
                    if not waiting:
                        del pending_enters[enter_key]
                    enter.enter_type = 'enter'
                    trace.matching_enter = enter.i
                else:
                    # No enter was recorded for this exit.
                    un_matched_exits.append(trace.i)
            elif trace_type == 3:  # function call
                trace.type_str = "call"
                lift_stack = True
                trace.call_num, trace.target_address = payload
                func_addr = trace.target_address
                if args.supplement_map:
                    # NOTE(review): indexes func_map by ordinal, i.e. assumes the
                    # map is ordered so that position == ordinal -- confirm.
                    call = func_map[function_ordinal]['calls'][trace.call_num]
                    if 'target' in call and call['target'] != trace.target_address:
                        print("call target in trace does not mached target in map")
                    call['target'] = trace.target_address
            elif trace_type == 4:  # function called (the call returned)
                if stack:
                    stack.pop()
                trace.type_str = "called"
                trace.call_num, = payload

            traces.append(trace)

            if not args.count:
                # The name is only needed for printing, so it is not resolved
                # in count mode.
                name = None
                if module_lookup is not None and func_addr is not None:
                    name = get_name_of_function(func_addr)
                print(f"{trace.i:03d}", " "*len(stack), trace.type_str, "func:", trace.function_ordinal, "tid:", trace.thread_id, 'time:', trace.timestamp, 'callnum:', trace.call_num, trace.enter_type, trace.matching_enter, trace.return_address, trace.target_address, name)
            else:
                if trace_type == 1:
                    counts[function_ordinal] = counts.get(function_ordinal, 0) + 1
                # Progress is reported for every millionth record regardless
                # of its type, so a non-enter record cannot skip the report.
                if i % 1000000 == 0 and i != 0:
                    mb = f.tell() / (1024 * 1024)  # bytes read so far
                    print("--- top counts at ("+str(trace.i)+", "+f"{mb:.2f} MB"+")---")
                    show_top_counts(counts)
                    if i % 5000000 == 0:
                        _write_counts(counts)

            # The call itself is printed at the caller's depth; everything
            # after it on this thread is one level deeper.
            if lift_stack:
                stack.append(function_ordinal)
            i += 1

    if args.supplement_map:
        if add_names_to_calls():
            with open(args.map, "w") as f:
                json.dump(func_map, f, indent=2)

    if args.count:
        _write_counts(counts)

    print("\n")
    if contains_exits:
        print("printing in hindsight allows you to acount for enter and exits effect on the call stack")
        print_traces(traces, len(un_matched_exits), contains_exits)