Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions python/lib/sift_py/ingestion/config/yaml/load.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from pathlib import Path
from typing import Any, Dict, List, cast

import yaml

import sift_py.yaml.rule as rule_yaml
from sift_py.ingestion.config.yaml.error import YamlConfigError
from sift_py.ingestion.config.yaml.spec import (
Expand All @@ -11,7 +9,7 @@
)
from sift_py.yaml.channel import ChannelConfigYamlSpec, _validate_channel, _validate_channel_anchor
from sift_py.yaml.rule import RuleYamlSpec
from sift_py.yaml.utils import _type_fqn
from sift_py.yaml.utils import _type_fqn, try_fast_yaml_load

load_named_expression_modules = rule_yaml.load_named_expression_modules

Expand All @@ -22,7 +20,7 @@ def read_and_validate(path: Path) -> TelemetryConfigYamlSpec:
step will return an error whose source is the `yaml` package. Any errors that may occur during the
validation step will return a `sift_py.ingestion.config.yaml.error.YamlConfigError`.
"""
raw_config = _read_yaml(path)
raw_config = try_fast_yaml_load(path)
return _validate_yaml(raw_config)


Expand Down Expand Up @@ -88,11 +86,6 @@ def _validate_yaml(raw_config: Dict[Any, Any]) -> TelemetryConfigYamlSpec:
return cast(TelemetryConfigYamlSpec, raw_config)


def _read_yaml(path: Path) -> Dict[Any, Any]:
    """
    Read the file at `path` as text and parse it with `yaml.safe_load`.

    The parsed value is only cast to a dictionary for the type checker; it is not validated here.
    """
    with open(path, "r") as stream:
        contents = stream.read()
    return cast(Dict[Any, Any], yaml.safe_load(contents))


def _validate_flow(val: Any):
flow = cast(Dict[Any, Any], val)

Expand Down
65 changes: 31 additions & 34 deletions python/lib/sift_py/yaml/calculated_channels.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from pathlib import Path
from typing import Any, Dict, List, cast

import yaml
from typing import List

from sift_py.calculated_channels.config import CalculatedChannelConfig
from sift_py.ingestion.config.yaml.error import YamlConfigError
from sift_py.yaml.utils import _handle_subdir
from sift_py.yaml.utils import _handle_subdir, try_fast_yaml_load


def load_calculated_channels(paths: List[Path]) -> List[CalculatedChannelConfig]:
Expand All @@ -29,36 +27,35 @@ def update_calculated_channels(path: Path):

def _read_calculated_channels_yaml(path: Path) -> List[CalculatedChannelConfig]:
    """
    Read the YAML file at `path` and build a `CalculatedChannelConfig` for every entry under its
    top-level `calculated_channels` key. A file without that key yields an empty list.

    Each entry of an item's `channel_references` may be written in long form (a dictionary that
    already has a `channel_reference` key) or in short form, where every `key: value` pair is
    expanded into `{"channel_reference": key, "channel_identifier": value}`.

    Raises a `YamlConfigError` if the document, `calculated_channels`, or `channel_references`
    does not have the expected shape, or if a `CalculatedChannelConfig` cannot be constructed
    from an entry.
    """
    channel_config_yaml = try_fast_yaml_load(path)
    # An empty or scalar document has no `.get`; report it as a config error instead of letting
    # an `AttributeError` escape.
    if not isinstance(channel_config_yaml, dict):
        raise YamlConfigError(f"Expected a mapping at the top level of yaml: '{path}'")

    calculated_channel_list = channel_config_yaml.get("calculated_channels", [])
    # Check the container type before iterating so a null or scalar value is reported as a
    # config error rather than surfacing as a `TypeError` from the loop below.
    if not isinstance(calculated_channel_list, list):
        raise YamlConfigError(f"Expected 'calculated_channels' to be a list in yaml: '{path}'")

    calculated_channel_configs = []
    for calc_channel in calculated_channel_list:
        if not isinstance(calc_channel, dict):
            raise YamlConfigError(
                f"Expected 'calculated_channels' to be a list of dictionaries in yaml: '{path}'"
            )

        # Only rewrite the key when the entry declares it, so an absent key stays absent.
        if "channel_references" in calc_channel:
            channel_refs = calc_channel["channel_references"]
            if not isinstance(channel_refs, list):
                raise YamlConfigError(
                    f"Expected 'channel_references' to be a list of dictionaries in yaml: '{path}'"
                )

            # Accumulate across all references of this channel. The list must be created once
            # per channel, not once per reference, otherwise only the last reference is kept.
            parsed_channel_refs = []
            for channel_ref in channel_refs:
                if not isinstance(channel_ref, dict):
                    raise YamlConfigError(
                        f"Expected 'channel_references' to be a list of dictionaries in yaml: '{path}'"
                    )
                if "channel_reference" not in channel_ref:
                    # Short form: `{reference: identifier}` pairs.
                    for k, v in channel_ref.items():
                        parsed_channel_refs.append(dict(channel_reference=k, channel_identifier=v))
                else:
                    parsed_channel_refs.append(channel_ref)
            calc_channel["channel_references"] = parsed_channel_refs

        try:
            calculated_channel_configs.append(CalculatedChannelConfig(**calc_channel))
        except Exception as e:
            raise YamlConfigError(f"Error parsing calculated channel '{calc_channel}'") from e

    return calculated_channel_configs
32 changes: 15 additions & 17 deletions python/lib/sift_py/yaml/report_templates.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, cast
from typing import List

import yaml
from typing_extensions import NotRequired, TypedDict

from sift_py.ingestion.config.yaml.error import YamlConfigError
from sift_py.report_templates.config import ReportTemplateConfig
from sift_py.yaml.utils import _handle_subdir
from sift_py.yaml.utils import _handle_subdir, try_fast_yaml_load


def load_report_templates(paths: List[Path]) -> List[ReportTemplateConfig]:
Expand All @@ -31,23 +30,22 @@ def update_report_templates(path: Path):

def _read_report_template_yaml(path: Path) -> List[ReportTemplateConfig]:
    """
    Read the YAML file at `path` and build a `ReportTemplateConfig` for every entry under its
    top-level `report_templates` key.

    Raises a `YamlConfigError` if `report_templates` is not a list, or if any entry cannot be
    turned into a `ReportTemplateConfig` (the underlying exception is chained as the cause).
    """
    document = try_fast_yaml_load(path)

    raw_templates = document.get("report_templates")
    if not isinstance(raw_templates, list):
        raise YamlConfigError(
            f"Expected 'report_templates' to be a list in report template yaml: '{path}'"
        )

    parsed_templates = []
    for raw_template in raw_templates:
        try:
            config = ReportTemplateConfig(**raw_template)
        except Exception as e:
            raise YamlConfigError(f"Error parsing report template '{raw_template}'") from e
        parsed_templates.append(config)

    return parsed_templates


class ReportTemplateYamlSpec(TypedDict):
Expand Down
47 changes: 22 additions & 25 deletions python/lib/sift_py/yaml/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from pathlib import Path
from typing import Any, Dict, List, Literal, Union, cast

import yaml
from typing_extensions import NotRequired, TypedDict

from sift_py.ingestion.config.yaml.error import YamlConfigError
Expand All @@ -13,7 +12,7 @@
ChannelConfigYamlSpec,
_validate_channel_reference,
)
from sift_py.yaml.utils import _handle_subdir, _type_fqn
from sift_py.yaml.utils import _handle_subdir, _type_fqn, try_fast_yaml_load

_SUB_EXPRESSION_REGEX = re.compile(r"^\$[a-zA-Z_]+$")

Expand Down Expand Up @@ -63,36 +62,34 @@ def update_rule_modules(rule_module_path: Path):


def _read_named_expression_module_yaml(path: Path) -> Dict[str, str]:
    """
    Read a named expression module from the YAML file at `path`.

    The document is returned as-is after checking that every key and every value is a string;
    a `YamlConfigError` is raised for the first entry that is not.
    """
    module = try_fast_yaml_load(path)

    for name, expression in module.items():
        name_is_str = isinstance(name, str)
        if not name_is_str:
            raise YamlConfigError(
                f"Expected '{name}' to be a string in named expression module '{path}'."
            )

        expression_is_str = isinstance(expression, str)
        if not expression_is_str:
            raise YamlConfigError(
                f"Expected expression of '{name}' to be a string in named expression module '{path}'."
            )

    return cast(Dict[str, str], module)


def _read_rule_module_yaml(path: Path) -> List[RuleYamlSpec]:
    """
    Read a rule module from the YAML file at `path` and return the entries under its top-level
    `rules` key after passing each one through `_validate_rule`.

    Raises a `YamlConfigError` if `rules` is missing or is not a list. Any error raised by
    `_validate_rule` for an individual rule propagates unchanged.
    """
    module_rules = try_fast_yaml_load(path)
    rules = module_rules.get("rules")
    if not isinstance(rules, list):
        # Name the offending key and the expected element type. The message used to interpolate
        # the *value* of `rules` (e.g. 'None') and glued the type name onto the path.
        raise YamlConfigError(
            f"Expected 'rules' to be a list of `{_type_fqn(RuleYamlSpec)}` "
            f"in rule module yaml: '{path}'"
        )

    for rule in cast(List[Any], rules):
        _validate_rule(rule)

    return cast(List[RuleYamlSpec], rules)


def _validate_rule(val: Any):
Expand Down
16 changes: 15 additions & 1 deletion python/lib/sift_py/yaml/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from pathlib import Path
from typing import Callable, Type
from typing import Any, Callable, Dict, Type, cast

import yaml


def _handle_subdir(path: Path, file_handler: Callable):
Expand All @@ -13,3 +15,15 @@ def _handle_subdir(path: Path, file_handler: Callable):

def _type_fqn(typ: Type) -> str:
return f"{typ.__module__}.{typ.__name__}"


def try_fast_yaml_load(path: Path) -> Dict[Any, Any]:
    """
    Load the YAML file at `path` using the CSafeLoader, which is faster than the pure-Python
    pyyaml safe loader but is not built into the wheel for earlier versions of Python.
    If the CSafeLoader is not available, fall back to the pure-Python safe loader.

    The file is read as bytes so that PyYAML picks the encoding itself (BOM detection, UTF-8
    otherwise) instead of depending on the platform's default text encoding.

    The parsed value is only cast to a dictionary for the type checker; it is not validated.
    PyYAML returns `None` for an empty document, so callers should check the result's type
    before using it as a mapping.
    """
    # `yaml.safe_load` is `yaml.load` with `SafeLoader`, so both paths share a single call.
    loader = getattr(yaml, "CSafeLoader", yaml.SafeLoader)
    with open(path, "rb") as f:
        return cast(Dict[Any, Any], yaml.load(f.read(), Loader=loader))
Loading