Skip to content

Commit e0dfc22

Browse files
dazumaclaude
andcommitted
feat: Raise BatchNotSupportedError for batch content in KafkaBinding
Add BatchNotSupportedError to the error hierarchy and raise it from KafkaBinding#decode_event when given an application/cloudevents-batch content type, since the Kafka protocol binding does not support batch content mode per the spec. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Daniel Azuma <dazuma@gmail.com>
1 parent 678485d commit e0dfc22

3 files changed

Lines changed: 36 additions & 9 deletions

File tree

lib/cloud_events/errors.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,14 @@ class SpecVersionError < CloudEventsError
5151
class AttributeError < CloudEventsError
5252
end
5353

54+
##
55+
# An error raised when a protocol binding that does not support batch
56+
# content mode receives a batch. For example, the Kafka protocol binding
57+
# does not support batches per the CloudEvents spec.
58+
#
59+
class BatchNotSupportedError < CloudEventsError
60+
end
61+
5462
##
5563
# Alias of UnsupportedFormatError, for backward compatibility.
5664
#

lib/cloud_events/kafka_binding.rb

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,7 @@ def decode_event(message, allow_opaque: false, reverse_key_mapper: :NOT_SET, **f
187187
content_type_string = headers["content-type"]
188188
content_type = ContentType.new(content_type_string) if content_type_string
189189

190-
event = if content_type&.media_type == "application" && content_type.subtype_base == "cloudevents"
191-
decode_structured_content(message, content_type, allow_opaque, **format_args)
192-
elsif headers.key?("ce_specversion")
193-
decode_binary_content(message, content_type, **format_args)
194-
end
195-
unless event
196-
ct_desc = content_type_string ? content_type_string.inspect : "not present"
197-
raise(NotCloudEventError, "content-type is #{ct_desc}, and ce_specversion header is not present")
198-
end
190+
event = decode_content(message, headers, content_type, content_type_string, allow_opaque, **format_args)
199191
apply_reverse_key_mapper(event, message[:key], reverse_key_mapper)
200192
end
201193

@@ -229,6 +221,22 @@ def encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format
229221

230222
private
231223

224+
# Detect content mode, reject batches, and dispatch to the appropriate
225+
# binary or structured decoder. Raises if the message is not a CloudEvent.
226+
def decode_content(message, headers, content_type, content_type_string, allow_opaque, **format_args)
227+
if content_type&.media_type == "application" && content_type.subtype_base == "cloudevents-batch"
228+
raise(BatchNotSupportedError, "Kafka protocol binding does not support batch content mode")
229+
end
230+
if content_type&.media_type == "application" && content_type.subtype_base == "cloudevents"
231+
return decode_structured_content(message, content_type, allow_opaque, **format_args)
232+
end
233+
if headers.key?("ce_specversion")
234+
return decode_binary_content(message, content_type, **format_args)
235+
end
236+
ct_desc = content_type_string ? content_type_string.inspect : "not present"
237+
raise(NotCloudEventError, "content-type is #{ct_desc}, and ce_specversion header is not present")
238+
end
239+
232240
# Decode a single event from binary content mode. Reads ce_* headers as
233241
# attributes and the message value as event data.
234242
def decode_binary_content(message, content_type, **format_args)

test/test_kafka_binding.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,17 @@
274274
kafka_binding.decode_event(message, reverse_key_mapper: nil)
275275
end
276276
end
277+
278+
it "raises BatchNotSupportedError for batch content type" do
279+
message = {
280+
key: nil,
281+
value: "[{}]",
282+
headers: { "content-type" => "application/cloudevents-batch+json" },
283+
}
284+
assert_raises CloudEvents::BatchNotSupportedError do
285+
kafka_binding.decode_event(message, reverse_key_mapper: nil)
286+
end
287+
end
277288
end
278289

279290
describe "decode_event structured mode" do

0 commit comments

Comments
 (0)