33import json
44import os
55import pickle
6- from typing import Any
6+ from typing import Any , cast
77
88import aiosqlite
9+ from pydantic import BaseModel
910from uipath .core .errors import ErrorCategory , UiPathFaultedTriggerError
1011from uipath .runtime import (
1112 UiPathApiTrigger ,
@@ -35,6 +36,9 @@ async def setup(self) -> None:
3536
3637 try :
3738 async with aiosqlite .connect (self .storage_path ) as conn :
39+ # Enable WAL mode for better concurrent write handling
40+ await conn .execute ("PRAGMA journal_mode=WAL" )
41+
3842 # Table for workflow contexts
3943 await conn .execute ("""
4044 CREATE TABLE IF NOT EXISTS workflow_contexts (
@@ -47,26 +51,40 @@ async def setup(self) -> None:
4751 await conn .execute ("""
4852 CREATE TABLE IF NOT EXISTS resume_triggers (
4953 id INTEGER PRIMARY KEY AUTOINCREMENT,
54+ runtime_id TEXT NOT NULL,
5055 trigger_data TEXT NOT NULL,
5156 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
5257 )
5358 """ )
5459
60+ await conn .execute (
61+ """
62+ CREATE TABLE IF NOT EXISTS runtime_kv (
63+ runtime_id TEXT NOT NULL,
64+ namespace TEXT NOT NULL,
65+ key TEXT NOT NULL,
66+ value TEXT,
67+ timestamp DATETIME DEFAULT (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc')),
68+ PRIMARY KEY (runtime_id, namespace, key)
69+ )
70+ """
71+ )
72+
5573 await conn .commit ()
5674 except aiosqlite .Error as exc :
5775 msg = f"Failed to initialize SQLite storage at { self .storage_path !r} : { exc .sqlite_errorname } { exc .sqlite_errorcode } "
5876 raise UiPathFaultedTriggerError (ErrorCategory .SYSTEM , msg ) from exc
5977
60- async def save_trigger (self , trigger : UiPathResumeTrigger ) -> None :
78+ async def save_trigger (self , runtime_id : str , trigger : UiPathResumeTrigger ) -> None :
6179 """Save resume trigger to SQLite database."""
6280 trigger_dict = self ._serialize_trigger (trigger )
6381 trigger_json = json .dumps (trigger_dict )
6482
6583 try :
6684 async with aiosqlite .connect (self .storage_path ) as conn :
6785 await conn .execute (
68- "INSERT INTO resume_triggers (trigger_data) VALUES (?)" ,
69- (trigger_json , ),
86+ "INSERT INTO resume_triggers (runtime_id, trigger_data) VALUES (?, ?)" ,
87+ (runtime_id , trigger_json ),
7088 )
7189 await conn .commit ()
7290 except aiosqlite .Error as exc :
@@ -78,12 +96,13 @@ async def save_trigger(self, trigger: UiPathResumeTrigger) -> None:
7896 )
7997 raise UiPathFaultedTriggerError (ErrorCategory .SYSTEM , msg ) from exc
8098
81- async def get_latest_trigger (self ) -> UiPathResumeTrigger | None :
99+ async def get_latest_trigger (self , runtime_id : str ) -> UiPathResumeTrigger | None :
82100 """Get most recent trigger from SQLite database."""
83101 try :
84102 async with aiosqlite .connect (self .storage_path ) as conn :
85103 cursor = await conn .execute (
86- "SELECT trigger_data FROM resume_triggers ORDER BY created_at DESC LIMIT 1"
104+ "SELECT trigger_data FROM resume_triggers WHERE runtime_id = ? ORDER BY id DESC LIMIT 1" ,
105+ (runtime_id ,),
87106 )
88107 row = await cursor .fetchone ()
89108 except aiosqlite .Error as exc :
@@ -148,6 +167,58 @@ async def load_context(self, runtime_id: str) -> dict[str, Any] | None:
148167
149168 return pickle .loads (row [0 ])
150169
170+ async def set_value (
171+ self ,
172+ runtime_id : str ,
173+ namespace : str ,
174+ key : str ,
175+ value : Any ,
176+ ) -> None :
177+ """Save arbitrary key-value pair to database."""
178+ if not (
179+ isinstance (value , str )
180+ or isinstance (value , dict )
181+ or isinstance (value , BaseModel )
182+ or value is None
183+ ):
184+ raise TypeError ("Value must be str, dict, BaseModel or None." )
185+
186+ value_text = self ._dump_value (value )
187+
188+ async with aiosqlite .connect (self .storage_path ) as conn :
189+ await conn .execute (
190+ """
191+ INSERT INTO runtime_kv (runtime_id, namespace, key, value)
192+ VALUES (?, ?, ?, ?)
193+ ON CONFLICT(runtime_id, namespace, key)
194+ DO UPDATE SET
195+ value = excluded.value,
196+ timestamp = (strftime('%Y-%m-%d %H:%M:%S', 'now', 'utc'))
197+ """ ,
198+ (runtime_id , namespace , key , value_text ),
199+ )
200+ await conn .commit ()
201+
202+ async def get_value (self , runtime_id : str , namespace : str , key : str ) -> Any :
203+ """Get arbitrary key-value pair from database (scoped by runtime_id + namespace)."""
204+
205+ async with aiosqlite .connect (self .storage_path ) as conn :
206+ cur = await conn .execute (
207+ """
208+ SELECT value
209+ FROM runtime_kv
210+ WHERE runtime_id = ? AND namespace = ? AND key = ?
211+ LIMIT 1
212+ """ ,
213+ (runtime_id , namespace , key ),
214+ )
215+ row = await cur .fetchone ()
216+
217+ if not row :
218+ return None
219+
220+ return self ._load_value (cast (str | None , row [0 ]))
221+
151222 def _serialize_trigger (self , trigger : UiPathResumeTrigger ) -> dict [str , Any ]:
152223 """Serialize a resume trigger to a dictionary."""
153224 trigger_key = (
@@ -194,3 +265,21 @@ def _deserialize_trigger(self, trigger_data: dict[str, Any]) -> UiPathResumeTrig
194265 )
195266
196267 return resume_trigger
268+
269+ def _dump_value (self , value : str | dict [str , Any ] | BaseModel | None ) -> str | None :
270+ if value is None :
271+ return None
272+ if isinstance (value , BaseModel ):
273+ return "j:" + json .dumps (value .model_dump ())
274+ if isinstance (value , dict ):
275+ return "j:" + json .dumps (value )
276+ return "s:" + value
277+
278+ def _load_value (self , raw : str | None ) -> Any :
279+ if raw is None :
280+ return None
281+ if raw .startswith ("s:" ):
282+ return raw [2 :]
283+ if raw .startswith ("j:" ):
284+ return json .loads (raw [2 :])
285+ return raw
0 commit comments