Skip to content

Commit e1c89ee

Browse files
committed
feat: expand time range for query
1 parent 5f7e395 commit e1c89ee

4 files changed

Lines changed: 49 additions & 12 deletions

File tree

app/app.yaml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,10 @@ app:
4949
# each strategy enables one type of weak cross span_set connection when strong evidence (tcp_seq/x_request_id/span_id) is absent:
5050
# net_span_c_to_s_via_trace_id: connect a client-side leaf NetSpan to a server-side root NetSpan
5151
# when they share the same trace_id but have different tcp_seq
52-
span_set_connection_strategies: []
52+
span_set_connection_strategies: []
53+
# when in multiple trace_id scenarios, we may miss some data due to time range not in query window
54+
# in default, query time range is fixed value from front-end, which is [-1m:+1m]
55+
# but in async request-response scenarios, we have no idea how long an async response would be sent after request
56+
# it may lead to a missing result
57+
# so we use config for extra time range expansion, default: 0, unit: s
58+
iteration_expand_time_range: 0

app/app/application/l7_flow_tracing.py

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ async def query_and_trace_flowmetas(
284284
max_iteration: int = config.max_iteration,
285285
network_delay_us: int = config.network_delay_us,
286286
host_clock_offset_us: int = config.host_clock_offset_us,
287-
app_spans_from_api: list = []) -> Tuple[Set, list]:
287+
app_spans_from_api: list = []) -> Tuple[Set, list, str]:
288288
"""多次迭代,查询可追踪到的所有 l7_flow_log 的摘要
289289
参数说明:
290290
time_filter: 查询的时间范围过滤条件,SQL表达式
@@ -309,12 +309,17 @@ async def query_and_trace_flowmetas(
309309
visited_trace_ids = set() # 已访问过的 trace_id 集合
310310
new_trace_ids_for_next_iteration = set() # 给下一轮迭代查询用的 trace_id 集合
311311

312+
# during iteration, maybe update query time range based on config.iteration_expand_time_range
313+
# in this case, should updated global query time range for all l7_flow_ids
314+
min_start_time = self.start_time
315+
max_end_time = self.end_time
316+
312317
# Query1: 先获取 _id 对应的数据
313-
# don't use timefilter here, querier would extract time from _id (e.g.: id>>32)
318+
# don't use time_filter here, querier would extract time from _id (e.g.: id>>32)
314319
dataframe_flowmetas = await self.query_flowmetas("1=1", base_filter)
315320
if type(dataframe_flowmetas) != DataFrame or dataframe_flowmetas.empty:
316321
# when app_spans_from_api got values from api, return it
317-
return set(), app_spans_from_api
322+
return set(), app_spans_from_api, time_filter
318323
l7_flow_ids = set(dataframe_flowmetas['_id']) # set(flow._id)
319324

320325
# 用于下一轮迭代,记录元信息
@@ -355,6 +360,13 @@ async def query_and_trace_flowmetas(
355360
max_iteration = 1
356361

357362
config.tracing_source = config.tracing_source or DEFAULT_TRACING_SOURCE
363+
# 注意扩大时间范围能力只用于搜索多 trace_id,其他关联搜索(tcp_seq, x_req_id, syscall)不要扩大查询范围
364+
# 当什么也不配置时,查询范围是 _id 对应的 l7_flow_log 的时间前后各推一分钟,由前端传参决定
365+
trace_id_time_filter = time_filter
366+
if config.iteration_expand_time_range:
367+
start_time = self.start_time - config.iteration_expand_time_range
368+
end_time = self.end_time + config.iteration_expand_time_range
369+
trace_id_time_filter = f"time>={start_time} AND time<={end_time}"
358370

359371
# 进行迭代查询,上限为 config.spec.max_iteration
360372
for i in range(max_iteration):
@@ -384,7 +396,7 @@ async def query_and_trace_flowmetas(
384396
# Query2: 基于 trace_id 获取相关数据,第一层迭代
385397
new_trace_id_flows = pd.DataFrame()
386398
new_trace_id_flows = await self.query_flowmetas(
387-
time_filter, ' AND '.join(query_trace_filters))
399+
trace_id_time_filter, ' AND '.join(query_trace_filters))
388400

389401
# 已查询过一次的 trace_id,直接加入 visited,不需要再被查询
390402
# new_trace_ids_for_next_iteration 已被查询,可在这一步直接清空
@@ -444,6 +456,17 @@ async def query_and_trace_flowmetas(
444456
dataframe_flowmetas = self.concat_l7_flow_log_dataframe(
445457
[dataframe_flowmetas, new_trace_id_flows])
446458
l7_flow_ids = set(dataframe_flowmetas['_id'])
459+
if config.iteration_expand_time_range:
460+
trace_min_s = int(
461+
new_trace_id_flows['start_time_us'].min() //
462+
1_000_000)
463+
trace_max_s = int(
464+
new_trace_id_flows['end_time_us'].max() //
465+
1_000_000)
466+
if trace_min_s < min_start_time:
467+
min_start_time = trace_min_s
468+
if trace_max_s > max_end_time:
469+
max_end_time = trace_max_s
447470

448471
new_trace_req_tcp_seqs, new_trace_resp_tcp_seqs, new_trace_x_request_ids, new_trace_syscall_trace_ids, new_request_ids = _build_simple_trace_info_from_dataframe(
449472
new_trace_id_flows)
@@ -547,9 +570,13 @@ async def query_and_trace_flowmetas(
547570
elif not new_filters: # only new_trace_ids_for_next_iteration got values
548571
continue
549572

573+
# 关联查询(tcp_seq/syscall/dns/xreqid)的时间范围应跟随已查到的 trace_id flows 的实际时间跨度
574+
# min_start_time/max_end_time 在每次 append 新 flows 时增量更新,此处直接使用
575+
assoc_time_filter = f"time>={min_start_time} AND time<={max_end_time}"
576+
550577
# Query3: 查询上述基于 Condition[123] 构建出的条件,即与【第一层迭代】关联的所有 flow,此处构建【第二层迭代】查询
551578
new_flows = pd.DataFrame()
552-
new_flows = await self.query_flowmetas(time_filter,
579+
new_flows = await self.query_flowmetas(assoc_time_filter,
553580
' OR '.join(new_filters))
554581
if type(new_flows
555582
) != DataFrame or new_flows.empty: # no more new flows
@@ -630,8 +657,9 @@ async def query_and_trace_flowmetas(
630657
break
631658

632659
# end of `for i in range(max_iteration)`
633-
634-
return l7_flow_ids, app_spans_from_external
660+
# update final_time_filter for all l7_flow_ids
661+
final_time_filter = f"time>={min_start_time} AND time<={max_end_time}"
662+
return l7_flow_ids, app_spans_from_external, final_time_filter
635663

636664
async def trace_l7_flow(
637665
self,
@@ -654,7 +682,7 @@ async def trace_l7_flow(
654682
network_delay_us: 使用Flowmeta进行流日志匹配的时间偏差容忍度,越大漏报率越低但误报率越高,一般设置为网络时延的最大可能值
655683
"""
656684
# 多次迭代,查询到所有相关的 l7_flow_log 摘要
657-
l7_flow_ids, app_spans_from_external = await self.query_and_trace_flowmetas(
685+
l7_flow_ids, app_spans_from_external, final_time_filter = await self.query_and_trace_flowmetas(
658686
time_filter, base_filter, max_iteration, network_delay_us,
659687
host_clock_offset_us, app_spans_from_api)
660688

@@ -668,8 +696,8 @@ async def trace_l7_flow(
668696
return_fields.append("attribute")
669697
l7_flows = pd.DataFrame()
670698
if len(l7_flow_ids) > 0:
671-
l7_flows = await self.query_all_flows(time_filter, l7_flow_ids,
672-
return_fields)
699+
l7_flows = await self.query_all_flows(final_time_filter,
700+
l7_flow_ids, return_fields)
673701
if type(l7_flows) != DataFrame or l7_flows.empty:
674702
# 一般不可能发生没有 l7_flows 但有 app_spans_from_external 的情况
675703
# 实际上几乎不可能发生没有 l7_flows 的情况,因为至少包含初始 flow

app/app/application/tracing_completion.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ async def query(self):
4747
for j in range(len(self.app_spans)):
4848
if i == j:
4949
continue
50-
related_map_from_api[self.app_spans[i]['_id']][self.app_spans[j]['_id']] |= L7_FLOW_RELATIONSHIP_SPAN_ID
50+
related_map_from_api[self.app_spans[i]['_id']][
51+
self.app_spans[j]['_id']] |= L7_FLOW_RELATIONSHIP_SPAN_ID
5152
rst = await self.trace_l7_flow(
5253
time_filter,
5354
base_filter,

app/app/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ def parse_spec(self, cfg):
3131
if not isinstance(strategies, list):
3232
strategies = []
3333
self.span_set_connection_strategies = strategies
34+
self.iteration_expand_time_range = int(
35+
spec.get('iteration_expand_time_range', 0))
3436

3537
def parse_querier(self, cfg):
3638
querier = cfg.get('querier', dict())

0 commit comments

Comments
 (0)