Skip to content

Commit 9bb1349

Browse files
author
Edouard Silvestre
committed
[FEAT] handle article indexing
1 parent 5871010 commit 9bb1349

2 files changed

Lines changed: 40 additions & 241 deletions

File tree

server/database.py

Lines changed: 26 additions & 200 deletions
Original file line numberDiff line numberDiff line change
@@ -97,24 +97,18 @@ def setup_database(self):
9797

9898
cur.execute("CREATE INDEX IF NOT EXISTS idx_content_hash ON articles(content_hash)")
9999
cur.execute("CREATE INDEX IF NOT EXISTS idx_source_site ON articles(source_site)")
100-
# Index sur le cluster_id pour le tri
101100
cur.execute("CREATE INDEX IF NOT EXISTS idx_cluster_id ON articles(cluster_id)")
102101

103-
104102
def _compute_content_hash(self, item: Dict) -> str:
105103
import hashlib
106-
107104
full_content = item.get("full_content", item.get("description", ""))
108105
return hashlib.sha256(full_content.encode()).hexdigest()
109106

110107
def save_article(self, item: Dict, conn: Optional[psycopg.Connection] = None) -> bool:
111-
"""Save a single article, returning True if inserted."""
112108
content_hash = self._compute_content_hash(item)
113-
114109
if conn is None:
115110
with self.get_connection() as owned_conn:
116111
return self.save_article(item, owned_conn)
117-
118112
cur = conn.cursor()
119113
cur.execute(
120114
"""
@@ -138,47 +132,21 @@ def save_article(self, item: Dict, conn: Optional[psycopg.Connection] = None) ->
138132
datetime.now(UTC),
139133
),
140134
)
141-
142135
return cur.rowcount > 0
143136

144-
def save_articles_batch(self, items: List[Dict]) -> int:
145-
"""Save multiple articles in one transaction."""
146-
with self.get_connection() as conn:
147-
count = 0
148-
for item in items:
149-
if self.save_article(item, conn):
150-
count += 1
151-
return count
152-
153137
def article_exists(self, article_id: str) -> bool:
154-
"""Check if article already exists by ID."""
155138
with self.get_connection() as conn:
156139
cur = conn.cursor()
157140
cur.execute("SELECT 1 FROM articles WHERE id = %s", (article_id,))
158141
return cur.fetchone() is not None
159142

160143
def article_exists_by_hash(self, content_hash: str) -> bool:
161-
"""Check if article already exists by content hash."""
162144
with self.get_connection() as conn:
163145
cur = conn.cursor()
164146
cur.execute("SELECT 1 FROM articles WHERE content_hash = %s", (content_hash,))
165147
return cur.fetchone() is not None
166148

167-
def get_article_by_hash(self, content_hash: str) -> Optional[Dict]:
168-
"""Get article by content hash."""
169-
with self.get_connection() as conn:
170-
cur = conn.cursor()
171-
cur.execute("SELECT * FROM articles WHERE content_hash = %s", (content_hash,))
172-
row = cur.fetchone()
173-
return dict(row) if row else None
174-
175149
def save_embedding(self, article_id: str, embedding: np.ndarray, model: str = "default") -> bool:
176-
"""Save article embedding as a pgvector."""
177-
if embedding.ndim != 1 or embedding.shape[0] != self.embedding_dimension:
178-
raise ValueError(
179-
f"Embedding dimension mismatch: expected {self.embedding_dimension}, got {embedding.shape}"
180-
)
181-
182150
with self.get_connection() as conn:
183151
cur = conn.cursor()
184152
cur.execute(
@@ -194,198 +162,56 @@ def save_embedding(self, article_id: str, embedding: np.ndarray, model: str = "d
194162
)
195163
return cur.rowcount > 0
196164

197-
# VOS AJOUTS ICI : MÉTHODE POUR L'EXTRACTION D'ENTITÉS
198-
def update_article_entities(
199-
self,
200-
article_id: str,
201-
subject: Optional[str],
202-
organization_list: Optional[str],
203-
event_type: Optional[str]
204-
) -> bool:
205-
"""Update an article with entities extracted by the LLM."""
206-
with self.get_connection() as conn:
207-
cur = conn.cursor()
208-
cur.execute(
209-
"""
210-
UPDATE articles
211-
SET subject = %s,
212-
organization_list = %s::jsonb, -- Casté en jsonb
213-
event_type = %s,
214-
updated_at = CURRENT_TIMESTAMP
215-
WHERE id = %s
216-
""",
217-
(subject, organization_list, event_type, article_id),
218-
)
219-
return cur.rowcount > 0
220-
221-
def get_articles_without_embeddings(self, limit: int = 100) -> List[Dict]:
222-
"""Get articles without embeddings."""
165+
def assign_cluster_by_entities(self, article_id: str, subject: str, orgs: str, event: str):
223166
with self.get_connection() as conn:
224167
cur = conn.cursor()
225168
cur.execute(
226169
"""
227-
SELECT a.* FROM articles a
228-
LEFT JOIN embeddings e ON a.id = e.article_id
229-
WHERE e.id IS NULL
230-
LIMIT %s
170+
SELECT cluster_id FROM articles
171+
WHERE subject = %s AND organization_list = %s::jsonb AND event_type = %s
172+
AND cluster_id IS NOT NULL LIMIT 1
231173
""",
232-
(limit,),
174+
(subject, orgs, event)
233175
)
234-
return list(cur.fetchall())
235-
236-
def get_all_embeddings_with_ids(self) -> List[Tuple[str, np.ndarray]]:
237-
"""
238-
Retrieve all article IDs and their corresponding embeddings for clustering.
239-
Returns a list of (article_id, embedding_array).
240-
"""
241-
with self.get_connection() as conn:
242-
cur = conn.cursor()
243-
cur.execute(
244-
"""
245-
SELECT article_id, embedding
246-
FROM embeddings
247-
ORDER BY article_id
248-
"""
249-
)
250-
return [(row["article_id"], row["embedding"]) for row in cur.fetchall()]
251-
252-
def batch_update_cluster_ids(self, updates: List[Tuple[str, int]]) -> int:
253-
"""
254-
Update cluster_id for a list of articles in a single batch transaction.
255-
256-
Args:
257-
updates: List of (article_id, cluster_id).
258-
259-
Returns:
260-
Number of rows updated.
261-
"""
262-
if not updates:
263-
return 0
264-
265-
updated_count = 0
266-
with self.get_connection() as conn:
267-
cur = conn.cursor()
268-
269-
cur.execute(
270-
"""
271-
CREATE TEMP TABLE cluster_updates (article_id TEXT, cluster_id INTEGER);
272-
"""
273-
)
274-
275-
sql_updates = [(item[0], item[1]) for item in updates]
276-
cur.executemany(
277-
"INSERT INTO cluster_updates (article_id, cluster_id) VALUES (%s, %s)",
278-
sql_updates,
279-
)
280-
281-
cur.execute(
282-
"""
283-
UPDATE articles
284-
SET cluster_id = cu.cluster_id,
285-
updated_at = CURRENT_TIMESTAMP
286-
FROM cluster_updates cu
287-
WHERE articles.id = cu.article_id
288-
"""
289-
)
290-
updated_count = cur.rowcount
291-
292-
cur.execute("DROP TABLE cluster_updates")
293-
294-
return updated_count
295-
296-
297-
def get_articles_by_source(self, source: str, limit: int = 50) -> List[Dict]:
298-
"""Get articles from a specific source."""
299-
with self.get_connection() as conn:
300-
cur = conn.cursor()
301-
cur.execute(
302-
"""
303-
SELECT * FROM articles
304-
WHERE source_site = %s
305-
ORDER BY published_date DESC NULLS LAST
306-
LIMIT %s
307-
""",
308-
(source, limit),
309-
)
310-
return list(cur.fetchall())
311-
312-
def get_total_articles(self) -> int:
313-
"""Return total number of articles."""
314-
with self.get_connection() as conn:
315-
cur = conn.cursor()
316-
cur.execute("SELECT COUNT(*) AS count FROM articles")
317176
row = cur.fetchone()
318-
return row["count"] if row else 0
319-
320-
def get_articles_by_source_count(self) -> Dict[str, int]:
321-
"""Return number of articles per source."""
322-
with self.get_connection() as conn:
323-
cur = conn.cursor()
324-
cur.execute(
325-
"""
326-
SELECT source_site, COUNT(*) as count
327-
FROM articles
328-
GROUP BY source_site
329-
ORDER BY count DESC
330-
"""
331-
)
332-
return {row["source_site"]: row["count"] for row in cur.fetchall()}
333-
334-
def record_sync(self, source: str, mode: str, items_processed: int = 0):
335-
"""Record a synchronization event."""
336-
with self.get_connection() as conn:
337-
cur = conn.cursor()
338-
cur.execute(
339-
"""
340-
INSERT INTO sync_history
341-
(source_site, sync_mode, last_sync_time, items_processed)
342-
VALUES (%s, %s, %s, %s)
343-
""",
344-
(source, mode, datetime.now(UTC), items_processed),
345-
)
346-
347-
def get_last_sync(self, source: str, mode: str) -> Optional[Dict]:
348-
"""Get last sync for a source and mode."""
349-
with self.get_connection() as conn:
350-
cur = conn.cursor()
177+
if row:
178+
cid = row['cluster_id']
179+
else:
180+
cur.execute("SELECT COALESCE(MAX(cluster_id), 0) + 1 as next_id FROM articles")
181+
cid = cur.fetchone()['next_id']
182+
351183
cur.execute(
352184
"""
353-
SELECT * FROM sync_history
354-
WHERE source_site = %s AND sync_mode = %s
355-
ORDER BY created_at DESC
356-
LIMIT 1
185+
UPDATE articles SET subject=%s, organization_list=%s::jsonb,
186+
event_type=%s, cluster_id=%s, updated_at=CURRENT_TIMESTAMP
187+
WHERE id=%s
357188
""",
358-
(source, mode),
189+
(subject, orgs, event, cid, article_id)
359190
)
360-
row = cur.fetchone()
361-
return dict(row) if row else None
362191

363192
def get_stats(self) -> Dict:
364-
"""Return database statistics."""
365193
with self.get_connection() as conn:
366194
cur = conn.cursor()
367195
cur.execute("SELECT COUNT(*) AS count FROM articles")
368196
total_articles = cur.fetchone()["count"]
369-
370197
cur.execute("SELECT COUNT(*) AS count FROM embeddings")
371198
total_embeddings = cur.fetchone()["count"]
372-
373-
cur.execute("SELECT COUNT(DISTINCT cluster_id) AS count FROM articles WHERE cluster_id IS NOT NULL AND cluster_id != -1")
199+
cur.execute("SELECT COUNT(DISTINCT cluster_id) AS count FROM articles WHERE cluster_id IS NOT NULL")
374200
total_clusters = cur.fetchone()["count"]
375-
376-
cur.execute(
377-
"""
378-
SELECT source_site, COUNT(*) as count
379-
FROM articles
380-
GROUP BY source_site
381-
"""
382-
)
201+
cur.execute("SELECT source_site, COUNT(*) as count FROM articles GROUP BY source_site")
383202
articles_by_source = {row["source_site"]: row["count"] for row in cur.fetchall()}
384-
385203
return {
386204
"total_articles": total_articles,
387205
"total_embeddings": total_embeddings,
388206
"articles_by_source": articles_by_source,
389207
"articles_without_embeddings": total_articles - total_embeddings,
390208
"total_clusters": total_clusters,
391-
}
209+
}
210+
211+
def record_sync(self, source: str, mode: str, items_processed: int = 0):
212+
with self.get_connection() as conn:
213+
cur = conn.cursor()
214+
cur.execute(
215+
"INSERT INTO sync_history (source_site, sync_mode, last_sync_time, items_processed) VALUES (%s, %s, %s, %s)",
216+
(source, mode, datetime.now(UTC), items_processed),
217+
)

server/entity_llm_processor.py

Lines changed: 14 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,77 +1,50 @@
11
import logging
22
import json
3-
from typing import Dict, Any, List
3+
from typing import Dict, Any
44
from openai import OpenAI, APIError
55

66
logger = logging.getLogger(__name__)
77

88
class EntityLLMProcessor:
9-
10-
119
def __init__(self, model: str = "gpt-4o-mini"):
12-
1310
self.client = OpenAI()
1411
self.model = model
1512
self.system_prompt = (
1613
"You are an expert technical analysis system. Your task is to extract "
1714
"key entities from a given technical article text and return them in "
1815
"JSON format. Focus on high-level subjects, involved organizations, "
19-
"and the type of event the article describes (e.g., Release, Rumor, Paper, Vulnerability)."
16+
"and the type of event the article describes."
2017
)
21-
2218
self.entity_schema = {
2319
"type": "object",
2420
"properties": {
25-
"subject": {
26-
"type": "string",
27-
"description": "The main technical topic, e.g., 'LLMs', 'Kubernetes', 'Quantum Computing', 'Cybersecurity'."
28-
},
29-
"organization_list": {
30-
"type": "array",
31-
"items": {"type": "string"},
32-
"description": "A list of key organizations or companies mentioned (e.g., Google, Microsoft, Meta)."
33-
},
34-
"event_type": {
35-
"type": "string",
36-
"description": "The type of event described: 'Release', 'Rumor', 'Acquisition', 'Vulnerability', 'Paper', 'Announcement', 'Fake', 'Update', 'Explainer'."
37-
},
21+
"subject": {"type": "string"},
22+
"organization_list": {"type": "array", "items": {"type": "string"}},
23+
"event_type": {"type": "string"},
3824
},
3925
"required": ["subject", "organization_list", "event_type"]
4026
}
4127

4228
def process(self, article: Dict, db_manager: Any) -> bool:
43-
4429
article_id = article["id"]
45-
46-
content_for_llm = f"Title: {article.get('title', '')}\nDescription: {article.get('description', '')}\nContent snippet: {article.get('full_content', '')[:500]}..."
47-
30+
content = f"Title: {article.get('title', '')}\nDescription: {article.get('description', '')}\nContent: {article.get('full_content', '')[:500]}..."
4831
try:
4932
response = self.client.chat.completions.create(
5033
model=self.model,
5134
messages=[
5235
{"role": "system", "content": self.system_prompt},
53-
{"role": "user", "content": content_for_llm}
36+
{"role": "user", "content": content}
5437
],
55-
response_model=self.entity_schema,
5638
response_format={"type": "json_object"},
5739
)
58-
59-
entity_data = json.loads(response.choices[0].message.content)
60-
61-
db_manager.update_article_entities(
40+
data = json.loads(response.choices[0].message.content)
41+
db_manager.assign_cluster_by_entities(
6242
article_id,
63-
subject=entity_data.get("subject"),
64-
organization_list=json.dumps(entity_data.get("organization_list", [])),
65-
event_type=entity_data.get("event_type"),
43+
subject=data.get("subject"),
44+
orgs=json.dumps(data.get("organization_list", [])),
45+
event=data.get("event_type")
6646
)
67-
logger.debug(f"Entities successfully extracted and saved for {article_id}")
6847
return True
69-
70-
except APIError as e:
71-
logger.error(f"OpenAI API Error for {article_id}: {e}")
72-
except json.JSONDecodeError:
73-
logger.error(f"LLM did not return valid JSON for {article_id}")
7448
except Exception as e:
75-
logger.error(f"General error during LLM processing for {article_id}: {e}")
76-
77-
return False
49+
logger.error(f"Error for {article_id}: {e}")
50+
return False

0 commit comments

Comments
 (0)