Skip to content

Commit d190da7

Browse files
Merge pull request #1 from stefanDeveloper/add-auth
Add authentication
2 parents 2235dda + c97a6b9 commit d190da7

11 files changed

Lines changed: 298 additions & 63 deletions

File tree

.github/workflows/ci_e2e.yml

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
name: CI E2E Auth
2+
3+
on:
4+
push:
5+
branches: [ main ]
6+
pull_request:
7+
branches: [ main ]
8+
9+
jobs:
10+
auth_e2e_matrix:
11+
runs-on: ubuntu-latest
12+
steps:
13+
- uses: actions/checkout@v3
14+
15+
- name: Install Rust
16+
uses: actions-rs/toolchain@v1
17+
with:
18+
toolchain: stable
19+
override: true
20+
21+
- name: Install Core Dependencies
22+
run: |
23+
sudo apt-get update
24+
sudo apt-get install -y cmake g++ libssl-dev pkg-config openssl
25+
26+
- name: Install Fluvio Local Cluster
27+
run: |
28+
curl -fsS https://raw.githubusercontent.com/fluvio-community/fluvio/master/install.sh | FVM_VERSION=dev bash
29+
echo "$HOME/.fluvio/bin" >> $GITHUB_PATH
30+
31+
- name: Generate mTLS Evaluation Certificates
32+
run: |
33+
mkdir -p /tmp/certs && cd /tmp/certs
34+
openssl req -x509 -new -nodes -newkey rsa:2048 -keyout ca.key -out ca.crt -days 3650 -subj '/CN=fluvio-ca' -extensions v3_ca -config <(printf "[req]\ndistinguished_name=dn\n[dn]\n[v3_ca]\nbasicConstraints=CA:TRUE\nkeyUsage=keyCertSign,cRLSign")
35+
openssl req -new -newkey rsa:2048 -nodes -keyout server.key -out server.csr -subj '/CN=localhost'
36+
openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 365 -extfile <(printf "subjectAltName=DNS:localhost,DNS:custom-spu-5001.localhost,IP:127.0.0.1\nbasicConstraints=CA:FALSE\nextendedKeyUsage=serverAuth")
37+
openssl req -new -newkey rsa:2048 -nodes -keyout client.key -out client.csr -subj '/CN=fluvio-client'
38+
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 365 -extfile <(printf "subjectAltName=DNS:localhost,IP:127.0.0.1\nbasicConstraints=CA:FALSE\nextendedKeyUsage=clientAuth")
39+
40+
- name: Start Authenticated Local TLS Cluster
41+
run: |
42+
fluvio cluster start --local --tls --server-cert /tmp/certs/server.crt --server-key /tmp/certs/server.key --client-cert /tmp/certs/client.crt --client-key /tmp/certs/client.key --ca-cert /tmp/certs/ca.crt --domain localhost
43+
44+
- name: Dynamically Build C++ Drivers representing E2E Target
45+
run: |
46+
cmake -B build
47+
cmake --build build
48+
49+
- name: Execute Strict mTLS Validation Suite
50+
run: |
51+
export FLUVIO_E2E_TLS_DOMAIN="localhost"
52+
export FLUVIO_E2E_TLS_KEY="/tmp/certs/client.key"
53+
export FLUVIO_E2E_TLS_CERT="/tmp/certs/client.crt"
54+
export FLUVIO_E2E_TLS_CA="/tmp/certs/ca.crt"
55+
fluvio topic create test-auth-topic || true
56+
cd build
57+
ctest --output-on-failure -R fluvio_auth_test

CMakeLists.txt

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ execute_process(
1515

1616
set(RUST_TARGET_DIR "target/debug")
1717
file(GLOB_RECURSE CXXBRIDGE_HEADERS "${RUST_TARGET_DIR}/build/*lib.rs.h")
18-
file(GLOB_RECURSE CXXBRIDGE_SOURCES "${RUST_TARGET_DIR}/build/*lib.rs.cc")
18+
file(GLOB_RECURSE CXXBRIDGE_SOURCES_ALL "${RUST_TARGET_DIR}/build/*lib.rs.cc")
1919
file(GLOB_RECURSE CXX_CORE_HEADER "${RUST_TARGET_DIR}/build/*/cxx.h")
20+
file(GLOB RUST_STATIC_LIB "${RUST_TARGET_DIR}/libfluvio*.a")
21+
list(GET RUST_STATIC_LIB 0 FLUVIO_STATIC_LIB)
2022

23+
list(GET CXXBRIDGE_SOURCES_ALL 0 CXXBRIDGE_SOURCES)
2124
list(GET CXXBRIDGE_HEADERS 0 CXXBRIDGE_HEADER)
2225
get_filename_component(CXXBRIDGE_HDR_DIR ${CXXBRIDGE_HEADER} DIRECTORY)
2326
list(GET CXX_CORE_HEADER 0 CXX_CORE_HDR)
@@ -31,7 +34,7 @@ target_include_directories(test_producer PRIVATE
3134
${CXX_ROOT_DIR}
3235
)
3336
target_link_libraries(test_producer PRIVATE
34-
${CMAKE_CURRENT_SOURCE_DIR}/${RUST_TARGET_DIR}/libfluvio_client_cpp.a
37+
${FLUVIO_STATIC_LIB}
3538
pthread dl m
3639
)
3740
add_test(NAME fluvio_producer_test COMMAND test_producer)
@@ -43,7 +46,7 @@ target_include_directories(test_consumer PRIVATE
4346
${CXX_ROOT_DIR}
4447
)
4548
target_link_libraries(test_consumer PRIVATE
46-
${CMAKE_CURRENT_SOURCE_DIR}/${RUST_TARGET_DIR}/libfluvio_client_cpp.a
49+
${FLUVIO_STATIC_LIB}
4750
pthread dl m
4851
)
4952
add_test(NAME fluvio_consumer_test COMMAND test_consumer)
@@ -52,7 +55,7 @@ add_test(NAME fluvio_consumer_test COMMAND test_consumer)
5255
add_executable(test_admin tests/test_admin.cpp ${CXXBRIDGE_SOURCES})
5356
target_include_directories(test_admin PRIVATE ${CXXBRIDGE_HDR_DIR}/../.. ${CXX_ROOT_DIR})
5457
target_link_libraries(test_admin PRIVATE
55-
${CMAKE_CURRENT_SOURCE_DIR}/${RUST_TARGET_DIR}/libfluvio_client_cpp.a
58+
${FLUVIO_STATIC_LIB}
5659
pthread dl m
5760
)
5861
add_test(NAME fluvio_admin_test COMMAND test_admin)
@@ -61,7 +64,7 @@ add_test(NAME fluvio_admin_test COMMAND test_admin)
6164
add_executable(test_config tests/test_config.cpp ${CXXBRIDGE_SOURCES})
6265
target_include_directories(test_config PRIVATE ${CXXBRIDGE_HDR_DIR}/../.. ${CXX_ROOT_DIR})
6366
target_link_libraries(test_config PRIVATE
64-
${CMAKE_CURRENT_SOURCE_DIR}/${RUST_TARGET_DIR}/libfluvio_client_cpp.a
67+
${FLUVIO_STATIC_LIB}
6568
pthread dl m
6669
)
6770
add_test(NAME fluvio_config_test COMMAND test_config)
@@ -70,7 +73,25 @@ add_test(NAME fluvio_config_test COMMAND test_config)
7073
add_executable(test_c tests/test_c.c)
7174
target_include_directories(test_c PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include)
7275
target_link_libraries(test_c PRIVATE
73-
${CMAKE_CURRENT_SOURCE_DIR}/${RUST_TARGET_DIR}/libfluvio_client_cpp.a
76+
${FLUVIO_STATIC_LIB}
7477
stdc++ pthread dl m
7578
)
7679
add_test(NAME fluvio_c_native_test COMMAND test_c)
80+
81+
# Auth E2E CXX Test
82+
add_executable(test_auth tests/test_auth.cpp ${CXXBRIDGE_SOURCES})
83+
target_include_directories(test_auth PRIVATE ${CXXBRIDGE_HDR_DIR}/../.. ${CXX_ROOT_DIR})
84+
target_link_libraries(test_auth PRIVATE
85+
${FLUVIO_STATIC_LIB}
86+
pthread dl m
87+
)
88+
add_test(NAME fluvio_auth_test COMMAND test_auth)
89+
90+
# Auth E2E C Native Test
91+
add_executable(test_auth_c tests/test_auth_c.c)
92+
target_include_directories(test_auth_c PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include)
93+
target_link_libraries(test_auth_c PRIVATE
94+
${FLUVIO_STATIC_LIB}
95+
stdc++ pthread dl m
96+
)
97+
add_test(NAME fluvio_auth_c_test COMMAND test_auth_c)

include/fluvio.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,21 @@ extern "C" {
1111
// Opaque types
1212
typedef struct fluvio_client_opaque fluvio_client_t;
1313
typedef struct fluvio_producer_opaque fluvio_topic_producer_t;
14-
typedef struct fluvio_consumer_opaque fluvio_partition_consumer_t;
1514
typedef struct fluvio_produce_output_opaque fluvio_produce_output_t;
1615
typedef struct fluvio_stream_opaque fluvio_stream_t;
1716
typedef struct fluvio_record_opaque fluvio_record_t;
17+
typedef struct fluvio_config_opaque fluvio_config_t;
1818

1919
// Client
20+
int fluvio_c_config_load(fluvio_config_t** out_config);
21+
void fluvio_c_config_set_endpoint(fluvio_config_t* config, const char* endpoint);
22+
void fluvio_c_config_set_client_id(fluvio_config_t* config, const char* client_id);
23+
void fluvio_c_config_disable_tls(fluvio_config_t* config);
24+
void fluvio_c_config_set_anonymous_tls(fluvio_config_t* config);
25+
void fluvio_c_config_set_inline_tls(fluvio_config_t* config, const char* domain, const char* key, const char* cert, const char* ca_cert);
26+
void fluvio_c_config_set_tls_file_paths(fluvio_config_t* config, const char* domain, const char* key_path, const char* cert_path, const char* ca_cert_path);
2027
int fluvio_c_connect(fluvio_client_t** out_client);
28+
int fluvio_c_connect_with_config(fluvio_config_t* config, fluvio_client_t** out_client);
2129
void fluvio_c_client_free(fluvio_client_t* client);
2230

2331
// Producer
@@ -29,13 +37,11 @@ void fluvio_c_producer_free(fluvio_topic_producer_t* producer);
2937
void fluvio_c_produce_output_free(fluvio_produce_output_t* out);
3038

3139
// Consumer
32-
int fluvio_c_partition_consumer(fluvio_client_t* client, const char* topic, uint32_t partition, fluvio_partition_consumer_t** out_consumer);
33-
int fluvio_c_consumer_stream(fluvio_partition_consumer_t* consumer, int64_t offset_index, fluvio_stream_t** out_stream);
40+
int fluvio_c_consumer_stream(fluvio_client_t* client, const char* topic, uint32_t partition, int64_t offset_index, fluvio_stream_t** out_stream);
3441
int fluvio_c_stream_next(fluvio_stream_t* stream, fluvio_record_t** out_record);
3542
int fluvio_c_record_value(fluvio_record_t* record, const uint8_t** out_buf, size_t* out_len);
3643
void fluvio_c_record_free(fluvio_record_t* record);
3744
void fluvio_c_stream_free(fluvio_stream_t* stream);
38-
void fluvio_c_consumer_free(fluvio_partition_consumer_t* consumer);
3945

4046
#ifdef __cplusplus
4147
}

src/c_api.rs

Lines changed: 82 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
11
use crate::client::{FluvioClient, fluvio_connect};
22
use crate::producer::{FluvioProducer, create_producer, producer_send, producer_flush};
3-
use crate::consumer::{FluvioConsumer, FluvioStream, FluvioRecord, partition_consumer, consumer_stream, stream_next};
3+
use crate::consumer::{FluvioStream, FluvioRecord, consumer_stream, stream_next};
44
use crate::produce_output::{FluvioProduceOutput, produce_output_wait};
5+
use crate::config::{FluvioConfigWrapper, fluvio_config_load};
56
use std::os::raw::c_char;
67
use std::ffi::CStr;
78

9+
#[repr(C)]
10+
pub struct fluvio_config_t {
11+
_private: [u8; 0],
12+
}
13+
814
#[unsafe(no_mangle)]
915
pub extern "C" fn fluvio_c_connect(out_client: *mut *mut FluvioClient) -> i32 {
1016
match fluvio_connect() {
@@ -16,6 +22,19 @@ pub extern "C" fn fluvio_c_connect(out_client: *mut *mut FluvioClient) -> i32 {
1622
}
1723
}
1824

25+
#[unsafe(no_mangle)]
26+
pub unsafe extern "C" fn fluvio_c_connect_with_config(config: *mut fluvio_config_t, out_client: *mut *mut FluvioClient) -> i32 {
27+
if config.is_null() || out_client.is_null() { return -1; }
28+
let config_wrapper = &mut *(config as *mut FluvioConfigWrapper);
29+
match crate::client::fluvio_connect_with_config(config_wrapper) {
30+
Ok(client) => {
31+
unsafe { *out_client = Box::into_raw(client); }
32+
0
33+
}
34+
Err(_) => -1,
35+
}
36+
}
37+
1938
#[unsafe(no_mangle)]
2039
pub extern "C" fn fluvio_c_client_free(client: *mut FluvioClient) {
2140
if !client.is_null() { unsafe { let _ = Box::from_raw(client); } }
@@ -65,19 +84,73 @@ pub extern "C" fn fluvio_c_produce_output_free(out: *mut FluvioProduceOutput) {
6584
}
6685

6786
#[unsafe(no_mangle)]
68-
pub extern "C" fn fluvio_c_partition_consumer(client: *mut FluvioClient, topic: *const c_char, partition: u32, out_consumer: *mut *mut FluvioConsumer) -> i32 {
69-
if client.is_null() || topic.is_null() || out_consumer.is_null() { return -1; }
70-
let topic_str = unsafe { CStr::from_ptr(topic).to_str() }.unwrap_or("");
71-
match partition_consumer(unsafe { &*client }, topic_str, partition) {
72-
Ok(consumer) => { unsafe { *out_consumer = Box::into_raw(consumer); } 0 }
87+
pub extern "C" fn fluvio_c_config_load(out_config: *mut *mut fluvio_config_t) -> i32 {
88+
if out_config.is_null() { return -1; }
89+
match fluvio_config_load() {
90+
Ok(config) => { unsafe { *out_config = Box::into_raw(config) as *mut fluvio_config_t; } 0 }
7391
Err(_) => -1,
7492
}
7593
}
7694

7795
#[unsafe(no_mangle)]
78-
pub extern "C" fn fluvio_c_consumer_stream(consumer: *mut FluvioConsumer, offset_index: i64, out_stream: *mut *mut FluvioStream) -> i32 {
79-
if consumer.is_null() || out_stream.is_null() { return -1; }
80-
match consumer_stream(unsafe { &*consumer }, offset_index) {
96+
pub unsafe extern "C" fn fluvio_c_config_set_endpoint(config: *mut fluvio_config_t, endpoint: *const std::ffi::c_char) {
97+
if config.is_null() || endpoint.is_null() { return; }
98+
let config_wrapper = &mut *(config as *mut FluvioConfigWrapper);
99+
let ep_str = std::ffi::CStr::from_ptr(endpoint).to_string_lossy();
100+
crate::config::fluvio_config_set_endpoint(config_wrapper, &ep_str);
101+
}
102+
103+
#[unsafe(no_mangle)]
104+
pub unsafe extern "C" fn fluvio_c_config_set_client_id(config: *mut fluvio_config_t, client_id: *const std::ffi::c_char) {
105+
if config.is_null() || client_id.is_null() { return; }
106+
let config_wrapper = &mut *(config as *mut FluvioConfigWrapper);
107+
let client_id_str = std::ffi::CStr::from_ptr(client_id).to_string_lossy();
108+
crate::config::fluvio_config_set_client_id(config_wrapper, &client_id_str);
109+
}
110+
111+
#[unsafe(no_mangle)]
112+
pub unsafe extern "C" fn fluvio_c_config_disable_tls(config: *mut fluvio_config_t) {
113+
if config.is_null() { return; }
114+
let config_wrapper = &mut *(config as *mut FluvioConfigWrapper);
115+
crate::config::fluvio_config_disable_tls(config_wrapper);
116+
}
117+
118+
#[unsafe(no_mangle)]
119+
pub unsafe extern "C" fn fluvio_c_config_set_anonymous_tls(config: *mut fluvio_config_t) {
120+
if config.is_null() { return; }
121+
let config_wrapper = &mut *(config as *mut FluvioConfigWrapper);
122+
crate::config::fluvio_config_set_anonymous_tls(config_wrapper);
123+
}
124+
125+
#[unsafe(no_mangle)]
126+
pub unsafe extern "C" fn fluvio_c_config_set_inline_tls(config: *mut fluvio_config_t, domain: *const std::ffi::c_char, key: *const std::ffi::c_char, cert: *const std::ffi::c_char, ca_cert: *const std::ffi::c_char) {
127+
if config.is_null() || domain.is_null() || key.is_null() || cert.is_null() || ca_cert.is_null() { return; }
128+
let config_wrapper = &mut *(config as *mut FluvioConfigWrapper);
129+
crate::config::fluvio_config_set_inline_tls(config_wrapper,
130+
&std::ffi::CStr::from_ptr(domain).to_string_lossy(),
131+
&std::ffi::CStr::from_ptr(key).to_string_lossy(),
132+
&std::ffi::CStr::from_ptr(cert).to_string_lossy(),
133+
&std::ffi::CStr::from_ptr(ca_cert).to_string_lossy(),
134+
);
135+
}
136+
137+
#[unsafe(no_mangle)]
138+
pub unsafe extern "C" fn fluvio_c_config_set_tls_file_paths(config: *mut fluvio_config_t, domain: *const std::ffi::c_char, key_path: *const std::ffi::c_char, cert_path: *const std::ffi::c_char, ca_cert_path: *const std::ffi::c_char) {
139+
if config.is_null() || domain.is_null() || key_path.is_null() || cert_path.is_null() || ca_cert_path.is_null() { return; }
140+
let config_wrapper = &mut *(config as *mut FluvioConfigWrapper);
141+
crate::config::fluvio_config_set_tls_file_paths(config_wrapper,
142+
&std::ffi::CStr::from_ptr(domain).to_string_lossy(),
143+
&std::ffi::CStr::from_ptr(key_path).to_string_lossy(),
144+
&std::ffi::CStr::from_ptr(cert_path).to_string_lossy(),
145+
&std::ffi::CStr::from_ptr(ca_cert_path).to_string_lossy(),
146+
);
147+
}
148+
149+
#[unsafe(no_mangle)]
150+
pub extern "C" fn fluvio_c_consumer_stream(client: *mut FluvioClient, topic: *const c_char, partition: u32, offset_index: i64, out_stream: *mut *mut FluvioStream) -> i32 {
151+
if client.is_null() || topic.is_null() || out_stream.is_null() { return -1; }
152+
let topic_str = unsafe { CStr::from_ptr(topic).to_str() }.unwrap_or("");
153+
match consumer_stream(unsafe { &*client }, topic_str, partition, offset_index) {
81154
Ok(stream) => { unsafe { *out_stream = Box::into_raw(stream); } 0 }
82155
Err(_) => -1,
83156
}
@@ -110,7 +183,3 @@ pub extern "C" fn fluvio_c_stream_free(stream: *mut FluvioStream) {
110183
if !stream.is_null() { unsafe { let _ = Box::from_raw(stream); } }
111184
}
112185

113-
#[unsafe(no_mangle)]
114-
pub extern "C" fn fluvio_c_consumer_free(consumer: *mut FluvioConsumer) {
115-
if !consumer.is_null() { unsafe { let _ = Box::from_raw(consumer); } }
116-
}

src/config.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,29 @@ pub fn fluvio_config_set_endpoint(c: &mut FluvioConfigWrapper, endpoint: &str) {
4545
pub fn fluvio_config_set_client_id(c: &mut FluvioConfigWrapper, client_id: &str) {
4646
c.inner.client_id = Some(client_id.to_string());
4747
}
48+
49+
pub fn fluvio_config_disable_tls(c: &mut FluvioConfigWrapper) {
50+
c.inner.tls = fluvio::config::TlsPolicy::Disabled;
51+
}
52+
53+
pub fn fluvio_config_set_anonymous_tls(c: &mut FluvioConfigWrapper) {
54+
c.inner.tls = fluvio::config::TlsPolicy::Anonymous;
55+
}
56+
57+
pub fn fluvio_config_set_inline_tls(c: &mut FluvioConfigWrapper, domain: &str, key: &str, cert: &str, ca_cert: &str) {
58+
c.inner.tls = fluvio::config::TlsPolicy::Verified(fluvio::config::TlsConfig::Inline(fluvio::config::TlsCerts {
59+
domain: domain.to_string(),
60+
key: key.to_string(),
61+
cert: cert.to_string(),
62+
ca_cert: ca_cert.to_string(),
63+
}));
64+
}
65+
66+
pub fn fluvio_config_set_tls_file_paths(c: &mut FluvioConfigWrapper, domain: &str, key_path: &str, cert_path: &str, ca_cert_path: &str) {
67+
c.inner.tls = fluvio::config::TlsPolicy::Verified(fluvio::config::TlsConfig::Files(fluvio::config::TlsPaths {
68+
domain: domain.to_string(),
69+
key: std::path::PathBuf::from(key_path),
70+
cert: std::path::PathBuf::from(cert_path),
71+
ca_cert: std::path::PathBuf::from(ca_cert_path),
72+
}));
73+
}

src/consumer.rs

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,27 @@
1-
use fluvio::{PartitionConsumer, Offset, consumer::Record as NativeRecord};
1+
use fluvio::{Offset, consumer::Record as NativeRecord, consumer::ConsumerConfigExtBuilder};
22
use fluvio_future::task::run_block_on;
33
use futures_util::stream::StreamExt;
44
use futures_util::stream::Stream;
55
use std::pin::Pin;
66
use fluvio::dataplane::link::ErrorCode;
7-
use crate::config::ConsumerConfigWrapper;
87
use crate::client::FluvioClient;
98

10-
pub struct FluvioConsumer { pub inner: PartitionConsumer }
119
pub struct FluvioRecord { pub inner: NativeRecord }
1210

13-
type PartitionConsumerStreamInner = Pin<Box<dyn Stream<Item = Result<NativeRecord, ErrorCode>> + Send>>;
14-
pub struct FluvioStream { pub inner: PartitionConsumerStreamInner }
11+
type ConsumerStreamInner = Pin<Box<dyn Stream<Item = Result<NativeRecord, ErrorCode>> + Send>>;
12+
pub struct FluvioStream { pub inner: ConsumerStreamInner }
1513

16-
pub fn partition_consumer(client: &FluvioClient, topic: &str, partition: u32) -> Result<Box<FluvioConsumer>, String> {
17-
run_block_on(client.inner.partition_consumer(topic, partition)).map(|c| Box::new(FluvioConsumer { inner: c })).map_err(|e| e.to_string())
18-
}
19-
20-
#[allow(deprecated)]
21-
pub fn consumer_with_config(client: &FluvioClient, topic: &str, partition: u32, config: &ConsumerConfigWrapper) -> Result<Box<FluvioStream>, String> {
22-
let consumer = run_block_on(client.inner.partition_consumer(topic, partition)).map_err(|e| e.to_string())?;
23-
let built_config = config.inner.build().map_err(|e| e.to_string())?;
24-
run_block_on(consumer.stream_with_config(Offset::beginning(), built_config))
25-
.map(|s| Box::new(FluvioStream { inner: Box::pin(s) }))
26-
.map_err(|e| e.to_string())
27-
}
28-
29-
#[allow(deprecated)]
30-
pub fn consumer_stream(consumer: &FluvioConsumer, offset_index: i64) -> Result<Box<FluvioStream>, String> {
14+
pub fn consumer_stream(client: &FluvioClient, topic: &str, partition: u32, offset_index: i64) -> Result<Box<FluvioStream>, String> {
3115
let offset = if offset_index == -1 { Offset::end() } else if offset_index == 0 { Offset::beginning() } else { Offset::absolute(offset_index).unwrap() };
32-
run_block_on(consumer.inner.stream(offset)).map(|s| Box::new(FluvioStream { inner: Box::pin(s) })).map_err(|e| e.to_string())
16+
let config = ConsumerConfigExtBuilder::default()
17+
.topic(topic.to_string())
18+
.partition(partition)
19+
.offset_start(offset)
20+
.build()
21+
.map_err(|e| e.to_string())?;
22+
23+
let consumer_stream = run_block_on(client.inner.consumer_with_config(config)).map_err(|e| e.to_string())?;
24+
Ok(Box::new(FluvioStream { inner: Box::pin(consumer_stream) }))
3325
}
3426

3527
pub fn stream_next(stream: &mut FluvioStream) -> Result<Box<FluvioRecord>, String> {

0 commit comments

Comments
 (0)