Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions scripts/mbedtls_framework/ssl_log_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Parse logs from ssl_client2 and ssl_server2."""

# Copyright The Mbed TLS Contributors
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later

import re
from typing import Dict, Iterator, List, Pattern, Tuple


class Info:
"""Information gathered from a log of ssl_client2 or ssl_server2."""

PREFIX_RE_S = (r'(?P<filename>\S+):(?P<lineno>[0-9]+): +' +
r'\|(?P<level>[0-9]+)\|(?: (?P<address>\w+):)? +')
DUMPING_RE = re.compile(
PREFIX_RE_S +
r'dumping \'(?P<what>.*?)\' \((?P<length>[0-9]+) bytes\)')
DUMP_CHUNK_RE = re.compile(
PREFIX_RE_S +
r'[0-9a-f]+: *(?P<data>(?: [0-9a-f]{2}){1,16})')
VALUE_OF_RE = re.compile(
PREFIX_RE_S +
r'value of \'(?P<what>.*?)\' \((?P<length>[0-9]+) bits\)')
VALUE_CHUNK_RE = re.compile(
PREFIX_RE_S +
r'(?P<data>(?:[0-9a-f]{2}(?:$| )){1,16})')

def __init__(self) -> None:
"""Create an empty log info object."""
self.dumps = {} #type: Dict[str, List[str]]

def add_dump(self, name: str, hex_data: str) -> None:
"""Add a hex dump."""
self.dumps.setdefault(name, []).append(hex_data)

@staticmethod
def read_dump(filename: str,
lines: Iterator[Tuple[int, str]],
length: int, chunk_re: Pattern) -> str:
"""Read a hex dump. Return the hex data.

This method consumes the data dump lines from lines.
"""
acc = ''
remaining = length
while remaining > 0:
lineno, line = next(lines)
m = chunk_re.match(line)
if not m:
raise Exception(f'{filename}:{lineno}: not a dump chunk as expected')
acc += m.group('data')
remaining -= 16
plain = acc.replace(' ', '')
if len(plain) != length * 2:
raise Exception(f'{filename}:{lineno}: '
f'found {len(plain)} hex digits but expected {length * 2}')
return plain

def read_file_contents(self,
filename: str,
lines: Iterator[Tuple[int, str]]) -> None:
"""Read lines from a log file and store the information we find.

The iterator lines delivers a stream of lines with their line numbers.
"""
for _lineno, line in lines:
m = self.DUMPING_RE.match(line)
if m:
what = m.group('what')
hex_data = self.read_dump(filename, lines,
int(m.group('length')),
self.DUMP_CHUNK_RE)
self.add_dump(what, hex_data)
m = self.VALUE_OF_RE.match(line)
if m:
what = m.group('what')
n_bits = int(m.group('length'))
n_bytes = (n_bits + 7) // 8
hex_data = self.read_dump(filename, lines,
n_bytes, self.VALUE_CHUNK_RE)
self.add_dump(what, hex_data)

def read_file(self, filename: str) -> None:
"""Read a log file and store the information we find."""
with open(filename) as input_:
self.read_file_contents(filename, enumerate(input_, 1))


def parse_log_file(filename: str) -> Info:
"""Parse a log of ssl_client2 or ssl_server2."""
info = Info()
info.read_file(filename)
return info
153 changes: 153 additions & 0 deletions scripts/validate_ssl_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""Validate logs from ssl_client2 and ssl_server2.

On success, print nothing and return 0.
On a validation failure, print a short error message and return 1.
On a command line or parse error, die with an exception and return 1.
"""

# Copyright The Mbed TLS Contributors
# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later

import argparse
import sys
from typing import Callable, Dict, List, Optional

from mbedtls_framework import ssl_log_parser



# None: validation succeeded.
# str: validation failed; the string is a human-readable explanation of the failure.
Validation = Optional[str]

def match_random(client_log: ssl_log_parser.Info,
server_log: ssl_log_parser.Info) -> Validation:
"""Check that both sides have the same idea of client_random and server_random."""
client_client_randoms = client_log.dumps['client hello, random bytes']
client_server_randoms = client_log.dumps['server hello, random bytes']
server_client_randoms = server_log.dumps['client hello, random bytes']
server_server_randoms = server_log.dumps['server hello, random bytes']
if len(client_client_randoms) != len(server_client_randoms):
return ('Client and server disagree on the number of client_random ' +
f'({len(client_client_randoms)} != {len(server_client_randoms)})')
if len(client_server_randoms) != len(server_server_randoms):
return ('Client and server disagree on the number of server_random ' +
f'({len(client_server_randoms)} != {len(server_server_randoms)})')
for n, (c, s) in enumerate(zip(client_client_randoms, server_client_randoms)):
if c != s:
return f'Client and server disagree on client random #{n}'
for n, (c, s) in enumerate(zip(client_server_randoms, server_server_randoms)):
if c != s:
return f'Client and server disagree on server random #{n}'
return None


def distinct_server_ephemeral(client_log: ssl_log_parser.Info,
_server_log: ssl_log_parser.Info) -> Validation:
"""Check that server ephemeral keys as seen from the client are not repeated."""
# The current implementation does not handle cases where the client
# receives and discards a legitimate resend of the ServerKeyExchange
# message in DTLS.
if 'DHM: GY' in client_log.dumps:
values = client_log.dumps['DHM: GY']
elif 'server ephemeral public key' in client_log.dumps:
values = client_log.dumps['server ephemeral public key']
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I've understood correctly I believe this will abort if the lookup fails as no matches are present, is this the desired behavior or should it return a validation failure?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I'll return a more informative and non-fatal validation failure.

else:
return 'Ephemeral public keys not found in client log'
if len(values) < 2:
return 'Fewer than two server ephemeral public keys found'
seen = {} #type: Dict[str, int]
for n, v in enumerate(values):
if v in seen:
return f'server ephemeral public key #{n} repeats #{seen[v]}'
seen[v] = n
return None

def distinct_server_random(client_log: ssl_log_parser.Info,
_server_log: ssl_log_parser.Info) -> Validation:
"""Check that server randoms as seen from the client are not repeated."""
# The current implementation does not handle cases where the client
# receives and discards a legitimate resend of the server hello in DTLS.
values = client_log.dumps['server hello, random bytes']
if len(values) < 2:
return 'Fewer than two server_random found'
def random_part(hex_data: str) -> str:
# In TLS <=1.2, the first 4 bytes (8 hex digits) are the time,
# and may differ even if the actually random part is repeated.
# The last 8 bytes (16 hex digits) are not random in TLS 1.2 when
# the server also supports 1.3 (they are forced to b'DOWNGR\001').
return hex_data[8:48]
Comment thread
bjwtaylor marked this conversation as resolved.
seen = {} #type: Dict[str, int]
for n, v in enumerate(values):
r = random_part(v)
if r in seen:
return f'server_random #{n} repeats #{seen[r]}'
seen[r] = n
return None


Task = Callable[[ssl_log_parser.Info, ssl_log_parser.Info], Validation]

TASKS = {
'distinct_server_ephemeral': distinct_server_ephemeral,
'distinct_server_random': distinct_server_random,
'match_random': match_random,
} #type: Dict[str, Task]

def validate(client_log: ssl_log_parser.Info,
server_log: ssl_log_parser.Info,
tasks: List[str]) -> Validation:
"""Perform validation tasks on a pair of matching logs.

Return None if the validation succeeds, a human-oriented error message
otherwise.
"""
for task_name in tasks:
task = TASKS[task_name]
outcome = task(client_log, server_log)
if outcome is not None:
return outcome
return None


def main() -> int:
"""Command line entry point."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--list-tasks',
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to add action='store_true', action='store_false', or nargs=0, argparse to avoid the error "error: argument --list-tasks: expected one argument" here?

action='store_true',
help='List available tasks and exit')
parser.add_argument('client_log', metavar='CLIENT_LOG_FILE',
nargs='?',
help='Client log file ($CLI_OUT or ?-cli-*.log)')
parser.add_argument('server_log', metavar='SERVER_LOG_FILE',
nargs='?',
help='Server log file ($SRV_OUT or ?-srv-*.log)')
parser.add_argument('tasks', metavar='TASK',
nargs='*',
help='Tasks to perform (use --list-tasks to see supported task names)')
args = parser.parse_args()

if args.list_tasks:
for task_name in sorted(TASKS.keys()):
print(task_name)
return 0

if args.client_log is None or args.server_log is None:
parser.error('the following arguments are required: CLIENT_LOG_FILE SERVER_LOG_FILE')
if not args.tasks:
parser.error('at least one TASK is required')

client_log = ssl_log_parser.parse_log_file(args.client_log)
server_log = ssl_log_parser.parse_log_file(args.server_log)
outcome = validate(client_log, server_log, args.tasks)
if outcome is None:
return 0
else:
if outcome and outcome[-1] != '\n':
outcome += '\n'
sys.stderr.write(outcome)
return 1

if __name__ == '__main__':
sys.exit(main())