Skip to content

Commit 95e267f

Browse files
committed
Merge remote-tracking branch 'origin/main' into paper/10-ink-bleed
# Conflicts: # frontend/taskdeck-web/src/views/PaperStyleGuideView.vue
2 parents 4458785 + 720fefe commit 95e267f

105 files changed

Lines changed: 13328 additions & 49 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

backend/src/Taskdeck.Api/Extensions/OptionsValidationRegistration.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public static IServiceCollection AddOptionsValidation(
4141

4242
services.RegisterValidatedOptions<WorkerSettings>(configuration, "Workers");
4343
services.RegisterValidatedOptions<AuditRetentionSettings>(configuration, "AuditRetention");
44+
services.RegisterValidatedOptions<EmbeddingBackfillSettings>(configuration, "EmbeddingBackfill");
4445

4546
// ── Settings from CorsRegistration (Cache is used in infrastructure) ─
4647

backend/src/Taskdeck.Api/Extensions/WorkerRegistration.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,15 @@ public static IServiceCollection AddTaskdeckWorkers(
4242
var auditRetentionSettings = configuration.GetSection("AuditRetention").Get<AuditRetentionSettings>() ?? new AuditRetentionSettings();
4343
services.AddSingleton(auditRetentionSettings);
4444

45+
var embeddingBackfillSettings = configuration.GetSection("EmbeddingBackfill").Get<EmbeddingBackfillSettings>() ?? new EmbeddingBackfillSettings();
46+
services.AddSingleton(embeddingBackfillSettings);
47+
4548
services.AddSingleton<WorkerHeartbeatRegistry>();
4649
services.AddHostedService<LlmQueueToProposalWorker>();
4750
services.AddHostedService<ProposalHousekeepingWorker>();
4851
services.AddHostedService<OutboundWebhookDeliveryWorker>();
4952
services.AddHostedService<AuditRetentionWorker>();
53+
services.AddHostedService<EmbeddingBackfillWorker>();
5054

5155
return services;
5256
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
using Taskdeck.Application.Services;
2+
using Taskdeck.Api.Telemetry;
3+
4+
namespace Taskdeck.Api.Workers;
5+
6+
/// <summary>
/// Hosted background worker that repeatedly asks
/// <see cref="IEmbeddingBackfillService"/> to process one batch and then waits
/// before the next iteration. After a successful batch it waits one second
/// while items remain and <c>PollIntervalSeconds</c> otherwise; after a failed
/// iteration it waits for the delay computed by
/// <see cref="CalculateBackoffSeconds"/> so repeated failures do not spam the log.
/// The worker itself keeps no progress state; resumability across restarts is
/// delegated to the backfill service.
/// </summary>
public class EmbeddingBackfillWorker : BackgroundService
{
    // Short delay used while the service still reports remaining items.
    private static readonly TimeSpan CatchUpDelay = TimeSpan.FromSeconds(1);

    private readonly IServiceScopeFactory _scopeFactory;
    private readonly EmbeddingBackfillSettings _settings;
    private readonly WorkerHeartbeatRegistry _heartbeatRegistry;
    private readonly ILogger<EmbeddingBackfillWorker> _logger;

    /// <summary>
    /// Creates the worker with its scope factory, settings, heartbeat registry and logger.
    /// </summary>
    /// <param name="scopeFactory">Factory used to create one DI scope per iteration.</param>
    /// <param name="settings">Batch size, poll interval and backoff limits.</param>
    /// <param name="heartbeatRegistry">Registry that receives a heartbeat at the start of every iteration.</param>
    /// <param name="logger">Logger for lifecycle and failure messages.</param>
    public EmbeddingBackfillWorker(
        IServiceScopeFactory scopeFactory,
        EmbeddingBackfillSettings settings,
        WorkerHeartbeatRegistry heartbeatRegistry,
        ILogger<EmbeddingBackfillWorker> logger)
    {
        _scopeFactory = scopeFactory;
        _settings = settings;
        _heartbeatRegistry = heartbeatRegistry;
        _logger = logger;
    }

    /// <summary>
    /// Runs the backfill loop until <paramref name="stoppingToken"/> is cancelled.
    /// Returns immediately when the worker is disabled via configuration.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (!_settings.Enabled)
        {
            _logger.LogInformation("EmbeddingBackfillWorker is disabled via configuration");
            return;
        }

        _logger.LogInformation(
            "EmbeddingBackfillWorker starting (batch={BatchSize}, interval={IntervalSeconds}s)",
            _settings.BatchSize,
            _settings.PollIntervalSeconds);

        var consecutiveErrors = 0;

        while (!stoppingToken.IsCancellationRequested)
        {
            _heartbeatRegistry.ReportHeartbeat(nameof(EmbeddingBackfillWorker));

            TimeSpan delay;

            try
            {
                // A fresh scope per iteration so scoped dependencies of the
                // backfill service are disposed after every batch.
                using var scope = _scopeFactory.CreateScope();
                var backfillService = scope.ServiceProvider
                    .GetRequiredService<IEmbeddingBackfillService>();

                var result = await backfillService.ProcessBatchAsync(
                    _settings.BatchSize,
                    stoppingToken);

                consecutiveErrors = 0;

                // Items remain: poll sooner to catch up. Nothing left: normal interval.
                delay = result.Remaining > 0
                    ? CatchUpDelay
                    : TimeSpan.FromSeconds(_settings.PollIntervalSeconds);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                consecutiveErrors++;

                // Only the redacted summary is logged, not the exception object itself.
                _logger.LogError(
                    "EmbeddingBackfillWorker iteration failed ({ConsecutiveErrors} consecutive). {Error}",
                    consecutiveErrors,
                    SensitiveDataRedactor.SummarizeException(ex));

                delay = TimeSpan.FromSeconds(CalculateBackoffSeconds(consecutiveErrors));
            }

            // The wait happens outside the handlers above so that a shutdown
            // during the backoff delay is observed here and ends the loop
            // cleanly instead of escaping ExecuteAsync from inside a catch block.
            try
            {
                await Task.Delay(delay, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
        }

        _logger.LogInformation("EmbeddingBackfillWorker stopped");
    }

    /// <summary>
    /// Computes the delay, in seconds, to wait after a failed iteration.
    /// Starts from <c>PollIntervalSeconds</c> and doubles once per consecutive
    /// error, counting at most <c>MaxConsecutiveErrors</c> errors and never
    /// exceeding <c>MaxBackoffSeconds</c>.
    /// </summary>
    /// <param name="consecutiveErrors">Number of failures in a row; negative values are treated as zero.</param>
    /// <returns>The backoff delay in seconds, capped at <c>MaxBackoffSeconds</c>.</returns>
    internal int CalculateBackoffSeconds(int consecutiveErrors)
    {
        var cappedErrors = Math.Min(
            Math.Max(consecutiveErrors, 0),
            _settings.MaxConsecutiveErrors);

        var delaySeconds = _settings.PollIntervalSeconds;
        for (var i = 0; i < cappedErrors; i++)
        {
            // Return the cap before doubling once the delay has reached half
            // of it (integer division); this also keeps the multiplication
            // below from overflowing.
            if (delaySeconds >= _settings.MaxBackoffSeconds / 2)
            {
                return _settings.MaxBackoffSeconds;
            }

            delaySeconds *= 2;
        }

        return Math.Min(_settings.MaxBackoffSeconds, delaySeconds);
    }
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace Taskdeck.Application.DTOs;
2+
3+
/// <summary>
/// One hit returned by a nearest-neighbor vector search.
/// </summary>
/// <param name="DocumentId">Identifier of the matched document.</param>
/// <param name="Score">
/// Score assigned to the match. Scale and ordering are not defined here and
/// presumably depend on the index implementation.
/// </param>
/// <param name="Metadata">Optional string key/value metadata for the match; <c>null</c> when omitted.</param>
public sealed record VectorSearchResult(
    string DocumentId,
    double Score,
    IReadOnlyDictionary<string, string>? Metadata = null);
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
namespace Taskdeck.Application.Interfaces;
2+
3+
/// <summary>
/// Turns text into fixed-dimension embedding vectors.
/// Contract: every implementation must run locally, so no user content may
/// leave the machine.
/// </summary>
public interface IEmbeddingGenerator
{
    /// <summary>
    /// Number of components in each vector this generator produces.
    /// Callers rely on it to pre-allocate storage and to check that an index
    /// is compatible.
    /// </summary>
    int Dimensions { get; }

    /// <summary>
    /// Indicates whether embeddings can currently be produced.
    /// <c>false</c> means the underlying model failed to load or a dependency
    /// is missing, which lets callers fall back to FTS.
    /// </summary>
    bool IsAvailable { get; }

    /// <summary>
    /// Produces the embedding vector for one piece of text.
    /// </summary>
    /// <param name="text">The text to embed.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The embedding vector for <paramref name="text"/>.</returns>
    Task<ReadOnlyMemory<float>> GenerateAsync(
        string text,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Produces embedding vectors for several texts in one call.
    /// </summary>
    /// <param name="texts">The texts to embed.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>
    /// One vector per input, positionally aligned with <paramref name="texts"/>.
    /// </returns>
    Task<IReadOnlyList<ReadOnlyMemory<float>>> GenerateBatchAsync(
        IReadOnlyList<string> texts,
        CancellationToken cancellationToken = default);
}

backend/src/Taskdeck.Application/Interfaces/IKnowledgeChunkRepository.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,19 @@ namespace Taskdeck.Application.Interfaces;
44

55
public interface IKnowledgeChunkRepository : IRepository<KnowledgeChunk>
66
{
7+
Task<IEnumerable<KnowledgeChunk>> GetUnindexedBatchAsync(
8+
KnowledgeChunkBackfillCursor? cursor,
9+
int batchSize,
10+
CancellationToken cancellationToken = default);
11+
12+
Task<IReadOnlySet<Guid>> GetExistingIdsAsync(
13+
IReadOnlyCollection<Guid> candidateChunkIds,
14+
CancellationToken cancellationToken = default);
15+
16+
Task<int> CountUnindexedAsync(
17+
KnowledgeChunkBackfillCursor? cursor,
18+
CancellationToken cancellationToken = default);
19+
720
Task<IEnumerable<KnowledgeChunk>> GetByDocumentIdAsync(
821
Guid documentId,
922
CancellationToken cancellationToken = default);
@@ -12,3 +25,8 @@ Task DeleteByDocumentIdAsync(
1225
Guid documentId,
1326
CancellationToken cancellationToken = default);
1427
}
28+
29+
/// <summary>
/// Cursor value accepted by the unindexed-chunk queries of
/// <see cref="IKnowledgeChunkRepository"/>.
/// </summary>
/// <param name="CreatedAt">Timestamp component of the cursor (presumably the creation time of the last chunk seen; confirm against the repository implementation).</param>
/// <param name="Id">Identifier component of the cursor (presumably the id of the last chunk seen; confirm against the repository implementation).</param>
/// <param name="ProcessedIdsAtCreatedAt">
/// Optional set of chunk ids; <c>null</c> when omitted. Judging by the name,
/// these are ids already handled at the same <paramref name="CreatedAt"/>
/// value. Confirm against the repository implementation.
/// </param>
public sealed record KnowledgeChunkBackfillCursor(
    DateTimeOffset CreatedAt,
    Guid Id,
    IReadOnlySet<Guid>? ProcessedIdsAtCreatedAt = null);
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
using Taskdeck.Application.DTOs;
2+
3+
namespace Taskdeck.Application.Interfaces;
4+
5+
/// <summary>
/// Storage and nearest-neighbor search over embedding vectors, kept behind an
/// interface so the backing store (in-memory, sqlite-vec, external service)
/// can be swapped without touching application-layer code.
///
/// Every vector is keyed by a string document identifier.
/// Contract: implementations must be thread-safe for concurrent reads and writes.
/// </summary>
public interface IVectorIndex
{
    /// <summary>
    /// Inserts or replaces the embedding for one document.
    /// Contract: when a vector with the same <paramref name="documentId"/>
    /// already exists, it is replaced atomically.
    /// </summary>
    /// <param name="documentId">Key of the document the vector belongs to.</param>
    /// <param name="vector">The embedding to store.</param>
    /// <param name="metadata">Optional string key/value metadata stored with the vector.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    Task UpsertAsync(
        string documentId,
        ReadOnlyMemory<float> vector,
        IReadOnlyDictionary<string, string>? metadata = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Inserts or replaces the embeddings for many documents in one batch.
    /// Implementations should favor throughput over latency.
    /// </summary>
    /// <param name="documents">The documents to upsert.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    Task UpsertBatchAsync(
        IReadOnlyList<VectorDocument> documents,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Returns the <paramref name="topK"/> nearest neighbors of
    /// <paramref name="queryVector"/>.
    /// </summary>
    /// <param name="queryVector">The vector to search around.</param>
    /// <param name="topK">Maximum number of neighbors requested; defaults to 10.</param>
    /// <param name="filter">Optional string key/value filter; matching semantics are left to the implementation.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The search hits found for the query.</returns>
    Task<IReadOnlyList<VectorSearchResult>> QueryAsync(
        ReadOnlyMemory<float> queryVector,
        int topK = 10,
        IReadOnlyDictionary<string, string>? filter = null,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Removes the vector stored for <paramref name="documentId"/>.
    /// Does nothing when no such document exists.
    /// </summary>
    /// <param name="documentId">Key of the document whose vector is removed.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    Task DeleteAsync(
        string documentId,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Removes every vector whose document ID appears in
    /// <paramref name="documentIds"/>.
    /// </summary>
    /// <param name="documentIds">Keys of the documents whose vectors are removed.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    Task DeleteBatchAsync(
        IReadOnlyList<string> documentIds,
        CancellationToken cancellationToken = default);

    /// <summary>
    /// Reports how many vectors the index currently holds.
    /// </summary>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The number of stored vectors.</returns>
    Task<long> CountAsync(CancellationToken cancellationToken = default);
}
64+
65+
/// <summary>
/// One entry of the payload passed to <see cref="IVectorIndex.UpsertBatchAsync"/>.
/// </summary>
/// <param name="DocumentId">Key of the document the vector belongs to.</param>
/// <param name="Vector">The embedding to store for the document.</param>
/// <param name="Metadata">Optional string key/value metadata stored with the vector; <c>null</c> when omitted.</param>
public sealed record VectorDocument(
    string DocumentId,
    ReadOnlyMemory<float> Vector,
    IReadOnlyDictionary<string, string>? Metadata = null);
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using System.ComponentModel.DataAnnotations;
2+
3+
namespace Taskdeck.Application.Services;
4+
5+
/// <summary>
/// Configuration for the embedding backfill worker: whether it runs, how much
/// it processes per batch, how often it polls, and how its failure backoff is
/// bounded. The <c>[Range]</c> attributes declare the accepted values for
/// options validation.
/// </summary>
public class EmbeddingBackfillSettings
{
    /// <summary>
    /// Whether the backfill worker is enabled. When false, no embedding
    /// backfill occurs even if vector dependencies are available.
    /// Defaults to <c>true</c>.
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// Number of items to process in each backfill batch (1-500, default 50).
    /// </summary>
    [Range(1, 500, ErrorMessage = "BatchSize must be between 1 and 500.")]
    public int BatchSize { get; set; } = 50;

    /// <summary>
    /// Seconds to wait between backfill iterations once nothing remains to
    /// process (1-86400, default 30). Also the base value that the worker's
    /// failure backoff doubles from.
    /// </summary>
    [Range(1, 86400, ErrorMessage = "PollIntervalSeconds must be between 1 and 86400.")]
    public int PollIntervalSeconds { get; set; } = 30;

    /// <summary>
    /// Upper bound on how many consecutive errors count towards the
    /// exponential backoff (1-100, default 5). The worker backs off from the
    /// first failure onwards; errors beyond this count no longer lengthen
    /// the delay.
    /// </summary>
    [Range(1, 100, ErrorMessage = "MaxConsecutiveErrors must be between 1 and 100.")]
    public int MaxConsecutiveErrors { get; set; } = 5;

    /// <summary>
    /// Maximum backoff delay in seconds when consecutive errors occur
    /// (10-3600, default 300).
    /// </summary>
    [Range(10, 3600, ErrorMessage = "MaxBackoffSeconds must be between 10 and 3600.")]
    public int MaxBackoffSeconds { get; set; } = 300;
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
namespace Taskdeck.Application.Services;
2+
3+
/// <summary>
/// Drives embedding backfill for entities that have not yet been indexed.
/// Contract: implementations must be resumable (track progress) and
/// failure-safe (individual item failures do not block the batch).
/// </summary>
public interface IEmbeddingBackfillService
{
    /// <summary>
    /// Processes a single batch of un-embedded items.
    /// </summary>
    /// <param name="batchSize">Number of items to process in this batch.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>
    /// A <see cref="BackfillBatchResult"/> carrying the processed, failed and
    /// remaining counts for this batch. The background worker polls again
    /// sooner while <c>Remaining</c> is greater than zero.
    /// </returns>
    Task<BackfillBatchResult> ProcessBatchAsync(
        int batchSize,
        CancellationToken cancellationToken = default);
}
18+
19+
/// <summary>
/// Outcome of one backfill batch run.
/// </summary>
/// <param name="Processed">Count of items processed in the batch (presumably those embedded successfully; confirm against the service implementation).</param>
/// <param name="Failed">Count of items that failed in the batch (confirm exact semantics against the service implementation).</param>
/// <param name="Remaining">Count of items still waiting after the batch; the worker polls sooner while this is above zero.</param>
public sealed record BackfillBatchResult(
    int Processed,
    int Failed,
    int Remaining);
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
using Taskdeck.Application.DTOs;
2+
3+
namespace Taskdeck.Application.Services;
4+
5+
/// <summary>
/// Marker interface that extends <see cref="IKnowledgeSearchService"/> without
/// declaring any members of its own. Presumably it lets callers request the
/// full-text-search implementation specifically; confirm at the registration site.
/// </summary>
public interface IFtsKnowledgeSearchService : IKnowledgeSearchService
{
}

0 commit comments

Comments
 (0)