Skip to content

Commit dd68cbc

Browse files
committed
feat: Handle Oracle cross-payload binds
Cache Oracle request info across frames so custom SQL extraction can resolve bind values when SQL and binds arrive separately. Keep the local enterprise-utils stubs aligned with the parser changes.
1 parent 040835f commit dd68cbc

2 files changed

Lines changed: 100 additions & 14 deletions

File tree

  • agent
    • crates/enterprise-utils/src
    • src/flow_generator/protocol_logs/sql

agent/crates/enterprise-utils/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,13 @@ pub mod l7 {
311311
}
312312
}
313313

314+
pub fn find_bind_values<'a>(
315+
_: &'a [u8],
316+
_: usize,
317+
) -> Option<impl Iterator<Item = ()> + 'a> {
318+
Some(std::iter::empty())
319+
}
320+
314321
#[derive(Serialize, Clone, Copy, Debug, Default, PartialEq)]
315322
pub enum TnsPacketType {
316323
#[default]

agent/src/flow_generator/protocol_logs/sql/oracle.rs

Lines changed: 93 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,13 @@ use enterprise_utils::l7::{
4444
custom_policy::{
4545
custom_field_policy::{
4646
enums::{Op, Source},
47-
Store,
47+
PolicySlice, Store,
4848
},
4949
enums::TrafficDirection,
5050
},
5151
sql::oracle::{
52-
Body, CallId, DataFlags, DataId, OracleParseConfig, OracleParser, TnsPacketType,
52+
find_bind_values, Body, CallId, DataFlags, DataId, OracleParseConfig, OracleParser,
53+
TnsPacketType,
5354
},
5455
};
5556
use public::l7_protocol::{Field, FieldSetter, L7Log, L7LogAttribute, L7Protocol, LogMessageType};
@@ -294,6 +295,66 @@ impl From<&OracleInfo> for LogCache {
294295
pub struct OracleLog {
295296
perf_stats: Vec<L7PerfStats>,
296297
custom_field_store: Store,
298+
cached_info: Option<CachedOracleInfo>,
299+
}
300+
301+
struct CachedOracleInfo {
302+
info: OracleInfo,
303+
bind_count: usize,
304+
}
305+
306+
fn sql_max_bind_placeholder(sql: &str) -> usize {
307+
let bytes = sql.as_bytes();
308+
let mut max_n: usize = 0;
309+
let mut i = 0;
310+
while i < bytes.len() {
311+
if bytes[i] != b':' {
312+
i += 1;
313+
continue;
314+
}
315+
316+
let mut start = i + 1;
317+
if start < bytes.len() && (bytes[start] == b'x' || bytes[start] == b'X') {
318+
start += 1;
319+
}
320+
let mut end = start;
321+
while end < bytes.len() && bytes[end].is_ascii_digit() {
322+
end += 1;
323+
}
324+
if end > start {
325+
if let Ok(n) = sql[start..end].parse::<usize>() {
326+
if n > max_n {
327+
max_n = n;
328+
}
329+
}
330+
i = end;
331+
continue;
332+
}
333+
i += 1;
334+
}
335+
max_n
336+
}
337+
338+
impl OracleLog {
339+
fn apply_custom_field_operations(
340+
&mut self,
341+
policies: PolicySlice,
342+
info: &mut OracleInfo,
343+
frame_payload: &[u8],
344+
) {
345+
policies.apply(
346+
&mut self.custom_field_store,
347+
info,
348+
TrafficDirection::REQUEST,
349+
Source::Sql(&info.sql, Some(frame_payload)),
350+
);
351+
for op in self.custom_field_store.drain_with(policies, info) {
352+
match &op.op {
353+
Op::AddMetric(_, _) | Op::SaveHeader(_) | Op::SavePayload(_) => (),
354+
_ => auto_merge_custom_field(op, info),
355+
}
356+
}
357+
}
297358
}
298359

299360
impl L7ProtocolParserInterface for OracleLog {
@@ -335,6 +396,7 @@ impl L7ProtocolParserInterface for OracleLog {
335396
let mut info = vec![];
336397
for frame in frames {
337398
let frame_payload = frame.payload;
399+
let is_request = matches!(&frame.body, Body::Request(_));
338400
let mut log_info = match frame.body {
339401
Body::Request(req) => OracleInfo {
340402
msg_type: param.direction.into(),
@@ -367,18 +429,35 @@ impl L7ProtocolParserInterface for OracleLog {
367429
};
368430

369431
if let Some(policies) = custom_policies {
370-
if !log_info.sql.is_empty() {
371-
policies.apply(
372-
&mut self.custom_field_store,
373-
&log_info,
374-
TrafficDirection::REQUEST,
375-
Source::Sql(&log_info.sql, Some(frame_payload)),
376-
);
377-
for op in self.custom_field_store.drain_with(policies, &log_info) {
378-
match &op.op {
379-
Op::AddMetric(_, _) | Op::SaveHeader(_) | Op::SavePayload(_) => (),
380-
_ => auto_merge_custom_field(op, &mut log_info),
381-
}
432+
// Oracle 请求里,SQL 文本和 bind value 可能分布在两个 request frame 中。
433+
// 先缓存只携带 SQL 的请求,等待下一帧带上 bind value 后再重新执行自定义 SQL 提取。
434+
//
435+
// In Oracle requests, the SQL text and bind values may be split across two
436+
// request frames. Cache the SQL-only request first, then re-run custom SQL
437+
// extraction when the next request frame carries the bind values.
438+
if !is_request {
439+
self.cached_info = None;
440+
} else if !log_info.sql.is_empty() {
441+
let bind_count = sql_max_bind_placeholder(&log_info.sql);
442+
let can_apply_custom_fields =
443+
bind_count == 0 || find_bind_values(frame_payload, bind_count).is_some();
444+
if can_apply_custom_fields {
445+
self.apply_custom_field_operations(policies, &mut log_info, frame_payload);
446+
self.cached_info = None;
447+
} else {
448+
self.cached_info = Some(CachedOracleInfo {
449+
info: log_info,
450+
bind_count,
451+
});
452+
continue;
453+
}
454+
} else if let Some(mut cached) = self.cached_info.take() {
455+
if find_bind_values(frame_payload, cached.bind_count).is_some() {
456+
cached.info.merge(&mut log_info);
457+
log_info = cached.info;
458+
self.apply_custom_field_operations(policies, &mut log_info, frame_payload);
459+
} else {
460+
self.cached_info = Some(cached);
382461
}
383462
}
384463
}

0 commit comments

Comments
 (0)