11from collections import defaultdict
22import pandas as pd
3+ from log import logger
34from pandas import DataFrame
45
56from .l7_flow_tracing import (TAP_SIDE_CLIENT_PROCESS , TAP_SIDE_SERVER_PROCESS ,
6- TAP_SIDE_CLIENT_APP , TAP_SIDE_SERVER_APP ,
7- TAP_SIDE_APP , RETURN_FIELDS , L7_FLOW_SIGNAL_SOURCE_OTEL )
7+ RETURN_FIELDS )
88from .l7_flow_tracing import (L7FlowTracing , L7NetworkMeta , L7SyscallMeta ,
9- L7XrequestMeta )
9+ L7XrequestMeta , TraceInfo )
1010from .l7_flow_tracing import sort_all_flows , format_final_result , set_all_relate
1111from common import const
12+ from common .const import L7_FLOW_SIGNAL_SOURCE_OTEL
13+ from common .utils import inner_defaultdict_set
1214from config import config
1315from models .models import AppSpans
1416
17+ log = logger .getLogger (__name__ )
18+
1519
1620class TracingCompletion (L7FlowTracing ):
1721
@@ -80,11 +84,9 @@ async def trace_l7_flow(self,
8084 network_metas = set ()
8185 syscall_metas = set ()
8286 trace_ids = set ()
83- app_metas = set ()
8487 x_request_metas = set ()
8588 l7_flow_ids = set ()
86- xrequests = []
87- related_map = defaultdict (dict )
89+ related_map = defaultdict (inner_defaultdict_set )
8890 query_simple_trace_id = False
8991 dataframe_flowmetas = self .app_spans_df
9092 if dataframe_flowmetas .empty :
@@ -134,6 +136,10 @@ async def trace_l7_flow(self,
134136 f"signal_source={ L7_FLOW_SIGNAL_SOURCE_OTEL } " )
135137 new_trace_id_flows = await self .query_flowmetas (
136138 time_filter , ' AND ' .join (query_trace_filters ))
139+ if type (new_trace_id_flows
140+ ) == DataFrame and not new_trace_id_flows .empty :
141+ new_trace_id_flows .rename (columns = {'_id_str' : '_id' },
142+ inplace = True )
137143 query_simple_trace_id = True
138144 if delete_index :
139145 dataframe_flowmetas = dataframe_flowmetas .drop (
@@ -162,6 +168,8 @@ async def trace_l7_flow(self,
162168 f"signal_source={ L7_FLOW_SIGNAL_SOURCE_OTEL } " )
163169 new_trace_id_flows = await self .query_flowmetas (
164170 time_filter , ' AND ' .join (query_trace_filters ))
171+ new_trace_id_flows .rename (columns = {'_id_str' : '_id' },
172+ inplace = True )
165173
166174 if type (new_trace_id_flows ) != DataFrame :
167175 break
@@ -206,10 +214,6 @@ async def trace_l7_flow(self,
206214 ))
207215 new_network_metas -= network_metas
208216 network_metas |= new_network_metas
209- networks = [
210- L7NetworkMeta (nnm , network_delay_us )
211- for nnm in new_network_metas
212- ]
213217 for nnm in new_network_metas :
214218 req_tcp_seq = nnm [2 ]
215219 resp_tcp_seq = nnm [3 ]
@@ -251,7 +255,6 @@ async def trace_l7_flow(self,
251255 ))
252256 new_syscall_metas -= syscall_metas
253257 syscall_metas |= new_syscall_metas
254- syscalls = [L7SyscallMeta (nsm ) for nsm in new_syscall_metas ]
255258 for nsm in new_syscall_metas :
256259 syscall_trace_id_request = nsm [2 ]
257260 syscall_trace_id_response = nsm [3 ]
@@ -289,9 +292,6 @@ async def trace_l7_flow(self,
289292 dataframe_flowmetas ['x_request_id_1' ][index ]))
290293 new_x_request_metas -= x_request_metas
291294 x_request_metas |= new_x_request_metas
292- xrequests = [
293- L7XrequestMeta (nxr ) for nxr in new_x_request_metas
294- ]
295295 for nxr in new_x_request_metas :
296296 x_request_id_0 = nxr [1 ]
297297 x_request_id_1 = nxr [2 ]
@@ -374,28 +374,22 @@ async def trace_l7_flow(self,
374374 log .debug (f"删除的trace id为:{ deleted_trace_ids } " )
375375 new_flows .rename (columns = {'_id_str' : '_id' }, inplace = True )
376376
377- new_related_map = defaultdict (dict )
378- new_flow_ids = set (new_flows ['_id' ])
379- if xrequests :
380- for x_request in xrequests :
381- x_request .set_relate (new_flow_ids , new_related_map ,
382- id_to_related_tag )
383-
384- if syscalls :
385- for syscall in syscalls :
386- syscall .set_relate (new_flow_ids , new_related_map ,
387- id_to_related_tag )
377+ trace_infos = TraceInfo .construct_from_dataframe (
378+ dataframe_flowmetas
379+ ) + TraceInfo .construct_from_dataframe (new_flows )
388380
389- if networks :
390- for network in networks :
391- network .set_relate (new_flow_ids , new_related_map ,
392- id_to_related_tag )
381+ set_all_relate (
382+ trace_infos ,
383+ related_map ,
384+ network_delay_us ,
385+ fast_check = True ,
386+ skip_first_n_trace_infos = len (dataframe_flowmetas ))
393387
394388 new_flow_delete_index = []
395389 for index in range (len (new_flows .index )):
396390 _id = new_flows ['_id' ][index ]
397391 # Delete unrelated data
398- if _id not in new_related_map :
392+ if _id not in related_map :
399393 new_flow_delete_index .append (index )
400394 if new_flow_delete_index :
401395 new_flows = new_flows .drop (
@@ -413,7 +407,8 @@ async def trace_l7_flow(self,
413407 new_flows_length = len (dataframe_flowmetas )
414408 if old_flows_length == new_flows_length :
415409 break
416- set_all_relate (dataframe_flowmetas , related_map , network_delay_us )
410+ # end of `for i in range(max_iteration):`
411+
417412 # Fetch all application flow logs reached by the trace
418413 return_fields += RETURN_FIELDS
419414 flow_fields = list (RETURN_FIELDS )
@@ -437,10 +432,11 @@ async def trace_l7_flow(self,
437432 l7_flows .at [index , 'related_ids' ] = related_map [l7_flows .at [index ,
438433 '_id' ]]
439434 # Sort all application flow logs
440- l7_flows_merged , app_flows , networks = sort_all_flows (
435+ l7_flows_merged , app_flows , networks , flow_index_to_id0 , related_flow_index_map = sort_all_flows (
441436 l7_flows , network_delay_us , return_fields , ntp_delay_us )
442- return format (l7_flows_merged , networks , app_flows ,
443- self .args .get ('_id' ), network_delay_us )
437+ return format_final_result (l7_flows_merged , networks , app_flows ,
438+ self .args .get ('_id' ), network_delay_us ,
439+ flow_index_to_id0 , related_flow_index_map )
444440
445441 # update start time and end time
446442 def update_time (self ):
0 commit comments