Skip to content

Commit 71c59ce

Browse files
committed
feat: dubbo support java format
1 parent 4d5d0d3 commit 71c59ce

4 files changed

Lines changed: 190 additions & 38 deletions

File tree

agent/crates/public/src/codecs/hessian2.rs

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -309,39 +309,6 @@ impl Hessian2Decoder {
309309
}
310310
}
311311

312-
pub fn decode_list_first_string(bytes: &[u8], start: usize) -> (Option<String>, usize) {
313-
if start >= bytes.len() {
314-
return (None, 0);
315-
}
316-
let mut offset = 0;
317-
let bytes = &bytes[start..];
318-
319-
if !(BC_LIST_FIXED_TYPED_LEN_TAG_MIN..=BC_LIST_FIXED_TYPED_LEN_TAG_MAX)
320-
.contains(&bytes[offset])
321-
{
322-
return (None, 0);
323-
}
324-
offset += 1;
325-
326-
let (_, type_len) = Self::decode_string(bytes, offset);
327-
offset += type_len;
328-
329-
if offset >= bytes.len() {
330-
return (None, 0);
331-
}
332-
333-
let len = bytes[offset] as usize;
334-
offset += 1;
335-
336-
if offset + len >= bytes.len() {
337-
return (None, 0);
338-
}
339-
let Ok(value) = std::str::from_utf8(&bytes[offset..offset + len]) else {
340-
return (None, 0);
341-
};
342-
(Some(value.to_string()), offset + len)
343-
}
344-
345312
// https://github.com/apache/dubbo-go-hessian2/blob/master/int.go#L60
346313
pub fn decode_i32(payload: &[u8], index: usize) -> (i32, usize) {
347314
let tag = payload[index];

agent/src/flow_generator/protocol_logs/rpc/dubbo.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
mod consts;
1818
mod hessian2;
19+
mod java;
1920

2021
use std::{borrow::Cow, mem::replace};
2122

@@ -952,6 +953,7 @@ impl DubboLog {
952953
#[cfg(feature = "enterprise")]
953954
cf_ctx,
954955
),
956+
JAVA_SERIALIZATION_ID => java::get_req_body_info(config, payload, info),
955957
KRYO_SERIALIZATION2_ID => kryo::get_req_body_info(config, payload, info),
956958
KRYO_SERIALIZATION_ID => kryo::get_req_body_info(config, payload, info),
957959
FASTJSON2_SERIALIZATION_ID => fastjson2::get_req_body_info(config, payload, info),
@@ -1209,6 +1211,7 @@ impl DubboHeader {
12091211

12101212
#[cfg(test)]
12111213
mod tests {
1214+
use core::convert::Into;
12121215
use std::cell::RefCell;
12131216
use std::fmt;
12141217
use std::path::Path;
@@ -1283,6 +1286,7 @@ mod tests {
12831286
x_request_id: vec![],
12841287
trace_types: vec![
12851288
TraceType::Customize("EagleEye-TraceID".to_string()),
1289+
TraceType::Customize("x-g-rid".to_string()),
12861290
TraceType::Sw8,
12871291
],
12881292
span_types: vec![
@@ -1524,4 +1528,26 @@ mod tests {
15241528
}
15251529
perf_stat
15261530
}
1531+
1532+
#[test]
fn decode_java_trace_id() {
    // The fixture is a TC_STRING-framed key followed directly by a
    // TC_STRING-framed value and two trailing zero bytes.
    const KEY: &str = "x-g-rid";
    const EXPECTED: &str = "38bafe668c2bedb6d3d2b50fcf17a7345210bf14";

    let mut payload = vec![0x74, 0x00, 0x07];
    payload.extend_from_slice(KEY.as_bytes());
    payload.extend_from_slice(&[0x74, 0x00, 0x28]);
    payload.extend_from_slice(EXPECTED.as_bytes());
    payload.extend_from_slice(&[0x00, 0x00]);

    let mut info = DubboInfo::default();
    java::decode_trace_id(&payload, &TraceType::Customize(KEY.to_string()), &mut info);

    let ids = info.trace_ids.to_strings();
    assert_eq!(ids.len(), 1);
    assert_eq!(ids[0], EXPECTED);
}
15271553
}

agent/src/flow_generator/protocol_logs/rpc/dubbo/hessian2.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ fn lookup_str(payload: &[u8], trace_type: &TraceType) -> Option<String> {
5959
{
6060
return Some(context);
6161
}
62-
if let (Some(context), _) =
63-
Hessian2Decoder::decode_list_first_string(payload, start + index + tag.len())
64-
{
65-
return Some(context);
66-
}
6762
start += index + tag.len();
6863
}
6964
return None;
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright (c) 2025 Yunshan Networks
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
use nom::FindSubstring;
18+
19+
use super::DubboInfo;
20+
use crate::config::handler::{L7LogDynamicConfig, TraceType};
21+
use crate::utils::bytes::{read_u16_be, read_u32_be};
22+
23+
/// Tag byte that introduces a string record (see `decode_tc_string`).
const TC_STRING: u8 = 0x74;
/// Tag byte of a block-data record.
const TC_BLOCKDATA: u8 = 0x77;

/// Decodes one string record located at the very start of `payload`.
///
/// The record layout read here is: one `TC_STRING` tag byte, a 2-byte
/// big-endian length, then that many bytes of UTF-8 text.
///
/// Returns `None` when the tag does not match, the record is truncated, or
/// the bytes are not valid UTF-8. A string that ends exactly at the end of
/// `payload` is accepted.
fn decode_tc_string(payload: &[u8]) -> Option<String> {
    // Tag (1 byte) + length (2 bytes).
    const HEADER_LEN: usize = 3;

    let (&tag, rest) = payload.split_first()?;
    if tag != TC_STRING {
        return None;
    }
    let length_bytes = rest.get(..2)?;
    let length = u16::from_be_bytes([length_bytes[0], length_bytes[1]]) as usize;

    // Checked slicing: a record whose declared length runs past the buffer
    // yields `None` instead of panicking.
    let text = payload.get(HEADER_LEN..HEADER_LEN + length)?;
    std::str::from_utf8(text).ok().map(str::to_owned)
}
40+
41+
fn lookup_str(payload: &[u8], trace_type: &TraceType) -> Option<String> {
42+
let tag = match trace_type {
43+
TraceType::Sw3 | TraceType::Sw8 | TraceType::CloudWise | TraceType::Customize(_) => {
44+
trace_type.as_str()
45+
}
46+
_ => return None,
47+
};
48+
if tag.len() <= 1 {
49+
return None;
50+
}
51+
52+
let mut start = 0;
53+
while start < payload.len() {
54+
let Some(index) = (&payload[start..]).find_substring(tag) else {
55+
break;
56+
};
57+
58+
start += index + tag.len();
59+
if start >= payload.len() {
60+
break;
61+
}
62+
63+
if let Some(s) = decode_tc_string(&payload[start..]) {
64+
return Some(s);
65+
}
66+
}
67+
return None;
68+
}
69+
70+
pub fn decode_trace_id(payload: &[u8], trace_type: &TraceType, info: &mut DubboInfo) {
71+
if let Some(trace_id) = lookup_str(payload, trace_type) {
72+
info.add_trace_id(trace_id, trace_type);
73+
}
74+
}
75+
76+
fn decode_span_id(payload: &[u8], trace_type: &TraceType, info: &mut DubboInfo) {
77+
if let Some(span_id) = lookup_str(payload, trace_type) {
78+
info.set_span_id(span_id, trace_type);
79+
}
80+
}
81+
82+
/// Decodes one length-prefixed header string starting at `payload[offset]`.
///
/// The field layout read here is: a 4-byte big-endian length, a 2-byte
/// big-endian length, then the string bytes. The two lengths must be equal
/// and non-zero.
/// NOTE(review): presumably the first is a character count and the second a
/// byte length, so only ASCII strings would match — confirm against the
/// Dubbo writer.
///
/// Returns the decoded string together with the number of bytes consumed,
/// counted from `offset`. Returns `None` when the field is truncated, the
/// lengths disagree or are zero, or the bytes are not valid UTF-8. A field
/// that ends exactly at the end of `payload` is accepted.
fn decode_header_string(payload: &[u8], offset: usize) -> Option<(String, usize)> {
    const INT_LEN: usize = 4;
    const UTF_LEN: usize = 2;
    const PREFIX_LEN: usize = INT_LEN + UTF_LEN;

    // Checked slicing throughout: an out-of-range offset or a truncated
    // field yields `None` instead of panicking.
    let field = payload.get(offset..)?;
    let prefix = field.get(..PREFIX_LEN)?;
    let first_length =
        u32::from_be_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
    let second_length = u16::from_be_bytes([prefix[4], prefix[5]]) as usize;
    if first_length != second_length || second_length == 0 {
        return None;
    }

    let end = PREFIX_LEN + second_length;
    let value = std::str::from_utf8(field.get(PREFIX_LEN..end)?).ok()?;
    Some((value.to_string(), end))
}
109+
110+
pub fn get_req_body_info(config: &L7LogDynamicConfig, payload: &[u8], info: &mut DubboInfo) {
111+
// Java Body
112+
// +---------------------------------------------------------------------------------------+
113+
// | Magic (2B) | Magic Version (2B) | TC BLOCKDATA Header (2B) | Double version (*) | ... |
114+
// +---------------------------------------------------------------------------------------+
115+
let mut offset = 6;
116+
let Some(version) = decode_header_string(&payload, offset) else {
117+
return;
118+
};
119+
info.dubbo_version = version.0;
120+
offset += version.1;
121+
122+
let Some(service_name) = decode_header_string(&payload, offset) else {
123+
return;
124+
};
125+
info.service_name = service_name.0;
126+
offset += service_name.1;
127+
128+
let Some(service_version) = decode_header_string(&payload, offset) else {
129+
return;
130+
};
131+
info.service_version = service_version.0;
132+
offset += service_version.1;
133+
134+
let Some(method_name) = decode_header_string(&payload, offset) else {
135+
return;
136+
};
137+
info.method_name = method_name.0;
138+
offset += method_name.1;
139+
140+
if config.trace_types.is_empty() || offset >= payload.len() {
141+
return;
142+
}
143+
144+
for trace_type in config.trace_types.iter() {
145+
if trace_type.as_str().len() > u8::MAX as usize {
146+
continue;
147+
}
148+
149+
decode_trace_id(&payload[offset..], &trace_type, info);
150+
if !config.multiple_trace_id_collection && !info.trace_ids.0.is_empty() {
151+
break;
152+
}
153+
}
154+
for span_type in config.span_types.iter() {
155+
if span_type.as_str().len() > u8::MAX as usize {
156+
continue;
157+
}
158+
159+
decode_span_id(&payload[offset..], &span_type, info);
160+
if info.span_id.get().len() != 0 {
161+
break;
162+
}
163+
}
164+
}

0 commit comments

Comments
 (0)