Skip to content

Commit 4950757

Browse files
taloricsharang
authored and committed
fix: link grpc frame by request_id
1 parent f2548b4 commit 4950757

1 file changed

Lines changed: 62 additions & 4 deletions

File tree

app/app/application/l7_flow_tracing.py

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,19 @@ def set_parent_relation(self):
11801180
# 这里做一个简单的处理,当相邻两个 Span 都是同类 SYS Span 时不要按照 TCP Seq 来设置他们的 Parent 关系。
11811181
continue
11821182
else:
1183+
# if self.spans[i] already has a parent, it is most likely a c-p that was attached to an s-p in `try_attach_client_sys_span_via_sys_span`
1184+
# usually the c-p sits at index [0], so no parent attachment is attempted for it here
1185+
# but in grpc _RESPONSE_X mode the sort order is reversed, which moves the c-p to the end so a parent attachment is attempted for it here
1186+
# in that case the tcp_seq relation takes priority: allow it, and remove the c-p's index from the s-p's childs, otherwise the c-p appears twice in the result
1187+
1188+
# 如果 self.spans[i] 已有 parent,很大概率是 c-p 在 `try_attach_client_sys_span_via_sys_span` 过程中关联上了 s-p
1189+
# 通常情况下,c-p 一般在[0]索引,不会在这里尝试关联 parent
1190+
# 而目前在 grpc _RESPONSE_X 模式下,会反转顺序,让 c-p 排序在末端,导致在这里尝试再关联 parent
1191+
# 对此类情况,认为 tcp_seq 关联优先级更高,允许关联,并清理 s-p childs 中的 c-p,否则结果中会重复
1192+
if self.spans[i].parent is not None:
1193+
self.spans[i].parent.flow['childs'].remove(
1194+
self.spans[i].get_flow_index())
1195+
11831196
self.spans[i].set_parent(self.spans[i - 1],
11841197
"trace mounted due to tcp_seq")
11851198

@@ -1188,6 +1201,7 @@ def _sort_network_spans(self):
11881201
对网络span进行排序,排序规则:
11891202
1. 按照TAP_SIDE_RANKS进行排序
11901203
2. 按采集器分组排序,与入口 span 同一个采集器的前移,出口 span 同一个采集器的后移,组内按 start_time 排序
1204+
通常情况下 client-side 是 ingress, server-side 是 egress
11911205
"""
11921206
sorted_spans = sorted(
11931207
self.spans,
@@ -1211,11 +1225,31 @@ def _sort_network_spans(self):
12111225
egress_agent = sorted_spans[i].flow['vtap_id']
12121226
break
12131227

1228+
# sort rank for ingress & egress agent
1229+
ingress_rank = 0 # up for ingress
1230+
egreass_rank = 2 # down for egress
1231+
# `flow_field_conflict` guarantees that `l7_protocol` and `request_type` are identical within a network_span_set, so checking the first span is enough
1232+
# `flow_field_conflict` 确保了 `l7_protocol` `request_type` 在同一个 network_span_set 中一定相等,取首个即可
1233+
if len(sorted_spans) > 0 and sorted_spans[0].flow['l7_protocol'] in [
1234+
L7_PROTOCOL_GRPC, L7_PROTOCOL_HTTP2
1235+
]:
1236+
# in the `grpc` protocol, _HEADER and _DATA frames are unidirectional flows that are identified as `session`
1237+
# but in fact, when 'req_tcp_seq'=0, it's a 'response', from server-side to client-side, request_type=_RESPONSE_DATA/HEADER
1238+
# so we need to reverse ingress and egress here to re-sort network spans
1239+
1240+
# 在 grpc 中, _HEADER 和 _DATA frame 是被标记为 session 的单向流
1241+
# 但实际上,如果 req_tcp_seq=0,说明这其实是一个 response,方向为 server-side -> client-side,request_type=_RESPONSE_DATA/HEADER
1242+
# 对此类情况,应要反转 ingress 和 egress 排序
1243+
if not sorted_spans[0].flow['req_tcp_seq'] and \
1244+
sorted_spans[0].flow['type'] == L7_FLOW_TYPE_SESSION:
1245+
ingress_rank = 2
1246+
egress_agent = 0
1247+
12141248
for i in range(len(sorted_spans)):
12151249
if sorted_spans[i].flow['vtap_id'] == ingress_agent:
1216-
sorted_spans[i].flow['agent_rank'] = 0
1250+
sorted_spans[i].flow['agent_rank'] = ingress_rank
12171251
elif sorted_spans[i].flow['vtap_id'] == egress_agent:
1218-
sorted_spans[i].flow['agent_rank'] = 2
1252+
sorted_spans[i].flow['agent_rank'] = egreass_rank
12191253
else:
12201254
sorted_spans[i].flow['agent_rank'] = 1
12211255

@@ -1300,6 +1334,9 @@ def get_req_tcp_seq(self) -> int:
13001334
def get_resp_tcp_seq(self) -> int:
13011335
return self.flow.get('resp_tcp_seq', 0)
13021336

1337+
def get_request_id(self) -> int:
1338+
return self.flow.get('request_id', 0)
1339+
13031340
def time_range_cover(self, other_sys_span: 'SpanNode') -> bool:
13041341
return self.flow['start_time_us'] <= other_sys_span.flow[
13051342
'start_time_us'] and self.flow[
@@ -2222,6 +2259,8 @@ def _connect_process_and_networks(process_roots: List[SpanNode],
22222259
ps_child_span_id = ps_child.get_span_id()
22232260
ps_child_parent_span_id = ps_child.get_parent_span_id()
22242261
for net_parent in network_leafs:
2262+
if ps_child.get_parent_id() >= 0:
2263+
continue
22252264
if _same_span_set(ps_child, net_parent, 'network_span_set') \
22262265
or _same_span_set(ps_child, net_parent, 'process_span_set'):
22272266
continue
@@ -2253,6 +2292,8 @@ def _connect_process_and_networks(process_roots: List[SpanNode],
22532292
ps_child_span_id = ps_child.get_span_id()
22542293
ps_child_parent_span_id = ps_child.get_parent_span_id()
22552294
for ps_parent in process_leafs:
2295+
if ps_child.get_parent_id() >= 0:
2296+
continue
22562297
if _same_span_set(ps_child, ps_parent, 'network_span_set') \
22572298
or _same_span_set(ps_child, ps_parent, 'process_span_set'):
22582299
continue
@@ -2304,10 +2345,14 @@ def _connect_process_and_networks(process_roots: List[SpanNode],
23042345
continue
23052346
net_child_index = net_child.get_flow_index()
23062347
net_child_span_id = net_child.get_span_id()
2307-
net_child_x_request_id_0 = net_child.get_x_request_id_0
2348+
net_child_x_request_id_0 = net_child.get_x_request_id_0()
23082349
net_child_x_request_id_1 = net_child.get_x_request_id_1()
2350+
net_child_request_id = net_child.get_request_id()
2351+
net_child_l7_protocol = net_child.flow['l7_protocol']
23092352
net_child_response_duration = net_child.flow['response_duration']
23102353
for net_parent in network_leafs:
2354+
if net_child.get_parent_id() >= 0:
2355+
continue
23112356
if _same_span_set(net_child, net_parent, 'network_span_set') \
23122357
or _same_span_set(net_child, net_parent, 'process_span_set'):
23132358
continue
@@ -2324,7 +2369,6 @@ def _connect_process_and_networks(process_roots: List[SpanNode],
23242369
# net_child, net_parent)
23252370
# process_span_list.append(fake_pss)
23262371
# flows.extend(fake_pss.spans)
2327-
23282372
elif (net_child_x_request_id_0 and net_child_x_request_id_0 == net_parent.get_x_request_id_0()) \
23292373
or (net_child_x_request_id_1 and net_child_x_request_id_1 == net_parent.get_x_request_id_1()) \
23302374
or (net_child_span_id and net_child_span_id == net_parent.get_span_id()):
@@ -2348,6 +2392,20 @@ def _connect_process_and_networks(process_roots: List[SpanNode],
23482392
network_match_parent[
23492393
net_child_index] = net_parent.get_flow_index()
23502394

2395+
elif net_child_request_id and net_child_request_id == net_parent.get_request_id() \
2396+
and net_child_l7_protocol in [L7_PROTOCOL_HTTP2, L7_PROTOCOL_GRPC] \
2397+
and net_child_l7_protocol == net_parent.flow['l7_protocol'] \
2398+
and net_child_response_duration <= net_parent.flow['response_duration']:
2399+
# grpc protocol: request_id comes from `stream_id`, so different network_span_sets with the same request_id share one stream and should be connected
2400+
# other protocols may re-use a request_id within a short time (e.g. MySQL's StatementID), which would cause false connections, so only grpc is supported for now
2401+
# `<=` (not `<`) is used in net_child.response_duration <= net_parent.response_duration to cover the case where both durations are 0
2402+
# grpc 的 request_id 来源于 `stream_id`, 意味着不同的 network_span_set 在同一个 stream 里,应被连接
2403+
# 但其他协议的 request_id 有可能短时内被多次重用,容易误连接,比如 MySQL 的 StatementID,所以目前仅支持 grpc
2404+
# net_child.response_duration <= net_parent.response_duration 用于双方时延为0 的情况
2405+
# ref: https://www.deepflow.io/docs/zh/features/l7-protocols/http/#http2
2406+
net_child.set_parent(
2407+
net_parent, "net_span mounted due to grpc request_id")
2408+
23512409
for child, parent in network_match_parent.items():
23522410
# FIXME: 生成一个 pseudo net span,待前端修改后再开放此代码,注意处理时延计算
23532411
# fake_pss = _generate_pseudo_process_span_set(flow_index_to_span[child],

0 commit comments

Comments
 (0)