11"""
22OpenAI API interface for LLMs
3+
4+ This module also supports a "manual mode" (human-in-the-loop) where prompts are written
5+ to a task queue directory and the system waits for a corresponding *.answer.json file
36"""
47
58import asyncio
9+ import json
610import logging
711import time
12+ import uuid
13+ from datetime import datetime , timezone
14+ from pathlib import Path
815from typing import Any , Dict , List , Optional , Union
916
1017import openai
1118
12- from openevolve .config import LLMConfig
1319from openevolve .llm .base import LLMInterface
1420
1521logger = logging .getLogger (__name__ )
1622
1723
def _iso_now() -> str:
    """Return the current UTC time as an ISO-8601 timestamp string."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc.isoformat()
26+
27+
def _build_display_prompt(messages: List[Dict[str, str]]) -> str:
    """
    Render chat messages as one plain-text prompt for the manual UI.

    Each message becomes a "### ROLE" header (role defaults to "user" when
    missing) followed by its content. The rendered result always ends with
    exactly one trailing newline.
    """
    sections = [
        f"### {str(msg.get('role', 'user')).upper()}\n{msg.get('content', '')}\n"
        for msg in messages
    ]
    return "\n".join(sections).rstrip() + "\n"
38+
39+
def _atomic_write_json(path: Path, payload: Dict[str, Any]) -> None:
    """
    Write *payload* as pretty-printed UTF-8 JSON to *path* atomically.

    The data is first written to a hidden temp file in the same directory and
    then moved into place with ``Path.replace`` (an atomic rename when source
    and destination are on the same filesystem), so a concurrent reader never
    observes a partially written file.

    NOTE(review): the temp name is derived from *path* only, so two concurrent
    writers targeting the same path could clash — acceptable for the
    single-writer manual queue, but worth confirming if usage widens.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.parent / f".{path.name}.tmp"
    try:
        tmp.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
        tmp.replace(path)
    except BaseException:
        # Don't leave a stale temp file behind if the write or rename fails.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
45+
46+
1847class OpenAILLM (LLMInterface ):
1948 """LLM interface using OpenAI-compatible APIs"""
2049
@@ -35,15 +64,30 @@ def __init__(
3564 self .random_seed = getattr (model_cfg , "random_seed" , None )
3665 self .reasoning_effort = getattr (model_cfg , "reasoning_effort" , None )
3766
38- # Set up API client
39- # OpenAI client requires max_retries to be int, not None
40- max_retries = self .retries if self .retries is not None else 0
41- self .client = openai .OpenAI (
42- api_key = self .api_key ,
43- base_url = self .api_base ,
44- timeout = self .timeout ,
45- max_retries = max_retries ,
46- )
67+ # Manual mode: enabled via llm.manual_mode in config.yaml
68+ self .manual_mode = (getattr (model_cfg , "manual_mode" , False ) is True )
69+ self .manual_queue_dir : Optional [Path ] = None
70+
71+ if self .manual_mode :
72+ qdir = getattr (model_cfg , "_manual_queue_dir" , None )
73+ if not qdir :
74+ raise ValueError (
75+ "Manual mode is enabled but manual_queue_dir is missing. "
76+ "This should be injected by the OpenEvolve controller."
77+ )
78+ self .manual_queue_dir = Path (str (qdir )).expanduser ().resolve ()
79+ self .manual_queue_dir .mkdir (parents = True , exist_ok = True )
80+ self .client = None
81+ else :
82+ # Set up API client (normal mode)
83+ # OpenAI client requires max_retries to be int, not None
84+ max_retries = self .retries if self .retries is not None else 0
85+ self .client = openai .OpenAI (
86+ api_key = self .api_key ,
87+ base_url = self .api_base ,
88+ timeout = self .timeout ,
89+ max_retries = max_retries ,
90+ )
4791
4892 # Only log unique models to reduce duplication
4993 if not hasattr (logger , "_initialized_models" ):
@@ -122,8 +166,9 @@ async def generate_with_context(
122166
123167 # Add seed parameter for reproducibility if configured
124168 # Skip seed parameter for Google AI Studio endpoint as it doesn't support it
169+ # Seed only makes sense for actual API calls
125170 seed = kwargs .get ("seed" , self .random_seed )
126- if seed is not None :
171+ if seed is not None and not self . manual_mode :
127172 if self .api_base == "https://generativelanguage.googleapis.com/v1beta/openai/" :
128173 logger .warning (
129174 "Skipping seed parameter as Google AI Studio endpoint doesn't support it. "
@@ -135,6 +180,12 @@ async def generate_with_context(
135180 # Attempt the API call with retries
136181 retries = kwargs .get ("retries" , self .retries )
137182 retry_delay = kwargs .get ("retry_delay" , self .retry_delay )
183+
184+ # Manual mode: no timeout unless explicitly passed by the caller
185+ if self .manual_mode :
186+ timeout = kwargs .get ("timeout" , None )
187+ return await self ._manual_wait_for_answer (params , timeout = timeout )
188+
138189 timeout = kwargs .get ("timeout" , self .timeout )
139190
140191 for attempt in range (retries + 1 ):
@@ -160,6 +211,9 @@ async def generate_with_context(
160211
161212 async def _call_api (self , params : Dict [str , Any ]) -> str :
162213 """Make the actual API call"""
214+ if self .client is None :
215+ raise RuntimeError ("OpenAI client is not initialized (manual_mode enabled?)" )
216+
163217 # Use asyncio to run the blocking API call in a thread pool
164218 loop = asyncio .get_event_loop ()
165219 response = await loop .run_in_executor (
@@ -170,3 +224,63 @@ async def _call_api(self, params: Dict[str, Any]) -> str:
170224 logger .debug (f"API parameters: { params } " )
171225 logger .debug (f"API response: { response .choices [0 ].message .content } " )
172226 return response .choices [0 ].message .content
227+
228+ async def _manual_wait_for_answer (
229+ self , params : Dict [str , Any ], timeout : Optional [Union [int , float ]]
230+ ) -> str :
231+ """
232+ Manual mode: write a task JSON file and poll for *.answer.json
233+ If timeout is provided, we respect it; otherwise we wait indefinitely
234+ """
235+
236+ if self .manual_queue_dir is None :
237+ raise RuntimeError ("manual_queue_dir is not initialized" )
238+
239+ task_id = str (uuid .uuid4 ())
240+ messages = params .get ("messages" , [])
241+ display_prompt = _build_display_prompt (messages )
242+
243+ task_payload : Dict [str , Any ] = {
244+ "id" : task_id ,
245+ "created_at" : _iso_now (),
246+ "model" : params .get ("model" ),
247+ "display_prompt" : display_prompt ,
248+ "messages" : messages ,
249+ "meta" : {
250+ "max_tokens" : params .get ("max_tokens" ),
251+ "max_completion_tokens" : params .get ("max_completion_tokens" ),
252+ "temperature" : params .get ("temperature" ),
253+ "top_p" : params .get ("top_p" ),
254+ "reasoning_effort" : params .get ("reasoning_effort" ),
255+ "verbosity" : params .get ("verbosity" ),
256+ },
257+ }
258+
259+ task_path = self .manual_queue_dir / f"{ task_id } .json"
260+ answer_path = self .manual_queue_dir / f"{ task_id } .answer.json"
261+
262+ _atomic_write_json (task_path , task_payload )
263+ logger .info (f"[manual_mode] Task enqueued: { task_path } " )
264+
265+ start = time .time ()
266+ poll_interval = 0.5
267+
268+ while True :
269+ if answer_path .exists ():
270+ try :
271+ data = json .loads (answer_path .read_text (encoding = "utf-8" ))
272+ except Exception as e :
273+ logger .warning (f"[manual_mode] Failed to parse answer JSON for { task_id } : { e } " )
274+ await asyncio .sleep (poll_interval )
275+ continue
276+
277+ answer = str (data .get ("answer" ) or "" )
278+ logger .info (f"[manual_mode] Answer received for { task_id } " )
279+ return answer
280+
281+ if timeout is not None and (time .time () - start ) > float (timeout ):
282+ raise asyncio .TimeoutError (
283+ f"Manual mode timed out after { timeout } seconds waiting for answer of task { task_id } "
284+ )
285+
286+ await asyncio .sleep (poll_interval )
0 commit comments