Skip to content

Commit cb4802a

Browse files
florin-akermannflorin.akermann
authored andcommitted
Refactor PartitionEOF to use TopicPartitionOffset struct
Replaced the `PartitionEOF` error variant to include detailed information by introducing the `TopicPartitionOffset` struct. This change provides more context by encapsulating topic name, partition, and offset details, improving error reporting and debugging.
1 parent f317bce commit cb4802a

5 files changed

Lines changed: 90 additions & 6 deletions

File tree

src/consumer/base_consumer.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::log::trace;
2525
use crate::message::{BorrowedMessage, Message};
2626
use crate::metadata::Metadata;
2727
use crate::topic_partition_list::{Offset, TopicPartitionList};
28-
use crate::util::{cstr_to_owned, NativePtr, Timeout};
28+
use crate::util::{cstr_to_owned, NativePtr, Timeout, TopicPartitionOffset};
2929

3030
/// A low-level consumer that requires manual polling.
3131
///
@@ -244,8 +244,18 @@ where
244244
if rdkafka_err == rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF {
245245
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
246246
let partition = unsafe { (*tp_ptr).partition };
247+
let topic = unsafe {
248+
CStr::from_ptr((*tp_ptr).topic)
249+
.to_string_lossy()
250+
.into_owned()
251+
};
252+
let offset = unsafe { (*tp_ptr).offset };
247253
unsafe { rdsys::rd_kafka_topic_partition_destroy(tp_ptr) };
248-
Some(KafkaError::PartitionEOF(partition))
254+
Some(KafkaError::PartitionEOF(TopicPartitionOffset{
255+
topic,
256+
partition,
257+
offset,
258+
}))
249259
} else if unsafe { rdsys::rd_kafka_event_error_is_fatal(event.ptr()) } != 0 {
250260
Some(KafkaError::MessageConsumptionFatal(rdkafka_err.into()))
251261
} else {

src/error.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::sync::Arc;
99
use rdkafka_sys as rdsys;
1010
use rdkafka_sys::types::*;
1111

12-
use crate::util::{KafkaDrop, NativePtr};
12+
use crate::util::{KafkaDrop, NativePtr, TopicPartitionOffset};
1313

1414
// Re-export rdkafka error code
1515
pub use rdsys::types::RDKafkaErrorCode;
@@ -170,7 +170,7 @@ pub enum KafkaError {
170170
/// Offset fetch failed.
171171
OffsetFetch(RDKafkaErrorCode),
172172
/// End of partition reached.
173-
PartitionEOF(i32),
173+
PartitionEOF(TopicPartitionOffset),
174174
/// Pause/Resume failed.
175175
PauseResume(String),
176176
/// Rebalance failed.

src/message.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use rdkafka_sys::types::*;
1414

1515
use crate::admin::NativeEvent;
1616
use crate::error::{IsError, KafkaError, KafkaResult};
17-
use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr};
17+
use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr, TopicPartitionOffset};
1818

1919
/// Timestamp of a Kafka message.
2020
#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Clone, Copy)]
@@ -346,7 +346,19 @@ impl<'a> BorrowedMessage<'a> {
346346
if ptr.err.is_error() {
347347
let err = match ptr.err {
348348
rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
349-
KafkaError::PartitionEOF(ptr.partition)
349+
let tp_ptr = unsafe { rdsys::rd_kafka_event_topic_partition(event.ptr()) };
350+
let partition = unsafe { (*tp_ptr).partition };
351+
let topic = unsafe {
352+
CStr::from_ptr((*tp_ptr).topic)
353+
.to_string_lossy()
354+
.into_owned()
355+
};
356+
let offset = unsafe { (*tp_ptr).offset };
357+
KafkaError::PartitionEOF(TopicPartitionOffset{
358+
topic,
359+
partition,
360+
offset,
361+
})
350362
}
351363
e => KafkaError::MessageConsumption(e.into()),
352364
};

src/util.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,28 @@ pub(crate) enum Deadline {
3737
Never,
3838
}
3939

40+
/// Represents the coordinates of a kafka record
41+
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
42+
pub struct TopicPartitionOffset{
43+
/// The name of the Kafka topic.
44+
pub topic: String,
45+
/// The partition within the Kafka topic.
46+
pub partition: i32,
47+
/// The offset within the specified Kafka partition.
48+
pub offset: i64,
49+
}
50+
51+
52+
impl fmt::Display for TopicPartitionOffset {
53+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54+
write!(
55+
f,
56+
"Topic: {}, Partition: {}, Offset: {}",
57+
self.topic, self.partition, self.offset
58+
)
59+
}
60+
}
61+
4062
impl Deadline {
4163
// librdkafka's flush api requires an i32 millisecond timeout
4264
const MAX_FLUSH_DURATION: Duration = Duration::from_millis(i32::MAX as u64);

tests/test_low_consumers.rs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,3 +555,43 @@ async fn test_invalid_consumer_position() {
555555
Err(KafkaError::MetadataFetch(RDKafkaErrorCode::UnknownGroup))
556556
);
557557
}
558+
559+
#[tokio::test]
560+
async fn test_partition_eof_error_details() {
561+
let _r = env_logger::try_init();
562+
let topic_name = rand_test_topic("test_partition_eof_error_details");
563+
let message_count = 5;
564+
populate_topic(&topic_name, message_count, &value_fn, &key_fn, Some(0), None).await;
565+
566+
let mut config_overrides = HashMap::new();
567+
config_overrides.insert("enable.partition.eof", "true");
568+
let consumer = create_base_consumer(&rand_test_group(), Some(config_overrides));
569+
570+
let mut tpl = TopicPartitionList::new();
571+
tpl.add_partition_offset(&topic_name, 0, Offset::Beginning).unwrap();
572+
consumer.assign(&tpl).unwrap();
573+
574+
for i in 0..message_count {
575+
match consumer.poll(Timeout::from(Duration::from_secs(5))) {
576+
Some(Ok(message)) => {
577+
assert_eq!(message.offset(), i as i64);
578+
assert_eq!(message.partition(), 0);
579+
assert_eq!(message.topic(), topic_name);
580+
}
581+
Some(Err(e)) => panic!("Error receiving message: {:?}", e),
582+
None => panic!("No message received within timeout"),
583+
}
584+
}
585+
586+
// The next poll should return a PartitionEOF error with detailed information
587+
match consumer.poll(Timeout::from(Duration::from_secs(5))) {
588+
Some(Err(KafkaError::PartitionEOF(tpo))) => {
589+
assert_eq!(tpo.topic, topic_name);
590+
assert_eq!(tpo.partition, 0);
591+
assert_eq!(tpo.offset, message_count as i64);
592+
}
593+
Some(Ok(_)) => panic!("Expected PartitionEOF error, got message"),
594+
Some(Err(e)) => panic!("Expected PartitionEOF error, got: {:?}", e),
595+
None => panic!("No message or error received within timeout"),
596+
}
597+
}

0 commit comments

Comments
 (0)