-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathunified_database.py
More file actions
594 lines (506 loc) · 23.1 KB
/
unified_database.py
File metadata and controls
594 lines (506 loc) · 23.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
"""
Unified database - Integrates scholar search and general search results into a single database
"""
import json
import logging
import os
import sqlite3
import sys
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Dict, Any, Tuple
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from checker.models import ExternalReference
logger = logging.getLogger(__name__)
@dataclass
class ScholarRecord:
    """Scholar search record data structure.

    Every field defaults to ``None``, so the annotations are ``Optional``;
    a record can be built incrementally or from a partially filled row.
    """

    id: Optional[int] = None
    # Basic information
    title: Optional[str] = None
    authors: Optional[str] = None  # Author list, separated by commas
    year: Optional[int] = None
    venue: Optional[str] = None
    url: Optional[str] = None
    # Timestamp
    created_at: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert the record to a plain dictionary.

        Returns:
            A dict with one key per field. ``created_at`` is rendered with
            ``datetime.isoformat()`` when set and is ``None`` otherwise.
        """
        created = self.created_at.isoformat() if self.created_at else None
        return {
            'id': self.id,
            'title': self.title,
            'authors': self.authors,
            'year': self.year,
            'venue': self.venue,
            'url': self.url,
            'created_at': created,
        }
@dataclass
class SearchResultRecord:
    """Search result record data structure.

    Every field defaults to ``None``, so the annotations are ``Optional``.
    """

    id: Optional[int] = None
    # Basic information
    title: Optional[str] = None
    authors: Optional[str] = None  # Author list, separated by commas
    venue: Optional[str] = None
    year: Optional[int] = None
    url: Optional[str] = None
    # Search-related information
    source: Optional[str] = None  # Search source: google_search, scrapingdog
    search_query: Optional[str] = None  # Search query
    search_engine: Optional[str] = None  # Search engine: google, google_scholar
    # Metadata
    metadata: Optional[str] = None  # Additional metadata in JSON format
    result_position: Optional[int] = None  # Position in search results
    # Timestamp
    created_at: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert the record to a plain dictionary.

        Returns:
            A dict with one key per field. ``created_at`` is rendered with
            ``datetime.isoformat()`` when set and is ``None`` otherwise.
        """
        created = self.created_at.isoformat() if self.created_at else None
        return {
            'id': self.id,
            'title': self.title,
            'authors': self.authors,
            'venue': self.venue,
            'year': self.year,
            'url': self.url,
            'source': self.source,
            'search_query': self.search_query,
            'search_engine': self.search_engine,
            'metadata': self.metadata,
            'result_position': self.result_position,
            'created_at': created,
        }
class UnifiedDatabase:
    """Unified database - Contains scholar_results and search_results tables.

    Every method opens its own short-lived SQLite connection to ``db_path``
    through :meth:`_connect`, which commits on success, rolls back on error
    and always closes the connection afterwards.
    """

    # Explicit column lists keep query results aligned with the positional
    # indexes used by the row converters, independent of the physical column
    # order of an already existing database file.
    _SCHOLAR_COLUMNS = "id, title, authors, year, venue, url, created_at"
    _SEARCH_COLUMNS = (
        "id, title, authors, venue, year, url, "
        "source, search_query, search_engine, "
        "metadata, result_position, created_at"
    )

    def __init__(self, db_path: str = "scholar_results.db"):
        """
        Initialize unified database

        Args:
            db_path: Database file path
        """
        self.db_path = Path(db_path)
        # No method in this class assigns a connection here; the attribute is
        # kept so that close() and external users keep working.
        self.connection: Optional[sqlite3.Connection] = None
        self._create_tables()

    @contextmanager
    def _connect(self):
        """Yield a connection that is committed or rolled back, then closed.

        ``with sqlite3.connect(...)`` on its own only manages the transaction
        and leaves the connection open, so the close is done here explicitly.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    @staticmethod
    def _timestamp(value):
        """Return the text stored in a ``created_at`` column.

        ``None`` (or any falsy value) is replaced by the current local time.
        ``datetime`` values are rendered as ``isoformat(sep=" ")``, the same
        text the sqlite3 default adapter (deprecated since Python 3.12)
        produced; any other value is passed through unchanged.
        """
        stamp = value or datetime.now()
        if isinstance(stamp, datetime):
            return stamp.isoformat(sep=" ")
        return stamp

    def _create_tables(self):
        """Create both tables and their indexes if they do not exist yet."""
        with self._connect() as conn:
            # Create scholar_results table (verification result cache)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS scholar_results (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    -- Basic information
                    title TEXT,
                    authors TEXT,
                    year INTEGER,
                    venue TEXT,
                    url TEXT,
                    -- Timestamp
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # Create search_results table (search result storage)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS search_results (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    -- Basic information
                    title TEXT,
                    authors TEXT,
                    venue TEXT,
                    year INTEGER,
                    url TEXT,
                    -- Search-related information
                    source TEXT NOT NULL,
                    search_query TEXT,
                    search_engine TEXT,
                    -- Metadata
                    metadata TEXT,
                    result_position INTEGER,
                    -- Timestamp
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Create index for scholar_results table
            conn.execute("CREATE INDEX IF NOT EXISTS idx_title ON scholar_results(title)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_year ON scholar_results(year)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_created_at ON scholar_results(created_at)")

            # Create unique constraint for scholar_results table
            unique_scholar_index = """
                CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_reference
                ON scholar_results(title, authors, year)
            """
            try:
                conn.execute(unique_scholar_index)
            except sqlite3.IntegrityError as e:
                # Pre-existing duplicate rows prevent the index from being
                # built: remove them and retry once.
                if "UNIQUE constraint failed" not in str(e):
                    raise
                logger.warning("Duplicate records detected in scholar_results table, cleaning up...")
                self._cleanup_scholar_duplicates(conn)
                conn.execute(unique_scholar_index)
                logger.info("Duplicate records in scholar_results cleaned up, unique index created successfully")

            # Create index for search_results table
            conn.execute("CREATE INDEX IF NOT EXISTS idx_search_title ON search_results(title)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_search_authors ON search_results(authors)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_search_venue ON search_results(venue)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_search_year ON search_results(year)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_search_source ON search_results(source)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_search_query ON search_results(search_query)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_search_engine ON search_results(search_engine)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_search_created_at ON search_results(created_at)")

            # Create unique constraint for search_results table
            # Main constraint: title + source must be unique (for the same document from the same source)
            conn.execute("""
                CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_search_result_basic
                ON search_results(title, source)
                WHERE title IS NOT NULL AND source IS NOT NULL
            """)
            # Supplementary constraint: when complete information is available,
            # title + authors + venue + source must be unique
            conn.execute("""
                CREATE UNIQUE INDEX IF NOT EXISTS idx_unique_search_result_full
                ON search_results(title, authors, venue, source)
                WHERE title IS NOT NULL AND authors IS NOT NULL AND venue IS NOT NULL AND source IS NOT NULL
            """)
        logger.debug("Unified database tables created/verified")

    def _cleanup_scholar_duplicates(self, conn: sqlite3.Connection) -> int:
        """Delete duplicate scholar_results rows on the given connection.

        For every (title, authors, year) group only the row with the smallest
        id is kept. The caller is responsible for committing.

        Returns:
            Number of deleted rows.
        """
        cursor = conn.execute("""
            DELETE FROM scholar_results
            WHERE id NOT IN (
                SELECT MIN(id)
                FROM scholar_results
                GROUP BY title, authors, year
            )
        """)
        deleted_count = cursor.rowcount
        logger.info(f"Cleaned up {deleted_count} duplicate records")
        return deleted_count

    # ========== Scholar Results related methods ==========
    def search_scholar_by_title(self, title: str) -> Optional["ScholarRecord"]:
        """Return the newest scholar record whose title contains ``title``.

        The match is a SQL ``LIKE '%title%'``; ``None`` is returned when no
        row matches.
        """
        with self._connect() as conn:
            row = conn.execute(f"""
                SELECT {self._SCHOLAR_COLUMNS} FROM scholar_results
                WHERE title LIKE ?
                ORDER BY created_at DESC
                LIMIT 1
            """, (f"%{title}%",)).fetchone()
        return self._row_to_scholar_record(row) if row else None

    def _row_to_scholar_record(self, row) -> Optional["ScholarRecord"]:
        """Convert a scholar_results row (in ``_SCHOLAR_COLUMNS`` order) to a ScholarRecord.

        Returns ``None`` for an empty/missing row.
        """
        if not row:
            return None
        return ScholarRecord(
            id=row[0],
            title=row[1],
            authors=row[2],
            year=row[3],
            venue=row[4],
            url=row[5],
            created_at=datetime.fromisoformat(row[6]) if row[6] else None
        )

    def check_scholar_exists(self, title: str, authors: str, year: int) -> Optional["ScholarRecord"]:
        """Return the scholar record matching title, authors and year exactly.

        Returns ``None`` when any of the three values is falsy or when no
        row matches.
        """
        if not title or not authors or not year:
            return None
        with self._connect() as conn:
            row = conn.execute(f"""
                SELECT {self._SCHOLAR_COLUMNS} FROM scholar_results
                WHERE title = ? AND authors = ? AND year = ?
                LIMIT 1
            """, (title, authors, year)).fetchone()
        return self._row_to_scholar_record(row) if row else None

    def insert_scholar_result(self, record: "ScholarRecord", ignore_duplicates: bool = True) -> Optional[int]:
        """Insert a scholar record.

        Args:
            record: Record to store; a missing ``created_at`` is replaced by
                the current time.
            ignore_duplicates: When True, a duplicate is skipped and ``None``
                is returned; when False, a duplicate raises ``ValueError``.

        Returns:
            The new row id, or ``None`` when a duplicate was skipped.

        Raises:
            ValueError: Duplicate record and ``ignore_duplicates`` is False.
            sqlite3.IntegrityError: Any other integrity violation.
        """
        duplicate_message = (
            f"Duplicate scholar record: title='{record.title}', "
            f"authors='{record.authors}', year={record.year}"
        )
        if not ignore_duplicates:
            if self.check_scholar_exists(record.title, record.authors, record.year):
                raise ValueError(duplicate_message)

        preview = record.title[:50] if record.title else 'No title'
        with self._connect() as conn:
            try:
                cursor = conn.execute("""
                    INSERT INTO scholar_results (
                        title, authors, year, venue, url, created_at
                    ) VALUES (?, ?, ?, ?, ?, ?)
                """, (
                    record.title, record.authors, record.year, record.venue, record.url,
                    self._timestamp(record.created_at)
                ))
            except sqlite3.IntegrityError as e:
                # The unique index is the authoritative duplicate check; the
                # pre-check above can miss a concurrent insert.
                if "UNIQUE constraint failed" not in str(e):
                    raise
                if ignore_duplicates:
                    logger.debug(f"Duplicate scholar record detected during insert, skipping: {preview}...")
                    return None
                raise ValueError(duplicate_message) from e
            record_id = cursor.lastrowid
        logger.debug(f"Inserted scholar result with ID {record_id}: {preview}...")
        return record_id

    def get_scholar_statistics(self) -> Dict[str, Any]:
        """Get scholar_results table statistics (currently the total row count)."""
        with self._connect() as conn:
            # Total number of records
            total_records = conn.execute("SELECT COUNT(*) FROM scholar_results").fetchone()[0]
        return {
            'total_records': total_records
        }

    def cleanup_scholar_duplicates(self) -> int:
        """Clean up duplicate data in scholar_results table and return the number of deleted rows."""
        with self._connect() as conn:
            return self._cleanup_scholar_duplicates(conn)

    def clear_scholar_data(self):
        """Clear all data from scholar_results table"""
        with self._connect() as conn:
            conn.execute("DELETE FROM scholar_results")
        # Logged only after the transaction has been committed.
        logger.info("All scholar results data cleared")

    # ========== Search Results Related Methods ==========
    def check_search_duplicate(self, title: str, authors: str, venue: str, source: str) -> Optional["SearchResultRecord"]:
        """Return an existing search result with the same title and source.

        Only ``title`` and ``source`` take part in the lookup, mirroring the
        primary uniqueness index; ``authors`` and ``venue`` are accepted for
        interface compatibility but not used. Returns ``None`` when title or
        source is falsy or when no row matches.
        """
        # Can perform duplicate check as long as title and source are present
        if not title or not source:
            return None
        with self._connect() as conn:
            row = conn.execute(f"""
                SELECT {self._SEARCH_COLUMNS} FROM search_results
                WHERE title = ? AND source = ?
                LIMIT 1
            """, (title, source)).fetchone()
        return self._row_to_search_record(row) if row else None

    def _row_to_search_record(self, row) -> Optional["SearchResultRecord"]:
        """Convert a search_results row (in ``_SEARCH_COLUMNS`` order) to a SearchResultRecord.

        Returns ``None`` for an empty/missing row.
        """
        if not row:
            return None
        return SearchResultRecord(
            id=row[0],
            title=row[1],
            authors=row[2],
            venue=row[3],
            year=row[4],
            url=row[5],
            source=row[6],
            search_query=row[7],
            search_engine=row[8],
            metadata=row[9],
            result_position=row[10],
            created_at=datetime.fromisoformat(row[11]) if row[11] else None
        )

    def insert_search_result(self, record: "SearchResultRecord", ignore_duplicates: bool = True) -> Optional[int]:
        """Insert a search result record.

        Args:
            record: Record to store; a missing ``created_at`` is replaced by
                the current time.
            ignore_duplicates: When True, a duplicate is skipped and ``None``
                is returned; when False, a duplicate raises ``ValueError``.

        Returns:
            The new row id, or ``None`` when a duplicate was skipped.

        Raises:
            ValueError: Duplicate record and ``ignore_duplicates`` is False.
            sqlite3.IntegrityError: Any other integrity violation (for
                example a missing ``source``, which is NOT NULL).
        """
        duplicate_message = f"Duplicate search result: title='{record.title}', source='{record.source}'"
        if not ignore_duplicates:
            if self.check_search_duplicate(record.title, record.authors, record.venue, record.source):
                raise ValueError(duplicate_message)

        preview = record.title[:50] if record.title else 'No title'
        with self._connect() as conn:
            try:
                cursor = conn.execute("""
                    INSERT INTO search_results (
                        title, authors, venue, year, url,
                        source, search_query, search_engine,
                        metadata, result_position, created_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    record.title, record.authors, record.venue, record.year, record.url,
                    record.source, record.search_query, record.search_engine,
                    record.metadata, record.result_position,
                    self._timestamp(record.created_at)
                ))
            except sqlite3.IntegrityError as e:
                if "UNIQUE constraint failed" not in str(e):
                    raise
                if ignore_duplicates:
                    logger.debug(f"Duplicate search result detected during insert, skipping: {preview}...")
                    return None
                raise ValueError(duplicate_message) from e
            record_id = cursor.lastrowid
        logger.debug(f"Inserted search result with ID {record_id}: {preview}...")
        return record_id

    def insert_search_results_batch(self, records: List["SearchResultRecord"], ignore_duplicates: bool = True) -> Dict[str, int]:
        """Insert several search results one by one and report what happened.

        Each record is inserted through :meth:`insert_search_result`; a
        failure on one record is logged and counted, and does not stop the
        batch.

        Returns:
            Dict with the counters ``inserted``, ``duplicates`` and ``errors``.
        """
        stats = {
            'inserted': 0,
            'duplicates': 0,
            'errors': 0
        }
        for record in records:
            try:
                record_id = self.insert_search_result(record, ignore_duplicates)
            except Exception as e:
                # Batch boundary: keep going, but make the failure visible.
                logger.error(f"Error inserting search result {record.title}: {e}")
                stats['errors'] += 1
            else:
                stats['inserted' if record_id else 'duplicates'] += 1
        logger.info(f"Batch insert completed: {stats}")
        return stats

    def search_search_results_by_title(self, title: str) -> List["SearchResultRecord"]:
        """Return all search results whose title contains ``title``, newest first."""
        with self._connect() as conn:
            rows = conn.execute(f"""
                SELECT {self._SEARCH_COLUMNS} FROM search_results
                WHERE title LIKE ?
                ORDER BY created_at DESC
            """, (f"%{title}%",)).fetchall()
        return [self._row_to_search_record(row) for row in rows]

    def search_search_results_by_source(self, source: str) -> List["SearchResultRecord"]:
        """Return all search results with exactly this source, newest first."""
        with self._connect() as conn:
            rows = conn.execute(f"""
                SELECT {self._SEARCH_COLUMNS} FROM search_results
                WHERE source = ?
                ORDER BY created_at DESC
            """, (source,)).fetchall()
        return [self._row_to_search_record(row) for row in rows]

    def get_search_results_statistics(self) -> Dict[str, Any]:
        """Get search results statistics.

        Returns:
            Dict with the total row count, row counts per source, per search
            engine and per year (the ten most recent years), the number of
            rows with a non-empty URL, and the newest ``created_at`` value as
            stored (``None`` when the table is empty).
        """
        with self._connect() as conn:
            # Total number of records
            total_records = conn.execute("SELECT COUNT(*) FROM search_results").fetchone()[0]

            # Distribution by source
            source_distribution = dict(conn.execute("""
                SELECT source, COUNT(*)
                FROM search_results
                GROUP BY source
                ORDER BY COUNT(*) DESC
            """).fetchall())

            # Distribution by search engine
            search_engine_distribution = dict(conn.execute("""
                SELECT search_engine, COUNT(*)
                FROM search_results
                WHERE search_engine IS NOT NULL
                GROUP BY search_engine
                ORDER BY COUNT(*) DESC
            """).fetchall())

            # Distribution by year
            year_distribution = dict(conn.execute("""
                SELECT year, COUNT(*)
                FROM search_results
                WHERE year IS NOT NULL
                GROUP BY year
                ORDER BY year DESC
                LIMIT 10
            """).fetchall())

            # Number of records with URL
            records_with_url = conn.execute(
                "SELECT COUNT(*) FROM search_results WHERE url IS NOT NULL AND url != ''"
            ).fetchone()[0]

            # Most recent search record
            latest_result = conn.execute("""
                SELECT created_at
                FROM search_results
                ORDER BY created_at DESC
                LIMIT 1
            """).fetchone()
            latest_search_time = latest_result[0] if latest_result else None

        return {
            'total_records': total_records,
            'source_distribution': source_distribution,
            'search_engine_distribution': search_engine_distribution,
            'year_distribution': year_distribution,
            'records_with_url': records_with_url,
            'latest_search_time': latest_search_time
        }

    def export_search_results_to_csv(self, output_path: str, source_filter: str = None) -> int:
        """Export search results to a CSV file (requires pandas).

        Args:
            output_path: Destination CSV path, written as UTF-8 without the
                DataFrame index.
            source_filter: When given (and non-empty), only rows with exactly
                this source are exported.

        Returns:
            Number of exported rows.
        """
        import pandas as pd

        # The filter value is bound as a parameter rather than interpolated
        # into the SQL text, so quotes in it cannot alter the query.
        if source_filter:
            where_clause = "WHERE source = ?"
            params = (source_filter,)
        else:
            where_clause = ""
            params = None

        query = f"""
            SELECT {self._SEARCH_COLUMNS}
            FROM search_results
            {where_clause}
            ORDER BY created_at DESC
        """
        with self._connect() as conn:
            df = pd.read_sql_query(query, conn, params=params)
        df.to_csv(output_path, index=False, encoding='utf-8')
        logger.info(f"Exported {len(df)} search result records to {output_path}")
        return len(df)

    def clear_search_results_data(self):
        """Clear all search results data"""
        with self._connect() as conn:
            conn.execute("DELETE FROM search_results")
        logger.info("All search results data cleared")

    # ========== General Methods ==========
    def clear_all_data(self):
        """Clear all data from both tables in a single transaction."""
        with self._connect() as conn:
            conn.execute("DELETE FROM scholar_results")
            conn.execute("DELETE FROM search_results")
        logger.info("All data cleared from unified database")

    def close(self):
        """Close ``self.connection`` if one has been attached, then reset it to None."""
        if self.connection:
            self.connection.close()
            self.connection = None
def create_scholar_record_from_external_reference(external_ref: ExternalReference) -> ScholarRecord:
    """Build a ScholarRecord from an ExternalReference.

    The reference's author list is joined with ``', '`` (``None`` when the
    list is empty or missing); title, year, venue and url are copied as-is
    and ``created_at`` is set to the current local time.
    """
    author_names = external_ref.authors
    joined_authors = ', '.join(author_names) if author_names else None
    return ScholarRecord(
        title=external_ref.title,
        authors=joined_authors,
        year=external_ref.year,
        venue=external_ref.venue,
        url=external_ref.url,
        created_at=datetime.now(),
    )
def create_search_result_record_from_external_reference(
    external_ref: ExternalReference,
    search_query: str = None,
    result_position: int = None
) -> SearchResultRecord:
    """Build a SearchResultRecord from an ExternalReference.

    Args:
        external_ref: Reference supplying title, authors, venue, year, url,
            source and metadata.
        search_query: Query that produced the reference, stored unchanged.
        result_position: Position in the search results, stored unchanged.

    Returns:
        A record whose authors are joined with ``', '``, whose
        ``search_engine`` is read from the metadata's ``'search_engine'``
        key, whose ``metadata`` is the JSON-serialized metadata (``None``
        when absent or not serializable), and whose ``created_at`` is the
        current local time.
    """
    author_names = external_ref.authors
    ref_metadata = external_ref.metadata

    engine_name = None
    serialized_metadata = None
    if ref_metadata:
        # Search engine information lives inside the metadata mapping.
        engine_name = ref_metadata.get('search_engine')
        try:
            serialized_metadata = json.dumps(ref_metadata, ensure_ascii=False)
        except Exception as e:
            # Best effort: the record is still created, just without metadata.
            logger.warning(f"Failed to serialize metadata: {e}")

    return SearchResultRecord(
        title=external_ref.title,
        authors=', '.join(author_names) if author_names else None,
        venue=external_ref.venue,
        year=external_ref.year,
        url=external_ref.url,
        source=external_ref.source,
        search_query=search_query,
        search_engine=engine_name,
        metadata=serialized_metadata,
        result_position=result_position,
        created_at=datetime.now(),
    )