|
| 1 | +"""Source selection helpers. |
| 2 | +
|
| 3 | +This module contains the policy for combining multiple source types (local STIX |
| 4 | +bundles and TAXII) into the final set of domain sources used by the client. |
| 5 | +""" |
| 6 | + |
| 7 | +from __future__ import annotations |
| 8 | + |
| 9 | +from typing import Any |
| 10 | + |
| 11 | +from .local_loader import load_local_sources |
| 12 | +from .taxii_loader import load_taxii_sources |
| 13 | + |
| 14 | + |
| 15 | +def load_sources( |
| 16 | + *, |
| 17 | + enterprise: str | None = None, |
| 18 | + mobile: str | None = None, |
| 19 | + ics: str | None = None, |
| 20 | + connect_taxii: bool = True, |
| 21 | + proxies: dict | None = None, |
| 22 | + verify: bool = True, |
| 23 | + collection_url: str | None = None, |
| 24 | +) -> tuple[tuple[Any | None, Any | None, Any | None], dict[str, str | None], str, str | None]: |
| 25 | + """Load sources using a local-first policy with optional TAXII fallback. |
| 26 | +
|
| 27 | + Policy: |
| 28 | + - If local sources exist, use them. |
| 29 | + - If some local domains are missing and `connect_taxii=True`, fill missing domains from TAXII. |
| 30 | + - If no local sources exist: |
| 31 | + - If `connect_taxii=True`, load all domains from TAXII. |
| 32 | + - If `connect_taxii=False`: |
| 33 | + - Raise if the caller provided local paths (they were invalid). |
| 34 | + - Otherwise return an empty configuration. |
| 35 | +
|
| 36 | + Args: |
| 37 | + enterprise: Path to the local enterprise bundle (dir or JSON file). |
| 38 | + mobile: Path to the local mobile bundle (dir or JSON file). |
| 39 | + ics: Path to the local ICS bundle (dir or JSON file). |
| 40 | + connect_taxii: If `True`, allow TAXII fallback/fill behavior. |
| 41 | + proxies: Requests proxy configuration for TAXII. |
| 42 | + verify: Whether to verify TLS certificates for TAXII. |
| 43 | + collection_url: Base TAXII collections URL (ending in `/collections/`). |
| 44 | +
|
| 45 | + Returns |
| 46 | + ------- |
| 47 | + tuple |
| 48 | + A tuple `(sources, versions, mode, spec_version)` where: |
| 49 | + - `sources` is `(enterprise_source, mobile_source, ics_source)` |
| 50 | + - `versions` maps `enterprise|mobile|ics` to spec versions (or `None`) |
| 51 | + - `mode` is one of `local`, `taxii`, `mixed`, `empty` |
| 52 | + - `spec_version` is the unified spec version if known, else `None` |
| 53 | +
|
| 54 | + Raises |
| 55 | + ------ |
| 56 | + ValueError: If local paths were provided but none were loadable and |
| 57 | + `connect_taxii=False`. |
| 58 | + """ |
| 59 | + local_paths_provided = any((enterprise, mobile, ics)) |
| 60 | + |
| 61 | + (enterprise_source, mobile_source, ics_source), versions = load_local_sources( |
| 62 | + enterprise=enterprise, |
| 63 | + mobile=mobile, |
| 64 | + ics=ics, |
| 65 | + ) |
| 66 | + |
| 67 | + any_local = any((enterprise_source, mobile_source, ics_source)) |
| 68 | + if not any_local: |
| 69 | + if not connect_taxii: |
| 70 | + if local_paths_provided: |
| 71 | + raise ValueError("No valid local data sources found.") |
| 72 | + return (None, None, None), {"enterprise": None, "mobile": None, "ics": None}, "empty", None |
| 73 | + |
| 74 | + (enterprise_source, mobile_source, ics_source), versions = load_taxii_sources( |
| 75 | + proxies=proxies, |
| 76 | + verify=verify, |
| 77 | + collection_url=collection_url, |
| 78 | + ) |
| 79 | + return (enterprise_source, mobile_source, ics_source), versions, "taxii", "2.1" |
| 80 | + |
| 81 | + missing_any = any(ds is None for ds in (enterprise_source, mobile_source, ics_source)) |
| 82 | + if missing_any and connect_taxii: |
| 83 | + (taxii_enterprise, taxii_mobile, taxii_ics), taxii_versions = load_taxii_sources( |
| 84 | + proxies=proxies, |
| 85 | + verify=verify, |
| 86 | + collection_url=collection_url, |
| 87 | + ) |
| 88 | + if enterprise_source is None: |
| 89 | + enterprise_source = taxii_enterprise |
| 90 | + versions["enterprise"] = taxii_versions["enterprise"] |
| 91 | + if mobile_source is None: |
| 92 | + mobile_source = taxii_mobile |
| 93 | + versions["mobile"] = taxii_versions["mobile"] |
| 94 | + if ics_source is None: |
| 95 | + ics_source = taxii_ics |
| 96 | + versions["ics"] = taxii_versions["ics"] |
| 97 | + mode = "mixed" |
| 98 | + else: |
| 99 | + mode = "local" |
| 100 | + |
| 101 | + non_null_versions = {v for v in versions.values() if v is not None} |
| 102 | + spec_version = non_null_versions.pop() if len(non_null_versions) == 1 else None |
| 103 | + return (enterprise_source, mobile_source, ics_source), versions, mode, spec_version |
0 commit comments