|
| 1 | +using System.Text.RegularExpressions; |
1 | 2 | using EssentialCSharp.Chat.Common.Models; |
2 | 3 | using Microsoft.Extensions.AI; |
3 | 4 | using Microsoft.Extensions.VectorData; |
| 5 | +using Npgsql; |
4 | 6 |
|
5 | 7 | namespace EssentialCSharp.Chat.Common.Services; |
6 | 8 |
|
7 | 9 | /// <summary> |
8 | | -/// Service for generating embeddings for markdown chunks using Azure OpenAI |
| 10 | +/// Service for generating embeddings for markdown chunks using Azure OpenAI and uploading |
| 11 | +/// them to a PostgreSQL vector store via a staging-then-swap pattern to avoid downtime. |
9 | 12 | /// </summary> |
10 | | -public class EmbeddingService(VectorStore vectorStore, IEmbeddingGenerator<string, Embedding<float>> embeddingGenerator) |
| 13 | +public class EmbeddingService( |
| 14 | + VectorStore vectorStore, |
| 15 | + IEmbeddingGenerator<string, Embedding<float>> embeddingGenerator, |
| 16 | + NpgsqlDataSource? dataSource = null) |
11 | 17 | { |
12 | 18 | public static string CollectionName { get; } = "markdown_chunks"; |
13 | 19 |
|
| 20 | + /// <summary> |
| 21 | + /// Maximum number of inputs per Azure OpenAI embedding batch call. |
| 22 | + /// </summary> |
| 23 | + private const int EmbeddingBatchSize = 2048; |
| 24 | + |
| 25 | + // Only allow simple identifiers: letters, digits, and underscores, starting with a letter or underscore. |
| 26 | + private static readonly Regex _safeIdentifierRegex = new(@"^[a-zA-Z_][a-zA-Z0-9_]*$", RegexOptions.Compiled); |
| 27 | + |
14 | 28 | /// <summary> |
15 | 29 | /// Generate an embedding for the given text. |
16 | 30 | /// </summary> |
17 | | - /// <param name="text">The text to generate an embedding for.</param> |
18 | | - /// <param name="cancellationToken">The cancellation token.</param> |
19 | | - /// <returns>A search vector as ReadOnlyMemory<float>.</returns> |
20 | 31 | public async Task<ReadOnlyMemory<float>> GenerateEmbeddingAsync(string text, CancellationToken cancellationToken = default) |
21 | 32 | { |
22 | 33 | var embedding = await embeddingGenerator.GenerateAsync(text, cancellationToken: cancellationToken); |
23 | 34 | return embedding.Vector; |
24 | 35 | } |
25 | 36 |
|
26 | 37 | /// <summary> |
27 | | - /// Generate an embedding for each text paragraph and upload it to the specified collection. |
| 38 | + /// Generate embeddings for all chunks in batches and upload them to the vector store |
| 39 | + /// using a staging-then-atomic-swap pattern so the live collection stays queryable |
| 40 | + /// throughout the rebuild. |
| 41 | + /// |
| 42 | + /// Steps: |
| 43 | + /// 1. Create a staging collection ({collectionName}_staging). |
| 44 | + /// 2. For each batch of <see cref="EmbeddingBatchSize"/> chunks: embed the batch |
| 45 | + /// and immediately upsert it into staging, keeping peak memory bounded. |
| 46 | + /// 3. Atomically swap tables in a single transaction using two SQL RENAME operations |
| 47 | + /// (live → old, staging → live). PostgreSQL ALTER TABLE acquires |
| 48 | + /// AccessExclusiveLock automatically; no explicit LOCK TABLE is needed. The |
| 49 | + /// transaction ensures no reader sees an intermediate state. |
| 50 | + /// 4. Drop the old live backup table with DROP TABLE. |
| 51 | + /// |
| 52 | + /// If an error occurs before the swap, only the staging table is affected — the live |
| 53 | + /// collection is untouched. |
28 | 54 | /// </summary> |
29 | | - /// <param name="collectionName">The name of the collection to upload the text paragraphs to.</param> |
30 | | - /// <returns>An async task.</returns> |
31 | | - public async Task GenerateBookContentEmbeddingsAndUploadToVectorStore(IEnumerable<BookContentChunk> bookContents, CancellationToken cancellationToken, string? collectionName = null) |
| 55 | + public async Task GenerateBookContentEmbeddingsAndUploadToVectorStore( |
| 56 | + IEnumerable<BookContentChunk> bookContents, |
| 57 | + CancellationToken cancellationToken, |
| 58 | + string? collectionName = null) |
32 | 59 | { |
33 | 60 | collectionName ??= CollectionName; |
34 | 61 |
|
35 | | - var collection = vectorStore.GetCollection<string, BookContentChunk>(collectionName); |
36 | | - await collection.EnsureCollectionDeletedAsync(cancellationToken); |
37 | | - await collection.EnsureCollectionExistsAsync(cancellationToken); |
| 62 | + if (dataSource is null) |
| 63 | + throw new InvalidOperationException( |
| 64 | + $"{nameof(NpgsqlDataSource)} must be provided to upload embeddings. Ensure it is registered in DI."); |
38 | 65 |
|
39 | | - int uploadedCount = 0; |
| 66 | + if (!_safeIdentifierRegex.IsMatch(collectionName)) |
| 67 | + throw new ArgumentException( |
| 68 | + $"Collection name '{collectionName}' contains unsafe characters. Use only letters, digits, and underscores.", |
| 69 | + nameof(collectionName)); |
40 | 70 |
|
41 | | - foreach (var chunk in bookContents) |
| 71 | + string stagingName = $"{collectionName}_staging"; |
| 72 | + string oldName = $"{collectionName}_old"; |
| 73 | + |
| 74 | + // ── Step 1: Prepare staging collection ──────────────────────────────────────── |
| 75 | + var staging = vectorStore.GetCollection<string, BookContentChunk>(stagingName); |
| 76 | + await staging.EnsureCollectionDeletedAsync(cancellationToken); |
| 77 | + await staging.EnsureCollectionExistsAsync(cancellationToken); |
| 78 | + |
| 79 | + // ── Step 2 & 3: Batch-embed and immediately upsert each batch ───────────────── |
| 80 | + // Azure OpenAI supports at most EmbeddingBatchSize inputs per GenerateAsync call. |
| 81 | + // bookContents is streamed in fixed-size batches without full upfront materialization, |
| 82 | + // keeping peak memory bounded to one batch of chunk objects and their embeddings at a time. |
| 83 | + // The staging-swap (Step 3) is safe because it only runs after all batches have |
| 84 | + // been successfully upserted. |
| 85 | + var buffer = new List<BookContentChunk>(EmbeddingBatchSize); |
| 86 | + int totalCount = 0; |
| 87 | + |
| 88 | + async Task EmbedAndUpsertBatchAsync() |
42 | 89 | { |
43 | | - cancellationToken.ThrowIfCancellationRequested(); |
44 | | - chunk.TextEmbedding = await GenerateEmbeddingAsync(chunk.ChunkText, cancellationToken); |
45 | | - await collection.UpsertAsync(chunk, cancellationToken); |
46 | | - Console.WriteLine($"Uploaded chunk '{chunk.Id}' to collection '{collectionName}' for file '{chunk.FileName}' with heading '{chunk.Heading}'."); |
47 | | - uploadedCount++; |
| 90 | + var batchEmbeddings = await embeddingGenerator.GenerateAsync( |
| 91 | + buffer.Select(c => c.ChunkText), cancellationToken: cancellationToken); |
| 92 | + |
| 93 | + if (batchEmbeddings.Count != buffer.Count) |
| 94 | + throw new InvalidOperationException( |
| 95 | + $"Embedding count mismatch: expected {buffer.Count}, got {batchEmbeddings.Count}."); |
| 96 | + |
| 97 | + for (int i = 0; i < buffer.Count; i++) |
| 98 | + buffer[i].TextEmbedding = batchEmbeddings[i].Vector; |
| 99 | + |
| 100 | + await staging.UpsertAsync(buffer, cancellationToken); |
| 101 | + totalCount += buffer.Count; |
| 102 | + buffer.Clear(); |
| 103 | + } |
| 104 | + |
| 105 | + try |
| 106 | + { |
| 107 | + foreach (var chunk in bookContents) |
| 108 | + { |
| 109 | + buffer.Add(chunk); |
| 110 | + if (buffer.Count == EmbeddingBatchSize) |
| 111 | + await EmbedAndUpsertBatchAsync(); |
| 112 | + } |
| 113 | + |
| 114 | + if (buffer.Count > 0) |
| 115 | + await EmbedAndUpsertBatchAsync(); |
| 116 | + |
| 117 | + Console.WriteLine($"Uploaded {totalCount} chunks to staging collection '{stagingName}'."); |
48 | 118 | } |
49 | | - Console.WriteLine($"Successfully generated embeddings and uploaded {uploadedCount} chunks to collection '{collectionName}'."); |
| 119 | + catch |
| 120 | + { |
| 121 | + // Best-effort cleanup: drop the partially-populated staging table so the |
| 122 | + // next run starts clean. Do not let this secondary failure mask the original. |
| 123 | + try |
| 124 | + { |
| 125 | + await staging.EnsureCollectionDeletedAsync(cancellationToken); |
| 126 | + } |
| 127 | + catch (Exception cleanupEx) when (cleanupEx is not OperationCanceledException) |
| 128 | + { |
| 129 | + Console.Error.WriteLine($"Warning: failed to clean up staging collection '{stagingName}' after upsert failure: {cleanupEx.Message}"); |
| 130 | + } |
| 131 | + throw; |
| 132 | + } |
| 133 | + |
| 134 | + // ── Step 3: Atomic swap — staging → live ────────────────────────────────────── |
| 135 | + // Two ALTER TABLE RENAME operations in one transaction (live → old, staging → live). |
| 136 | + // Each RENAME auto-acquires AccessExclusiveLock on its table; the transaction |
| 137 | + // guarantees both renames are visible atomically to other sessions. |
| 138 | + await using var conn = await dataSource.OpenConnectionAsync(cancellationToken); |
| 139 | + await using var tx = await conn.BeginTransactionAsync(cancellationToken); |
| 140 | + |
| 141 | + await using (var cmd = conn.CreateCommand()) |
| 142 | + { |
| 143 | + cmd.Transaction = tx; |
| 144 | + |
| 145 | + // Drop any leftover backup from a previous run |
| 146 | + cmd.CommandText = $"DROP TABLE IF EXISTS \"{oldName}\""; |
| 147 | + await cmd.ExecuteNonQueryAsync(cancellationToken); |
| 148 | + |
| 149 | + // Rename live → old. IF EXISTS is a no-op on first run when no live table exists. |
| 150 | + cmd.CommandText = $"ALTER TABLE IF EXISTS \"{collectionName}\" RENAME TO \"{oldName}\""; |
| 151 | + await cmd.ExecuteNonQueryAsync(cancellationToken); |
| 152 | + |
| 153 | + // Rename staging → live |
| 154 | + cmd.CommandText = $"ALTER TABLE \"{stagingName}\" RENAME TO \"{collectionName}\""; |
| 155 | + await cmd.ExecuteNonQueryAsync(cancellationToken); |
| 156 | + } |
| 157 | + |
| 158 | + await tx.CommitAsync(cancellationToken); |
| 159 | + Console.WriteLine($"Swapped '{stagingName}' → '{collectionName}' atomically."); |
| 160 | + |
| 161 | + // ── Step 4: Drop the old backup ─────────────────────────────────────────────── |
| 162 | + await using (var cmd = conn.CreateCommand()) |
| 163 | + { |
| 164 | + cmd.CommandText = $"DROP TABLE IF EXISTS \"{oldName}\""; |
| 165 | + await cmd.ExecuteNonQueryAsync(cancellationToken); |
| 166 | + } |
| 167 | + |
| 168 | + Console.WriteLine($"Successfully generated embeddings and uploaded {totalCount} chunks to collection '{collectionName}'."); |
50 | 169 | } |
51 | 170 | } |
0 commit comments