Skip to content

Commit 0b759e1

Browse files
Add native C support
1 parent d0560b3 commit 0b759e1

5 files changed

Lines changed: 229 additions & 0 deletions

File tree

CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,12 @@ target_link_libraries(test_config PRIVATE
6565
pthread dl m
6666
)
6767
add_test(NAME fluvio_config_test COMMAND test_config)
68+
69+
# Pure C Native Test
70+
add_executable(test_c tests/test_c.c)
71+
target_include_directories(test_c PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include)
72+
target_link_libraries(test_c PRIVATE
73+
${CMAKE_CURRENT_SOURCE_DIR}/${RUST_TARGET_DIR}/libfluvio_client_cpp_sys.a
74+
stdc++ pthread dl m
75+
)
76+
add_test(NAME fluvio_c_native_test COMMAND test_c)

include/fluvio.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#ifndef FLUVIO_C_H
2+
#define FLUVIO_C_H
3+
4+
#include <stdint.h>
5+
#include <stddef.h>
6+
7+
#ifdef __cplusplus
8+
extern "C" {
9+
#endif
10+
11+
// Opaque types
12+
typedef struct fluvio_client_opaque fluvio_client_t;
13+
typedef struct fluvio_producer_opaque fluvio_topic_producer_t;
14+
typedef struct fluvio_consumer_opaque fluvio_partition_consumer_t;
15+
typedef struct fluvio_produce_output_opaque fluvio_produce_output_t;
16+
typedef struct fluvio_stream_opaque fluvio_stream_t;
17+
typedef struct fluvio_record_opaque fluvio_record_t;
18+
19+
// Client
20+
int fluvio_c_connect(fluvio_client_t** out_client);
21+
void fluvio_c_client_free(fluvio_client_t* client);
22+
23+
// Producer
24+
int fluvio_c_create_producer(fluvio_client_t* client, const char* topic, fluvio_topic_producer_t** out_producer);
25+
int fluvio_c_producer_send(fluvio_topic_producer_t* producer, const uint8_t* key, size_t key_len, const uint8_t* val, size_t val_len, fluvio_produce_output_t** out);
26+
int fluvio_c_produce_output_wait(fluvio_produce_output_t* out);
27+
int fluvio_c_producer_flush(fluvio_topic_producer_t* producer);
28+
void fluvio_c_producer_free(fluvio_topic_producer_t* producer);
29+
void fluvio_c_produce_output_free(fluvio_produce_output_t* out);
30+
31+
// Consumer
32+
int fluvio_c_partition_consumer(fluvio_client_t* client, const char* topic, uint32_t partition, fluvio_partition_consumer_t** out_consumer);
33+
int fluvio_c_consumer_stream(fluvio_partition_consumer_t* consumer, int64_t offset_index, fluvio_stream_t** out_stream);
34+
int fluvio_c_stream_next(fluvio_stream_t* stream, fluvio_record_t** out_record);
35+
int fluvio_c_record_value(fluvio_record_t* record, const uint8_t** out_buf, size_t* out_len);
36+
void fluvio_c_record_free(fluvio_record_t* record);
37+
void fluvio_c_stream_free(fluvio_stream_t* stream);
38+
void fluvio_c_consumer_free(fluvio_partition_consumer_t* consumer);
39+
40+
#ifdef __cplusplus
41+
}
42+
#endif
43+
44+
#endif // FLUVIO_C_H

src/c_api.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
use crate::client::{FluvioClient, fluvio_connect};
2+
use crate::producer::{FluvioProducer, create_producer, producer_send, producer_flush};
3+
use crate::consumer::{FluvioConsumer, FluvioStream, FluvioRecord, partition_consumer, consumer_stream, stream_next};
4+
use crate::produce_output::{FluvioProduceOutput, produce_output_wait};
5+
use std::os::raw::c_char;
6+
use std::ffi::CStr;
7+
8+
#[unsafe(no_mangle)]
9+
pub extern "C" fn fluvio_c_connect(out_client: *mut *mut FluvioClient) -> i32 {
10+
match fluvio_connect() {
11+
Ok(client) => {
12+
unsafe { *out_client = Box::into_raw(client); }
13+
0
14+
}
15+
Err(_) => -1,
16+
}
17+
}
18+
19+
#[unsafe(no_mangle)]
20+
pub extern "C" fn fluvio_c_client_free(client: *mut FluvioClient) {
21+
if !client.is_null() { unsafe { let _ = Box::from_raw(client); } }
22+
}
23+
24+
#[unsafe(no_mangle)]
25+
pub extern "C" fn fluvio_c_create_producer(client: *mut FluvioClient, topic: *const c_char, out_producer: *mut *mut FluvioProducer) -> i32 {
26+
if client.is_null() || topic.is_null() || out_producer.is_null() { return -1; }
27+
let topic_str = unsafe { CStr::from_ptr(topic).to_str() }.unwrap_or("");
28+
match create_producer(unsafe { &*client }, topic_str) {
29+
Ok(producer) => { unsafe { *out_producer = Box::into_raw(producer); } 0 }
30+
Err(_) => -1,
31+
}
32+
}
33+
34+
#[unsafe(no_mangle)]
35+
pub extern "C" fn fluvio_c_producer_send(producer: *mut FluvioProducer, key: *const u8, key_len: usize, val: *const u8, val_len: usize, out: *mut *mut FluvioProduceOutput) -> i32 {
36+
if producer.is_null() || (key.is_null() && key_len > 0) || (val.is_null() && val_len > 0) { return -1; }
37+
let key_slice = if key_len > 0 { unsafe { std::slice::from_raw_parts(key, key_len) } } else { &[] };
38+
let val_slice = if val_len > 0 { unsafe { std::slice::from_raw_parts(val, val_len) } } else { &[] };
39+
match producer_send(unsafe { &*producer }, key_slice, val_slice) {
40+
Ok(o) => { if !out.is_null() { unsafe { *out = Box::into_raw(o); } } 0 }
41+
Err(_) => -1,
42+
}
43+
}
44+
45+
#[unsafe(no_mangle)]
46+
pub extern "C" fn fluvio_c_produce_output_wait(out: *mut FluvioProduceOutput) -> i32 {
47+
if out.is_null() { return -1; }
48+
match produce_output_wait(unsafe { &mut *out }) { Ok(_) => 0, Err(_) => -1 }
49+
}
50+
51+
#[unsafe(no_mangle)]
52+
pub extern "C" fn fluvio_c_producer_flush(producer: *mut FluvioProducer) -> i32 {
53+
if producer.is_null() { return -1; }
54+
match producer_flush(unsafe { &*producer }) { Ok(_) => 0, Err(_) => -1 }
55+
}
56+
57+
#[unsafe(no_mangle)]
58+
pub extern "C" fn fluvio_c_producer_free(producer: *mut FluvioProducer) {
59+
if !producer.is_null() { unsafe { let _ = Box::from_raw(producer); } }
60+
}
61+
62+
#[unsafe(no_mangle)]
63+
pub extern "C" fn fluvio_c_produce_output_free(out: *mut FluvioProduceOutput) {
64+
if !out.is_null() { unsafe { let _ = Box::from_raw(out); } }
65+
}
66+
67+
#[unsafe(no_mangle)]
68+
pub extern "C" fn fluvio_c_partition_consumer(client: *mut FluvioClient, topic: *const c_char, partition: u32, out_consumer: *mut *mut FluvioConsumer) -> i32 {
69+
if client.is_null() || topic.is_null() || out_consumer.is_null() { return -1; }
70+
let topic_str = unsafe { CStr::from_ptr(topic).to_str() }.unwrap_or("");
71+
match partition_consumer(unsafe { &*client }, topic_str, partition) {
72+
Ok(consumer) => { unsafe { *out_consumer = Box::into_raw(consumer); } 0 }
73+
Err(_) => -1,
74+
}
75+
}
76+
77+
#[unsafe(no_mangle)]
78+
pub extern "C" fn fluvio_c_consumer_stream(consumer: *mut FluvioConsumer, offset_index: i64, out_stream: *mut *mut FluvioStream) -> i32 {
79+
if consumer.is_null() || out_stream.is_null() { return -1; }
80+
match consumer_stream(unsafe { &*consumer }, offset_index) {
81+
Ok(stream) => { unsafe { *out_stream = Box::into_raw(stream); } 0 }
82+
Err(_) => -1,
83+
}
84+
}
85+
86+
#[unsafe(no_mangle)]
87+
pub extern "C" fn fluvio_c_stream_next(stream: *mut FluvioStream, out_record: *mut *mut FluvioRecord) -> i32 {
88+
if stream.is_null() || out_record.is_null() { return -1; }
89+
match stream_next(unsafe { &mut *stream }) {
90+
Ok(record) => { unsafe { *out_record = Box::into_raw(record); } 0 }
91+
Err(_) => -1,
92+
}
93+
}
94+
95+
#[unsafe(no_mangle)]
96+
pub extern "C" fn fluvio_c_record_value(record: *mut FluvioRecord, out_buf: *mut *const u8, out_len: *mut usize) -> i32 {
97+
if record.is_null() || out_buf.is_null() || out_len.is_null() { return -1; }
98+
let val = unsafe { &*record }.inner.value();
99+
unsafe { *out_buf = val.as_ptr(); *out_len = val.len(); }
100+
0
101+
}
102+
103+
#[unsafe(no_mangle)]
104+
pub extern "C" fn fluvio_c_record_free(rec: *mut FluvioRecord) {
105+
if !rec.is_null() { unsafe { let _ = Box::from_raw(rec); } }
106+
}
107+
108+
#[unsafe(no_mangle)]
109+
pub extern "C" fn fluvio_c_stream_free(stream: *mut FluvioStream) {
110+
if !stream.is_null() { unsafe { let _ = Box::from_raw(stream); } }
111+
}
112+
113+
#[unsafe(no_mangle)]
114+
pub extern "C" fn fluvio_c_consumer_free(consumer: *mut FluvioConsumer) {
115+
if !consumer.is_null() { unsafe { let _ = Box::from_raw(consumer); } }
116+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod config;
44
pub mod consumer;
55
pub mod produce_output;
66
pub mod producer;
7+
pub mod c_api;
78

89
#[cxx::bridge]
910
mod ffi {

tests/test_c.c

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#include "fluvio.h"
2+
#include <stdio.h>
3+
#include <string.h>
4+
5+
int main() {
6+
printf("Starting C Test...\n");
7+
8+
fluvio_client_t* client = NULL;
9+
if (fluvio_c_connect(&client) != 0) {
10+
printf("Failed to connect\n");
11+
return 1;
12+
}
13+
14+
fluvio_topic_producer_t* producer = NULL;
15+
if (fluvio_c_create_producer(client, "test-topic", &producer) != 0) {
16+
printf("Failed to create producer\n");
17+
return 1;
18+
}
19+
20+
const char* val = "C_PAYLOAD";
21+
fluvio_produce_output_t* out = NULL;
22+
if (fluvio_c_producer_send(producer, NULL, 0, (const uint8_t*)val, strlen(val), &out) != 0) {
23+
printf("Failed to send\n");
24+
return 1;
25+
}
26+
27+
fluvio_c_produce_output_wait(out);
28+
fluvio_c_produce_output_free(out);
29+
fluvio_c_producer_flush(producer);
30+
31+
fluvio_partition_consumer_t* consumer = NULL;
32+
if (fluvio_c_partition_consumer(client, "test-topic", 0, &consumer) != 0) {
33+
printf("Failed to create consumer\n");
34+
return 1;
35+
}
36+
37+
fluvio_stream_t* stream = NULL;
38+
if (fluvio_c_consumer_stream(consumer, 0, &stream) != 0) {
39+
printf("Failed to create stream\n");
40+
return 1;
41+
}
42+
43+
fluvio_record_t* record = NULL;
44+
if (fluvio_c_stream_next(stream, &record) == 0) {
45+
const uint8_t* buf = NULL;
46+
size_t len = 0;
47+
fluvio_c_record_value(record, &buf, &len);
48+
printf("Received payload length: %zu\n", len);
49+
fluvio_c_record_free(record);
50+
}
51+
52+
fluvio_c_stream_free(stream);
53+
fluvio_c_consumer_free(consumer);
54+
fluvio_c_producer_free(producer);
55+
fluvio_c_client_free(client);
56+
57+
printf("C Test Passed!\n");
58+
return 0;
59+
}

0 commit comments

Comments
 (0)