@@ -513,14 +513,14 @@ async def trace_l7_flow(
513513 l7_flows = l7_flows .where (l7_flows .notnull (), None )
514514
515515 # 对所有调用日志排序,包含几个动作:排序+合并+分组+设置父子关系
516- l7_flows_merged , networks , flow_index_to_id0 , related_flow_index_map , host_clock_correction = sort_all_flows (
516+ l7_flows_merged , networks , flow_index_to_id0 , related_flow_index_map , host_clock_correction , instance_to_agent = sort_all_flows (
517517 l7_flows , network_delay_us , host_clock_offset_us , return_fields )
518518 if related_map_from_api :
519519 related_flow_index_map .update (related_map_from_api )
520520 return format_final_result (l7_flows_merged , networks ,
521521 self .args .get ('_id' ), host_clock_offset_us ,
522522 flow_index_to_id0 , related_flow_index_map ,
523- host_clock_correction )
523+ host_clock_correction , instance_to_agent )
524524
525525 async def query_ck (self , sql : str ):
526526 querier = Querier (to_dataframe = True ,
@@ -1181,7 +1181,7 @@ def append_span_node(self, span: 'SpanNode'):
11811181 self .spans .append (span )
11821182
11831183 def set_parent_relation (
1184- self , host_clock_offset_us : int ,
1184+ self , host_clock_offset_us : int , network_delay_us : int ,
11851185 host_clock_correct_callback : Callable [['SpanNode' , 'SpanNode' ],
11861186 None ]):
11871187 """
@@ -1192,11 +1192,12 @@ def set_parent_relation(
11921192 child = self .spans [i ]
11931193 parent = self .spans [i - 1 ]
11941194 if child .agent_id != parent .agent_id :
1195- if not _range_overlap (
1196- child .flow ['start_time_us' ], child .flow ['end_time_us' ],
1197- parent .flow ['start_time_us' ],
1198- parent .flow ['end_time_us' ], host_clock_offset_us ):
1199- # 当 child/parent 来自不同的主机,但时差超出 host_clock_offset_us,应认为二者无关
1195+ if not _range_overlap (child .flow ['start_time_us' ],
1196+ child .flow ['end_time_us' ],
1197+ parent .flow ['start_time_us' ],
1198+ parent .flow ['end_time_us' ],
1199+ host_clock_offset_us + network_delay_us ):
1200+ # 当 child/parent 来自不同的主机,但时差超出 host_clock_offset_us + network_delay_us,应认为二者无关
12001201 continue
12011202 if self .spans [i ].signal_source == self .spans [
12021203 i - 1 ].signal_source == L7_FLOW_SIGNAL_SOURCE_EBPF :
@@ -1325,6 +1326,7 @@ def __init__(self, flow: dict):
13251326 self .parent : SpanNode = None
13261327 self .tap_side = flow ['tap_side' ]
13271328 self .agent_id = flow ['vtap_id' ]
1329+ self .auto_instance = _get_auto_instance (self )
13281330
13291331 def __eq__ (self , other : 'SpanNode' ) -> bool :
13301332 return self .get_flow_index () == other .get_flow_index ()
@@ -1783,8 +1785,18 @@ def indirect_attach_client_sys_span_via_sys_span(
17831785 self .mounted_callback )
17841786 return True
17851787
1788+ def _get_auto_instance_name (span : SpanNode ) -> str :
1789+ """
1790+ get auto_instance name for span
1791+ only for Ebpf/Packet signal source
1792+ """
1793+ return span .flow ["auto_instance_0" ] if span .tap_side .startswith ('c' ) and span .tap_side != "app" else span .flow ["auto_instance_1" ]
17861794
17871795def _get_auto_instance (span : SpanNode ) -> str :
1796+ """
1797+ get auto_instance of span
1798+ note: incase we only get app span(maybe from tracing_completion api), we need to use app_instance to fix `instance`
1799+ """
17881800 server_side_key = 'auto_instance_id_1'
17891801 client_side_key = 'auto_instance_id_0'
17901802 # 对 x-app 位置的 flow,有可能 auto_instance_id=0,说明是外部资源
@@ -2000,6 +2012,9 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
20002012 for flow in flows :
20012013 flow_index_to_id0 [flow ['_index' ]] = flow ['_id' ][0 ]
20022014
2015+ # auto_instance => agent_id
2016+ # data from Packet,Ebpf, for OTel instance mapping
2017+ instance_to_agent = dict ()
20032018 network_spans : List [NetworkSpanNode ] = []
20042019 app_spans : List [AppSpanNode ] = []
20052020 server_sys_spans : List [SysSpanNode ] = []
@@ -2018,9 +2033,13 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
20182033 server_sys_spans .append (span )
20192034 else :
20202035 client_sys_spans .append (span )
2036+ if span .auto_instance != "" :
2037+ instance_to_agent [_get_auto_instance_name (span )] = span .agent_id
20212038 elif flow ['signal_source' ] == L7_FLOW_SIGNAL_SOURCE_PACKET :
20222039 span = NetworkSpanNode (flow )
20232040 network_spans .append (span )
2041+ if span .auto_instance != "" :
2042+ instance_to_agent [_get_auto_instance_name (span )] = span .agent_id
20242043 elif flow ['signal_source' ] == L7_FLOW_SIGNAL_SOURCE_OTEL :
20252044 span = AppSpanNode (flow )
20262045 app_spans .append (span )
@@ -2054,7 +2073,7 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
20542073
20552074 network_span_list = _build_network_span_set (
20562075 united_spans , related_flow_index_map , flow_index_to_span ,
2057- host_clock_offset_us ,
2076+ host_clock_offset_us , network_delay_us ,
20582077 host_clock_corrector .calculate_host_clock_correction )
20592078
20602079 ### Process Span Set 分离
@@ -2091,7 +2110,7 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
20912110 host_clock_corrector .calculate_host_clock_correction )
20922111
20932112 return process_span_list , network_span_list , flow_index_to_id0 , related_flow_index_map , host_clock_corrector .tidy_host_clock_correction (
2094- )
2113+ ), instance_to_agent
20952114
20962115
20972116def _union_app_spans (
@@ -2100,7 +2119,8 @@ def _union_app_spans(
21002119 host_clock_correct_callback : Callable [[SpanNode , SpanNode ], None ]
21012120) -> Dict [str , List [ProcessSpanSet ]]:
21022121 for span in app_spans :
2103- auto_instance = _get_auto_instance (span )
2122+ auto_instance = span .auto_instance
2123+ # auto_instance = _get_auto_instance(span)
21042124 if auto_instance not in process_span_map :
21052125 sp_span_pss = ProcessSpanSet (auto_instance )
21062126 sp_span_pss .mounted_callback = host_clock_correct_callback
@@ -2158,7 +2178,7 @@ def _union_sys_spans(
21582178 # 如果没有 app_span 时,不要做无效扫描
21592179 if len (process_span_map ) > 0 :
21602180 for span in client_sys_spans + server_sys_spans : # 先 c-p 后 s-p
2161- auto_instance = _get_auto_instance ( span )
2181+ auto_instance = span . auto_instance
21622182 for sp_span_pss in process_span_map .get (auto_instance , []):
21632183 if not sp_span_pss .attach_sys_span_via_app_span (span ):
21642184 # 这里 attach 失败,但可能关联关系在同一进程其他的 app_span 内,继续尝试
@@ -2172,7 +2192,7 @@ def _union_sys_spans(
21722192 for span in server_sys_spans :
21732193 if span .process_span_set is not None :
21742194 continue
2175- auto_instance = _get_auto_instance ( span )
2195+ auto_instance = span . auto_instance
21762196 if auto_instance not in process_span_map :
21772197 sp_span_pss = ProcessSpanSet (auto_instance )
21782198 sp_span_pss .mounted_callback = host_clock_correct_callback
@@ -2193,7 +2213,7 @@ def _union_sys_spans(
21932213 span = client_sys_spans [i ]
21942214 if span .process_span_set is not None :
21952215 continue
2196- auto_instance = _get_auto_instance ( span )
2216+ auto_instance = span . auto_instance
21972217 # 最终需要上挂的目标 s-p
21982218 target_sp = None
21992219 target_mounted_info = ""
@@ -2230,7 +2250,7 @@ def _union_sys_spans(
22302250 span = client_sys_spans [i ]
22312251 if span .process_span_set is not None :
22322252 continue
2233- auto_instance = _get_auto_instance ( span )
2253+ auto_instance = span . auto_instance
22342254 # 如果找不到 auto_instance,说明没有 s-p,c-p 应作为独立的 ProcessSpanSet
22352255 # process_span_set 允许存在多个 c-p,但这些 c-p 如果没有关联关系,需要划分为多个 Process Span Set
22362256 group_key = ''
@@ -2259,6 +2279,7 @@ def _build_network_span_set(
22592279 united_spans : List [SpanNode ],
22602280 related_flow_index_map : defaultdict (inner_defaultdict_set ),
22612281 flow_index_to_span : List [SpanNode ], host_clock_offset_us : int ,
2282+ network_delay_us : int ,
22622283 host_clock_correct_callback : Callable [[SpanNode , SpanNode ], None ]
22632284) -> List [NetworkSpanSet ]:
22642285 networks : List [NetworkSpanSet ] = []
@@ -2288,7 +2309,7 @@ def _build_network_span_set(
22882309 ### 网络span排序
22892310 # 网络 span 按照 tap_side_rank 排序,顺序始终为:c -> 其他 -> s,并按采集器分组排序,同一采集器内按 start_time 排序
22902311 for network in networks :
2291- network .set_parent_relation (host_clock_offset_us ,
2312+ network .set_parent_relation (host_clock_offset_us , network_delay_us ,
22922313 host_clock_correct_callback )
22932314 return networks
22942315
@@ -2786,12 +2807,18 @@ def merge_service(services: List[ProcessSpanSet], traces: list,
27862807 return service_duration_metrics
27872808
27882809
def correct_span_time(flows: List[dict], host_clock_correction: dict,
                      instance_to_agent: dict = None):
    """
    Shift span timestamps in place using the per-agent host clock correction.

    Args:
        flows: iterable of span dicts; 'start_time_us' and 'end_time_us' are
            adjusted in place when a non-zero correction applies.
        host_clock_correction: agent_id => correction added to both
            timestamps (presumably microseconds, matching the `_us` fields).
        instance_to_agent: auto_instance => agent_id, recorded from
            eBPF/Packet spans. Optional; when omitted every span is corrected
            by its own 'vtap_id'.
    """
    instance_to_agent = instance_to_agent or {}
    for flow in flows:
        agent_id = flow.get('vtap_id')  # should be agent_id
        if flow.get('signal_source') == L7_FLOW_SIGNAL_SOURCE_OTEL:
            # OTel data may be sent to a different host and tagged by a
            # different agent, so prefer the agent recorded for the same
            # auto_instance by eBPF/Packet spans; fall back to the span's own
            # agent when no mapping (or no auto_instance field) exists.
            agent_id = instance_to_agent.get(flow.get('auto_instance'),
                                             agent_id)
        correction = host_clock_correction.get(agent_id, 0)
        if correction != 0:
            flow['start_time_us'] += correction
            flow['end_time_us'] += correction
@@ -2801,7 +2828,7 @@ def format_final_result(
28012828 services : List [ProcessSpanSet ], networks : List [NetworkSpanSet ], _id ,
28022829 host_clock_offset_us : int , flow_index_to_id0 : list ,
28032830 related_flow_index_map : defaultdict (inner_defaultdict_set ),
2804- host_clock_correction : dict ):
2831+ host_clock_correction : dict , instance_to_agent : dict ):
28052832 """
28062833 格式化返回结果
28072834 """
@@ -2816,7 +2843,7 @@ def format_final_result(
28162843 format_selftime (traces , trace , trace .get ("childs" , []), uid_index_map )
28172844 response ['services' ] = merge_service (services , traces , uid_index_map )
28182845 if host_clock_correction is not None :
2819- correct_span_time (traces , host_clock_correction )
2846+ correct_span_time (traces , host_clock_correction , instance_to_agent )
28202847 response ['host_clock_correction' ] = host_clock_correction
28212848 deepflow_span_ids = {
28222849 trace .get ('deepflow_span_id' )
@@ -3054,9 +3081,8 @@ def calculate_host_clock_correction(self, child: SpanNode,
30543081 return
30553082 # App & Sys Span 即使 agent_id 不一样,也可能是来自同一个 Pod(非本机 agent 接收数据),这种情况下时延差异纯粹来自于统计,而不是主机误差
30563083 # 对这种情况不应该计算误差,但仅对 App Span 有此逻辑,其他类型的数据时差一定来自主机误差
3057- parent_instance = _get_auto_instance (parent )
3058- if parent_instance != 0 and parent_instance == _get_auto_instance (
3059- child ):
3084+ if parent .auto_instance != '' and parent .auto_instance != '0' \
3085+ and parent .auto_instance == child .auto_instance :
30603086 return
30613087 # 如果其中一边是 App Span,则只计算毫秒级别的误差
30623088 # 正数向下取整,负数向上取整
0 commit comments