@@ -55,6 +55,11 @@ def _format_tool_call_message(tool_call_message: ChatMessage) -> Dict[str, Any]:
5555 Dictionary representing the tool call message in Bedrock's expected format
5656 """
5757 content : List [Dict [str , Any ]] = []
58+
59+ # tool call messages can contain reasoning content
60+ if reasoning_contents := tool_call_message .meta .get ("reasoning_contents" ):
61+ content .extend (_format_reasoning_contents (reasoning_contents = reasoning_contents ))
62+
5863 # Tool call message can contain text
5964 if tool_call_message .text :
6065 content .append ({"text" : tool_call_message .text })
@@ -157,6 +162,24 @@ def _repair_tool_result_messages(bedrock_formatted_messages: List[Dict[str, Any]
157162 return [msg for _ , msg in repaired_bedrock_formatted_messages ]
158163
159164
165+ def _format_reasoning_contents (reasoning_contents : List [Dict [str , Any ]]) -> List [Dict [str , Any ]]:
166+ """
167+ Format reasoning contents to match Bedrock's expected structure.
168+
169+ :param reasoning_contents: List of reasoning content dictionaries from Haystack ChatMessage metadata.
170+ :returns: List of formatted reasoning content dictionaries for Bedrock.
171+ """
172+ formatted_contents = []
173+ for reasoning_content in reasoning_contents :
174+ formatted_content = {"reasoningContent" : reasoning_content ["reasoning_content" ]}
175+ if reasoning_text := formatted_content ["reasoningContent" ].pop ("reasoning_text" , None ):
176+ formatted_content ["reasoningContent" ]["reasoningText" ] = reasoning_text
177+ if redacted_content := formatted_content ["reasoningContent" ].pop ("redacted_content" , None ):
178+ formatted_content ["reasoningContent" ]["redactedContent" ] = redacted_content
179+ formatted_contents .append (formatted_content )
180+ return formatted_contents
181+
182+
160183def _format_text_image_message (message : ChatMessage ) -> Dict [str , Any ]:
161184 """
162185 Format a Haystack ChatMessage containing text and optional image content into Bedrock format.
@@ -168,6 +191,10 @@ def _format_text_image_message(message: ChatMessage) -> Dict[str, Any]:
168191 content_parts = message ._content
169192
170193 bedrock_content_blocks : List [Dict [str , Any ]] = []
194+ # Add reasoning content if available as the first content block
195+ if message .meta .get ("reasoning_contents" ):
196+ bedrock_content_blocks .extend (_format_reasoning_contents (reasoning_contents = message .meta ["reasoning_contents" ]))
197+
171198 for part in content_parts :
172199 if isinstance (part , TextContent ):
173200 bedrock_content_blocks .append ({"text" : part .text })
@@ -221,7 +248,6 @@ def _format_messages(messages: List[ChatMessage]) -> Tuple[List[Dict[str, Any]],
221248 return system_prompts , repaired_bedrock_formatted_messages
222249
223250
224- # Bedrock to Haystack util method
225251def _parse_completion_response (response_body : Dict [str , Any ], model : str ) -> List [ChatMessage ]:
226252 """
227253 Parse a Bedrock API response into Haystack ChatMessage objects.
@@ -255,6 +281,7 @@ def _parse_completion_response(response_body: Dict[str, Any], model: str) -> Lis
255281 # Process all content blocks and combine them into a single message
256282 text_content = []
257283 tool_calls = []
284+ reasoning_contents = []
258285 for content_block in content_blocks :
259286 if "text" in content_block :
260287 text_content .append (content_block ["text" ])
@@ -267,14 +294,24 @@ def _parse_completion_response(response_body: Dict[str, Any], model: str) -> Lis
267294 arguments = tool_use .get ("input" , {}),
268295 )
269296 tool_calls .append (tool_call )
297+ elif "reasoningContent" in content_block :
298+ reasoning_content = content_block ["reasoningContent" ]
299+ # If reasoningText is present, replace it with reasoning_text
300+ if "reasoningText" in reasoning_content :
301+ reasoning_content ["reasoning_text" ] = reasoning_content .pop ("reasoningText" )
302+ if "redactedContent" in reasoning_content :
303+ reasoning_content ["redacted_content" ] = reasoning_content .pop ("redactedContent" )
304+ reasoning_contents .append ({"reasoning_content" : reasoning_content })
305+
306+ # If reasoning contents were found, add them to the base meta
307+ base_meta .update ({"reasoning_contents" : reasoning_contents })
270308
271309 # Create a single ChatMessage with combined text and tool calls
272310 replies .append (ChatMessage .from_assistant (" " .join (text_content ), tool_calls = tool_calls , meta = base_meta ))
273311
274312 return replies
275313
276314
277- # Bedrock streaming to Haystack util methods
278315def _convert_event_to_streaming_chunk (
279316 event : Dict [str , Any ], model : str , component_info : ComponentInfo
280317) -> StreamingChunk :
@@ -367,6 +404,22 @@ def _convert_event_to_streaming_chunk(
367404 "received_at" : datetime .now (timezone .utc ).isoformat (),
368405 },
369406 )
407+ # This is for accumulating reasoning content deltas
408+ elif "reasoningContent" in delta :
409+ reasoning_content = delta ["reasoningContent" ]
410+ if "redactedContent" in reasoning_content :
411+ reasoning_content ["redacted_content" ] = reasoning_content .pop ("redactedContent" )
412+ streaming_chunk = StreamingChunk (
413+ content = "" ,
414+ meta = {
415+ "model" : model ,
416+ "index" : 0 ,
417+ "tool_calls" : None ,
418+ "finish_reason" : None ,
419+ "received_at" : datetime .now (timezone .utc ).isoformat (),
420+ "reasoning_contents" : [{"index" : block_idx , "reasoning_content" : reasoning_content }],
421+ },
422+ )
370423
371424 elif "messageStop" in event :
372425 finish_reason = event ["messageStop" ].get ("stopReason" )
@@ -406,6 +459,66 @@ def _convert_event_to_streaming_chunk(
406459 return streaming_chunk
407460
408461
462+ def _process_reasoning_contents (chunks : List [StreamingChunk ]) -> List [Dict [str , Any ]]:
463+ """
464+ Process reasoning contents from a list of StreamingChunk objects into the Bedrock expected format.
465+
466+ :param chunks: List of StreamingChunk objects potentially containing reasoning contents.
467+
468+ :returns: List of Bedrock formatted reasoning content dictionaries
469+ """
470+ formatted_reasoning_contents = []
471+ current_index = None
472+ reasoning_text = ""
473+ reasoning_signature = None
474+ redacted_content = None
475+ for chunk in chunks :
476+ reasoning_contents = chunk .meta .get ("reasoning_contents" , [])
477+
478+ for reasoning_content in reasoning_contents :
479+ content_block_index = reasoning_content ["index" ]
480+
481+ # Start new group when index changes
482+ if current_index is not None and content_block_index != current_index :
483+ # Finalize current group
484+ if reasoning_text :
485+ formatted_reasoning_contents .append (
486+ {
487+ "reasoning_content" : {
488+ "reasoning_text" : {"text" : reasoning_text , "signature" : reasoning_signature },
489+ }
490+ }
491+ )
492+ if redacted_content :
493+ formatted_reasoning_contents .append ({"reasoning_content" : {"redacted_content" : redacted_content }})
494+ reasoning_text = ""
495+ reasoning_signature = None
496+ redacted_content = None
497+
498+ # Accumulate content for current index
499+ current_index = content_block_index
500+ reasoning_text += reasoning_content ["reasoning_content" ].get ("text" , "" )
501+ if "redacted_content" in reasoning_content ["reasoning_content" ]:
502+ redacted_content = reasoning_content ["reasoning_content" ]["redacted_content" ]
503+ if "signature" in reasoning_content ["reasoning_content" ]:
504+ reasoning_signature = reasoning_content ["reasoning_content" ]["signature" ]
505+
506+ # Finalize the last group
507+ if current_index is not None :
508+ if reasoning_text :
509+ formatted_reasoning_contents .append (
510+ {
511+ "reasoning_content" : {
512+ "reasoning_text" : {"text" : reasoning_text , "signature" : reasoning_signature },
513+ }
514+ }
515+ )
516+ if redacted_content :
517+ formatted_reasoning_contents .append ({"reasoning_content" : {"redacted_content" : redacted_content }})
518+
519+ return formatted_reasoning_contents
520+
521+
409522def _convert_streaming_chunks_to_chat_message (chunks : List [StreamingChunk ]) -> ChatMessage :
410523 """
411524 Converts a list of streaming chunks into a ChatMessage object.
@@ -421,8 +534,12 @@ def _convert_streaming_chunks_to_chat_message(chunks: List[StreamingChunk]) -> C
421534 A ChatMessage object constructed from the streaming chunks, containing the aggregated text, processed tool
422535 calls, and metadata.
423536 """
537+ # Join all text content from the chunks
424538 text = "" .join ([chunk .content for chunk in chunks ])
425539
540+ # If reasoning content is present in any chunk, accumulate it
541+ reasoning_contents = _process_reasoning_contents (chunks = chunks )
542+
426543 # Process tool calls if present in any chunk
427544 tool_calls = []
428545 tool_call_data : Dict [int , Dict [str , str ]] = {} # Track tool calls by index
@@ -474,6 +591,7 @@ def _convert_streaming_chunks_to_chat_message(chunks: List[StreamingChunk]) -> C
474591 "finish_reason" : finish_reason ,
475592 "completion_start_time" : chunks [0 ].meta .get ("received_at" ), # first chunk received
476593 "usage" : usage ,
594+ "reasoning_contents" : reasoning_contents ,
477595 }
478596
479597 return ChatMessage .from_assistant (text = text or None , tool_calls = tool_calls , meta = meta )
0 commit comments