Skip to content

Commit 8d5922b

Browse files
committed
Updated to latest fluvio-client-cpp version
1 parent 87b329d commit 8d5922b

5 files changed

Lines changed: 31 additions & 25 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.idea

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ include(ZeekPlugin)
55
include(FetchContent)
66
FetchContent_Declare(
77
fluvio_client_cpp
8-
URL https://github.com/stefanDeveloper/fluvio-client-cpp/releases/download/v0.0.2/fluvio-client-cpp-linux-x64.tar.gz
8+
URL https://github.com/stefanDeveloper/fluvio-client-cpp/releases/download/v0.1.0/fluvio-client-cpp-linux-x64.tar.gz
99
)
1010
FetchContent_MakeAvailable(fluvio_client_cpp)
1111
find_package(fluvio_client_cpp CONFIG REQUIRED PATHS ${fluvio_client_cpp_SOURCE_DIR})

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Zeek-Fluvio Plugin
22

3-
A high-performance Zeek log writer plugin that bridges telemetry payloads seamlessly into **Fluvio** streams. Built natively on the [Fluvio C++ Client](https://github.com/infinyon/fluvio-client-cpp), this plugin ensures low-latency data ingestion for security operations and network analysis.
3+
A high-performance Zeek log writer plugin that bridges telemetry payloads seamlessly into **Fluvio** streams. Built natively on the [Fluvio C++ Client](https://github.com/stefanDeveloper/fluvio-client-cpp), this plugin ensures low-latency data ingestion for security operations and network analysis.
44

55
## 🚀 Features
66

@@ -24,9 +24,9 @@ zkg install zeek-fluvio
2424
If you prefer to build from source:
2525

2626
```bash
27-
git clone https://github.com/infinyon/zeek-fluvio
27+
git clone https://github.com/stefanDeveloper/zeek-fluvio.git
2828
cd zeek-fluvio
29-
./configure --zeek-dist=/path/to/zeek/source
29+
./configure
3030
make -C build
3131
sudo make -C build install
3232
```

src/FluvioWriter.cc

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,29 @@
44
namespace logging {
55
namespace writer {
66

7-
FluvioWriter::FluvioWriter(zeek::logging::WriterFrontend* frontend) : zeek::logging::WriterBackend(frontend), client(nullptr), producer(nullptr) {
7+
FluvioWriter::FluvioWriter(zeek::logging::WriterFrontend* frontend) : zeek::logging::WriterBackend(frontend) {
88
formatter = new zeek::threading::formatter::JSON(this, zeek::threading::formatter::JSON::TS_EPOCH);
99
}
1010

1111
FluvioWriter::~FluvioWriter() {
1212
delete formatter;
13-
if (producer) {
14-
fluvio_c_producer_free(producer);
15-
}
16-
if (client) {
17-
fluvio_c_client_free(client);
18-
}
1913
}
2014

2115
bool FluvioWriter::DoInit(const zeek::logging::WriterBackend::WriterInfo& info, int num_fields, const zeek::threading::Field* const* fields) {
2216
topic_name = info.path;
2317

24-
if (fluvio_c_connect(&client) != 0) {
25-
Error("Failed to connect to Fluvio Cluster");
18+
try {
19+
this->client = Fluvio::connect();
20+
} catch (const std::exception& e) {
21+
std::string err = std::string("Failed to connect to Fluvio: ") + e.what();
22+
Error(err.c_str());
2623
return false;
2724
}
2825

29-
if (fluvio_c_create_producer(client, topic_name.c_str(), &producer) != 0) {
30-
std::string err = "Failed to instantiate producer for topic: " + topic_name;
26+
try {
27+
this->producer = (*this->client)->topic_producer(topic_name);
28+
} catch (const std::exception& e) {
29+
std::string err = std::string("Failed to create producer for topic '") + topic_name + "': " + e.what();
3130
Error(err.c_str());
3231
return false;
3332
}
@@ -36,16 +35,19 @@ bool FluvioWriter::DoInit(const zeek::logging::WriterBackend::WriterInfo& info,
3635
}
3736

3837
bool FluvioWriter::DoWrite(int num_fields, const zeek::threading::Field* const* fields, zeek::threading::Value** vals) {
39-
if (!producer) return false;
38+
if (!this->producer) return false;
4039

4140
zeek::ODesc desc;
42-
formatter->Describe(&desc, num_fields, fields, vals);
41+
this->formatter->Describe(&desc, num_fields, fields, vals);
4342

4443
std::string payload = desc.Description();
45-
if (fluvio_c_producer_send(producer, nullptr, 0, (const uint8_t*)payload.data(), payload.size(), nullptr) != 0) {
46-
Warning("Dropped record due to internal queue/send limits");
47-
return false;
48-
}
44+
auto out = (*this->producer)->send(
45+
rust::Slice<const uint8_t>(nullptr, 0),
46+
rust::Slice<const uint8_t>(
47+
reinterpret_cast<const uint8_t*>(payload.data()),
48+
payload.size()
49+
)
50+
);
4951

5052
return true;
5153
}
@@ -54,12 +56,13 @@ bool FluvioWriter::DoSetBuf(bool enabled) { return true; }
5456
bool FluvioWriter::DoRotate(const char* rotated_path, double open, double close, bool terminating) { return true; }
5557

5658
bool FluvioWriter::DoFlush(double network_time) {
57-
if (producer) fluvio_c_producer_flush(producer);
59+
if (!this->producer) return false;
60+
(*this->producer)->flush();
5861
return true;
5962
}
6063

6164
bool FluvioWriter::DoFinish(double network_time) {
62-
if (producer) fluvio_c_producer_flush(producer);
65+
if (this->producer) (*this->producer)->flush();
6366
return true;
6467
}
6568

src/FluvioWriter.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#pragma once
2+
#include "rust/cxx.h"
3+
#include "fluvio-client-cpp/src/lib.rs.h"
24
#include <zeek/logging/WriterBackend.h>
35
#include <zeek/threading/formatters/JSON.h>
46
#include <fluvio.h>
@@ -27,8 +29,8 @@ class FluvioWriter : public zeek::logging::WriterBackend {
2729

2830
private:
2931
zeek::threading::formatter::JSON* formatter;
30-
fluvio_client_t* client;
31-
fluvio_topic_producer_t* producer;
32+
std::optional<rust::Box<Fluvio>> client;
33+
std::optional<rust::Box<TopicProducerPool>> producer;
3234
std::string topic_name;
3335
};
3436

0 commit comments

Comments
 (0)