Skip to content

Commit 4a7b61e

Browse files
authored
Remove unused validation methods from telemetry_adapter
Removed several validation methods and their associated logic from telemetry_adapter.py, streamlining the telemetry input processing.
1 parent 3f66833 commit 4a7b61e

1 file changed

Lines changed: 0 additions & 387 deletions

File tree

src/ix_autonomy_assurance_case_runtime/telemetry_adapter.py

Lines changed: 0 additions & 387 deletions
Original file line numberDiff line numberDiff line change
@@ -384,390 +384,3 @@ def normalize(self, telemetry_input: TelemetryNormalizationInput) -> TelemetryAd
384384
captured_at_utc=telemetry_input.captured_at_utc,
385385
received_at_utc=telemetry_input.received_at_utc,
386386
payload=telemetry_input.payload,
387-
freshness_status=freshness_status,
388-
quality_flags=quality_flags,
389-
replay_record_id=telemetry_input.replay_record_id,
390-
)
391-
decision = (
392-
TelemetryAdapterDecision.ACCEPTED
393-
if envelope.can_support_runtime_evaluation()
394-
else TelemetryAdapterDecision.DEGRADED
395-
)
396-
return TelemetryAdapterReport(
397-
input_id=telemetry_input.input_id,
398-
decision=decision,
399-
findings=findings,
400-
envelope=envelope,
401-
)
402-
403-
def _validate_source_and_schema(
404-
self,
405-
telemetry_input: TelemetryNormalizationInput,
406-
source: TelemetrySource | None,
407-
schema: TelemetrySchema | None,
408-
) -> tuple[TelemetryAdapterFinding, ...]:
409-
"""Validate source and schema catalog references."""
410-
411-
findings: list[TelemetryAdapterFinding] = []
412-
if source is None:
413-
findings.append(
414-
TelemetryAdapterFinding(
415-
finding_id=f"input-{telemetry_input.input_id}-missing-source",
416-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
417-
message="Telemetry input references a missing source.",
418-
source_id=telemetry_input.source_id,
419-
)
420-
)
421-
if schema is None:
422-
findings.append(
423-
TelemetryAdapterFinding(
424-
finding_id=f"input-{telemetry_input.input_id}-missing-schema",
425-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
426-
message="Telemetry input references a missing schema.",
427-
schema_id=telemetry_input.schema_id,
428-
)
429-
)
430-
if source is not None and schema is not None and not source.supports_schema(
431-
schema.schema_id
432-
):
433-
findings.append(
434-
TelemetryAdapterFinding(
435-
finding_id=f"source-{source.source_id}-does-not-support-{schema.schema_id}",
436-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
437-
message="Telemetry source is not allowed to emit the requested schema.",
438-
source_id=source.source_id,
439-
schema_id=schema.schema_id,
440-
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
441-
)
442-
)
443-
return tuple(findings)
444-
445-
def _validate_replay_boundary(
446-
self,
447-
telemetry_input: TelemetryNormalizationInput,
448-
source: TelemetrySource,
449-
) -> tuple[tuple[TelemetryAdapterFinding, ...], tuple[TelemetryQualityFlag, ...]]:
450-
"""Validate replay metadata and return findings plus quality flags."""
451-
452-
findings: list[TelemetryAdapterFinding] = []
453-
flags: list[TelemetryQualityFlag] = []
454-
needs_replay_record = (
455-
source.kind is TelemetrySourceKind.LOG_REPLAY
456-
or telemetry_input.replay_record_id is not None
457-
)
458-
if source.kind is TelemetrySourceKind.LOG_REPLAY:
459-
flags.append(TelemetryQualityFlag.REPLAY_ONLY)
460-
if not needs_replay_record:
461-
return (), ()
462-
463-
if telemetry_input.replay_record_id is None:
464-
findings.append(
465-
TelemetryAdapterFinding(
466-
finding_id=f"source-{source.source_id}-missing-replay-record",
467-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
468-
message="Replay telemetry source requires a replay record ID.",
469-
source_id=source.source_id,
470-
)
471-
)
472-
return tuple(findings), tuple(flags)
473-
474-
replay_record = self._catalog.replay_record_by_id(telemetry_input.replay_record_id)
475-
if replay_record is None:
476-
severity = (
477-
TelemetryAdapterFindingSeverity.BLOCKER
478-
if self._policy.require_known_replay_record
479-
else TelemetryAdapterFindingSeverity.WARNING
480-
)
481-
findings.append(
482-
TelemetryAdapterFinding(
483-
finding_id=f"replay-{telemetry_input.replay_record_id}-missing",
484-
severity=severity,
485-
message="Telemetry input references a replay record not present in catalog.",
486-
source_id=source.source_id,
487-
replay_record_id=telemetry_input.replay_record_id,
488-
quality_flag=TelemetryQualityFlag.REPLAY_ONLY,
489-
)
490-
)
491-
flags.append(TelemetryQualityFlag.REPLAY_ONLY)
492-
return tuple(findings), tuple(flags)
493-
494-
flags.append(TelemetryQualityFlag.REPLAY_ONLY)
495-
if replay_record.source_id != telemetry_input.source_id:
496-
findings.append(
497-
TelemetryAdapterFinding(
498-
finding_id=f"replay-{replay_record.replay_record_id}-source-mismatch",
499-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
500-
message="Replay record source does not match telemetry input source.",
501-
source_id=telemetry_input.source_id,
502-
replay_record_id=replay_record.replay_record_id,
503-
)
504-
)
505-
if replay_record.schema_id != telemetry_input.schema_id:
506-
findings.append(
507-
TelemetryAdapterFinding(
508-
finding_id=f"replay-{replay_record.replay_record_id}-schema-mismatch",
509-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
510-
message="Replay record schema does not match telemetry input schema.",
511-
schema_id=telemetry_input.schema_id,
512-
replay_record_id=replay_record.replay_record_id,
513-
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
514-
)
515-
)
516-
517-
return tuple(findings), tuple(flags)
518-
519-
def _validate_timestamp_posture(
520-
self,
521-
telemetry_input: TelemetryNormalizationInput,
522-
) -> tuple[
523-
tuple[TelemetryAdapterFinding, ...],
524-
TelemetryFreshnessStatus,
525-
tuple[TelemetryQualityFlag, ...],
526-
]:
527-
"""Validate telemetry capture and receive timestamp posture."""
528-
529-
findings: list[TelemetryAdapterFinding] = []
530-
flags: list[TelemetryQualityFlag] = []
531-
captured_at = telemetry_input.captured_at
532-
received_at = telemetry_input.received_at
533-
534-
if captured_at > received_at:
535-
findings.append(
536-
TelemetryAdapterFinding(
537-
finding_id=f"input-{telemetry_input.input_id}-future-captured",
538-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
539-
message="Telemetry captured_at_utc is after received_at_utc.",
540-
quality_flag=TelemetryQualityFlag.FUTURE_TIMESTAMP,
541-
)
542-
)
543-
return (
544-
tuple(findings),
545-
TelemetryFreshnessStatus.FUTURE_DATED,
546-
(TelemetryQualityFlag.FUTURE_TIMESTAMP,),
547-
)
548-
549-
latency_seconds = int((received_at - captured_at).total_seconds())
550-
if latency_seconds > self._policy.max_latency_seconds:
551-
findings.append(
552-
TelemetryAdapterFinding(
553-
finding_id=f"input-{telemetry_input.input_id}-stale",
554-
severity=TelemetryAdapterFindingSeverity.WARNING,
555-
message=(
556-
"Telemetry latency exceeds adapter freshness policy and cannot "
557-
"support acceptance-oriented evaluation without degradation."
558-
),
559-
quality_flag=TelemetryQualityFlag.STALE_TIMESTAMP,
560-
)
561-
)
562-
flags.append(TelemetryQualityFlag.STALE_TIMESTAMP)
563-
return tuple(findings), TelemetryFreshnessStatus.STALE, tuple(flags)
564-
565-
return tuple(findings), TelemetryFreshnessStatus.CURRENT, tuple(flags)
566-
567-
def _validate_payload_schema(
568-
self,
569-
telemetry_input: TelemetryNormalizationInput,
570-
schema: TelemetrySchema,
571-
) -> tuple[tuple[TelemetryAdapterFinding, ...], tuple[TelemetryQualityFlag, ...]]:
572-
"""Validate telemetry payload against schema fields."""
573-
574-
findings: list[TelemetryAdapterFinding] = []
575-
flags: list[TelemetryQualityFlag] = []
576-
for required_field_name in schema.required_field_names():
577-
if required_field_name not in telemetry_input.payload:
578-
findings.append(
579-
TelemetryAdapterFinding(
580-
finding_id=(
581-
f"input-{telemetry_input.input_id}-missing-field-"
582-
f"{required_field_name}"
583-
),
584-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
585-
message="Telemetry payload is missing a required schema field.",
586-
schema_id=schema.schema_id,
587-
field_name=required_field_name,
588-
quality_flag=TelemetryQualityFlag.MISSING_FIELD,
589-
)
590-
)
591-
flags.append(TelemetryQualityFlag.MISSING_FIELD)
592-
593-
schema_field_names = {field.field_name for field in schema.fields}
594-
for field_name, value in telemetry_input.payload.items():
595-
field = schema.field_by_name(field_name)
596-
if field is None:
597-
findings.append(
598-
TelemetryAdapterFinding(
599-
finding_id=f"input-{telemetry_input.input_id}-unexpected-field-{field_name}",
600-
severity=TelemetryAdapterFindingSeverity.WARNING,
601-
message="Telemetry payload includes a field not declared by the schema.",
602-
schema_id=schema.schema_id,
603-
field_name=field_name,
604-
)
605-
)
606-
continue
607-
field_findings, field_flags = _validate_field_value(
608-
telemetry_input=telemetry_input,
609-
schema=schema,
610-
field=field,
611-
value=value,
612-
)
613-
findings.extend(field_findings)
614-
flags.extend(field_flags)
615-
616-
if not schema_field_names:
617-
findings.append(
618-
TelemetryAdapterFinding(
619-
finding_id=f"schema-{schema.schema_id}-has-no-fields",
620-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
621-
message="Telemetry schema has no fields.",
622-
schema_id=schema.schema_id,
623-
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
624-
)
625-
)
626-
flags.append(TelemetryQualityFlag.SCHEMA_MISMATCH)
627-
628-
return tuple(findings), tuple(flags)
629-
630-
def _validate_source_trust(
631-
self,
632-
source: TelemetrySource,
633-
) -> tuple[tuple[TelemetryAdapterFinding, ...], tuple[TelemetryQualityFlag, ...]]:
634-
"""Validate telemetry source trust posture."""
635-
636-
if source.can_support_acceptance() or (
637-
source.kind is TelemetrySourceKind.LOG_REPLAY
638-
and source.trust_level.rank >= TelemetryTrustLevel.MODERATE.rank
639-
):
640-
return (), ()
641-
642-
severity = (
643-
TelemetryAdapterFindingSeverity.BLOCKER
644-
if self._policy.reject_untrusted_sources
645-
else TelemetryAdapterFindingSeverity.WARNING
646-
)
647-
return (
648-
(
649-
TelemetryAdapterFinding(
650-
finding_id=f"source-{source.source_id}-trust-{source.trust_level.value}",
651-
severity=severity,
652-
message=(
653-
"Telemetry source trust level cannot support acceptance-oriented "
654-
"evaluation without degradation."
655-
),
656-
source_id=source.source_id,
657-
quality_flag=TelemetryQualityFlag.SOURCE_UNTRUSTED,
658-
),
659-
),
660-
(TelemetryQualityFlag.SOURCE_UNTRUSTED,),
661-
)
662-
663-
664-
def _validate_field_value(
665-
telemetry_input: TelemetryNormalizationInput,
666-
schema: TelemetrySchema,
667-
field: TelemetrySchemaField,
668-
value: Any,
669-
) -> tuple[tuple[TelemetryAdapterFinding, ...], tuple[TelemetryQualityFlag, ...]]:
670-
"""Validate one payload field value against its schema field definition."""
671-
672-
findings: list[TelemetryAdapterFinding] = []
673-
flags: list[TelemetryQualityFlag] = []
674-
if not _matches_field_type(field, value):
675-
findings.append(
676-
TelemetryAdapterFinding(
677-
finding_id=f"input-{telemetry_input.input_id}-field-{field.field_name}-type",
678-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
679-
message=(
680-
f"Telemetry field {field.field_name!r} does not match expected type "
681-
f"{field.field_type.value!r}."
682-
),
683-
schema_id=schema.schema_id,
684-
field_name=field.field_name,
685-
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
686-
)
687-
)
688-
flags.append(TelemetryQualityFlag.SCHEMA_MISMATCH)
689-
return tuple(findings), tuple(flags)
690-
691-
if field.field_type is TelemetryFieldType.ENUM and value not in field.allowed_values:
692-
findings.append(
693-
TelemetryAdapterFinding(
694-
finding_id=f"input-{telemetry_input.input_id}-field-{field.field_name}-enum",
695-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
696-
message=f"Telemetry enum field {field.field_name!r} has unsupported value.",
697-
schema_id=schema.schema_id,
698-
field_name=field.field_name,
699-
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
700-
)
701-
)
702-
flags.append(TelemetryQualityFlag.SCHEMA_MISMATCH)
703-
704-
if field.field_type.is_numeric():
705-
numeric_value = float(value)
706-
if field.minimum_value is not None and numeric_value < field.minimum_value:
707-
findings.append(
708-
TelemetryAdapterFinding(
709-
finding_id=f"input-{telemetry_input.input_id}-field-{field.field_name}-below-min",
710-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
711-
message=f"Telemetry numeric field {field.field_name!r} is below minimum.",
712-
schema_id=schema.schema_id,
713-
field_name=field.field_name,
714-
quality_flag=TelemetryQualityFlag.OUT_OF_RANGE,
715-
)
716-
)
717-
flags.append(TelemetryQualityFlag.OUT_OF_RANGE)
718-
if field.maximum_value is not None and numeric_value > field.maximum_value:
719-
findings.append(
720-
TelemetryAdapterFinding(
721-
finding_id=f"input-{telemetry_input.input_id}-field-{field.field_name}-above-max",
722-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
723-
message=f"Telemetry numeric field {field.field_name!r} is above maximum.",
724-
schema_id=schema.schema_id,
725-
field_name=field.field_name,
726-
quality_flag=TelemetryQualityFlag.OUT_OF_RANGE,
727-
)
728-
)
729-
flags.append(TelemetryQualityFlag.OUT_OF_RANGE)
730-
731-
if field.field_type is TelemetryFieldType.TIMESTAMP:
732-
try:
733-
_parse_utc_timestamp(str(value), f"telemetry field {field.field_name!r}")
734-
except ContractValueError:
735-
findings.append(
736-
TelemetryAdapterFinding(
737-
finding_id=f"input-{telemetry_input.input_id}-field-{field.field_name}-timestamp",
738-
severity=TelemetryAdapterFindingSeverity.BLOCKER,
739-
message=f"Telemetry timestamp field {field.field_name!r} is invalid.",
740-
schema_id=schema.schema_id,
741-
field_name=field.field_name,
742-
quality_flag=TelemetryQualityFlag.SCHEMA_MISMATCH,
743-
)
744-
)
745-
flags.append(TelemetryQualityFlag.SCHEMA_MISMATCH)
746-
747-
return tuple(findings), tuple(flags)
748-
749-
750-
def _matches_field_type(field: TelemetrySchemaField, value: Any) -> bool:
751-
"""Return whether a payload value matches the expected telemetry field type."""
752-
753-
if field.field_type is TelemetryFieldType.STRING:
754-
return isinstance(value, str)
755-
if field.field_type is TelemetryFieldType.INTEGER:
756-
return isinstance(value, int) and not isinstance(value, bool)
757-
if field.field_type is TelemetryFieldType.FLOAT:
758-
return isinstance(value, (int, float)) and not isinstance(value, bool)
759-
if field.field_type is TelemetryFieldType.BOOLEAN:
760-
return isinstance(value, bool)
761-
if field.field_type is TelemetryFieldType.ENUM:
762-
return isinstance(value, str)
763-
if field.field_type is TelemetryFieldType.TIMESTAMP:
764-
return isinstance(value, str)
765-
return False
766-
767-
768-
def _dedupe_quality_flags(
769-
quality_flags: tuple[TelemetryQualityFlag, ...],
770-
) -> tuple[TelemetryQualityFlag, ...]:
771-
"""Return quality flags in first-seen order without duplicates."""
772-
773-
return tuple(dict.fromkeys(quality_flags))

0 commit comments

Comments
 (0)