Skip to content

Commit f73331a

Browse files
committed
feat: support pinpoint api
1 parent 2b4c197 commit f73331a

2 files changed

Lines changed: 23 additions & 7 deletions

File tree

app/app/application/l7_flow_tracing.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ async def query_and_trace_flowmetas(
314314
# when app_spans_from_api contains values returned by the API, return them
315315
return [], app_spans_from_api
316316
l7_flow_ids = set(dataframe_flowmetas['_id']) # set(flow._id)
317+
trace_span_ids = set() # set((flow.trace_id, flow.span_id))
317318

318319
# 用于下一轮迭代,记录元信息
319320
build_req_tcp_seqs, build_resp_tcp_seqs, build_x_request_ids, build_syscall_trace_ids, build_request_ids = _build_simple_trace_info_from_dataframe(
@@ -359,7 +360,7 @@ async def query_and_trace_flowmetas(
359360
# 1. 使用 trace_id 查询
360361
if new_trace_ids_for_next_iteration and TRACING_SRC_TRACE_ID in config.tracing_source:
361362
# 1.1. Call external APM API
362-
if config.call_apm_api_to_supplement_trace:
363+
if config.call_apm_api_to_supplement_trace and not config.apm_api_type_pinpoint:
363364
new_app_spans_from_apm = []
364365
for trace_id in new_trace_ids_for_next_iteration:
365366
app_spans = await self.query_apm_for_app_span_completion(
@@ -437,6 +438,8 @@ async def query_and_trace_flowmetas(
437438
dataframe_flowmetas = self.concat_l7_flow_log_dataframe(
438439
[dataframe_flowmetas, new_trace_id_flows])
439440
l7_flow_ids = set(dataframe_flowmetas['_id'])
441+
for i in range(len(dataframes_flowmetas['trace_id'])):
442+
trace_span_ids.add((dataframe_flowmetas['trace_id'][i], dataframe_flowmetas['span_id'][i]))
440443

441444
new_trace_req_tcp_seqs, new_trace_resp_tcp_seqs, new_trace_x_request_ids, new_trace_syscall_trace_ids, new_request_ids = _build_simple_trace_info_from_dataframe(
442445
new_trace_id_flows)
@@ -624,6 +627,17 @@ async def query_and_trace_flowmetas(
624627

625628
# end of `for i in range(max_iteration)`
626629

630+
# Call external APM (Pinpoint) API
631+
if config.call_apm_api_to_supplement_trace and config.apm_api_type_pinpoint:
632+
for trace_id, span_id in trace_span_ids:
633+
if not span_id:
634+
continue
635+
if isinstance(span_id, str) and not span_id.strip(): # skip empty span_id
636+
continue
637+
app_spans = await self.query_apm_for_app_span_completion(
638+
trace_id, span_id)
639+
app_spans_from_externa.extend(app_spans)
640+
627641
return l7_flow_ids, app_spans_from_external
628642

629643
async def trace_l7_flow(
@@ -726,7 +740,7 @@ async def query_flowmetas(self, time_filter: str,
726740
"""
727741
sql = """SELECT
728742
type, signal_source, req_tcp_seq, resp_tcp_seq, toUnixTimestamp64Micro(start_time) AS start_time_us,
729-
toUnixTimestamp64Micro(end_time) AS end_time_us, vtap_id, protocol, syscall_trace_id_request,
743+
toUnixTimestamp64Micro(end_time) AS end_time_us, vtap_id, protocol, syscall_trace_id_request,
730744
syscall_trace_id_response, span_id, parent_span_id, request_id, l7_protocol, trace_id, x_request_id_0,
731745
x_request_id_1, _id, tap_side
732746
FROM `l7_flow_log`
@@ -742,11 +756,11 @@ async def query_flowmetas(self, time_filter: str,
742756
self.status.append(status_discription, response)
743757
return response.get("data", [])
744758

745-
async def query_apm_for_app_span_completion(self, trace_id: str) -> list:
759+
async def query_apm_for_app_span_completion(self, trace_id: str, span_id: str) -> list:
746760
try:
747761
# if fetching data from the external APM fails, ignore the error
748762
# it should not interrupt the main tracing process
749-
get_third_app_span_url = f"http://{config.querier_server}:{config.querier_port}/api/v1/adapter/tracing?traceid={trace_id}"
763+
get_third_app_span_url = f"http://{config.querier_server}:{config.querier_port}/api/v1/adapter/tracing?traceid={trace_id}&spanid={span_id}"
750764
app_spans_res, app_spans_code = await curl_perform(
751765
'get', get_third_app_span_url)
752766
if app_spans_code != HTTP_OK:
@@ -3512,7 +3526,7 @@ def merge_service(services: List[ProcessSpanSet], traces: list,
35123526
uid_to_trace_index: Dict[int, int]) -> list:
35133527
"""
35143528
按 service 对 flow 分组并统计时延指标,先基于剪枝后的 trace span 过滤服务,对剩下的服务统计时延
3515-
3529+
35163530
服务分组的依据是:「保持一定的抽象粒度的情况下,尽量找到最小粒度的应用服务」
35173531
通常情况下,service 直接按 auto_service/app_service 分组做合并
35183532
当同一个 auto_service_id 下有多个进程时,执行进程分组逻辑
@@ -3531,7 +3545,7 @@ def merge_service(services: List[ProcessSpanSet], traces: list,
35313545
1. 仅 eBPF 的一侧有
35323546
2. 采集器开启 inputs.proc.enabled
35333547
3. 采集器通过 inputs.proc.process_matcher 规则匹配到进程
3534-
3548+
35353549
这里 2+3 步骤称为「gprocess 同步」
35363550
"""
35373551
metrics_map = {}
@@ -4000,7 +4014,7 @@ def tidy_host_clock_correction(self) -> dict:
40004014
1. 基于一个 hostX 作为基准做统一调整,所有 host 对齐到该 hostX
40014015
2. 理论上要以 rootSpan 的 host 作为基准,这里等价于"没有成为 child"的 host
40024016
3. 注意可能出现多个基准 host,比如存在如下情况:[hostX, hostY], [hostY, hostZ], [hostA, hostB], [hostA, hostC], 此时应该存在 hostX/hostA 两个基准
4003-
4017+
40044018
调整逻辑:
40054019
1. 基准 host 不做调整,仅对基准 host 以下的 host 做调整(准确地说是只对 child host 调整)
40064020
2. 对一组 [hostA, hostB],如果存在 [hostX, hostA] 且 hostA 已被调整,则 hostB 的调整值需要加上 hostA 的调整值,以对齐到 hostX

app/app/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ def parse_spec(self, cfg):
2424
'allow_multiple_trace_ids_in_tracing_result', False)
2525
self.call_apm_api_to_supplement_trace = spec.get(
2626
'call_apm_api_to_supplement_trace', False)
27+
self.apm_api_type_pinpoint = spec.get(
28+
'apm_api_type_pinpoint', False)
2729
self.tracing_source = spec.get(
2830
'tracing_source',
2931
["trace_id", "syscall", "tcp_seq", "x_request_id", "dns"])

0 commit comments

Comments
 (0)