@@ -256,6 +256,10 @@ typedef struct {
256256 int * err_lines ;
257257 int * err_cols ;
258258 const char * counter_name ;
259+ ExecStatus * statuses ;
260+ int * break_counts ;
261+ int * stop_launch ;
262+ mtx_t * control_lock ;
259263} ParforStart ;
260264
261265static int parfor_merge_iteration_env (ParforStart * start , char * * merge_error ) {
@@ -306,14 +310,47 @@ static int parfor_merge_iteration_env(ParforStart* start, char** merge_error) {
306310static int parfor_worker (void * arg ) {
307311 ParforStart * start = (ParforStart * )arg ;
308312 LabelMap labels = {0 };
309- start -> interp -> current_thr = start -> thr_val .as .thr ;
310- ExecResult res = exec_stmt (start -> interp , start -> body , start -> env , & labels );
313+ int skip_iteration = 0 ;
314+ if (start -> control_lock && start -> stop_launch ) {
315+ mtx_lock (start -> control_lock );
316+ skip_iteration = * start -> stop_launch ;
317+ mtx_unlock (start -> control_lock );
318+ }
319+
320+ ExecResult res ;
321+ if (!skip_iteration ) {
322+ start -> interp -> current_thr = start -> thr_val .as .thr ;
323+ res = exec_stmt (start -> interp , start -> body , start -> env , & labels );
324+ } else {
325+ res .status = EXEC_OK ;
326+ res .value = value_null ();
327+ res .break_count = 0 ;
328+ res .jump_index = -1 ;
329+ res .error = NULL ;
330+ res .error_line = 0 ;
331+ res .error_column = 0 ;
332+ }
333+
334+ if (res .status == EXEC_BREAK && start -> control_lock && start -> stop_launch ) {
335+ mtx_lock (start -> control_lock );
336+ * start -> stop_launch = 1 ;
337+ mtx_unlock (start -> control_lock );
338+ }
311339
312340 char * merge_error = NULL ;
313341 if (res .status != EXEC_ERROR ) {
314342 (void )parfor_merge_iteration_env (start , & merge_error );
315343 }
316344
345+ ExecStatus final_status = res .status ;
346+ int final_break_count = (res .status == EXEC_BREAK ) ? res .break_count : 0 ;
347+ if (merge_error ) {
348+ final_status = EXEC_ERROR ;
349+ final_break_count = 0 ;
350+ }
351+ if (start -> statuses ) start -> statuses [start -> index ] = final_status ;
352+ if (start -> break_counts ) start -> break_counts [start -> index ] = final_break_count ;
353+
317354 for (size_t i = 0 ; i < labels .count ; i ++ ) value_free (labels .items [i ].key );
318355 free (labels .items );
319356
@@ -3029,23 +3066,57 @@ static ExecResult exec_stmt(Interpreter* interp, Stmt* stmt, Env* env, LabelMap*
30293066
30303067 // Spawn worker threads for each iteration
30313068 size_t n = (size_t )limit ;
3069+ if (n == 0 ) {
3070+ interp -> loop_depth -- ;
3071+ return make_ok (value_null ());
3072+ }
30323073 char * * errors = calloc (n , sizeof (char * ));
30333074 int * err_lines = calloc (n , sizeof (int ));
30343075 int * err_cols = calloc (n , sizeof (int ));
3035- Value * thr_vals = malloc (sizeof (Value ) * n );
3036- ParforStart * * starts = malloc (sizeof (ParforStart * ) * n );
3076+ ExecStatus * statuses = calloc (n , sizeof (ExecStatus ));
3077+ int * break_counts = calloc (n , sizeof (int ));
3078+ Value * thr_vals = calloc (n , sizeof (Value ));
3079+ int stop_launch = 0 ;
3080+ mtx_t parfor_control_lock ;
3081+ int control_lock_inited = 0 ;
3082+
3083+ if (mtx_init (& parfor_control_lock , 0 ) == thrd_success ) {
3084+ control_lock_inited = 1 ;
3085+ }
3086+
3087+ if (!errors || !err_lines || !err_cols || !statuses || !break_counts || !thr_vals || !control_lock_inited ) {
3088+ interp -> loop_depth -- ;
3089+ free (errors );
3090+ free (err_lines );
3091+ free (err_cols );
3092+ free (statuses );
3093+ free (break_counts );
3094+ free (thr_vals );
3095+ if (control_lock_inited ) mtx_destroy (& parfor_control_lock );
3096+ return make_error ("Out of memory" , stmt -> line , stmt -> column );
3097+ }
30373098
30383099 for (size_t i = 0 ; i < n ; i ++ ) {
3100+ int should_stop = 0 ;
3101+ mtx_lock (& parfor_control_lock );
3102+ should_stop = stop_launch ;
3103+ mtx_unlock (& parfor_control_lock );
3104+ if (should_stop ) {
3105+ break ;
3106+ }
3107+
30393108 if (++ iteration_count > max_iterations ) {
30403109 interp -> loop_depth -- ;
30413110 // cleanup
30423111 for (size_t j = 0 ; j < i ; j ++ ) value_free (thr_vals [j ]);
30433112 free (thr_vals );
3044- free (starts );
30453113 for (size_t j = 0 ; j < n ; j ++ ) free (errors [j ]);
30463114 free (errors );
30473115 free (err_lines );
30483116 free (err_cols );
3117+ free (statuses );
3118+ free (break_counts );
3119+ mtx_destroy (& parfor_control_lock );
30493120 return make_error ("Infinite loop detected" , stmt -> line , stmt -> column );
30503121 }
30513122
@@ -3055,7 +3126,7 @@ static ExecResult exec_stmt(Interpreter* interp, Stmt* stmt, Env* env, LabelMap*
30553126 Interpreter * thr_interp = safe_malloc (sizeof (Interpreter ));
30563127 * thr_interp = (Interpreter ){0 };
30573128 thr_interp -> global_env = interp -> global_env ;
3058- thr_interp -> loop_depth = 0 ;
3129+ thr_interp -> loop_depth = interp -> loop_depth ;
30593130 thr_interp -> error = NULL ;
30603131 thr_interp -> error_line = 0 ;
30613132 thr_interp -> error_col = 0 ;
@@ -3090,8 +3161,11 @@ static ExecResult exec_stmt(Interpreter* interp, Stmt* stmt, Env* env, LabelMap*
30903161 start -> err_cols = err_cols ;
30913162 start -> index = (int )i ;
30923163 start -> counter_name = stmt -> as .parfor_stmt .counter ;
3164+ start -> statuses = statuses ;
3165+ start -> break_counts = break_counts ;
3166+ start -> stop_launch = & stop_launch ;
3167+ start -> control_lock = & parfor_control_lock ;
30933168 start -> thr_val = value_copy (thr_vals [i ]);
3094- starts [i ] = start ;
30953169
30963170 /* record body/env on Thr so restart is possible */
30973171 thr_vals [i ].as .thr -> body = start -> body ;
@@ -3127,14 +3201,24 @@ static ExecResult exec_stmt(Interpreter* interp, Stmt* stmt, Env* env, LabelMap*
31273201 if (errors [i ]) { first_err = errors [i ]; first_err_line = err_lines [i ]; first_err_col = err_cols [i ]; break ; }
31283202 }
31293203
3204+ int first_break_count = 0 ;
3205+ for (size_t i = 0 ; i < n ; i ++ ) {
3206+ if (statuses [i ] == EXEC_BREAK ) {
3207+ first_break_count = break_counts [i ];
3208+ break ;
3209+ }
3210+ }
3211+
31303212 // Cleanup thr values
31313213 for (size_t i = 0 ; i < n ; i ++ ) value_free (thr_vals [i ]);
31323214 free (thr_vals );
3133- free (starts );
31343215 for (size_t i = 0 ; i < n ; i ++ ) if (errors [i ] && errors [i ] != first_err ) free (errors [i ]);
31353216 free (errors );
31363217 free (err_lines );
31373218 free (err_cols );
3219+ free (statuses );
3220+ free (break_counts );
3221+ mtx_destroy (& parfor_control_lock );
31383222
31393223 interp -> loop_depth -- ;
31403224
@@ -3151,6 +3235,21 @@ static ExecResult exec_stmt(Interpreter* interp, Stmt* stmt, Env* env, LabelMap*
31513235 return err ;
31523236 }
31533237
3238+ if (first_break_count > 0 ) {
3239+ if (first_break_count > 1 ) {
3240+ ExecResult res ;
3241+ res .status = EXEC_BREAK ;
3242+ res .value = value_null ();
3243+ res .break_count = first_break_count - 1 ;
3244+ res .jump_index = -1 ;
3245+ res .error = NULL ;
3246+ res .error_line = 0 ;
3247+ res .error_column = 0 ;
3248+ return res ;
3249+ }
3250+ return make_ok (value_null ());
3251+ }
3252+
31543253 return make_ok (value_null ());
31553254 }
31563255
0 commit comments