44
55from __future__ import annotations
66
7+ import re
78from datetime import datetime , timedelta , timezone
89from urllib .parse import urlparse
910
1011from pyinfra import host
11- from pyinfra .api import OperationError , operation
12+ from pyinfra .api import operation
13+ from pyinfra .api .exceptions import OperationError
1214from pyinfra .facts .apt import (
1315 AptKeys ,
1416 AptSources ,
1820)
1921from pyinfra .facts .deb import DebPackage , DebPackages
2022from pyinfra .facts .files import File
21- from pyinfra .facts .gpg import GpgKey
23+ from pyinfra .facts .gpg import GpgKey , GpgKeyrings
2224from pyinfra .facts .server import Date
25+ from pyinfra .operations import files , gpg
2326
24- from . import files
2527from .util .packaging import ensure_packages
2628
2729APT_UPDATE_FILENAME = "/var/lib/apt/periodic/update-success-stamp"
@@ -45,22 +47,111 @@ def _simulate_then_perform(command: str):
4547 yield noninteractive_apt (command )
4648
4749
50+ def _sanitize_apt_keyring_name (name : str ) -> str :
51+ """
52+ Produce a filesystem-friendly name from an URL host/basename or a local filename.
53+ """
54+ name = name .strip ().lower ()
55+ name = re .sub (r"[^\w.-]+" , "_" , name )
56+ name = re .sub (r"_+" , "_" , name ).strip ("_." )
57+ return name or "apt-keyring"
58+
59+
60+ def _derive_dest_from_src_and_keyids (
61+ src : str | None , keyids : list [str ] | None , dest : str | None
62+ ) -> str :
63+ """
64+ Compute a stable destination path in /etc/apt/keyrings/.
65+ Priority:
66+ 1) explicit dest if provided
67+ 2) from src (URL host + basename, or local basename)
68+ 3) from keyids (joined)
69+ 4) fallback "apt-keyring.gpg"
70+ """
71+ if dest :
72+ # Ensure it ends with .gpg and is absolute under /etc/apt/keyrings
73+ if not dest .endswith (".gpg" ):
74+ dest += ".gpg"
75+ if not dest .startswith ("/" ):
76+ dest = f"/etc/apt/keyrings/{ dest } "
77+ return dest
78+
79+ base = None
80+ if src :
81+ parsed = urlparse (src )
82+ if parsed .scheme and parsed .netloc :
83+ host_name = _sanitize_apt_keyring_name (parsed .netloc .replace (":" , "_" ))
84+ bn = _sanitize_apt_keyring_name (
85+ (parsed .path .rsplit ("/" , 1 )[- 1 ] or "key" ).replace (".asc" , "" ).replace (".gpg" , "" )
86+ )
87+ base = f"{ host_name } -{ bn } "
88+ else :
89+ bn = _sanitize_apt_keyring_name (
90+ src .rsplit ("/" , 1 )[- 1 ].replace (".asc" , "" ).replace (".gpg" , "" )
91+ )
92+ base = bn or "key"
93+ elif keyids :
94+ base = "keyserver-" + _sanitize_apt_keyring_name ("-" .join (keyids ))
95+ else :
96+ base = "apt-keyring"
97+
98+ return f"/etc/apt/keyrings/{ base } .gpg"
99+
100+
101+ def _get_apt_keys_comprehensive () -> dict [str , str ]:
102+ """
103+ Get all GPG keys available in APT directories using the GpgKeyrings fact.
104+ This provides more comprehensive coverage than AptKeys fact.
105+ Falls back gracefully if GpgKeyrings data is not available.
106+
107+ Returns:
108+ dict: Key ID -> keyring file path mapping
109+ """
110+ try :
111+ apt_directories = ["/etc/apt/trusted.gpg.d" , "/etc/apt/keyrings" , "/usr/share/keyrings" ]
112+ keyrings_info = host .get_fact (GpgKeyrings , directories = apt_directories )
113+
114+ all_keys = {}
115+ for keyring_path , keyring_data in keyrings_info .items ():
116+ keys = keyring_data .get ("keys" , {})
117+ for key_id in keys .keys ():
118+ all_keys [key_id ] = keyring_path
119+
120+ return all_keys
121+ except (KeyError , AttributeError ):
122+ # Fallback to empty dict if GpgKeyrings fact is not available (e.g., in tests)
123+ return {}
124+
125+
48126@operation ()
49- def key (src : str | None = None , keyserver : str | None = None , keyid : str | list [str ] | None = None ):
127+ def key (
128+ src : str | None = None ,
129+ keyserver : str | None = None ,
130+ keyid : str | list [str ] | None = None ,
131+ dest : str | None = None ,
132+ present : bool = True ,
133+ ):
50134 """
51- Add apt gpg keys with ``apt-key`` .
135+ Add or remove apt GPG keys using modern keyring management .
52136
53- + src: filename or URL
54- + keyserver: URL of keyserver to fetch key from
55- + keyid: key ID or list of key IDs when using keyserver
137+ This operation manages GPG keys for APT repos without using the deprecated apt-key command.
138+ Keys are stored in /etc/apt/keyrings/ and can be referenced in source lists via signed-by=.
56139
57- keyserver/id:
58- These must be provided together.
140+ Args:
141+ src: filename or URL to a key (ASCII .asc or binary .gpg)
142+ keyserver: keyserver URL for fetching keys by ID
143+ keyid: key ID or list of key IDs (required with keyserver, optional for removal)
144+ dest: optional keyring path ('.gpg' will be enforced, defaults under /etc/apt/keyrings)
145+ present: whether the key should be present (True) or absent (False)
59146
60- .. warning::
61- ``apt-key`` is deprecated in Debian, it is recommended NOT to use this
62- operation and instead follow the instructions here:
147+ Behavior:
148+ - Installation: Idempotent via AptKeys - if key IDs are already present, nothing changes
149+ - Removal: Uses GpgKeyrings fact to find and remove keys from APT directories
150+ - If src is ASCII (.asc), it will be dearmored; if binary (.gpg), it's copied as-is
151+ - Keyserver flow uses temporary GNUPGHOME, then exports to destination keyring
63152
153+ .. warning::
154+ ``apt-key`` is deprecated in Debian. This operation follows modern keyring management:
64155 https://wiki.debian.org/DebianRepository/UseThirdParty
65156
66157 **Examples:**
@@ -69,53 +160,94 @@ def key(src: str | None = None, keyserver: str | None = None, keyid: str | list[
69160
70161 from pyinfra.operations import apt
71162 # Note: If using URL, wget is assumed to be installed.
163+
164+ apt.key(
165+ name="Add Docker apt GPG key",
166+ src="https://download.docker.com/linux/debian/gpg",
167+ dest="docker.gpg",
168+ )
169+
72170 apt.key(
73- name="Add the Docker apt gpg key",
74- src="https://download.docker.com/linux/ubuntu/gpg",
171+ name="Remove specific keyring file",
172+ dest="old-vendor.gpg",
173+ present=False,
174+ )
175+
176+ apt.key(
177+ name="Remove key by ID from all APT keyrings",
178+ keyid="0xCOMPROMISED123",
179+ present=False,
75180 )
76181
77182 apt.key(
78- name="Install VirtualBox key",
79- src="https://www.virtualbox.org/download/oracle_vbox_2016.asc",
183+ name="Fetch keys from keyserver",
184+ keyserver="hkps://keyserver.ubuntu.com",
185+ keyid=["0xD88E42B4", "0x7EA0A9C3"],
186+ dest="vendor-archive.gpg",
80187 )
81188 """
82189
190+ # Handle removal operations using the GPG infrastructure
191+ if present is False :
192+ # Use the GPG operation for removal, but restrict to APT directories
193+ apt_working_dirs = ["/etc/apt/trusted.gpg.d" , "/etc/apt/keyrings" , "/usr/share/keyrings" ]
194+ yield from gpg .key ._inner (
195+ dest = dest ,
196+ keyid = keyid ,
197+ present = False ,
198+ working_dirs = apt_working_dirs ,
199+ )
200+ return
201+
202+ # Installation logic (existing code)
203+ # Get comprehensive view of all keys in APT directories
204+ existing_keys_comprehensive = _get_apt_keys_comprehensive ()
205+ # Also get the legacy AptKeys fact for compatibility
83206 existing_keys = host .get_fact (AptKeys )
84207
208+ # Combine both sources of key information for complete coverage
209+ all_available_keys = set (existing_keys_comprehensive .keys ()) | set (existing_keys .keys ())
210+
211+ # Check idempotency for src branch
85212 if src :
86- key_data = host .get_fact (GpgKey , src = src )
87- if key_data :
88- keyid = list (key_data .keys ())
89-
90- if not keyid or not all (kid in existing_keys for kid in keyid ):
91- # If URL, wget the key to stdout and pipe into apt-key, because the "adv"
92- # apt-key passes to gpg which doesn't always support https!
93- if urlparse (src ).scheme :
94- yield "(wget -O - {0} || curl -sSLf {0}) | apt-key add -" .format (src )
95- else :
96- yield "apt-key add {0}" .format (src )
97- else :
98- host .noop ("All keys from {0} are already available in the apt keychain" .format (src ))
213+ key_data = host .get_fact (GpgKey , src = src ) # Parses the key(s) from src to extract key IDs
214+ keyids_from_src = list (key_data .keys ()) if key_data else []
215+
216+ # If we don't know the IDs (eg. unreachable URL), we cannot determine idempotency
217+ # -> try to install.
218+ # Otherwise, skip if all key IDs are already present.
219+ if keyids_from_src and all (kid in all_available_keys for kid in keyids_from_src ):
220+ host .noop (f"All keys from { src } are already available in the apt keychain" )
221+ return
99222
100- if keyserver :
223+ dest_path = _derive_dest_from_src_and_keyids (src , keyids_from_src or None , dest )
224+
225+ # Check idempotency for keyserver branch
226+ elif keyserver :
101227 if not keyid :
102228 raise OperationError ("`keyid` must be provided with `keyserver`" )
103229
104230 if isinstance (keyid , str ):
105231 keyid = [keyid ]
106232
107- needed_keys = sorted (set (keyid ) - set (existing_keys .keys ()))
108- if needed_keys :
109- yield "apt-key adv --keyserver {0} --recv-keys {1}" .format (
110- keyserver ,
111- " " .join (needed_keys ),
112- )
113- else :
114- host .noop (
115- "Keys {0} are already available in the apt keychain" .format (
116- ", " .join (keyid ),
117- ),
118- )
233+ needed_keys = sorted (set (keyid ) - all_available_keys )
234+ if not needed_keys :
235+ host .noop (f"Keys { ', ' .join (keyid )} are already available in the apt keychain" )
236+ return
237+
238+ dest_path = _derive_dest_from_src_and_keyids (None , needed_keys , dest )
239+ # Only install the needed keys
240+ keyid = needed_keys
241+
242+ # Use the generic GPG operation to install the key
243+ yield from gpg .key ._inner (
244+ src = src ,
245+ dest = dest_path ,
246+ keyserver = keyserver ,
247+ keyid = keyid ,
248+ dearmor = True ,
249+ mode = "0644" ,
250+ )
119251
120252
121253@operation ()
0 commit comments