Skip to content

Commit deea49e

Browse files
committed
fix(thread-safety): eliminate races in log mutex, watcher, and index threads
- http_server: Replace lazy log mutex init with cbm_ui_log_init() using atomic_exchange once-init. cbm_ui_log_append() calls it on first use so early startup logs are not silently dropped. - watcher: Add cbm_mutex_t to protect projects hash table. poll_once snapshots project pointers under lock then polls without holding it, keeping the critical section small during git I/O and indexing. - compat_thread: Add cbm_thread_detach() for POSIX and Windows. Both join() and detach() clear the handle on success for consistent lifecycle tracking across platforms. - http_server: Detach index job threads to prevent handle leaks.
1 parent 1d30971 commit deea49e

File tree

6 files changed

+92
-6
lines changed

6 files changed

+92
-6
lines changed

src/foundation/compat_thread.c

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,14 @@ int cbm_thread_join(cbm_thread_t *t) {
5959
return 0;
6060
}
6161

62+
int cbm_thread_detach(cbm_thread_t *t) {
63+
if (t->handle) {
64+
CloseHandle(t->handle);
65+
t->handle = NULL;
66+
}
67+
return 0;
68+
}
69+
6270
#else /* POSIX */
6371

6472
int cbm_thread_create(cbm_thread_t *t, size_t stack_size, void *(*fn)(void *), void *arg) {
@@ -74,7 +82,19 @@ int cbm_thread_create(cbm_thread_t *t, size_t stack_size, void *(*fn)(void *), v
7482
}
7583

7684
int cbm_thread_join(cbm_thread_t *t) {
77-
return pthread_join(t->handle, NULL);
85+
int rc = pthread_join(t->handle, NULL);
86+
if (rc == 0) {
87+
memset(&t->handle, 0, sizeof(t->handle));
88+
}
89+
return rc;
90+
}
91+
92+
int cbm_thread_detach(cbm_thread_t *t) {
93+
int rc = pthread_detach(t->handle);
94+
if (rc == 0) {
95+
memset(&t->handle, 0, sizeof(t->handle));
96+
}
97+
return rc;
7898
}
7999

80100
#endif

src/foundation/compat_thread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ int cbm_thread_create(cbm_thread_t *t, size_t stack_size, void *(*fn)(void *), v
3939
/* Wait for thread to finish. Returns 0 on success. */
4040
int cbm_thread_join(cbm_thread_t *t);
4141

42+
/* Detach thread so resources are freed on exit. Returns 0 on success. */
43+
int cbm_thread_detach(cbm_thread_t *t);
44+
4245
/* ── Mutex ────────────────────────────────────────────────────── */
4346

4447
#ifdef _WIN32

src/main.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,9 @@ int main(int argc, char **argv) {
307307
}
308308

309309
/* Create and start watcher in background thread */
310+
/* Initialize log mutex before any threads are created */
311+
cbm_ui_log_init();
312+
310313
cbm_store_t *watch_store = cbm_store_open_memory();
311314
g_watcher = cbm_watcher_new(watch_store, watcher_index_fn, NULL);
312315

src/ui/http_server.c

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,20 @@ static int g_log_count = 0;
142142
static cbm_mutex_t g_log_mutex;
143143
static atomic_int g_log_mutex_init = 0;
144144

145+
/* Must be called once before any threads are created. */
146+
void cbm_ui_log_init(void) {
147+
if (!atomic_exchange(&g_log_mutex_init, 1)) {
148+
cbm_mutex_init(&g_log_mutex);
149+
}
150+
}
151+
145152
/* Called from a log hook — appends a line to the ring buffer (thread-safe) */
146153
void cbm_ui_log_append(const char *line) {
147154
if (!line)
148155
return;
149-
if (!atomic_load(&g_log_mutex_init)) {
150-
cbm_mutex_init(&g_log_mutex);
151-
atomic_store(&g_log_mutex_init, 1);
152-
}
156+
/* Ensure mutex is initialized (safe for early single-threaded logging
157+
* and concurrent calls via atomic_exchange once-init pattern). */
158+
cbm_ui_log_init();
153159
cbm_mutex_lock(&g_log_mutex);
154160
snprintf(g_log_ring[g_log_head], LOG_LINE_MAX, "%s", line);
155161
g_log_head = (g_log_head + 1) % LOG_RING_SIZE;
@@ -791,6 +797,7 @@ static void handle_index_start(struct mg_connection *c, struct mg_http_message *
791797
mg_http_reply(c, 500, g_cors_json, "{\"error\":\"thread creation failed\"}");
792798
return;
793799
}
800+
cbm_thread_detach(&tid); /* Don't leak thread handle */
794801

795802
mg_http_reply(c, 202, g_cors_json, "{\"status\":\"indexing\",\"slot\":%d,\"path\":\"%s\"}",
796803
slot, job->root_path);

src/ui/http_server.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ void cbm_http_server_run(cbm_http_server_t *srv);
3232
/* Check if the server started successfully (listener bound). */
3333
bool cbm_http_server_is_running(const cbm_http_server_t *srv);
3434

35+
/* Initialize the log ring buffer mutex. Must be called once before any threads. */
36+
void cbm_ui_log_init(void);
37+
3538
/* Append a log line to the UI ring buffer (called from log hook). */
3639
void cbm_ui_log_append(const char *line);
3740

src/watcher/watcher.c

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "foundation/log.h"
2121
#include "foundation/hash_table.h"
2222
#include "foundation/compat.h"
23+
#include "foundation/compat_thread.h"
2324
#include "foundation/compat_fs.h"
2425
#include "foundation/str_util.h"
2526

@@ -50,6 +51,7 @@ struct cbm_watcher {
5051
cbm_index_fn index_fn;
5152
void *user_data;
5253
CBMHashTable *projects; /* name → project_state_t* */
54+
cbm_mutex_t projects_lock;
5355
atomic_int stopped;
5456
};
5557

@@ -236,6 +238,7 @@ cbm_watcher_t *cbm_watcher_new(cbm_store_t *store, cbm_index_fn index_fn, void *
236238
w->index_fn = index_fn;
237239
w->user_data = user_data;
238240
w->projects = cbm_ht_create(CBM_SZ_32);
241+
cbm_mutex_init(&w->projects_lock);
239242
atomic_init(&w->stopped, 0);
240243
return w;
241244
}
@@ -244,8 +247,11 @@ void cbm_watcher_free(cbm_watcher_t *w) {
244247
if (!w) {
245248
return;
246249
}
250+
cbm_mutex_lock(&w->projects_lock);
247251
cbm_ht_foreach(w->projects, free_state_entry, NULL);
248252
cbm_ht_free(w->projects);
253+
cbm_mutex_unlock(&w->projects_lock);
254+
cbm_mutex_destroy(&w->projects_lock);
249255
free(w);
250256
}
251257

@@ -264,6 +270,7 @@ void cbm_watcher_watch(cbm_watcher_t *w, const char *project_name, const char *r
264270
}
265271

266272
/* Remove old entry first (key points to state's project_name) */
273+
cbm_mutex_lock(&w->projects_lock);
267274
project_state_t *old = cbm_ht_get(w->projects, project_name);
268275
if (old) {
269276
cbm_ht_delete(w->projects, project_name);
@@ -272,17 +279,24 @@ void cbm_watcher_watch(cbm_watcher_t *w, const char *project_name, const char *r
272279

273280
project_state_t *s = state_new(project_name, root_path);
274281
cbm_ht_set(w->projects, s->project_name, s);
282+
cbm_mutex_unlock(&w->projects_lock);
275283
cbm_log_info("watcher.watch", "project", project_name, "path", root_path);
276284
}
277285

278286
void cbm_watcher_unwatch(cbm_watcher_t *w, const char *project_name) {
279287
if (!w || !project_name) {
280288
return;
281289
}
290+
bool removed = false;
291+
cbm_mutex_lock(&w->projects_lock);
282292
project_state_t *s = cbm_ht_get(w->projects, project_name);
283293
if (s) {
284294
cbm_ht_delete(w->projects, project_name);
285295
state_free(s);
296+
removed = true;
297+
}
298+
cbm_mutex_unlock(&w->projects_lock);
299+
if (removed) {
286300
cbm_log_info("watcher.unwatch", "project", project_name);
287301
}
288302
}
@@ -411,17 +425,53 @@ static void poll_project(const char *key, void *val, void *ud) {
411425
s->next_poll_ns = ctx->now + ((int64_t)s->interval_ms * US_PER_MS);
412426
}
413427

428+
/* Callback to snapshot project state pointers into an array. */
429+
typedef struct {
430+
project_state_t **items;
431+
int count;
432+
int cap;
433+
} snapshot_ctx_t;
434+
435+
static void snapshot_project(const char *key, void *val, void *ud) {
436+
(void)key;
437+
snapshot_ctx_t *sc = ud;
438+
if (val && sc->count < sc->cap) {
439+
sc->items[sc->count++] = val;
440+
}
441+
}
442+
414443
int cbm_watcher_poll_once(cbm_watcher_t *w) {
415444
if (!w) {
416445
return 0;
417446
}
418447

448+
/* Snapshot project pointers under lock, then poll without holding it.
449+
* This keeps the critical section small — poll_project does git I/O
450+
* and may invoke index_fn which runs the full pipeline. */
451+
cbm_mutex_lock(&w->projects_lock);
452+
int n = cbm_ht_count(w->projects);
453+
if (n == 0) {
454+
cbm_mutex_unlock(&w->projects_lock);
455+
return 0;
456+
}
457+
project_state_t **snap = malloc(n * sizeof(project_state_t *));
458+
if (!snap) {
459+
cbm_mutex_unlock(&w->projects_lock);
460+
return 0;
461+
}
462+
snapshot_ctx_t sc = {.items = snap, .count = 0, .cap = n};
463+
cbm_ht_foreach(w->projects, snapshot_project, &sc);
464+
cbm_mutex_unlock(&w->projects_lock);
465+
419466
poll_ctx_t ctx = {
420467
.w = w,
421468
.now = now_ns(),
422469
.reindexed = 0,
423470
};
424-
cbm_ht_foreach(w->projects, poll_project, &ctx);
471+
for (int i = 0; i < sc.count; i++) {
472+
poll_project(NULL, snap[i], &ctx);
473+
}
474+
free(snap);
425475
return ctx.reindexed;
426476
}
427477

0 commit comments

Comments
 (0)