Skip to content

Commit ed89af6

Browse files
authored
refactor: Mark skywalking/vector integration as feature (#11381)
1 parent 1e2f6a4 commit ed89af6

4 files changed

Lines changed: 79 additions & 48 deletions

File tree

agent/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ http2 = { path = "plugins/http2" }
7171
humantime = "2.1"
7272
humantime-serde = "1.0"
7373
hyper = { version = "0.14", features = ["full"] }
74-
integration_skywalking = { path = "plugins/integration_skywalking" }
75-
integration_vector = { path = "plugins/integration_vector" }
74+
integration_skywalking = { path = "plugins/integration_skywalking", optional = true }
75+
integration_vector = { path = "plugins/integration_vector", optional = true }
7676
ipnet = "2.4.0"
7777
ipnetwork = "0.18.0"
7878
lazy_static = "1.5.0"
@@ -180,6 +180,7 @@ walkdir = "2"
180180
[features]
181181
default = ["libtrace"]
182182
enterprise = ["dep:enterprise-utils", "dep:pcap-parser"]
183+
enterprise-integration = ["dep:integration_skywalking", "dep:integration_vector"]
183184
extended_observability = ["libtrace"]
184185
dylib_pcap = []
185186
libtrace = []

agent/src/config/handler.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2903,6 +2903,7 @@ impl ConfigHandler {
29032903
);
29042904
}
29052905

2906+
#[cfg(feature = "enterprise-integration")]
29062907
fn set_vector(handler: &ConfigHandler, components: &mut AgentComponents) {
29072908
components.vector_component.on_config_change(
29082909
handler.candidate_config.user_config.inputs.vector.enabled,
@@ -5345,18 +5346,21 @@ impl ConfigHandler {
53455346
tunning.session_aggregate_max_entries = new_tunning.session_aggregate_max_entries;
53465347
}
53475348

5348-
let vector = &mut config.inputs.vector;
5349-
let new_vector = &mut new_config.user_config.inputs.vector;
5350-
if vector.enabled != new_vector.enabled || vector.config != new_vector.config {
5351-
info!(
5352-
"vector inputs.vector from {:#?} to {:#?}",
5353-
vector, new_vector
5354-
);
5355-
if components.is_some() {
5356-
callbacks.push(Self::set_vector);
5349+
#[cfg(feature = "enterprise-integration")]
5350+
{
5351+
let vector = &mut config.inputs.vector;
5352+
let new_vector = &mut new_config.user_config.inputs.vector;
5353+
if vector.enabled != new_vector.enabled || vector.config != new_vector.config {
5354+
info!(
5355+
"vector inputs.vector from {:#?} to {:#?}",
5356+
vector, new_vector
5357+
);
5358+
if components.is_some() {
5359+
callbacks.push(Self::set_vector);
5360+
}
5361+
vector.enabled = new_vector.enabled;
5362+
vector.config = new_vector.config.clone();
53575363
}
5358-
vector.enabled = new_vector.enabled;
5359-
vector.config = new_vector.config.clone();
53605364
}
53615365

53625366
candidate_config.enabled = new_config.enabled;

agent/src/integration_collector.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ use crate::{
6060
policy::PolicyGetter,
6161
};
6262

63+
#[cfg(feature = "enterprise-integration")]
6364
use integration_skywalking::{
6465
handle_skywalking_request, handle_skywalking_streaming_request, SkyWalkingExtra,
6566
};
@@ -612,7 +613,7 @@ async fn handler(
612613
telegraf_sender: DebugSender<TelegrafMetric>,
613614
profile_sender: DebugSender<Profile>,
614615
application_log_sender: DebugSender<ApplicationLog>,
615-
skywalking_sender: DebugSender<SkyWalkingExtra>,
616+
#[cfg(feature = "enterprise-integration")] skywalking_sender: DebugSender<SkyWalkingExtra>,
616617
datadog_sender: DebugSender<Datadog>,
617618
exception_handler: ExceptionHandler,
618619
compressed: bool,
@@ -844,6 +845,7 @@ async fn handler(
844845

845846
Ok(Response::builder().body(Body::empty()).unwrap())
846847
}
848+
#[cfg(feature = "enterprise-integration")]
847849
(
848850
&Method::POST,
849851
"/v3/segments"
@@ -866,6 +868,7 @@ async fn handler(
866868
.await,
867869
)
868870
}
871+
#[cfg(feature = "enterprise-integration")]
869872
(
870873
&Method::POST,
871874
"/skywalking.v3.TraceSegmentReportService/collect"
@@ -1041,6 +1044,7 @@ pub struct MetricServer {
10411044
telegraf_sender: DebugSender<TelegrafMetric>,
10421045
profile_sender: DebugSender<Profile>,
10431046
application_log_sender: DebugSender<ApplicationLog>,
1047+
#[cfg(feature = "enterprise-integration")]
10441048
skywalking_sender: DebugSender<SkyWalkingExtra>,
10451049
datadog_sender: DebugSender<Datadog>,
10461050
port: Arc<AtomicU16>,
@@ -1070,7 +1074,7 @@ impl MetricServer {
10701074
telegraf_sender: DebugSender<TelegrafMetric>,
10711075
profile_sender: DebugSender<Profile>,
10721076
application_log_sender: DebugSender<ApplicationLog>,
1073-
skywalking_sender: DebugSender<SkyWalkingExtra>,
1077+
#[cfg(feature = "enterprise-integration")] skywalking_sender: DebugSender<SkyWalkingExtra>,
10741078
datadog_sender: DebugSender<Datadog>,
10751079
port: u16,
10761080
exception_handler: ExceptionHandler,
@@ -1100,6 +1104,7 @@ impl MetricServer {
11001104
telegraf_sender,
11011105
profile_sender,
11021106
application_log_sender,
1107+
#[cfg(feature = "enterprise-integration")]
11031108
skywalking_sender,
11041109
datadog_sender,
11051110
port: Arc::new(AtomicU16::new(port)),
@@ -1151,6 +1156,7 @@ impl MetricServer {
11511156
let telegraf_sender = self.telegraf_sender.clone();
11521157
let profile_sender = self.profile_sender.clone();
11531158
let application_log_sender = self.application_log_sender.clone();
1159+
#[cfg(feature = "enterprise-integration")]
11541160
let skywalking_sender = self.skywalking_sender.clone();
11551161
let datadog_sender = self.datadog_sender.clone();
11561162
let port = self.port.clone();
@@ -1225,6 +1231,7 @@ impl MetricServer {
12251231
let telegraf_sender = telegraf_sender.clone();
12261232
let profile_sender = profile_sender.clone();
12271233
let application_log_sender = application_log_sender.clone();
1234+
#[cfg(feature = "enterprise-integration")]
12281235
let skywalking_sender = skywalking_sender.clone();
12291236
let datadog_sender = datadog_sender.clone();
12301237
let exception_handler_inner = exception_handler.clone();
@@ -1244,6 +1251,7 @@ impl MetricServer {
12441251
let telegraf_sender = telegraf_sender.clone();
12451252
let profile_sender = profile_sender.clone();
12461253
let application_log_sender = application_log_sender.clone();
1254+
#[cfg(feature = "enterprise-integration")]
12471255
let skywalking_sender = skywalking_sender.clone();
12481256
let datadog_sender = datadog_sender.clone();
12491257
let exception_handler = exception_handler_inner.clone();
@@ -1269,6 +1277,7 @@ impl MetricServer {
12691277
telegraf_sender.clone(),
12701278
profile_sender.clone(),
12711279
application_log_sender.clone(),
1280+
#[cfg(feature = "enterprise-integration")]
12721281
skywalking_sender.clone(),
12731282
datadog_sender.clone(),
12741283
exception_handler.clone(),

agent/src/trident.rs

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ use dns_lookup::lookup_host;
3636
use flexi_logger::{
3737
colored_opt_format, writers::LogWriter, Age, Cleanup, Criterion, FileSpec, Logger, Naming,
3838
};
39-
use integration_vector::vector_component::VectorComponent;
4039
use log::{debug, error, info, warn};
4140
use num_enum::{FromPrimitive, IntoPrimitive};
4241
use tokio::runtime::{Builder, Runtime};
@@ -116,7 +115,10 @@ use crate::{
116115
utils::environment::{IN_CONTAINER, K8S_WATCH_POLICY},
117116
};
118117

118+
#[cfg(feature = "enterprise-integration")]
119119
use integration_skywalking::SkyWalkingExtra;
120+
#[cfg(feature = "enterprise-integration")]
121+
use integration_vector::vector_component::VectorComponent;
120122
use packet_sequence_block::BoxedPacketSequenceBlock;
121123
use pcap_assembler::{BoxedPcapBatch, PcapAssembler};
122124

@@ -1806,6 +1808,7 @@ pub struct AgentComponents {
18061808
#[cfg(feature = "libtrace")]
18071809
pub proc_event_uniform_sender: UniformSenderThread<crate::common::proc_event::BoxedProcEvents>,
18081810
pub application_log_uniform_sender: UniformSenderThread<ApplicationLog>,
1811+
#[cfg(feature = "enterprise-integration")]
18091812
pub skywalking_uniform_sender: UniformSenderThread<SkyWalkingExtra>,
18101813
pub datadog_uniform_sender: UniformSenderThread<Datadog>,
18111814
pub exception_handler: ExceptionHandler,
@@ -1821,6 +1824,7 @@ pub struct AgentComponents {
18211824
pub policy_getter: PolicyGetter,
18221825
pub npb_bandwidth_watcher: Box<Arc<NpbBandwidthWatcher>>,
18231826
pub npb_arp_table: Arc<NpbArpTable>,
1827+
#[cfg(feature = "enterprise-integration")]
18241828
pub vector_component: VectorComponent,
18251829
pub is_ce_version: bool, // Determine whether the current version is a ce version, CE-AGENT always set pcap-assembler disabled
18261830
pub tap_interfaces: Vec<Link>,
@@ -2124,7 +2128,8 @@ impl AgentComponents {
21242128
agent_mode: RunningMode,
21252129
runtime: Arc<Runtime>,
21262130
sender_leaky_bucket: Arc<LeakyBucket>,
2127-
ipmac_tx: Arc<broadcast::Sender<IpMacPair>>,
2131+
// only used in vector component
2132+
#[allow(unused)] ipmac_tx: Arc<broadcast::Sender<IpMacPair>>,
21282133
) -> Result<Self> {
21292134
let static_config = &config_handler.static_config;
21302135
let candidate_config = &config_handler.candidate_config;
@@ -2771,37 +2776,41 @@ impl AgentComponents {
27712776
sender_leaky_bucket.clone(),
27722777
);
27732778

2774-
let skywalking_queue_name = "1-skywalking-to-sender";
2775-
let (skywalking_sender, skywalking_receiver, counter) = queue::bounded_with_debug(
2776-
user_config
2777-
.processors
2778-
.flow_log
2779-
.tunning
2780-
.flow_aggregator_queue_size,
2781-
skywalking_queue_name,
2782-
&queue_debugger,
2783-
);
2784-
stats_collector.register_countable(
2785-
&QueueStats {
2786-
module: skywalking_queue_name,
2787-
..Default::default()
2788-
},
2789-
Countable::Owned(Box::new(counter)),
2790-
);
2791-
let skywalking_uniform_sender = UniformSenderThread::new(
2792-
skywalking_queue_name,
2793-
Arc::new(skywalking_receiver),
2794-
config_handler.sender(),
2795-
stats_collector.clone(),
2796-
exception_handler.clone(),
2797-
None,
2798-
if candidate_config.metric_server.compressed {
2799-
SenderEncoder::Zstd
2800-
} else {
2801-
SenderEncoder::Raw
2802-
},
2803-
sender_leaky_bucket.clone(),
2804-
);
2779+
#[cfg(feature = "enterprise-integration")]
2780+
let (skywalking_sender, skywalking_uniform_sender) = {
2781+
let skywalking_queue_name = "1-skywalking-to-sender";
2782+
let (skywalking_sender, skywalking_receiver, counter) = queue::bounded_with_debug(
2783+
user_config
2784+
.processors
2785+
.flow_log
2786+
.tunning
2787+
.flow_aggregator_queue_size,
2788+
skywalking_queue_name,
2789+
&queue_debugger,
2790+
);
2791+
stats_collector.register_countable(
2792+
&QueueStats {
2793+
module: skywalking_queue_name,
2794+
..Default::default()
2795+
},
2796+
Countable::Owned(Box::new(counter)),
2797+
);
2798+
let skywalking_uniform_sender = UniformSenderThread::new(
2799+
skywalking_queue_name,
2800+
Arc::new(skywalking_receiver),
2801+
config_handler.sender(),
2802+
stats_collector.clone(),
2803+
exception_handler.clone(),
2804+
None,
2805+
if candidate_config.metric_server.compressed {
2806+
SenderEncoder::Zstd
2807+
} else {
2808+
SenderEncoder::Raw
2809+
},
2810+
sender_leaky_bucket.clone(),
2811+
);
2812+
(skywalking_sender, skywalking_uniform_sender)
2813+
};
28052814

28062815
let datadog_queue_name = "1-datadog-to-sender";
28072816
let (datadog_sender, datadog_receiver, counter) = queue::bounded_with_debug(
@@ -3099,6 +3108,7 @@ impl AgentComponents {
30993108
telegraf_sender,
31003109
profile_sender,
31013110
application_log_sender,
3111+
#[cfg(feature = "enterprise-integration")]
31023112
skywalking_sender,
31033113
datadog_sender,
31043114
candidate_config.metric_server.port,
@@ -3154,6 +3164,7 @@ impl AgentComponents {
31543164
&stats::NoTagModule("npb_bandwidth_watcher"),
31553165
Countable::Ref(Arc::downgrade(&npb_bandwidth_watcher_counter) as Weak<dyn RefCountable>),
31563166
);
3167+
#[cfg(feature = "enterprise-integration")]
31573168
let vector_component = VectorComponent::new(
31583169
user_config.inputs.vector.enabled,
31593170
user_config.inputs.vector.config.clone(),
@@ -3193,6 +3204,7 @@ impl AgentComponents {
31933204
#[cfg(feature = "libtrace")]
31943205
proc_event_uniform_sender,
31953206
application_log_uniform_sender,
3207+
#[cfg(feature = "enterprise-integration")]
31963208
skywalking_uniform_sender,
31973209
datadog_uniform_sender,
31983210
capture_mode: candidate_config.capture_mode,
@@ -3211,6 +3223,7 @@ impl AgentComponents {
32113223
policy_getter,
32123224
npb_bandwidth_watcher,
32133225
npb_arp_table,
3226+
#[cfg(feature = "enterprise-integration")]
32143227
vector_component,
32153228
runtime,
32163229
dispatcher_components,
@@ -3285,6 +3298,7 @@ impl AgentComponents {
32853298
#[cfg(feature = "libtrace")]
32863299
self.proc_event_uniform_sender.start();
32873300
self.application_log_uniform_sender.start();
3301+
#[cfg(feature = "enterprise-integration")]
32883302
self.skywalking_uniform_sender.start();
32893303
self.datadog_uniform_sender.start();
32903304
if self.config.metric_server.enabled {
@@ -3295,6 +3309,7 @@ impl AgentComponents {
32953309

32963310
self.npb_bandwidth_watcher.start();
32973311
self.npb_arp_table.start();
3312+
#[cfg(feature = "enterprise-integration")]
32983313
self.vector_component.start();
32993314
#[cfg(any(target_os = "linux", target_os = "android"))]
33003315
self.process_listener.start();
@@ -3361,6 +3376,7 @@ impl AgentComponents {
33613376
if let Some(h) = self.application_log_uniform_sender.notify_stop() {
33623377
join_handles.push(h);
33633378
}
3379+
#[cfg(feature = "enterprise-integration")]
33643380
if let Some(h) = self.skywalking_uniform_sender.notify_stop() {
33653381
join_handles.push(h);
33663382
}
@@ -3386,6 +3402,7 @@ impl AgentComponents {
33863402
if let Some(h) = self.process_listener.notify_stop() {
33873403
join_handles.push(h);
33883404
}
3405+
#[cfg(feature = "enterprise-integration")]
33893406
if let Some(h) = self.vector_component.notify_stop() {
33903407
join_handles.push(h);
33913408
}

0 commit comments

Comments
 (0)