Skip to content

Commit 964d35c

Browse files
taloricsharang
authored andcommitted
[app] optimize query
1 parent def6d8f commit 964d35c

5 files changed

Lines changed: 87 additions & 79 deletions

File tree

app/app.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ app:
88
# http request/response timeout
99
http_request_timeout: 600
1010
http_response_timeout: 600
11+
worker_numbers: 10
1112

1213
querier:
1314
host: deepflow-server

app/app/app.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import server
88
from config import config
99
from log import logger, sanic_logger
10-
from common.const import WORKER_NUMBER
1110

1211
log = logger.getLogger(__name__)
1312

@@ -47,7 +46,7 @@ def main():
4746
except OSError:
4847
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
4948
sock.bind(('', config.listen_port))
50-
server.server.run(workers=WORKER_NUMBER,
49+
server.server.run(workers=config.worker_numbers,
5150
sock=sock,
5251
protocol=sanic_logger.DFHttpProtocol)
5352

app/app/application/l7_flow_tracing.py

Lines changed: 84 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,6 @@
9090
"auto_instance_type_0",
9191
"auto_instance_id_0",
9292
"auto_instance_0",
93-
"auto_instance_0_node_type",
94-
"auto_instance_0_icon_id",
9593
"process_kname_0",
9694
"subnet_id_1",
9795
"subnet_1",
@@ -101,8 +99,6 @@
10199
"auto_instance_type_1",
102100
"auto_instance_id_1",
103101
"auto_instance_1",
104-
"auto_instance_1_node_type",
105-
"auto_instance_1_icon_id",
106102
"process_kname_1",
107103
"auto_service_type_0",
108104
"auto_service_id_0",
@@ -128,15 +124,6 @@
128124
FIELDS_MAP = {
129125
"start_time_us": "toUnixTimestamp64Micro(start_time) as start_time_us",
130126
"end_time_us": "toUnixTimestamp64Micro(end_time) as end_time_us",
131-
"auto_instance_0_node_type":
132-
"node_type(auto_instance_0) as auto_instance_0_node_type",
133-
"auto_instance_0_icon_id":
134-
"icon_id(auto_instance_0) as auto_instance_0_icon_id",
135-
"auto_instance_1_node_type":
136-
"node_type(auto_instance_1) as auto_instance_1_node_type",
137-
"auto_instance_1_icon_id":
138-
"icon_id(auto_instance_1) as auto_instance_1_icon_id",
139-
"_id": "toString(_id) as `_id_str`"
140127
}
141128
# 请求-响应合并的 key,当找到未合并的请求-响应时如果这些 key 相同,将合并为一个 span,标记为会话
142129
MERGE_KEYS = [
@@ -227,16 +214,15 @@ async def query_and_trace_flowmetas(
227214
new_trace_ids_in_prev_iteration = set() # 上一轮迭代过程中发现的新 trace_id 集合
228215

229216
# Query1: 先获取 _id 对应的数据
230-
dataframe_flowmetas = await self.query_flowmetas(
231-
time_filter, base_filter)
217+
# don't use timefilter here, querier would extract time from _id (e.g.: id>>32)
218+
dataframe_flowmetas = await self.query_flowmetas("1=1", base_filter)
232219
if type(dataframe_flowmetas) != DataFrame or dataframe_flowmetas.empty:
233220
# when app_spans_from_api got values from api, return it
234221
return [], app_spans_from_api
235-
dataframe_flowmetas.rename(columns={'_id_str': '_id'}, inplace=True)
236222
l7_flow_ids = set(dataframe_flowmetas['_id']) # set(flow._id)
237223

238224
# 用于下一轮迭代,记录元信息
239-
new_trace_infos = TraceInfo.construct_from_dataframe(
225+
build_req_tcp_seqs, build_resp_tcp_seqs, build_x_request_ids, build_syscall_trace_ids = _build_simple_trace_info_from_dataframe(
240226
dataframe_flowmetas)
241227

242228
# remember the initial trace_id
@@ -288,8 +274,6 @@ async def query_and_trace_flowmetas(
288274
time_filter, ' AND '.join(query_trace_filters))
289275
if type(new_trace_id_flows
290276
) == DataFrame and not new_trace_id_flows.empty:
291-
new_trace_id_flows.rename(columns={'_id_str': '_id'},
292-
inplace=True)
293277

294278
# remove duplicate or trace_id conflict flows
295279
new_trace_id_flow_delete_index = []
@@ -319,8 +303,13 @@ async def query_and_trace_flowmetas(
319303
dataframe_flowmetas = self.concat_l7_flow_log_dataframe(
320304
[dataframe_flowmetas, new_trace_id_flows])
321305
l7_flow_ids = set(dataframe_flowmetas['_id'])
322-
new_trace_infos += TraceInfo.construct_from_dataframe(
306+
307+
new_trace_req_tcp_seqs, new_trace_resp_tcp_seqs, new_trace_x_request_ids, new_trace_syscall_trace_ids = _build_simple_trace_info_from_dataframe(
323308
new_trace_id_flows)
309+
build_req_tcp_seqs += new_trace_req_tcp_seqs
310+
build_resp_tcp_seqs += new_trace_resp_tcp_seqs
311+
build_x_request_ids += new_trace_x_request_ids
312+
build_syscall_trace_ids += new_trace_syscall_trace_ids
324313

325314
# remove used trace_ids
326315
new_trace_ids_in_prev_iteration = set()
@@ -336,13 +325,14 @@ async def query_and_trace_flowmetas(
336325
# 2.1. new tcp_seqs
337326
new_req_tcp_seqs = set() # set(str(req_tcp_seq))
338327
new_resp_tcp_seqs = set() # set(str(resp_tcp_seq))
339-
for nti in new_trace_infos:
340-
if nti.req_tcp_seq and nti.req_tcp_seq not in req_tcp_seqs:
341-
req_tcp_seqs.add(nti.req_tcp_seq)
342-
new_req_tcp_seqs.add(str(nti.req_tcp_seq))
343-
if nti.resp_tcp_seq and nti.resp_tcp_seq not in resp_tcp_seqs:
344-
resp_tcp_seqs.add(nti.resp_tcp_seq)
345-
new_resp_tcp_seqs.add(str(nti.resp_tcp_seq))
328+
for nrts in build_req_tcp_seqs:
329+
if nrts and nrts not in req_tcp_seqs:
330+
req_tcp_seqs.add(nrts)
331+
new_req_tcp_seqs.add(str(nrts))
332+
for nrts in build_resp_tcp_seqs:
333+
if nrts and nrts not in resp_tcp_seqs:
334+
resp_tcp_seqs.add(nrts)
335+
new_resp_tcp_seqs.add(str(nrts))
346336
# 2.1. Condition 1: 以 req_tcp_seq & resp_tcp_seq 作为条件查询关联 flow
347337
tcp_seq_filters = []
348338
if new_req_tcp_seqs:
@@ -355,15 +345,10 @@ async def query_and_trace_flowmetas(
355345
new_filters.append(f"({' OR '.join(tcp_seq_filters)})")
356346
# 2.2. new syscall_trace_ids
357347
new_syscall_trace_ids = set() # set(str(syscall_trace_id))
358-
for nti in new_trace_infos:
359-
if nti.syscall_trace_id_request and nti.syscall_trace_id_request not in syscall_trace_ids:
360-
syscall_trace_ids.add(nti.syscall_trace_id_request)
361-
new_syscall_trace_ids.add(str(
362-
nti.syscall_trace_id_request))
363-
if nti.syscall_trace_id_response and nti.syscall_trace_id_response not in syscall_trace_ids:
364-
syscall_trace_ids.add(nti.syscall_trace_id_response)
365-
new_syscall_trace_ids.add(
366-
str(nti.syscall_trace_id_response))
348+
for nsti in build_syscall_trace_ids:
349+
if nsti and nsti not in syscall_trace_ids:
350+
syscall_trace_ids.add(nsti)
351+
new_syscall_trace_ids.add(str(nsti))
367352
# 2.2. Condition 2: 以 syscall_trace_id_request & syscall_trace_id_response 作为条件查询关联 flow
368353
syscall_trace_id_filters = []
369354
if new_syscall_trace_ids:
@@ -377,13 +362,10 @@ async def query_and_trace_flowmetas(
377362
f"({' OR '.join(syscall_trace_id_filters)})")
378363
# 2.3. new x_request_ids
379364
new_x_request_ids = set() # set(x_request_id)
380-
for nti in new_trace_infos:
381-
if nti.x_request_id_0 and nti.x_request_id_0 not in x_request_ids:
382-
x_request_ids.add(nti.x_request_id_0)
383-
new_x_request_ids.add(nti.x_request_id_0)
384-
if nti.x_request_id_1 and nti.x_request_id_1 not in x_request_ids:
385-
x_request_ids.add(nti.x_request_id_1)
386-
new_x_request_ids.add(nti.x_request_id_1)
365+
for nxri in build_x_request_ids:
366+
if nxri and nxri not in x_request_ids:
367+
x_request_ids.add(nxri)
368+
new_x_request_ids.add(nxri)
387369
# 2.3. Condition 3: 以 x_request_id_0 & x_request_id_1 作为条件查询关联 flow
388370
x_request_id_filters = []
389371
if new_x_request_ids:
@@ -406,7 +388,6 @@ async def query_and_trace_flowmetas(
406388
if type(new_flows
407389
) != DataFrame or new_flows.empty: # no more new flows
408390
break
409-
new_flows.rename(columns={'_id_str': '_id'}, inplace=True)
410391

411392
# remove duplicate or trace_id conflict flows
412393
new_flow_remove_indices = []
@@ -464,7 +445,8 @@ async def query_and_trace_flowmetas(
464445
l7_flow_ids = set(dataframe_flowmetas['_id'])
465446

466447
# reset new_trace_infos
467-
new_trace_infos = TraceInfo.construct_from_dataframe(new_flows)
448+
build_req_tcp_seqs, build_resp_tcp_seqs, build_x_request_ids, build_syscall_trace_ids = _build_simple_trace_info_from_dataframe(
449+
new_flows)
468450

469451
else: # no new_flows, no more iterations needed
470452
break
@@ -515,7 +497,6 @@ async def trace_l7_flow(
515497
# 但由于 tracing_completion api 也调用此处追踪逻辑,接口可能传入不存在的 trace_id
516498
# 所以这里兼容 len(l7_flow_ids)=0 场景,仅对: 当 len(l7_flow_ids)>0 但 `query_all_flows` 为空时返回
517499
return {}
518-
l7_flows.rename(columns={'_id_str': '_id'}, inplace=True)
519500

520501
# 将外部 APM 查询到的 Span 与数据库中的 Span 结果进行合并
521502
l7_flows = self.concat_l7_flow_log_dataframe(
@@ -574,7 +555,7 @@ async def query_flowmetas(self, time_filter: str,
574555
type, signal_source, req_tcp_seq, resp_tcp_seq, toUnixTimestamp64Micro(start_time) AS start_time_us,
575556
toUnixTimestamp64Micro(end_time) AS end_time_us, vtap_id, syscall_trace_id_request,
576557
syscall_trace_id_response, span_id, parent_span_id, l7_protocol, trace_id, x_request_id_0,
577-
x_request_id_1, toString(_id) AS `_id_str`, tap_side, auto_instance_0, auto_instance_1
558+
x_request_id_1, _id, tap_side
578559
FROM `l7_flow_log`
579560
WHERE (({time_filter}) AND ({base_filter})) limit {l7_tracing_limit}
580561
""".format(time_filter=time_filter,
@@ -623,8 +604,24 @@ async def query_all_flows(self, time_filter: str, l7_flow_ids: list,
623604
dictGet(deepflow.pod_node_map, ('name'), (toUInt64(pod_node_id_1))) AS pod_node_name_1
624605
"""
625606
ids = []
607+
# time => [_ids]
608+
# build _id IN (xxx) conditions, grouping _id base on the same seconds
609+
iddict = dict()
626610
for flow_id in l7_flow_ids:
627-
ids.append(f"_id={flow_id}")
611+
seconds = flow_id >> 32
612+
iddict.setdefault(seconds, []).append(str(flow_id))
613+
# fix start_time from min to max extract from _id
614+
min_start_time = list(iddict.keys())[-1] if len(
615+
iddict.keys()) > 0 else 0
616+
max_end_time = 0
617+
for sec in iddict.keys():
618+
ids.append(f"_id IN ({', '.join(iddict[sec])})")
619+
if sec > max_end_time:
620+
max_end_time = sec
621+
if sec < min_start_time:
622+
min_start_time = sec
623+
if min_start_time > 0:
624+
time_filter = f"time>={min_start_time} AND time<={max_end_time}"
628625
fields = []
629626
for field in return_fields:
630627
if field in FIELDS_MAP:
@@ -727,6 +724,16 @@ def set_all_relate(trace_infos: list,
727724
if fast_check and find_related: continue
728725

729726

727+
def _build_simple_trace_info_from_dataframe(df: pd.DataFrame):
728+
req_tcp_seqs = df['req_tcp_seq'].tolist()
729+
resp_tcp_seqs = df['resp_tcp_seq'].tolist()
730+
x_request_ids = df['x_request_id_0'].tolist()
731+
x_request_ids += df['x_request_id_1'].tolist()
732+
syscall_trace_ids = df['syscall_trace_id_request'].tolist()
733+
syscall_trace_ids += df['syscall_trace_id_response'].tolist()
734+
return req_tcp_seqs, resp_tcp_seqs, x_request_ids, syscall_trace_ids
735+
736+
730737
class TraceInfo:
731738

732739
def __init__(self, _id, signal_source, vtap_id, _type, start_time_us,
@@ -781,32 +788,33 @@ def construct_from_dataframe(cls, dataframe_flowmetas: DataFrame):
781788
constructor of traceinfo from database records to build tracing keys
782789
"""
783790
trace_infos = [] # [TraceInfo]
784-
for index in dataframe_flowmetas.index:
791+
for row in dataframe_flowmetas.itertuples():
785792
trace_infos.append(
786793
TraceInfo(
787-
dataframe_flowmetas.at[index, '_id'],
788-
dataframe_flowmetas.at[index, 'signal_source'],
789-
dataframe_flowmetas.at[index, 'vtap_id'],
790-
dataframe_flowmetas.at[index, 'type'],
794+
# key start with '_' can not access through attr
795+
dataframe_flowmetas.at[row.Index, '_id'],
796+
getattr(row, 'signal_source'),
797+
getattr(row, 'vtap_id'),
798+
getattr(row, 'type'),
791799
# time
792-
dataframe_flowmetas.at[index, 'start_time_us'],
793-
dataframe_flowmetas.at[index, 'end_time_us'],
800+
getattr(row, 'start_time_us'),
801+
getattr(row, 'end_time_us'),
794802
# tcp_seq
795-
dataframe_flowmetas.at[index, 'req_tcp_seq'],
796-
dataframe_flowmetas.at[index, 'resp_tcp_seq'],
803+
getattr(row, 'req_tcp_seq'),
804+
getattr(row, 'resp_tcp_seq'),
797805
# span_id
798-
dataframe_flowmetas.at[index, 'trace_id'],
799-
dataframe_flowmetas.at[index, 'span_id'],
800-
dataframe_flowmetas.at[index, 'parent_span_id'],
806+
getattr(row, 'trace_id'),
807+
getattr(row, 'span_id'),
808+
getattr(row, 'parent_span_id'),
801809
# x_request_id
802-
dataframe_flowmetas.at[index, 'x_request_id_0'],
803-
dataframe_flowmetas.at[index, 'x_request_id_1'],
810+
getattr(row, 'x_request_id_0'),
811+
getattr(row, 'x_request_id_1'),
804812
# syscall_trace_id
805-
dataframe_flowmetas.at[index, 'syscall_trace_id_request'],
806-
dataframe_flowmetas.at[index, 'syscall_trace_id_response'],
813+
getattr(row, 'syscall_trace_id_request'),
814+
getattr(row, 'syscall_trace_id_response'),
807815
# origin_flow_list
808816
dataframe_flowmetas,
809-
index))
817+
row.Index))
810818
return trace_infos
811819

812820
@classmethod
@@ -917,6 +925,17 @@ def flow_field_conflict(cls, lhs: TraceInfo, rhs: TraceInfo) -> bool:
917925
'response_exception',
918926
'response_result',
919927
]:
928+
if is_http2_grpc_and_differ and key == 'l7_protocol_str':
929+
# 当已经确认 l7_protocol 忽略差异时,不用比较 l7_protocol_str
930+
continue
931+
932+
if is_http2_grpc_and_differ and key == 'request_resource':
933+
# 某些情况下同一股流量在不同位置可能会被 Agent 分别解析为 HTTP2 和 gRPC
934+
# 目前这两种协议的 request_resource 取自不同的协议字段,详见下面的文档:
935+
# https://deepflow.io/docs/zh/features/universal-map/l7-protocols/#http2
936+
# 于是,当一个协议是 HTTP2、另一个是 gRPC 时,不用比较这些差异字段
937+
continue
938+
920939
lhs_value = lhs.get_extra_field(key)
921940
rhs_value = rhs.get_extra_field(key)
922941
if not lhs_value or not rhs_value:
@@ -940,17 +959,6 @@ def flow_field_conflict(cls, lhs: TraceInfo, rhs: TraceInfo) -> bool:
940959
is_http2_grpc_and_differ = True
941960
continue
942961

943-
if is_http2_grpc_and_differ and key == 'l7_protocol_str':
944-
# 当已经确认 l7_protocol 忽略差异时,不用比较 l7_protocol_str
945-
continue
946-
947-
if is_http2_grpc_and_differ and key == 'request_resource':
948-
# 某些情况下同一股流量在不同位置可能会被 Agent 分别解析为 HTTP2 和 gRPC
949-
# 目前这两种协议的 request_resource 取自不同的协议字段,详见下面的文档:
950-
# https://deepflow.io/docs/zh/features/universal-map/l7-protocols/#http2
951-
# 于是,当一个协议是 HTTP2、另一个是 gRPC 时,不用比较这些差异字段
952-
continue
953-
954962
if lhs_value != rhs_value:
955963
return True
956964
return False

app/app/common/const.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11

22
# API
3-
WORKER_NUMBER = 10
43
API_VERSION = 'v1'
54
API_PREFIX = '/' + API_VERSION + '/stats'
65

app/app/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ def __init__(self):
1111
def parse_log(self, cfg):
1212
self.log_level = cfg.get('log-level', 'info')
1313
self.log_file = cfg.get('log-file', '/etc/deepflow/app.log')
14+
self.worker_numbers = cfg.get('worker_numbers', 10)
1415

1516
def parse_spec(self, cfg):
1617
spec = cfg.get('spec')

0 commit comments

Comments
 (0)