Skip to content

Commit ccd3f40

Browse files
rvqlkylewanginchina
authored andcommitted
feat: refactor OracleParser to associated functions and add custom field policy support
Refactor OracleParser from stateful instance methods to stateless associated functions, add Frame lifetime with payload slice, and extend Source::Sql with optional raw payload. Also add CustomFieldPolicy support to Oracle parser following the MySQL pattern.
1 parent 8b1ef40 commit ccd3f40

3 files changed

Lines changed: 93 additions & 23 deletions

File tree

agent/crates/enterprise-utils/src/lib.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub mod l7 {
6868
Url(&'a str),
6969
Header(&'a str, &'a str),
7070
Payload(PayloadType, &'a [u8]),
71-
Sql(&'a str),
71+
Sql(&'a str, Option<&'a [u8]>),
7272
// used as a 'policy trigger' when the protocol only supports save payload
7373
Dummy,
7474
}
@@ -288,27 +288,25 @@ pub mod l7 {
288288
pub resp_0x04_extra_byte: bool,
289289
}
290290

291-
pub struct Frame {
291+
pub struct Frame<'a> {
292292
pub packet_type: TnsPacketType,
293293
pub length: usize,
294294
pub body: Body,
295+
pub payload: &'a [u8],
295296
}
296297

297-
#[derive(Default)]
298-
pub struct OracleParser {
299-
pub frames: Vec<Frame>,
300-
}
298+
pub struct OracleParser;
301299

302300
impl OracleParser {
303-
pub fn check_payload(
304-
&mut self,
305-
_: &[u8],
306-
_: &OracleParseConfig,
307-
) -> Option<LogMessageType> {
301+
pub fn check_payload(_: &[u8], _: &OracleParseConfig) -> Option<LogMessageType> {
308302
unimplemented!()
309303
}
310304

311-
pub fn parse_payload(&mut self, _: &[u8], _: bool, _: &OracleParseConfig) -> bool {
305+
pub fn parse_payload<'a>(
306+
_: &'a [u8],
307+
_: bool,
308+
_: &OracleParseConfig,
309+
) -> Vec<Frame<'a>> {
312310
unimplemented!()
313311
}
314312
}

agent/src/flow_generator/protocol_logs/sql/mysql.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ impl MysqlInfo {
456456
&mut parser.custom_field_store,
457457
self,
458458
TrafficDirection::REQUEST,
459-
Source::Sql(sql_string),
459+
Source::Sql(sql_string, None),
460460
);
461461
}
462462

agent/src/flow_generator/protocol_logs/sql/oracle.rs

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414
* limitations under the License.
1515
*/
1616

17+
use std::borrow::Cow;
18+
1719
use serde::Serialize;
1820

1921
use super::super::value_is_default;
2022
use crate::config::handler::LogParserConfig;
2123
use crate::flow_generator::{
2224
protocol_logs::{
25+
auto_merge_custom_field,
2326
pb_adapter::{ExtendedInfo, KeyVal},
2427
swap_if, L7ResponseStatus,
2528
},
@@ -37,12 +40,29 @@ use crate::{
3740
},
3841
};
3942

40-
use enterprise_utils::l7::sql::oracle::{
41-
Body, CallId, DataFlags, DataId, OracleParseConfig, OracleParser, TnsPacketType,
43+
use enterprise_utils::l7::{
44+
custom_policy::{
45+
custom_field_policy::{
46+
enums::{Op, Source},
47+
Store,
48+
},
49+
enums::TrafficDirection,
50+
},
51+
sql::oracle::{
52+
Body, CallId, DataFlags, DataId, OracleParseConfig, OracleParser, TnsPacketType,
53+
},
4254
};
43-
use public::l7_protocol::{L7Protocol, LogMessageType};
55+
use public::l7_protocol::{Field, FieldSetter, L7Log, L7LogAttribute, L7Protocol, LogMessageType};
56+
use public_derive::L7Log;
4457

45-
#[derive(Serialize, Debug, Default, Clone, PartialEq)]
58+
#[derive(L7Log, Serialize, Debug, Default, Clone, PartialEq)]
59+
#[l7_log(request_type.getter = "OracleInfo::get_request_type", request_type.setter = "OracleInfo::set_request_type")]
60+
#[l7_log(version.skip = "true", request_domain.skip = "true", endpoint.skip = "true")]
61+
#[l7_log(request_id.skip = "true", http_proxy_client.skip = "true")]
62+
#[l7_log(trace_id.skip = "true", span_id.skip = "true", x_request_id.skip = "true")]
63+
#[l7_log(response_result.skip = "true", response_code.skip = "true")]
64+
#[l7_log(biz_type.skip = "true", biz_code.skip = "true", biz_scenario.skip = "true")]
65+
#[l7_log(biz_response_code.skip = "true")]
4666
pub struct OracleInfo {
4767
pub msg_type: LogMessageType,
4868
#[serde(skip)]
@@ -51,6 +71,7 @@ pub struct OracleInfo {
5171
#[serde(rename = "request_type", skip_serializing_if = "value_is_default")]
5272
pub packet_type: TnsPacketType,
5373
// req
74+
#[l7_log(request_resource)]
5475
#[serde(rename = "request_resource", skip_serializing_if = "value_is_default")]
5576
pub sql: String,
5677
#[serde(skip)]
@@ -68,11 +89,13 @@ pub struct OracleInfo {
6889
pub ret_code: u16,
6990
#[serde(rename = "sql_affected_rows", skip_serializing_if = "value_is_default")]
7091
pub affected_rows: Option<u32>,
92+
#[l7_log(response_exception)]
7193
#[serde(
72-
rename = "response_execption",
94+
rename = "response_exception",
7395
skip_serializing_if = "value_is_default"
7496
)]
7597
pub error_message: String,
98+
#[l7_log(response_status)]
7699
#[serde(rename = "response_status")]
77100
pub status: L7ResponseStatus,
78101
#[serde(skip)]
@@ -87,8 +110,28 @@ pub struct OracleInfo {
87110

88111
#[serde(skip)]
89112
is_on_blacklist: bool,
113+
114+
#[serde(skip)]
115+
pub attributes: Vec<KeyVal>,
90116
}
117+
impl L7LogAttribute for OracleInfo {
118+
fn add_attribute(&mut self, name: Cow<'_, str>, value: Cow<'_, str>) {
119+
self.attributes.push(KeyVal {
120+
key: name.into_owned(),
121+
val: value.into_owned(),
122+
});
123+
}
124+
}
125+
91126
impl OracleInfo {
127+
pub fn get_request_type(&self) -> Field<'_> {
128+
Field::from(self.packet_type.as_str())
129+
}
130+
131+
pub fn set_request_type(&mut self, _setter: FieldSetter<'_>) {
132+
// TnsPacketType is an enum, skip rewrite
133+
}
134+
92135
pub fn merge(&mut self, other: &mut Self) {
93136
self.packet_type = other.packet_type;
94137
swap_if!(self, sql, is_empty, other);
@@ -120,6 +163,7 @@ impl OracleInfo {
120163
if other.auth_session_id.is_some() {
121164
self.auth_session_id = other.auth_session_id.take();
122165
}
166+
self.attributes.append(&mut other.attributes);
123167
}
124168

125169
fn set_is_on_blacklist(&mut self, config: &LogParserConfig) {
@@ -209,6 +253,7 @@ impl From<OracleInfo> for L7ProtocolSendLog {
209253
val: f.resp_data_flags.to_string(),
210254
});
211255
}
256+
attrs.extend(f.attributes);
212257
let log = L7ProtocolSendLog {
213258
captured_request_byte: f.captured_request_byte,
214259
captured_response_byte: f.captured_response_byte,
@@ -248,12 +293,12 @@ impl From<&OracleInfo> for LogCache {
248293
#[derive(Default)]
249294
pub struct OracleLog {
250295
perf_stats: Vec<L7PerfStats>,
251-
parser: OracleParser,
296+
custom_field_store: Store,
252297
}
253298

254299
impl L7ProtocolParserInterface for OracleLog {
255300
fn check_payload(&mut self, payload: &[u8], param: &ParseParam) -> Option<LogMessageType> {
256-
self.parser.check_payload(
301+
OracleParser::check_payload(
257302
payload,
258303
&OracleParseConfig {
259304
is_be: param.oracle_parse_conf.is_be,
@@ -264,22 +309,32 @@ impl L7ProtocolParserInterface for OracleLog {
264309
}
265310

266311
fn parse_payload(&mut self, payload: &[u8], param: &ParseParam) -> Result<L7ParseResult> {
267-
if !self.parser.parse_payload(
312+
let frames = OracleParser::parse_payload(
268313
payload,
269314
param.direction == PacketDirection::ClientToServer,
270315
&OracleParseConfig {
271316
is_be: param.oracle_parse_conf.is_be,
272317
int_compress: param.oracle_parse_conf.int_compressed,
273318
resp_0x04_extra_byte: param.oracle_parse_conf.resp_0x04_extra_byte,
274319
},
275-
) {
320+
);
321+
if frames.is_empty() {
276322
return Err(Error::L7ProtocolUnknown);
323+
}
324+
325+
let custom_policies = {
326+
self.custom_field_store.clear();
327+
param
328+
.parse_config
329+
.as_ref()
330+
.and_then(|c| c.get_custom_field_policies(L7Protocol::Oracle.into(), param))
277331
};
278332

279333
self.perf_stats.clear();
280334

281335
let mut info = vec![];
282-
for frame in self.parser.frames.drain(..) {
336+
for frame in frames {
337+
let frame_payload = frame.payload;
283338
let mut log_info = match frame.body {
284339
Body::Request(req) => OracleInfo {
285340
msg_type: param.direction.into(),
@@ -311,6 +366,23 @@ impl L7ProtocolParserInterface for OracleLog {
311366
},
312367
};
313368

369+
if let Some(policies) = custom_policies {
370+
if !log_info.sql.is_empty() {
371+
policies.apply(
372+
&mut self.custom_field_store,
373+
&log_info,
374+
TrafficDirection::REQUEST,
375+
Source::Sql(&log_info.sql, Some(frame_payload)),
376+
);
377+
for op in self.custom_field_store.drain_with(policies, &log_info) {
378+
match &op.op {
379+
Op::AddMetric(_, _) | Op::SaveHeader(_) | Op::SavePayload(_) => (),
380+
_ => auto_merge_custom_field(op, &mut log_info),
381+
}
382+
}
383+
}
384+
}
385+
314386
if let Some(config) = param.parse_config {
315387
log_info.set_is_on_blacklist(config);
316388
}

0 commit comments

Comments
 (0)