|
| 1 | +import csv |
| 2 | +import os |
| 3 | +from argparse import ArgumentParser |
| 4 | +from pathlib import Path |
| 5 | + |
| 6 | +import psycopg |
| 7 | +from dotenv import load_dotenv |
| 8 | + |
| 9 | +load_dotenv() |
| 10 | + |
| 11 | +CHAINLIT_DB_URI = f"postgresql://{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@postgres:5432/{os.getenv('POSTGRES_CHAINLIT_DB')}?sslmode=disable" |
| 12 | + |
| 13 | + |
| 14 | +def build_query(since_timestamp: str | None) -> str: |
| 15 | + if since_timestamp is None: |
| 16 | + since_timestamp = "" |
| 17 | + query = f""" |
| 18 | + SELECT "threadId", "createdAt", name, type, output |
| 19 | + FROM steps |
| 20 | + WHERE |
| 21 | + type IN ('user_message', 'assistant_message') AND |
| 22 | + "createdAt" > '{since_timestamp}' |
| 23 | + ORDER BY ( |
| 24 | + SELECT MIN("createdAt") |
| 25 | + FROM steps s |
| 26 | + WHERE s."threadId" = steps."threadId" |
| 27 | + ), "createdAt"; |
| 28 | + """ |
| 29 | + return query |
| 30 | + |
| 31 | + |
| 32 | +def last_record_timestamp(records_dir: Path) -> str | None: |
| 33 | + record_names: list[str] = list(f.stem for f in records_dir.glob("records_*.csv")) |
| 34 | + if len(record_names) > 0: |
| 35 | + last_record: str = max(record_names) |
| 36 | + return last_record[len("records_") :] |
| 37 | + else: |
| 38 | + return None |
| 39 | + |
| 40 | + |
| 41 | +def main(records_dir: Path): |
| 42 | + records_dir.mkdir(exist_ok=True) |
| 43 | + |
| 44 | + since_timestamp: str | None = last_record_timestamp(records_dir) |
| 45 | + query: str = build_query(since_timestamp) |
| 46 | + |
| 47 | + with psycopg.connect(CHAINLIT_DB_URI) as conn: |
| 48 | + with conn.cursor() as cur: |
| 49 | + cur.execute(query) |
| 50 | + records = cur.fetchall() |
| 51 | + |
| 52 | + latest_timestamp: str = max(row[1] for row in records) |
| 53 | + |
| 54 | + record_file = records_dir / f"records_{latest_timestamp}.csv" |
| 55 | + |
| 56 | + with open(record_file, mode="w", newline="") as file: |
| 57 | + writer = csv.writer(file) |
| 58 | + writer.writerow(["threadId", "createdAt", "name", "type", "output"]) |
| 59 | + writer.writerows(records) |
| 60 | + |
| 61 | + |
| 62 | +if __name__ == "__main__": |
| 63 | + parser = ArgumentParser() |
| 64 | + parser.add_argument("records_dir", type=Path, default=Path("records")) |
| 65 | + args = parser.parse_args() |
| 66 | + main(**vars(args)) |
0 commit comments