2222
2323import atexit
2424import io
25- import mmap
2625import os
2726import pickle
2827import sys
@@ -45,7 +44,6 @@ def __init__(self, dtype: type = np.complex64) -> None:
4544
4645 self ._read_ptr = 0
4746 self ._write_ptr = 0
48- self ._mmap_size = 0
4947
5048 self ._dtype_size = np .dtype (dtype ).itemsize
5149
@@ -58,13 +56,17 @@ def __init__(self, dtype: type = np.complex64) -> None:
5856
5957 self ._register_cleanup ()
6058
59+ def __del__ (self ) -> None :
60+ self ._cleanup ()
61+
6162 def __getitem__ (self , index : int | slice ) -> Any :
6263 with self ._rlock :
6364
6465 write_ptr = self ._write_ptr
6566
6667 if write_ptr == 0 :
6768 self ._not_empty .wait ()
69+ self ._not_empty .clear ()
6870 write_ptr = self ._write_ptr
6971
7072 size = write_ptr // self ._dtype_size
@@ -88,19 +90,21 @@ def __getitem__(self, index: int | slice) -> Any:
8890 raise TypeError ('index must be int or slice' )
8991
9092 def _cleanup (self ) -> None :
91- try :
92- with self ._wlock :
93- with self ._rlock :
94- filepath = self ._temp_file .name
95- self ._reader .close ()
96- self ._writer .close ()
97- self ._temp_file .close ()
93+ if self ._temp_file :
94+ filepath = self ._temp_file .name
9895
99- if os .path .exists (filepath ):
100- os .remove (filepath )
96+ try :
97+ with self ._wlock :
98+ with self ._rlock :
99+ self ._reader .close ()
100+ self ._writer .close ()
101+ self ._temp_file .close ()
101102
102- except Exception as er :
103- print (f'Exception during cleanup: { er } ' , file = sys .stderr )
103+ except Exception as er :
104+ print (f'Exception during cleanup: { er } ' , file = sys .stderr )
105+
106+ if os .path .exists (filepath ):
107+ os .remove (filepath )
104108
105109 def _register_cleanup (self ) -> None :
106110 atexit .register (self ._cleanup )
@@ -112,8 +116,8 @@ def append(self, data: np.ndarray, chunk_size: int = 131072) -> None:
112116 data = data .astype (self ._dtype , copy = False )
113117 chunk_elements = chunk_size // self ._dtype_size
114118
115- for i in range ( 0 , len ( data ), chunk_elements ) :
116- with self . _wlock :
119+ with self . _wlock :
120+ for i in range ( 0 , len ( data ), chunk_elements ) :
117121 data [i :i + chunk_elements ].tofile (self ._writer )
118122 self ._write_ptr = self ._writer .tell ()
119123 self ._not_empty .set ()
@@ -123,6 +127,7 @@ def get_all(self, use_memmap: bool = False) -> np.ndarray:
123127
124128 if self ._write_ptr == 0 :
125129 self ._not_empty .wait ()
130+ self ._not_empty .clear ()
126131
127132 if not use_memmap :
128133 self ._reader .seek (0 )
@@ -139,11 +144,13 @@ def get_new(self, wait: bool = False) -> np.ndarray:
139144
140145 if write_ptr == 0 :
141146 self ._not_empty .wait ()
147+ self ._not_empty .clear ()
142148 write_ptr = self ._write_ptr
143149
144150 while self ._read_ptr == write_ptr :
145151 if wait :
146152 self ._not_empty .wait ()
153+ self ._not_empty .clear ()
147154 write_ptr = self ._write_ptr
148155 else :
149156 return np .array ([], dtype = self ._dtype )
@@ -162,6 +169,7 @@ def get_chunk(self, num_elements: int, ring: bool = True) -> np.ndarray:
162169
163170 if write_ptr == 0 :
164171 self ._not_empty .wait ()
172+ self ._not_empty .clear ()
165173 write_ptr = self ._write_ptr
166174
167175 total_bytes = num_elements * self ._dtype_size
@@ -211,15 +219,15 @@ def clear(self) -> None:
211219 with self ._wlock :
212220 with self ._rlock :
213221 os .truncate (self ._temp_file .fileno (), 0 )
214- self ._write_ptr = 0
215- self ._read_ptr = 0
216222 self ._reader .seek (0 )
217223 self ._writer .seek (0 )
224+ self ._write_ptr = 0
225+ self ._read_ptr = 0
218226
219227
220- class MmapQueue :
228+ class FileQueue :
221229 '''
222- A memory-mapped queue for handling serialized Python objects, optimized for large-scale data storage and retrieval.
230+ A file based queue for handling serialized Python objects, optimized for large-scale data storage and retrieval.
223231 Utilizes file-based storage to minimize RAM usage and dynamically resizes with efficient memory management via an interval tree.
224232 Suitable in resource-constrained environments.
225233 '''
@@ -294,9 +302,9 @@ def balance_factor(self) -> int:
294302
295303 class IntervalTree :
296304 def __init__ (self ) -> None :
297- self .root : MmapQueue .IntervalTreeNode = None
305+ self .root : FileQueue .IntervalTreeNode = None
298306
299- def _insert_node (self , parent_node : 'MmapQueue .IntervalTreeNode' , child_node : 'MmapQueue .IntervalTreeNode' ) -> None :
307+ def _insert_node (self , parent_node : 'FileQueue .IntervalTreeNode' , child_node : 'FileQueue .IntervalTreeNode' ) -> None :
300308 if parent_node is None :
301309 self .root = child_node
302310 return
@@ -321,7 +329,7 @@ def _insert_node(self, parent_node: 'MmapQueue.IntervalTreeNode', child_node: 'M
321329 else :
322330 raise ValueError (f'Unexpected overlap: [{ child_node .start } , { child_node .end } ) intersects with [{ parent_node .start } , { parent_node .end } ).' )
323331
324- def _merge (self , target_node : 'MmapQueue .IntervalTreeNode' , source_node : 'MmapQueue .IntervalTreeNode' ) -> None :
332+ def _merge (self , target_node : 'FileQueue .IntervalTreeNode' , source_node : 'FileQueue .IntervalTreeNode' ) -> None :
325333 target_node .start = min (target_node .start , source_node .start )
326334 target_node .end = max (target_node .end , source_node .end )
327335
@@ -362,7 +370,7 @@ def _merge(self, target_node: 'MmapQueue.IntervalTreeNode', source_node: 'MmapQu
362370 else :
363371 break
364372
365- def _balance (self , node : 'MmapQueue .IntervalTreeNode' ) -> None :
373+ def _balance (self , node : 'FileQueue .IntervalTreeNode' ) -> None :
366374 self .root .update_height ()
367375
368376 while True :
@@ -381,7 +389,7 @@ def _balance(self, node: 'MmapQueue.IntervalTreeNode') -> None:
381389 return
382390 node = node .parent_node
383391
384- def _rotate_left (self , node : 'MmapQueue .IntervalTreeNode' ) -> 'MmapQueue .IntervalTreeNode' :
392+ def _rotate_left (self , node : 'FileQueue .IntervalTreeNode' ) -> 'FileQueue .IntervalTreeNode' :
385393 if node .right_node is None :
386394 return node
387395
@@ -401,7 +409,7 @@ def _rotate_left(self, node: 'MmapQueue.IntervalTreeNode') -> 'MmapQueue.Interva
401409
402410 return new_root
403411
404- def _rotate_right (self , node : 'MmapQueue .IntervalTreeNode' ) -> 'MmapQueue .IntervalTreeNode' :
412+ def _rotate_right (self , node : 'FileQueue .IntervalTreeNode' ) -> 'FileQueue .IntervalTreeNode' :
405413 if node .left_node is None :
406414 return node
407415 new_root = node .left_node
@@ -421,7 +429,7 @@ def _rotate_right(self, node: 'MmapQueue.IntervalTreeNode') -> 'MmapQueue.Interv
421429 return new_root
422430
423431 def insert (self , start : int , end : int ) -> None :
424- new_node = MmapQueue .IntervalTreeNode (start , end )
432+ new_node = FileQueue .IntervalTreeNode (start , end )
425433
426434 self ._insert_node (self .root , new_node )
427435 if self .root is not None :
@@ -430,7 +438,7 @@ def insert(self, start: int, end: int) -> None:
430438 self .root .update_min_start ()
431439 self .root .update_max_end ()
432440
433- def search (self , length : int ) -> 'MmapQueue .IntervalTreeNode | None' :
441+ def search (self , length : int ) -> 'FileQueue .IntervalTreeNode | None' :
434442 if self .root is None or self .root .max_length < length :
435443 return None
436444
@@ -446,7 +454,7 @@ def search(self, length: int) -> 'MmapQueue.IntervalTreeNode | None':
446454 else :
447455 return None
448456
449- def delete (self , node : 'MmapQueue .IntervalTreeNode' ) -> None :
457+ def delete (self , node : 'FileQueue .IntervalTreeNode' ) -> None :
450458
451459 if node .parent_node is not None :
452460 if node .parent_node .left_node == node :
@@ -476,7 +484,7 @@ def delete(self, node: 'MmapQueue.IntervalTreeNode') -> None:
476484 self .root .update_min_start ()
477485 self .root .update_max_end ()
478486
479- def change_node_range (self , node : 'MmapQueue .IntervalTreeNode' , new_start : int , new_end : int ) -> None :
487+ def change_node_range (self , node : 'FileQueue .IntervalTreeNode' , new_start : int , new_end : int ) -> None :
480488 if new_start >= node .start :
481489 node .start = new_start
482490 else :
@@ -501,12 +509,13 @@ def __init__(self, initial_size: int = int(1e9)) -> None:
501509 self ._temp_file .write (b'\x00 ' * self ._file_size )
502510 self ._temp_file .flush ()
503511
504- self ._mmap = mmap .mmap (self ._temp_file .fileno (), self ._file_size )
512+ self ._writer = io .FileIO (self ._temp_file .name , mode = 'w' )
513+ self ._reader = io .FileIO (self ._temp_file .name , mode = 'r' )
505514
506- self ._lock = RLock ()
507515 self ._not_empty = Event ()
516+ self ._lock = RLock ()
508517
509- self ._tree = MmapQueue .IntervalTree ()
518+ self ._tree = FileQueue .IntervalTree ()
510519 self ._tree .insert (0 , self ._file_size )
511520 self ._queue = Queue ()
512521
@@ -516,17 +525,20 @@ def __del__(self) -> None:
516525 self ._cleanup ()
517526
518527 def _cleanup (self ) -> None :
519- try :
520- with self ._lock :
521- filepath = self ._temp_file .name
522- self ._mmap .close ()
523- self ._temp_file .close ()
528+ if self ._temp_file :
529+ filepath = self ._temp_file .name
524530
525- if os .path .exists (filepath ):
526- os .remove (filepath )
531+ try :
532+ with self ._lock :
533+ self ._reader .close ()
534+ self ._writer .close ()
535+ self ._temp_file .close ()
527536
528- except Exception as er :
529- print (f'Exception during cleanup: { er } ' , file = sys .stderr )
537+ except Exception as er :
538+ print (f'Exception during cleanup: { er } ' , file = sys .stderr )
539+
540+ if os .path .exists (filepath ):
541+ os .remove (filepath )
530542
531543 def _register_cleanup (self ) -> None :
532544 atexit .register (self ._cleanup )
@@ -538,56 +550,61 @@ def _resize(self, additional_size: int) -> None:
538550 self ._temp_file .write (b'\x00 ' * additional_size )
539551 self ._temp_file .flush ()
540552
541- self ._tree .insert (self ._file_size , new_size )
542-
543- self ._mmap .close ()
544- self ._mmap = mmap .mmap (self ._temp_file .fileno (), new_size )
545- self ._file_size = new_size
553+ with self ._lock :
554+ self ._tree .insert (self ._file_size , new_size )
555+ self ._file_size = new_size
546556
547557 def size (self ) -> int :
548- with self ._lock :
549- return self ._queue .qsize ()
558+ return self ._queue .qsize ()
550559
551560 def clear (self ) -> None :
552561 with self ._lock :
553- self ._queue .queue .clear ()
554- self ._tree = MmapQueue .IntervalTree ()
562+ self ._tree = FileQueue .IntervalTree ()
555563 self ._tree .insert (0 , self ._file_size )
564+ self ._queue .queue .clear ()
556565
557566 def empty (self ) -> bool :
558- with self ._lock :
559- return self ._queue .empty ()
567+ return self ._queue .empty ()
560568
561569 def put (self , data : Any ) -> None :
562- with self ._lock :
563- data_bytes = pickle .dumps (data )
564- data_len_bytes = len (data_bytes )
570+ data_bytes = pickle .dumps (data )
571+ data_len_bytes = len (data_bytes )
572+ start_ptr = 0
573+ end_ptr = 0
565574
575+ with self ._lock :
566576 chunk_node = self ._tree .search (data_len_bytes )
567577 if chunk_node is None :
568578 self ._resize (data_len_bytes )
569579 chunk_node = self ._tree .search (data_len_bytes )
570580
581+ start_ptr = chunk_node .start
571582 if chunk_node .length == data_len_bytes :
572- self ._mmap [chunk_node .start :chunk_node .end ] = data_bytes
573- self ._queue .put_nowait ((chunk_node .start , chunk_node .end ))
583+ end_ptr = chunk_node .end
574584 self ._tree .delete (chunk_node )
575585 else :
576- self ._mmap [chunk_node .start : chunk_node .start + data_len_bytes ] = data_bytes
577- self ._queue .put_nowait ((chunk_node .start , chunk_node .start + data_len_bytes ))
586+ end_ptr = chunk_node .start + data_len_bytes
578587 self ._tree .change_node_range (chunk_node , chunk_node .start + data_len_bytes , chunk_node .end )
579588
580- self ._not_empty .set ()
589+ self ._writer .seek (start_ptr )
590+ self ._writer .write (data_bytes )
591+ self ._queue .put_nowait ((start_ptr , end_ptr ))
592+
593+ self ._not_empty .set ()
581594
582595 def get (self , wait : bool = False ) -> Any :
583- with self ._lock :
584- while self ._queue .empty ():
585- if wait :
586- self ._not_empty .wait ()
587- else :
588- return None
596+ while self ._queue .empty ():
597+ if wait :
598+ self ._not_empty .wait ()
599+ self ._not_empty .clear ()
600+ else :
601+ return None
602+
603+ start , end = self ._queue .get_nowait ()
604+ self ._reader .seek (start )
605+ result = pickle .loads (self ._reader .read (end - start )) # noqa: S301
589606
590- start , end = self ._queue .get_nowait ()
591- result = pickle .loads (self ._mmap [start :end ]) # noqa: S301
607+ with self ._lock :
592608 self ._tree .insert (start , end )
593- return result
609+
610+ return result
0 commit comments