11import base64
22import hashlib
33import fnmatch
4- import re
5- import subprocess
6- import gnupg
74import json
85import logging
96import time
1512from functools import partial
1613from rest_framework .exceptions import Throttled
1714
15+ from pysequoia .packet import PacketPile , Tag
16+
1817from pulpcore .plugin .models import Artifact , Task
1918
2019from pulp_container .constants import (
3130 SIGNATURE_SCHEMA ,
3231)
3332
34- KEY_ID_REGEX_COMPILED = re .compile (r"keyid ([0-9A-F]+)" )
35- TIMESTAMP_REGEX_COMPILED = re .compile (r"created ([0-9]+)" )
36-
3733signature_validator = Draft7Validator (SIGNATURE_SCHEMA )
3834
3935log = logging .getLogger (__name__ )
@@ -78,6 +74,20 @@ def urlpath_sanitize(*args):
7874 return "/" .join (segments )
7975
8076
77+ def keyid_from_fingerprint (fingerprint ):
78+ """Derive a key ID from an OpenPGP fingerprint.
79+
80+ For v4 fingerprints (40 hex chars / 20 bytes), the key ID is the last 8 bytes.
81+ For v6 fingerprints (64 hex chars / 32 bytes), the key ID is the first 8 bytes.
82+ """
83+ if len (fingerprint ) == 40 :
84+ return fingerprint [- 16 :]
85+ elif len (fingerprint ) == 64 :
86+ return fingerprint [:16 ]
87+ else :
88+ raise ValueError (f"Unexpected fingerprint length: { len (fingerprint )} " )
89+
90+
8191def extract_data_from_signature (signature_raw , man_digest ):
8292 """
8393 Extract data from an "integrated" signature, aka a signed non-encrypted document.
@@ -90,37 +100,56 @@ def extract_data_from_signature(signature_raw, man_digest):
90100 dict: JSON representation of the document and available data about signature
91101
92102 """
93- gpg = gnupg . GPG ()
94- crypt_obj = gpg . decrypt (signature_raw , extra_args = [ "--skip-verify" ] )
95- if not crypt_obj . data :
96- log . info (
97- "It is not possible to read the signed document, GPG error : {}" .format (crypt_obj . stderr )
103+ try :
104+ pile = PacketPile . from_bytes (signature_raw )
105+ except Exception as exc :
106+ raise ValueError (
107+ "Signed document for manifest {} is un-parseable : {}" .format (man_digest , str ( exc ) )
98108 )
99- return
109+
110+ literal_data = None
111+ signing_key_id = None
112+ signing_key_fingerprint = None
113+ signature_timestamp = None
114+
115+ for packet in pile :
116+ if packet .tag == Tag .Literal :
117+ literal_data = bytes (packet .literal_data )
118+ elif packet .tag == Tag .Signature :
119+ if packet .issuer_key_id is not None :
120+ signing_key_id = packet .issuer_key_id .upper ()
121+ elif packet .issuer_fingerprint is not None :
122+ signing_key_fingerprint = packet .issuer_fingerprint .upper ()
123+ signing_key_id = keyid_from_fingerprint (signing_key_fingerprint )
124+ else :
125+ raise ValueError (
126+ "Signature for manifest {} has no fingerprint or key_id" .format (man_digest )
127+ )
128+ if packet .signature_created is not None :
129+ signature_timestamp = int (packet .signature_created .timestamp ())
130+
131+ if not literal_data :
132+ raise ValueError ("Signature for manifest {} has no literal data" .format (man_digest ))
100133
101134 try :
102- sig_json = json .loads (crypt_obj . data )
135+ sig_json = json .loads (literal_data )
103136 except Exception as exc :
104- log .info (
105- "Signed document cannot be parsed to create a signature for {}."
106- " Error: {}" .format (man_digest , str (exc ))
137+ raise ValueError (
138+ "Signed document cannot be parsed to create a signature for {}. Error: {}" .format (
139+ man_digest , str (exc )
140+ )
107141 )
108- return
109142
110143 errors = []
111144 for error in signature_validator .iter_errors (sig_json ):
112145 errors .append (f'{ "." .join (error .path )} : { error .message } ' )
113146
114147 if errors :
115- log .info ("The signature for {} is not synced due to: {}" .format (man_digest , errors ))
116- return
117-
118- # decrypted and unverified signatures do not have prepopulated the key_id and timestamp
119- # fields; thus, it is necessary to use the debugging utilities of gpg to extract these
120- # fields since they are not encrypted and still readable without decrypting the signature first
121- packets = subprocess .check_output (["gpg" , "--list-packets" ], input = signature_raw ).decode ()
122- sig_json ["signing_key_id" ] = KEY_ID_REGEX_COMPILED .search (packets ).group (1 )
123- sig_json ["signature_timestamp" ] = TIMESTAMP_REGEX_COMPILED .search (packets ).group (1 )
148+ raise ValueError ("The signature for {} is not synced due to: {}" .format (man_digest , errors ))
149+
150+ sig_json ["signing_key_id" ] = signing_key_id
151+ sig_json ["signing_key_fingerprint" ] = signing_key_fingerprint
152+ sig_json ["signature_timestamp" ] = signature_timestamp
124153
125154 return sig_json
126155
0 commit comments