11import base64
22import hashlib
33import fnmatch
4- import re
5- import subprocess
6- import gnupg
74import json
85import logging
96import time
1512from functools import partial
1613from rest_framework .exceptions import Throttled
1714
15+ from pulpcore .app .openpgp import packet_iter , subpacket_iter
1816from pulpcore .plugin .models import Artifact , Task
1917from pulpcore .plugin .util import get_domain
2018
3230 SIGNATURE_SCHEMA ,
3331)
3432
35- KEY_ID_REGEX_COMPILED = re .compile (r"keyid ([0-9A-F]+)" )
36- TIMESTAMP_REGEX_COMPILED = re .compile (r"created ([0-9]+)" )
37-
3833signature_validator = Draft7Validator (SIGNATURE_SCHEMA )
3934
4035log = logging .getLogger (__name__ )
@@ -86,10 +81,113 @@ def urlpath_sanitize(*args):
8681 return "/" .join (segments )
8782
8883
84+ _TAG_ONE_PASS_SIG = 4
85+ _TAG_LITERAL_DATA = 11
86+ _TAG_SIGNATURE = 2
87+
88+
89+ def _extract_sig_metadata (subpacket_data ):
90+ """Extract key_id and timestamp from signature subpacket bytes."""
91+ result = {}
92+ for sp in subpacket_iter (subpacket_data ):
93+ if sp ["type" ] == 2 and len (sp ["body" ]) >= 4 :
94+ result ["timestamp" ] = int .from_bytes (sp ["body" ][:4 ], "big" )
95+ elif sp ["type" ] == 16 and len (sp ["body" ]) >= 8 :
96+ result ["key_id" ] = sp ["body" ][:8 ].hex ().upper ()
97+ elif sp ["type" ] == 33 and len (sp ["body" ]) >= 5 :
98+ result .setdefault ("key_id" , sp ["body" ][- 8 :].hex ().upper ())
99+ return result
100+
101+
102+ def _extract_inline_sig_data (signature_raw ):
103+ """
104+ Parse an OpenPGP inline-signed message and extract its content and metadata.
105+
106+ Handles both v4 (RFC 4880) and v6 (RFC 9580 / PQC) packet formats.
107+
108+ Returns:
109+ tuple: (literal_data_bytes, key_id_hex, timestamp_int) or None on failure.
110+
111+ """
112+ literal_data = None
113+ key_id = None
114+ timestamp = None
115+
116+ try :
117+ for packet in packet_iter (signature_raw ):
118+ tag = packet ["type" ]
119+ body = packet ["body" ]
120+
121+ if tag == _TAG_ONE_PASS_SIG :
122+ version = body [0 ]
123+ if version == 3 and len (body ) >= 13 :
124+ key_id = body [4 :12 ].hex ().upper ()
125+ elif version == 6 and len (body ) >= 7 :
126+ # v6 OPS: type(1) + hash(1) + pubkey(1) + salt_len(1) + salt(N)
127+ # + fpr_len(1) + fingerprint(N) + nested(1)
128+ salt_len = body [4 ]
129+ fpr_offset = 5 + salt_len
130+ if fpr_offset < len (body ):
131+ fpr_len = body [fpr_offset ]
132+ fpr = body [fpr_offset + 1 : fpr_offset + 1 + fpr_len ]
133+ key_id = fpr [- 8 :].hex ().upper () if len (fpr ) >= 8 else None
134+
135+ elif tag == _TAG_LITERAL_DATA :
136+ fname_len = body [1 ]
137+ literal_data = body [6 + fname_len :]
138+
139+ elif tag == _TAG_SIGNATURE :
140+ version = body [0 ]
141+ if version in (4 , 5 ) and len (body ) >= 6 :
142+ hashed_len = (body [4 ] << 8 ) + body [5 ]
143+ hashed_data = body [6 : 6 + hashed_len ]
144+ unhashed_start = 6 + hashed_len
145+ unhashed_len = (body [unhashed_start ] << 8 ) + body [unhashed_start + 1 ]
146+ unhashed_data = body [unhashed_start + 2 : unhashed_start + 2 + unhashed_len ]
147+ sp = {
148+ ** _extract_sig_metadata (unhashed_data ),
149+ ** _extract_sig_metadata (hashed_data ),
150+ }
151+ timestamp = timestamp or sp .get ("timestamp" )
152+ key_id = key_id or sp .get ("key_id" )
153+ elif version == 6 and len (body ) >= 8 :
154+ hashed_len = int .from_bytes (body [4 :8 ], "big" )
155+ hashed_data = body [8 : 8 + hashed_len ]
156+ unhashed_start = 8 + hashed_len
157+ unhashed_len = int .from_bytes (
158+ body [unhashed_start : unhashed_start + 4 ], "big"
159+ )
160+ unhashed_data = body [unhashed_start + 4 : unhashed_start + 4 + unhashed_len ]
161+ sp = {
162+ ** _extract_sig_metadata (unhashed_data ),
163+ ** _extract_sig_metadata (hashed_data ),
164+ }
165+ timestamp = timestamp or sp .get ("timestamp" )
166+ key_id = key_id or sp .get ("key_id" )
167+ except (ValueError , IndexError , NotImplementedError ) as exc :
168+ log .info ("Failed to parse OpenPGP packets: %s" , exc )
169+ return None
170+
171+ if literal_data is None or key_id is None or timestamp is None :
172+ log .info (
173+ "Incomplete OpenPGP inline-signed message "
174+ "(literal_data=%s, key_id=%s, timestamp=%s)" ,
175+ literal_data is not None ,
176+ key_id ,
177+ timestamp ,
178+ )
179+ return None
180+
181+ return literal_data , key_id , timestamp
182+
183+
89184def extract_data_from_signature (signature_raw , man_digest ):
90185 """
91186 Extract data from an "integrated" signature, aka a signed non-encrypted document.
92187
188+ Parses the OpenPGP inline-signed message directly, supporting both v4 (RFC 4880)
189+ and v6 (RFC 9580) packet formats including post-quantum cryptography signatures.
190+
93191 Args:
94192 signature_raw(bytes): A signed doc to get data from
95193 man_digest (str): A manifest digest for which the signature is for
@@ -98,20 +196,23 @@ def extract_data_from_signature(signature_raw, man_digest):
98196 dict: JSON representation of the document and available data about signature
99197
100198 """
101- gpg = gnupg .GPG ()
102- crypt_obj = gpg .decrypt (signature_raw , extra_args = ["--skip-verify" ])
103- if not crypt_obj .data :
199+ parsed = _extract_inline_sig_data (signature_raw )
200+ if parsed is None :
104201 log .info (
105- "It is not possible to read the signed document, GPG error: {}" .format (crypt_obj .stderr )
202+ "It is not possible to read the signed document for %s" ,
203+ man_digest ,
106204 )
107205 return
108206
207+ literal_data , key_id , timestamp = parsed
208+
109209 try :
110- sig_json = json .loads (crypt_obj . data )
210+ sig_json = json .loads (literal_data )
111211 except Exception as exc :
112212 log .info (
113- "Signed document cannot be parsed to create a signature for {}."
114- " Error: {}" .format (man_digest , str (exc ))
213+ "Signed document cannot be parsed to create a signature for %s. Error: %s" ,
214+ man_digest ,
215+ exc ,
115216 )
116217 return
117218
@@ -120,15 +221,11 @@ def extract_data_from_signature(signature_raw, man_digest):
120221 errors .append (f'{ "." .join (error .path )} : { error .message } ' )
121222
122223 if errors :
123- log .info ("The signature for {} is not synced due to: {}" . format ( man_digest , errors ) )
224+ log .info ("The signature for %s is not synced due to: %s" , man_digest , errors )
124225 return
125226
126- # decrypted and unverified signatures do not have prepopulated the key_id and timestamp
127- # fields; thus, it is necessary to use the debugging utilities of gpg to extract these
128- # fields since they are not encrypted and still readable without decrypting the signature first
129- packets = subprocess .check_output (["gpg" , "--list-packets" ], input = signature_raw ).decode ()
130- sig_json ["signing_key_id" ] = KEY_ID_REGEX_COMPILED .search (packets ).group (1 )
131- sig_json ["signature_timestamp" ] = TIMESTAMP_REGEX_COMPILED .search (packets ).group (1 )
227+ sig_json ["signing_key_id" ] = key_id
228+ sig_json ["signature_timestamp" ] = timestamp
132229
133230 return sig_json
134231
0 commit comments