11"""Module for the LangchainSummarizer class."""
22
3+ import asyncio
34import logging
4- import traceback
55from typing import Optional
66
77from langchain .text_splitter import RecursiveCharacterTextSplitter
88from langchain_core .documents import Document
99from langchain_core .runnables import Runnable , RunnableConfig , ensure_config
10+ from openai import APIConnectionError , APIError , APITimeoutError , RateLimitError
1011
12+ from admin_api_lib .impl .settings .summarizer_settings import SummarizerSettings
1113from admin_api_lib .summarizer .summarizer import (
1214 Summarizer ,
1315 SummarizerInput ,
1416 SummarizerOutput ,
1517)
1618from rag_core_lib .impl .langfuse_manager .langfuse_manager import LangfuseManager
19+ from rag_core_lib .impl .settings .retry_decorator_settings import RetryDecoratorSettings
1720from rag_core_lib .impl .utils .async_threadsafe_semaphore import AsyncThreadsafeSemaphore
21+ from rag_core_lib .impl .utils .retry_decorator import retry_with_backoff
1822
1923logger = logging .getLogger (__name__ )
2024
@@ -32,10 +36,15 @@ def __init__(
3236 langfuse_manager : LangfuseManager ,
3337 chunker : RecursiveCharacterTextSplitter ,
3438 semaphore : AsyncThreadsafeSemaphore ,
39+ summarizer_settings : SummarizerSettings ,
40+ retry_decorator_settings : RetryDecoratorSettings ,
3541 ):
3642 self ._chunker = chunker
3743 self ._langfuse_manager = langfuse_manager
3844 self ._semaphore = semaphore
45+ self ._retry_decorator_settings = self ._create_retry_decorator_settings (
46+ summarizer_settings , retry_decorator_settings
47+ )
3948
4049 async def ainvoke (self , query : SummarizerInput , config : Optional [RunnableConfig ] = None ) -> SummarizerOutput :
4150 """
@@ -65,40 +74,88 @@ async def ainvoke(self, query: SummarizerInput, config: Optional[RunnableConfig]
6574 """
6675 assert query , "Query is empty: %s" % query # noqa S101
6776 config = ensure_config (config )
68- tries_remaining = config .get ("configurable" , {}).get ("tries_remaining" , 3 )
69- logger .debug ("Tries remaining %d" % tries_remaining )
7077
71- if tries_remaining < 0 :
72- raise Exception ("Summary creation failed." )
7378 document = Document (page_content = query )
7479 langchain_documents = self ._chunker .split_documents ([document ])
80+ logger .debug ("Summarizing %d chunk(s)..." , len (langchain_documents ))
7581
76- outputs = []
77- for langchain_document in langchain_documents :
78- async with self ._semaphore :
79- try :
80- result = await self ._create_chain ().ainvoke ({"text" : langchain_document .page_content }, config )
81- # Extract content from AIMessage if it's not already a string
82- content = result .content if hasattr (result , "content" ) else str (result )
83- outputs .append (content )
84- except Exception as e :
85- logger .error ("Error in summarizing langchain doc: %s %s" , e , traceback .format_exc ())
86- config ["tries_remaining" ] = tries_remaining - 1
87- result = await self ._create_chain ().ainvoke ({"text" : langchain_document .page_content }, config )
88- # Extract content from AIMessage if it's not already a string
89- content = result .content if hasattr (result , "content" ) else str (result )
90- outputs .append (content )
82+ # Fan out with concurrency, bounded by your semaphore inside _summarize_chunk
83+ tasks = [asyncio .create_task (self ._summarize_chunk (doc .page_content , config )) for doc in langchain_documents ]
84+ outputs = await asyncio .gather (* tasks )
9185
9286 if len (outputs ) == 1 :
9387 return outputs [0 ]
94- summary = " " .join (outputs )
88+
89+ # Optional single reduce pass (no recursion)
90+ merged = " " .join (outputs )
9591 logger .debug (
96- "Reduced number of chars from %d to %d"
97- % (len ("" .join ([x .page_content for x in langchain_documents ])), len (summary ))
92+ "Reduced number of chars from %d to %d" ,
93+ len ("" .join ([x .page_content for x in langchain_documents ])),
94+ len (merged ),
95+ )
96+ return await self ._summarize_chunk (merged , config )
97+
98+ def _create_retry_decorator_settings (
99+ self , summarizer_settings : SummarizerSettings , retry_decorator_settings : RetryDecoratorSettings
100+ ):
101+ return RetryDecoratorSettings (
102+ max_retries = (
103+ summarizer_settings .max_retries
104+ if summarizer_settings .max_retries is not None
105+ else retry_decorator_settings .max_retries
106+ ),
107+ retry_base_delay = (
108+ summarizer_settings .retry_base_delay
109+ if summarizer_settings .retry_base_delay is not None
110+ else retry_decorator_settings .retry_base_delay
111+ ),
112+ retry_max_delay = (
113+ summarizer_settings .retry_max_delay
114+ if summarizer_settings .retry_max_delay is not None
115+ else retry_decorator_settings .retry_max_delay
116+ ),
117+ backoff_factor = (
118+ summarizer_settings .backoff_factor
119+ if summarizer_settings .backoff_factor is not None
120+ else retry_decorator_settings .backoff_factor
121+ ),
122+ attempt_cap = (
123+ summarizer_settings .attempt_cap
124+ if summarizer_settings .attempt_cap is not None
125+ else retry_decorator_settings .attempt_cap
126+ ),
127+ jitter_min = (
128+ summarizer_settings .jitter_min
129+ if summarizer_settings .jitter_min is not None
130+ else retry_decorator_settings .jitter_min
131+ ),
132+ jitter_max = (
133+ summarizer_settings .jitter_max
134+ if summarizer_settings .jitter_max is not None
135+ else retry_decorator_settings .jitter_max
136+ ),
98137 )
99- return await self .ainvoke (summary , config )
100138
101139 def _create_chain (self ) -> Runnable :
102140 return self ._langfuse_manager .get_base_prompt (self .__class__ .__name__ ) | self ._langfuse_manager .get_base_llm (
103141 self .__class__ .__name__
104142 )
143+
144+ def _retry_with_backoff_wrapper (self ):
145+ # Prefer summarizer-specific overrides; fall back to global retry settings
146+ return retry_with_backoff (
147+ settings = self ._retry_decorator_settings ,
148+ exceptions = (APIError , RateLimitError , APITimeoutError , APIConnectionError ),
149+ rate_limit_exceptions = (RateLimitError ,),
150+ logger = logger ,
151+ )
152+
153+ async def _summarize_chunk (self , text : str , config : Optional [RunnableConfig ]) -> SummarizerOutput :
154+ @self ._retry_with_backoff_wrapper ()
155+ async def _call (text : str , config : Optional [RunnableConfig ]) -> SummarizerOutput :
156+ response = await self ._create_chain ().ainvoke ({"text" : text }, config )
157+ return response .content if hasattr (response , "content" ) else str (response )
158+
159+ # Hold the semaphore for the entire retry lifecycle
160+ async with self ._semaphore :
161+ return await _call (text , config )
0 commit comments