@@ -33,6 +33,26 @@ class Environment:
3333 has_sudo : bool = False # Best guess if user has sudo access
3434
3535
36+ def _read_probe_file (path : Path ) -> Optional [str ]:
37+ """
38+ Read an optional environment probe file.
39+
40+ Returns:
41+ File contents, or None when the probe file does not exist or cannot be read.
42+ """
43+ if not path .exists ():
44+ return None
45+
46+ try :
47+ with open (path ) as f :
48+ return f .read ()
49+ except OSError :
50+ # Environment detection is best-effort. Proc/sys metadata files can be
51+ # unreadable or disappear between exists() and open(), so treat that as
52+ # "signal unavailable" and continue with fallback probes.
53+ return None
54+
55+
3656def _detect_linux_distro () -> tuple [Optional [str ], Optional [str ]]:
3757 """
3858 Detect Linux distribution and version.
@@ -48,47 +68,46 @@ def _detect_linux_distro() -> tuple[Optional[str], Optional[str]]:
4868
4969 # Try /etc/os-release first, then /usr/lib/os-release (per freedesktop spec)
5070 for os_release_path in [Path ("/etc/os-release" ), Path ("/usr/lib/os-release" )]:
51- if os_release_path .exists ():
52- try :
53- with open (os_release_path ) as f :
54- os_release = {}
55- for line in f :
56- line = line .strip ()
57- if not line or line .startswith ("#" ):
58- continue
59- if "=" in line :
60- key , _ , value = line .partition ("=" )
61- # Remove quotes
62- value = value .strip ('"' ).strip ("'" )
63- os_release [key ] = value
64-
65- distro_id = os_release .get ("ID" , "" ).lower ()
66- version = os_release .get ("VERSION_ID" , "" )
67- id_like = os_release .get ("ID_LIKE" , "" ).lower ().split ()
68-
69- # Normalize distro IDs
70- if distro_id in known_base_distros :
71- return distro_id , version
72- elif distro_id in rhel_family :
73- # Oracle Linux (ol), Amazon Linux (amzn)
74- return "rhel" , version
75- elif distro_id in suse_family :
76- return "suse" , version
77-
78- # Check ID_LIKE for derivative distributions (e.g., Pop!_OS, Raspbian, Mint)
79- if id_like :
80- for parent in id_like :
81- if parent in known_base_distros :
82- return parent , version
83- elif parent in rhel_family :
84- return "rhel" , version
85- elif parent in suse_family :
86- return "suse" , version
87-
88- # Return the raw distro_id if we couldn't normalize it
89- return distro_id , version
90- except OSError :
91- pass
71+ os_release_content = _read_probe_file (os_release_path )
72+ if os_release_content is None :
73+ continue
74+
75+ os_release = {}
76+ for line in os_release_content .splitlines ():
77+ line = line .strip ()
78+ if not line or line .startswith ("#" ):
79+ continue
80+ if "=" in line :
81+ key , _ , value = line .partition ("=" )
82+ # Remove quotes
83+ value = value .strip ('"' ).strip ("'" )
84+ os_release [key ] = value
85+
86+ distro_id = os_release .get ("ID" , "" ).lower ()
87+ version = os_release .get ("VERSION_ID" , "" )
88+ id_like = os_release .get ("ID_LIKE" , "" ).lower ().split ()
89+
90+ # Normalize distro IDs
91+ if distro_id in known_base_distros :
92+ return distro_id , version
93+ elif distro_id in rhel_family :
94+ # Oracle Linux (ol), Amazon Linux (amzn)
95+ return "rhel" , version
96+ elif distro_id in suse_family :
97+ return "suse" , version
98+
99+ # Check ID_LIKE for derivative distributions (e.g., Pop!_OS, Raspbian, Mint)
100+ if id_like :
101+ for parent in id_like :
102+ if parent in known_base_distros :
103+ return parent , version
104+ elif parent in rhel_family :
105+ return "rhel" , version
106+ elif parent in suse_family :
107+ return "suse" , version
108+
109+ # Return the raw distro_id if we couldn't normalize it
110+ return distro_id , version
92111
93112 # Fallback: check for specific files
94113 if Path ("/etc/debian_version" ).exists ():
@@ -112,44 +131,31 @@ def _detect_cloud_provider() -> Optional[str]:
112131 """
113132 # AWS detection
114133 # Check for EC2 metadata
115- if Path ("/sys/hypervisor/uuid" ).exists ():
116- try :
117- with open ("/sys/hypervisor/uuid" ) as f :
118- uuid = f .read ().strip ()
119- if uuid .startswith ("ec2" ) or uuid .startswith ("EC2" ):
120- return "aws"
121- except OSError :
122- pass
134+ uuid = _read_probe_file (Path ("/sys/hypervisor/uuid" ))
135+ if uuid and uuid .strip ().lower ().startswith ("ec2" ):
136+ return "aws"
123137
124138 # Check AWS environment variables
125139 if os .getenv ("AWS_EXECUTION_ENV" ) or os .getenv ("AWS_REGION" ):
126140 return "aws"
127141
128142 # GCP detection
129143 # Check for GCP metadata
130- if Path ("/sys/class/dmi/id/product_name" ).exists ():
131- try :
132- with open ("/sys/class/dmi/id/product_name" ) as f :
133- product = f .read ().strip ()
134- if "Google" in product or "GCE" in product :
135- return "gcp"
136- except OSError :
137- pass
144+ product = _read_probe_file (Path ("/sys/class/dmi/id/product_name" ))
145+ if product :
146+ product = product .strip ()
147+ if "Google" in product or "GCE" in product :
148+ return "gcp"
138149
139150 # Check GCP environment variables
140151 if os .getenv ("GOOGLE_CLOUD_PROJECT" ) or os .getenv ("GCP_PROJECT" ):
141152 return "gcp"
142153
143154 # Azure detection
144- if Path ("/sys/class/dmi/id/sys_vendor" ).exists ():
145- try :
146- with open ("/sys/class/dmi/id/sys_vendor" ) as f :
147- vendor = f .read ().strip ()
148- # Could be Azure or Hyper-V, check for Azure-specific
149- if "Microsoft Corporation" in vendor and Path ("/var/lib/waagent" ).exists ():
150- return "azure"
151- except OSError :
152- pass
155+ vendor = _read_probe_file (Path ("/sys/class/dmi/id/sys_vendor" ))
156+ # Could be Azure or Hyper-V, check for Azure-specific
157+ if vendor and "Microsoft Corporation" in vendor .strip () and Path ("/var/lib/waagent" ).exists ():
158+ return "azure"
153159
154160 # Check Azure environment variables
155161 if os .getenv ("AZURE_SUBSCRIPTION_ID" ) or os .getenv ("WEBSITE_INSTANCE_ID" ):
@@ -173,14 +179,9 @@ def _detect_container() -> tuple[bool, bool]:
173179 is_docker = True
174180
175181 # Also check cgroup
176- if Path ("/proc/1/cgroup" ).exists ():
177- try :
178- with open ("/proc/1/cgroup" ) as f :
179- cgroup_content = f .read ()
180- if "docker" in cgroup_content or "containerd" in cgroup_content :
181- is_docker = True
182- except OSError :
183- pass
182+ cgroup_content = _read_probe_file (Path ("/proc/1/cgroup" ))
183+ if cgroup_content and ("docker" in cgroup_content or "containerd" in cgroup_content ):
184+ is_docker = True
184185
185186 # Kubernetes detection
186187 if os .getenv ("KUBERNETES_SERVICE_HOST" ):
@@ -201,14 +202,11 @@ def _detect_wsl() -> bool:
201202 return True
202203
203204 # Check /proc/version for Microsoft/WSL signatures
204- if Path ("/proc/version" ).exists ():
205- try :
206- with open ("/proc/version" ) as f :
207- version_info = f .read ().lower ()
208- if "microsoft" in version_info or "wsl" in version_info :
209- return True
210- except OSError :
211- pass
205+ version_info = _read_probe_file (Path ("/proc/version" ))
206+ if version_info :
207+ version_info = version_info .lower ()
208+ if "microsoft" in version_info or "wsl" in version_info :
209+ return True
212210
213211 # Check for Windows filesystem mounts (WSL mounts Windows drives at /mnt/)
214212 # This is less reliable but can catch WSL 1
0 commit comments