diff --git a/README.md b/README.md index 89138ad..ba01a12 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,10 @@ Verktyget kan generera författningar i flera olika format, beroende på använd - **`html`**: Genererar HTML-filer i ELI-struktur (`/eli/sfs/{år}/{nummer}/index.html`) för webbpublicering - **`htmldiff`**: Som HTML men inkluderar även separata versioner för varje ändringsförfattning +### Vektor-format (för semantisk sökning) + +- **`vector`**: Konverterar författningar till vektorembeddings för semantisk sökning och RAG-applikationer. Använder OpenAI:s text-embedding-3-large modell (3072 dimensioner) och stödjer lagring i PostgreSQL (pgvector), Elasticsearch eller JSON-fil. + Exempel på att kombinera flera format: ```bash @@ -186,6 +190,8 @@ Systemet hanterar temporal processing (tidsbaserad filtrering) olika beroende p - **`html`** och **`htmldiff`**: Tillämpar temporal processing med dagens datum innan HTML-generering, liknande `md`-format. +- **`vector`**: Tillämpar temporal processing med dagens datum (eller angivet `--target-date`) innan vektorgenerering. Detta säkerställer att endast gällande regelverk inkluderas i vektordatabasen. + #### Exempel med target-date För att se hur en lag såg ut vid ett specifikt datum: @@ -207,17 +213,77 @@ python sfs_processor.py [--input INPUT] [--output OUTPUT] [--formats FORMATS] [- - `--input`: Input-katalog med JSON-filer (default: "sfs_json") - `--output`: Output-katalog för konverterade filer (default: "SFS") -- `--formats`: Utdataformat att generera, kommaseparerat. Stödjer: md-markers, md, git, html, htmldiff (default: "md-markers") +- `--formats`: Utdataformat att generera, kommaseparerat. 
Stödjer: md-markers, md, git, html, htmldiff, vector (default: "md-markers")
   - `md-markers`: Generera markdown-filer med section-taggar bevarade
   - `md`: Generera rena markdown-filer utan section-taggar
   - `git`: Aktivera Git-commits med historiska datum
   - `html`: Generera HTML-filer i ELI-struktur (endast grunddokument)
   - `htmldiff`: Generera HTML-filer i ELI-struktur med ändringsversioner
+  - `vector`: Generera vektorembeddings för semantisk sökning
 - `--filter`: Filtrera filer efter år (YYYY) eller specifik beteckning (YYYY:NNN). Kan vara kommaseparerad lista.
-- `--target-date`: Datum (YYYY-MM-DD) för temporal filtrering, baserat på selex-taggar. Används med `md`, `html` och `htmldiff` format för att filtrera innehåll baserat på giltighetsdatum. Om inte angivet används dagens datum för `md`-format. Exempel: `--target-date 2023-01-01`
+- `--target-date`: Datum (YYYY-MM-DD) för temporal filtrering, baserat på selex-taggar. Används med `md`, `html`, `htmldiff` och `vector` format för att filtrera innehåll baserat på giltighetsdatum. Om inte angivet används dagens datum. 
Exempel: `--target-date 2023-01-01`
 - `--no-year-folder`: Skapa inte årbaserade undermappar för dokument
 - `--verbose`: Visa detaljerad information om bearbetningen
 
+### Vektor-specifika parametrar
+
+- `--vector-backend`: Backend för vektorlagring (default: "json")
+  - `json`: Spara till JSON-fil (för test/utveckling)
+  - `postgresql`: PostgreSQL med pgvector-extension
+  - `elasticsearch`: Elasticsearch med dense_vector
+- `--vector-chunking`: Strategi för att dela upp dokument (default: "paragraph")
+  - `paragraph`: Dela per paragraf (§) - bevarar juridisk struktur
+  - `chapter`: Dela per kapitel - större kontext
+  - `section`: Dela per selex-sektion
+  - `semantic`: Semantiska gränser med överlapp
+  - `fixed_size`: Fast tokenantal med överlapp
+- `--embedding-model`: Embedding-modell (default: "text-embedding-3-large")
+- `--vector-mock`: Använd mock-embeddings för test utan OpenAI API-nyckel
+
+## Vektorexport för semantisk sökning
+
+Vektorformatet (`--formats vector`) konverterar författningar till vektorembeddings som kan användas för semantisk sökning, RAG-applikationer (Retrieval-Augmented Generation) och AI-assistenter.
+
+### Hur det fungerar
+
+1. **Temporal filtrering**: Endast gällande regelverk inkluderas (samma som `md`/`html` mode)
+2. **Intelligent chunking**: Dokument delas upp på ett sätt som bevarar juridisk struktur
+3. **Embedding-generering**: Text konverteras till vektorer med OpenAI text-embedding-3-large
+4. 
**Lagring**: Vektorer sparas till vald backend med fullständig metadata
+
+### Exempel
+
+```bash
+# Test med mock-embeddings (utan API-nyckel)
+python sfs_processor.py --formats vector --vector-mock --filter 2024:100
+
+# Produktion med OpenAI (kräver OPENAI_API_KEY miljövariabel)
+python sfs_processor.py --formats vector --filter 2024
+
+# Med PostgreSQL/pgvector backend
+python sfs_processor.py --formats vector --vector-backend postgresql
+
+# Med kapitel-chunking för större kontext
+python sfs_processor.py --formats vector --vector-chunking chapter
+```
+
+### Backends
+
+| Backend | Användning | Krav |
+|---------|-----------|------|
+| `json` | Test/utveckling | Inga |
+| `postgresql` | Produktion | PostgreSQL 12+ med pgvector |
+| `elasticsearch` | Produktion | Elasticsearch 8.0+ |
+
+### Metadata som sparas
+
+Varje vektor-chunk innehåller:
+- `document_id`: Beteckning (t.ex. "2024:100")
+- `chapter`: Kapitelreferens (t.ex. "1 kap.")
+- `paragraph`: Paragrafreferens (t.ex. "1 §")
+- `departement`: Ansvarigt departement
+- `ikraft_datum`: Ikraftträdandedatum
+
 ## Bidra
 
 Vi välkomnar bidrag från communityn! 🙌
diff --git a/exporters/vector/__init__.py b/exporters/vector/__init__.py
new file mode 100644
index 0000000..b9466f6
--- /dev/null
+++ b/exporters/vector/__init__.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python3
+"""
+Vector export functionality for SFS documents.
+
+This module provides tools for converting Swedish legal documents (SFS)
+to vector embeddings suitable for semantic search and retrieval.
+
+The vector exporter applies temporal filtering (like md/html mode) to ensure
+only current regulations are included in the vector store. 
+""" + +from exporters.vector.vector_export import create_vector_documents, VectorExportConfig +from exporters.vector.chunking import chunk_document, ChunkingStrategy +from exporters.vector.embeddings import EmbeddingProvider, get_embedding_provider + +__all__ = [ + 'create_vector_documents', + 'VectorExportConfig', + 'chunk_document', + 'ChunkingStrategy', + 'EmbeddingProvider', + 'get_embedding_provider', +] diff --git a/exporters/vector/backends/__init__.py b/exporters/vector/backends/__init__.py new file mode 100644 index 0000000..dd220a1 --- /dev/null +++ b/exporters/vector/backends/__init__.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +""" +Vector store backends for SFS documents. + +Supported backends: +- PostgreSQL with pgvector extension +- Elasticsearch with dense_vector +- JSON file (for testing/development) +""" + +from exporters.vector.backends.base import VectorStoreBackend, VectorRecord +from exporters.vector.backends.postgresql import PostgreSQLBackend +from exporters.vector.backends.elasticsearch import ElasticsearchBackend +from exporters.vector.backends.json_file import JSONFileBackend + +__all__ = [ + 'VectorStoreBackend', + 'VectorRecord', + 'PostgreSQLBackend', + 'ElasticsearchBackend', + 'JSONFileBackend', +] + + +def get_backend(backend_type: str, **kwargs) -> VectorStoreBackend: + """ + Factory function to get a vector store backend. 
+
+    Args:
+        backend_type: Type of backend ("postgresql", "elasticsearch", "json")
+        **kwargs: Backend-specific configuration
+
+    Returns:
+        A VectorStoreBackend instance
+
+    Example:
+        # PostgreSQL with pgvector
+        backend = get_backend("postgresql", connection_string="postgresql://...")
+
+        # Elasticsearch
+        backend = get_backend("elasticsearch", hosts=["http://localhost:9200"])
+
+        # JSON file for testing
+        backend = get_backend("json", file_path="vectors.json")
+    """
+    backends = {
+        "postgresql": PostgreSQLBackend,
+        "postgres": PostgreSQLBackend,
+        "pgvector": PostgreSQLBackend,
+        "elasticsearch": ElasticsearchBackend,
+        "elastic": ElasticsearchBackend,
+        "es": ElasticsearchBackend,
+        "json": JSONFileBackend,
+        "file": JSONFileBackend,
+    }
+
+    backend_type_lower = backend_type.lower()
+    if backend_type_lower not in backends:
+        available = ", ".join(sorted(backends))  # all accepted names, including aliases
+        raise ValueError(
+            f"Unknown backend type: {backend_type}. "
+            f"Available: {available}"
+        )
+
+    return backends[backend_type_lower](**kwargs)
diff --git a/exporters/vector/backends/base.py b/exporters/vector/backends/base.py
new file mode 100644
index 0000000..0b8a2f4
--- /dev/null
+++ b/exporters/vector/backends/base.py
@@ -0,0 +1,271 @@
+#!/usr/bin/env python3
+"""
+Base class and interfaces for vector store backends.
+
+This module defines the abstract interface that all vector store
+backends must implement.
+"""
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from typing import List, Optional, Dict, Any, Tuple
+from datetime import datetime
+
+
+@dataclass
+class VectorRecord:
+    """
+    A record to be stored in the vector database.
+
+    Contains the embedding vector along with metadata for retrieval. 
+ """ + # Unique identifier for this record + id: str + + # The embedding vector + vector: List[float] + + # Content that was embedded + content: str + + # Document reference + document_id: str # beteckning (e.g., "2024:100") + document_title: Optional[str] = None # rubrik + + # Structural metadata + chunk_index: int = 0 + total_chunks: int = 1 + chapter: Optional[str] = None # e.g., "1 kap." + paragraph: Optional[str] = None # e.g., "1 §" + section_type: Optional[str] = None # kapitel, paragraf, etc. + + # Document metadata + departement: Optional[str] = None + effective_date: Optional[str] = None # ikraft_datum - when regulation takes effect + issued_date: Optional[str] = None # utfardad_datum - when regulation was issued + repealed: bool = False # upphavd - if regulation is repealed + expiration_date: Optional[str] = None # upphor_datum - when regulation expires + + # Technical metadata + embedding_model: Optional[str] = None + dimensions: int = 0 + created_at: Optional[datetime] = None + + # Additional metadata as dict + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + """Convert record to dictionary for serialization.""" + return { + 'id': self.id, + 'vector': self.vector, + 'content': self.content, + 'document_id': self.document_id, + 'document_title': self.document_title, + 'chunk_index': self.chunk_index, + 'total_chunks': self.total_chunks, + 'chapter': self.chapter, + 'paragraph': self.paragraph, + 'section_type': self.section_type, + 'departement': self.departement, + 'effective_date': self.effective_date, + 'issued_date': self.issued_date, + 'repealed': self.repealed, + 'expiration_date': self.expiration_date, + 'embedding_model': self.embedding_model, + 'dimensions': self.dimensions, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'metadata': self.metadata, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'VectorRecord': + """Create a VectorRecord from a 
dictionary.""" + created_at = data.get('created_at') + if isinstance(created_at, str): + created_at = datetime.fromisoformat(created_at) + + return cls( + id=data['id'], + vector=data['vector'], + content=data['content'], + document_id=data['document_id'], + document_title=data.get('document_title'), + chunk_index=data.get('chunk_index', 0), + total_chunks=data.get('total_chunks', 1), + chapter=data.get('chapter'), + paragraph=data.get('paragraph'), + section_type=data.get('section_type'), + departement=data.get('departement'), + effective_date=data.get('effective_date'), + issued_date=data.get('issued_date'), + repealed=data.get('repealed', False), + expiration_date=data.get('expiration_date'), + embedding_model=data.get('embedding_model'), + dimensions=data.get('dimensions', 0), + created_at=created_at, + metadata=data.get('metadata', {}), + ) + + +@dataclass +class SearchResult: + """Result from a vector similarity search.""" + record: VectorRecord + score: float # Similarity score + distance: Optional[float] = None # Distance metric (if applicable) + + +class VectorStoreBackend(ABC): + """ + Abstract base class for vector store backends. + + All backend implementations must implement these methods + to provide a consistent interface for storing and searching vectors. + """ + + @abstractmethod + def initialize(self, dimensions: int, **kwargs) -> None: + """ + Initialize the backend (create tables, indices, etc.). + + Args: + dimensions: The dimensionality of vectors to store + **kwargs: Backend-specific initialization options + """ + pass + + @abstractmethod + def insert(self, record: VectorRecord) -> None: + """ + Insert a single record into the store. + + Args: + record: The VectorRecord to insert + """ + pass + + @abstractmethod + def insert_batch(self, records: List[VectorRecord]) -> int: + """ + Insert multiple records in a batch. 
+ + Args: + records: List of VectorRecords to insert + + Returns: + Number of records successfully inserted + """ + pass + + @abstractmethod + def search( + self, + query_vector: List[float], + limit: int = 10, + filters: Optional[Dict[str, Any]] = None + ) -> List[SearchResult]: + """ + Search for similar vectors. + + Args: + query_vector: The vector to search for + limit: Maximum number of results to return + filters: Optional filters to apply (e.g., {"document_id": "2024:100"}) + + Returns: + List of SearchResult objects ordered by similarity + """ + pass + + @abstractmethod + def delete_by_document(self, document_id: str) -> int: + """ + Delete all records for a specific document. + + Args: + document_id: The document identifier (beteckning) + + Returns: + Number of records deleted + """ + pass + + @abstractmethod + def get_by_id(self, record_id: str) -> Optional[VectorRecord]: + """ + Retrieve a specific record by ID. + + Args: + record_id: The record identifier + + Returns: + The VectorRecord if found, None otherwise + """ + pass + + @abstractmethod + def count(self, filters: Optional[Dict[str, Any]] = None) -> int: + """ + Count records in the store. + + Args: + filters: Optional filters to apply + + Returns: + Number of records matching the filters + """ + pass + + @abstractmethod + def close(self) -> None: + """Close any open connections.""" + pass + + def upsert(self, record: VectorRecord) -> None: + """ + Insert or update a record. + + Default implementation deletes existing and inserts new. + Backends may override with more efficient implementations. + + Args: + record: The VectorRecord to upsert + """ + existing = self.get_by_id(record.id) + if existing: + self.delete_by_document(record.document_id) + self.insert(record) + + def upsert_batch(self, records: List[VectorRecord]) -> int: + """ + Insert or update multiple records. + + Default implementation processes records individually. + Backends may override with more efficient implementations. 
+ + Args: + records: List of VectorRecords to upsert + + Returns: + Number of records successfully upserted + """ + # Group by document_id for efficient deletion + doc_ids = set(r.document_id for r in records) + for doc_id in doc_ids: + self.delete_by_document(doc_id) + + return self.insert_batch(records) + + def health_check(self) -> Tuple[bool, str]: + """ + Check if the backend is healthy and accessible. + + Returns: + Tuple of (is_healthy, message) + """ + try: + count = self.count() + return True, f"Backend healthy, {count} records" + except Exception as e: + return False, f"Backend error: {e}" diff --git a/exporters/vector/backends/elasticsearch.py b/exporters/vector/backends/elasticsearch.py new file mode 100644 index 0000000..fc838cf --- /dev/null +++ b/exporters/vector/backends/elasticsearch.py @@ -0,0 +1,458 @@ +#!/usr/bin/env python3 +""" +Elasticsearch backend for vector storage. + +This backend uses Elasticsearch's dense_vector field type for vector similarity search. +Supports both exact and approximate (HNSW) similarity search. + +Requirements: + - Elasticsearch 8.0+ (for native kNN support) + - elasticsearch-py (pip install elasticsearch) + +Environment variables: + - ELASTICSEARCH_HOSTS: Comma-separated list of hosts + - ELASTICSEARCH_API_KEY: API key for authentication + - ELASTICSEARCH_USERNAME / ELASTICSEARCH_PASSWORD: Basic auth +""" + +import os +import json +from datetime import datetime +from typing import List, Optional, Dict, Any, Tuple + +from exporters.vector.backends.base import VectorStoreBackend, VectorRecord, SearchResult + + +class ElasticsearchBackend(VectorStoreBackend): + """ + Elasticsearch vector store backend using dense_vector fields. + + Supports both exact kNN and approximate (HNSW) similarity search + with cosine, dot_product, or l2_norm similarity metrics. 
+ """ + + DEFAULT_INDEX_NAME = "sfs_vectors" + + def __init__( + self, + hosts: Optional[List[str]] = None, + api_key: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + cloud_id: Optional[str] = None, + index_name: str = DEFAULT_INDEX_NAME, + similarity: str = "cosine", # "cosine", "dot_product", "l2_norm" + use_approximate_knn: bool = True + ): + """ + Initialize Elasticsearch backend. + + Args: + hosts: List of Elasticsearch hosts + api_key: API key for authentication + username: Username for basic auth + password: Password for basic auth + cloud_id: Elastic Cloud ID (alternative to hosts) + index_name: Name of the index + similarity: Similarity metric + use_approximate_knn: Use HNSW for approximate kNN (faster) + """ + self.index_name = index_name + self.similarity = similarity + self.use_approximate_knn = use_approximate_knn + self._dimensions = None + self._client = None + + # Build connection config + self._hosts = hosts or self._parse_hosts( + os.environ.get('ELASTICSEARCH_HOSTS', 'http://localhost:9200') + ) + self._api_key = api_key or os.environ.get('ELASTICSEARCH_API_KEY') + self._username = username or os.environ.get('ELASTICSEARCH_USERNAME') + self._password = password or os.environ.get('ELASTICSEARCH_PASSWORD') + self._cloud_id = cloud_id or os.environ.get('ELASTICSEARCH_CLOUD_ID') + + def _parse_hosts(self, hosts_str: str) -> List[str]: + """Parse comma-separated hosts string.""" + return [h.strip() for h in hosts_str.split(',') if h.strip()] + + def _get_client(self): + """Get or create Elasticsearch client.""" + if self._client is None: + try: + from elasticsearch import Elasticsearch + except ImportError: + raise ImportError( + "Elasticsearch package not installed. 
Install with: " + "pip install elasticsearch" + ) + + # Build client with available credentials + client_kwargs = {} + + if self._cloud_id: + client_kwargs['cloud_id'] = self._cloud_id + else: + client_kwargs['hosts'] = self._hosts + + if self._api_key: + client_kwargs['api_key'] = self._api_key + elif self._username and self._password: + client_kwargs['basic_auth'] = (self._username, self._password) + + self._client = Elasticsearch(**client_kwargs) + + return self._client + + def initialize(self, dimensions: int, **kwargs) -> None: + """ + Initialize the Elasticsearch index. + + Creates the index with appropriate mappings for vector search. + + Args: + dimensions: Vector dimensionality + **kwargs: Additional options (e.g., number_of_shards) + """ + self._dimensions = dimensions + client = self._get_client() + + # Check if index exists + if client.indices.exists(index=self.index_name): + print(f"Elasticsearch index '{self.index_name}' already exists") + return + + # Build index settings + settings = { + "number_of_shards": kwargs.get('number_of_shards', 1), + "number_of_replicas": kwargs.get('number_of_replicas', 0) + } + + # Build mappings + mappings = { + "properties": { + "id": {"type": "keyword"}, + "vector": { + "type": "dense_vector", + "dims": dimensions, + "index": self.use_approximate_knn, + "similarity": self.similarity + }, + "content": { + "type": "text", + "analyzer": "swedish" # Use Swedish analyzer + }, + "document_id": {"type": "keyword"}, + "document_title": {"type": "text"}, + "chunk_index": {"type": "integer"}, + "total_chunks": {"type": "integer"}, + "chapter": {"type": "keyword"}, + "paragraph": {"type": "keyword"}, + "section_type": {"type": "keyword"}, + "departement": {"type": "keyword"}, + "effective_date": {"type": "date", "format": "yyyy-MM-dd||strict_date"}, + "issued_date": {"type": "date", "format": "yyyy-MM-dd||strict_date"}, + "repealed": {"type": "boolean"}, + "expiration_date": {"type": "date", "format": "yyyy-MM-dd||strict_date"}, 
+ "embedding_model": {"type": "keyword"}, + "dimensions": {"type": "integer"}, + "created_at": {"type": "date"}, + "metadata": {"type": "object", "enabled": False} + } + } + + # Create index + client.indices.create( + index=self.index_name, + settings=settings, + mappings=mappings + ) + + print(f"Elasticsearch index '{self.index_name}' created with {dimensions}-dimensional vectors") + + def _record_to_doc(self, record: VectorRecord) -> Dict[str, Any]: + """Convert VectorRecord to Elasticsearch document.""" + doc = { + "id": record.id, + "vector": record.vector, + "content": record.content, + "document_id": record.document_id, + "document_title": record.document_title, + "chunk_index": record.chunk_index, + "total_chunks": record.total_chunks, + "chapter": record.chapter, + "paragraph": record.paragraph, + "section_type": record.section_type, + "departement": record.departement, + "embedding_model": record.embedding_model, + "dimensions": record.dimensions, + "created_at": record.created_at.isoformat() if record.created_at else datetime.now().isoformat(), + "metadata": record.metadata + } + + # Handle date fields (Elasticsearch expects proper format or null) + if record.effective_date: + doc["effective_date"] = record.effective_date + if record.issued_date: + doc["issued_date"] = record.issued_date + if record.expiration_date: + doc["expiration_date"] = record.expiration_date + + # Handle boolean field + doc["repealed"] = record.repealed + + return doc + + def _doc_to_record(self, doc: Dict[str, Any]) -> VectorRecord: + """Convert Elasticsearch document to VectorRecord.""" + source = doc.get('_source', doc) + + created_at = source.get('created_at') + if isinstance(created_at, str): + try: + created_at = datetime.fromisoformat(created_at.replace('Z', '+00:00')) + except ValueError: + created_at = None + + return VectorRecord( + id=source.get('id', doc.get('_id', '')), + vector=source.get('vector', []), + content=source.get('content', ''), + 
document_id=source.get('document_id', ''), + document_title=source.get('document_title'), + chunk_index=source.get('chunk_index', 0), + total_chunks=source.get('total_chunks', 1), + chapter=source.get('chapter'), + paragraph=source.get('paragraph'), + section_type=source.get('section_type'), + departement=source.get('departement'), + effective_date=source.get('effective_date'), + issued_date=source.get('issued_date'), + repealed=source.get('repealed', False), + expiration_date=source.get('expiration_date'), + embedding_model=source.get('embedding_model'), + dimensions=source.get('dimensions', 0), + created_at=created_at, + metadata=source.get('metadata', {}) + ) + + def insert(self, record: VectorRecord) -> None: + """Insert a single record.""" + self.insert_batch([record]) + + def insert_batch(self, records: List[VectorRecord]) -> int: + """Insert multiple records using bulk API.""" + if not records: + return 0 + + client = self._get_client() + + # Build bulk operations + operations = [] + for record in records: + operations.append({"index": {"_index": self.index_name, "_id": record.id}}) + operations.append(self._record_to_doc(record)) + + # Execute bulk request + response = client.bulk(operations=operations, refresh=True) + + # Count successful inserts + inserted = sum(1 for item in response['items'] if item['index'].get('result') in ['created', 'updated']) + + if response.get('errors'): + errors = [ + item['index']['error'] + for item in response['items'] + if 'error' in item.get('index', {}) + ] + for error in errors[:5]: # Show first 5 errors + print(f"Elasticsearch error: {error}") + + return inserted + + def search( + self, + query_vector: List[float], + limit: int = 10, + filters: Optional[Dict[str, Any]] = None + ) -> List[SearchResult]: + """Search for similar vectors using kNN.""" + client = self._get_client() + + # Build filter query + filter_query = [] + if filters: + for key, value in filters.items(): + if key in ['document_id', 'chapter', 
'paragraph', 'section_type', 'departement']:
+                    filter_query.append({"term": {key: value}})
+
+        # Build kNN query
+        knn = {
+            "field": "vector",
+            "query_vector": query_vector,
+            "k": limit,
+            "num_candidates": limit * 10  # Search more candidates for better accuracy
+        }
+
+        if filter_query:
+            knn["filter"] = {"bool": {"must": filter_query}}
+
+        # Execute search
+        response = client.search(
+            index=self.index_name,
+            knn=knn,
+            size=limit,
+            _source=True
+        )
+
+        # Convert results
+        results = []
+        for hit in response['hits']['hits']:
+            record = self._doc_to_record(hit)
+            score = hit.get('_score', 0)
+
+            results.append(SearchResult(
+                record=record,
+                score=score,
+                distance=2 * (1 - score) if self.similarity == "cosine" else None  # ES cosine _score is (1 + cos) / 2
+            ))
+
+        return results
+
+    def delete_by_document(self, document_id: str) -> int:
+        """Delete all records for a document."""
+        client = self._get_client()
+
+        response = client.delete_by_query(
+            index=self.index_name,
+            query={"term": {"document_id": document_id}},
+            refresh=True
+        )
+
+        return response.get('deleted', 0)
+
+    def get_by_id(self, record_id: str) -> Optional[VectorRecord]:
+        """Get a record by ID."""
+        client = self._get_client()
+
+        try:
+            response = client.get(index=self.index_name, id=record_id)
+            return self._doc_to_record(response)
+        except Exception:
+            return None
+
+    def count(self, filters: Optional[Dict[str, Any]] = None) -> int:
+        """Count records in the index."""
+        client = self._get_client()
+
+        query = {"match_all": {}}
+        if filters:
+            must = []
+            for key, value in filters.items():
+                if key in ['document_id', 'chapter', 'paragraph', 'section_type', 'departement']:
+                    must.append({"term": {key: value}})
+            if must:
+                query = {"bool": {"must": must}}
+
+        response = client.count(index=self.index_name, query=query)
+        return response.get('count', 0)
+
+    def close(self) -> None:
+        """Close Elasticsearch client."""
+        if self._client:
+            self._client.close()
+            self._client = None
+
+    def health_check(self) -> Tuple[bool, str]:
+        
"""Check Elasticsearch connectivity.""" + try: + client = self._get_client() + health = client.cluster.health() + status = health.get('status', 'unknown') + count = self.count() + return True, f"Elasticsearch status: {status}, {count} records in {self.index_name}" + except Exception as e: + return False, f"Elasticsearch error: {e}" + + def hybrid_search( + self, + query_vector: List[float], + query_text: str, + limit: int = 10, + vector_weight: float = 0.7, + text_weight: float = 0.3, + filters: Optional[Dict[str, Any]] = None + ) -> List[SearchResult]: + """ + Perform hybrid search combining vector similarity and text search. + + This is useful for Swedish legal documents where exact term matching + can complement semantic search. + + Args: + query_vector: Vector embedding of the query + query_text: Text query for BM25 matching + limit: Number of results to return + vector_weight: Weight for vector similarity (0-1) + text_weight: Weight for text similarity (0-1) + filters: Optional filters + + Returns: + List of SearchResult objects + """ + client = self._get_client() + + # Build filter + filter_query = [] + if filters: + for key, value in filters.items(): + if key in ['document_id', 'chapter', 'paragraph', 'section_type', 'departement']: + filter_query.append({"term": {key: value}}) + + # Build hybrid query using script_score + # This combines kNN similarity with BM25 text relevance + query_body = { + "query": { + "script_score": { + "query": { + "bool": { + "should": [ + { + "match": { + "content": { + "query": query_text, + "boost": text_weight + } + } + } + ], + "filter": filter_query if filter_query else [] + } + }, + "script": { + "source": f""" + double vector_score = cosineSimilarity(params.query_vector, 'vector') + 1.0; + double text_score = _score; + return {vector_weight} * vector_score + {text_weight} * text_score; + """, + "params": { + "query_vector": query_vector + } + } + } + }, + "size": limit + } + + response = 
client.search(index=self.index_name, body=query_body) + + results = [] + for hit in response['hits']['hits']: + record = self._doc_to_record(hit) + results.append(SearchResult( + record=record, + score=hit.get('_score', 0), + distance=None + )) + + return results diff --git a/exporters/vector/backends/json_file.py b/exporters/vector/backends/json_file.py new file mode 100644 index 0000000..3893ab8 --- /dev/null +++ b/exporters/vector/backends/json_file.py @@ -0,0 +1,264 @@ +#!/usr/bin/env python3 +""" +JSON file backend for vector storage. + +This is a simple file-based backend for testing and development. +It stores vectors in a JSON file and performs brute-force similarity search. + +Not recommended for production use with large datasets. +""" + +import json +import os +import math +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Dict, Any, Tuple + +from exporters.vector.backends.base import VectorStoreBackend, VectorRecord, SearchResult + + +class JSONFileBackend(VectorStoreBackend): + """ + JSON file-based vector store for testing and development. + + Stores all records in a single JSON file and performs brute-force + similarity search. Suitable for small datasets and testing. + """ + + def __init__( + self, + file_path: str = "sfs_vectors.json", + similarity: str = "cosine", # "cosine", "euclidean", "dot_product" + pretty_print: bool = True + ): + """ + Initialize JSON file backend. 
+ + Args: + file_path: Path to the JSON file + similarity: Similarity metric for search + pretty_print: Whether to format JSON with indentation + """ + self.file_path = Path(file_path) + self.similarity = similarity + self.pretty_print = pretty_print + self._dimensions = None + self._data = None + + def _load_data(self) -> Dict[str, Any]: + """Load data from JSON file.""" + if self._data is not None: + return self._data + + if self.file_path.exists(): + try: + with open(self.file_path, 'r', encoding='utf-8') as f: + self._data = json.load(f) + except json.JSONDecodeError: + self._data = {"records": {}, "metadata": {}} + else: + self._data = {"records": {}, "metadata": {}} + + return self._data + + def _save_data(self) -> None: + """Save data to JSON file.""" + if self._data is None: + return + + # Ensure parent directory exists + self.file_path.parent.mkdir(parents=True, exist_ok=True) + + indent = 2 if self.pretty_print else None + with open(self.file_path, 'w', encoding='utf-8') as f: + json.dump(self._data, f, ensure_ascii=False, indent=indent, default=str) + + def initialize(self, dimensions: int, **kwargs) -> None: + """Initialize the backend.""" + self._dimensions = dimensions + data = self._load_data() + data['metadata'] = { + "dimensions": dimensions, + "similarity": self.similarity, + "created_at": datetime.now().isoformat() + } + self._save_data() + print(f"JSON file backend initialized at {self.file_path}") + + def insert(self, record: VectorRecord) -> None: + """Insert a single record.""" + self.insert_batch([record]) + + def insert_batch(self, records: List[VectorRecord]) -> int: + """Insert multiple records.""" + if not records: + return 0 + + data = self._load_data() + + for record in records: + data['records'][record.id] = record.to_dict() + + self._save_data() + return len(records) + + def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float: + """Calculate cosine similarity between two vectors.""" + dot_product = sum(a * b for 
a, b in zip(vec1, vec2)) + norm1 = math.sqrt(sum(a * a for a in vec1)) + norm2 = math.sqrt(sum(b * b for b in vec2)) + + if norm1 == 0 or norm2 == 0: + return 0.0 + + return dot_product / (norm1 * norm2) + + def _euclidean_distance(self, vec1: List[float], vec2: List[float]) -> float: + """Calculate Euclidean distance between two vectors.""" + return math.sqrt(sum((a - b) ** 2 for a, b in zip(vec1, vec2))) + + def _dot_product(self, vec1: List[float], vec2: List[float]) -> float: + """Calculate dot product between two vectors.""" + return sum(a * b for a, b in zip(vec1, vec2)) + + def _calculate_similarity(self, vec1: List[float], vec2: List[float]) -> Tuple[float, float]: + """Calculate similarity and distance between vectors.""" + if self.similarity == "cosine": + sim = self._cosine_similarity(vec1, vec2) + dist = 1 - sim + elif self.similarity == "euclidean": + dist = self._euclidean_distance(vec1, vec2) + sim = 1 / (1 + dist) + elif self.similarity == "dot_product": + sim = self._dot_product(vec1, vec2) + dist = -sim + else: + sim = self._cosine_similarity(vec1, vec2) + dist = 1 - sim + + return sim, dist + + def search( + self, + query_vector: List[float], + limit: int = 10, + filters: Optional[Dict[str, Any]] = None + ) -> List[SearchResult]: + """Search for similar vectors using brute force.""" + data = self._load_data() + results = [] + + for record_id, record_dict in data.get('records', {}).items(): + # Apply filters + if filters: + skip = False + for key, value in filters.items(): + if record_dict.get(key) != value: + skip = True + break + if skip: + continue + + record = VectorRecord.from_dict(record_dict) + score, distance = self._calculate_similarity(query_vector, record.vector) + + results.append(SearchResult( + record=record, + score=score, + distance=distance + )) + + # Sort by score (descending) and return top k + results.sort(key=lambda x: x.score, reverse=True) + return results[:limit] + + def delete_by_document(self, document_id: str) -> int: + 
"""Delete all records for a document.""" + data = self._load_data() + deleted = 0 + + # Find and delete records with matching document_id + to_delete = [ + record_id for record_id, record_dict in data.get('records', {}).items() + if record_dict.get('document_id') == document_id + ] + + for record_id in to_delete: + del data['records'][record_id] + deleted += 1 + + if deleted > 0: + self._save_data() + + return deleted + + def get_by_id(self, record_id: str) -> Optional[VectorRecord]: + """Get a record by ID.""" + data = self._load_data() + record_dict = data.get('records', {}).get(record_id) + + if record_dict: + return VectorRecord.from_dict(record_dict) + return None + + def count(self, filters: Optional[Dict[str, Any]] = None) -> int: + """Count records in the store.""" + data = self._load_data() + + if not filters: + return len(data.get('records', {})) + + count = 0 + for record_dict in data.get('records', {}).values(): + match = True + for key, value in filters.items(): + if record_dict.get(key) != value: + match = False + break + if match: + count += 1 + + return count + + def close(self) -> None: + """Close and save any pending data.""" + if self._data is not None: + self._save_data() + self._data = None + + def health_check(self) -> Tuple[bool, str]: + """Check backend health.""" + try: + data = self._load_data() + count = len(data.get('records', {})) + return True, f"JSON file backend at {self.file_path}, {count} records" + except Exception as e: + return False, f"JSON file error: {e}" + + def export_for_import(self, output_path: Optional[str] = None) -> str: + """ + Export data in a format suitable for importing into other backends. 
+ + Args: + output_path: Path for export file (defaults to {file_path}_export.json) + + Returns: + Path to the exported file + """ + if output_path is None: + output_path = str(self.file_path).replace('.json', '_export.json') + + data = self._load_data() + + # Export records as a list for easier importing + export_data = { + "metadata": data.get('metadata', {}), + "records": list(data.get('records', {}).values()) + } + + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(export_data, f, ensure_ascii=False, indent=2, default=str) + + print(f"Exported {len(export_data['records'])} records to {output_path}") + return output_path diff --git a/exporters/vector/backends/postgresql.py b/exporters/vector/backends/postgresql.py new file mode 100644 index 0000000..789b43e --- /dev/null +++ b/exporters/vector/backends/postgresql.py @@ -0,0 +1,456 @@ +#!/usr/bin/env python3 +""" +PostgreSQL backend with pgvector extension for vector storage. + +This backend uses the pgvector extension for efficient vector similarity search. +It supports both cosine similarity and L2 distance metrics. + +Requirements: + - PostgreSQL 12+ with pgvector extension installed + - psycopg2 or psycopg (pip install psycopg2-binary or psycopg[binary]) + +Environment variables: + - POSTGRES_CONNECTION_STRING: Full connection string + - Or individual: POSTGRES_HOST, POSTGRES_PORT, POSTGRES_DB, POSTGRES_USER, POSTGRES_PASSWORD +""" + +import os +import json +from datetime import datetime +from typing import List, Optional, Dict, Any, Tuple + +from exporters.vector.backends.base import VectorStoreBackend, VectorRecord, SearchResult + + +class PostgreSQLBackend(VectorStoreBackend): + """ + PostgreSQL vector store backend using pgvector. + + Supports efficient vector similarity search with cosine similarity + or L2 distance metrics. Uses HNSW or IVFFlat indexes for fast retrieval. 
+ """ + + DEFAULT_TABLE_NAME = "sfs_vectors" + DEFAULT_INDEX_TYPE = "hnsw" # or "ivfflat" + + def __init__( + self, + connection_string: Optional[str] = None, + host: Optional[str] = None, + port: Optional[int] = None, + database: Optional[str] = None, + user: Optional[str] = None, + password: Optional[str] = None, + table_name: str = DEFAULT_TABLE_NAME, + index_type: str = DEFAULT_INDEX_TYPE, + distance_metric: str = "cosine" # "cosine", "l2", "inner_product" + ): + """ + Initialize PostgreSQL backend. + + Args: + connection_string: Full PostgreSQL connection string + host: Database host (if not using connection_string) + port: Database port (default: POSTGRES_PORT or 5432) + database: Database name + user: Database user + password: Database password + table_name: Name of the vector table + index_type: Type of index ("hnsw" or "ivfflat") + distance_metric: Distance metric for similarity search + """ + self.table_name = table_name + self.index_type = index_type + self.distance_metric = distance_metric + self._dimensions = None + self._conn = None + + # Build connection string + if connection_string: + self.connection_string = connection_string + else: + self.connection_string = self._build_connection_string( + host or os.environ.get('POSTGRES_HOST', 'localhost'), + port or int(os.environ.get('POSTGRES_PORT', '5432')), + database or os.environ.get('POSTGRES_DB', 'sfs_vectors'), + user or os.environ.get('POSTGRES_USER', 'postgres'), + password or os.environ.get('POSTGRES_PASSWORD', '') + ) + + def _build_connection_string( + self, + host: str, + port: int, + database: str, + user: str, + password: str + ) -> str: + """Build PostgreSQL connection string.""" + if password: + return f"postgresql://{user}:{password}@{host}:{port}/{database}" + return f"postgresql://{user}@{host}:{port}/{database}" + + def _get_connection(self): + """Get or create database connection.""" + if self._conn is None: + try: + import psycopg2 + self._conn = psycopg2.connect(self.connection_string) + except ImportError: + 
try: + import psycopg + self._conn = psycopg.connect(self.connection_string) + except ImportError: + raise ImportError( + "PostgreSQL driver not installed. Install with: " + "pip install psycopg2-binary or pip install psycopg[binary]" + ) + return self._conn + + def _get_distance_operator(self) -> str: + """Get the pgvector distance operator for the configured metric.""" + operators = { + "cosine": "<=>", # Cosine distance + "l2": "<->", # L2 distance + "inner_product": "<#>" # Inner product (negative) + } + return operators.get(self.distance_metric, "<=>") + + def initialize(self, dimensions: int, **kwargs) -> None: + """ + Initialize the database schema. + + Creates the pgvector extension, table, and index if they don't exist. + + Args: + dimensions: Vector dimensionality + **kwargs: Additional options (e.g., index_lists for IVFFlat) + """ + self._dimensions = dimensions + conn = self._get_connection() + + with conn.cursor() as cur: + # Create pgvector extension + cur.execute("CREATE EXTENSION IF NOT EXISTS vector") + + # Create table + cur.execute(f""" + CREATE TABLE IF NOT EXISTS {self.table_name} ( + id TEXT PRIMARY KEY, + vector vector({dimensions}), + content TEXT NOT NULL, + document_id TEXT NOT NULL, + document_title TEXT, + chunk_index INTEGER DEFAULT 0, + total_chunks INTEGER DEFAULT 1, + chapter TEXT, + paragraph TEXT, + section_type TEXT, + departement TEXT, + effective_date DATE, + issued_date DATE, + repealed BOOLEAN DEFAULT FALSE, + expiration_date DATE, + embedding_model TEXT, + dimensions INTEGER DEFAULT {dimensions}, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + metadata JSONB DEFAULT '{{}}'::jsonb + ) + """) + + # Create index on document_id for efficient deletion + cur.execute(f""" + CREATE INDEX IF NOT EXISTS idx_{self.table_name}_document_id + ON {self.table_name} (document_id) + """) + + # Create vector index + if self.index_type == "hnsw": + # HNSW index - better for most use cases + op_class = self._get_op_class() + cur.execute(f""" 
+ CREATE INDEX IF NOT EXISTS idx_{self.table_name}_vector_hnsw + ON {self.table_name} + USING hnsw (vector {op_class}) + WITH (m = 16, ef_construction = 64) + """) + elif self.index_type == "ivfflat": + # IVFFlat index - faster to build, slower to query + lists = kwargs.get('index_lists', 100) + op_class = self._get_op_class() + cur.execute(f""" + CREATE INDEX IF NOT EXISTS idx_{self.table_name}_vector_ivfflat + ON {self.table_name} + USING ivfflat (vector {op_class}) + WITH (lists = {lists}) + """) + + conn.commit() + print(f"PostgreSQL backend initialized with {dimensions}-dimensional vectors") + + def _get_op_class(self) -> str: + """Get the operator class for index creation.""" + op_classes = { + "cosine": "vector_cosine_ops", + "l2": "vector_l2_ops", + "inner_product": "vector_ip_ops" + } + return op_classes.get(self.distance_metric, "vector_cosine_ops") + + def insert(self, record: VectorRecord) -> None: + """Insert a single record.""" + self.insert_batch([record]) + + def insert_batch(self, records: List[VectorRecord]) -> int: + """Insert multiple records in a batch.""" + if not records: + return 0 + + conn = self._get_connection() + inserted = 0 + + with conn.cursor() as cur: + for record in records: + try: + # Convert vector to string format for pgvector + vector_str = '[' + ','.join(str(v) for v in record.vector) + ']' + + cur.execute(f""" + INSERT INTO {self.table_name} ( + id, vector, content, document_id, document_title, + chunk_index, total_chunks, chapter, paragraph, + section_type, departement, effective_date, issued_date, + repealed, expiration_date, + embedding_model, dimensions, created_at, metadata + ) VALUES ( + %s, %s::vector, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, %s, %s, + %s, %s, + %s, %s, %s, %s + ) + ON CONFLICT (id) DO UPDATE SET + vector = EXCLUDED.vector, + content = EXCLUDED.content, + document_title = EXCLUDED.document_title, + chunk_index = EXCLUDED.chunk_index, + total_chunks = EXCLUDED.total_chunks, + chapter = EXCLUDED.chapter, 
+ paragraph = EXCLUDED.paragraph, + section_type = EXCLUDED.section_type, + departement = EXCLUDED.departement, + effective_date = EXCLUDED.effective_date, + issued_date = EXCLUDED.issued_date, + repealed = EXCLUDED.repealed, + expiration_date = EXCLUDED.expiration_date, + embedding_model = EXCLUDED.embedding_model, + created_at = EXCLUDED.created_at, + metadata = EXCLUDED.metadata + """, ( + record.id, + vector_str, + record.content, + record.document_id, + record.document_title, + record.chunk_index, + record.total_chunks, + record.chapter, + record.paragraph, + record.section_type, + record.departement, + record.effective_date, + record.issued_date, + record.repealed, + record.expiration_date, + record.embedding_model, + record.dimensions, + record.created_at or datetime.now(), + json.dumps(record.metadata) + )) + inserted += 1 + except Exception as e: + print(f"Error inserting record {record.id}: {e}") + continue + + conn.commit() + + return inserted + + def search( + self, + query_vector: List[float], + limit: int = 10, + filters: Optional[Dict[str, Any]] = None + ) -> List[SearchResult]: + """Search for similar vectors.""" + conn = self._get_connection() + results = [] + + vector_str = '[' + ','.join(str(v) for v in query_vector) + ']' + operator = self._get_distance_operator() + + # Build WHERE clause from filters + where_clauses = [] + params = [vector_str] + + if filters: + for key, value in filters.items(): + if key in ['document_id', 'chapter', 'paragraph', 'section_type', 'departement']: + where_clauses.append(f"{key} = %s") + params.append(value) + + where_sql = "" + if where_clauses: + where_sql = "WHERE " + " AND ".join(where_clauses) + + params.append(limit) + + with conn.cursor() as cur: + cur.execute(f""" + SELECT + id, vector, content, document_id, document_title, + chunk_index, total_chunks, chapter, paragraph, + section_type, departement, effective_date, issued_date, + repealed, expiration_date, + embedding_model, dimensions, created_at, 
metadata, + vector {operator} %s::vector AS distance + FROM {self.table_name} + {where_sql} + ORDER BY distance + LIMIT %s + """, params) + + for row in cur.fetchall(): + record = VectorRecord( + id=row[0], + vector=json.loads(row[1]) if isinstance(row[1], str) else ([] if row[1] is None else list(row[1])), # text form "[0.1,0.2]" when no pgvector adapter is registered + content=row[2], + document_id=row[3], + document_title=row[4], + chunk_index=row[5], + total_chunks=row[6], + chapter=row[7], + paragraph=row[8], + section_type=row[9], + departement=row[10], + effective_date=row[11], + issued_date=row[12], + repealed=row[13], + expiration_date=row[14], + embedding_model=row[15], + dimensions=row[16], + created_at=row[17], + metadata=row[18] or {} + ) + distance = float(row[19]) + + # Convert distance to similarity score + if self.distance_metric == "cosine": + score = 1 - distance # Cosine similarity + elif self.distance_metric == "inner_product": + score = -distance # Inner product is negated in pgvector + else: + score = 1 / (1 + distance) # L2 to similarity + + results.append(SearchResult( + record=record, + score=score, + distance=distance + )) + + return results + + def delete_by_document(self, document_id: str) -> int: + """Delete all records for a document.""" + conn = self._get_connection() + + with conn.cursor() as cur: + cur.execute( + f"DELETE FROM {self.table_name} WHERE document_id = %s", + (document_id,) + ) + deleted = cur.rowcount + conn.commit() + + return deleted + + def get_by_id(self, record_id: str) -> Optional[VectorRecord]: + """Get a record by ID.""" + conn = self._get_connection() + + with conn.cursor() as cur: + cur.execute(f""" + SELECT + id, vector, content, document_id, document_title, + chunk_index, total_chunks, chapter, paragraph, + section_type, departement, effective_date, issued_date, + repealed, expiration_date, + embedding_model, dimensions, created_at, metadata + FROM {self.table_name} + WHERE id = %s + """, (record_id,)) + + row = cur.fetchone() + if not row: + return None + + return VectorRecord( + id=row[0], + vector=json.loads(row[1]) if isinstance(row[1], str) else 
([] if row[1] is None else list(row[1])), # text form "[0.1,0.2]" when no pgvector adapter is registered + content=row[2], + document_id=row[3], + document_title=row[4], + chunk_index=row[5], + total_chunks=row[6], + chapter=row[7], + paragraph=row[8], + section_type=row[9], + departement=row[10], + effective_date=row[11], + issued_date=row[12], + repealed=row[13], + expiration_date=row[14], + embedding_model=row[15], + dimensions=row[16], + created_at=row[17], + metadata=row[18] or {} + ) + + def count(self, filters: Optional[Dict[str, Any]] = None) -> int: + """Count records in the store.""" + conn = self._get_connection() + + where_clauses = [] + params = [] + + if filters: + for key, value in filters.items(): + if key in ['document_id', 'chapter', 'paragraph', 'section_type', 'departement']: + where_clauses.append(f"{key} = %s") + params.append(value) + + where_sql = "" + if where_clauses: + where_sql = "WHERE " + " AND ".join(where_clauses) + + with conn.cursor() as cur: + cur.execute(f"SELECT COUNT(*) FROM {self.table_name} {where_sql}", params) + return cur.fetchone()[0] + + def close(self) -> None: + """Close database connection.""" + if self._conn: + self._conn.close() + self._conn = None + + def health_check(self) -> Tuple[bool, str]: + """Check database connectivity.""" + try: + conn = self._get_connection() + with conn.cursor() as cur: + cur.execute("SELECT 1") + cur.execute(f"SELECT COUNT(*) FROM {self.table_name}") + count = cur.fetchone()[0] + return True, f"PostgreSQL connected, {count} records in {self.table_name}" + except Exception as e: + return False, f"PostgreSQL error: {e}" diff --git a/exporters/vector/chunking.py b/exporters/vector/chunking.py new file mode 100644 index 0000000..103552c --- /dev/null +++ b/exporters/vector/chunking.py @@ -0,0 +1,788 @@ +#!/usr/bin/env python3 +""" +Text chunking module for SFS documents. + +This module provides intelligent chunking strategies for Swedish legal documents, +optimized for vector embedding and retrieval. 
+ +Chunking strategies: +- PARAGRAPH: Split by paragraf (§) - preserves legal structure +- CHAPTER: Split by kapitel - larger context +- SECTION: Split by logical sections (marked with selex tags) +- SEMANTIC: Split by semantic boundaries with overlap +- FIXED_SIZE: Split by token count with overlap +""" + +import re +from enum import Enum +from dataclasses import dataclass, field +from typing import List, Optional, Dict, Any + + +class ChunkingStrategy(Enum): + """Available chunking strategies for legal documents.""" + PARAGRAPH = "paragraph" # Split by § (paragraf) + CHAPTER = "chapter" # Split by kapitel + SECTION = "section" # Split by selex section tags + SEMANTIC = "semantic" # Semantic boundaries with overlap + FIXED_SIZE = "fixed_size" # Fixed token count with overlap + + +@dataclass +class DocumentChunk: + """Represents a chunk of a document with metadata.""" + content: str # The actual text content + chunk_id: str # Unique identifier for this chunk + document_id: str # Reference to parent document (beteckning) + chunk_index: int # Position within document + total_chunks: int # Total chunks in document + + # Structural metadata + chapter: Optional[str] = None # Chapter reference (e.g., "1 kap.") + paragraph: Optional[str] = None # Paragraph reference (e.g., "1 §") + section_type: Optional[str] = None # Type of section (kapitel, paragraf, etc.) 
+ + # Document metadata + rubrik: Optional[str] = None # Document title + departement: Optional[str] = None # Responsible department + effective_date: Optional[str] = None # Entry into force date (ikraft_datum) + issued_date: Optional[str] = None # Issued date (utfardad_datum) + repealed: bool = False # If regulation is repealed (upphavd) + expiration_date: Optional[str] = None # Expiration date (upphor_datum) + + # Chunk-specific metadata + char_count: int = 0 # Character count + estimated_tokens: int = 0 # Estimated token count + + # Additional metadata as dict + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + """Convert chunk to dictionary for serialization.""" + return { + 'content': self.content, + 'chunk_id': self.chunk_id, + 'document_id': self.document_id, + 'chunk_index': self.chunk_index, + 'total_chunks': self.total_chunks, + 'chapter': self.chapter, + 'paragraph': self.paragraph, + 'section_type': self.section_type, + 'rubrik': self.rubrik, + 'departement': self.departement, + 'effective_date': self.effective_date, + 'issued_date': self.issued_date, + 'repealed': self.repealed, + 'expiration_date': self.expiration_date, + 'char_count': self.char_count, + 'estimated_tokens': self.estimated_tokens, + 'metadata': self.metadata, + } + + +def estimate_tokens(text: str) -> int: + """ + Estimate the number of tokens in a text. + + Uses a simple heuristic: ~4 characters per token for Swedish text. + This is a rough estimate; actual token count depends on the tokenizer. + + Args: + text: The text to estimate tokens for + + Returns: + Estimated token count + """ + # Swedish text averages about 4-5 characters per token + # Using 4 to be conservative (better to overestimate) + return len(text) // 4 + + +def _normalize_metadata_fields(metadata: Dict[str, Any]) -> Dict[str, Any]: + """ + Normalize metadata field names from Swedish to English. 
+ + Maps Swedish field names to English equivalents for consistency + in vector export artifacts (JSON, SQL tables, etc.). + + Args: + metadata: Dictionary with Swedish field names + + Returns: + Dictionary with normalized English field names + """ + normalized = metadata.copy() + + # Map Swedish to English field names + field_mapping = { + 'ikraft_datum': 'effective_date', + 'utfardad_datum': 'issued_date', + 'upphor_datum': 'expiration_date', + 'upphavd': 'repealed', + } + + for swedish, english in field_mapping.items(): + if swedish in normalized: + normalized[english] = normalized.pop(swedish) + + return normalized + + +def chunk_document( + content: str, + document_id: str, + strategy: ChunkingStrategy = ChunkingStrategy.PARAGRAPH, + max_chunk_size: int = 512, + overlap_size: int = 50, + metadata: Optional[Dict[str, Any]] = None +) -> List[DocumentChunk]: + """ + Split a document into chunks using the specified strategy. + + Args: + content: The markdown content to chunk (after temporal processing) + document_id: The document identifier (beteckning) + strategy: The chunking strategy to use + max_chunk_size: Maximum tokens per chunk (for fixed_size/semantic) + overlap_size: Number of tokens to overlap between chunks + metadata: Additional metadata to attach to all chunks + + Returns: + List of DocumentChunk objects + """ + if metadata is None: + metadata = {} + + # Extract document-level metadata from frontmatter + doc_metadata = _extract_frontmatter_metadata(content) + doc_metadata.update(metadata) + # Normalize field names from Swedish to English + doc_metadata = _normalize_metadata_fields(doc_metadata) + + # Remove frontmatter for chunking + content_body = _remove_frontmatter(content) + + # Clean selex tags but preserve structure for chunking + # We keep the structural information for metadata extraction + + if strategy == ChunkingStrategy.PARAGRAPH: + chunks = _chunk_by_paragraph(content_body, document_id, doc_metadata) + elif strategy == 
ChunkingStrategy.CHAPTER: + chunks = _chunk_by_chapter(content_body, document_id, doc_metadata) + elif strategy == ChunkingStrategy.SECTION: + chunks = _chunk_by_section(content_body, document_id, doc_metadata) + elif strategy == ChunkingStrategy.SEMANTIC: + chunks = _chunk_semantic(content_body, document_id, doc_metadata, max_chunk_size, overlap_size) + elif strategy == ChunkingStrategy.FIXED_SIZE: + chunks = _chunk_fixed_size(content_body, document_id, doc_metadata, max_chunk_size, overlap_size) + else: + raise ValueError(f"Unknown chunking strategy: {strategy}") + + # Update total_chunks for all chunks + total = len(chunks) + for chunk in chunks: + chunk.total_chunks = total + + return chunks + + +def _extract_frontmatter_metadata(content: str) -> Dict[str, Any]: + """Extract metadata from YAML frontmatter and selex attributes.""" + metadata = {} + + # Find frontmatter between --- markers + frontmatter_match = re.match(r'^---\s*\n(.*?)\n---', content, re.DOTALL) + if not frontmatter_match: + return metadata + + frontmatter_text = frontmatter_match.group(1) + + # Extract key fields from frontmatter + for field in ['rubrik', 'departement', 'ikraft_datum', 'utfardad_datum', 'upphor_datum', 'beteckning']: + match = re.search(rf'^{field}:\s*(.+)$', frontmatter_text, re.MULTILINE) + if match: + value = match.group(1).strip() + # Remove quotes if present + if value.startswith('"') and value.endswith('"'): + value = value[1:-1] + elif value.startswith("'") and value.endswith("'"): + value = value[1:-1] + metadata[field] = value + + # Also extract from selex attributes in article tag if not in frontmatter + article_match = re.search(r'<article([^>]*)>', content) + if article_match: + article_attrs = article_match.group(1) + + # Extract ikraft_datum from selex:ikraft_datum if not in frontmatter + if 'ikraft_datum' not in metadata: + ikraft_match = re.search(r'selex:ikraft_datum="([^"]+)"', article_attrs) + if ikraft_match: + metadata['ikraft_datum'] = ikraft_match.group(1) + + # 
Extract utfardad_datum from selex:utfardad_datum if not in frontmatter + if 'utfardad_datum' not in metadata: + utfardad_match = re.search(r'selex:utfardad_datum="([^"]+)"', article_attrs) + if utfardad_match: + metadata['utfardad_datum'] = utfardad_match.group(1) + + # Extract upphor_datum from selex:upphor_datum if not in frontmatter + if 'upphor_datum' not in metadata: + upphor_match = re.search(r'selex:upphor_datum="([^"]+)"', article_attrs) + if upphor_match: + metadata['upphor_datum'] = upphor_match.group(1) + + # Check if upphavd flag is present + if 'selex:upphavd="true"' in article_attrs: + metadata['upphavd'] = True + + return metadata + + +def _remove_frontmatter(content: str) -> str: + """Remove YAML frontmatter from content.""" + # Match frontmatter between --- markers + result = re.sub(r'^---\s*\n.*?\n---\s*\n?', '', content, flags=re.DOTALL) + return result.strip() + + +def _clean_text_for_embedding(text: str) -> str: + """Clean text for embedding, removing tags but preserving readability.""" + # Remove section/article tags but keep content + cleaned = re.sub(r'</?(?:section|article)[^>]*>', '', text) + # Normalize whitespace + cleaned = re.sub(r'\n{3,}', '\n\n', cleaned) + cleaned = re.sub(r' +', ' ', cleaned) + return cleaned.strip() + + +def _chunk_by_paragraph(content: str, document_id: str, metadata: Dict[str, Any]) -> List[DocumentChunk]: + """ + Chunk document by paragraph (§). + + Each paragraf becomes its own chunk, which preserves the natural + legal structure of Swedish law. 
+ """ + chunks = [] + current_chapter = None + + # Pattern to find paragraf sections + # Matches sections marked as class="paragraf" or containing § in heading + paragraf_pattern = re.compile( + r'<section[^>]*class="paragraf"[^>]*>(.*?)</section>', + re.DOTALL + ) + + # Also find chapter headings + chapter_pattern = re.compile(r'##\s*(\d+\s*kap\.?[^#\n]*)', re.IGNORECASE) + + # Find all paragraf sections + matches = list(paragraf_pattern.finditer(content)) + + if not matches: + # Fallback: split by § headings + return _chunk_by_paragraph_headings(content, document_id, metadata) + + for i, match in enumerate(matches): + section_content = match.group(1) + + # Extract paragraph number from heading + para_match = re.search(r'#+\s*(\d+\s*[a-z]?\s*§)', section_content) + paragraph_ref = para_match.group(1) if para_match else None + + # Find which chapter this belongs to + content_before = content[:match.start()] + chapter_matches = list(chapter_pattern.finditer(content_before)) + if chapter_matches: + current_chapter = chapter_matches[-1].group(1).strip() + + # Clean the content + clean_content = _clean_text_for_embedding(section_content) + + if clean_content: + chunk = DocumentChunk( + content=clean_content, + chunk_id=f"{document_id}_p{i}", + document_id=document_id, + chunk_index=i, + total_chunks=0, # Will be updated later + chapter=current_chapter, + paragraph=paragraph_ref, + section_type="paragraf", + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), + issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', False), + expiration_date=metadata.get('expiration_date'), + char_count=len(clean_content), + estimated_tokens=estimate_tokens(clean_content), + metadata=metadata.copy() + ) + chunks.append(chunk) + + return chunks + + +def _chunk_by_paragraph_headings(content: str, document_id: str, metadata: Dict[str, Any]) -> List[DocumentChunk]: + """Fallback: chunk by § headings when no section tags 
are present.""" + chunks = [] + current_chapter = None + + # Split by paragraph headings (#### N § or ### N §) + parts = re.split(r'(#{3,4}\s*\d+\s*[a-z]?\s*§[^\n]*)', content) + + current_paragraph = None + current_content = [] + chunk_index = 0 + + for part in parts: + para_match = re.match(r'#{3,4}\s*(\d+\s*[a-z]?\s*§)', part) + if para_match: + # Save previous chunk if exists + if current_content: + clean_content = _clean_text_for_embedding('\n'.join(current_content)) + if clean_content.strip(): + chunk = DocumentChunk( + content=clean_content, + chunk_id=f"{document_id}_p{chunk_index}", + document_id=document_id, + chunk_index=chunk_index, + total_chunks=0, + chapter=current_chapter, + paragraph=current_paragraph, + section_type="paragraf", + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), + issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', False), + expiration_date=metadata.get('expiration_date'), + char_count=len(clean_content), + estimated_tokens=estimate_tokens(clean_content), + metadata=metadata.copy() + ) + chunks.append(chunk) + chunk_index += 1 + + current_paragraph = para_match.group(1) + current_content = [part] + else: + # Check for chapter heading + chapter_match = re.search(r'##\s*(\d+\s*kap\.?[^#\n]*)', part, re.IGNORECASE) + if chapter_match: + current_chapter = chapter_match.group(1).strip() + current_content.append(part) + + # Don't forget the last chunk + if current_content: + clean_content = _clean_text_for_embedding('\n'.join(current_content)) + if clean_content.strip(): + chunk = DocumentChunk( + content=clean_content, + chunk_id=f"{document_id}_p{chunk_index}", + document_id=document_id, + chunk_index=chunk_index, + total_chunks=0, + chapter=current_chapter, + paragraph=current_paragraph, + section_type="paragraf", + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), 
+ issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', False), + expiration_date=metadata.get('expiration_date'), + char_count=len(clean_content), + estimated_tokens=estimate_tokens(clean_content), + metadata=metadata.copy() + ) + chunks.append(chunk) + + return chunks + + +def _chunk_by_chapter(content: str, document_id: str, metadata: Dict[str, Any]) -> List[DocumentChunk]: + """ + Chunk document by chapter (kapitel). + + Each chapter becomes its own chunk, providing larger context + but fewer chunks per document. + """ + chunks = [] + + # Pattern to find kapitel sections + kapitel_pattern = re.compile( + r'<section[^>]*class="kapitel"[^>]*>(.*?)</section>', + re.DOTALL + ) + + matches = list(kapitel_pattern.finditer(content)) + + if not matches: + # Fallback: split by chapter headings + return _chunk_by_chapter_headings(content, document_id, metadata) + + for i, match in enumerate(matches): + section_content = match.group(1) + + # Extract chapter reference from heading + chapter_match = re.search(r'##\s*(\d+\s*kap\.?[^\n]*)', section_content, re.IGNORECASE) + chapter_ref = chapter_match.group(1).strip() if chapter_match else f"Kapitel {i+1}" + + clean_content = _clean_text_for_embedding(section_content) + + if clean_content: + chunk = DocumentChunk( + content=clean_content, + chunk_id=f"{document_id}_ch{i}", + document_id=document_id, + chunk_index=i, + total_chunks=0, + chapter=chapter_ref, + paragraph=None, + section_type="kapitel", + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), + issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', False), + expiration_date=metadata.get('expiration_date'), + char_count=len(clean_content), + estimated_tokens=estimate_tokens(clean_content), + metadata=metadata.copy() + ) + chunks.append(chunk) + + return chunks + + +def _chunk_by_chapter_headings(content: str, document_id: str, metadata: Dict[str, Any]) -> 
List[DocumentChunk]: + """Fallback: chunk by chapter headings when no section tags are present.""" + chunks = [] + + # Split by chapter headings (## N kap.) + parts = re.split(r'(##\s*\d+\s*kap\.?[^\n]*)', content, flags=re.IGNORECASE) + + current_chapter = None + current_content = [] + chunk_index = 0 + + for part in parts: + chapter_match = re.match(r'##\s*(\d+\s*kap\.?[^\n]*)', part, re.IGNORECASE) + if chapter_match: + # Save previous chunk if exists + if current_content and current_chapter: + clean_content = _clean_text_for_embedding('\n'.join(current_content)) + if clean_content.strip(): + chunk = DocumentChunk( + content=clean_content, + chunk_id=f"{document_id}_ch{chunk_index}", + document_id=document_id, + chunk_index=chunk_index, + total_chunks=0, + chapter=current_chapter, + paragraph=None, + section_type="kapitel", + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), + issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', False), + expiration_date=metadata.get('expiration_date'), + char_count=len(clean_content), + estimated_tokens=estimate_tokens(clean_content), + metadata=metadata.copy() + ) + chunks.append(chunk) + chunk_index += 1 + + current_chapter = chapter_match.group(1).strip() + current_content = [part] + else: + current_content.append(part) + + # Don't forget the last chunk + if current_content and current_chapter: + clean_content = _clean_text_for_embedding('\n'.join(current_content)) + if clean_content.strip(): + chunk = DocumentChunk( + content=clean_content, + chunk_id=f"{document_id}_ch{chunk_index}", + document_id=document_id, + chunk_index=chunk_index, + total_chunks=0, + chapter=current_chapter, + paragraph=None, + section_type="kapitel", + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), + issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', 
False), + expiration_date=metadata.get('expiration_date'), + char_count=len(clean_content), + estimated_tokens=estimate_tokens(clean_content), + metadata=metadata.copy() + ) + chunks.append(chunk) + + # If no chapters found, return entire document as one chunk + if not chunks: + clean_content = _clean_text_for_embedding(content) + if clean_content.strip(): + chunks.append(DocumentChunk( + content=clean_content, + chunk_id=f"{document_id}_full", + document_id=document_id, + chunk_index=0, + total_chunks=1, + chapter=None, + paragraph=None, + section_type="document", + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), + issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', False), + expiration_date=metadata.get('expiration_date'), + char_count=len(clean_content), + estimated_tokens=estimate_tokens(clean_content), + metadata=metadata.copy() + )) + + return chunks + + +def _chunk_by_section(content: str, document_id: str, metadata: Dict[str, Any]) -> List[DocumentChunk]: + """ + Chunk document by selex section tags. + + Uses all section tags (both kapitel and paragraf) to create chunks. 
+ """ + chunks = [] + + # Pattern to find all sections with class attribute + section_pattern = re.compile( + r'<section[^>]*class="([^"]+)"[^>]*>(.*?)</section>', + re.DOTALL + ) + + matches = list(section_pattern.finditer(content)) + + if not matches: + # Fallback to paragraph chunking + return _chunk_by_paragraph(content, document_id, metadata) + + current_chapter = None + + for i, match in enumerate(matches): + section_class = match.group(1) + section_content = match.group(2) + + # Track chapters for context + if section_class == "kapitel": + chapter_match = re.search(r'##\s*(\d+\s*kap\.?[^\n]*)', section_content, re.IGNORECASE) + if chapter_match: + current_chapter = chapter_match.group(1).strip() + + # Extract paragraph reference if it's a paragraf + paragraph_ref = None + if section_class == "paragraf": + para_match = re.search(r'#+\s*(\d+\s*[a-z]?\s*§)', section_content) + if para_match: + paragraph_ref = para_match.group(1) + + clean_content = _clean_text_for_embedding(section_content) + + if clean_content: + chunk = DocumentChunk( + content=clean_content, + chunk_id=f"{document_id}_s{i}", + document_id=document_id, + chunk_index=i, + total_chunks=0, + chapter=current_chapter, + paragraph=paragraph_ref, + section_type=section_class, + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), + issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', False), + expiration_date=metadata.get('expiration_date'), + char_count=len(clean_content), + estimated_tokens=estimate_tokens(clean_content), + metadata=metadata.copy() + ) + chunks.append(chunk) + + return chunks + + +def _chunk_semantic( + content: str, + document_id: str, + metadata: Dict[str, Any], + max_chunk_size: int, + overlap_size: int +) -> List[DocumentChunk]: + """ + Chunk document by semantic boundaries with overlap. + + Tries to split at natural boundaries (paragraphs, sections) + while respecting the max chunk size. 
+ """ + chunks = [] + clean_content = _clean_text_for_embedding(content) + + # Split into paragraphs first + paragraphs = re.split(r'\n\n+', clean_content) + + current_chunk = [] + current_tokens = 0 + chunk_index = 0 + + for para in paragraphs: + para_tokens = estimate_tokens(para) + + if current_tokens + para_tokens > max_chunk_size and current_chunk: + # Save current chunk + chunk_text = '\n\n'.join(current_chunk) + chunk = DocumentChunk( + content=chunk_text, + chunk_id=f"{document_id}_sem{chunk_index}", + document_id=document_id, + chunk_index=chunk_index, + total_chunks=0, + chapter=None, + paragraph=None, + section_type="semantic", + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), + issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', False), + expiration_date=metadata.get('expiration_date'), + char_count=len(chunk_text), + estimated_tokens=estimate_tokens(chunk_text), + metadata=metadata.copy() + ) + chunks.append(chunk) + chunk_index += 1 + + # Start new chunk with overlap + overlap_paras = [] + overlap_tokens = 0 + for p in reversed(current_chunk): + p_tokens = estimate_tokens(p) + if overlap_tokens + p_tokens <= overlap_size: + overlap_paras.insert(0, p) + overlap_tokens += p_tokens + else: + break + + current_chunk = overlap_paras + current_tokens = overlap_tokens + + current_chunk.append(para) + current_tokens += para_tokens + + # Don't forget the last chunk + if current_chunk: + chunk_text = '\n\n'.join(current_chunk) + chunk = DocumentChunk( + content=chunk_text, + chunk_id=f"{document_id}_sem{chunk_index}", + document_id=document_id, + chunk_index=chunk_index, + total_chunks=0, + chapter=None, + paragraph=None, + section_type="semantic", + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), + issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', False), + 
expiration_date=metadata.get('expiration_date'), + char_count=len(chunk_text), + estimated_tokens=estimate_tokens(chunk_text), + metadata=metadata.copy() + ) + chunks.append(chunk) + + return chunks + + +def _chunk_fixed_size( + content: str, + document_id: str, + metadata: Dict[str, Any], + max_chunk_size: int, + overlap_size: int +) -> List[DocumentChunk]: + """ + Chunk document by fixed token count with overlap. + + Simple chunking that splits text at approximately max_chunk_size tokens + with overlap_size tokens of overlap between chunks. + """ + chunks = [] + clean_content = _clean_text_for_embedding(content) + + # Rough calculation: 4 chars per token + chars_per_chunk = max_chunk_size * 4 + overlap_chars = overlap_size * 4 + + start = 0 + chunk_index = 0 + + while start < len(clean_content): + end = start + chars_per_chunk + + # Try to break at a sentence boundary + if end < len(clean_content): + # Look for sentence end within last 100 chars + search_start = max(start + chars_per_chunk - 100, start) + search_end = min(end + 100, len(clean_content)) + search_text = clean_content[search_start:search_end] + + # Find last sentence boundary + sentence_end = -1 + for pattern in ['. ', '.\n', '? ', '!\n', '! 
']: + pos = search_text.rfind(pattern) + if pos > sentence_end: + sentence_end = pos + + if sentence_end > 0: + end = search_start + sentence_end + 1 + + chunk_text = clean_content[start:end].strip() + + if chunk_text: + chunk = DocumentChunk( + content=chunk_text, + chunk_id=f"{document_id}_fix{chunk_index}", + document_id=document_id, + chunk_index=chunk_index, + total_chunks=0, + chapter=None, + paragraph=None, + section_type="fixed", + rubrik=metadata.get('rubrik'), + departement=metadata.get('departement'), + effective_date=metadata.get('effective_date'), + issued_date=metadata.get('issued_date'), + repealed=metadata.get('repealed', False), + expiration_date=metadata.get('expiration_date'), + char_count=len(chunk_text), + estimated_tokens=estimate_tokens(chunk_text), + metadata=metadata.copy() + ) + chunks.append(chunk) + chunk_index += 1 + + start = end - overlap_chars + + return chunks diff --git a/exporters/vector/embeddings.py b/exporters/vector/embeddings.py new file mode 100644 index 0000000..b7ec1c7 --- /dev/null +++ b/exporters/vector/embeddings.py @@ -0,0 +1,370 @@ +#!/usr/bin/env python3 +""" +Embedding module for SFS documents. + +This module provides embedding functionality using state-of-the-art embedding models. +Currently supports: +- OpenAI text-embedding-3-large (best quality, 3072 dimensions) +- OpenAI text-embedding-3-small (faster, cheaper, 1536 dimensions) + +The embeddings are optimized for Swedish legal text retrieval. 
+""" + +import os +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional, Union +import time + + +class EmbeddingModel(Enum): + """Available embedding models.""" + # OpenAI models - text-embedding-3 series (best for multilingual including Swedish) + OPENAI_3_LARGE = "text-embedding-3-large" # 3072 dims, best quality + OPENAI_3_SMALL = "text-embedding-3-small" # 1536 dims, faster/cheaper + + # Legacy OpenAI model (for reference) + OPENAI_ADA_002 = "text-embedding-ada-002" # 1536 dims, older model + + +@dataclass +class EmbeddingResult: + """Result of an embedding operation.""" + text: str # Original text + embedding: List[float] # The embedding vector + model: str # Model used + dimensions: int # Vector dimensions + tokens_used: int # Tokens consumed + metadata: Optional[dict] = None # Additional metadata + + +class EmbeddingProvider(ABC): + """Abstract base class for embedding providers.""" + + @abstractmethod + def embed_text(self, text: str) -> EmbeddingResult: + """Generate embedding for a single text.""" + pass + + @abstractmethod + def embed_texts(self, texts: List[str]) -> List[EmbeddingResult]: + """Generate embeddings for multiple texts (batch).""" + pass + + @abstractmethod + def get_dimensions(self) -> int: + """Return the embedding vector dimensions.""" + pass + + @abstractmethod + def get_model_name(self) -> str: + """Return the model name.""" + pass + + +class OpenAIEmbeddingProvider(EmbeddingProvider): + """ + OpenAI embedding provider using text-embedding-3 models. + + The text-embedding-3-large model is recommended for best quality + Swedish legal text retrieval. It has 3072 dimensions and excellent + multilingual support. 
+ + Environment variables: + OPENAI_API_KEY: Your OpenAI API key + """ + + # Dimension mappings for each model + MODEL_DIMENSIONS = { + EmbeddingModel.OPENAI_3_LARGE.value: 3072, + EmbeddingModel.OPENAI_3_SMALL.value: 1536, + EmbeddingModel.OPENAI_ADA_002.value: 1536, + } + + # Token limits per model (for reference) + MODEL_MAX_TOKENS = { + EmbeddingModel.OPENAI_3_LARGE.value: 8191, + EmbeddingModel.OPENAI_3_SMALL.value: 8191, + EmbeddingModel.OPENAI_ADA_002.value: 8191, + } + + def __init__( + self, + model: EmbeddingModel = EmbeddingModel.OPENAI_3_LARGE, + api_key: Optional[str] = None, + dimensions: Optional[int] = None, + batch_size: int = 100, + retry_attempts: int = 3, + retry_delay: float = 1.0 + ): + """ + Initialize the OpenAI embedding provider. + + Args: + model: The embedding model to use + api_key: OpenAI API key (defaults to OPENAI_API_KEY env var) + dimensions: Optional reduced dimensions (for text-embedding-3 models) + batch_size: Number of texts to embed in one API call + retry_attempts: Number of retry attempts on failure + retry_delay: Base delay between retries (exponential backoff) + """ + self.model = model + self.model_name = model.value + self.api_key = api_key or os.environ.get('OPENAI_API_KEY') + self.batch_size = batch_size + self.retry_attempts = retry_attempts + self.retry_delay = retry_delay + + # Handle custom dimensions (text-embedding-3 supports dimension reduction) + if dimensions is not None: + if model not in [EmbeddingModel.OPENAI_3_LARGE, EmbeddingModel.OPENAI_3_SMALL]: + raise ValueError(f"Custom dimensions not supported for {model.value}") + max_dims = self.MODEL_DIMENSIONS[self.model_name] + if dimensions > max_dims: + raise ValueError(f"Dimensions cannot exceed {max_dims} for {model.value}") + self._dimensions = dimensions + else: + self._dimensions = self.MODEL_DIMENSIONS[self.model_name] + + self._client = None + + def _get_client(self): + """Lazy initialization of OpenAI client.""" + if self._client is None: + if not 
self.api_key: + raise ValueError( + "OpenAI API key not found. Set OPENAI_API_KEY environment variable " + "or pass api_key to the constructor." + ) + try: + from openai import OpenAI + self._client = OpenAI(api_key=self.api_key) + except ImportError: + raise ImportError( + "OpenAI package not installed. Install with: pip install openai" + ) + return self._client + + def embed_text(self, text: str) -> EmbeddingResult: + """ + Generate embedding for a single text. + + Args: + text: The text to embed + + Returns: + EmbeddingResult with the embedding vector + """ + results = self.embed_texts([text]) + return results[0] + + def embed_texts(self, texts: List[str]) -> List[EmbeddingResult]: + """ + Generate embeddings for multiple texts. + + Uses batching for efficiency with automatic retry on failures. + + Args: + texts: List of texts to embed + + Returns: + List of EmbeddingResult objects + """ + client = self._get_client() + results = [] + + # Process in batches + for i in range(0, len(texts), self.batch_size): + batch = texts[i:i + self.batch_size] + batch_results = self._embed_batch_with_retry(client, batch) + results.extend(batch_results) + + return results + + def _embed_batch_with_retry(self, client, texts: List[str]) -> List[EmbeddingResult]: + """Embed a batch of texts with retry logic.""" + last_error = None + + for attempt in range(self.retry_attempts): + try: + return self._embed_batch(client, texts) + except Exception as e: + last_error = e + if attempt < self.retry_attempts - 1: + delay = self.retry_delay * (2 ** attempt) # Exponential backoff + print(f"Embedding attempt {attempt + 1} failed: {e}. 
Retrying in {delay}s...") + time.sleep(delay) + + raise RuntimeError(f"Failed to embed texts after {self.retry_attempts} attempts: {last_error}") + + def _embed_batch(self, client, texts: List[str]) -> List[EmbeddingResult]: + """Embed a batch of texts.""" + # Build request parameters + params = { + "model": self.model_name, + "input": texts, + } + + # Add dimensions parameter for text-embedding-3 models if custom + if self._dimensions != self.MODEL_DIMENSIONS[self.model_name]: + params["dimensions"] = self._dimensions + + response = client.embeddings.create(**params) + + results = [] + for i, item in enumerate(response.data): + results.append(EmbeddingResult( + text=texts[i], + embedding=item.embedding, + model=self.model_name, + dimensions=len(item.embedding), + tokens_used=response.usage.total_tokens // len(texts), # Approximate per-text + metadata={ + "index": item.index, + } + )) + + return results + + def get_dimensions(self) -> int: + """Return the embedding vector dimensions.""" + return self._dimensions + + def get_model_name(self) -> str: + """Return the model name.""" + return self.model_name + + +class MockEmbeddingProvider(EmbeddingProvider): + """ + Mock embedding provider for testing without API calls. + + Generates deterministic pseudo-random vectors based on text content. + """ + + def __init__(self, dimensions: int = 3072, **kwargs): + """ + Initialize mock provider. 
+ + Args: + dimensions: Number of dimensions for mock vectors + **kwargs: Ignored (for compatibility with other providers) + """ + self._dimensions = dimensions + + def embed_text(self, text: str) -> EmbeddingResult: + """Generate a mock embedding for testing.""" + # Generate deterministic pseudo-random vector based on text hash + import hashlib + text_hash = hashlib.sha256(text.encode()).hexdigest() + + # Convert hash to floats + embedding = [] + for i in range(self._dimensions): + # Use different parts of the hash + idx = (i * 4) % 64 + hex_chunk = text_hash[idx:idx+4] or text_hash[:4] + value = (int(hex_chunk, 16) / 65535) * 2 - 1 # Normalize to [-1, 1] + embedding.append(value) + + return EmbeddingResult( + text=text, + embedding=embedding, + model="mock", + dimensions=self._dimensions, + tokens_used=len(text) // 4, # Rough estimate + metadata={"is_mock": True} + ) + + def embed_texts(self, texts: List[str]) -> List[EmbeddingResult]: + """Generate mock embeddings for multiple texts.""" + return [self.embed_text(text) for text in texts] + + def get_dimensions(self) -> int: + """Return the embedding vector dimensions.""" + return self._dimensions + + def get_model_name(self) -> str: + """Return the model name.""" + return "mock" + + +def get_embedding_provider( + provider: str = "openai", + model: Optional[Union[str, EmbeddingModel]] = None, + **kwargs +) -> EmbeddingProvider: + """ + Factory function to get an embedding provider. + + Args: + provider: Provider name ("openai" or "mock") + model: Model to use (for OpenAI: "text-embedding-3-large", etc.) 
+ **kwargs: Additional arguments passed to the provider + + Returns: + An EmbeddingProvider instance + + Example: + # Best quality embeddings + provider = get_embedding_provider("openai", model="text-embedding-3-large") + + # Faster/cheaper embeddings + provider = get_embedding_provider("openai", model="text-embedding-3-small") + + # Testing without API + provider = get_embedding_provider("mock") + """ + if provider == "openai": + if model is None: + model = EmbeddingModel.OPENAI_3_LARGE + elif isinstance(model, str): + # Convert string to enum + model_map = {m.value: m for m in EmbeddingModel} + if model not in model_map: + raise ValueError(f"Unknown model: {model}. Available: {list(model_map.keys())}") + model = model_map[model] + + return OpenAIEmbeddingProvider(model=model, **kwargs) + + elif provider == "mock": + return MockEmbeddingProvider(**kwargs) + + else: + raise ValueError(f"Unknown provider: {provider}. Available: openai, mock") + + +# Convenience functions for common use cases + +def embed_document_chunks( + chunks: List['DocumentChunk'], + provider: Optional[EmbeddingProvider] = None, + show_progress: bool = True +) -> List[tuple]: + """ + Embed a list of document chunks. 
+ + Args: + chunks: List of DocumentChunk objects + provider: Embedding provider (defaults to OpenAI text-embedding-3-large) + show_progress: Whether to show progress indicator + + Returns: + List of (chunk, embedding_result) tuples + """ + if provider is None: + provider = get_embedding_provider("openai") + + texts = [chunk.content for chunk in chunks] + total = len(texts) + + if show_progress: + print(f"Embedding {total} chunks using {provider.get_model_name()}...") + + results = provider.embed_texts(texts) + + if show_progress: + print(f"Completed embedding {total} chunks") + + return list(zip(chunks, results)) diff --git a/exporters/vector/vector_export.py b/exporters/vector/vector_export.py new file mode 100644 index 0000000..33e2f0f --- /dev/null +++ b/exporters/vector/vector_export.py @@ -0,0 +1,519 @@ +#!/usr/bin/env python3 +""" +Vector export functionality for SFS documents. + +This module converts Swedish legal documents (SFS) to vector embeddings +suitable for semantic search and retrieval. + +The vector exporter: +1. Applies temporal filtering to get only current regulations (like md/html mode) +2. Chunks documents intelligently (by paragraph, chapter, or semantically) +3. Generates embeddings using state-of-the-art models (OpenAI text-embedding-3) +4. 
Stores vectors in the chosen backend (PostgreSQL, Elasticsearch, or JSON) + +Usage: + from exporters.vector import create_vector_documents, VectorExportConfig + + # Configure export + config = VectorExportConfig( + embedding_provider="openai", + embedding_model="text-embedding-3-large", + backend_type="postgresql", + chunking_strategy="paragraph" + ) + + # Export document + create_vector_documents(data, output_dir, config, target_date="2024-01-01") +""" + +import json +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Dict, Any, List, Optional, Union + +from exporters.vector.chunking import ( + chunk_document, + ChunkingStrategy, + DocumentChunk +) +from exporters.vector.embeddings import ( + EmbeddingProvider, + EmbeddingModel, + get_embedding_provider, + embed_document_chunks +) +from exporters.vector.backends import ( + VectorStoreBackend, + VectorRecord, + get_backend +) + + +@dataclass +class VectorExportConfig: + """Configuration for vector export.""" + + # Embedding settings + embedding_provider: str = "openai" # "openai" or "mock" + embedding_model: str = "text-embedding-3-large" # Best quality model + embedding_dimensions: Optional[int] = None # Custom dimensions (for text-embedding-3) + embedding_batch_size: int = 100 # Batch size for embedding API + + # Backend settings + backend_type: str = "json" # "postgresql", "elasticsearch", "json" + backend_config: Dict[str, Any] = field(default_factory=dict) + + # Chunking settings + chunking_strategy: ChunkingStrategy = ChunkingStrategy.PARAGRAPH + max_chunk_size: int = 512 # Max tokens per chunk + overlap_size: int = 50 # Overlap tokens between chunks + + # Output settings + save_json_export: bool = True # Also save as JSON file + verbose: bool = False # Verbose output + + def get_embedding_provider(self) -> EmbeddingProvider: + """Create embedding provider from config.""" + kwargs = { + "batch_size": self.embedding_batch_size + } + if 
self.embedding_dimensions: + kwargs["dimensions"] = self.embedding_dimensions + + return get_embedding_provider( + provider=self.embedding_provider, + model=self.embedding_model, + **kwargs + ) + + def get_backend(self) -> VectorStoreBackend: + """Create backend from config.""" + return get_backend(self.backend_type, **self.backend_config) + + +def create_vector_documents( + data: Dict[str, Any], + output_dir: Path, + config: Optional[VectorExportConfig] = None, + target_date: Optional[str] = None, + backend: Optional[VectorStoreBackend] = None, + embedding_provider: Optional[EmbeddingProvider] = None +) -> List[VectorRecord]: + """ + Create vector documents from SFS JSON data. + + This is the main entry point for vector export. It: + 1. Converts JSON to markdown + 2. Applies temporal filtering for the target date + 3. Chunks the document + 4. Generates embeddings + 5. Stores in the configured backend + + Args: + data: JSON data containing document information + output_dir: Directory for output files (used for JSON export) + config: Vector export configuration + target_date: Target date for temporal filtering (YYYY-MM-DD) + If None, uses today's date + backend: Pre-configured backend (overrides config.backend_type) + embedding_provider: Pre-configured embedding provider + + Returns: + List of VectorRecord objects that were created + + Example: + config = VectorExportConfig( + embedding_provider="openai", + backend_type="postgresql", + backend_config={"connection_string": "postgresql://..."} + ) + records = create_vector_documents(data, output_dir, config) + """ + if config is None: + config = VectorExportConfig() + + # Default target_date to today + if target_date is None: + target_date = datetime.now().strftime('%Y-%m-%d') + + # Extract beteckning for logging + beteckning = data.get('beteckning') + if not beteckning: + print("Varning: Beteckning saknas i dokumentdata") + return [] + + # Skip agency regulations (N-beteckningar) + if beteckning.startswith('N'): + if 
config.verbose: + print(f"Hoppar över myndighetsföreskrift: {beteckning}") + return [] + + if config.verbose: + print(f"Skapar vektordata för {beteckning} (target_date: {target_date})") + + # Step 1: Convert to markdown with temporal filtering + markdown_content = _prepare_document_content(data, target_date, config.verbose) + + if not markdown_content: + print(f"Varning: Inget innehåll för {beteckning} efter temporal processing") + return [] + + # Step 2: Chunk the document + chunks = chunk_document( + content=markdown_content, + document_id=beteckning, + strategy=config.chunking_strategy, + max_chunk_size=config.max_chunk_size, + overlap_size=config.overlap_size, + metadata={ + "target_date": target_date, + "export_timestamp": datetime.now().isoformat() + } + ) + + if not chunks: + print(f"Varning: Inga chunks skapades för {beteckning}") + return [] + + if config.verbose: + print(f" Skapade {len(chunks)} chunks") + + # Step 3: Generate embeddings + provider = embedding_provider or config.get_embedding_provider() + + if config.verbose: + print(f" Genererar embeddings med {provider.get_model_name()}...") + + chunk_embeddings = embed_document_chunks( + chunks=chunks, + provider=provider, + show_progress=config.verbose + ) + + # Step 4: Create VectorRecords + records = [] + for chunk, embedding_result in chunk_embeddings: + record = VectorRecord( + id=chunk.chunk_id, + vector=embedding_result.embedding, + content=chunk.content, + document_id=chunk.document_id, + document_title=chunk.rubrik, + chunk_index=chunk.chunk_index, + total_chunks=chunk.total_chunks, + chapter=chunk.chapter, + paragraph=chunk.paragraph, + section_type=chunk.section_type, + departement=chunk.departement, + effective_date=chunk.effective_date, + issued_date=chunk.issued_date, + repealed=chunk.repealed, + expiration_date=chunk.expiration_date, + embedding_model=embedding_result.model, + dimensions=embedding_result.dimensions, + created_at=datetime.now(), + metadata={ + "target_date": target_date, 
+ "char_count": chunk.char_count, + "estimated_tokens": chunk.estimated_tokens, + "tokens_used": embedding_result.tokens_used + } + ) + records.append(record) + + # Step 5: Store in backend + store = backend or config.get_backend() + + # Initialize backend if needed + if records: + dimensions = records[0].dimensions + store.initialize(dimensions) + + # Delete existing records for this document + deleted = store.delete_by_document(beteckning) + if deleted > 0 and config.verbose: + print(f" Tog bort {deleted} befintliga records") + + # Insert new records + inserted = store.insert_batch(records) + + if config.verbose: + print(f" Infogade {inserted} records i {config.backend_type}") + + # Step 6: Optionally save JSON export + if config.save_json_export: + _save_json_export(records, output_dir, beteckning) + + return records + + +def _prepare_document_content( + data: Dict[str, Any], + target_date: str, + verbose: bool = False +) -> Optional[str]: + """ + Prepare document content with temporal filtering. + + This mimics the processing done in md/html mode to ensure + only current regulations are included. 
+ + Args: + data: JSON document data + target_date: Target date for temporal filtering + verbose: Enable verbose output + + Returns: + Markdown content with temporal rules applied, or None if empty + """ + # Import processing functions + from sfs_processor import convert_to_markdown + from temporal.apply_temporal import apply_temporal, is_document_content_empty + from temporal.title_temporal import title_temporal + from formatters.format_sfs_text import normalize_heading_levels + + # Apply temporal title processing + if data.get('rubrik'): + rubrik_after_temporal = title_temporal(data['rubrik'], target_date) + data = data.copy() + data['rubrik_after_temporal'] = rubrik_after_temporal + + # Convert to markdown + try: + markdown_content = convert_to_markdown(data, fetch_predocs_from_api=False, apply_links=False) + except ValueError as e: + if verbose: + print(f"Varning: Kunde inte konvertera dokument: {e}") + return None + + # Normalize heading levels + markdown_content = normalize_heading_levels(markdown_content) + + # Apply temporal filtering + markdown_content = apply_temporal(markdown_content, target_date, verbose=verbose) + + # Check if document is empty after temporal processing + if is_document_content_empty(markdown_content): + if verbose: + print("Dokumentet är tomt efter temporal processing") + return None + + return markdown_content + + +def _save_json_export( + records: List[VectorRecord], + output_dir: Path, + beteckning: str +) -> Path: + """ + Save records as JSON file for portability. 
+ + Args: + records: List of VectorRecords + output_dir: Output directory + beteckning: Document identifier + + Returns: + Path to the saved JSON file + """ + # Create vector output directory + vector_dir = output_dir / "vectors" + vector_dir.mkdir(parents=True, exist_ok=True) + + # Create safe filename + safe_beteckning = beteckning.replace(':', '-') + json_file = vector_dir / f"sfs-{safe_beteckning}-vectors.json" + + # Convert records to dict (without vectors for readability) + export_data = { + "document_id": beteckning, + "export_timestamp": datetime.now().isoformat(), + "total_records": len(records), + "records": [ + { + "id": r.id, + "content": r.content, + "document_title": r.document_title, + "chunk_index": r.chunk_index, + "total_chunks": r.total_chunks, + "chapter": r.chapter, + "paragraph": r.paragraph, + "section_type": r.section_type, + "embedding_model": r.embedding_model, + "dimensions": r.dimensions, + "vector_preview": r.vector[:5] + ["..."] + r.vector[-5:] if len(r.vector) > 10 else r.vector, + "metadata": r.metadata + } + for r in records + ] + } + + with open(json_file, 'w', encoding='utf-8') as f: + json.dump(export_data, f, ensure_ascii=False, indent=2, default=str) + + return json_file + + +def batch_create_vector_documents( + json_files: List[Path], + output_dir: Path, + config: Optional[VectorExportConfig] = None, + target_date: Optional[str] = None, + show_progress: bool = True +) -> Dict[str, List[VectorRecord]]: + """ + Create vector documents for multiple JSON files. + + Efficiently processes multiple documents using batched embedding calls + and shared backend connections. 
+ + Args: + json_files: List of JSON file paths + output_dir: Output directory + config: Vector export configuration + target_date: Target date for temporal filtering + show_progress: Show progress indicator + + Returns: + Dictionary mapping beteckning to list of VectorRecords + """ + if config is None: + config = VectorExportConfig() + + if target_date is None: + target_date = datetime.now().strftime('%Y-%m-%d') + + # Create shared resources + embedding_provider = config.get_embedding_provider() + backend = config.get_backend() + + results = {} + total = len(json_files) + + for i, json_file in enumerate(json_files): + if show_progress: + print(f"[{i+1}/{total}] Bearbetar {json_file.name}...") + + try: + with open(json_file, 'r', encoding='utf-8') as f: + data = json.load(f) + except (json.JSONDecodeError, FileNotFoundError) as e: + print(f"Fel vid läsning av {json_file}: {e}") + continue + + beteckning = data.get('beteckning', json_file.stem) + + records = create_vector_documents( + data=data, + output_dir=output_dir, + config=config, + target_date=target_date, + backend=backend, + embedding_provider=embedding_provider + ) + + if records: + results[beteckning] = records + + # Close backend connection + backend.close() + + if show_progress: + total_records = sum(len(r) for r in results.values()) + print(f"\nKlar! 
Skapade {total_records} vektorer för {len(results)} dokument") + + return results + + +# CLI interface +if __name__ == "__main__": + import sys + import argparse + + parser = argparse.ArgumentParser( + description='Export SFS documents to vector format' + ) + parser.add_argument( + '--input', '-i', + required=True, + help='Input JSON file or directory' + ) + parser.add_argument( + '--output', '-o', + default='./vector-export', + help='Output directory' + ) + parser.add_argument( + '--target-date', + default=None, + help='Target date for temporal filtering (YYYY-MM-DD)' + ) + parser.add_argument( + '--backend', + default='json', + choices=['postgresql', 'elasticsearch', 'json'], + help='Vector store backend' + ) + parser.add_argument( + '--embedding-model', + default='text-embedding-3-large', + help='Embedding model to use' + ) + parser.add_argument( + '--chunking', + default='paragraph', + choices=['paragraph', 'chapter', 'section', 'semantic', 'fixed_size'], + help='Chunking strategy' + ) + parser.add_argument( + '--mock-embeddings', + action='store_true', + help='Use mock embeddings (for testing without API)' + ) + parser.add_argument( + '--verbose', '-v', + action='store_true', + help='Verbose output' + ) + + args = parser.parse_args() + + # Build config + config = VectorExportConfig( + embedding_provider="mock" if args.mock_embeddings else "openai", + embedding_model=args.embedding_model, + backend_type=args.backend, + chunking_strategy=ChunkingStrategy(args.chunking), + verbose=args.verbose + ) + + input_path = Path(args.input) + output_dir = Path(args.output) + output_dir.mkdir(parents=True, exist_ok=True) + + if input_path.is_file(): + # Single file + with open(input_path, 'r', encoding='utf-8') as f: + data = json.load(f) + records = create_vector_documents( + data=data, + output_dir=output_dir, + config=config, + target_date=args.target_date + ) + print(f"Skapade {len(records)} vektorer") + else: + # Directory + json_files = list(input_path.glob('*.json')) 
+        if not json_files:
+            print(f"Inga JSON-filer hittades i {input_path}")
+            sys.exit(1)
+
+        results = batch_create_vector_documents(
+            json_files=json_files,
+            output_dir=output_dir,
+            config=config,
+            target_date=args.target_date
+        )
diff --git a/sfs_processor.py b/sfs_processor.py
index 80f4afe..1a27898 100644
--- a/sfs_processor.py
+++ b/sfs_processor.py
@@ -555,13 +555,24 @@ def main():
     parser.add_argument('--verbose', action='store_true',
                         help='Show detailed diff output for each amendment processing')
     parser.add_argument('--formats', dest='output_modes', default='md-markers',
-                        help='Output formats to generate (comma-separated). Currently supported: md-markers, md, git, html, htmldiff. Default: md-markers. Use "md-markers" to preserve section tags with temporal attributes (standard). Use "md" for clean markdown without section tags. Use "git" to enable Git commits with historical dates. HTML creates documents in ELI directory structure (/eli/sfs/{YEAR}/{lopnummer}). HTMLDIFF includes amendment versions with diff view.')
+                        help='Output formats to generate (comma-separated). Currently supported: md-markers, md, git, html, htmldiff, vector. Default: md-markers. Use "md-markers" to preserve section tags with temporal attributes (standard). Use "md" for clean markdown without section tags. Use "git" to enable Git commits with historical dates. HTML creates documents in ELI directory structure (/eli/sfs/{YEAR}/{lopnummer}). HTMLDIFF includes amendment versions with diff view. VECTOR creates embeddings for semantic search.')
     parser.add_argument('--predocs-fetch', action='store_true', dest='predocs_fetch',
                         help='Fetch detailed information about förarbeten from Riksdagen API. Parsing of förarbeten always happens. This will make processing slower.')
     parser.add_argument('--target-date', dest='target_date', default=None,
                         help='Target date (YYYY-MM-DD) for temporal processing. Used with md, html, and htmldiff formats to filter content based on validity dates.
If not specified, today\'s date is used for md format. Example: --target-date 2023-01-01')
     parser.add_argument('--apply-links', action='store_true', default=True,
                         help='Apply internal paragraph links (e.g., [9 §](#9§)), external SFS links (e.g., [2002:43](/sfs/2002:43)), EU legislation links (e.g., [(EU) nr 651/2014](https://eur-lex.europa.eu/...)), and law name links (e.g., [8 kap. 7 § regeringsformen](/sfs/1974/152)) to the document. Default: True')
+    # Vector export options
+    parser.add_argument('--vector-backend', dest='vector_backend', default='json',
+                        choices=['postgresql', 'elasticsearch', 'json'],
+                        help='Vector store backend for vector format (default: json)')
+    parser.add_argument('--vector-chunking', dest='vector_chunking', default='paragraph',
+                        choices=['paragraph', 'chapter', 'section', 'semantic', 'fixed_size'],
+                        help='Chunking strategy for vector format (default: paragraph)')
+    parser.add_argument('--vector-mock', dest='vector_mock', action='store_true',
+                        help='Use mock embeddings for vector format (for testing without OpenAI API)')
+    parser.add_argument('--embedding-model', dest='embedding_model', default='text-embedding-3-large',
+                        help='Embedding model for vector format (default: text-embedding-3-large)')
     parser.set_defaults(year_folder=True)

     args = parser.parse_args()
@@ -571,7 +582,7 @@ def main():
         output_modes = ['md']  # Default to markdown

     # Validate output modes
-    supported_formats = ['md', 'md-markers', 'git', 'html', 'htmldiff']
+    supported_formats = ['md', 'md-markers', 'git', 'html', 'htmldiff', 'vector']
     invalid_formats = [mode for mode in output_modes if mode not in supported_formats]
     if invalid_formats:
         print(f"Fel: Ej stödda utdataformat: {', '.join(invalid_formats)}")
@@ -634,7 +645,44 @@ def main():
         css_js_dir.mkdir(parents=True, exist_ok=True)
         generate_css_file(css_js_dir)
         generate_js_file(css_js_dir)
-
+
+    # Handle vector mode with batch processing
+    if "vector" in output_modes:
+        from exporters.vector import
VectorExportConfig, ChunkingStrategy
+        from exporters.vector.vector_export import batch_create_vector_documents
+
+        # Build vector config
+        backend_config = {}
+        if args.vector_backend == "json":
+            # Set JSON file path to output directory
+            backend_config["file_path"] = str(output_dir / "sfs_vectors.json")
+
+        vector_config = VectorExportConfig(
+            embedding_provider="mock" if args.vector_mock else "openai",
+            embedding_model=args.embedding_model,
+            backend_type=args.vector_backend,
+            backend_config=backend_config,
+            chunking_strategy=ChunkingStrategy(args.vector_chunking),
+            verbose=args.verbose
+        )
+
+        # Set target_date to today if not specified
+        vector_target_date = args.target_date or datetime.now().strftime('%Y-%m-%d')
+
+        print(f"Skapar vektordata med {args.embedding_model} och {args.vector_chunking}-chunking...")
+        batch_create_vector_documents(
+            json_files=json_files,
+            output_dir=output_dir,
+            config=vector_config,
+            target_date=vector_target_date,
+            show_progress=True
+        )
+
+        # Remove 'vector' from output_modes if it's the only one to avoid further processing
+        if output_modes == ['vector']:
+            print(f"\nVektorbearbetning klar! {len(json_files)} filer bearbetade")
+            return
+
     # Handle git mode with batch processing
     if "git" in output_modes:
         from exporters.git import process_files_with_git_batch