11#!/usr/bin/env python3
22"""
3- Chelon Sign RPM
3+ Chelon Sign (Unified Client)
44
5- Sign RPM packages using Chelon service.
6- Supports creating detached signatures (.asc) and embedding signatures via rpmsign integration.
5+ Signs files using the Chelon service.
6+ Supports:
7+ - RPM packages (detached signatures or embedded via rpmsign)
8+ - Repository metadata (repomd.xml)
9+ - GPG emulation for rpmsign integration
710"""
811
912import os
1013import sys
1114import argparse
1215import subprocess
1316import base64
17+ import shlex
18+ import binascii
1419from pathlib import Path
1520from typing import Optional , List
1621
@@ -93,10 +98,28 @@ def gpg_mode(args: List[str]):
9398 # Read input data with a 10MB limit (DoS protection)
9499 MAX_INPUT_SIZE = 10 * 1024 * 1024 # 10MB
95100 if input_file == "-" :
96- data = sys .stdin .buffer .read (MAX_INPUT_SIZE + 1 )
97- if len (data ) > MAX_INPUT_SIZE :
98- print (f"Error: Input data from stdin exceeds limit of { MAX_INPUT_SIZE } bytes" , file = sys .stderr )
99- sys .exit (1 )
101+ # Read stdin in chunks to avoid buffering MAX_INPUT_SIZE+1 bytes unconditionally
102+ MAX_CHUNK_SIZE = 64 * 1024 # 64KB
103+ data_buf = bytearray ()
104+ stdin_buffer = sys .stdin .buffer
105+
106+ while True :
107+ # Limit each read to the remaining allowed bytes plus one extra for overflow detection
108+ remaining = (MAX_INPUT_SIZE + 1 ) - len (data_buf )
109+ if remaining <= 0 :
110+ break
111+
112+ chunk = stdin_buffer .read (min (MAX_CHUNK_SIZE , remaining ))
113+ if not chunk :
114+ break
115+
116+ data_buf .extend (chunk )
117+
118+ if len (data_buf ) > MAX_INPUT_SIZE :
119+ print (f"Error: Input data from stdin exceeds limit of { MAX_INPUT_SIZE } bytes" , file = sys .stderr )
120+ sys .exit (1 )
121+
122+ data = bytes (data_buf )
100123 else :
101124 file_size = os .path .getsize (input_file )
102125 if file_size > MAX_INPUT_SIZE :
@@ -145,7 +168,11 @@ def gpg_mode(args: List[str]):
145168 sys .exit (1 )
146169 else :
147170 with open (output_file , 'wb' ) as f :
148- f .write (base64 .b64decode (signature ))
171+ try :
172+ f .write (base64 .b64decode (signature ))
173+ except binascii .Error as e :
174+ print (f"Error: Malformed base64 signature: { e } " , file = sys .stderr )
175+ sys .exit (1 )
149176 except IOError as e :
150177 print (f"Error writing to { output_file } : { e } " , file = sys .stderr )
151178 sys .exit (1 )
@@ -170,7 +197,11 @@ def gpg_mode(args: List[str]):
170197 print ("Error: 'gpg' command not found." , file = sys .stderr )
171198 sys .exit (1 )
172199 else :
173- sys .stdout .buffer .write (base64 .b64decode (signature ))
200+ try :
201+ sys .stdout .buffer .write (base64 .b64decode (signature ))
202+ except binascii .Error as e :
203+ print (f"Error: Malformed base64 signature: { e } " , file = sys .stderr )
204+ sys .exit (1 )
174205 sys .stdout .buffer .flush ()
175206
176207 except Exception as e :
@@ -204,7 +235,7 @@ def sign_rpm_integrated(rpm_path: str, key_type: str = 'modern', verbose: bool =
204235
205236 cmd = [
206237 'rpmsign' ,
207- '--define' , f'__gpg { script_path } ' ,
238+ '--define' , f'__gpg { shlex . quote ( str ( script_path )) } ' ,
208239 '--define' , f'_gpg_name { key_id } ' ,
209240 '--define' , '_gpg_sign_cmd_extra_args --batch --no-tty' ,
210241 '--resign' , str (rpm_file )
@@ -226,32 +257,38 @@ def sign_rpm_integrated(rpm_path: str, key_type: str = 'modern', verbose: bool =
226257 return False
227258
228259
229- def sign_rpm_detached ( rpm_path : str , output_path : Optional [str ] = None ,
230- key_type : str = 'modern' , verbose : bool = False ) -> str :
260+ def sign_file_detached ( file_path : str , output_path : Optional [str ] = None ,
261+ key_type : str = 'modern' , operation : str = 'rpm ' , verbose : bool = False ) -> str :
231262 """
232- Sign an RPM file (creates detached signature)
263+ Sign a file (creates detached signature)
264+ Supports both rpm and repodata operations
233265 """
234- rpm_file = Path (rpm_path )
235- if not rpm_file .exists ():
236- raise FileNotFoundError (f"RPM file not found: { rpm_path } " )
266+ target_file = Path (file_path )
267+ if not target_file .exists ():
268+ raise FileNotFoundError (f"File not found: { file_path } " )
237269
238270 if output_path is None :
239- output_path = str (rpm_file ) + '.asc'
271+ output_path = str (target_file ) + '.asc'
240272
241273 if verbose :
242- print (f"Signing: { rpm_path } " )
274+ print (f"Signing: { file_path } " )
243275 print (f"Output: { output_path } " )
244276 print (f"Key type: { key_type } " )
277+ print (f"Operation: sign_{ operation } " )
245278
246279 try :
247280 client = get_client ()
248- response = client .sign_file (str (rpm_file ), key_type = key_type , operation = 'rpm' )
281+ response = client .sign_file (str (target_file ), key_type = key_type , operation = operation )
249282
250283 with open (output_path , 'w' ) as f :
251284 f .write (response ['signature' ])
252285
253286 if verbose :
254- print (f"✓ Signature written to: { output_path } " )
287+ print (f"✓ Signed successfully" )
288+ print (f" Request ID: { response .get ('request_id' )} " )
289+ print (f" Key ID: { response .get ('key_id' )} " )
290+ print (f" Signature written to: { output_path } " )
291+
255292 return output_path
256293
257294 except ChelonClientError as e :
@@ -261,24 +298,25 @@ def sign_rpm_detached(rpm_path: str, output_path: Optional[str] = None,
261298
262299def main ():
263300 # Detect if we are being called as a GPG wrapper
264- # rpmsign usually calls with a lot of flags, including -sbo or --detach-sign
265- if len (sys .argv ) > 1 and sys .argv [1 ].startswith ('--' ):
301+ if len (sys .argv ) > 1 :
266302 # Check if it looks specifically like a GPG call from rpmsign
267303 gpg_flags = {'-sbo' , '--detach-sign' , '--armor' , '--no-secmem-warning' }
268304 if any (arg in gpg_flags for arg in sys .argv ):
269305 gpg_mode (sys .argv [1 :])
270- elif '--version' in sys . argv and len (sys .argv ) == 2 :
306+ elif sys . argv [ 1 ] == '--version' and len (sys .argv ) == 2 :
271307 # Handle standalone --version for discovery tools
272308 gpg_mode (sys .argv [1 :])
273309
274310 parser = argparse .ArgumentParser (
275- description = 'Sign RPM packages using Chelon service ' ,
311+ description = 'Chelon Sign - Unified Signing Tool ' ,
276312 epilog = 'Environment variables: CHELON_URL, CHELON_TOKEN, CHELON_CERT_DIR'
277313 )
278314
279- parser .add_argument ('rpm_file' , help = 'Path to RPM file' )
280- parser .add_argument ('--resign' , action = 'store_true' , help = 'Embed signature into RPM header (requires rpmsign)' )
281- parser .add_argument ('-o' , '--output' , help = 'Output signature file (default: <rpm_file>.asc, only for detached)' )
315+ parser .add_argument ('file' , help = 'Path to file (RPM or repomd.xml)' )
316+ parser .add_argument ('-t' , '--type' , choices = ['rpm' , 'repodata' ],
317+ help = 'Signing type (default: guess from extension or "rpm")' )
318+ parser .add_argument ('--resign' , action = 'store_true' , help = 'Embed signature into RPM header (requires rpmsign, implies --type rpm)' )
319+ parser .add_argument ('-o' , '--output' , help = 'Output signature file (default: <file>.asc, only for detached)' )
282320 parser .add_argument ('-k' , '--key-type' , choices = ['legacy' , 'modern' ], default = 'modern' ,
283321 help = 'GPG key type to use (default: modern)' )
284322 parser .add_argument ('--insecure' , action = 'store_true' , help = 'Disable SSL certificate verification' )
@@ -288,16 +326,30 @@ def main():
288326
289327 if args .insecure :
290328 os .environ ['CHELON_VERIFY_SSL' ] = 'false'
329+
330+ # Determine operation type
331+ op_type = args .type
332+ if not op_type :
333+ if args .resign :
334+ op_type = 'rpm'
335+ elif args .file .endswith ('.xml' ):
336+ op_type = 'repodata'
337+ else :
338+ op_type = 'rpm'
291339
292340 try :
293341 if args .resign :
294- success = sign_rpm_integrated (args .rpm_file , key_type = args .key_type , verbose = args .verbose )
342+ if op_type != 'rpm' :
343+ print ("Error: --resign is only supported for RPM files" , file = sys .stderr )
344+ return 1
345+ success = sign_rpm_integrated (args .file , key_type = args .key_type , verbose = args .verbose )
295346 return 0 if success else 1
296347 else :
297- output_file = sign_rpm_detached (
298- args .rpm_file ,
348+ output_file = sign_file_detached (
349+ args .file ,
299350 output_path = args .output ,
300351 key_type = args .key_type ,
352+ operation = op_type ,
301353 verbose = args .verbose
302354 )
303355 if not args .verbose :
@@ -314,4 +366,3 @@ def main():
314366
315367if __name__ == '__main__' :
316368 sys .exit (main ())
317-
0 commit comments