Skip to content

Commit 055b00b

Browse files
authored
fix: fix 879 (#902)
1 parent 48c39ee commit 055b00b

2 files changed

Lines changed: 36 additions & 5 deletions

File tree

pulsar/consumer_test.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -804,12 +804,22 @@ func TestConsumerCompression(t *testing.T) {
804804
topicName := newTopicName()
805805
ctx := context.Background()
806806

807-
producer, err := client.CreateProducer(ProducerOptions{
807+
// enable batching
808+
p1, err := client.CreateProducer(ProducerOptions{
808809
Topic: topicName,
809810
CompressionType: LZ4,
810811
})
811812
assert.Nil(t, err)
812-
defer producer.Close()
813+
defer p1.Close()
814+
815+
// disable batching
816+
p2, err := client.CreateProducer(ProducerOptions{
817+
Topic: topicName,
818+
CompressionType: LZ4,
819+
DisableBatching: true,
820+
})
821+
assert.Nil(t, err)
822+
defer p2.Close()
813823

814824
consumer, err := client.Subscribe(ConsumerOptions{
815825
Topic: topicName,
@@ -821,8 +831,16 @@ func TestConsumerCompression(t *testing.T) {
821831
const N = 100
822832

823833
for i := 0; i < N; i++ {
824-
if _, err := producer.Send(ctx, &ProducerMessage{
825-
Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
834+
if _, err := p1.Send(ctx, &ProducerMessage{
835+
Payload: []byte(fmt.Sprintf("msg-content-%d-batching-enabled", i)),
836+
}); err != nil {
837+
t.Fatal(err)
838+
}
839+
}
840+
841+
for i := 0; i < N; i++ {
842+
if _, err := p2.Send(ctx, &ProducerMessage{
843+
Payload: []byte(fmt.Sprintf("msg-content-%d-batching-disabled", i)),
826844
}); err != nil {
827845
t.Fatal(err)
828846
}
@@ -831,7 +849,14 @@ func TestConsumerCompression(t *testing.T) {
831849
for i := 0; i < N; i++ {
832850
msg, err := consumer.Receive(ctx)
833851
assert.Nil(t, err)
834-
assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
852+
assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-enabled", i), string(msg.Payload()))
853+
consumer.Ack(msg)
854+
}
855+
856+
for i := 0; i < N; i++ {
857+
msg, err := consumer.Receive(ctx)
858+
assert.Nil(t, err)
859+
assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-disabled", i), string(msg.Payload()))
835860
consumer.Ack(msg)
836861
}
837862
}

pulsar/producer_partition.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,12 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
574574
compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
575575
compressedSize = len(compressedPayload)
576576
checkSize = compressedSize
577+
578+
// set the compress type in msgMetaData
579+
compressionType := pb.CompressionType(p.options.CompressionType)
580+
if compressionType != pb.CompressionType_NONE {
581+
mm.Compression = &compressionType
582+
}
577583
} else {
578584
// final check for batching message is in serializeMessage
579585
// this is a double check

0 commit comments

Comments
 (0)