@@ -15,6 +15,7 @@ extern crate libc;
1515
1616use arrow_ipc:: { reader:: StreamReader , writer:: StreamWriter , CompressionType } ;
1717use async_trait:: async_trait;
18+ use bytes:: Bytes ;
1819use databricks_zerobus_ingest_sdk:: databricks:: zerobus:: RecordType ;
1920use databricks_zerobus_ingest_sdk:: {
2021 EncodedRecord , HeadersProvider , NoTlsConfig , ZerobusError , ZerobusResult , ZerobusSdk ,
@@ -341,6 +342,8 @@ pub extern "C" fn zerobus_arrow_stream_free(stream: *mut CArrowStream) {
341342/// Ingests one Arrow RecordBatch supplied as Arrow IPC stream bytes.
342343///
343344/// `ipc_bytes` must be a valid Arrow IPC stream (schema + one record batch).
345+ /// Uses the zero-copy path (`ingest_ipc_batch`). If the stream was created with
346+ /// IPC compression, use `zerobus_arrow_stream_ingest_batch_via_record_batch` instead.
344347/// Returns the logical offset assigned to this batch, or -1 on error.
345348#[ no_mangle]
346349pub extern "C" fn zerobus_arrow_stream_ingest_batch (
@@ -365,8 +368,9 @@ pub extern "C" fn zerobus_arrow_stream_ingest_batch(
365368 let bytes = unsafe { std:: slice:: from_raw_parts ( ipc_bytes, ipc_len) } ;
366369
367370 let offset_res = RUNTIME . block_on ( async {
368- let batch = ipc_bytes_to_record_batch ( bytes) ?;
369- stream_ref. ingest_batch ( batch) . await
371+ stream_ref
372+ . ingest_ipc_batch ( Bytes :: copy_from_slice ( bytes) )
373+ . await
370374 } ) ;
371375
372376 match offset_res {
@@ -385,6 +389,64 @@ pub extern "C" fn zerobus_arrow_stream_ingest_batch(
385389 }
386390}
387391
392+ /// Ingests one Arrow RecordBatch supplied as Arrow IPC stream bytes, deserializing
393+ /// to a `RecordBatch` first so that any configured `ipc_compression` is applied.
394+ ///
395+ /// Use this instead of `zerobus_arrow_stream_ingest_batch` when the stream was created
396+ /// with `LZ4_FRAME` or `ZSTD` compression.
397+ /// Returns the logical offset assigned to this batch, or -1 on error.
398+ #[ no_mangle]
399+ pub extern "C" fn zerobus_arrow_stream_ingest_batch_via_record_batch (
400+ stream : * mut CArrowStream ,
401+ ipc_bytes : * const u8 ,
402+ ipc_len : usize ,
403+ result : * mut CResult ,
404+ ) -> i64 {
405+ if ipc_bytes. is_null ( ) || ipc_len == 0 {
406+ write_error_result ( result, "IPC bytes are required" , false ) ;
407+ return -1 ;
408+ }
409+
410+ let stream_ref = match validate_arrow_stream_ptr ( stream) {
411+ Ok ( s) => s,
412+ Err ( msg) => {
413+ write_error_result ( result, msg, false ) ;
414+ return -1 ;
415+ }
416+ } ;
417+
418+ let bytes = unsafe { std:: slice:: from_raw_parts ( ipc_bytes, ipc_len) } ;
419+
420+ let batch = match ipc_bytes_to_record_batch ( bytes) {
421+ Ok ( b) => b,
422+ Err ( e) => {
423+ if !result. is_null ( ) {
424+ unsafe {
425+ * result = CResult :: error ( e) ;
426+ }
427+ }
428+ return -1 ;
429+ }
430+ } ;
431+
432+ let offset_res = RUNTIME . block_on ( async { stream_ref. ingest_batch ( batch) . await } ) ;
433+
434+ match offset_res {
435+ Ok ( offset) => {
436+ write_success_result ( result) ;
437+ offset
438+ }
439+ Err ( err) => {
440+ if !result. is_null ( ) {
441+ unsafe {
442+ * result = CResult :: error ( err) ;
443+ }
444+ }
445+ -1
446+ }
447+ }
448+ }
449+
388450/// Waits until the server acknowledges the batch at the given logical offset.
389451#[ no_mangle]
390452pub extern "C" fn zerobus_arrow_stream_wait_for_offset (
@@ -1068,8 +1130,8 @@ pub extern "C" fn zerobus_sdk_create_stream(
10681130
10691131 let stream = builder. build ( ) . await . map_err ( |e| e. to_string ( ) ) ?;
10701132
1071- let boxed = Box :: new ( stream) ;
1072- Ok :: < * mut CZerobusStream , String > ( Box :: into_raw ( boxed ) as * mut CZerobusStream )
1133+ let arc = Arc :: new ( stream) ;
1134+ Ok :: < * mut CZerobusStream , String > ( Arc :: into_raw ( arc ) as * mut CZerobusStream )
10731135 } ) ;
10741136
10751137 match res {
@@ -1148,8 +1210,8 @@ pub extern "C" fn zerobus_sdk_create_stream_with_headers_provider(
11481210
11491211 let stream = builder. build ( ) . await . map_err ( |e| e. to_string ( ) ) ?;
11501212
1151- let boxed = Box :: new ( stream) ;
1152- Ok :: < * mut CZerobusStream , String > ( Box :: into_raw ( boxed ) as * mut CZerobusStream )
1213+ let arc = Arc :: new ( stream) ;
1214+ Ok :: < * mut CZerobusStream , String > ( Arc :: into_raw ( arc ) as * mut CZerobusStream )
11531215 } ) ;
11541216
11551217 match res {
@@ -1169,7 +1231,9 @@ pub extern "C" fn zerobus_sdk_create_stream_with_headers_provider(
11691231pub extern "C" fn zerobus_stream_free ( stream : * mut CZerobusStream ) {
11701232 if !stream. is_null ( ) {
11711233 unsafe {
1172- let _ = Box :: from_raw ( stream as * mut ZerobusStream ) ;
1234+ // Reconstruct the Arc and drop it. If nowait tasks still hold clones,
1235+ // the stream is not freed until the last Arc is dropped.
1236+ let _ = Arc :: from_raw ( stream as * const ZerobusStream ) ;
11731237 }
11741238 }
11751239}
@@ -1408,6 +1472,193 @@ pub extern "C" fn zerobus_stream_ingest_json_records(
14081472 }
14091473}
14101474
1475+ /// Clones the `Arc<ZerobusStream>` from a raw `CZerobusStream` pointer without
1476+ /// consuming the pointer. The caller retains ownership of the original pointer;
1477+ /// the returned `Arc` will keep the stream alive until it is dropped.
1478+ ///
1479+ /// # Safety
1480+ /// `stream` must be a non-null pointer produced by `zerobus_sdk_create_stream` or
1481+ /// `zerobus_sdk_create_stream_with_headers_provider` and must not have been freed.
1482+ unsafe fn clone_stream_arc ( stream : * mut CZerobusStream ) -> Arc < ZerobusStream > {
1483+ Arc :: increment_strong_count ( stream as * const ZerobusStream ) ;
1484+ Arc :: from_raw ( stream as * const ZerobusStream )
1485+ }
1486+
1487+ /// Ingest a protobuf record without waiting for the record to be queued (fire-and-forget).
1488+ ///
1489+ /// Spawns a background task to queue the record and returns immediately.
1490+ /// The result only reflects argument validation errors; ingestion errors are silently ignored.
1491+ ///
1492+ /// # Safety
1493+ /// The stream must remain valid until all background tasks spawned by this function complete.
1494+ #[ no_mangle]
1495+ pub extern "C" fn zerobus_stream_ingest_proto_record_nowait (
1496+ stream : * mut CZerobusStream ,
1497+ data : * const u8 ,
1498+ data_len : usize ,
1499+ result : * mut CResult ,
1500+ ) {
1501+ if data. is_null ( ) {
1502+ write_error_result ( result, "Invalid data pointer" , false ) ;
1503+ return ;
1504+ }
1505+
1506+ if let Err ( msg) = validate_stream_ptr ( stream) {
1507+ write_error_result ( result, msg, false ) ;
1508+ return ;
1509+ }
1510+
1511+ let data_slice = unsafe { std:: slice:: from_raw_parts ( data, data_len) } ;
1512+ let data_vec = data_slice. to_vec ( ) ;
1513+ let stream_arc = unsafe { clone_stream_arc ( stream) } ;
1514+
1515+ RUNTIME . spawn ( async move {
1516+ let payload = EncodedRecord :: Proto ( data_vec) ;
1517+ let _ = stream_arc. ingest_record_offset ( payload) . await ;
1518+ } ) ;
1519+
1520+ write_success_result ( result) ;
1521+ }
1522+
1523+ /// Ingest a JSON record without waiting for the record to be queued (fire-and-forget).
1524+ ///
1525+ /// Spawns a background task to queue the record and returns immediately.
1526+ /// The result only reflects argument validation errors; ingestion errors are silently ignored.
1527+ ///
1528+ /// # Safety
1529+ /// The stream must remain valid until all background tasks spawned by this function complete.
1530+ #[ no_mangle]
1531+ pub extern "C" fn zerobus_stream_ingest_json_record_nowait (
1532+ stream : * mut CZerobusStream ,
1533+ json_data : * const c_char ,
1534+ result : * mut CResult ,
1535+ ) {
1536+ if let Err ( msg) = validate_stream_ptr ( stream) {
1537+ write_error_result ( result, msg, false ) ;
1538+ return ;
1539+ }
1540+
1541+ let json_str = match unsafe { c_str_to_string ( json_data) } {
1542+ Ok ( s) => s,
1543+ Err ( e) => {
1544+ write_error_result ( result, e, false ) ;
1545+ return ;
1546+ }
1547+ } ;
1548+
1549+ let stream_arc = unsafe { clone_stream_arc ( stream) } ;
1550+
1551+ RUNTIME . spawn ( async move {
1552+ let payload = EncodedRecord :: Json ( json_str) ;
1553+ let _ = stream_arc. ingest_record_offset ( payload) . await ;
1554+ } ) ;
1555+
1556+ write_success_result ( result) ;
1557+ }
1558+
1559+ /// Ingest a batch of protobuf records without waiting (fire-and-forget).
1560+ ///
1561+ /// Copies all record data before spawning the background task, so the caller's
1562+ /// memory is safe to release immediately after this function returns.
1563+ ///
1564+ /// # Safety
1565+ /// The stream must remain valid until all background tasks spawned by this function complete.
1566+ #[ no_mangle]
1567+ pub extern "C" fn zerobus_stream_ingest_proto_records_nowait (
1568+ stream : * mut CZerobusStream ,
1569+ records : * const * const u8 ,
1570+ record_lens : * const usize ,
1571+ num_records : usize ,
1572+ result : * mut CResult ,
1573+ ) {
1574+ if records. is_null ( ) || record_lens. is_null ( ) {
1575+ write_error_result ( result, "Invalid records pointer" , false ) ;
1576+ return ;
1577+ }
1578+
1579+ if let Err ( msg) = validate_stream_ptr ( stream) {
1580+ write_error_result ( result, msg, false ) ;
1581+ return ;
1582+ }
1583+
1584+ if num_records == 0 {
1585+ write_success_result ( result) ;
1586+ return ;
1587+ }
1588+
1589+ let records_vec: Vec < Vec < u8 > > = unsafe {
1590+ let records_slice = std:: slice:: from_raw_parts ( records, num_records) ;
1591+ let lens_slice = std:: slice:: from_raw_parts ( record_lens, num_records) ;
1592+ records_slice
1593+ . iter ( )
1594+ . zip ( lens_slice. iter ( ) )
1595+ . map ( |( ptr, len) | std:: slice:: from_raw_parts ( * ptr, * len) . to_vec ( ) )
1596+ . collect ( )
1597+ } ;
1598+
1599+ let stream_arc = unsafe { clone_stream_arc ( stream) } ;
1600+
1601+ RUNTIME . spawn ( async move {
1602+ let payloads: Vec < EncodedRecord > =
1603+ records_vec. into_iter ( ) . map ( EncodedRecord :: Proto ) . collect ( ) ;
1604+ let _ = stream_arc. ingest_records_offset ( payloads) . await ;
1605+ } ) ;
1606+
1607+ write_success_result ( result) ;
1608+ }
1609+
1610+ /// Ingest a batch of JSON records without waiting (fire-and-forget).
1611+ ///
1612+ /// Copies all strings before spawning the background task, so the caller's
1613+ /// memory is safe to release immediately after this function returns.
1614+ ///
1615+ /// # Safety
1616+ /// The stream must remain valid until all background tasks spawned by this function complete.
1617+ #[ no_mangle]
1618+ pub extern "C" fn zerobus_stream_ingest_json_records_nowait (
1619+ stream : * mut CZerobusStream ,
1620+ json_records : * const * const c_char ,
1621+ num_records : usize ,
1622+ result : * mut CResult ,
1623+ ) {
1624+ if json_records. is_null ( ) {
1625+ write_error_result ( result, "Invalid records pointer" , false ) ;
1626+ return ;
1627+ }
1628+
1629+ if let Err ( msg) = validate_stream_ptr ( stream) {
1630+ write_error_result ( result, msg, false ) ;
1631+ return ;
1632+ }
1633+
1634+ if num_records == 0 {
1635+ write_success_result ( result) ;
1636+ return ;
1637+ }
1638+
1639+ let json_vec: Result < Vec < String > , _ > = unsafe {
1640+ let json_slice = std:: slice:: from_raw_parts ( json_records, num_records) ;
1641+ json_slice. iter ( ) . map ( |ptr| c_str_to_string ( * ptr) ) . collect ( )
1642+ } ;
1643+
1644+ let json_vec = match json_vec {
1645+ Ok ( v) => v,
1646+ Err ( e) => {
1647+ write_error_result ( result, e, false ) ;
1648+ return ;
1649+ }
1650+ } ;
1651+
1652+ let stream_arc = unsafe { clone_stream_arc ( stream) } ;
1653+
1654+ RUNTIME . spawn ( async move {
1655+ let payloads: Vec < EncodedRecord > = json_vec. into_iter ( ) . map ( EncodedRecord :: Json ) . collect ( ) ;
1656+ let _ = stream_arc. ingest_records_offset ( payloads) . await ;
1657+ } ) ;
1658+
1659+ write_success_result ( result) ;
1660+ }
1661+
14111662/// Wait for a specific offset to be acknowledged by the server
14121663#[ no_mangle]
14131664pub extern "C" fn zerobus_stream_wait_for_offset (
0 commit comments