diff --git a/server/.env.example b/server/.env.example new file mode 100644 index 0000000..3a6b397 --- /dev/null +++ b/server/.env.example @@ -0,0 +1,11 @@ +# OpenAI API Configuration +OPENAI_API_KEY=your_openai_api_key_here + +# PostgreSQL connection (used by DatabaseManager) +DATABASE_URL=postgresql://postgres:postgres@localhost:5432/veille_technique + +# Optional: Override embedding model (OpenAI) +EMBEDDING_MODEL=text-embedding-3-small + +# Optional: GitHub Token for higher rate limits +GITHUB_TOKEN=your_github_token_here diff --git a/server/ARCHITECTURE.md b/server/ARCHITECTURE.md new file mode 100644 index 0000000..94175f9 --- /dev/null +++ b/server/ARCHITECTURE.md @@ -0,0 +1,210 @@ +# Watch Server - Complete Redesign + +## šŸ“‹ Summary of Changes + +The server has been redesigned with a modular architecture and two operating modes. + +## šŸ—ļø Architecture + +``` +server/ +ā”œā”€ā”€ main.py # Main server with WatchServer (orchestrator) +ā”œā”€ā”€ database.py # PostgreSQL + pgvector database manager +ā”œā”€ā”€ embeddings.py # Embeddings manager (vectors) +ā”œā”€ā”€ config.py # Centralized configuration +ā”œā”€ā”€ examples.py # Usage examples +ā”œā”€ā”€ requirements.txt # Python dependencies +ā”œā”€ā”€ README.md # Complete documentation +└── scrapers/ + ā”œā”€ā”€ base.py # Abstract BaseScraper interface + ā”œā”€ā”€ arxiv_scraper.py # Scraper for arXiv + ā”œā”€ā”€ github_scraper.py # Scraper for GitHub + ā”œā”€ā”€ medium_scraper.py # Scraper for Medium + ā”œā”€ā”€ lemonde_scraper.py # Scraper for Le Monde + └── huggingface_scraper.py # Scraper for Hugging Face +``` + +## šŸŽÆ Operating Modes + +### 1ļøāƒ£ Backfill Mode (History) +**When:** At startup (optional) + +**What:** Scrapes all available history from each source. + +**How:** +```bash +python main.py backfill --limit 100 +``` + +**Flow:** +1. Each scraper calls `scrape_all()`. +2. Articles are saved (deduplicated by ID and hash). +3. Embeddings are generated and stored. +4. Sync history is recorded. + +### 2ļøāƒ£ Watch Mode (Monitoring) +**When:** Continuous monitoring after (or without) backfill. + +**How:** +```bash +python main.py watch --interval 300 +``` + +**Flow:** +1. Infinite loop (default 5-minute interval). +2. Each scraper calls `scrape_latest()`. +3. New articles are saved; embeddings generated. +4. Sync history is recorded. + +## šŸ”§ Main Components + +### BaseScraper (abstract interface) +- `scrape_latest(limit)` → watch mode +- `scrape_all(limit)` → backfill mode +- `normalize_item()` → unified format + +### DatabaseManager +- PostgreSQL persistence with pgvector +- Tables: articles, embeddings (vector), sync_history +- Automatic deduplication via `ON CONFLICT` +- Vector-ready queries and batch operations + +### EmbeddingManager +- Providers: Dummy, SentenceTransformers, OpenAI +- Generates numpy vectors sized to the chosen model +- Stores vectors directly in pgvector columns (no pickle) + +### WatchServer (orchestrator) +- Initializes all scrapers +- Manages both modes +- Logging, statistics, and monitoring + +## šŸ’¾ Database Structure + +### Table `articles` +``` +id (TEXT PRIMARY KEY) # Unique identifier per source +source_site (TEXT) # arxiv, github, medium, le_monde, huggingface +title (TEXT) # Article title +description (TEXT) # Summary/content +author_info (TEXT) # Author(s) +keywords (TEXT) # Tags/categories +content_url (TEXT) # Link to source +published_date (TIMESTAMPTZ) # Publication date +item_type (TEXT) # article, paper, repository, etc. 
+created_at (TIMESTAMPTZ)     # When retrieved
+updated_at (TIMESTAMPTZ)     # Last update
+```
+
+### Table `embeddings`
+```
+id (SERIAL PRIMARY KEY)      # Unique embedding row
+article_id (TEXT UNIQUE)     # Link to articles.id
+embedding vector(1536)       # pgvector column (dimension tied to embedding model)
+embedding_model (TEXT)       # Which model generated the embedding
+created_at (TIMESTAMPTZ)     # When created
+```
+
+### Table `sync_history`
+```
+id (SERIAL PRIMARY KEY)      # Unique sync row
+source_site (TEXT)           # Which source
+sync_mode (TEXT)             # "watch" or "backfill"
+last_sync_time (TIMESTAMPTZ) # When
+items_processed (INTEGER)    # How many articles
+created_at (TIMESTAMPTZ)     # When recorded
+```
+
+## šŸš€ Usage
+
+### Simple startup
+```bash
+# 1. Fill DB with history
+python main.py backfill --limit 50
+
+# 2. Monitor continuously
+python main.py watch --interval 300
+
+# 3. Check stats
+python main.py stats
+```
+
+### With options
+```bash
+# Custom backfill
+python main.py backfill --limit 200 --db-url postgresql://user:pass@localhost:5432/veille_technique
+
+# Watch with 10-minute interval
+python main.py watch --interval 600
+
+# Stats on specific DB
+python main.py stats --db-url postgresql://user:pass@localhost:5432/veille_technique
+```
+
+## šŸ“Š Complete Flow Example
+
+```
+Server startup
+│
+ā”œā”€ā†’ BACKFILL Mode (optional)
+│   ā”œā”€ā†’ ArXiv.scrape_all(100)   → … articles → DB
+│   ā”œā”€ā†’ GitHub.scrape_all(100)  → … articles → DB
+│   ā”œā”€ā†’ Medium.scrape_all(100)  → … articles → DB
+│   ā”œā”€ā†’ LeMonde.scrape_all(100) → … articles → DB
+│   └─→ HF.scrape_all(100)      → … articles → DB
+│        ↓ All articles receive an embedding
+│
+└─→ WATCH Mode (infinite loop)
+    ā”œā”€ā†’ Iteration 1 …
+    ā”œā”€ā†’ [Wait interval]
+    └─→ Iteration 2 …
+```
+
+## šŸ”‘ Key Design Points
+
+### āœ“ Modularity
+- Independent scrapers; easy to add/remove
+- Interchangeable embedding providers
+
+### āœ“ Robustness
+- Error isolation per scraper
+- Deduplication by ID and content hash prevents duplicates
+
+### āœ“ Scalability
+- Batch DB operations
+- Vector-ready schema
+- Structured logging
+
+### āœ“ Maintainability
+- Clear code and docs
+- Centralized configuration
+- Usage examples included
+
+## šŸ’» How to View the Database
+
+Use PostgreSQL tooling (`psql`, `pgcli`, DBeaver, pgAdmin) with `DATABASE_URL`.
+
+```bash
+# List tables
+psql "$DATABASE_URL" -c "\dt"
+
+# Check pgvector extension
+psql "$DATABASE_URL" -c "\dx vector"
+
+# Quick counts
+psql "$DATABASE_URL" -c "SELECT COUNT(*) FROM articles;"
+psql "$DATABASE_URL" -c "SELECT COUNT(*) FROM embeddings;"
+
+# Example vector query (top 5 nearest)
+psql "$DATABASE_URL" -c "SELECT article_id, embedding <-> '[0.1,0.2,...]' AS distance FROM embeddings ORDER BY embedding <-> '[0.1,0.2,...]' LIMIT 5;"
+
+# Last syncs
+psql "$DATABASE_URL" -c "SELECT * FROM sync_history ORDER BY created_at DESC LIMIT 5;"
+
+# Export (custom format)
+pg_dump --dbname="$DATABASE_URL" --format=c --file=veille_technique.dump
+```
+
+## šŸ“ Migration from Old Server
+
+Legacy code in `scrap/` remains for reference; the new server reuses scraping logic with the updated architecture.
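+
+## šŸ” Semantic Search from Python
+
+Beyond `psql`, the vector column can be queried directly from Python with the same stack the server uses (`psycopg` + `pgvector` + OpenAI embeddings). The snippet below is a minimal sketch, not part of the server code: the `search()` helper and the query text are illustrative, and it assumes `DATABASE_URL` and `OPENAI_API_KEY` are set and that article embeddings were generated with the default `text-embedding-3-small` model.
+
+```python
+import os
+
+import numpy as np
+import psycopg
+from openai import OpenAI
+from pgvector.psycopg import register_vector
+
+
+def search(query_text: str, top_k: int = 5):
+    # Embed the query with the same model the server uses for articles.
+    client = OpenAI()
+    resp = client.embeddings.create(input=query_text, model="text-embedding-3-small")
+    query_vec = np.array(resp.data[0].embedding, dtype=np.float32)
+
+    with psycopg.connect(os.environ["DATABASE_URL"]) as conn:
+        register_vector(conn)  # lets psycopg send numpy arrays as pgvector values
+        return conn.execute(
+            """
+            SELECT a.title, a.content_url, e.embedding <-> %s AS distance
+            FROM embeddings e
+            JOIN articles a ON a.id = e.article_id
+            ORDER BY e.embedding <-> %s
+            LIMIT %s
+            """,
+            (query_vec, query_vec, top_k),
+        ).fetchall()
+
+
+if __name__ == "__main__":
+    for title, url, distance in search("retrieval augmented generation"):
+        print(f"{distance:.3f}  {title}  {url}")
+```
+
+OpenAI's text-embedding-3 vectors are unit-normalized, so ordering by `<->` (L2 distance, as in the `psql` example above) gives the same ranking as the cosine operator `<=>`.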
diff --git a/server/config.py b/server/config.py new file mode 100644 index 0000000..273392c --- /dev/null +++ b/server/config.py @@ -0,0 +1,72 @@ +"""Configuration for the watch server.""" + +import os +from dataclasses import dataclass +from typing import Dict + + +@dataclass +class ScraperConfig: + """Configuration for a specific scraper.""" + enabled: bool = True + limit_latest: int = 20 + + limit_all: int = 100 + + +@dataclass +class ServerConfig: + """Global server configuration.""" + + db_url: str = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/veille_technique") + + watch_interval_seconds: int = 300 + + log_level: str = "INFO" + + scrapers: Dict[str, ScraperConfig] = None + + def __post_init__(self): + """Initialize default scraper configuration.""" + if self.scrapers is None: + self.scrapers = { + "arxiv": ScraperConfig(enabled=True, limit_latest=20, limit_all=100), + "github": ScraperConfig(enabled=True, limit_latest=20, limit_all=100), + "medium": ScraperConfig(enabled=True, limit_latest=20, limit_all=100), + "lemonde": ScraperConfig(enabled=True, limit_latest=20, limit_all=100), + "huggingface": ScraperConfig(enabled=True, limit_latest=20, limit_all=100), + } + + @classmethod + def from_file(cls, filepath: str) -> "ServerConfig": + """Load configuration from JSON/YAML file.""" + import json + + try: + with open(filepath, 'r') as f: + data = json.load(f) + + if "scrapers" in data: + data["scrapers"] = { + name: ScraperConfig(**cfg) + for name, cfg in data["scrapers"].items() + } + + return cls(**data) + except FileNotFoundError: + print(f"Config file {filepath} not found, using default config") + return cls() + + +DEFAULT_CONFIG = ServerConfig() + +DEV_CONFIG = ServerConfig( + db_url=os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/veille_technique_dev"), + watch_interval_seconds=60, +) + +PROD_CONFIG = ServerConfig( + db_url=os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/veille_technique"), + watch_interval_seconds=600, + log_level="WARNING", +) diff --git a/server/database.py b/server/database.py new file mode 100644 index 0000000..17c1d8f --- /dev/null +++ b/server/database.py @@ -0,0 +1,289 @@ +"""Database manager for the server (PostgreSQL + pgvector).""" + +from contextlib import contextmanager +from datetime import UTC, datetime +from typing import Dict, List, Optional + +import numpy as np +import psycopg +from pgvector.psycopg import register_vector +from psycopg.rows import dict_row + + +class DatabaseManager: + """Manages unified database operations using PostgreSQL.""" + + def __init__(self, db_url: str, embedding_dimension: int = 1536): + """Initialize the database manager and ensure schema is ready.""" + self.db_url = db_url + self.embedding_dimension = embedding_dimension + self.setup_database() + + @contextmanager + def get_connection(self): + """Context manager for PostgreSQL connections with pgvector registered.""" + conn = psycopg.connect(self.db_url, row_factory=dict_row, autocommit=False) + register_vector(conn) + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + conn.close() + + def setup_database(self): + """Initialize database and create tables (idempotent).""" + with self.get_connection() as conn: + cur = conn.cursor() + + cur.execute("CREATE EXTENSION IF NOT EXISTS vector") + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS articles ( + id TEXT PRIMARY KEY, + source_site TEXT NOT NULL, + title TEXT NOT NULL, + description TEXT, + 
full_content TEXT, + content_hash TEXT UNIQUE, + author_info TEXT, + keywords TEXT, + content_url TEXT NOT NULL, + published_date TIMESTAMPTZ, + item_type TEXT, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + + cur.execute( + f""" + CREATE TABLE IF NOT EXISTS embeddings ( + id SERIAL PRIMARY KEY, + article_id TEXT NOT NULL UNIQUE REFERENCES articles(id) ON DELETE CASCADE, + embedding vector({self.embedding_dimension}) NOT NULL, + embedding_model TEXT, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + + cur.execute( + """ + CREATE TABLE IF NOT EXISTS sync_history ( + id SERIAL PRIMARY KEY, + source_site TEXT NOT NULL, + sync_mode TEXT NOT NULL, + last_sync_time TIMESTAMPTZ, + items_processed INTEGER DEFAULT 0, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + + cur.execute("CREATE INDEX IF NOT EXISTS idx_content_hash ON articles(content_hash)") + cur.execute("CREATE INDEX IF NOT EXISTS idx_source_site ON articles(source_site)") + + def _compute_content_hash(self, item: Dict) -> str: + import hashlib + + full_content = item.get("full_content", item.get("description", "")) + return hashlib.sha256(full_content.encode()).hexdigest() + + def save_article(self, item: Dict, conn: Optional[psycopg.Connection] = None) -> bool: + """Save a single article, returning True if inserted.""" + content_hash = self._compute_content_hash(item) + + if conn is None: + with self.get_connection() as owned_conn: + return self.save_article(item, owned_conn) + + cur = conn.cursor() + cur.execute( + """ + INSERT INTO articles + (id, source_site, title, description, full_content, content_hash, author_info, keywords, content_url, published_date, item_type, created_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT DO NOTHING + """, + ( + item["id"], + item["source_site"], + item["title"], + item.get("description", ""), + item.get("full_content", item.get("description", "")), + content_hash, + item.get("author_info", ""), + item.get("keywords", ""), + item["content_url"], + item.get("published_date", datetime.now(UTC)), + item.get("item_type", "article"), + datetime.now(UTC), + ), + ) + + return cur.rowcount > 0 + + def save_articles_batch(self, items: List[Dict]) -> int: + """Save multiple articles in one transaction.""" + with self.get_connection() as conn: + count = 0 + for item in items: + if self.save_article(item, conn): + count += 1 + return count + + def article_exists(self, article_id: str) -> bool: + """Check if article already exists by ID.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute("SELECT 1 FROM articles WHERE id = %s", (article_id,)) + return cur.fetchone() is not None + + def article_exists_by_hash(self, content_hash: str) -> bool: + """Check if article already exists by content hash.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute("SELECT 1 FROM articles WHERE content_hash = %s", (content_hash,)) + return cur.fetchone() is not None + + def get_article_by_hash(self, content_hash: str) -> Optional[Dict]: + """Get article by content hash.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute("SELECT * FROM articles WHERE content_hash = %s", (content_hash,)) + row = cur.fetchone() + return dict(row) if row else None + + def save_embedding(self, article_id: str, embedding: np.ndarray, model: str = "default") -> bool: + """Save article embedding as a pgvector.""" + if embedding.ndim != 1 or 
embedding.shape[0] != self.embedding_dimension: + raise ValueError( + f"Embedding dimension mismatch: expected {self.embedding_dimension}, got {embedding.shape}" + ) + + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO embeddings (article_id, embedding, embedding_model) + VALUES (%s, %s, %s) + ON CONFLICT (article_id) DO UPDATE + SET embedding = EXCLUDED.embedding, + embedding_model = EXCLUDED.embedding_model, + created_at = CURRENT_TIMESTAMP + """, + (article_id, embedding.tolist(), model), + ) + return cur.rowcount > 0 + + def get_articles_without_embeddings(self, limit: int = 100) -> List[Dict]: + """Get articles without embeddings.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT a.* FROM articles a + LEFT JOIN embeddings e ON a.id = e.article_id + WHERE e.id IS NULL + LIMIT %s + """, + (limit,), + ) + return list(cur.fetchall()) + + def get_articles_by_source(self, source: str, limit: int = 50) -> List[Dict]: + """Get articles from a specific source.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT * FROM articles + WHERE source_site = %s + ORDER BY published_date DESC NULLS LAST + LIMIT %s + """, + (source, limit), + ) + return list(cur.fetchall()) + + def get_total_articles(self) -> int: + """Return total number of articles.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute("SELECT COUNT(*) AS count FROM articles") + row = cur.fetchone() + return row["count"] if row else 0 + + def get_articles_by_source_count(self) -> Dict[str, int]: + """Return number of articles per source.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT source_site, COUNT(*) as count + FROM articles + GROUP BY source_site + ORDER BY count DESC + """ + ) + return {row["source_site"]: row["count"] for row in cur.fetchall()} + + def record_sync(self, source: str, mode: str, items_processed: int = 0): + """Record a synchronization event.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute( + """ + INSERT INTO sync_history + (source_site, sync_mode, last_sync_time, items_processed) + VALUES (%s, %s, %s, %s) + """, + (source, mode, datetime.now(UTC), items_processed), + ) + + def get_last_sync(self, source: str, mode: str) -> Optional[Dict]: + """Get last sync for a source and mode.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute( + """ + SELECT * FROM sync_history + WHERE source_site = %s AND sync_mode = %s + ORDER BY created_at DESC + LIMIT 1 + """, + (source, mode), + ) + row = cur.fetchone() + return dict(row) if row else None + + def get_stats(self) -> Dict: + """Return database statistics.""" + with self.get_connection() as conn: + cur = conn.cursor() + cur.execute("SELECT COUNT(*) AS count FROM articles") + total_articles = cur.fetchone()["count"] + + cur.execute("SELECT COUNT(*) AS count FROM embeddings") + total_embeddings = cur.fetchone()["count"] + + cur.execute( + """ + SELECT source_site, COUNT(*) as count + FROM articles + GROUP BY source_site + """ + ) + articles_by_source = {row["source_site"]: row["count"] for row in cur.fetchall()} + + return { + "total_articles": total_articles, + "total_embeddings": total_embeddings, + "articles_by_source": articles_by_source, + "articles_without_embeddings": total_articles - total_embeddings, + } diff --git a/server/embeddings.py b/server/embeddings.py new file mode 100644 index 0000000..0476a95 --- /dev/null 
+++ b/server/embeddings.py @@ -0,0 +1,224 @@ +"""Embedding management and generation.""" + +from abc import ABC, abstractmethod +from pathlib import Path +from typing import List, Optional + +import numpy as np +import os + + +class EmbeddingProvider(ABC): + """Abstract base class for embedding providers.""" + + @abstractmethod + def embed(self, text: str) -> np.ndarray: + """ + Generate embedding for text. + + Args: + text: Text to embed + + Returns: + Embedding vector (numpy array) + """ + pass + + @abstractmethod + def get_name(self) -> str: + """Return provider name.""" + pass + + def get_dimension(self) -> Optional[int]: + """Return embedding dimension when available.""" + return None + + +class DummyEmbeddingProvider(EmbeddingProvider): + """Dummy embedding provider for development.""" + + def __init__(self, dimension: int = 1536): + """ + Initialize dummy provider. + + Args: + dimension: Embedding dimension + """ + self.dimension = dimension + + def embed(self, text: str) -> np.ndarray: + """Generate deterministic random embedding from text hash.""" + seed = abs(hash(text)) % (2**31) + np.random.seed(seed) + return np.random.randn(self.dimension).astype(np.float32) + + def get_name(self) -> str: + """Return provider name.""" + return "dummy" + + def get_dimension(self) -> Optional[int]: + return self.dimension + + +class SentenceTransformerEmbeddingProvider(EmbeddingProvider): + """Embedding provider using sentence-transformers.""" + + def __init__(self, model_name: str = "all-MiniLM-L6-v2"): + """ + Initialize SentenceTransformers provider. + + Args: + model_name: Model name to use + """ + try: + from sentence_transformers import SentenceTransformer + except ImportError: + raise ImportError( + "sentence-transformers is required. Install it with: " + "pip install sentence-transformers" + ) + + self.model_name = model_name + self.model = SentenceTransformer(model_name) + self.dimension = getattr(self.model, "get_sentence_embedding_dimension", lambda: None)() + + def embed(self, text: str) -> np.ndarray: + """Generate embedding with SentenceTransformer.""" + embedding = self.model.encode(text, convert_to_numpy=True) + return embedding.astype(np.float32) + + def get_name(self) -> str: + """Return provider name.""" + return f"sentence-transformers-{self.model_name}" + + def get_dimension(self) -> Optional[int]: + return self.dimension + + +class OpenAIEmbeddingProvider(EmbeddingProvider): + """Embedding provider using OpenAI API.""" + + def __init__(self, model: str = "text-embedding-3-small", api_key: Optional[str] = None): + """ + Initialize OpenAI embedding provider. + + Args: + model: OpenAI embedding model to use (text-embedding-3-small, text-embedding-3-large, text-embedding-ada-002) + api_key: OpenAI API key (defaults to .env file or OPENAI_API_KEY env var) + """ + try: + from openai import OpenAI + except ImportError: + raise ImportError( + "openai package is required. Install it with: " + "pip install openai" + ) + + self.model = model + self.dimension = self._infer_dimension(model) + + if api_key: + self.api_key = api_key + else: + self.api_key = self._load_api_key_from_env() + + if not self.api_key: + raise ValueError( + "OpenAI API key is required. Add OPENAI_API_KEY to .env file " + "or set OPENAI_API_KEY environment variable." 
+ ) + + self.client = OpenAI(api_key=self.api_key) + + def _load_api_key_from_env(self) -> Optional[str]: + """Load API key from .env file or environment variable.""" + api_key = os.getenv("OPENAI_API_KEY") + if api_key: + return api_key + + env_path = Path(__file__).parent / ".env" + if env_path.exists(): + with open(env_path, 'r') as f: + for line in f: + line = line.strip() + if line.startswith('OPENAI_API_KEY='): + return line.split('=', 1)[1].strip().strip('"').strip("'") + + return None + + def embed(self, text: str) -> np.ndarray: + """Generate embedding with OpenAI API.""" + max_chars = 30000 + if len(text) > max_chars: + text = text[:max_chars] + + response = self.client.embeddings.create( + input=text, + model=self.model + ) + + embedding = np.array(response.data[0].embedding, dtype=np.float32) + return embedding + + def get_name(self) -> str: + """Return provider name.""" + return f"openai-{self.model}" + + def get_dimension(self) -> Optional[int]: + return self.dimension + + def _infer_dimension(self, model: str) -> Optional[int]: + return { + "text-embedding-3-small": 1536, + "text-embedding-3-large": 3072, + "text-embedding-ada-002": 1536, + }.get(model) + + +class EmbeddingManager: + """Manage embeddings for articles.""" + + def __init__(self, provider: Optional[EmbeddingProvider] = None, expected_dimension: Optional[int] = None): + """Initialize embedding manager. + + Args: + provider: Embedding provider to use (default: Dummy) + expected_dimension: Optional enforced dimension (aligns with DB vector size) + """ + self.provider = provider or DummyEmbeddingProvider() + self.expected_dimension = expected_dimension or self.provider.get_dimension() + + def embed_text(self, text: str) -> np.ndarray: + """Generate embedding for text.""" + embedding = self.provider.embed(text) + embedding = embedding.astype(np.float32) + self._validate_dimension(embedding) + return embedding + + def embed_article(self, article: dict) -> np.ndarray: + """Generate embedding for complete article.""" + full_content = article.get("full_content") + if full_content: + text = full_content + else: + title = article.get("title", "") + description = article.get("description", "") + text = f"{title}\n{description}" + + return self.embed_text(text) + + def deserialize_embedding(self, embedding_values: List[float]) -> np.ndarray: + """Convert stored vector values back to numpy array.""" + embedding = np.array(embedding_values, dtype=np.float32) + self._validate_dimension(embedding) + return embedding + + def get_provider_name(self) -> str: + """Return embedding provider name.""" + return self.provider.get_name() + + def _validate_dimension(self, embedding: np.ndarray): + if self.expected_dimension and embedding.shape[0] != self.expected_dimension: + raise ValueError( + f"Embedding dimension mismatch: expected {self.expected_dimension}, got {embedding.shape[0]}" + ) diff --git a/server/examples.py b/server/examples.py new file mode 100644 index 0000000..2a76c0e --- /dev/null +++ b/server/examples.py @@ -0,0 +1,148 @@ +""" +Usage examples for the watch server. 
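+
+Run from the server/ directory, either interactively or by passing the example
+number on the command line, e.g.:
+
+    python examples.py 1   # backfill a small batch from every source
+    python examples.py 4   # build and display a custom configuration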
+""" + +import asyncio +import os +from main import WatchServer +from config import DEV_CONFIG, PROD_CONFIG + + +def example_backfill(): + """Example 1: Backfill mode (history).""" + print("\n" + "="*60) + print("EXAMPLE 1 : BACKFILL Mode") + print("="*60) + + server = WatchServer( + db_url=os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/veille_technique"), + check_interval=300 + ) + + print("\nāœ“ Server created, launching backfill...") + server.run_backfill_mode(limit_per_scraper=10) + + server.print_stats() + + +def example_watch_limited(): + """Example 2: Watch mode with iteration limit (for testing).""" + print("\n" + "="*60) + print("EXAMPLE 2 : WATCH Mode (limited to 3 iterations)") + print("="*60) + + server = WatchServer( + db_url=os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/veille_technique"), + check_interval=10 + ) + + print("\nāœ“ Server created, launching watch (3 iterations)...") + print(" Each iteration scrapes all sources\n") + + try: + import threading + + def stop_after_30s(srv): + import time + time.sleep(30) + srv.running = False + print("\nā¹ļø Auto-stopped after 30 seconds") + + thread = threading.Thread(target=stop_after_30s, args=(server,), daemon=True) + thread.start() + + asyncio.run(server.run_watch_mode()) + except KeyboardInterrupt: + print("\nā¹ļø Stopped by user") + + server.print_stats() + + +def example_multi_source_stats(): + """Example 3: View stats with multiple sources scraped.""" + print("\n" + "="*60) + print("EXAMPLE 3 : Backfill + Stats") + print("="*60) + + server = WatchServer(db_url=os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/veille_technique")) + + print("\nšŸ“„ Scraping each source...") + server.run_backfill_mode(limit_per_scraper=5) + + print("\nšŸ“Š Checking stats...") + stats = server.get_stats() + + print(f"\nSummary :") + print(f" Total articles : {stats['total_articles']}") + print(f" Embeddings : {stats['total_embeddings']}") + print(f" Missing : {stats['articles_without_embeddings']}") + + print(f"\nPer source :") + for source, count in sorted(stats['articles_by_source'].items()): + pct = 100 * count / max(1, stats['total_articles']) + print(f" {source:15} : {count:3} ({pct:.1f}%)") + + +def example_custom_config(): + """Example 4: Use custom configuration.""" + print("\n" + "="*60) + print("EXAMPLE 4 : Custom Configuration") + print("="*60) + + from config import ServerConfig, ScraperConfig + + custom_config = ServerConfig( + db_url=os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/veille_technique"), + watch_interval_seconds=120, + scrapers={ + "arxiv": ScraperConfig(enabled=True, limit_latest=10, limit_all=30), + "github": ScraperConfig(enabled=True, limit_latest=15, limit_all=50), + "medium": ScraperConfig(enabled=False), + "lemonde": ScraperConfig(enabled=False), + "huggingface": ScraperConfig(enabled=True, limit_latest=10, limit_all=30), + } + ) + + print(f"\nāœ“ Custom config created") + print(f" DB : {custom_config.db_url}") + print(f" Interval : {custom_config.watch_interval_seconds}s") + + print(f"\nāœ“ Enabled scrapers :") + for name, cfg in custom_config.scrapers.items(): + if cfg.enabled: + print(f" - {name} (latest:{cfg.limit_latest}, all:{cfg.limit_all})") + + +if __name__ == "__main__": + import sys + + print("\n" + "="*60) + print("USAGE EXAMPLES") + print("Watch Server") + print("="*60) + + examples = { + "1": ("Backfill (history)", example_backfill), + "2": ("Watch limited (test)", example_watch_limited), + 
"3": ("Multi-source stats", example_multi_source_stats), + "4": ("Custom config", example_custom_config), + } + + print("\nChoose an example :") + for key, (desc, _) in examples.items(): + print(f" {key}. {desc}") + + if len(sys.argv) > 1: + choice = sys.argv[1] + else: + choice = input("\nEnter your choice (1-4) : ").strip() + + if choice in examples: + try: + examples[choice][1]() + except Exception as e: + print(f"\nāŒ Error : {e}") + import traceback + traceback.print_exc() + else: + print(f"āŒ Invalid choice : {choice}") diff --git a/server/main.py b/server/main.py new file mode 100644 index 0000000..277efa3 --- /dev/null +++ b/server/main.py @@ -0,0 +1,324 @@ +""" +Watch Server - Technical surveillance with watch and backfill modes. + +Modes: +1. "watch": Scrape new articles continuously +2. "backfill": Retrieve entire available history at startup +""" + +import argparse +import asyncio +import logging +import os +from datetime import datetime, UTC +from typing import Dict, List, Optional + +from database import DatabaseManager +from embeddings import EmbeddingManager, OpenAIEmbeddingProvider +from scrapers.arxiv_scraper import ArxivScraper +from scrapers.github_scraper import GithubScraper +from scrapers.medium_scraper import MediumScraper +from scrapers.lemonde_scraper import LeMondeScraper +from scrapers.huggingface_scraper import HuggingFaceScraper + + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +class WatchServer: + """Technical watch server with watch and backfill modes.""" + + def __init__( + self, + db_url: str, + check_interval: int = 300, + embedding_model: str = "text-embedding-3-small", + ): + """ + Initialize the server. + + Args: + db_url: PostgreSQL connection URL + check_interval: Scraping interval in seconds (watch mode) + embedding_model: Embedding model to use (OpenAI) + """ + embedding_provider = OpenAIEmbeddingProvider(model=embedding_model) + embedding_dim = embedding_provider.get_dimension() or 1536 + + self.db_manager = DatabaseManager(db_url, embedding_dimension=embedding_dim) + self.embedding_manager = EmbeddingManager( + embedding_provider, + expected_dimension=embedding_dim, + ) + + self.check_interval = check_interval + self.running = False + + self.scrapers = self._init_scrapers() + + def _init_scrapers(self) -> Dict[str, object]: + """Initialize all available scrapers.""" + scrapers = {} + + try: + scrapers["arxiv"] = ArxivScraper(category="cs.LG") + logger.info("āœ“ ArXiv scraper initialized") + except ImportError as e: + logger.warning(f"āœ— ArXiv scraper failed: {e}") + + try: + scrapers["github"] = GithubScraper() + logger.info("āœ“ GitHub scraper initialized") + except Exception as e: + logger.warning(f"āœ— GitHub scraper failed: {e}") + + try: + scrapers["medium"] = MediumScraper() + logger.info("āœ“ Medium scraper initialized") + except ImportError as e: + logger.warning(f"āœ— Medium scraper failed: {e}") + + try: + scrapers["lemonde"] = LeMondeScraper() + logger.info("āœ“ Le Monde scraper initialized") + except ImportError as e: + logger.warning(f"āœ— Le Monde scraper failed: {e}") + + try: + scrapers["huggingface"] = HuggingFaceScraper() + logger.info("āœ“ Hugging Face scraper initialized") + except Exception as e: + logger.warning(f"āœ— Hugging Face scraper failed: {e}") + + return scrapers + + def _process_articles(self, articles: List[Dict]) -> int: + """ + Process articles: save to DB and create embeddings. 
+ + Args: + articles: List of normalized articles + + Returns: + Number of new articles processed + """ + import hashlib + + new_count = 0 + + for article in articles: + if self.db_manager.article_exists(article["id"]): + continue + + full_content = article.get("full_content", article.get("description", "")) + content_hash = hashlib.sha256(full_content.encode()).hexdigest() + + if self.db_manager.article_exists_by_hash(content_hash): + logger.debug(f"Article {article['id']} is duplicate (same content hash)") + continue + + if self.db_manager.save_article(article): + new_count += 1 + + try: + embedding = self.embedding_manager.embed_article(article) + self.db_manager.save_embedding( + article["id"], + embedding, + model=self.embedding_manager.get_provider_name() + ) + except Exception as e: + logger.error(f"Embedding error for {article['id']}: {e}") + + return new_count + + def run_backfill_mode(self, limit_per_scraper: int = 100): + """ + Launch backfill mode - scrape entire available history. + + Args: + limit_per_scraper: Maximum articles per scraper + """ + logger.info("=" * 60) + logger.info("šŸ”„ BACKFILL MODE START (History)") + logger.info("=" * 60) + + total_new = 0 + + for source_name, scraper in self.scrapers.items(): + logger.info(f"\nšŸ“„ Scraping {source_name} (backfill mode)...") + + try: + articles = scraper.scrape_all(limit=limit_per_scraper) + + if not articles: + logger.info(f" āš ļø No articles found for {source_name}") + continue + + logger.info(f" šŸ“¦ {len(articles)} articles received") + + new_count = self._process_articles(articles) + total_new += new_count + + logger.info(f" āœ“ {new_count} new articles saved") + + self.db_manager.record_sync(source_name, "backfill", new_count) + + except Exception as e: + logger.error(f" āœ— Error for {source_name}: {e}") + + logger.info("\n" + "=" * 60) + logger.info(f"āœ“ BACKFILL MODE COMPLETE - {total_new} articles processed") + logger.info("=" * 60) + + async def run_watch_mode(self): + """Launch watch mode - scrape continuously.""" + logger.info("=" * 60) + logger.info("šŸ‘€ WATCH MODE START (Surveillance)") + logger.info(f"Scraping interval: {self.check_interval}s") + logger.info("=" * 60) + + self.running = True + iteration = 0 + + try: + while self.running: + iteration += 1 + logger.info(f"\n[Iteration {iteration}] {datetime.now(UTC).isoformat()}") + + total_new = 0 + + for source_name, scraper in self.scrapers.items(): + logger.info(f" šŸ“” Scraping {source_name}...") + + try: + articles = scraper.scrape_latest(limit=20) + + if not articles: + logger.info(f" - No new articles") + continue + + new_count = self._process_articles(articles) + total_new += new_count + + if new_count > 0: + logger.info(f" āœ“ {new_count} new articles") + self.db_manager.record_sync(source_name, "watch", new_count) + else: + logger.info(f" - All articles already exist") + + except Exception as e: + logger.error(f" āœ— Error: {e}") + + logger.info(f" šŸ“Š Total: {total_new} new articles") + logger.info(f" ā³ Waiting {self.check_interval}s...") + + stats = self.db_manager.get_stats() + logger.info(f" šŸ“ˆ DB: {stats['total_articles']} articles, " + f"{stats['articles_without_embeddings']} without embedding") + + await asyncio.sleep(self.check_interval) + + except KeyboardInterrupt: + logger.info("\nā¹ļø Server stopped") + finally: + self.running = False + + def get_stats(self) -> Dict: + """Return database statistics.""" + return self.db_manager.get_stats() + + def print_stats(self): + """Display database statistics.""" + stats = 
self.get_stats() + + print("\n" + "=" * 60) + print("šŸ“Š DATABASE STATISTICS") + print("=" * 60) + print(f"Total articles: {stats['total_articles']}") + print(f"Articles with embedding: {stats['total_embeddings']}") + print(f"Articles without embedding: {stats['articles_without_embeddings']}") + print("\nArticles per source:") + for source, count in stats['articles_by_source'].items(): + print(f" - {source}: {count}") + print("=" * 60) + + def export_database(self, output_path: str) -> bool: + """Guide export for PostgreSQL deployments.""" + logger.error("Export is not handled automatically for PostgreSQL. Use pg_dump instead.") + logger.info( + "Example: pg_dump --dbname=$DATABASE_URL --format=c --file=%s", + output_path, + ) + return False + + +def main(): + """Server entry point.""" + parser = argparse.ArgumentParser( + description="Technical watch server with watch and backfill modes" + ) + parser.add_argument( + "mode", + choices=["watch", "backfill", "stats", "export"], + help="Execution mode" + ) + parser.add_argument( + "--db-url", + default=os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/veille_technique"), + help="PostgreSQL connection URL (or set DATABASE_URL)" + ) + parser.add_argument( + "--interval", + type=int, + default=300, + help="Scraping interval in seconds (watch mode)" + ) + parser.add_argument( + "--embedding-model", + default=os.getenv("EMBEDDING_MODEL", "text-embedding-3-small"), + help="Embedding model to use (OpenAI)" + ) + parser.add_argument( + "--limit", + type=int, + default=100, + help="Max articles per source (backfill mode)" + ) + parser.add_argument( + "--output", + default="veille_export.db", + help="Output file path for export mode" + ) + + args = parser.parse_args() + + server = WatchServer( + db_url=args.db_url, + check_interval=args.interval, + embedding_model=args.embedding_model, + ) + + if args.mode == "backfill": + server.run_backfill_mode(limit_per_scraper=args.limit) + server.print_stats() + + elif args.mode == "watch": + asyncio.run(server.run_watch_mode()) + + elif args.mode == "stats": + server.print_stats() + + elif args.mode == "export": + logger.info( + "Export helper: use pg_dump on your PostgreSQL instance. Example: pg_dump --dbname=$DATABASE_URL --file=%s", + args.output, + ) + + +if __name__ == "__main__": + main() diff --git a/server/requirements.txt b/server/requirements.txt new file mode 100644 index 0000000..cfc0203 --- /dev/null +++ b/server/requirements.txt @@ -0,0 +1,9 @@ +feedparser==6.0.10 +requests==2.31.0 +arxiv==1.4.8 +numpy==1.26.4 +beautifulsoup4==4.12.2 +PyPDF2==3.0.1 +openai==1.54.0 +psycopg[binary]==3.2.1 +pgvector==0.2.5 diff --git a/server/scrapers/__init__.py b/server/scrapers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/server/scrapers/arxiv_scraper.py b/server/scrapers/arxiv_scraper.py new file mode 100644 index 0000000..4563d4d --- /dev/null +++ b/server/scrapers/arxiv_scraper.py @@ -0,0 +1,166 @@ +"""Scraper for arXiv.""" + +from typing import List, Dict, TYPE_CHECKING +from .base import BaseScraper +import requests +from datetime import datetime +import io + +try: + import arxiv +except ImportError: + arxiv = None + +try: + import PyPDF2 +except ImportError: + PyPDF2 = None + +if TYPE_CHECKING: + from arxiv import Result + + +class ArxivScraper(BaseScraper): + """Scraper for arXiv papers.""" + + def __init__(self, category: str = "cs.LG"): + """ + Initialize ArXiv scraper. 
+ + Args: + category: arXiv category (e.g., "cs.LG", "cs.AI") + """ + super().__init__("arxiv") + self.category = category + + if arxiv is None: + raise ImportError("arxiv package is required. Install it with: pip install arxiv") + + def _extract_pdf_text(self, pdf_url: str) -> str: + """Download and extract text from arXiv PDF.""" + if PyPDF2 is None: + return "" + + try: + response = requests.get(pdf_url, timeout=30) + if response.status_code != 200: + return "" + + pdf_file = io.BytesIO(response.content) + pdf_reader = PyPDF2.PdfReader(pdf_file) + + text_parts = [] + for page in pdf_reader.pages: + text = page.extract_text() + if text: + text_parts.append(text) + + full_text = '\n'.join(text_parts) + return full_text + + except Exception as e: + print(f"[WARN] PDF extraction failed for {pdf_url}: {e}") + return "" + + def _fetch_full_content(self, paper) -> str: + """Fetch full paper content from arXiv PDF.""" + try: + pdf_url = paper.pdf_url + pdf_text = self._extract_pdf_text(pdf_url) + + if pdf_text: + full_content = f"""{paper.title} + +Authors: {', '.join([a.name for a in paper.authors])} + +Category: {paper.primary_category} + +Published: {paper.published.strftime('%Y-%m-%d')} + +Abstract: +{paper.summary} + +Full Paper Content: +{pdf_text}""" + return full_content + else: + full_content = f"""{paper.title} + +Authors: {', '.join([a.name for a in paper.authors])} + +Category: {paper.primary_category} + +{paper.summary}""" + return full_content + except Exception as e: + print(f"[WARN] Content fetch failed: {e}") + return paper.summary + return "" + + def _normalize_result(self, paper) -> Dict: + """Normalize arXiv result.""" + authors = ", ".join([a.name for a in paper.authors]) + link = paper.entry_id + + keywords_list = [paper.primary_category] + if paper.categories: + keywords_list.extend(paper.categories) + + full_content = self._fetch_full_content(paper) + if not full_content: + full_content = paper.summary.replace('\n', ' ') + + return self.normalize_item( + item_id=link, + source_site=self.source_name, + title=paper.title.replace('\n', ' '), + description=paper.summary.replace('\n', ' '), + full_content=full_content, + author_info=authors, + keywords=", ".join(keywords_list), + content_url=link, + published_date=paper.published.isoformat(), + item_type="paper" + ) + + def scrape_latest(self, limit: int = 20) -> List[Dict]: + """Scrape latest articles.""" + try: + search = arxiv.Search( + query=f"cat:{self.category}", + max_results=limit, + sort_by=arxiv.SortCriterion.SubmittedDate, + sort_order=arxiv.SortOrder.Descending + ) + + results = [] + for paper in search.results(): + results.append(self._normalize_result(paper)) + + self.update_last_check() + return results + + except Exception as e: + print(f"[ERROR] ArXiv scrape_latest: {e}") + return [] + + def scrape_all(self, limit: int = 100) -> List[Dict]: + """Scrape all available articles (with limit).""" + try: + search = arxiv.Search( + query=f"cat:{self.category}", + max_results=limit, + sort_by=arxiv.SortCriterion.SubmittedDate, + sort_order=arxiv.SortOrder.Descending + ) + + results = [] + for paper in search.results(): + results.append(self._normalize_result(paper)) + + self.update_last_check() + return results + + except Exception as e: + print(f"[ERROR] ArXiv scrape_all: {e}") + return [] diff --git a/server/scrapers/base.py b/server/scrapers/base.py new file mode 100644 index 0000000..8c1fa66 --- /dev/null +++ b/server/scrapers/base.py @@ -0,0 +1,95 @@ +"""Abstract base class for all scrapers.""" + +from abc 
import ABC, abstractmethod +from typing import List, Dict, Optional +from datetime import datetime, UTC + + +class BaseScraper(ABC): + """Abstract base class defining interface for all scrapers.""" + + def __init__(self, source_name: str): + """ + Initialize scraper. + + Args: + source_name: Unique source name (e.g., "arxiv", "github", "medium") + """ + self.source_name = source_name + self.last_check = None + + @abstractmethod + def scrape_latest(self, limit: int = 20) -> List[Dict]: + """ + Scrape latest articles/items from source. + Used in watch mode (polling). + + Args: + limit: Maximum number of items to return + + Returns: + List of normalized items + """ + pass + + @abstractmethod + def scrape_all(self, limit: int = 100) -> List[Dict]: + """ + Scrape all available articles/items. + Used in backfill mode (history). + + Args: + limit: Maximum number of items to return + + Returns: + List of normalized items + """ + pass + + def update_last_check(self): + """Update last check timestamp.""" + self.last_check = datetime.now(UTC) + + @staticmethod + def normalize_item( + item_id: str, + source_site: str, + title: str, + description: str, + author_info: str, + keywords: str, + content_url: str, + published_date: str, + item_type: str = "article", + full_content: str = "" + ) -> Dict: + """ + Normalize item to unified format. + + Args: + item_id: Unique item identifier + source_site: Source name + title: Item title + description: Short description/summary + author_info: Author information + keywords: Tags/keywords + content_url: URL to source + published_date: Publication date + item_type: Type of item + full_content: Complete article text (optional) + + Returns: + Dict with unified structure + """ + return { + "id": item_id, + "source_site": source_site, + "title": title, + "description": description, + "full_content": full_content or description, + "author_info": author_info, + "keywords": keywords, + "content_url": content_url, + "published_date": published_date, + "item_type": item_type, + } diff --git a/server/scrapers/github_scraper.py b/server/scrapers/github_scraper.py new file mode 100644 index 0000000..c6782e3 --- /dev/null +++ b/server/scrapers/github_scraper.py @@ -0,0 +1,132 @@ +"""Scraper for GitHub.""" + +import os +import requests +from typing import List, Dict, Optional +from .base import BaseScraper + + +class GithubScraper(BaseScraper): + """Scraper for GitHub repositories.""" + + def __init__(self, token: Optional[str] = None): + """ + Initialize GitHub scraper. 
+ + Args: + token: GitHub token (optional, loads from GITHUB_TOKEN env var) + """ + super().__init__("github") + self.token = token or os.getenv("GITHUB_TOKEN") + self.themes = [ + "large-language-model", "llm", "transformer", "text-generation", + "retrieval-augmented-generation", "rag", "agents", "chatbot", + "fine-tuning", "quantization", "lora", "peft", "diffusion", + "stable-diffusion", "image-generation", "multimodal", + "speech-to-text", "speech-synthesis", "audio", + "reinforcement-learning", "computer-vision", + ] + self.headers = { + "Accept": "application/vnd.github+json", + "User-Agent": "server-ai-watcher/1.0" + } + if self.token: + self.headers["Authorization"] = f"Bearer {self.token}" + + def _fetch_readme(self, full_name: str) -> str: + """Fetch README content from repository.""" + try: + url = f"https://api.github.com/repos/{full_name}/readme" + headers = self.headers.copy() + headers["Accept"] = "application/vnd.github.v3.raw" + + resp = requests.get(url, headers=headers, timeout=10) + if resp.status_code == 200: + return resp.text + except Exception: + pass + return "" + + def _normalize_repo(self, repo: Dict, theme: str) -> Dict: + """Normalize GitHub repository.""" + full_name = repo.get("full_name") + keywords_list = [theme, repo.get("language") or ""] + if repo.get("topics"): + keywords_list.extend(repo.get("topics")) + + updated_at = repo.get("updated_at") or repo.get("pushed_at") + description = repo.get("description") or "" + + readme = self._fetch_readme(full_name) + full_content = f"{description}\n\n{readme}" if readme else description + + return self.normalize_item( + item_id=full_name, + source_site=self.source_name, + title=repo.get("name"), + description=description, + full_content=full_content, + author_info=repo.get("owner", {}).get("login", ""), + keywords=", ".join(filter(None, keywords_list)), + content_url=repo.get("html_url") or f"https://github.com/{full_name}", + published_date=updated_at, + item_type="repository" + ) + + def _search_repos(self, query: str, per_page: int = 20) -> List[Dict]: + """Search repositories with given query.""" + url = "https://api.github.com/search/repositories" + params = { + "q": query, + "sort": "stars", + "order": "desc", + "per_page": per_page + } + + try: + resp = requests.get(url, headers=self.headers, params=params, timeout=20) + + if resp.status_code == 403: + retry_after = resp.headers.get("Retry-After") + raise Exception(f"GitHub rate limit hit. 
Retry after: {retry_after}") + + if resp.status_code != 200: + print(f"[WARN] GitHub API returned {resp.status_code}") + return [] + + data = resp.json() + return data.get("items", []) + + except Exception as e: + print(f"[ERROR] GitHub search: {e}") + return [] + + def scrape_latest(self, limit: int = 20) -> List[Dict]: + """Scrape latest repositories for themes.""" + results = [] + items_per_theme = max(1, limit // len(self.themes)) + + for theme in self.themes: + query = f"{theme} in:name,description,readme stars:>50" + repos = self._search_repos(query, per_page=items_per_theme) + + for repo in repos: + results.append(self._normalize_repo(repo, theme)) + + self.update_last_check() + return results[:limit] + + def scrape_all(self, limit: int = 100) -> List[Dict]: + """Scrape all available repositories (with limit).""" + results = [] + items_per_theme = max(1, limit // len(self.themes)) + + for theme in self.themes: + query = f"{theme} in:name,description,readme stars:>10" + repos = self._search_repos(query, per_page=items_per_theme) + + for repo in repos: + results.append(self._normalize_repo(repo, theme)) + + self.update_last_check() + return results[:limit] diff --git a/server/scrapers/huggingface_scraper.py b/server/scrapers/huggingface_scraper.py new file mode 100644 index 0000000..9a32abc --- /dev/null +++ b/server/scrapers/huggingface_scraper.py @@ -0,0 +1,124 @@ +"""Scraper for Hugging Face.""" + +import requests +from typing import List, Dict, Optional +from datetime import datetime, UTC +from .base import BaseScraper + + +class HuggingFaceScraper(BaseScraper): + """Scraper for Hugging Face Hub.""" + + def __init__(self): + """Initialize Hugging Face scraper.""" + super().__init__("huggingface") + self.endpoints = [ + ("models", "model"), + ("datasets", "dataset"), + ("spaces", "space"), + ("collections", "collection"), + ("papers", "paper"), + ] + + def _build_url(self, item: Dict, item_type: str) -> str: + """Build public URL for item.""" + base = "https://huggingface.co" + item_id = item.get("id") + + if item_type == "model": + return f"{base}/{item.get('modelId')}" + elif item_type in ("dataset", "space", "collection", "paper"): + return f"{base}/{item_id}" + + return base + + def _fetch_model_card(self, item_id: str, item_type: str) -> str: + """Fetch model card or README from Hugging Face.""" + try: + if item_type == "model": + url = f"https://huggingface.co/{item_id}/raw/main/README.md" + else: + url = f"https://huggingface.co/{item_id}/raw/main/README.md" + + resp = requests.get(url, timeout=10) + if resp.status_code == 200: + return resp.text + except Exception: + pass + return "" + + def _normalize_item(self, item: Dict, item_type: str) -> Dict: + """Normalize Hugging Face item.""" + item_name = item.get("name") or item.get("modelId") or item.get("id") + item_id = item.get("id") or item.get("modelId") or item.get("name") + + author = item.get("author") or item.get("organization", "") + description = item.get("description", item_name) + + keywords_list = [] + if item.get("tags"): + keywords_list.extend(item.get("tags")) + if item.get("pipeline_tag"): + tag = item.get("pipeline_tag") + keywords_list.append(tag if isinstance(tag, str) else ", ".join(tag)) + + last_modified = item.get("lastModified") or item.get("last_modified") or datetime.now(UTC).isoformat() + + model_card = self._fetch_model_card(item_id, item_type) + full_content = model_card if model_card else description + + return self.normalize_item( + item_id=item_id, + source_site=self.source_name, + 
title=item_name, + description=description, + full_content=full_content, + author_info=author, + keywords=", ".join(keywords_list), + content_url=self._build_url(item, item_type), + published_date=last_modified, + item_type=item_type + ) + + def _fetch_endpoint(self, endpoint: str, item_type: str, limit: int = 20) -> List[Dict]: + """Fetch data from specific endpoint.""" + url = f"https://huggingface.co/api/{endpoint}?sort=lastModified&direction=-1&limit={limit}" + + try: + r = requests.get(url, timeout=20) + + if r.status_code == 404: + return [] + + r.raise_for_status() + + items = r.json() + return [self._normalize_item(item, item_type) for item in items] + + except Exception as e: + print(f"[ERROR] HF {item_type}: {e}") + return [] + + def scrape_latest(self, limit: int = 20) -> List[Dict]: + """Scrape latest items.""" + all_items = [] + items_per_endpoint = max(1, limit // len(self.endpoints)) + + for endpoint, item_type in self.endpoints: + items = self._fetch_endpoint(endpoint, item_type, items_per_endpoint) + all_items.extend(items) + + self.update_last_check() + return all_items[:limit] + + def scrape_all(self, limit: int = 100) -> List[Dict]: + """Scrape all available items.""" + all_items = [] + items_per_endpoint = max(1, limit // len(self.endpoints)) + + for endpoint, item_type in self.endpoints: + items = self._fetch_endpoint(endpoint, item_type, items_per_endpoint) + all_items.extend(items) + + self.update_last_check() + return all_items[:limit] diff --git a/server/scrapers/lemonde_scraper.py b/server/scrapers/lemonde_scraper.py new file mode 100644 index 0000000..0313bf9 --- /dev/null +++ b/server/scrapers/lemonde_scraper.py @@ -0,0 +1,133 @@ +"""Scraper for Le Monde.""" + +from typing import List, Dict +from .base import BaseScraper +import requests +from bs4 import BeautifulSoup + +try: + import feedparser +except ImportError: + feedparser = None + + +class LeMondeScraper(BaseScraper): + """Scraper for Le Monde articles.""" + + def __init__(self): + """Initialize Le Monde scraper.""" + super().__init__("le_monde") + self.feeds = [ + "https://www.lemonde.fr/international/rss_full.xml", + "https://www.lemonde.fr/actualite-medias/rss_full.xml", + "https://www.lemonde.fr/rss/en_continu.xml" + ] + + if feedparser is None: + raise ImportError("feedparser package is required. 
Install it with: pip install feedparser") + + def _fetch_article_content(self, url: str) -> str: + """Fetch full article content from Le Monde.""" + try: + resp = requests.get(url, timeout=10) + if resp.status_code == 200: + soup = BeautifulSoup(resp.content, 'html.parser') + article = soup.find('article') + if article: + for script in article(["script", "style"]): + script.decompose() + text = article.get_text(separator='\n', strip=True) + return text + except Exception: + pass + return "" + + def _normalize_entry(self, entry: Dict, feed_url: str) -> Dict: + """Normalize RSS entry from Le Monde.""" + import time + from datetime import datetime + + entry_id = getattr(entry, "id", None) or getattr(entry, "link", None) + + published_date = datetime.utcnow().isoformat() + if getattr(entry, "published_parsed", None): + published_date = datetime.fromtimestamp(time.mktime(entry.published_parsed)).isoformat() + elif getattr(entry, "updated_parsed", None): + published_date = datetime.fromtimestamp(time.mktime(entry.updated_parsed)).isoformat() + + category = "general news" + if "international" in feed_url: + category = "international" + elif "medias" in feed_url: + category = "media news" + elif "continu" in feed_url: + category = "continuous" + + summary = getattr(entry, "summary", "") + link = getattr(entry, "link", "") + + article_content = self._fetch_article_content(link) + full_content = article_content if article_content else summary + + return self.normalize_item( + item_id=entry_id, + source_site=self.source_name, + title=getattr(entry, "title", ""), + description=summary, + full_content=full_content, + author_info=getattr(entry, "author", "Le Monde"), + keywords=category, + content_url=link, + published_date=published_date, + item_type="article" + ) + + def scrape_latest(self, limit: int = 20) -> List[Dict]: + """Scrape latest articles.""" + import time + + all_items = [] + unique_ids = set() + items_per_feed = limit // len(self.feeds) + 1 + + for feed_url in self.feeds: + try: + feed = feedparser.parse(feed_url) + + for entry in feed.entries[:items_per_feed]: + entry_id = getattr(entry, "id", None) or getattr(entry, "link", None) + if entry_id and entry_id not in unique_ids: + all_items.append(self._normalize_entry(entry, feed_url)) + unique_ids.add(entry_id) + + time.sleep(1) + except Exception as e: + print(f"[ERROR] Le Monde feed {feed_url}: {e}") + + self.update_last_check() + return all_items[:limit] + + def scrape_all(self, limit: int = 100) -> List[Dict]: + """Scrape all available articles.""" + import time + + all_items = [] + unique_ids = set() + items_per_feed = limit // len(self.feeds) + 1 + + for feed_url in self.feeds: + try: + feed = feedparser.parse(feed_url) + + for entry in feed.entries[:items_per_feed]: + entry_id = getattr(entry, "id", None) or getattr(entry, "link", None) + if entry_id and entry_id not in unique_ids: + all_items.append(self._normalize_entry(entry, feed_url)) + unique_ids.add(entry_id) + + time.sleep(1) + except Exception as e: + print(f"[ERROR] Le Monde feed {feed_url}: {e}") + + self.update_last_check() + return all_items[:limit] diff --git a/server/scrapers/medium_scraper.py b/server/scrapers/medium_scraper.py new file mode 100644 index 0000000..bd2db3c --- /dev/null +++ b/server/scrapers/medium_scraper.py @@ -0,0 +1,123 @@ +"""Scraper for Medium.""" + +from typing import List, Dict +from .base import BaseScraper +import requests +from bs4 import BeautifulSoup + +try: + import feedparser +except ImportError: + feedparser = None + + +class 
MediumScraper(BaseScraper): + """Scraper for Medium articles.""" + + def __init__(self): + """Initialize Medium scraper.""" + super().__init__("medium") + self.feeds = [ + "https://medium.com/feed/tag/artificial-intelligence", + "https://medium.com/feed/tag/machine-learning", + "https://medium.com/feed/tag/deep-learning", + "https://medium.com/feed/tag/ai", + ] + + if feedparser is None: + raise ImportError("feedparser package is required. Install it with: pip install feedparser") + + def _fetch_article_content(self, url: str) -> str: + """Fetch full article content from Medium.""" + try: + resp = requests.get(url, timeout=10) + if resp.status_code == 200: + soup = BeautifulSoup(resp.content, 'html.parser') + article = soup.find('article') + if article: + paragraphs = article.find_all('p') + content = '\n'.join([p.get_text() for p in paragraphs]) + return content + except Exception: + pass + return "" + + def _normalize_entry(self, entry: Dict) -> Dict: + """Normalize RSS entry from Medium.""" + import time + from datetime import datetime + + entry_id = entry.get('link', '') + + published_date = datetime.utcnow().isoformat() + if getattr(entry, "published_parsed", None): + published_date = datetime.fromtimestamp(time.mktime(entry.published_parsed)).isoformat() + + keywords = [tag.term for tag in entry.get('tags', [])] if 'tags' in entry else [] + summary = entry.get('summary', 'N/A') + + article_content = self._fetch_article_content(entry_id) + full_content = article_content if article_content else summary + + return self.normalize_item( + item_id=entry_id, + source_site=self.source_name, + title=entry.get('title', 'N/A'), + description=summary, + full_content=full_content, + author_info=entry.get('author', 'N/A'), + keywords=", ".join(keywords), + content_url=entry_id, + published_date=published_date, + item_type="article" + ) + + def scrape_latest(self, limit: int = 20) -> List[Dict]: + """Scrape latest articles.""" + import time + + all_items = [] + unique_links = set() + items_per_feed = limit // len(self.feeds) + 1 + + for feed_url in self.feeds: + try: + feed = feedparser.parse(feed_url) + + for entry in feed.entries[:items_per_feed]: + link = entry.get('link') + if link and link not in unique_links: + all_items.append(self._normalize_entry(entry)) + unique_links.add(link) + + time.sleep(1) + except Exception as e: + print(f"[ERROR] Medium feed {feed_url}: {e}") + + self.update_last_check() + return all_items[:limit] + + def scrape_all(self, limit: int = 100) -> List[Dict]: + """Scrape all available articles.""" + import time + + all_items = [] + unique_links = set() + items_per_feed = limit // len(self.feeds) + 1 + + for feed_url in self.feeds: + try: + feed = feedparser.parse(feed_url) + + for entry in feed.entries[:items_per_feed]: + link = entry.get('link') + if link and link not in unique_links: + all_items.append(self._normalize_entry(entry)) + unique_links.add(link) + + time.sleep(1) + except Exception as e: + print(f"[ERROR] Medium feed {feed_url}: {e}") + + self.update_last_check() + return all_items[:limit] diff --git a/server/veille_technique.db b/server/veille_technique.db new file mode 100644 index 0000000..2f2188a Binary files /dev/null and b/server/veille_technique.db differ