@@ -284,7 +284,7 @@ async def query_and_trace_flowmetas(
284284 max_iteration : int = config .max_iteration ,
285285 network_delay_us : int = config .network_delay_us ,
286286 host_clock_offset_us : int = config .host_clock_offset_us ,
287- app_spans_from_api : list = []) -> Tuple [Set , list ]:
287+ app_spans_from_api : list = []) -> Tuple [Set , list , str ]:
288288 """多次迭代,查询可追踪到的所有 l7_flow_log 的摘要
289289 参数说明:
290290 time_filter: 查询的时间范围过滤条件,SQL表达式
@@ -309,8 +309,13 @@ async def query_and_trace_flowmetas(
309309 visited_trace_ids = set () # 已访问过的 trace_id 集合
310310 new_trace_ids_for_next_iteration = set () # 给下一轮迭代查询用的 trace_id 集合
311311
312+ # during iteration, maybe update query time range based on config.multi_trace_id_query_time_range
313+ # in this case, should updated global query time range for all l7_flow_ids
314+ min_start_time = self .start_time
315+ max_end_time = self .end_time
316+
312317 # Query1: 先获取 _id 对应的数据
313- # don't use timefilter here, querier would extract time from _id (e.g.: id>>32)
318+ # don't use time_filter here, querier would extract time from _id (e.g.: id>>32)
314319 dataframe_flowmetas = await self .query_flowmetas ("1=1" , base_filter )
315320 if type (dataframe_flowmetas ) != DataFrame or dataframe_flowmetas .empty :
316321 # when app_spans_from_api got values from api, return it
@@ -355,6 +360,13 @@ async def query_and_trace_flowmetas(
355360 max_iteration = 1
356361
357362 config .tracing_source = config .tracing_source or DEFAULT_TRACING_SOURCE
363+ # 注意扩大时间范围能力只用于搜索多 trace_id,其他关联搜索(tcp_seq, x_req_id, syscall)不要扩大查询范围
364+ # 当什么也不配置时,查询范围是 _id 对应的 l7_flow_log 的时间前后各推一分钟,由前端传参决定
365+ trace_id_time_filter = time_filter
366+ if config .iteration_expand_time_range :
367+ start_time = self .start_time - config .iteration_expand_time_range
368+ end_time = self .end_time + config .iteration_expand_time_range
369+ trace_id_time_filter = f"time>={ start_time } AND time<={ end_time } "
358370
359371 # 进行迭代查询,上限为 config.spec.max_iteration
360372 for i in range (max_iteration ):
@@ -384,7 +396,7 @@ async def query_and_trace_flowmetas(
384396 # Query2: 基于 trace_id 获取相关数据,第一层迭代
385397 new_trace_id_flows = pd .DataFrame ()
386398 new_trace_id_flows = await self .query_flowmetas (
387- time_filter , ' AND ' .join (query_trace_filters ))
399+ trace_id_time_filter , ' AND ' .join (query_trace_filters ))
388400
389401 # 已查询过一次的 trace_id,直接加入 visited,不需要再被查询
390402 # new_trace_ids_for_next_iteration 已被查询,可在这一步直接清空
@@ -547,9 +559,34 @@ async def query_and_trace_flowmetas(
547559 elif not new_filters : # only new_trace_ids_for_next_iteration got values
548560 continue
549561
562+ # 关联查询(tcp_seq/syscall/dns/xreqid)的时间范围应跟随已查到的 trace_id flows 的实际时间跨度
563+ # 取已查询到的 trace_id 的左右界时间范围来做新的查询
564+ # when query(tcp_seq/syscall/dns/xreqid), query time range should follow the actual time range of all trace_ids
565+ # use the min/max time from trace_ids data
566+ assoc_start_time = self .start_time
567+ assoc_end_time = self .end_time
568+ # 如果没有配置 iteration_expand_time_range,不需执行此功能
569+ # if not config.iteration_expand_time_range, should use default time range
570+ if config .iteration_expand_time_range and not dataframe_flowmetas .empty :
571+ trace_min_s = int (dataframe_flowmetas ['start_time_us' ].min () //
572+ 1_000_000 )
573+ trace_max_s = int (dataframe_flowmetas ['end_time_us' ].max () //
574+ 1_000_000 )
575+ # use for current iteration, not for global
576+ if trace_min_s < assoc_start_time :
577+ assoc_start_time = trace_min_s
578+ if trace_max_s > assoc_end_time :
579+ assoc_end_time = trace_max_s
580+ # use for global l7_flow_ids query, not for current iteration
581+ if trace_min_s < min_start_time :
582+ min_start_time = trace_min_s
583+ if trace_max_s > max_end_time :
584+ max_end_time = trace_max_s
585+ assoc_time_filter = f"time>={ assoc_start_time } AND time<={ assoc_end_time } "
586+
550587 # Query3: 查询上述基于 Condition[123] 构建出的条件,即与【第一层迭代】关联的所有 flow,此处构建【第二层迭代】查询
551588 new_flows = pd .DataFrame ()
552- new_flows = await self .query_flowmetas (time_filter ,
589+ new_flows = await self .query_flowmetas (assoc_time_filter ,
553590 ' OR ' .join (new_filters ))
554591 if type (new_flows
555592 ) != DataFrame or new_flows .empty : # no more new flows
@@ -630,8 +667,9 @@ async def query_and_trace_flowmetas(
630667 break
631668
632669 # end of `for i in range(max_iteration)`
633-
634- return l7_flow_ids , app_spans_from_external
670+ # update final_time_filter for all l7_flow_ids
671+ final_time_filter = f"time>={ min_start_time } AND time<={ max_end_time } "
672+ return l7_flow_ids , app_spans_from_external , final_time_filter
635673
636674 async def trace_l7_flow (
637675 self ,
@@ -654,7 +692,7 @@ async def trace_l7_flow(
654692 network_delay_us: 使用Flowmeta进行流日志匹配的时间偏差容忍度,越大漏报率越低但误报率越高,一般设置为网络时延的最大可能值
655693 """
656694 # 多次迭代,查询到所有相关的 l7_flow_log 摘要
657- l7_flow_ids , app_spans_from_external = await self .query_and_trace_flowmetas (
695+ l7_flow_ids , app_spans_from_external , final_time_filter = await self .query_and_trace_flowmetas (
658696 time_filter , base_filter , max_iteration , network_delay_us ,
659697 host_clock_offset_us , app_spans_from_api )
660698
@@ -668,8 +706,8 @@ async def trace_l7_flow(
668706 return_fields .append ("attribute" )
669707 l7_flows = pd .DataFrame ()
670708 if len (l7_flow_ids ) > 0 :
671- l7_flows = await self .query_all_flows (time_filter , l7_flow_ids ,
672- return_fields )
709+ l7_flows = await self .query_all_flows (final_time_filter ,
710+ l7_flow_ids , return_fields )
673711 if type (l7_flows ) != DataFrame or l7_flows .empty :
674712 # 一般不可能发生没有 l7_flows 但有 app_spans_from_external 的情况
675713 # 实际上几乎不可能发生没有 l7_flows 的情况,因为至少包含初始 flow
0 commit comments