Skip to content

Commit 6cbe3a7

Browse files
yhl25 and vigith authored
refactor: Merge Heartbeat and OT Buckets for Watermark Progression (#3287)
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>
1 parent d021d72 commit 6cbe3a7

41 files changed

Lines changed: 1552 additions & 1479 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

pkg/apis/proto/source/v1/source.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,10 @@ message PendingResponse {
209209
*/
210210
message PartitionsResponse {
211211
message Result {
212-
// Required field holding the list of partitions.
212+
// Required field holding the list of active partitions.
213213
repeated int32 partitions = 1;
214+
// Total number of partitions in the source
215+
optional int32 total_partitions = 2;
214216
}
215217
// Required field holding the result.
216218
Result result = 1;

pkg/apis/proto/watermark/watermark.pb.go

Lines changed: 18 additions & 53 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/proto/watermark/watermark.proto

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,9 @@ message WMB {
3838

3939
// Partition to identify the partition to which the watermark belongs.
4040
int32 partition = 4;
41-
}
4241

43-
// Heartbeat is used to track the active processors
44-
message Heartbeat {
45-
// Heartbeat(current time in millis) published by the active processors.
46-
int64 heartbeat = 1;
42+
// Optional expected processor count for source watermarks.
43+
// When set, the fetcher will wait until this many processors are active before
44+
// computing a valid watermark.
45+
optional int32 processor_count = 5;
4746
}

pkg/isbsvc/interface.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,6 @@ type BufferInfo struct {
5757
TotalMessages int64
5858
}
5959

60-
func JetStreamProcessorKVName(bucketName string) string {
61-
return fmt.Sprintf("%s_PROCESSORS", bucketName)
62-
}
63-
6460
func JetStreamOTKVName(bucketName string) string {
6561
return fmt.Sprintf("%s_OT", bucketName)
6662
}

pkg/isbsvc/jetstream_service.go

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -232,25 +232,6 @@ func (jss *jetStreamSvc) CreateBuffersAndBuckets(ctx context.Context, buffers, b
232232
return fmt.Errorf("failed to create offset timeline KV %q, %w", otKVName, err)
233233
}
234234
}
235-
// Create processor KV
236-
procKVName := JetStreamProcessorKVName(bucket)
237-
if _, err := jss.js.KeyValue(procKVName); err != nil {
238-
if !errors.Is(err, nats.ErrBucketNotFound) && !errors.Is(err, nats.ErrStreamNotFound) {
239-
return fmt.Errorf("failed to query information of bucket %q during buffer creating, %w", procKVName, err)
240-
}
241-
if _, err := jss.js.CreateKeyValue(&nats.KeyValueConfig{
242-
Bucket: procKVName,
243-
MaxValueSize: v.GetInt32("procBucket.maxValueSize"),
244-
History: uint8(v.GetUint("procBucket.history")),
245-
TTL: v.GetDuration("procBucket.ttl"),
246-
MaxBytes: v.GetInt64("procBucket.maxBytes"),
247-
Storage: nats.StorageType(v.GetInt("procBucket.storage")),
248-
Replicas: v.GetInt("procBucket.replicas"),
249-
Placement: nil,
250-
}); err != nil {
251-
return fmt.Errorf("failed to create processor KV %q, %w", procKVName, err)
252-
}
253-
}
254235
}
255236
return nil
256237
}
@@ -273,11 +254,6 @@ func (jss *jetStreamSvc) DeleteBuffersAndBuckets(ctx context.Context, buffers, b
273254
return fmt.Errorf("failed to delete offset timeline KV %q, %w", otKVName, err)
274255
}
275256
log.Infow("Succeeded to delete an offset timeline KV", zap.String("kvName", otKVName))
276-
procKVName := JetStreamProcessorKVName(bucket)
277-
if err := jss.js.DeleteKeyValue(procKVName); err != nil && !errors.Is(err, nats.ErrBucketNotFound) && !errors.Is(err, nats.ErrStreamNotFound) {
278-
return fmt.Errorf("failed to delete processor KV %q, %w", procKVName, err)
279-
}
280-
log.Infow("Succeeded to delete a processor KV", zap.String("kvName", procKVName))
281257
}
282258

283259
if sideInputsStore != "" {
@@ -323,11 +299,6 @@ func (jss *jetStreamSvc) ValidateBuffersAndBuckets(ctx context.Context, buffers,
323299
if _, err := jss.js.KeyValue(otKVName); err != nil {
324300
return fmt.Errorf("failed to query OT KV %q, %w", otKVName, err)
325301
}
326-
327-
procKVName := JetStreamProcessorKVName(bucket)
328-
if _, err := jss.js.KeyValue(procKVName); err != nil {
329-
return fmt.Errorf("failed to query processor KV %q, %w", procKVName, err)
330-
}
331302
}
332303
if sideInputsStore != "" {
333304
sideInputsKVName := JetStreamSideInputsStoreKVName(sideInputsStore)

pkg/reconciler/isbsvc/installer/jetstream.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ func (r *jetStreamInstaller) Install(ctx context.Context) (*dfv1.BufferServiceCo
9797
if r.isbSvc.Spec.JetStream.GetReplicas() < 3 {
9898
// Replica can not > 1 with non-cluster mode
9999
v.Set("otbucket.replicas", 1)
100-
v.Set("procbucket.replicas", 1)
101100
v.Set("stream.replicas", 1)
102101
}
103102
b, err := yaml.Marshal(v.AllSettings())

rust/extns/numaflow-kafka/src/source.rs

Lines changed: 54 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ pub struct KafkaOffset {
8585
pub offset: i64,
8686
}
8787

88+
/// Represents partition information for a Kafka source.
89+
#[derive(Debug, Clone)]
90+
pub struct KafkaPartitionsInfo {
91+
/// The partitions this consumer is actively assigned to.
92+
pub active_partitions: Vec<i32>,
93+
/// The maximum number of partitions across all configured topics.
94+
pub total_partitions: u32,
95+
}
96+
8897
enum KafkaActorMessage {
8998
Read {
9099
respond_to: oneshot::Sender<Option<Result<Vec<KafkaMessage>>>>,
@@ -97,7 +106,7 @@ enum KafkaActorMessage {
97106
respond_to: oneshot::Sender<Result<Option<usize>>>,
98107
},
99108
PartitionsInfo {
100-
respond_to: oneshot::Sender<Result<Vec<i32>>>,
109+
respond_to: oneshot::Sender<Result<KafkaPartitionsInfo>>,
101110
},
102111
}
103112

@@ -239,16 +248,16 @@ impl KafkaActor {
239248
.expect("Failed to send pending messages from Kafka actor to main task");
240249
}
241250
KafkaActorMessage::PartitionsInfo { respond_to } => {
242-
let partitions = self.partitions_info().await;
251+
let partitions = self.partitions_info();
243252
respond_to
244253
.send(partitions)
245254
.inspect_err(|e| {
246255
error!(
247256
?e,
248-
"Failed to send partition count from Kafka actor to main task"
257+
"Failed to send partition info from Kafka actor to main task"
249258
);
250259
})
251-
.expect("Failed to send partition count from Kafka actor to main task");
260+
.expect("Failed to send partition info from Kafka actor to main task");
252261
}
253262
}
254263
}
@@ -487,52 +496,50 @@ impl KafkaActor {
487496
Ok(Some(total_pending))
488497
}
489498

490-
/// Returns the partition IDs of the topic with the maximum number of partitions.
491-
/// Since we support specifying multiple topics, this method returns the Vec of partition IDs
492-
/// from the topic that has the most partitions among all configured topics.
493-
async fn partitions_info(&mut self) -> Result<Vec<i32>> {
499+
/// Returns partition information including active partitions (assigned to this consumer)
500+
/// and total partitions (max across all configured topics).
501+
/// FIXME: multi-topics needs to be handled differently
502+
fn partitions_info(&self) -> Result<KafkaPartitionsInfo> {
503+
// Get active partitions from consumer assignment
504+
let active_partitions = self
505+
.consumer
506+
.assignment()
507+
.map_err(|e| Error::Kafka(format!("Failed to get consumer assignment: {e}")))
508+
.map(|tpl| {
509+
tpl.elements()
510+
.iter()
511+
.map(|elem| elem.partition())
512+
.collect::<Vec<i32>>()
513+
})?;
514+
515+
// Get total partitions from metadata (max across all topics)
516+
let total_partitions = self.fetch_max_partitions_from_metadata()?;
517+
518+
Ok(KafkaPartitionsInfo {
519+
active_partitions,
520+
total_partitions,
521+
})
522+
}
523+
524+
/// Fetches the maximum number of partitions across all configured topics from metadata.
525+
fn fetch_max_partitions_from_metadata(&self) -> Result<u32> {
494526
let timeout = Duration::from_secs(5);
495-
let mut handles = Vec::new();
496-
for topic in &self.topics {
497-
let consumer = Arc::clone(&self.consumer);
498-
let topic = topic.clone();
527+
let mut max_partitions: u32 = 0;
499528

500-
// fetch_metadata internally calls [rd_kafka_metadata](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a84bba4a4b13fdb515f1a22d6fd4f7344)
501-
// This may be a blocking call, so we spawn a new task to run it.
502-
handles.push(tokio::task::spawn_blocking(move || {
503-
let metadata = consumer
504-
.fetch_metadata(Some(&topic), timeout)
505-
.map_err(|e| Error::Kafka(format!("Failed to fetch metadata: {e}")))?;
506-
let Some(topic_metadata) = metadata.topics().first() else {
507-
warn!(topic = topic, "No topic metadata found");
508-
return Ok(Vec::new());
509-
};
510-
let partitions: Vec<i32> =
511-
topic_metadata.partitions().iter().map(|p| p.id()).collect();
512-
Ok(partitions)
513-
}));
514-
}
515-
let mut max_partitions = 0;
516-
let mut result: Vec<i32> = Vec::new();
517-
for handle in handles {
518-
match handle.await {
519-
Ok(Ok(partitions)) => {
520-
if partitions.len() > max_partitions {
521-
max_partitions = partitions.len();
522-
result = partitions;
523-
}
524-
}
525-
Ok(Err(e)) => {
526-
error!(?e, "Error fetching partitions info");
527-
return Err(e);
528-
}
529-
Err(e) => {
530-
error!(?e, "Tokio task join error fetching partitions info");
531-
return Err(Error::Other(format!("Tokio task join error: {e}")));
532-
}
529+
for topic in &self.topics {
530+
let metadata = self
531+
.consumer
532+
.fetch_metadata(Some(topic), timeout)
533+
.map_err(|e| Error::Kafka(format!("Failed to fetch metadata: {e}")))?;
534+
535+
if let Some(topic_metadata) = metadata.topics().first() {
536+
max_partitions = max_partitions.max(topic_metadata.partitions().len() as u32);
537+
} else {
538+
warn!(topic = topic, "No topic metadata found");
533539
}
534540
}
535-
Ok(result)
541+
542+
Ok(max_partitions)
536543
}
537544
}
538545

@@ -580,7 +587,7 @@ impl KafkaSource {
580587
.map_err(|_| Error::Other("Actor task terminated".into()))?
581588
}
582589

583-
pub async fn partitions_info(&self) -> Result<Vec<i32>> {
590+
pub async fn partitions_info(&self) -> Result<KafkaPartitionsInfo> {
584591
let (tx, rx) = oneshot::channel();
585592
let msg = KafkaActorMessage::PartitionsInfo { respond_to: tx };
586593
let _ = self.actor_tx.send(msg).await;

0 commit comments

Comments (0)