Skip to content

Commit 2a29763

Browse files
committed
Remove Kafka support and make stream bus MQTT-only
1 parent 9a9d878 commit 2a29763

9 files changed

Lines changed: 12 additions & 253 deletions

File tree

Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ tokio-tungstenite = "0.21"
2727
reqwest = { version = "0.11", features = ["json"] }
2828
futures-util = "0.3"
2929

30-
[target.'cfg(not(windows))'.dependencies]
31-
rdkafka = "0.38.0"
32-
3330
[lib]
3431
name = "janus"
3532
path = "src/lib.rs"

src/bin/stream_bus_cli.rs

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
//! Stream Bus CLI - Command Line tool for the Stream Bus to publish the data to a broker and storage.
22
//!
33
//! Usage:
4-
//! stream-bus-cli --input data/sensors.nq --broker kafka --topics sensors --rate 64
54
//! stream-bus-cli --input data/sensors.nq --broker mqtt --topics sensors --rate 64 --loop-file
65
//! stream-bus-cli --input data/sensors.nq --broker none --rate 0
76
87
use clap::Parser;
98
use janus::storage::segmented_storage::StreamingSegmentedStorage;
109
use janus::storage::util::StreamingConfig;
11-
use janus::stream_bus::{BrokerType, KafkaConfig, MqttConfig, StreamBus, StreamBusConfig};
10+
use janus::stream_bus::{BrokerType, MqttConfig, StreamBus, StreamBusConfig};
1211
use std::sync::Arc;
1312

1413
#[derive(Parser, Debug)]
@@ -19,8 +18,8 @@ struct Args {
1918
#[arg(short, long)]
2019
input: String,
2120

22-
/// Broker type: kafka, mqtt, or none
23-
#[arg(short, long, default_value = "kafka")]
21+
/// Broker type: mqtt or none
22+
#[arg(short, long, default_value = "mqtt")]
2423
broker: String,
2524

2625
/// Topics to publish to (comma-separated)
@@ -39,10 +38,6 @@ struct Args {
3938
#[arg(long)]
4039
add_timestamps: bool,
4140

42-
/// Kafka bootstrap servers
43-
#[arg(long, default_value = "localhost:9092")]
44-
kafka_servers: String,
45-
4641
/// MQTT host
4742
#[arg(long, default_value = "localhost")]
4843
mqtt_host: String,
@@ -76,12 +71,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
7671
let storage = Arc::new(storage);
7772

7873
let broker_type = match args.broker.to_lowercase().as_str() {
79-
"kafka" => BrokerType::Kafka,
8074
"mqtt" => BrokerType::Mqtt,
8175
"none" => BrokerType::None,
8276
_ => {
8377
eprintln!("Error: Unknown broker type: {}", args.broker);
84-
eprintln!("Valid options: kafka, mqtt, none");
78+
eprintln!("Valid options: mqtt, none");
8579
std::process::exit(1);
8680
}
8781
};
@@ -95,12 +89,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
9589
rate_of_publishing: args.rate,
9690
loop_file: args.loop_file,
9791
add_timestamps: args.add_timestamps,
98-
kafka_config: match broker_type {
99-
BrokerType::Kafka => {
100-
Some(KafkaConfig { bootstrap_servers: args.kafka_servers, ..Default::default() })
101-
}
102-
_ => None,
103-
},
10492
mqtt_config: match broker_type {
10593
BrokerType::Mqtt => Some(MqttConfig {
10694
host: args.mqtt_host,

src/http/server.rs

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
parsing::rdf_parser,
1010
registry::query_registry::{QueryId, QueryRegistry},
1111
storage::segmented_storage::StreamingSegmentedStorage,
12-
stream_bus::{BrokerType, KafkaConfig, MqttConfig, StreamBus, StreamBusConfig},
12+
stream_bus::{BrokerType, MqttConfig, StreamBus, StreamBusConfig},
1313
};
1414
use axum::{
1515
extract::{
@@ -93,7 +93,6 @@ pub struct StartReplayRequest {
9393
pub loop_file: bool,
9494
#[serde(default = "default_true")]
9595
pub add_timestamps: bool,
96-
pub kafka_config: Option<KafkaConfigDto>,
9796
pub mqtt_config: Option<MqttConfigDto>,
9897
}
9998

@@ -113,13 +112,6 @@ fn default_true() -> bool {
113112
true
114113
}
115114

116-
#[derive(Debug, Deserialize)]
117-
pub struct KafkaConfigDto {
118-
pub bootstrap_servers: String,
119-
pub client_id: String,
120-
pub message_timeout_ms: String,
121-
}
122-
123115
#[derive(Debug, Deserialize)]
124116
pub struct MqttConfigDto {
125117
pub host: String,
@@ -418,24 +410,17 @@ async fn start_replay(
418410

419411
// Parse broker type
420412
let broker_type = match payload.broker_type.to_lowercase().as_str() {
421-
"kafka" => BrokerType::Kafka,
422413
"mqtt" => BrokerType::Mqtt,
423414
"none" => BrokerType::None,
424415
_ => {
425416
return Err(ApiError::BadRequest(format!(
426-
"Invalid broker type: {}. Use 'kafka', 'mqtt', or 'none'",
417+
"Invalid broker type: {}. Use 'mqtt' or 'none'",
427418
payload.broker_type
428419
)))
429420
}
430421
};
431422

432423
// Convert configs
433-
let kafka_config = payload.kafka_config.map(|cfg| KafkaConfig {
434-
bootstrap_servers: cfg.bootstrap_servers,
435-
client_id: cfg.client_id,
436-
message_timeout_ms: cfg.message_timeout_ms,
437-
});
438-
439424
let mqtt_config = payload.mqtt_config.map(|cfg| MqttConfig {
440425
host: cfg.host,
441426
port: cfg.port,
@@ -450,7 +435,6 @@ async fn start_replay(
450435
rate_of_publishing: payload.rate_of_publishing,
451436
loop_file: payload.loop_file,
452437
add_timestamps: payload.add_timestamps,
453-
kafka_config,
454438
mqtt_config,
455439
};
456440

src/sources/kafka_adapter.rs

Lines changed: 0 additions & 106 deletions
This file was deleted.

src/sources/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#[cfg(not(windows))]
2-
pub mod kafka_adapter;
31
pub mod mqtt_adapter;
42
pub mod stream_ingestion_pipeline;
53
pub mod stream_source;

src/sources/stream_ingestion_pipeline.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ impl StreamIngestionPipeline {
1313
StreamIngestionPipeline { storage, sources: Vec::new() }
1414
}
1515

16-
/// Adding the source for the stream ingestion pipeline (which can be MQTT, Kafka, etc.)
16+
/// Adding a source for the stream ingestion pipeline (for example MQTT).
1717
pub fn add_source(&mut self, source: Box<dyn StreamSource>) {
1818
self.sources.push(source);
1919
}

src/stream_bus/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
pub mod stream_bus;
22

33
pub use stream_bus::{
4-
BrokerType, KafkaConfig, MqttConfig, StreamBus, StreamBusConfig, StreamBusError,
5-
StreamBusMetrics,
4+
BrokerType, MqttConfig, StreamBus, StreamBusConfig, StreamBusError, StreamBusMetrics,
65
};

0 commit comments

Comments
 (0)