@@ -803,8 +803,7 @@ def push(self, task: ExplorationTask) -> bool:
803803 self ._task_counter += 1
804804 task_id = self ._task_counter
805805 self ._tasks [task_id ] = task
806-
807-
806+
808807 heapq .heappush (self ._heap , (- task .priority , self ._push_counter , task_id ))
809808
810809 self ._push_counter += 1
@@ -853,6 +852,14 @@ def pop(self) -> ExplorationTask | None:
853852 if task is None :
854853 continue # stale entry — task was already removed
855854
855+ # Remove the key from _submitted so the set only tracks tasks that
856+ # are *currently* in the queue. Without this removal, _submitted
857+ # would grow without bound over a long run because tasks are added
858+ # on push() but were never removed. Duplicate-submission prevention
859+ # is now handled by ExploredPairsLog.has() in _enqueue_perturbations.
860+ key = (task .node_id , tuple (task .afir_params ))
861+ self ._submitted .discard (key )
862+
856863 # Lazy delta_E / priority refresh for the single task being returned.
857864 if self ._current_ref_e is not None :
858865 node_g = task .metadata .get ("source_node_free_energy" )
@@ -1460,6 +1467,11 @@ def get_node(self, node_id: int) -> EQNode | None:
14601467 def all_nodes (self ) -> list [EQNode ]:
14611468 return list (self ._nodes .values ())
14621469
1470+ @property
1471+ def node_count (self ) -> int :
1472+ """Return the number of nodes in O(1) without allocating a list copy."""
1473+ return len (self ._nodes )
1474+
14631475 def next_node_id (self ) -> int :
14641476 nid = self ._node_counter
14651477 self ._node_counter += 1
@@ -2495,7 +2507,7 @@ def run(self) -> None:
24952507 try :
24962508 profile_dirs = self ._run_autots (task , run_dir )
24972509 except Exception as e :
2498- logger .error (f "AutoTS failed for run { run_dir } : { e } " )
2510+ logger .error ("AutoTS failed for run %s: %s" , run_dir , e )
24992511 self ._flush_node_energy_updates ()
25002512 self ._save_run_metadata (run_dir , task , status = "FAILED" , profile_dirs = [])
25012513 self .graph .save (self .graph_json_path )
@@ -2526,7 +2538,7 @@ def run(self) -> None:
25262538 self .graph .save (self .graph_json_path )
25272539 self .explored_log .close ()
25282540
2529- def _collect_task_batch (self , n : int ) -> list [tuple [ExplorationTask , str , str , str , str , int ]]:
2541+ def _collect_task_batch (self , n : int ) -> list [tuple [ExplorationTask , str , str , int , int , int ]]:
25302542 """Pop up to *n* ready tasks and create their run directories.
25312543
25322544 Returns a list of ``(task, run_dir, gamma_sign, atom_i, atom_j, iteration)``
@@ -2575,12 +2587,12 @@ def _collect_task_batch(self, n: int) -> list[tuple[ExplorationTask, str, str, s
25752587 # iteration number and therefore its own run directory.
25762588 self ._iteration += 1
25772589 run_dir = self ._make_run_dir (task , iteration = self ._iteration )
2578- batch .append ((task , run_dir , gamma_sign , str ( atom_i ), str ( atom_j ) , self ._iteration ))
2590+ batch .append ((task , run_dir , gamma_sign , atom_i , atom_j , self ._iteration ))
25792591 return batch
25802592
25812593 def _run_batch_parallel (
25822594 self ,
2583- batch : list [tuple [ExplorationTask , str , str , str , str , int ]],
2595+ batch : list [tuple [ExplorationTask , str , str , int , int , int ]],
25842596 history_log : str ,
25852597 priority_log : str ,
25862598 ) -> None :
@@ -2841,7 +2853,8 @@ def _run_initial_optimization(
28412853 energy = float (raw )
28422854 logger .info (
28432855 "Initial optimization energy "
2844- f"(optimizer.{ method_name } ()): { energy :.10f} Ha"
2856+ "(optimizer.%s()): %.10f Ha" ,
2857+ method_name , energy ,
28452858 )
28462859 break # success — stop trying methods
28472860 except Exception :
@@ -2857,7 +2870,8 @@ def _run_initial_optimization(
28572870 try :
28582871 energy = float (raw )
28592872 logger .info (
2860- f"Initial optimization energy (optimizer.{ attr } ): { energy :.10f} Ha"
2873+ "Initial optimization energy (optimizer.%s): %.10f Ha" ,
2874+ attr , energy ,
28612875 )
28622876 break
28632877 except (TypeError , ValueError ):
@@ -2874,13 +2888,13 @@ def _run_initial_optimization(
28742888 float_attrs = {k : v for k , v in vars (optimizer ).items ()
28752889 if isinstance (v , float )}
28762890 if float_attrs :
2877- logger .warning (f "Float attributes on optimizer: { float_attrs } " )
2891+ logger .warning ("Float attributes on optimizer: %s" , float_attrs )
28782892
28792893 return optimized_xyz_path , energy
28802894
28812895 except Exception as e :
28822896 logger .error (
2883- f "Initial optimization failed: { e } . Falling back to unoptimized geometry."
2897+ "Initial optimization failed: %s . Falling back to unoptimized geometry." , e
28842898 )
28852899 traceback .print_exc ()
28862900 return seed_xyz , None
@@ -3068,20 +3082,47 @@ def _run_autots(self, task: ExplorationTask, run_dir: str) -> list[str]:
30683082 # overhead (executor pool creation, queue initialisation, etc.).
30693083 # The return value travels back via a multiprocessing.Queue that
30703084 # is safe to use across spawn boundaries.
3085+ #
3086+ # IMPORTANT — deadlock prevention
3087+ # --------------------------------
3088+ # result_q.get() MUST be called *before* proc.join().
3089+ # If proc.join() is called first, and the worker's single put() blocks
3090+ # because the OS pipe buffer is full (e.g. a very large traceback
3091+ # string), the parent will wait forever for the child to exit while
3092+ # the child waits for the parent to consume the queue — deadlock.
3093+ # Consuming the item first guarantees the worker can finish and exit.
30713094 result_q : multiprocessing .Queue = self ._mp_ctx .Queue () # type: ignore[type-arg]
30723095 proc = self ._mp_ctx .Process (
30733096 target = _autots_worker_with_queue ,
30743097 args = (config , run_dir , workspace , result_q ),
30753098 )
30763099 proc .start ()
3077- proc .join ()
30783100
3079- if result_q .empty ():
3101+ # ── Step 1: blocking get — drains the single result before join ──
3102+ try :
3103+ tag , payload = result_q .get (timeout = None ) # block until worker puts
3104+ except Exception as exc :
3105+ # Queue.get() itself raised (e.g. interrupted); terminate cleanly.
3106+ proc .terminate ()
3107+ proc .join ()
30803108 raise RuntimeError (
3081- f"_run_autots: worker process exited with code { proc .exitcode } "
3082- "without returning a result (possibly killed by a signal)."
3083- )
3084- tag , payload = result_q .get_nowait ()
3109+ f"_run_autots: failed to receive result from worker: { exc } "
3110+ ) from exc
3111+
3112+ # ── Step 2: drain any unexpected residual items (get_nowait loop) ─
3113+ # The worker is designed to put exactly one item, but drain defensively
3114+ # so that no unconsumed data keeps the child's feeder thread alive and
3115+ # prevents clean exit.
3116+ import queue as _queue_mod
3117+ while True :
3118+ try :
3119+ result_q .get_nowait ()
3120+ except _queue_mod .Empty :
3121+ break
3122+
3123+ # ── Step 3: join after the queue is empty — never deadlocks ──────
3124+ proc .join ()
3125+
30853126 if tag == "err" :
30863127 # payload is a formatted traceback string (see _autots_worker_with_queue).
30873128 # Wrap in RuntimeError so it can be raised and caught normally.
@@ -3487,7 +3528,7 @@ def _enqueue_perturbations(self, node: EQNode, force_add: bool = False) -> None:
34873528 # Special case: when the graph contains only one node, all exclusion
34883529 # rules are disabled so that the sole available node is never silently
34893530 # skipped (the network cannot grow if the only node is excluded).
3490- n_total_nodes = len ( self .graph .all_nodes ())
3531+ n_total_nodes = self .graph .node_count
34913532 if n_total_nodes == 1 and node .node_id in self .excluded_node_ids :
34923533 logger .debug (
34933534 "_enqueue_perturbations: EQ%d is in excluded_node_ids but "
0 commit comments