-
Notifications
You must be signed in to change notification settings - Fork 59
Add SSL log validator script #296
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
gilles-peskine-arm
wants to merge
7
commits into
Mbed-TLS:main
Choose a base branch
from
gilles-peskine-arm:ssl_fork_server-rng-test-framework
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
98c1274
New script to cross-validate SSL client and server logs
gilles-peskine-arm b0c3668
Also parse bignum value dumps
gilles-peskine-arm f911af9
New validation: distinct_server_ephemeral, distinct_server_random
gilles-peskine-arm dd68332
Fix spurious match of data in DUMP_CHUNK_RE
gilles-peskine-arm e42959c
Document validation task return values
gilles-peskine-arm ebd9ff1
distinct_server_ephemeral: validation failure rather than exception i…
gilles-peskine-arm 37e13da
Fix --list-tasks parsing
gilles-peskine-arm File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| """Parse logs from ssl_client2 and ssl_server2.""" | ||
|
|
||
| # Copyright The Mbed TLS Contributors | ||
| # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later | ||
|
|
||
| import re | ||
| from typing import Dict, Iterator, List, Pattern, Tuple | ||
|
|
||
|
|
||
| class Info: | ||
| """Information gathered from a log of ssl_client2 or ssl_server2.""" | ||
|
|
||
| PREFIX_RE_S = (r'(?P<filename>\S+):(?P<lineno>[0-9]+): +' + | ||
| r'\|(?P<level>[0-9]+)\|(?: (?P<address>\w+):)? +') | ||
| DUMPING_RE = re.compile( | ||
| PREFIX_RE_S + | ||
| r'dumping \'(?P<what>.*?)\' \((?P<length>[0-9]+) bytes\)') | ||
| DUMP_CHUNK_RE = re.compile( | ||
| PREFIX_RE_S + | ||
| r'[0-9a-f]+: *(?P<data>(?: [0-9a-f]{2}){1,16})') | ||
| VALUE_OF_RE = re.compile( | ||
| PREFIX_RE_S + | ||
| r'value of \'(?P<what>.*?)\' \((?P<length>[0-9]+) bits\)') | ||
| VALUE_CHUNK_RE = re.compile( | ||
| PREFIX_RE_S + | ||
| r'(?P<data>(?:[0-9a-f]{2}(?:$| )){1,16})') | ||
|
|
||
| def __init__(self) -> None: | ||
| """Create an empty log info object.""" | ||
| self.dumps = {} #type: Dict[str, List[str]] | ||
|
|
||
| def add_dump(self, name: str, hex_data: str) -> None: | ||
| """Add a hex dump.""" | ||
| self.dumps.setdefault(name, []).append(hex_data) | ||
|
|
||
| @staticmethod | ||
| def read_dump(filename: str, | ||
| lines: Iterator[Tuple[int, str]], | ||
| length: int, chunk_re: Pattern) -> str: | ||
| """Read a hex dump. Return the hex data. | ||
|
|
||
| This method consumes the data dump lines from lines. | ||
| """ | ||
| acc = '' | ||
| remaining = length | ||
| while remaining > 0: | ||
| lineno, line = next(lines) | ||
| m = chunk_re.match(line) | ||
| if not m: | ||
| raise Exception(f'{filename}:{lineno}: not a dump chunk as expected') | ||
| acc += m.group('data') | ||
| remaining -= 16 | ||
| plain = acc.replace(' ', '') | ||
| if len(plain) != length * 2: | ||
| raise Exception(f'{filename}:{lineno}: ' | ||
| f'found {len(plain)} hex digits but expected {length * 2}') | ||
| return plain | ||
|
|
||
| def read_file_contents(self, | ||
| filename: str, | ||
| lines: Iterator[Tuple[int, str]]) -> None: | ||
| """Read lines from a log file and store the information we find. | ||
|
|
||
| The iterator lines delivers a stream of lines with their line numbers. | ||
| """ | ||
| for _lineno, line in lines: | ||
| m = self.DUMPING_RE.match(line) | ||
| if m: | ||
| what = m.group('what') | ||
| hex_data = self.read_dump(filename, lines, | ||
| int(m.group('length')), | ||
| self.DUMP_CHUNK_RE) | ||
| self.add_dump(what, hex_data) | ||
| m = self.VALUE_OF_RE.match(line) | ||
| if m: | ||
| what = m.group('what') | ||
| n_bits = int(m.group('length')) | ||
| n_bytes = (n_bits + 7) // 8 | ||
| hex_data = self.read_dump(filename, lines, | ||
| n_bytes, self.VALUE_CHUNK_RE) | ||
| self.add_dump(what, hex_data) | ||
|
|
||
| def read_file(self, filename: str) -> None: | ||
| """Read a log file and store the information we find.""" | ||
| with open(filename) as input_: | ||
| self.read_file_contents(filename, enumerate(input_, 1)) | ||
|
|
||
|
|
||
| def parse_log_file(filename: str) -> Info: | ||
| """Parse a log of ssl_client2 or ssl_server2.""" | ||
| info = Info() | ||
| info.read_file(filename) | ||
| return info |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,153 @@ | ||
| #!/usr/bin/env python3 | ||
| """Validate logs from ssl_client2 and ssl_server2. | ||
|
|
||
| On success, print nothing and return 0. | ||
| On a validation failure, print a short error message and return 1. | ||
| On a command line or parse error, die with an exception and return 1. | ||
| """ | ||
|
|
||
| # Copyright The Mbed TLS Contributors | ||
| # SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later | ||
|
|
||
| import argparse | ||
| import sys | ||
| from typing import Callable, Dict, List, Optional | ||
|
|
||
| from mbedtls_framework import ssl_log_parser | ||
|
|
||
|
|
||
|
|
||
| # None: validation succeeded. | ||
| # str: validation failed; the string is a human-readable explanation of the failure. | ||
| Validation = Optional[str] | ||
|
|
||
| def match_random(client_log: ssl_log_parser.Info, | ||
| server_log: ssl_log_parser.Info) -> Validation: | ||
| """Check that both sides have the same idea of client_random and server_random.""" | ||
| client_client_randoms = client_log.dumps['client hello, random bytes'] | ||
| client_server_randoms = client_log.dumps['server hello, random bytes'] | ||
| server_client_randoms = server_log.dumps['client hello, random bytes'] | ||
| server_server_randoms = server_log.dumps['server hello, random bytes'] | ||
| if len(client_client_randoms) != len(server_client_randoms): | ||
| return ('Client and server disagree on the number of client_random ' + | ||
| f'({len(client_client_randoms)} != {len(server_client_randoms)})') | ||
| if len(client_server_randoms) != len(server_server_randoms): | ||
| return ('Client and server disagree on the number of server_random ' + | ||
| f'({len(client_server_randoms)} != {len(server_server_randoms)})') | ||
| for n, (c, s) in enumerate(zip(client_client_randoms, server_client_randoms)): | ||
| if c != s: | ||
| return f'Client and server disagree on client random #{n}' | ||
| for n, (c, s) in enumerate(zip(client_server_randoms, server_server_randoms)): | ||
| if c != s: | ||
| return f'Client and server disagree on server random #{n}' | ||
| return None | ||
|
|
||
|
|
||
| def distinct_server_ephemeral(client_log: ssl_log_parser.Info, | ||
| _server_log: ssl_log_parser.Info) -> Validation: | ||
| """Check that server ephemeral keys as seen from the client are not repeated.""" | ||
| # The current implementation does not handle cases where the client | ||
| # receives and discards a legitimate resend of the ServerKeyExchange | ||
| # message in DTLS. | ||
| if 'DHM: GY' in client_log.dumps: | ||
| values = client_log.dumps['DHM: GY'] | ||
| elif 'server ephemeral public key' in client_log.dumps: | ||
| values = client_log.dumps['server ephemeral public key'] | ||
| else: | ||
| return 'Ephemeral public keys not found in client log' | ||
| if len(values) < 2: | ||
| return 'Fewer than two server ephemeral public keys found' | ||
| seen = {} #type: Dict[str, int] | ||
| for n, v in enumerate(values): | ||
| if v in seen: | ||
| return f'server ephemeral public key #{n} repeats #{seen[v]}' | ||
| seen[v] = n | ||
| return None | ||
|
|
||
| def distinct_server_random(client_log: ssl_log_parser.Info, | ||
| _server_log: ssl_log_parser.Info) -> Validation: | ||
| """Check that server randoms as seen from the client are not repeated.""" | ||
| # The current implementation does not handle cases where the client | ||
| # receives and discards a legitimate resend of the server hello in DTLS. | ||
| values = client_log.dumps['server hello, random bytes'] | ||
| if len(values) < 2: | ||
| return 'Fewer than two server_random found' | ||
| def random_part(hex_data: str) -> str: | ||
| # In TLS <=1.2, the first 4 bytes (8 hex digits) are the time, | ||
| # and may differ even if the actually random part is repeated. | ||
| # The last 8 bytes (16 hex digits) are not random in TLS 1.2 when | ||
| # the server also supports 1.3 (they are forced to b'DOWNGR\001'). | ||
| return hex_data[8:48] | ||
|
bjwtaylor marked this conversation as resolved.
|
||
| seen = {} #type: Dict[str, int] | ||
| for n, v in enumerate(values): | ||
| r = random_part(v) | ||
| if r in seen: | ||
| return f'server_random #{n} repeats #{seen[r]}' | ||
| seen[r] = n | ||
| return None | ||
|
|
||
|
|
||
| Task = Callable[[ssl_log_parser.Info, ssl_log_parser.Info], Validation] | ||
|
|
||
| TASKS = { | ||
| 'distinct_server_ephemeral': distinct_server_ephemeral, | ||
| 'distinct_server_random': distinct_server_random, | ||
| 'match_random': match_random, | ||
| } #type: Dict[str, Task] | ||
|
|
||
| def validate(client_log: ssl_log_parser.Info, | ||
| server_log: ssl_log_parser.Info, | ||
| tasks: List[str]) -> Validation: | ||
| """Perform validation tasks on a pair of matching logs. | ||
|
|
||
| Return None if the validation succeeds, a human-oriented error message | ||
| otherwise. | ||
| """ | ||
| for task_name in tasks: | ||
| task = TASKS[task_name] | ||
| outcome = task(client_log, server_log) | ||
| if outcome is not None: | ||
| return outcome | ||
| return None | ||
|
|
||
|
|
||
| def main() -> int: | ||
| """Command line entry point.""" | ||
| parser = argparse.ArgumentParser(description=__doc__) | ||
| parser.add_argument('--list-tasks', | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you need to add action='store_true', action='store_false', or nargs=0, argparse to avoid the error "error: argument --list-tasks: expected one argument" here? |
||
| action='store_true', | ||
| help='List available tasks and exit') | ||
| parser.add_argument('client_log', metavar='CLIENT_LOG_FILE', | ||
| nargs='?', | ||
| help='Client log file ($CLI_OUT or ?-cli-*.log)') | ||
| parser.add_argument('server_log', metavar='SERVER_LOG_FILE', | ||
| nargs='?', | ||
| help='Server log file ($SRV_OUT or ?-srv-*.log)') | ||
| parser.add_argument('tasks', metavar='TASK', | ||
| nargs='*', | ||
| help='Tasks to perform (use --list-tasks to see supported task names)') | ||
| args = parser.parse_args() | ||
|
|
||
| if args.list_tasks: | ||
| for task_name in sorted(TASKS.keys()): | ||
| print(task_name) | ||
| return 0 | ||
|
|
||
| if args.client_log is None or args.server_log is None: | ||
| parser.error('the following arguments are required: CLIENT_LOG_FILE SERVER_LOG_FILE') | ||
| if not args.tasks: | ||
| parser.error('at least one TASK is required') | ||
|
|
||
| client_log = ssl_log_parser.parse_log_file(args.client_log) | ||
| server_log = ssl_log_parser.parse_log_file(args.server_log) | ||
| outcome = validate(client_log, server_log, args.tasks) | ||
| if outcome is None: | ||
| return 0 | ||
| else: | ||
| if outcome and outcome[-1] != '\n': | ||
| outcome += '\n' | ||
| sys.stderr.write(outcome) | ||
| return 1 | ||
|
|
||
| if __name__ == '__main__': | ||
| sys.exit(main()) | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I've understood correctly I believe this will abort if the lookup fails as no matches are present, is this the desired behavior or should it return a validation failure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, I'll return a more informative and non-fatal validation failure.