|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | | -from hashlib import md5, sha256 |
| 3 | +from hashlib import md5, new, sha256 |
4 | 4 | from hmac import compare_digest |
| 5 | +from time import time |
5 | 6 | from typing import Callable |
| 7 | +from uuid import uuid4 |
6 | 8 |
|
7 | 9 | from httoop.exceptions import InvalidHeader |
8 | 10 | from httoop.header.element import HeaderElement |
9 | 11 | from httoop.util import ByteUnicodeDict, _ |
10 | 12 |
|
11 | 13 |
|
12 | 14 | class DigestAuthScheme: |
13 | | - |
14 | 15 | algorithms = { |
15 | | - 'MD5': lambda val: md5(val).hexdigest().encode('ASCII'), # nosec |
16 | | - 'MD5-sess': lambda val: md5(val).hexdigest().encode('ASCII'), # nosec |
17 | | - 'SHA-256': lambda val: sha256(val).hexdigest().encode('ASCII'), |
18 | | - 'SHA-256-sess': lambda val: sha256(val).hexdigest().encode('ASCII'), |
19 | | - # 'SHA-512-256': lambda val: sha256(val).hexdigest().encode('ASCII'), TODO: ?? |
20 | | - # 'SHA-512-256-sess': lambda val: sha256(val).hexdigest().encode('ASCII'), TODO: ?? |
21 | | - } |
| 16 | + 'MD5': lambda: md5(), # noqa: S324 |
| 17 | + 'SHA-256': lambda: sha256(), |
| 18 | + 'SHA-512-256': lambda: new('sha512_256'), |
| 19 | + } # not case insensitive per RFC |
| 20 | + algorithms['MD5-sess'] = algorithms['MD5'] |
| 21 | + algorithms['SHA-256-sess'] = algorithms['SHA-256'] |
| 22 | + algorithms['SHA-512-256-sess'] = algorithms['SHA-512-256'] |
22 | 23 | qops = (b'auth', b'auth-int') # quality of protection |
23 | 24 |
|
24 | 25 | @classmethod |
25 | | - def get_algorithm(cls, algorithm: bytes | str) -> Callable: |
| 26 | + def get_algorithm(cls, algorithm: bytes | str) -> Callable[bytes, bytes]: |
26 | 27 | try: |
27 | | - return cls.algorithms[algorithm.decode('ASCII', 'ignore') if isinstance(algorithm, bytes) else algorithm] |
| 28 | + H = cls.algorithms[algorithm.decode('ASCII', 'ignore') if isinstance(algorithm, bytes) else algorithm] |
28 | 29 | except KeyError: |
29 | | - raise InvalidHeader(_('Unknown digest authentication algorithm: %r'), algorithm) |
| 30 | + raise InvalidHeader(_('Unknown digest authentication algorithm: %r'), algorithm) from None |
| 31 | + |
| 32 | + def _algo(value) -> bytes: |
| 33 | + h = H() |
| 34 | + h.update(value) |
| 35 | + return h.hexdigest().encode('ASCII') |
| 36 | + |
| 37 | + return _algo |
30 | 38 |
|
31 | 39 | @classmethod |
32 | 40 | def generate_nonce(cls, authinfo: ByteUnicodeDict) -> bytes: |
33 | | - from time import time |
34 | | - from uuid import uuid4 |
35 | | - |
36 | 41 | nonce = b'%d:%s:%s' % ( |
37 | 42 | time(), |
38 | 43 | authinfo.get('etag', authinfo.get('realm', b'')), |
@@ -176,7 +181,7 @@ def calculate_request_digest(cls, authinfo: ByteUnicodeDict) -> bytes: |
176 | 181 | algorithm = authinfo.get('algorithm', b'MD5').decode('ASCII', 'replace') |
177 | 182 | H = cls.get_algorithm(algorithm) |
178 | 183 |
|
179 | | - if algorithm == 'MD5-sess' and authinfo.get('A1'): # noqa: SIM108 |
| 184 | + if algorithm.endswith('-sess') and authinfo.get('A1'): # noqa: SIM108 |
180 | 185 | secret = H(authinfo['A1']) |
181 | 186 | else: |
182 | 187 | secret = H(cls.A1(authinfo)) |
@@ -208,7 +213,7 @@ def A1(cls, params: ByteUnicodeDict) -> bytes: |
208 | 213 |
|
209 | 214 | if not algorithm or algorithm == b'MD5': |
210 | 215 | return b'%s:%s:%s' % (params['username'], params['realm'], params['password']) |
211 | | - if algorithm == b'MD5-sess': |
| 216 | + if algorithm.endswith(b'-sess'): |
212 | 217 | H = cls.get_algorithm(algorithm) |
213 | 218 | s = b'%s:%s:%s' % (params['username'], params['realm'], params['password']) |
214 | 219 | return b'%s:%s:%s' % (H(s), params['nonce'], params['cnonce']) |
|
0 commit comments