@@ -27,6 +27,35 @@ def _apply_hardened_pragmas(conn: sqlite3.Connection) -> None:
2727 conn .execute ("PRAGMA temp_store=MEMORY" ) # Store temp tables in memory
2828
2929
30+ def _checkpoint_and_copy_database (
31+ source_path : Path , dest_path : Path , timeout : int = SQLITE_CONNECTION_TIMEOUT
32+ ) -> None :
33+ """
34+ Safely copy a SQLite database by checkpointing WAL first.
35+
36+ In WAL mode, data may exist in the -wal file that hasn't been written
37+ to the main database file. This function performs a TRUNCATE checkpoint
38+ to flush all WAL data to the main file before copying, ensuring a
39+ complete and consistent copy.
40+
41+ Args:
42+ source_path: Path to the source database file.
43+ dest_path: Path where the copy should be created.
44+ timeout: Connection timeout in seconds.
45+ """
46+ # First, checkpoint the WAL to ensure all data is in the main file
47+ conn = sqlite3 .connect (str (source_path ), timeout = timeout )
48+ try :
49+ # TRUNCATE mode: checkpoint and truncate the WAL file to zero bytes
50+ # This ensures all data is flushed to the main database file
51+ conn .execute ("PRAGMA wal_checkpoint(TRUNCATE)" )
52+ finally :
53+ conn .close ()
54+
55+ # Now safely copy just the main database file
56+ shutil .copyfile (str (source_path ), str (dest_path ))
57+
58+
3059class SQLResource (ForkableResource ):
3160 """
3261 A ForkableResource for managing SQL database states, primarily SQLite.
@@ -134,7 +163,8 @@ async def fork(self) -> "SQLResource":
134163 forked_db_name = f"fork_{ uuid .uuid4 ().hex } .sqlite"
135164 forked_resource ._db_path = self ._temp_dir / forked_db_name
136165
137- shutil .copyfile (str (self ._db_path ), str (forked_resource ._db_path ))
166+ # Use checkpoint-and-copy to ensure WAL data is flushed before copying
167+ _checkpoint_and_copy_database (self ._db_path , forked_resource ._db_path )
138168 return forked_resource
139169
140170 async def checkpoint (self ) -> Dict [str , Any ]:
@@ -148,7 +178,8 @@ async def checkpoint(self) -> Dict[str, Any]:
148178
149179 checkpoint_name = f"checkpoint_{ self ._db_path .stem } _{ uuid .uuid4 ().hex } .sqlite"
150180 checkpoint_path = self ._temp_dir / checkpoint_name
151- shutil .copyfile (str (self ._db_path ), str (checkpoint_path ))
181+ # Use checkpoint-and-copy to ensure WAL data is flushed before copying
182+ _checkpoint_and_copy_database (self ._db_path , checkpoint_path )
152183 return {"db_type" : "sqlite" , "checkpoint_path" : str (checkpoint_path )}
153184
154185 async def restore (self , state_data : Dict [str , Any ]) -> None :
@@ -170,7 +201,8 @@ async def restore(self, state_data: Dict[str, Any]) -> None:
170201 if not self ._db_path :
171202 self ._db_path = self ._temp_dir / f"restored_{ uuid .uuid4 ().hex } .sqlite"
172203
173- shutil .copyfile (str (checkpoint_path ), str (self ._db_path ))
204+ # Use checkpoint-and-copy to ensure WAL data is flushed before copying
205+ _checkpoint_and_copy_database (checkpoint_path , self ._db_path )
174206 self ._base_db_path = self ._db_path # The restored state becomes the new base for future forks
175207
176208 async def step (self , action_name : str , action_params : Dict [str , Any ]) -> Any :
0 commit comments