-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcurator_cli.py
More file actions
539 lines (467 loc) · 31.5 KB
/
curator_cli.py
File metadata and controls
539 lines (467 loc) · 31.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
# --- File: ./curator_cli.py ---
import argparse
import sys
import json
from pathlib import Path
import hashlib
import datetime
import os
import fnmatch
from lxml import etree as ET
from email.utils import parsedate_to_datetime
from typing import Union
import duckdb
import subprocess
import requests
# Add the project directory to the Python path
project_dir = Path(__file__).resolve().parent
sys.path.append(str(project_dir))
from project.config import DOCUMENTS_DIR
import curator_pipeline_v2 as pipeline
# --- DuckDB Configuration ---
DUCKDB_FILE = project_dir / "curator_workspace.duckdb"
def get_db_conn():
    """Open the workspace DuckDB file and return the live connection.

    After connecting to ``DUCKDB_FILE`` the ``fts`` extension is installed
    and loaded on the new connection. If any of these steps raises, the
    error is printed and the process exits with status 1, so callers never
    receive ``None``.
    """
    try:
        connection = duckdb.connect(database=str(DUCKDB_FILE))
        for statement in ("INSTALL fts;", "LOAD fts;"):
            connection.execute(statement)
    except Exception as exc:
        print(f"[FATAL] Could not connect to DuckDB: {exc}")
        sys.exit(1)
    else:
        return connection
def setup_duckdb_schema():
    """Create (or verify) the Redleaf schema in the DuckDB workspace file.

    Steps, in order:
      1. Create every base table with ``CREATE TABLE IF NOT EXISTS``.
      2. Ask the pipeline module to create its staging tables.
      3. Create ``content_index`` and (re)build its full-text index; the
         pragma is issued with ``overwrite=true`` so it can be run again.
      4. Create the id sequences used when inserting rows.

    The connection is always closed, even when one of the statements fails;
    the exception itself still propagates to the caller.
    """
    print(f"--- Setting up or verifying DuckDB schema at {DUCKDB_FILE} ---")
    conn = get_db_conn()
    try:
        # Base tables
        conn.execute("""
            SET TimeZone='UTC';
            CREATE TABLE IF NOT EXISTS documents (
            id BIGINT PRIMARY KEY,
            relative_path VARCHAR NOT NULL UNIQUE,
            file_hash VARCHAR NOT NULL,
            file_type VARCHAR NOT NULL,
            status VARCHAR NOT NULL,
            status_message VARCHAR,
            added_at TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp,
            processed_at TIMESTAMP WITH TIME ZONE,
            file_modified_at TIMESTAMP WITH TIME ZONE,
            color VARCHAR,
            page_count INTEGER,
            file_size_bytes BIGINT,
            duration_seconds INTEGER,
            linked_audio_path VARCHAR,
            linked_video_path VARCHAR,
            linked_audio_url VARCHAR,
            last_audio_position DOUBLE DEFAULT 0.0,
            audio_offset_seconds DOUBLE DEFAULT 0.0,
            last_pdf_zoom DOUBLE,
            last_pdf_page INTEGER,
            cached_comment_count INTEGER DEFAULT 0,
            cached_tag_count INTEGER DEFAULT 0
            );
            CREATE TABLE IF NOT EXISTS document_metadata (doc_id BIGINT PRIMARY KEY, csl_json VARCHAR, last_updated TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp, updated_by_user_id BIGINT, FOREIGN KEY (doc_id) REFERENCES documents(id));
            CREATE TABLE IF NOT EXISTS email_metadata ( doc_id BIGINT PRIMARY KEY, from_address VARCHAR, to_addresses VARCHAR, cc_addresses VARCHAR, subject VARCHAR, sent_at TIMESTAMP WITH TIME ZONE, FOREIGN KEY (doc_id) REFERENCES documents(id) );
            CREATE TABLE IF NOT EXISTS catalogs (id BIGINT PRIMARY KEY, name VARCHAR NOT NULL UNIQUE, description VARCHAR, catalog_type VARCHAR NOT NULL DEFAULT 'user', created_at TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp);
            CREATE TABLE IF NOT EXISTS document_catalogs (doc_id BIGINT NOT NULL, catalog_id BIGINT NOT NULL, added_at TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp, PRIMARY KEY (doc_id, catalog_id), FOREIGN KEY (doc_id) REFERENCES documents(id), FOREIGN KEY (catalog_id) REFERENCES catalogs(id));
            CREATE TABLE IF NOT EXISTS entities (id BIGINT PRIMARY KEY, text VARCHAR NOT NULL, label VARCHAR NOT NULL, UNIQUE(text, label));
            CREATE TABLE IF NOT EXISTS entity_appearances (doc_id BIGINT NOT NULL, entity_id BIGINT NOT NULL, page_number INTEGER NOT NULL, PRIMARY KEY (doc_id, entity_id, page_number), FOREIGN KEY (doc_id) REFERENCES documents(id), FOREIGN KEY (entity_id) REFERENCES entities(id));
            CREATE TABLE IF NOT EXISTS entity_relationships (id BIGINT PRIMARY KEY, subject_entity_id BIGINT NOT NULL, object_entity_id BIGINT NOT NULL, relationship_phrase VARCHAR NOT NULL, doc_id BIGINT NOT NULL, page_number INTEGER NOT NULL, FOREIGN KEY (subject_entity_id) REFERENCES entities(id), FOREIGN KEY (object_entity_id) REFERENCES entities(id), FOREIGN KEY (doc_id) REFERENCES documents(id));
            CREATE TABLE IF NOT EXISTS browse_cache (entity_id BIGINT PRIMARY KEY, entity_text VARCHAR NOT NULL, entity_label VARCHAR NOT NULL, document_count BIGINT NOT NULL, appearance_count BIGINT NOT NULL, FOREIGN KEY (entity_id) REFERENCES entities(id));
            CREATE TABLE IF NOT EXISTS embedding_chunks ( id BIGINT PRIMARY KEY, doc_id BIGINT NOT NULL, page_number INTEGER NOT NULL, chunk_text VARCHAR NOT NULL, embedding BLOB NOT NULL, FOREIGN KEY (doc_id) REFERENCES documents(id) );
            CREATE TABLE IF NOT EXISTS super_embedding_chunks ( id BIGINT PRIMARY KEY, doc_id BIGINT NOT NULL, page_number INTEGER NOT NULL, entity_id BIGINT NOT NULL, chunk_text VARCHAR NOT NULL, embedding BLOB NOT NULL, FOREIGN KEY (doc_id) REFERENCES documents(id), FOREIGN KEY (entity_id) REFERENCES entities(id) );
            CREATE TABLE IF NOT EXISTS srt_cues (id BIGINT PRIMARY KEY, doc_id BIGINT NOT NULL, sequence INTEGER NOT NULL, timestamp VARCHAR NOT NULL, dialogue VARCHAR NOT NULL, UNIQUE(doc_id, sequence));
        """)

        # Staging tables for the new pipeline
        pipeline.create_staging_tables(conn)

        print("Creating Full-Text Search index (on final table)...")
        conn.execute("CREATE TABLE IF NOT EXISTS content_index (doc_id BIGINT, page_number INTEGER, page_content VARCHAR);")
        conn.execute("PRAGMA create_fts_index('content_index', 'doc_id', 'page_number', 'page_content', overwrite=true);")

        print("Creating sequences...")
        # One id sequence per table whose rows are inserted with nextval().
        sequence_names = (
            "seq_documents_id",
            "seq_catalogs_id",
            "seq_entities_id",
            "seq_entity_relationships_id",
            "seq_embedding_chunks_id",
            "seq_super_embedding_chunks_id",
            "seq_srt_cues_id",
        )
        for sequence_name in sequence_names:
            conn.execute(f"CREATE SEQUENCE IF NOT EXISTS {sequence_name} START 1;")

        print("--- Schema setup complete. ---")
    finally:
        # Release the file handle even when a DDL statement raised.
        conn.close()
def _gather_files_recursively(base_dir: Path, target_dir: Path, supported_patterns: list, virtual_prefix: str = "") -> list:
    """Walk *target_dir* and collect the files matching *supported_patterns*.

    Matching is case-insensitive on the file name. Each hit is returned as a
    ``(absolute_or_walked_path, relative_path_string)`` tuple:

    * with *virtual_prefix* set (an ``.rlink`` alias), the relative string is
      ``"<virtual_prefix>/<path relative to target_dir>"``;
    * otherwise it is the POSIX path relative to *base_dir*; files that are
      not located under *base_dir* are silently skipped.

    Any other error raised while scanning is printed as a warning and the
    files gathered so far are returned.
    """
    collected = []
    try:
        for current_root, _subdirs, names in os.walk(target_dir):
            for name in names:
                lowered_name = name.lower()
                if not any(fnmatch.fnmatch(lowered_name, pattern.lower()) for pattern in supported_patterns):
                    continue
                full_path = Path(current_root) / name
                try:
                    if virtual_prefix:
                        inner = full_path.relative_to(target_dir).as_posix()
                        relative = f"{virtual_prefix}/{inner}"
                    else:
                        relative = full_path.relative_to(base_dir).as_posix()
                except ValueError:
                    # The file does not live under the directory it should be relative to.
                    continue
                collected.append((full_path, relative))
    except Exception as exc:
        print(f"[WARN] Error scanning directory {target_dir}: {exc}")
    return collected
def _md5_of_file(file_path: Path, chunk_size: int = 65536) -> str:
    """Return the hex MD5 digest of *file_path*, read in *chunk_size* pieces."""
    hasher = hashlib.md5()
    with open(file_path, 'rb') as f:
        while chunk := f.read(chunk_size):
            hasher.update(chunk)
    return hasher.hexdigest()


def discover_documents():
    """Synchronise the ``documents`` table with the files found on disk.

    The scan covers ``DOCUMENTS_DIR`` plus every directory referenced by a
    ``*.rlink`` alias file placed directly inside it. For each supported file:

    * unknown path or changed content hash -> inserted/updated with status
      ``'New'``;
    * unchanged file whose row is ``'Missing'`` -> restored to ``'Indexed'``.

    Rows whose file was not seen during the scan are soft-deleted by setting
    their status to ``'Missing'`` (inside one transaction that is rolled back
    on failure). Paths are matched case-insensitively against existing rows
    so a pure case change does not create a duplicate.

    A file that cannot be read is reported and skipped; it is not marked as
    missing. The database connection is always closed.
    """
    print(f"--- Scanning for documents in {DOCUMENTS_DIR} ---")
    db = get_db_conn()
    registered_count = 0
    restored_count = 0
    missing_count = 0
    try:
        try:
            # DuckDB returns a list of tuples
            db_docs = db.execute("SELECT id, relative_path, file_hash, status FROM documents").fetchall()
        except duckdb.CatalogException:
            # The table does not exist yet: behave as if nothing is registered.
            db_docs = []
        db_files = {row[1]: row[2] for row in db_docs}
        db_statuses = {row[1]: row[3] for row in db_docs}
        db_path_to_id = {row[1]: row[0] for row in db_docs}
        # Case-insensitive lookup map: lower-cased path -> path as stored.
        db_files_lower = {k.lower(): k for k in db_files}

        supported_patterns = ["*.pdf", "*.txt", "*.html", "*.srt", "*.eml"]

        # Scan main directory
        all_files_with_virtual_paths = _gather_files_recursively(DOCUMENTS_DIR, DOCUMENTS_DIR, supported_patterns)

        # Scan .rlink external directories
        for rlink_file in DOCUMENTS_DIR.glob('*.rlink'):
            if not rlink_file.is_file():
                continue
            try:
                target_path_str = rlink_file.read_text(encoding='utf-8').strip()
                target_dir = Path(target_path_str).resolve()
                if target_dir.exists() and target_dir.is_dir():
                    print(f" [INFO] Following alias '{rlink_file.name}' to: {target_dir}")
                    all_files_with_virtual_paths.extend(
                        _gather_files_recursively(DOCUMENTS_DIR, target_dir, supported_patterns, virtual_prefix=rlink_file.name)
                    )
                else:
                    print(f" [WARN] Alias target does not exist or is not a directory: {target_dir}")
            except Exception as e:
                print(f" [WARN] Failed to read alias file {rlink_file}: {e}")

        found_paths_exact = set()
        for file_path, rel_path_str in all_files_with_virtual_paths:
            # Reuse the stored spelling of the path when only the case differs.
            lower_rel = rel_path_str.lower()
            if lower_rel in db_files_lower:
                rel_path_str = db_files_lower[lower_rel]
            # Recorded before reading, so an unreadable file is not trashed below.
            found_paths_exact.add(rel_path_str)

            try:
                stats = file_path.stat()
                current_hash_str = _md5_of_file(file_path)
            except OSError as e:
                # e.g. the file vanished or is not readable; keep scanning.
                print(f" [WARN] Could not read file {file_path}: {e}")
                continue
            file_type = file_path.suffix[1:].upper()

            if rel_path_str not in db_files or db_files.get(rel_path_str) != current_hash_str:
                print(f"Registering new/modified file: {rel_path_str}")
                # NOTE(review): the modification time is a naive local datetime
                # written to a TIMESTAMP WITH TIME ZONE column - confirm the
                # session time zone makes this come out as intended.
                db.execute(
                    "INSERT INTO documents (id, relative_path, file_hash, file_type, status, status_message, file_size_bytes, file_modified_at) "
                    "VALUES (nextval('seq_documents_id'), ?, ?, ?, 'New', 'Ready for processing', ?, ?) "
                    "ON CONFLICT(relative_path) DO UPDATE SET file_hash=excluded.file_hash, status='New', status_message='File modified, ready for re-processing', file_size_bytes=excluded.file_size_bytes, file_modified_at=excluded.file_modified_at",
                    (rel_path_str, current_hash_str, file_type, stats.st_size, datetime.datetime.fromtimestamp(stats.st_mtime))
                )
                registered_count += 1
            elif db_statuses.get(rel_path_str) == 'Missing':
                print(f"Restoring previously missing file: {rel_path_str}")
                db.execute("UPDATE documents SET status = 'Indexed', status_message = 'Restored from Recycle Bin' WHERE relative_path = ?", (rel_path_str,))
                restored_count += 1

        # Soft delete missing files to the Recycle Bin
        missing_paths = set(db_files) - found_paths_exact
        if missing_paths:
            db.execute("BEGIN TRANSACTION;")
            try:
                for path in sorted(missing_paths):
                    if db_statuses.get(path) != 'Missing':
                        doc_id = db_path_to_id[path]
                        print(f" Moving document to Recycle Bin: {path}")
                        db.execute("UPDATE documents SET status = 'Missing', status_message = 'File removed from directory or alias disconnected. View in Settings > Recycle Bin.' WHERE id = ?", (doc_id,))
                        missing_count += 1
                db.execute("COMMIT;")
            except Exception:
                # Do not leave a half-applied soft delete behind.
                db.execute("ROLLBACK;")
                raise
    finally:
        db.close()
    print(f"\n--- Discovery complete. Registered {registered_count}. Restored {restored_count}. Trashed {missing_count}. ---")
def reset_document_status(status_filter: str = None, file_type_filter: str = None):
    """Set matching documents back to status ``'New'``.

    Both filters are optional comma-separated lists. Status names are
    stripped and capitalised, file types are stripped and upper-cased, and
    each list becomes a parameterised ``IN (...)`` condition; the conditions
    are combined with ``AND``. Without any filter the user must confirm
    interactively before every document is reset.
    """
    print("\n--- Resetting Document Status to 'New' ---")
    conditions = []
    bound_values = []
    # (raw filter text, normaliser, column name, label used in the message)
    filter_specs = (
        (status_filter, str.capitalize, "status", "statuses"),
        (file_type_filter, str.upper, "file_type", "file types"),
    )
    for raw_filter, normalise, column, label in filter_specs:
        if not raw_filter:
            continue
        values = [normalise(part.strip()) for part in raw_filter.split(',')]
        placeholders = ','.join(['?'] * len(values))
        conditions.append(f"{column} IN ({placeholders})")
        bound_values.extend(values)
        print(f"Targeting {label}: {', '.join(values)}")

    if not conditions:
        answer = input("[WARNING] No filters specified. Reset ALL documents to 'New'? [y/N]: ")
        if answer.lower() != 'y':
            print("Operation cancelled.")
            return

    db = get_db_conn()
    try:
        sql = "UPDATE documents SET status = 'New', status_message = 'Reset by user'"
        if conditions:
            sql += " WHERE " + " AND ".join(conditions)
        result = db.execute(sql, bound_values)
        # The first column of the returned row is used as the affected-row count.
        print(f"\n[SUCCESS] Reset {result.fetchone()[0]} document(s) to 'New' status.")
    except Exception as exc:
        print(f"\n[FAIL] An error occurred: {exc}")
    finally:
        db.close()
# ===================================================================
# --- UNIFIED MEDIA LINKING FUNCTIONS ---
# ===================================================================
def _fetch_archive_files(archive_id: str) -> Union[dict, None]:
    """Download the file manifest of an Archive.org item.

    Args:
        archive_id: Identifier of the item on Archive.org.

    Returns:
        A dict mapping each file name in the item to its
        ``https://archive.org/download/<archive_id>/<name>`` URL, or ``None``
        when the request fails, the body is not valid JSON, or the payload
        has no ``'files'`` entry. Failures are reported on stdout.
    """
    metadata_url = f"https://archive.org/metadata/{archive_id}"
    print(f"[INFO] Fetching file manifest from {metadata_url} ...")

    # Transport/HTTP errors and JSON errors are handled in separate steps:
    # newer `requests` versions raise a JSON error that is also a
    # RequestException, which would otherwise be reported as a fetch failure.
    try:
        response = requests.get(metadata_url, timeout=15)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"[FAIL] Could not fetch data from Archive.org: {e}")
        return None

    try:
        data = response.json()
    except ValueError:
        # json / simplejson / requests JSON decode errors all derive from ValueError.
        print("[FAIL] Could not parse the JSON response from Archive.org.")
        return None

    if not isinstance(data, dict) or 'files' not in data:
        print("[FAIL] No 'files' key found in Archive.org metadata response.")
        return None

    file_map = {
        file_info['name']: f"https://archive.org/download/{archive_id}/{file_info['name']}"
        for file_info in data['files']
        if 'name' in file_info
    }
    print(f"[OK] Found {len(file_map)} files in the '{archive_id}' archive.")
    return file_map
def _parse_podcast_xml_to_csl(item_element: ET.Element, channel_title_text: str = None, channel_author_text: str = None) -> tuple[Union[dict, None], Union[str, None]]:
    """Convert one podcast RSS ``<item>`` into a CSL-style metadata dict.

    Args:
        item_element: The ``<item>`` element, or ``None``.
        channel_title_text: Channel title, stored as ``container-title``.
        channel_author_text: Fallback author when the item has no
            ``itunes:author`` text.

    Returns:
        ``(csl_data, enclosure_url)``. ``csl_data`` always has
        ``type == 'interview'`` and, when present in the item, ``title``,
        ``container-title``, ``author``, ``issued`` and ``URL``.
        ``enclosure_url`` is the stripped ``url`` attribute of ``<enclosure>``
        or ``None``. Returns ``(None, None)`` when *item_element* is ``None``.
    """
    if item_element is None:
        return None, None

    namespaces = {'itunes': 'http://www.itunes.com/dtds/podcast-1.0.dtd'}
    csl_data = {}

    # Elements without children are falsy, so `find(a) or find(b)` would skip
    # a perfectly good <itunes:title>. Compare against None explicitly and
    # fall back to <title> only when the iTunes title is absent or empty.
    title_element = item_element.find('itunes:title', namespaces)
    if title_element is None or not (title_element.text and title_element.text.strip()):
        title_element = item_element.find('title')
    if title_element is not None and title_element.text:
        csl_data['title'] = title_element.text.strip()

    if channel_title_text:
        csl_data['container-title'] = channel_title_text.strip()

    author_tag = item_element.find('itunes:author', namespaces)
    if author_tag is not None and author_tag.text:
        author_text = author_tag.text.strip()
    elif channel_author_text:
        author_text = channel_author_text.strip()
    else:
        author_text = None
    if author_text:
        csl_data['author'] = [{'literal': author_text}]

    pub_date_tag = item_element.find('pubDate')
    if pub_date_tag is not None and pub_date_tag.text:
        try:
            dt = parsedate_to_datetime(pub_date_tag.text)
            csl_data['issued'] = {'date-parts': [[dt.year, dt.month, dt.day]]}
        except (ValueError, TypeError):
            # Unparseable date: leave 'issued' out rather than fail the item.
            pass

    link_tag = item_element.find('link')
    if link_tag is not None and link_tag.text:
        csl_data['URL'] = link_tag.text.strip()

    enclosure_url = None
    enclosure_tag = item_element.find('enclosure')
    if enclosure_tag is not None and enclosure_tag.get('url'):
        enclosure_url = enclosure_tag.get('url').strip()

    csl_data['type'] = 'interview'
    return csl_data, enclosure_url
def find_xml_matches_for_doc(doc_relative_path: str):
    """Find podcast feed items that appear to belong to one SRT document.

    Every ``*.xml`` file under ``DOCUMENTS_DIR`` and under each ``*.rlink``
    alias target is parsed (in recovering mode) and each ``channel/item`` is
    compared with the stem of *doc_relative_path*:

    * ``'high'`` confidence - the file name in the item's enclosure URL has
      the same stem as the document;
    * ``'low'`` confidence - the stem occurs in the item's ``itunes:title``
      or ``title`` text.

    Returns:
        A list of dicts with the keys ``xml_path``, ``item_hash`` (MD5 of the
        item's guid, or of its title when there is no guid), ``csl_data``,
        ``enclosure_url`` and ``confidence``.
    """
    target_srt_basename = Path(doc_relative_path).stem
    namespaces = {'itunes': 'http://www.itunes.com/dtds/podcast-1.0.dtd'}
    matches = []

    all_xmls = _gather_files_recursively(DOCUMENTS_DIR, DOCUMENTS_DIR, ['*.xml'])
    for rlink_file in DOCUMENTS_DIR.glob('*.rlink'):
        if not rlink_file.is_file():
            continue
        try:
            # Resolved the same way discover_documents resolves alias targets.
            target_dir = Path(rlink_file.read_text(encoding='utf-8').strip()).resolve()
            if target_dir.exists() and target_dir.is_dir():
                all_xmls.extend(_gather_files_recursively(DOCUMENTS_DIR, target_dir, ['*.xml'], virtual_prefix=rlink_file.name))
        except Exception:
            # Best effort on purpose: this function runs once per document, and
            # discover_documents already warns about unreadable alias files.
            continue

    parser = ET.XMLParser(recover=True)
    for xml_path, xml_virtual_path in all_xmls:
        try:
            tree = ET.parse(str(xml_path), parser=parser)
        except (ET.XMLSyntaxError, OSError):
            # Broken or unreadable feed file: ignore it and keep scanning.
            continue
        if tree.getroot() is None:
            # Nothing could be recovered from this file.
            continue

        channel_title = tree.findtext('channel/title', "")
        channel_author = tree.findtext('channel/itunes:author', "", namespaces=namespaces)

        for item in tree.findall('channel/item'):
            enclosure_url = None
            filename_match = False
            enclosure_tag = item.find('enclosure')
            if enclosure_tag is not None and enclosure_tag.get('url'):
                enclosure_url = enclosure_tag.get('url')
                url_filename = enclosure_url.split('/')[-1].split('?')[0]
                filename_match = Path(url_filename).stem == target_srt_basename

            title_match = False
            if not filename_match:
                # Elements without children are falsy, so the two candidates
                # are checked explicitly instead of with `find(a) or find(b)`.
                for candidate in (item.find('itunes:title', namespaces), item.find('title')):
                    if candidate is not None and candidate.text and target_srt_basename in candidate.text:
                        title_match = True
                        break

            if not (filename_match or title_match):
                continue

            preview_data, _ = _parse_podcast_xml_to_csl(item, channel_title, channel_author)
            if not preview_data:
                continue

            guid_element = item.find('guid')
            if guid_element is not None and guid_element.text:
                hash_content = guid_element.text
            else:
                hash_content = preview_data.get('title', '')
            item_hash = hashlib.md5(hash_content.encode()).hexdigest()

            matches.append({
                'xml_path': xml_virtual_path,
                'item_hash': item_hash,
                'csl_data': preview_data,
                'enclosure_url': enclosure_url,
                'confidence': 'high' if filename_match else 'low',
            })
    return matches
def link_podcast_metadata():
    """Attach podcast metadata (and a media URL) to SRT documents.

    Only SRT rows without CSL metadata and without a linked audio/video path
    are processed. For each one the local XML feeds are searched:

    * exactly one high-confidence match, or otherwise exactly one match in
      total, is accepted; anything else is skipped as ambiguous;
    * the item's CSL data is upserted into ``document_metadata``;
    * the enclosure URL, when present, is stored in
      ``documents.linked_audio_url``;
    * the document is added to a ``'podcast'`` catalog named after the
      channel title, which is created on first use.

    The writes for one document run in their own transaction. If they fail
    the transaction is rolled back, the failure is reported and counted, and
    processing continues with the next document.
    """
    db = get_db_conn()
    try:
        docs_to_process = db.execute("""
            SELECT d.id, d.relative_path
            FROM documents d
            LEFT JOIN document_metadata dm ON d.id = dm.doc_id
            WHERE d.file_type = 'SRT'
            AND dm.csl_json IS NULL
            AND d.linked_audio_path IS NULL
            AND d.linked_video_path IS NULL
            ORDER BY d.relative_path
        """).fetchall()
        if not docs_to_process:
            print("\n--- No unprocessed SRT documents found that need metadata. Nothing to do. ---")
            return

        print(f"\n--- Found {len(docs_to_process)} SRTs to process for podcast metadata linking. ---")
        counts = {'success': 0, 'no_match': 0, 'ambiguous': 0, 'no_enclosure': 0, 'error': 0}

        for row in docs_to_process:
            doc_id = row[0]
            relative_path = row[1]
            print(f"\n[INFO] Processing: {relative_path} (Doc ID: {doc_id})")

            matches = find_xml_matches_for_doc(relative_path)
            if not matches:
                counts['no_match'] += 1
                print(" [FAIL] No potential XML match found.")
                continue

            # Prefer a unique filename-based match, then a unique match overall.
            high_confidence = [m for m in matches if m['confidence'] == 'high']
            best_match = None
            if len(high_confidence) == 1:
                best_match = high_confidence[0]
                print(" [OK] Found one high-confidence XML match.")
            elif len(high_confidence) > 1:
                counts['ambiguous'] += 1
                print(f" [SKIP] Ambiguous: Found {len(high_confidence)} high-confidence matches.")
                continue
            elif len(matches) == 1:
                best_match = matches[0]
                print(" [OK] Found one low-confidence (title-based) XML match.")
            else:
                counts['ambiguous'] += 1
                print(f" [SKIP] Ambiguous: Found {len(matches)} low-confidence matches.")
                continue
            if not best_match:
                continue

            csl_data, enclosure_url = best_match.get('csl_data'), best_match.get('enclosure_url')
            if not csl_data:
                print(" [FAIL] Matched item could not be parsed into CSL data.")
                continue
            if not enclosure_url:
                counts['no_enclosure'] += 1
                print(" [SKIP] Matched XML item has no <enclosure> media URL. Saving metadata only.")

            csl_data['id'] = f"doc-{doc_id}"
            csl_json_text = json.dumps(csl_data, indent=2)
            podcast_name = csl_data.get('container-title', '').strip()

            db.execute("BEGIN TRANSACTION;")
            try:
                db.execute("INSERT INTO document_metadata (doc_id, csl_json) VALUES (?, ?) ON CONFLICT(doc_id) DO UPDATE SET csl_json=excluded.csl_json;", (doc_id, csl_json_text))
                if enclosure_url:
                    db.execute("UPDATE documents SET linked_audio_url = ? WHERE id = ?", (enclosure_url, doc_id))
                if podcast_name:
                    catalog_row = db.execute("SELECT id FROM catalogs WHERE name = ? AND catalog_type = 'podcast'", (podcast_name,)).fetchone()
                    if catalog_row:
                        catalog_id = catalog_row[0]
                    else:
                        catalog_id = db.execute("INSERT INTO catalogs (id, name, description, catalog_type) VALUES (nextval('seq_catalogs_id'), ?, ?, 'podcast') RETURNING id", (podcast_name, f"Automatically generated for '{podcast_name}' podcast.")).fetchone()[0]
                    db.execute("INSERT INTO document_catalogs (doc_id, catalog_id) VALUES (?, ?) ON CONFLICT DO NOTHING", (doc_id, catalog_id))
                db.execute("COMMIT;")
            except Exception as e:
                # Undo the partial writes so the next document starts clean.
                db.execute("ROLLBACK;")
                counts['error'] += 1
                print(f" [FAIL] Could not save metadata for '{relative_path}': {e}")
                continue

            counts['success'] += 1
            print(f" [SUCCESS] Linked metadata and media from '{best_match['xml_path']}'.")

        print("\n--- Podcast Metadata Linking Summary ---")
        print(f" Successfully linked: {counts['success']}")
        print(f" Skipped (ambiguous): {counts['ambiguous']}")
        print(f" No XML match found: {counts['no_match']}")
        print(f" XML match had no audio: {counts['no_enclosure']}")
        print(f" Failed to save: {counts['error']}")
        print("----------------------------------------")
    except Exception as e:
        print(f"\n[FAIL] A database error occurred: {e}")
    finally:
        db.close()
def link_archive_org(archive_id: str):
    """Point SRT documents at the matching audio files of an Archive.org item.

    The item's file manifest is fetched first. After an interactive
    confirmation, every SRT document is matched against the local XML feeds
    (one high-confidence match, or otherwise exactly one match). The file
    name taken from the matched enclosure URL is then looked up in the
    manifest - first verbatim, then percent-decoded, because feed URLs
    usually encode spaces and similar characters. On a hit,
    ``linked_audio_url`` is overwritten and both linked local paths are
    cleared.

    Args:
        archive_id: Identifier of the item on Archive.org.
    """
    # Local import: only this function needs it.
    from urllib.parse import unquote

    archive_file_map = _fetch_archive_files(archive_id)
    if not archive_file_map:
        print("\n--- Operation cancelled due to failure fetching archive data. ---")
        return

    db = get_db_conn()
    try:
        docs_to_process = db.execute("SELECT id, relative_path FROM documents WHERE file_type = 'SRT' ORDER BY relative_path").fetchall()
        if not docs_to_process:
            print("\n--- No SRT documents found. ---")
            return

        print(f"\n--- Found {len(docs_to_process)} total SRTs to check. ---")
        if input(f"This will attempt to link them to '{archive_id}' and OVERWRITE existing media links.\nAre you sure? [y/N]: ").lower() != 'y':
            print("Operation cancelled.")
            return

        counts = {'success': 0, 'no_xml_match': 0, 'no_enclosure': 0, 'not_in_archive': 0, 'skipped': 0}
        for row in docs_to_process:
            doc_id = row[0]
            relative_path = row[1]
            print(f"\n[INFO] Processing: {relative_path} (Doc ID: {doc_id})")

            matches = find_xml_matches_for_doc(relative_path)
            if not matches:
                counts['no_xml_match'] += 1
                print(" [SKIP] No XML match found.")
                continue

            # Prefer a unique filename-based match, then a unique match overall.
            high_confidence = [m for m in matches if m['confidence'] == 'high']
            best_match = None
            if len(high_confidence) == 1:
                best_match = high_confidence[0]
                print(" [OK] Found one high-confidence match.")
            elif len(high_confidence) > 1:
                counts['skipped'] += 1
                print(f" [SKIP] Ambiguous: Found {len(high_confidence)} high-confidence matches.")
                continue
            elif len(matches) == 1:
                best_match = matches[0]
                print(" [OK] Found one low-confidence match.")
            else:
                counts['skipped'] += 1
                print(f" [SKIP] Ambiguous: Found {len(matches)} low-confidence matches.")
                continue
            if not best_match:
                continue

            enclosure_url = best_match.get('enclosure_url')
            if not enclosure_url:
                counts['no_enclosure'] += 1
                print(" [SKIP] Matched XML has no <enclosure> URL.")
                continue

            original_filename = enclosure_url.split('/')[-1].split('?')[0]
            archive_url = archive_file_map.get(original_filename)
            if archive_url is None:
                # Retry with the percent-decoded name ("My%20Show.mp3" -> "My Show.mp3").
                archive_url = archive_file_map.get(unquote(original_filename))

            if archive_url is not None:
                db.execute("UPDATE documents SET linked_audio_url = ?, linked_audio_path = NULL, linked_video_path = NULL WHERE id = ?", (archive_url, doc_id))
                counts['success'] += 1
                print(f" [SUCCESS] Linked to: {archive_url}")
            else:
                counts['not_in_archive'] += 1
                print(f" [FAIL] Filename '{original_filename}' not in '{archive_id}' archive.")

        print("\n--- Archive.org Linking Summary ---")
        print(f" Successfully linked: {counts['success']}")
        print(f" Not in Archive.org: {counts['not_in_archive']}")
        print(f" Skipped (ambiguous): {counts['skipped']}")
        print(f" No XML match found: {counts['no_xml_match']}")
        print(f" XML match had no audio: {counts['no_enclosure']}")
        print("---------------------------------")
    except Exception as e:
        print(f"\n[FAIL] An error occurred: {e}")
    finally:
        db.close()
def unpodcast_documents():
    """Undo podcast linking for every SRT document (destructive).

    After an interactive confirmation this:
      1. clears the three media-link columns of all SRT documents;
      2. deletes the ``document_metadata`` rows of all SRT documents;
      3. deletes the ``document_catalogs`` rows that point at ``'podcast'``
         catalogs;
      4. deletes the ``'podcast'`` catalogs themselves.

    Step 3 has to come before step 4: ``document_catalogs.catalog_id`` is a
    foreign key to ``catalogs(id)``, so a catalog that still has member
    documents cannot be deleted.
    """
    if input("[WARNING] This will remove ALL bibliographic metadata and media links from ALL SRT documents, and delete ALL 'podcast' collections.\nThis is a destructive action. Are you sure? [y/N]: ").lower() != 'y':
        print("Operation cancelled.")
        return

    db = get_db_conn()
    try:
        print("Resetting SRT media links...")
        db.execute("UPDATE documents SET linked_audio_path=NULL, linked_video_path=NULL, linked_audio_url=NULL WHERE file_type='SRT'")

        print("Deleting bibliographic metadata from SRTs...")
        db.execute("DELETE FROM document_metadata WHERE doc_id IN (SELECT id FROM documents WHERE file_type='SRT')")

        print("Removing documents from 'podcast' collections...")
        db.execute("DELETE FROM document_catalogs WHERE catalog_id IN (SELECT id FROM catalogs WHERE catalog_type='podcast')")

        print("Deleting 'podcast' collections...")
        db.execute("DELETE FROM catalogs WHERE catalog_type='podcast'")

        print("\n--- All SRT documents have been reset. ---")
    except Exception as e:
        print(f"\n[FAIL] An error occurred: {e}")
    finally:
        db.close()
if __name__ == "__main__":
    # ---------------------------------------------------------------
    # Command-line definition
    # ---------------------------------------------------------------
    parser = argparse.ArgumentParser(description="Redleaf Curator CLI with DuckDB.")
    subparsers = parser.add_subparsers(dest='command', required=True, help='Available commands')

    subparsers.add_parser('init-db', help="Initializes the DuckDB database and all tables.")
    subparsers.add_parser('discover-docs', help="[Step 1] Scans for documents and registers them.")

    # process-docs: one sub-command per pipeline phase
    proc_parser = subparsers.add_parser('process-docs', help="[Step 2] Runs the high-throughput processing pipeline.")
    proc_subparsers = proc_parser.add_subparsers(dest='phase', required=True, help='Pipeline phase to run')

    run_all_parser = proc_subparsers.add_parser('run-all', help='Runs all processing phases in sequence.')
    run_all_parser.add_argument('-w', '--workers', type=int, default=os.cpu_count() or 2, help="Number of parallel workers.")
    run_all_parser.add_argument('--batch-size', type=int, default=128, help="Embedding batch size.")
    run_all_parser.add_argument('--gpu', action='store_true', help="Enable GPU acceleration.")
    run_all_parser.add_argument('--doc-limit', type=int, default=None, help="Process only N documents at a time to save RAM.")

    extract_parser = proc_subparsers.add_parser('extract', help='Phase 1: Extracts clean text from all new documents.')
    extract_parser.add_argument('-w', '--workers', type=int, default=os.cpu_count() or 2, help="Number of parallel workers.")
    extract_parser.add_argument('--doc-limit', type=int, default=None, help="Process only N documents at a time to save RAM.")

    nlp_parser = proc_subparsers.add_parser('nlp', help='Phase 2: Performs spaCy NLP analysis on extracted text.')
    nlp_parser.add_argument('-w', '--workers', type=int, default=os.cpu_count() or 2, help="Number of parallel workers.")

    # --- SWAPPED HELP TEXT AND DEFINITIONS ---
    proc_subparsers.add_parser('finalize', help='Phase 3: Commits all staged data to the final tables (Graph Data).')

    embed_parser = proc_subparsers.add_parser('embed', help='Phase 4: Generates AI embeddings for all chunks in batches.')
    embed_parser.add_argument('-w', '--workers', type=int, default=os.cpu_count() or 2, help="Number of parallel workers.")
    embed_parser.add_argument('--batch-size', type=int, default=128, help="Number of items to send to the embedding model at once.")
    embed_parser.add_argument('--gpu', action='store_true', help="Enable GPU acceleration for embedding.")

    proc_subparsers.add_parser('cleanup', help='Utility: Removes all temporary staging tables.')

    reset_parser = subparsers.add_parser('reset-docs', help="[Utility] Resets document statuses to 'New'.")
    reset_parser.add_argument('--status', type=str, help="Comma-separated list of statuses to reset (e.g., 'Error,Indexing')")
    reset_parser.add_argument('--type', type=str, dest='file_type', help="Comma-separated list of file types to reset (e.g., 'EML,PDF')")

    link_parser = subparsers.add_parser('link-media', help="[Optional] Tools for linking media and metadata to SRTs.")
    link_subparsers = link_parser.add_subparsers(dest='link_command', required=True)
    link_subparsers.add_parser('podcast-meta', help="[RECOMMENDED FIRST] Links SRTs to metadata and web media from local XMLs.")
    link_subparsers.add_parser('unpodcast', help="[DESTRUCTIVE] Resets all SRTs by removing metadata, media links, and podcast collections.")
    link_archive_parser = link_subparsers.add_parser('archive-org', help="Links SRTs to audio files on an Archive.org item (overwrites existing links).")
    link_archive_parser.add_argument('archive_id', type=str, help="The identifier of the item on Archive.org (e.g., 'gaslit').")

    bake_parser = subparsers.add_parser('bake-sqlite', help="[Step 3] Bakes the final data to SQLite.")
    bake_parser.add_argument('--output', type=str, default='knowledge_base.db', help="Output SQLite file.")

    args = parser.parse_args()

    # Every command except 'init-db' needs an existing workspace database.
    if args.command != 'init-db' and not DUCKDB_FILE.exists():
        print("[ERROR] DuckDB file not found. Run 'init-db' or 'python curator_reset.py' first.")
        sys.exit(1)

    # ---------------------------------------------------------------
    # Dispatch
    # ---------------------------------------------------------------
    if args.command == 'init-db':
        setup_duckdb_schema()
    elif args.command == 'discover-docs':
        discover_documents()
    elif args.command == 'process-docs':
        if args.phase == 'run-all':
            pipeline.run_full_pipeline(workers=args.workers, batch_size=args.batch_size, use_gpu=args.gpu, doc_limit=args.doc_limit)
        elif args.phase == 'extract':
            pipeline.phase_extract_text(workers=args.workers, doc_limit=args.doc_limit)
        elif args.phase == 'nlp':
            pipeline.phase_nlp_analysis(workers=args.workers)
        elif args.phase == 'finalize':
            pipeline.phase_finalize_data()
        elif args.phase == 'embed':
            pipeline.phase_generate_embeddings(workers=args.workers, batch_size=args.batch_size, use_gpu=args.gpu)
        elif args.phase == 'cleanup':
            pipeline.cleanup_staging_tables()
    elif args.command == 'reset-docs':
        reset_document_status(status_filter=args.status, file_type_filter=args.file_type)
    elif args.command == 'link-media':
        if args.link_command == 'podcast-meta':
            link_podcast_metadata()
        elif args.link_command == 'archive-org':
            link_archive_org(args.archive_id)
        elif args.link_command == 'unpodcast':
            unpodcast_documents()
    elif args.command == 'bake-sqlite':
        # Run the data manager with the interpreter that is running this script
        # (so the same environment/packages are used) and locate it next to
        # this file instead of relying on the current working directory.
        # NOTE(review): assumes curator_data_manager.py lives in project_dir - confirm.
        bake_script = project_dir / 'curator_data_manager.py'
        completed = subprocess.run([sys.executable, str(bake_script), 'bake-sqlite', '--output', args.output])
        # Propagate the child's exit status so failures are visible to callers.
        sys.exit(completed.returncode)