Skip to content

Commit ca78a6e

Browse files
authored
Enhance telemetry input validation and error handling
Added validation for telemetry input freshness, schema, and source trust. Enhanced error handling for missing fields and replay records.
1 parent 4a7b61e commit ca78a6e

1 file changed

Lines changed: 386 additions & 0 deletions

File tree

src/ix_autonomy_assurance_case_runtime/telemetry_adapter.py

Lines changed: 386 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,389 @@ def normalize(self, telemetry_input: TelemetryNormalizationInput) -> TelemetryAd
384384
captured_at_utc=telemetry_input.captured_at_utc,
385385
received_at_utc=telemetry_input.received_at_utc,
386386
payload=telemetry_input.payload,
387+
freshness_status=freshness_status,
388+
quality_flags=quality_flags,
389+
replay_record_id=telemetry_input.replay_record_id,
390+
)
391+
decision = (
392+
TelemetryAdapterDecision.ACCEPTED
393+
if envelope.can_support_runtime_evaluation()
394+
else TelemetryAdapterDecision.DEGRADED
395+
)
396+
return TelemetryAdapterReport(
397+
input_id=telemetry_input.input_id,
398+
decision=decision,
399+
findings=findings,
400+
envelope=envelope,
401+
)
402+
403+
def _validate_source_and_schema(
404+
self,
405+
telemetry_input: TelemetryNormalizationInput,
406+
source: TelemetrySource | None,
407+
schema: TelemetrySchema | None,
408+
) -> tuple[TelemetryAdapterFinding, ...]:
409+
"""Validate source and schema catalog references."""
410+
411+
findings: list[TelemetryAdapterFinding] = []
412+
if source is None:
413+
findings.append(
414+
TelemetryAdapterFinding(
415+
finding_id=f"input-{telemetry_input.input_id}-missing-source",
416+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
417+
message="Telemetry input references a missing source.",
418+
source_id=telemetry_input.source_id,
419+
)
420+
)
421+
if schema is None:
422+
findings.append(
423+
TelemetryAdapterFinding(
424+
finding_id=f"input-{telemetry_input.input_id}-missing-schema",
425+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
426+
message="Telemetry input references a missing schema.",
427+
schema_id=telemetry_input.schema_id,
428+
)
429+
)
430+
if source is not None and schema is not None and not source.supports_schema(
431+
schema.schema_id
432+
):
433+
findings.append(
434+
TelemetryAdapterFinding(
435+
finding_id=f"source-{source.source_id}-does-not-support-{schema.schema_id}",
436+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
437+
message="Telemetry source is not allowed to emit the requested schema.",
438+
source_id=source.source_id,
439+
schema_id=schema.schema_id,
440+
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
441+
)
442+
)
443+
return tuple(findings)
444+
445+
def _validate_replay_boundary(
446+
self,
447+
telemetry_input: TelemetryNormalizationInput,
448+
source: TelemetrySource,
449+
) -> tuple[tuple[TelemetryAdapterFinding, ...], tuple[TelemetryQualityFlag, ...]]:
450+
"""Validate replay metadata and return findings plus quality flags."""
451+
452+
findings: list[TelemetryAdapterFinding] = []
453+
flags: list[TelemetryQualityFlag] = []
454+
needs_replay_record = (
455+
source.kind is TelemetrySourceKind.LOG_REPLAY
456+
or telemetry_input.replay_record_id is not None
457+
)
458+
if source.kind is TelemetrySourceKind.LOG_REPLAY:
459+
flags.append(TelemetryQualityFlag.REPLAY_ONLY)
460+
if not needs_replay_record:
461+
return (), ()
462+
463+
if telemetry_input.replay_record_id is None:
464+
findings.append(
465+
TelemetryAdapterFinding(
466+
finding_id=f"source-{source.source_id}-missing-replay-record",
467+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
468+
message="Replay telemetry source requires a replay record ID.",
469+
source_id=source.source_id,
470+
)
471+
)
472+
return tuple(findings), tuple(flags)
473+
474+
replay_record = self._catalog.replay_record_by_id(telemetry_input.replay_record_id)
475+
if replay_record is None:
476+
severity = (
477+
TelemetryAdapterFindingSeverity.BLOCKER
478+
if self._policy.require_known_replay_record
479+
else TelemetryAdapterFindingSeverity.WARNING
480+
)
481+
findings.append(
482+
TelemetryAdapterFinding(
483+
finding_id=f"replay-{telemetry_input.replay_record_id}-missing",
484+
severity=severity,
485+
message="Telemetry input references a replay record not present in catalog.",
486+
source_id=source.source_id,
487+
replay_record_id=telemetry_input.replay_record_id,
488+
quality_flag=TelemetryQualityFlag.REPLAY_ONLY,
489+
)
490+
)
491+
flags.append(TelemetryQualityFlag.REPLAY_ONLY)
492+
return tuple(findings), tuple(flags)
493+
494+
flags.append(TelemetryQualityFlag.REPLAY_ONLY)
495+
if replay_record.source_id != telemetry_input.source_id:
496+
findings.append(
497+
TelemetryAdapterFinding(
498+
finding_id=f"replay-{replay_record.replay_record_id}-source-mismatch",
499+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
500+
message="Replay record source does not match telemetry input source.",
501+
source_id=telemetry_input.source_id,
502+
replay_record_id=replay_record.replay_record_id,
503+
)
504+
)
505+
if replay_record.schema_id != telemetry_input.schema_id:
506+
findings.append(
507+
TelemetryAdapterFinding(
508+
finding_id=f"replay-{replay_record.replay_record_id}-schema-mismatch",
509+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
510+
message="Replay record schema does not match telemetry input schema.",
511+
schema_id=telemetry_input.schema_id,
512+
replay_record_id=replay_record.replay_record_id,
513+
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
514+
)
515+
)
516+
517+
return tuple(findings), tuple(flags)
518+
519+
def _validate_timestamp_posture(
520+
self,
521+
telemetry_input: TelemetryNormalizationInput,
522+
) -> tuple[
523+
tuple[TelemetryAdapterFinding, ...],
524+
TelemetryFreshnessStatus,
525+
tuple[TelemetryQualityFlag, ...],
526+
]:
527+
"""Validate telemetry capture and receive timestamp posture."""
528+
529+
findings: list[TelemetryAdapterFinding] = []
530+
flags: list[TelemetryQualityFlag] = []
531+
captured_at = telemetry_input.captured_at
532+
received_at = telemetry_input.received_at
533+
534+
if captured_at > received_at:
535+
findings.append(
536+
TelemetryAdapterFinding(
537+
finding_id=f"input-{telemetry_input.input_id}-future-captured",
538+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
539+
message="Telemetry captured_at_utc is after received_at_utc.",
540+
quality_flag=TelemetryQualityFlag.FUTURE_TIMESTAMP,
541+
)
542+
)
543+
return (
544+
tuple(findings),
545+
TelemetryFreshnessStatus.FUTURE_DATED,
546+
(TelemetryQualityFlag.FUTURE_TIMESTAMP,),
547+
)
548+
549+
latency_seconds = int((received_at - captured_at).total_seconds())
550+
if latency_seconds > self._policy.max_latency_seconds:
551+
findings.append(
552+
TelemetryAdapterFinding(
553+
finding_id=f"input-{telemetry_input.input_id}-stale",
554+
severity=TelemetryAdapterFindingSeverity.WARNING,
555+
message=(
556+
"Telemetry latency exceeds adapter freshness policy and cannot "
557+
"support acceptance-oriented evaluation without degradation."
558+
),
559+
quality_flag=TelemetryQualityFlag.STALE_TIMESTAMP,
560+
)
561+
)
562+
flags.append(TelemetryQualityFlag.STALE_TIMESTAMP)
563+
return tuple(findings), TelemetryFreshnessStatus.STALE, tuple(flags)
564+
565+
return tuple(findings), TelemetryFreshnessStatus.CURRENT, tuple(flags)
566+
567+
def _validate_payload_schema(
568+
self,
569+
telemetry_input: TelemetryNormalizationInput,
570+
schema: TelemetrySchema,
571+
) -> tuple[tuple[TelemetryAdapterFinding, ...], tuple[TelemetryQualityFlag, ...]]:
572+
"""Validate telemetry payload against schema fields."""
573+
574+
findings: list[TelemetryAdapterFinding] = []
575+
flags: list[TelemetryQualityFlag] = []
576+
for required_field_name in schema.required_field_names():
577+
if required_field_name not in telemetry_input.payload:
578+
findings.append(
579+
TelemetryAdapterFinding(
580+
finding_id=(
581+
f"input-{telemetry_input.input_id}-missing-field-"
582+
f"{required_field_name}"
583+
),
584+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
585+
message="Telemetry payload is missing a required schema field.",
586+
schema_id=schema.schema_id,
587+
field_name=required_field_name,
588+
quality_flag=TelemetryQualityFlag.MISSING_FIELD,
589+
)
590+
)
591+
flags.append(TelemetryQualityFlag.MISSING_FIELD)
592+
593+
schema_field_names = {field.field_name for field in schema.fields}
594+
for field_name, value in telemetry_input.payload.items():
595+
field = schema.field_by_name(field_name)
596+
if field is None:
597+
findings.append(
598+
TelemetryAdapterFinding(
599+
finding_id=f"input-{telemetry_input.input_id}-unexpected-field-{field_name}",
600+
severity=TelemetryAdapterFindingSeverity.WARNING,
601+
message="Telemetry payload includes a field not declared by the schema.",
602+
schema_id=schema.schema_id,
603+
field_name=field_name,
604+
)
605+
)
606+
continue
607+
field_findings, field_flags = _validate_field_value(
608+
telemetry_input=telemetry_input,
609+
schema=schema,
610+
field=field,
611+
value=value,
612+
)
613+
findings.extend(field_findings)
614+
flags.extend(field_flags)
615+
616+
if not schema_field_names:
617+
findings.append(
618+
TelemetryAdapterFinding(
619+
finding_id=f"schema-{schema.schema_id}-has-no-fields",
620+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
621+
message="Telemetry schema has no fields.",
622+
schema_id=schema.schema_id,
623+
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
624+
)
625+
)
626+
flags.append(TelemetryQualityFlag.SCHEMA_MISMATCH)
627+
628+
return tuple(findings), tuple(flags)
629+
630+
def _validate_source_trust(
631+
self,
632+
source: TelemetrySource,
633+
) -> tuple[tuple[TelemetryAdapterFinding, ...], tuple[TelemetryQualityFlag, ...]]:
634+
"""Validate telemetry source trust posture."""
635+
636+
if source.can_support_acceptance() or (
637+
source.kind is TelemetrySourceKind.LOG_REPLAY
638+
and source.trust_level.rank >= TelemetryTrustLevel.MODERATE.rank
639+
):
640+
return (), ()
641+
642+
severity = (
643+
TelemetryAdapterFindingSeverity.BLOCKER
644+
if self._policy.reject_untrusted_sources
645+
else TelemetryAdapterFindingSeverity.WARNING
646+
)
647+
return (
648+
(
649+
TelemetryAdapterFinding(
650+
finding_id=f"source-{source.source_id}-trust-{source.trust_level.value}",
651+
severity=severity,
652+
message=(
653+
"Telemetry source trust level cannot support acceptance-oriented "
654+
"evaluation without degradation."
655+
),
656+
source_id=source.source_id,
657+
quality_flag=TelemetryQualityFlag.SOURCE_UNTRUSTED,
658+
),
659+
),
660+
(TelemetryQualityFlag.SOURCE_UNTRUSTED,),
661+
)
662+
663+
664+
def _validate_field_value(
665+
telemetry_input: TelemetryNormalizationInput,
666+
schema: TelemetrySchema,
667+
field: TelemetrySchemaField,
668+
value: Any,
669+
) -> tuple[tuple[TelemetryAdapterFinding, ...], tuple[TelemetryQualityFlag, ...]]:
670+
"""Validate one payload field value against its schema field definition."""
671+
672+
findings: list[TelemetryAdapterFinding] = []
673+
flags: list[TelemetryQualityFlag] = []
674+
if not _matches_field_type(field, value):
675+
findings.append(
676+
TelemetryAdapterFinding(
677+
finding_id=f"input-{telemetry_input.input_id}-field-{field.field_name}-type",
678+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
679+
message=(
680+
f"Telemetry field {field.field_name!r} does not match expected type "
681+
f"{field.field_type.value!r}."
682+
),
683+
schema_id=schema.schema_id,
684+
field_name=field.field_name,
685+
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
686+
)
687+
)
688+
flags.append(TelemetryQualityFlag.SCHEMA_MISMATCH)
689+
return tuple(findings), tuple(flags)
690+
691+
if field.field_type is TelemetryFieldType.ENUM and value not in field.allowed_values:
692+
findings.append(
693+
TelemetryAdapterFinding(
694+
finding_id=f"input-{telemetry_input.input_id}-field-{field.field_name}-enum",
695+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
696+
message=f"Telemetry enum field {field.field_name!r} has unsupported value.",
697+
schema_id=schema.schema_id,
698+
field_name=field.field_name,
699+
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
700+
)
701+
)
702+
flags.append(TelemetryQualityFlag.SCHEMA_MISMATCH)
703+
704+
if field.field_type.is_numeric():
705+
numeric_value = float(value)
706+
if field.minimum_value is not None and numeric_value < field.minimum_value:
707+
findings.append(
708+
TelemetryAdapterFinding(
709+
finding_id=f"input-{telemetry_input.input_id}-field-{field.field_name}-below-min",
710+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
711+
message=f"Telemetry numeric field {field.field_name!r} is below minimum.",
712+
schema_id=schema.schema_id,
713+
field_name=field.field_name,
714+
quality_flag=TelemetryQualityFlag.OUT_OF_RANGE,
715+
)
716+
)
717+
flags.append(TelemetryQualityFlag.OUT_OF_RANGE)
718+
if field.maximum_value is not None and numeric_value > field.maximum_value:
719+
findings.append(
720+
TelemetryAdapterFinding(
721+
finding_id=f"input-{telemetry_input.input_id}-field-{field.field_name}-above-max",
722+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
723+
message=f"Telemetry numeric field {field.field_name!r} is above maximum.",
724+
schema_id=schema.schema_id,
725+
field_name=field.field_name,
726+
quality_flag=TelemetryQualityFlag.OUT_OF_RANGE,
727+
)
728+
)
729+
flags.append(TelemetryQualityFlag.OUT_OF_RANGE)
730+
731+
if field.field_type is TelemetryFieldType.TIMESTAMP:
732+
try:
733+
_parse_utc_timestamp(str(value), f"telemetry field {field.field_name!r}")
734+
except ContractValueError:
735+
findings.append(
736+
TelemetryAdapterFinding(
737+
finding_id=f"input-{telemetry_input.input_id}-field-{field.field_name}-timestamp",
738+
severity=TelemetryAdapterFindingSeverity.BLOCKER,
739+
message=f"Telemetry timestamp field {field.field_name!r} is invalid.",
740+
schema_id=schema.schema_id,
741+
field_name=field.field_name,
742+
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
743+
)
744+
)
745+
flags.append(TelemetryQualityFlag.SCHEMA_MISMATCH)
746+
747+
return tuple(findings), tuple(flags)
748+
749+
750+
def _matches_field_type(field: TelemetrySchemaField, value: Any) -> bool:
751+
"""Return whether a payload value matches the expected telemetry field type."""
752+
753+
if field.field_type is TelemetryFieldType.STRING:
754+
return isinstance(value, str)
755+
if field.field_type is TelemetryFieldType.INTEGER:
756+
return isinstance(value, int) and not isinstance(value, bool)
757+
if field.field_type is TelemetryFieldType.FLOAT:
758+
return isinstance(value, (int, float)) and not isinstance(value, bool)
759+
if field.field_type is TelemetryFieldType.BOOLEAN:
760+
return isinstance(value, bool)
761+
if field.field_type is TelemetryFieldType.ENUM:
762+
return isinstance(value, str)
763+
if field.field_type is TelemetryFieldType.TIMESTAMP:
764+
return isinstance(value, str)
765+
766+
767+
def _dedupe_quality_flags(
768+
quality_flags: tuple[TelemetryQualityFlag, ...],
769+
) -> tuple[TelemetryQualityFlag, ...]:
770+
"""Return quality flags in first-seen order without duplicates."""
771+
772+
return tuple(dict.fromkeys(quality_flags))

0 commit comments

Comments
 (0)