@@ -228,6 +228,8 @@ def encode_event(event, structured_format: false, key_mapper: :NOT_SET, **format
228228
229229 private
230230
231+ # Decode a single event from binary content mode. Reads ce_* headers as
232+ # attributes and the message value as event data.
231233 def decode_binary_content ( message , content_type , **format_args )
232234 headers = message [ :headers ] || { }
233235 spec_version = headers [ "ce_specversion" ]
@@ -246,6 +248,9 @@ def decode_binary_content(message, content_type, **format_args)
246248 Event . create ( spec_version : spec_version , set_attributes : attributes )
247249 end
248250
251+ # Populate data-related attributes (data_encoded, data, data_content_type)
252+ # by running the value through registered data decoders. Returns the
253+ # (possibly updated) content_type.
249254 def populate_data_attributes ( attributes , value , content_type , spec_version , format_args )
250255 attributes [ "data_encoded" ] = value
251256 result = @data_decoders . decode_data ( spec_version : spec_version ,
@@ -259,6 +264,8 @@ def populate_data_attributes(attributes, value, content_type, spec_version, form
259264 content_type
260265 end
261266
267+ # Decode a single event from structured content mode. Delegates to
268+ # registered event decoders, falling back to Event::Opaque if allowed.
262269 def decode_structured_content ( message , content_type , allow_opaque , **format_args )
263270 content = message [ :value ] . to_s
264271 result = @event_decoders . decode_event ( content : content ,
@@ -270,13 +277,18 @@ def decode_structured_content(message, content_type, allow_opaque, **format_args
270277 raise ( UnsupportedFormatError , "Unknown cloudevents content type: #{ content_type } " )
271278 end
272279
280+ # Apply the reverse_key_mapper to merge Kafka record key attributes into
281+ # the decoded event. Returns the event unchanged if the mapper is nil or
282+ # returns an empty hash.
273283 def apply_reverse_key_mapper ( event , key , reverse_key_mapper )
274284 return event unless reverse_key_mapper
275285 mapped_attrs = reverse_key_mapper . call ( key )
276286 return event if mapped_attrs . nil? || mapped_attrs . empty?
277287 event . with ( **mapped_attrs . transform_keys ( &:to_sym ) )
278288 end
279289
290+ # Encode an event in binary content mode. Writes attributes as ce_*
291+ # headers and event data as the message value.
280292 def encode_binary_event ( event , key_mapper , **format_args )
281293 key = key_mapper &.call ( event )
282294 headers = { }
@@ -289,6 +301,8 @@ def encode_binary_event(event, key_mapper, **format_args)
289301 { key : key , value : body , headers : headers }
290302 end
291303
304+ # Encode an event in structured content mode using a named format encoder.
305+ # The entire event is serialized into the message value.
292306 def encode_structured_event ( event , structured_format , key_mapper , **format_args )
293307 key = key_mapper &.call ( event )
294308 structured_format = default_encoder_name if structured_format == true
@@ -300,10 +314,15 @@ def encode_structured_event(event, structured_format, key_mapper, **format_args)
300314 { key : key , value : result [ :content ] , headers : { "content-type" => result [ :content_type ] . to_s } }
301315 end
302316
317+ # Encode an opaque event by passing through its content and content_type
318+ # directly, with a nil key.
303319 def encode_opaque_event ( event )
304320 { key : nil , value : event . content , headers : { "content-type" => event . content_type . to_s } }
305321 end
306322
323+ # Extract the event data and content type for binary mode encoding.
324+ # Uses data_encoded if present, otherwise delegates to data encoders.
325+ # Returns [body, content_type], where body is nil for tombstones.
307326 def extract_event_data ( event , format_args )
308327 body = event . data_encoded
309328 if body
0 commit comments