@@ -549,34 +549,72 @@ def _merge_platform_stats_rows(
549549 non_mergeable : list [dict [str , Any ]] = []
550550 invalid_count_warned = 0
551551
552- for row in rows :
553- normalized_row , normalized_timestamp , is_timestamp_valid = (
554- self ._normalize_platform_stats_row (row )
555- )
552+ def normalize_timestamp (value : Any ) -> str | None :
553+ if isinstance (value , datetime ):
554+ return self ._to_utc_iso (value )
555+ if isinstance (value , str ):
556+ timestamp = value .strip ()
557+ if not timestamp :
558+ return None
559+ if timestamp .endswith ("Z" ):
560+ timestamp = f"{ timestamp [:- 1 ]} +00:00"
561+ try :
562+ return self ._to_utc_iso (datetime .fromisoformat (timestamp ))
563+ except ValueError :
564+ return None
565+ return None
566+
def build_key(row: dict[str, Any]) -> tuple[str, str, str] | None:
    """Build the merge key for ``row`` and normalize its timestamp in place.

    When the row's ``timestamp`` can be normalized, the normalized value is
    written back into ``row`` (even if the row later turns out not to be
    mergeable).

    Args:
        row: A mutable platform-stats row.

    Returns:
        ``(timestamp, platform_id, platform_type)`` when the timestamp is
        valid and both identifiers are strings; otherwise ``None``.
    """
    iso_timestamp = normalize_timestamp(row.get("timestamp"))
    if iso_timestamp is None:
        return None

    row["timestamp"] = iso_timestamp
    pid, ptype = row.get("platform_id"), row.get("platform_type")
    if isinstance(pid, str) and isinstance(ptype, str):
        return (iso_timestamp, pid, ptype)
    return None
580+
def parse_count(
    raw_count: Any,
    key_for_log: tuple[Any, Any, Any],
    warned_count: int,
) -> tuple[int, int]:
    """Coerce a row's ``count`` to ``int``, warning about bad values.

    Args:
        raw_count: The raw ``count`` value from the row.
        key_for_log: Row identity included in the warning message.
        warned_count: Number of warnings already emitted.

    Returns:
        ``(count, warned_count)``. Values that cannot be converted become
        ``0``. A warning is logged and the counter incremented only while
        ``warned_count`` is below
        ``PLATFORM_STATS_INVALID_COUNT_WARN_LIMIT``; past the limit the
        fallback is silent and the counter is returned unchanged.
    """
    try:
        return int(raw_count), warned_count
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float("inf")); without it a single
        # infinite count would propagate out of the merge loop.
        if warned_count >= PLATFORM_STATS_INVALID_COUNT_WARN_LIMIT:
            return 0, warned_count
        logger.warning(
            "platform_stats count 非法,已按 0 处理: value=%r, key=%s",
            raw_count,
            key_for_log,
        )
        return 0, warned_count + 1
556600
557- platform_id = normalized_row .get ("platform_id" )
558- platform_type = normalized_row .get ("platform_type" )
601+ for row in rows :
602+ normalized_row = dict (row )
603+ key = build_key (normalized_row )
559604 key_for_log = (
560- normalized_timestamp if is_timestamp_valid else "<invalid_timestamp>" ,
561- repr (platform_id ),
562- repr (platform_type ),
605+ normalized_row . get ( "timestamp" ) ,
606+ repr (normalized_row . get ( " platform_id" ) ),
607+ repr (normalized_row . get ( " platform_type" ) ),
563608 )
564- count , invalid_count_warned = self ._parse_platform_stats_count (
565- normalized_row .get ("count" , 0 ),
566- key_for_log ,
567- invalid_count_warned ,
609+ count , invalid_count_warned = parse_count (
610+ normalized_row .get ("count" , 0 ), key_for_log , invalid_count_warned
568611 )
569612 normalized_row ["count" ] = count
570613
571- if not is_timestamp_valid :
572- non_mergeable .append (normalized_row )
573- continue
574-
575- if not isinstance (platform_id , str ) or not isinstance (platform_type , str ):
614+ if key is None :
576615 non_mergeable .append (normalized_row )
577616 continue
578617
579- key = (normalized_timestamp , platform_id , platform_type )
580618 existing = merged .get (key )
581619 if existing is None :
582620 merged [key ] = normalized_row
@@ -585,71 +623,13 @@ def _merge_platform_stats_rows(
585623
586624 return [* non_mergeable , * merged .values ()]
587625
588- def _parse_platform_stats_count (
589- self ,
590- raw_count : Any ,
591- key_for_log : tuple [str , str , str ],
592- warned_count : int ,
593- ) -> tuple [int , int ]:
594- if warned_count >= PLATFORM_STATS_INVALID_COUNT_WARN_LIMIT :
595- try :
596- return int (raw_count ), warned_count
597- except (TypeError , ValueError ):
598- return 0 , warned_count
599- try :
600- return int (raw_count ), warned_count
601- except (TypeError , ValueError ):
602- logger .warning (
603- "platform_stats count 非法,已按 0 处理: value=%r, key=%s" ,
604- raw_count ,
605- key_for_log ,
606- )
607- return 0 , warned_count + 1
608-
def _normalize_platform_stats_row(
    self, row: dict[str, Any]
) -> tuple[dict[str, Any], str, bool]:
    """Copy ``row`` and normalize its ``timestamp`` field.

    Args:
        row: Source platform-stats row; it is not modified.

    Returns:
        ``(row_copy, timestamp, is_valid)``. When the timestamp can be
        normalized, ``timestamp`` is the normalized string and ``is_valid``
        is ``True``. Otherwise ``is_valid`` is ``False`` and ``timestamp``
        is a plain-text fallback: the stripped string, ``""`` for ``None``,
        or ``str(value)`` for anything else. In both cases the copy's
        ``"timestamp"`` entry holds the returned ``timestamp``.
    """
    row_copy = dict(row)
    raw = row_copy.get("timestamp")
    iso_timestamp = self._normalize_platform_stats_timestamp(raw)

    if iso_timestamp is not None:
        row_copy["timestamp"] = iso_timestamp
        return row_copy, iso_timestamp, True

    # Invalid timestamp: keep a printable form so the row can still be
    # reported and passed through.
    if raw is None:
        fallback = ""
    elif isinstance(raw, str):
        fallback = raw.strip()
    else:
        fallback = str(raw)
    row_copy["timestamp"] = fallback
    return row_copy, fallback, False
625-
626626 def _to_utc_iso (self , dt : datetime ) -> str :
627627 if dt .tzinfo is None :
628628 dt = dt .replace (tzinfo = timezone .utc )
629629 else :
630630 dt = dt .astimezone (timezone .utc )
631631 return dt .isoformat ()
632632
633- def _normalize_platform_stats_timestamp (self , value : Any ) -> str | None :
634- if isinstance (value , datetime ):
635- return self ._to_utc_iso (value )
636-
637- if isinstance (value , str ):
638- timestamp = value .strip ()
639- if not timestamp :
640- return None
641- if timestamp .endswith ("Z" ):
642- timestamp = f"{ timestamp [:- 1 ]} +00:00"
643- try :
644- return self ._to_utc_iso (datetime .fromisoformat (timestamp ))
645- except ValueError :
646- return None
647-
648- if value is None :
649- return None
650-
651- return None
652-
653633 async def _import_knowledge_bases (
654634 self ,
655635 zf : zipfile .ZipFile ,
0 commit comments