Skip to content

Commit 0f9fa30

Browse files
gh-91: Serialize index assignment.
1 parent b3d550d commit 0f9fa30

6 files changed

Lines changed: 170 additions & 14 deletions

File tree

src/builtins.c

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "builtins.h"
22
#include "interpreter.h"
3+
#include "ns_buffer.h"
34
#include "lexer.h"
45
#include "parser.h"
56
#include "extensions.h"
@@ -6692,7 +6693,33 @@ static Value builtin_assign(Interpreter* interp, Value* args, int argc, Expr** a
66926693

66936694
// Indexed target (e.g., tns[...], map<...>)
66946695
if (target->type == EXPR_INDEX) {
6695-
ExecResult res = assign_index_chain(interp, env, target, rhs, line, col);
6696+
ExecResult res;
6697+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread()) {
6698+
char* buffered_error = NULL;
6699+
int buffered_line = 0;
6700+
int buffered_col = 0;
6701+
if (!ns_buffer_assign_index(interp, env, target, rhs, line, col,
6702+
&buffered_error, &buffered_line, &buffered_col)) {
6703+
res.status = EXEC_ERROR;
6704+
res.value = value_null();
6705+
res.break_count = 0;
6706+
res.jump_index = -1;
6707+
res.error = buffered_error ? strdup(buffered_error) : strdup("Indexed assignment failed");
6708+
res.error_line = buffered_line ? buffered_line : line;
6709+
res.error_column = buffered_col ? buffered_col : col;
6710+
} else {
6711+
res.status = EXEC_OK;
6712+
res.value = value_null();
6713+
res.break_count = 0;
6714+
res.jump_index = -1;
6715+
res.error = NULL;
6716+
res.error_line = 0;
6717+
res.error_column = 0;
6718+
}
6719+
free(buffered_error);
6720+
} else {
6721+
res = assign_index_chain(interp, env, target, rhs, line, col);
6722+
}
66966723
if (res.status == EXEC_ERROR) {
66976724
if (res.error) {
66986725
interp->error = strdup(res.error);

src/env.c

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -394,47 +394,47 @@ int env_permafreeze_direct(Env* env, const char* name) {
394394
/* ================================================================== */
395395

396396
bool env_define(Env* env, const char* name, DeclType type) {
397-
if (ns_buffer_active())
397+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread())
398398
return ns_buffer_define(env, name, type);
399399
return env_define_direct(env, name, type);
400400
}
401401

402402
bool env_assign(Env* env, const char* name, Value value,
403403
DeclType type, bool declare_if_missing) {
404-
if (ns_buffer_active())
404+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread())
405405
return ns_buffer_assign(env, name, value, type, declare_if_missing);
406406
return env_assign_direct(env, name, value, type, declare_if_missing);
407407
}
408408

409409
bool env_delete(Env* env, const char* name) {
410-
if (ns_buffer_active())
410+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread())
411411
return ns_buffer_delete(env, name);
412412
return env_delete_direct(env, name);
413413
}
414414

415415
bool env_set_alias(Env* env, const char* name, const char* target_name,
416416
DeclType type, bool declare_if_missing) {
417-
if (ns_buffer_active())
417+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread())
418418
return ns_buffer_set_alias(env, name, target_name, type,
419419
declare_if_missing);
420420
return env_set_alias_direct(env, name, target_name, type,
421421
declare_if_missing);
422422
}
423423

424424
int env_freeze(Env* env, const char* name) {
425-
if (ns_buffer_active())
425+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread())
426426
return ns_buffer_freeze(env, name);
427427
return env_freeze_direct(env, name);
428428
}
429429

430430
int env_thaw(Env* env, const char* name) {
431-
if (ns_buffer_active())
431+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread())
432432
return ns_buffer_thaw(env, name);
433433
return env_thaw_direct(env, name);
434434
}
435435

436436
int env_permafreeze(Env* env, const char* name) {
437-
if (ns_buffer_active())
437+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread())
438438
return ns_buffer_permafreeze(env, name);
439439
return env_permafreeze_direct(env, name);
440440
}
@@ -447,7 +447,7 @@ int env_permafreeze(Env* env, const char* name) {
447447

448448
EnvEntry* env_get_entry(Env* env, const char* name) {
449449
EnvEntry* snap = env_entry_snap_alloc();
450-
if (ns_buffer_active()) {
450+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread()) {
451451
ns_buffer_read_lock(name);
452452
EnvEntry* entry = env_get_entry_raw(env, name);
453453
env_entry_snap_from_raw(snap, entry);
@@ -471,7 +471,7 @@ EnvEntry* env_get_entry(Env* env, const char* name) {
471471

472472
bool env_get(Env* env, const char* name, Value* out_value,
473473
DeclType* out_type, bool* out_initialized) {
474-
if (ns_buffer_active()) {
474+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread()) {
475475
ns_buffer_read_lock(name);
476476
bool r = env_get_raw(env, name, out_value, out_type, out_initialized);
477477
ns_buffer_read_unlock();
@@ -481,7 +481,7 @@ bool env_get(Env* env, const char* name, Value* out_value,
481481
}
482482

483483
bool env_exists(Env* env, const char* name) {
484-
if (ns_buffer_active()) {
484+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread()) {
485485
ns_buffer_read_lock(name);
486486
bool r = env_exists_raw(env, name);
487487
ns_buffer_read_unlock();
@@ -491,7 +491,7 @@ bool env_exists(Env* env, const char* name) {
491491
}
492492

493493
int env_frozen_state(Env* env, const char* name) {
494-
if (ns_buffer_active()) {
494+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread()) {
495495
ns_buffer_read_lock(name);
496496
int r = env_frozen_state_raw(env, name);
497497
ns_buffer_read_unlock();
@@ -501,7 +501,7 @@ int env_frozen_state(Env* env, const char* name) {
501501
}
502502

503503
int env_permafrozen(Env* env, const char* name) {
504-
if (ns_buffer_active()) {
504+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread()) {
505505
ns_buffer_read_lock(name);
506506
int r = env_permafrozen_raw(env, name);
507507
ns_buffer_read_unlock();

src/interpreter.c

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2362,7 +2362,24 @@ static ExecResult exec_stmt(Interpreter* interp, Stmt* stmt, Env* env, LabelMap*
23622362
return make_error("Can only assign to indexed targets or identifiers", stmt->line, stmt->column);
23632363
}
23642364

2365-
ExecResult ar = assign_index_chain(interp, env, stmt->as.assign.target, v, stmt->line, stmt->column);
2365+
ExecResult ar;
2366+
if (ns_buffer_active() && !ns_buffer_is_prepare_thread()) {
2367+
char* buffered_error = NULL;
2368+
int buffered_line = 0;
2369+
int buffered_col = 0;
2370+
if (!ns_buffer_assign_index(interp, env, stmt->as.assign.target, v,
2371+
stmt->line, stmt->column,
2372+
&buffered_error, &buffered_line, &buffered_col)) {
2373+
ar = make_error(buffered_error ? buffered_error : "Indexed assignment failed",
2374+
buffered_line ? buffered_line : stmt->line,
2375+
buffered_col ? buffered_col : stmt->column);
2376+
} else {
2377+
ar = make_ok(value_null());
2378+
}
2379+
free(buffered_error);
2380+
} else {
2381+
ar = assign_index_chain(interp, env, stmt->as.assign.target, v, stmt->line, stmt->column);
2382+
}
23662383
if (ar.status == EXEC_ERROR) {
23672384
value_free(v);
23682385
return ar;

src/ns_buffer.c

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
#include "ns_buffer.h"
2828
#include "env.h"
29+
#include "interpreter.h"
2930

3031
#include <stdlib.h>
3132
#include <string.h>
@@ -40,6 +41,7 @@
4041
/* ------------------------------------------------------------------ */
4142

4243
static NsBuffer* g_ns_buf = NULL;
44+
static _Thread_local int g_ns_prepare_thread = 0;
4345

4446
/* ------------------------------------------------------------------ */
4547
/* Helpers */
@@ -95,6 +97,19 @@ static void execute_op(NsOp* op) {
9597
op->result_ok = env_assign_direct(op->env, op->name, op->value,
9698
op->decl_type, op->declare_if_missing);
9799
break;
100+
case NS_OP_INDEX_ASSIGN: {
101+
ExecResult res = assign_index_chain(op->interp, op->env, op->index_expr,
102+
op->value, op->stmt_line, op->stmt_col);
103+
op->result_ok = (res.status == EXEC_OK);
104+
if (res.status == EXEC_ERROR && res.error) {
105+
op->error_message = strdup(res.error);
106+
op->error_line = res.error_line;
107+
op->error_col = res.error_column;
108+
}
109+
value_free(res.value);
110+
if (res.error) free(res.error);
111+
break;
112+
}
98113
case NS_OP_DELETE:
99114
op->result_ok = env_delete_direct(op->env, op->name);
100115
break;
@@ -121,6 +136,7 @@ static void execute_op(NsOp* op) {
121136

122137
static int prepare_thread_func(void* arg) {
123138
NsBuffer* buf = (NsBuffer*)arg;
139+
g_ns_prepare_thread = 1;
124140

125141
while (buf->running) {
126142
/* ---- Phase 1: wait for and dequeue the oldest operation ---- */
@@ -202,6 +218,7 @@ static int prepare_thread_func(void* arg) {
202218
}
203219
mtx_unlock(&buf->queue_mtx);
204220

221+
g_ns_prepare_thread = 0;
205222
return 0;
206223
}
207224

@@ -267,6 +284,10 @@ bool ns_buffer_active(void) {
267284
return g_ns_buf != NULL && g_ns_buf->running;
268285
}
269286

287+
bool ns_buffer_is_prepare_thread(void) {
288+
return g_ns_prepare_thread != 0;
289+
}
290+
270291
/* ------------------------------------------------------------------ */
271292
/* Public: read-side synchronisation */
272293
/* ------------------------------------------------------------------ */
@@ -332,6 +353,7 @@ static void wait_op(NsOp* op) {
332353
static void free_op(NsOp* op) {
333354
free(op->name);
334355
if (op->target_name) free(op->target_name);
356+
if (op->error_message) free(op->error_message);
335357
/* Note: op->value is NOT freed here – ownership was transferred
336358
to the env by the _direct function. If the op failed we must
337359
still not double-free because the _direct function already
@@ -370,6 +392,52 @@ bool ns_buffer_assign(struct Env* env, const char* name, Value value,
370392
return r;
371393
}
372394

395+
static const char* ns_index_base_name(Expr* idx_expr) {
396+
Expr* walker = idx_expr;
397+
while (walker && walker->type == EXPR_INDEX) {
398+
walker = walker->as.index.target;
399+
}
400+
if (!walker || walker->type != EXPR_IDENT) return NULL;
401+
return walker->as.ident;
402+
}
403+
404+
bool ns_buffer_assign_index(struct Interpreter* interp, struct Env* env,
405+
Expr* idx_expr, Value value,
406+
int stmt_line, int stmt_col,
407+
char** out_error, int* out_line, int* out_col) {
408+
const char* base_name = ns_index_base_name(idx_expr);
409+
if (out_error) *out_error = NULL;
410+
if (out_line) *out_line = 0;
411+
if (out_col) *out_col = 0;
412+
if (!base_name) {
413+
if (out_error) *out_error = strdup("Indexed assignment base must be an identifier");
414+
if (out_line) *out_line = stmt_line;
415+
if (out_col) *out_col = stmt_col;
416+
return false;
417+
}
418+
419+
NsOp* op = make_op(NS_OP_INDEX_ASSIGN, env, base_name);
420+
op->interp = interp;
421+
op->index_expr = idx_expr;
422+
op->value = value_copy(value);
423+
op->stmt_line = stmt_line;
424+
op->stmt_col = stmt_col;
425+
enqueue_op(op);
426+
wait_op(op);
427+
428+
bool r = op->result_ok;
429+
value_free(op->value);
430+
if (!r) {
431+
if (out_error) {
432+
*out_error = strdup(op->error_message ? op->error_message : "Indexed assignment failed");
433+
}
434+
if (out_line) *out_line = op->error_line ? op->error_line : stmt_line;
435+
if (out_col) *out_col = op->error_col ? op->error_col : stmt_col;
436+
}
437+
free_op(op);
438+
return r;
439+
}
440+
373441
bool ns_buffer_delete(struct Env* env, const char* name) {
374442
NsOp* op = make_op(NS_OP_DELETE, env, name);
375443
enqueue_op(op);

src/ns_buffer.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@ struct Env; // forward declaration
1111
typedef enum {
1212
NS_OP_DEFINE,
1313
NS_OP_ASSIGN,
14+
NS_OP_INDEX_ASSIGN,
1415
NS_OP_DELETE,
1516
NS_OP_ALIAS,
1617
NS_OP_FREEZE,
1718
NS_OP_THAW,
1819
NS_OP_PERMAFREEZE
1920
} NsOpType;
2021

22+
struct Interpreter;
23+
2124
// A single write operation enqueued in the central buffer.
2225
typedef struct NsOp {
2326
NsOpType op;
@@ -27,6 +30,13 @@ typedef struct NsOp {
2730
DeclType decl_type; // for DEFINE / ASSIGN / ALIAS
2831
bool declare_if_missing;// for ASSIGN / ALIAS
2932
char* target_name; // for ALIAS (owned copy)
33+
struct Interpreter* interp;
34+
Expr* index_expr;
35+
int stmt_line;
36+
int stmt_col;
37+
char* error_message;
38+
int error_line;
39+
int error_col;
3040

3141
// Result fields – filled by the prepare thread after execution
3242
bool result_ok; // true = success (for bool-returning ops)
@@ -86,6 +96,11 @@ void ns_buffer_shutdown(void);
8696
// Returns true if the buffer system is active.
8797
bool ns_buffer_active(void);
8898

99+
// Returns true when called on the prepare thread itself. The prepare
100+
// thread must bypass buffered env access to avoid recursive queueing and
101+
// self-deadlocks while it is draining the buffer.
102+
bool ns_buffer_is_prepare_thread(void);
103+
89104
// Block the calling thread until all pending writes for `name` have
90105
// been processed. Then acquire the env-access lock so the caller can
91106
// safely read. The caller MUST call ns_buffer_read_unlock() when done.
@@ -101,6 +116,10 @@ void ns_buffer_read_unlock(void);
101116
bool ns_buffer_define(struct Env* env, const char* name, DeclType type);
102117
bool ns_buffer_assign(struct Env* env, const char* name, Value value,
103118
DeclType type, bool declare_if_missing);
119+
bool ns_buffer_assign_index(struct Interpreter* interp, struct Env* env,
120+
Expr* idx_expr, Value value,
121+
int stmt_line, int stmt_col,
122+
char** out_error, int* out_line, int* out_col);
104123
bool ns_buffer_delete(struct Env* env, const char* name);
105124
bool ns_buffer_set_alias(struct Env* env, const char* name,
106125
const char* target_name, DeclType type,

tests/test2.pre

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,31 @@ ASSERT(EQ(pf_outer_hits, 0d0))
659659
DEL(pf_outer_hits)
660660
PRINT("PARFOR: PASS\n")
661661

662+
PRINT("Testing PARFOR indexed assignment serialization (regression GH-91)...")
663+
FOR(pf_rep, 0d16){
664+
TNS: pf_tns_serial = [0d0,0d0,0d0,0d0,0d0,0d0,0d0,0d0]
665+
PARFOR(pf_idx, 0d8){
666+
pf_tns_serial[pf_idx] = pf_idx
667+
}
668+
ASSERT(EQ(pf_tns_serial, [0d1,0d2,0d3,0d4,0d5,0d6,0d7,0d8]))
669+
DEL(pf_tns_serial)
670+
671+
MAP: pf_map_serial = <0d1 = 0d0, 0d2 = 0d0, 0d3 = 0d0, 0d4 = 0d0, 0d5 = 0d0, 0d6 = 0d0, 0d7 = 0d0, 0d8 = 0d0>
672+
PARFOR(pf_key, 0d8){
673+
pf_map_serial<pf_key> = pf_key
674+
}
675+
ASSERT(EQ(pf_map_serial<0d1>, 0d1))
676+
ASSERT(EQ(pf_map_serial<0d2>, 0d2))
677+
ASSERT(EQ(pf_map_serial<0d3>, 0d3))
678+
ASSERT(EQ(pf_map_serial<0d4>, 0d4))
679+
ASSERT(EQ(pf_map_serial<0d5>, 0d5))
680+
ASSERT(EQ(pf_map_serial<0d6>, 0d6))
681+
ASSERT(EQ(pf_map_serial<0d7>, 0d7))
682+
ASSERT(EQ(pf_map_serial<0d8>, 0d8))
683+
DEL(pf_map_serial)
684+
}
685+
PRINT("PARFOR indexed assignment serialization: PASS\n")
686+
662687
PRINT("Testing PARALLEL...")
663688

664689
! Simple no-op functions to exercise PARALLEL

0 commit comments

Comments
 (0)