Skip to content

Commit 2510076

Browse files
committed
Improve CTI discovery safety and diagnostics
Add preflight checks, pattern validation and safer globbing for GenTL (.cti) discovery and loading. Renamed default CTI pattern constant to _LEGACY_DEFAULT_CTI_PATTERNS (Windows-only comment) and imported Path. Introduced _cti_preflight to detect missing/locked/permissioned files before Harvester.add_file and applied it where CTIs are loaded (skipping and logging problematic entries). Harden gentl_discovery with: redact-able diagnostics.summarize, _validate_glob_pattern, bounded _glob_limited, static-prefix checks, allowed-roots option and new discover_cti_files params (allow_globs, root_globs_allowed, max_glob_hits_per_pattern) to limit expensive scans. Also adjusted Harvester.update failure handling to treat update errors as discovery failures (return empty loaded list) and improved logging messages for discovery/load failures.
1 parent 873ad07 commit 2510076

File tree

2 files changed

+212
-15
lines changed

2 files changed

+212
-15
lines changed

dlclivegui/cameras/backends/gentl_backend.py

Lines changed: 52 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import logging
77
import time
8+
from pathlib import Path
89
from typing import ClassVar
910

1011
import cv2
@@ -33,7 +34,7 @@ class GenTLCameraBackend(CameraBackend):
3334
"""Capture frames from GenTL-compatible devices via Harvesters."""
3435

3536
OPTIONS_KEY: ClassVar[str] = "gentl"
36-
_DEFAULT_CTI_PATTERNS: tuple[str, ...] = (
37+
_LEGACY_DEFAULT_CTI_PATTERNS: tuple[str, ...] = ( # Windows-only, ignored on other platforms
3738
r"C:\\Program Files\\The Imaging Source Europe GmbH\\IC4 GenTL Driver for USB3Vision Devices *\\bin\\*.cti",
3839
r"C:\\Program Files\\The Imaging Source Europe GmbH\\TIS Grabber\\bin\\win64_x64\\*.cti",
3940
r"C:\\Program Files\\The Imaging Source Europe GmbH\\TIS Camera SDK\\bin\\win64_x64\\*.cti",
@@ -194,11 +195,33 @@ def get_device_count(cls) -> int:
194195
except Exception:
195196
pass
196197

198+
@staticmethod
199+
def _cti_preflight(path: str) -> tuple[bool, str | None]:
200+
"""
201+
Best-effort check right before calling Harvester.add_file().
202+
Still subject to race conditions (e.g. file deleted after this check),
203+
but helps diagnose common issues like missing files or permission errors more gracefully and early.
204+
Returns (ok, reason_if_not_ok).
205+
"""
206+
p = Path(str(path))
207+
try:
208+
if not p.exists():
209+
return False, "missing at load time"
210+
if not p.is_file():
211+
return False, "not a file at load time"
212+
# Optional: try opening for read to detect permission/locking issues early
213+
with p.open("rb"):
214+
pass
215+
return True, None
216+
except PermissionError:
217+
return False, "permission denied at load time"
218+
except OSError as e:
219+
return False, f"os error at load time: {e}"
220+
197221
def _resolve_cti_files_for_settings(self) -> list[str]:
198222
"""
199223
Resolve CTI files to load.
200224
201-
Option B policy (source marker + fallback):
202225
- User override (properties.gentl.cti_file/cti_files OR legacy properties.cti_file/cti_files):
203226
* strict: must exist, otherwise raise
204227
* source = "user"
@@ -208,6 +231,8 @@ def _resolve_cti_files_for_settings(self) -> list[str]:
208231
* source = "auto"
209232
- Default: discovery (env + configured patterns/dirs) => source = "auto"
210233
234+
NOTE : legacy properties.cti_file(s) always take strict precedence as user override if present,
235+
even if source marker says "auto".
211236
Never raise just because multiple CTIs exist.
212237
Raise only when none are found (after allowed fallback).
213238
"""
@@ -328,7 +353,7 @@ def _build_harvester_for_discovery(
328353

329354
candidates, diag = cti_finder.discover_cti_files(
330355
include_env=True,
331-
cti_search_paths=list(cls._DEFAULT_CTI_PATTERNS),
356+
cti_search_paths=list(cls._LEGACY_DEFAULT_CTI_PATTERNS),
332357
must_exist=True,
333358
)
334359

@@ -350,12 +375,18 @@ def _build_harvester_for_discovery(
350375
failures: list[tuple[str, str]] = []
351376

352377
for cti in cti_files:
378+
ok, reason = cls._cti_preflight(cti)
379+
if not ok:
380+
failures.append((str(cti), reason or "Check failed"))
381+
LOG.warning("Skipping CTI '%s' during discovery preflight: %s", cti, reason)
382+
continue
383+
353384
try:
354385
harvester.add_file(cti)
355386
loaded.append(cti)
356387
except Exception as exc:
357-
failures.append((cti, str(exc)))
358-
LOG.warning("Failed to load CTI '%s': %s", cti, exc)
388+
failures.append((str(cti), str(exc)))
389+
LOG.warning("Failed to load CTI '%s' during discovery: %s", cti, exc)
359390

360391
if not loaded:
361392
try:
@@ -367,12 +398,19 @@ def _build_harvester_for_discovery(
367398
try:
368399
harvester.update()
369400
except Exception as exc:
370-
LOG.warning("Harvester.update() failed during discovery: %s", exc)
401+
LOG.error(
402+
"Harvester.update() failed during discovery: %s "
403+
"Device list not usable, treating as discovery failure."
404+
"CTIs loaded before failure : %s",
405+
exc,
406+
loaded,
407+
)
371408
try:
372409
harvester.reset()
373410
except Exception:
374411
pass
375-
return None, loaded, diag
412+
# Update failure
413+
return None, [], diag
376414

377415
return harvester, loaded, diag
378416

@@ -403,11 +441,17 @@ def open(self) -> None:
403441
failed: list[tuple[str, str]] = []
404442

405443
for cti in cti_files:
444+
ok, reason = self._cti_preflight(cti)
445+
if not ok:
446+
failed.append((str(cti), reason or "preflight failed"))
447+
LOG.warning("Skipping CTI '%s': %s", cti, reason)
448+
continue
449+
406450
try:
407451
self._harvester.add_file(cti)
408452
loaded.append(cti)
409453
except Exception as exc:
410-
failed.append((cti, str(exc)))
454+
failed.append((str(cti), str(exc)))
411455
LOG.warning("Failed to load CTI '%s': %s", cti, exc)
412456

413457
# Persist diagnostics for UI / debugging

dlclivegui/cameras/backends/utils/gentl_discovery.py

Lines changed: 160 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,18 @@ class CTIDiscoveryDiagnostics:
2929
candidates: list[str] = field(default_factory=list)
3030
rejected: list[tuple[str, str]] = field(default_factory=list) # (path, reason)
3131

32-
def summarize(self) -> str:
32+
def summarize(self, redact_env: bool = True) -> str:
3333
lines = []
3434
if self.explicit_files:
3535
lines.append(f"Explicit CTI file(s): {self.explicit_files}")
3636
if self.glob_patterns:
3737
lines.append(f"CTI glob pattern(s): {self.glob_patterns}")
3838
if self.env_vars_used:
39-
# Keep raw env var values in summary; you can redact if needed
40-
lines.append(f"Env vars: {self.env_vars_used}")
39+
if redact_env:
40+
redacted_env = {k: ("<redacted>" if v else "<empty>") for k, v in self.env_vars_used.items()}
41+
lines.append(f"Env vars used: {redacted_env}")
42+
else:
43+
lines.append(f"Env vars used: {self.env_vars_used}")
4144
if self.env_paths_expanded:
4245
lines.append(f"Env-derived path entries: {self.env_paths_expanded}")
4346
if self.extra_dirs:
@@ -66,7 +69,7 @@ def _normalize_path(p: str) -> str:
6669
"""
6770
pp = Path(os.path.expandvars(os.path.expanduser(p)))
6871
try:
69-
# resolve(False) avoids raising if parts don't exist (py>=3.9)
72+
# resolve(strict=False) will resolve as much as possible without raising if the path doesn't exist
7073
return str(pp.resolve(strict=False))
7174
except Exception:
7275
return str(pp.absolute())
@@ -98,6 +101,136 @@ def _split_env_paths(raw: str) -> list[str]:
98101
return out
99102

100103

104+
_GLOB_META_CHARS = set("*?[")
105+
106+
107+
def _pattern_has_glob(s: str) -> bool:
108+
return any(ch in s for ch in _GLOB_META_CHARS)
109+
110+
111+
def _pattern_static_prefix(pattern: str) -> str:
112+
"""
113+
Return the substring up to the first glob metacharacter (* ? [).
114+
This is used as a "base path" to constrain globbing.
115+
"""
116+
for i, ch in enumerate(pattern):
117+
if ch in _GLOB_META_CHARS:
118+
return pattern[:i]
119+
return pattern
120+
121+
122+
def _is_path_within(child: Path, parent: Path) -> bool:
123+
"""
124+
Cross-version safe "is_relative_to" implementation.
125+
"""
126+
try:
127+
child.relative_to(parent)
128+
return True
129+
except Exception:
130+
return False
131+
132+
133+
def _validate_glob_pattern(
134+
pattern: str,
135+
*,
136+
allowed_roots: Sequence[str] | None = None,
137+
require_cti_suffix: bool = True,
138+
) -> tuple[bool, str | None]:
139+
"""
140+
Validate user-supplied glob patterns to reduce filesystem probing risk.
141+
142+
Rules (conservative but practical):
143+
- Must expand (~ and env vars) into an absolute-ish location (prefix must exist as a path parent)
144+
- Must not include '..' path traversal segments
145+
- Must have a non-trivial static prefix (not empty / not root-only like '/' or 'C:\\')
146+
- Optionally restrict to allowed roots (directories)
147+
- Optionally require that the pattern looks like it targets .cti files
148+
"""
149+
if not pattern or not str(pattern).strip():
150+
return False, "empty glob pattern"
151+
152+
expanded = os.path.expandvars(os.path.expanduser(pattern)).strip()
153+
154+
# Basic traversal guard
155+
parts = Path(expanded).parts
156+
if any(p == ".." for p in parts):
157+
return False, "glob pattern contains '..' traversal"
158+
159+
if require_cti_suffix:
160+
# Encourage patterns that clearly target CTIs, e.g. '*.cti' or 'foo*.cti'
161+
lower = expanded.lower()
162+
if ".cti" not in lower:
163+
return False, "glob pattern does not target .cti files"
164+
165+
# Compute static prefix up to first glob meta-char
166+
prefix = _pattern_static_prefix(expanded).strip()
167+
if not prefix:
168+
return False, "glob pattern has no static base path"
169+
170+
prefix_path = Path(prefix)
171+
172+
# If prefix is a file-like thing, use its parent as base; otherwise use itself.
173+
# Example: "C:\\dir\\*.cti" -> base = "C:\\dir"
174+
base = prefix_path.parent if prefix_path.suffix else prefix_path
175+
176+
# Prevent overly broad patterns like "/" or "C:\\"
177+
try:
178+
resolved_base = base.resolve(strict=False)
179+
except Exception:
180+
resolved_base = base
181+
182+
# If base is a drive root or filesystem root, reject
183+
# - POSIX: "/" -> parent == itself
184+
# - Windows: "C:\\" -> parent often == itself
185+
try:
186+
if resolved_base == resolved_base.parent:
187+
return False, "glob pattern base is filesystem root (too broad)"
188+
except Exception:
189+
# If we can't determine, err on conservative side
190+
return False, "glob pattern base could not be validated"
191+
192+
# Optional allowlist enforcement
193+
if allowed_roots:
194+
ok = False
195+
for root in allowed_roots:
196+
try:
197+
r = Path(_normalize_path(root))
198+
except Exception:
199+
r = Path(root)
200+
try:
201+
r_resolved = r.resolve(strict=False)
202+
except Exception:
203+
r_resolved = r
204+
205+
try:
206+
b_resolved = resolved_base.resolve(strict=False)
207+
except Exception:
208+
b_resolved = resolved_base
209+
210+
if _is_path_within(b_resolved, r_resolved):
211+
ok = True
212+
break
213+
if not ok:
214+
return False, "glob pattern base is outside allowed roots"
215+
216+
return True, None
217+
218+
219+
def _glob_limited(pattern: str, *, max_hits: int = 200) -> list[str]:
220+
"""
221+
Iterate matches with an upper bound to prevent expensive scans.
222+
Uses iglob to avoid materializing huge lists.
223+
"""
224+
out: list[str] = []
225+
# Note: recursive globbing via "**" typically requires recursive=True.
226+
# We intentionally keep recursive off here to reduce scanning.
227+
for hit in glob.iglob(pattern, recursive=False):
228+
out.append(hit)
229+
if len(out) >= max_hits:
230+
break
231+
return out
232+
233+
101234
def discover_cti_files(
102235
*,
103236
cti_file: str | None = None,
@@ -109,6 +242,9 @@ def discover_cti_files(
109242
recursive_env_search: bool = False,
110243
recursive_extra_search: bool = False,
111244
must_exist: bool = True,
245+
allow_globs: bool = True,
246+
root_globs_allowed: Sequence[str] | None = None,
247+
max_glob_hits_per_pattern: int = 200,
112248
) -> tuple[list[str], CTIDiscoveryDiagnostics]:
113249
"""
114250
Discover candidate GenTL producer (.cti) files from multiple sources.
@@ -117,7 +253,11 @@ def discover_cti_files(
117253
(candidates, diagnostics)
118254
119255
Notes:
120-
- If must_exist=True (recommended), only existing files are returned.
256+
- If must_exist=True (recommended), only existing files are returned at duscovery time.
257+
- Best-effort checks, files may still be missing at load time (e.g. deleted after discovery).
258+
- Callers should handle load-time errors gracefully regardless.
259+
- Glob patterns can enumerate filesystem entries is user-controlled.
260+
Use allow_globs=False to disable globbing and treat patterns as literal paths.
121261
- Env vars are parsed as path lists; each entry may be a directory OR a .cti file.
122262
"""
123263
diag = CTIDiscoveryDiagnostics()
@@ -165,9 +305,22 @@ def _add_candidate(path: str, reason_ctx: str) -> None:
165305

166306
# Process glob patterns
167307
for pat in patterns:
168-
# Normalize only for readability; glob needs pattern semantics, so we expanduser/vars but keep globbing
169308
expanded_pat = os.path.expandvars(os.path.expanduser(pat))
170-
for hit in glob.glob(expanded_pat):
309+
310+
if not allow_globs:
311+
rejected.append((_normalize_path(expanded_pat), "glob patterns disabled"))
312+
continue
313+
314+
ok, reason = _validate_glob_pattern(
315+
expanded_pat,
316+
allowed_roots=root_globs_allowed,
317+
require_cti_suffix=True,
318+
)
319+
if not ok:
320+
rejected.append((_normalize_path(expanded_pat), f"glob pattern rejected: {reason}"))
321+
continue
322+
323+
for hit in _glob_limited(expanded_pat, max_hits=max_glob_hits_per_pattern):
171324
_add_candidate(hit, f"glob:{pat}")
172325

173326
# Process env var entries

0 commit comments

Comments
 (0)