Skip to content

Commit 0605f52

Browse files
authored
Merge pull request #2 from KunoVonHagen/restructure-api
Refactor Fluvio C++ Client API to Modern Object-Oriented Design with Updated Rust Internals and C API
2 parents d190da7 + a1d0cf6 commit 0605f52

17 files changed

Lines changed: 399 additions & 290 deletions

README.md

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
The client API documentation is written in standard Markdown and generated dynamically into C++ headers. You can find the full API overview in [docs/API.md](docs/API.md).
1616

17+
The public API follows the same object-oriented layout as the original Fluvio client: connect with `Fluvio` or `FluvioAdmin`, then call methods on the returned objects.
18+
1719
## Installation
1820

1921
You can install the client effortlessly without compiling the heavy Rust toolchain by using `vcpkg`.
@@ -37,8 +39,8 @@ target_link_libraries(main PRIVATE fluvio_client_cpp::fluvio_client_cpp)
3739
#include "fluvio-client-cpp/src/lib.rs.h"
3840

3941
int main() {
40-
auto admin = fluvio_admin_connect();
41-
admin_create_topic(*admin, "a_topic", 1, 1);
42+
auto admin = FluvioAdmin::connect();
43+
admin->create_topic("a_topic", 1, 1);
4244
return 0;
4345
}
4446
```
@@ -51,18 +53,20 @@ int main() {
5153
#include <string>
5254

5355
int main() {
54-
auto client = fluvio_connect();
55-
auto producer = create_producer(*client, "my-topic");
56+
auto client = Fluvio::connect();
57+
auto producer = client->topic_producer("my-topic");
5658

5759
std::string payload = "FOOBAR";
5860
uint8_t key[] = {};
5961

60-
producer_send(*producer,
62+
auto out = producer->send(
6163
rust::Slice<const uint8_t>(key, 0),
6264
rust::Slice<const uint8_t>(reinterpret_cast<const uint8_t*>(payload.data()), payload.size())
6365
);
6466

65-
producer_flush(*producer);
67+
auto meta = out->wait();
68+
(void)meta;
69+
producer->flush();
6670
return 0;
6771
}
6872
```
@@ -74,16 +78,13 @@ int main() {
7478
#include <iostream>
7579

7680
int main() {
77-
auto client = fluvio_connect();
78-
auto consumer = partition_consumer(*client, "my-topic", 0);
79-
auto stream = consumer_stream(*consumer, 0); // Offset::beginning
81+
auto client = Fluvio::connect();
82+
auto stream = client->consumer_stream("my-topic", 0, 0); // Offset::beginning
8083

81-
for (int i = 0; i < 1; i++) {
82-
auto rec = stream_next(*stream);
83-
auto val = record_value(*rec);
84-
std::string payload(val.begin(), val.end());
85-
std::cout << payload << std::endl;
86-
}
84+
auto rec = stream->next();
85+
auto val = rec->value();
86+
std::string payload(val.begin(), val.end());
87+
std::cout << payload << std::endl;
8788

8889
return 0;
8990
}
@@ -108,4 +109,5 @@ cmake -B build
108109
cmake --build build
109110
cd build
110111
ctest --output-on-failure
112+
cd ..
111113
```

examples/README.md

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ Before building the examples, ensure you have the following:
1212

1313
## What's Included?
1414

15-
We provide two simple applications to demonstrate the core features of the client:
15+
We provide two simple applications to demonstrate the current object-oriented client API:
1616

17-
- **Producer (`producer.cpp`)**: Connects to the Fluvio cluster as an admin to ensure a topic named `example-topic` exists. It then creates a producer and sends a mock JSON payload representing sensor data.
18-
- **Consumer (`consumer.cpp`)**: Connects to the Fluvio cluster, opens a stream on `example-topic`, and parses the incoming JSON data using `nlohmann::json`.
17+
- **Producer (`producer.cpp`)**: Uses `FluvioAdmin::connect()` to ensure a topic named `example-topic` exists, then uses `Fluvio::connect()` and `client->topic_producer(...)` to send a JSON payload.
18+
- **Consumer (`consumer.cpp`)**: Uses `Fluvio::connect()` and `client->consumer_stream(...)` to read from `example-topic`, then parses the incoming JSON data with `nlohmann::json`.
1919

2020
## Building the Examples
2121

@@ -72,3 +72,14 @@ Parsed JSON successfully: Sensor=temp-01 Value=24.5
7272
```
7373

7474
Congratulations! You've successfully streamed data using C++!
75+
76+
## API Mapping
77+
78+
The examples mirror the types exported from `src/lib.rs`:
79+
80+
- `FluvioAdmin::connect()` creates an admin client.
81+
- `Fluvio::connect()` creates the main client.
82+
- `client->topic_producer(...)` returns a `TopicProducerPool` for sending records.
83+
- `client->consumer_stream(...)` returns a `FluvioStream` for receiving records.
84+
- `record->value()` exposes the fetched payload bytes.
85+

examples/consumer.cpp

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,21 @@ int main() {
99
try {
1010
fmt::print("Starting Fluvio Consumer Example...\n");
1111

12-
auto client = fluvio_connect();
13-
auto consumer = partition_consumer(*client, "example-topic", 0);
14-
15-
auto stream = consumer_stream(*consumer, 0); // Offset::beginning()
12+
auto client = Fluvio::connect();
13+
auto stream = client->consumer_stream("example-topic", 0, 0);
1614

1715
fmt::print("Waiting for messages...\n");
18-
19-
// Fetch one record
20-
auto rec = stream_next(*stream);
21-
auto val = record_value(*rec);
16+
17+
auto rec = stream->next();
18+
auto val = rec->value();
2219

2320
std::string payload(val.begin(), val.end());
2421
fmt::print("Received Raw Bytes: {}\n", payload);
2522

2623
try {
2724
json j = json::parse(payload);
28-
fmt::print("Parsed JSON successfully: Sensor={} Value={}\n",
29-
j["sensor"].get<std::string>(),
25+
fmt::print("Parsed JSON successfully: Sensor={} Value={}\n",
26+
j["sensor"].get<std::string>(),
3027
j["value"].get<double>());
3128
} catch (const json::parse_error& e) {
3229
fmt::print(stderr, "Failed to parse JSON: {}\n", e.what());

examples/producer.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,17 @@ int main() {
99
try {
1010
fmt::print("Starting Fluvio Producer Example...\n");
1111

12-
auto admin = fluvio_admin_connect();
12+
auto admin = FluvioAdmin::connect();
1313
try {
14-
admin_create_topic(*admin, "example-topic", 1, 1);
14+
admin->create_topic("example-topic", 1, 1);
1515
fmt::print("Created 'example-topic'.\n");
1616
} catch (...) {
1717
fmt::print("'example-topic' already exists or creation failed.\n");
1818
}
1919

20-
auto client = fluvio_connect();
21-
auto producer = create_producer(*client, "example-topic");
20+
auto client = Fluvio::connect();
21+
auto producer = client->topic_producer("example-topic");
2222

23-
// Create a JSON payload
2423
json j = {
2524
{"sensor", "temp-01"},
2625
{"value", 24.5},
@@ -31,13 +30,14 @@ int main() {
3130
fmt::print("Sending JSON: {}\n", payload);
3231

3332
uint8_t key[] = {'j', 's', 'o', 'n'};
34-
auto out = producer_send(*producer,
33+
auto out = producer->send(
3534
rust::Slice<const uint8_t>(key, sizeof(key)),
3635
rust::Slice<const uint8_t>(reinterpret_cast<const uint8_t*>(payload.data()), payload.size())
3736
);
3837

39-
auto meta = produce_output_wait(*out);
40-
producer_flush(*producer);
38+
auto meta = out->wait();
39+
(void)meta;
40+
producer->flush();
4141

4242
fmt::print("Record successfully sent to Fluvio!\n");
4343

src/admin.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
1-
use fluvio::FluvioAdmin;
1+
use fluvio::FluvioAdmin as FluvioAdminNative;
22
use fluvio_sc_schema::topic::TopicSpec;
33
use fluvio_future::task::run_block_on;
44

5-
pub struct FluvioAdminClient { pub inner: FluvioAdmin }
5+
pub struct FluvioAdmin { pub inner: FluvioAdminNative }
66

7-
pub fn fluvio_admin_connect() -> Result<Box<FluvioAdminClient>, String> {
8-
run_block_on(FluvioAdmin::connect()).map(|a| Box::new(FluvioAdminClient { inner: a })).map_err(|e| e.to_string())
9-
}
107

11-
pub fn admin_create_topic(admin: &FluvioAdminClient, topic: &str, partitions: i32, replicas: i32) -> Result<(), String> {
12-
run_block_on(admin.inner.create(topic.to_string(), false, TopicSpec::new_computed(partitions as u32, replicas as u32, None)))
13-
.map_err(|e| e.to_string())
14-
}
8+
impl FluvioAdmin {
9+
pub fn connect() -> Result<Box<FluvioAdmin>, String> {
10+
run_block_on(FluvioAdminNative::connect()).map(|a| Box::new(FluvioAdmin { inner: a })).map_err(|e| e.to_string())
11+
}
12+
13+
pub fn create_topic(self: &Self, topic: &str, partitions: i32, replicas: i32) -> Result<(), String> {
14+
run_block_on(self.inner.create(topic.to_string(), false, TopicSpec::new_computed(partitions as u32, replicas as u32, None)))
15+
.map_err(|e| e.to_string())
16+
}
17+
18+
pub fn delete_topic(self: &Self, topic: &str) -> Result<(), String> {
19+
run_block_on(self.inner.delete::<TopicSpec>(topic.to_string()))
20+
.map_err(|e| e.to_string())
21+
}
1522

16-
pub fn admin_delete_topic(admin: &FluvioAdminClient, topic: &str) -> Result<(), String> {
17-
run_block_on(admin.inner.delete::<TopicSpec>(topic.to_string()))
18-
.map_err(|e| e.to_string())
1923
}

0 commit comments

Comments
 (0)