4242#if WANT_SIGINT
4343#include <signal.h>
4444#endif
45+ #if MHS_IO_POLL
46+ #include "io_poll.h"
47+ #endif
4548
4649extern char * * environ ; /* should probably be behind some WANT_ */
4750
@@ -125,6 +128,7 @@ int num_ffi;
125128#define THREAD_DEBUG 0
126129#endif
127130
131+
128132#define VERSION "v8.3\n"
129133
130134#define PRIvalue PRIdPTR
@@ -541,6 +545,7 @@ enum node_tag { T_FREE, T_IND, T_AP, T_INT, T_INT64, T_DBL, T_FLT32, T_PTR, T_FU
541545 T_WKNEWFIN , T_WKNEW , T_WKDEREF , T_WKFINAL ,
542546 T_IO_PP , /* for debugging */
543547 T_IO_STDIN , T_IO_STDOUT , T_IO_STDERR ,
548+ T_IO_WAITRDFD , T_IO_WAITWRFD ,
544549 T_LAST_TAG ,
545550};
546551
@@ -898,7 +903,14 @@ dump_tick_table(FILE *f)
898903
899904enum th_sched { mt_main , mt_resched , mt_raise };
900905/* The two enums below are known by the Haskell code. Do not change order */
901- enum th_state { ts_runnable , ts_wait_mvar , ts_wait_time , ts_finished , ts_died };
906+ enum th_state {
907+ ts_runnable ,
908+ ts_wait_mvar ,
909+ ts_wait_time ,
910+ ts_finished ,
911+ ts_died ,
912+ ts_wait_io , /* not visible to Haskell; must stay after ts_died */
913+ };
902914enum mask_state { mask_unmasked , mask_interruptible , mask_uninterruptible };
903915
904916/***************** HANDLER *****************/
@@ -911,6 +923,9 @@ struct handler {
911923
912924/***************** THREAD ******************/
913925
926+ #define IO_POLL_WAITING_FOR_NONE (-1)
927+ #define IO_POLL_EVENT_HAS_HAPPENED (-2)
928+
914929struct mthread {
915930 enum th_state mt_state ; /* thread state */
916931 enum mask_state mt_mask ; /* making state. */
@@ -923,6 +938,10 @@ struct mthread {
923938 NODEPTR mt_mval ; /* filled after waiting for take/read */
924939 bool mt_mark ; /* marked as accessible */
925940 uvalue_t mt_id ; /* thread number, thread 1 is the main thread */
941+ #if MHS_IO_POLL
942+ int mt_fd ; /* The file descriptor that we are waiting on (will be either IO_POLL_WAITING_FOR_NONE or IO_POLL_EVENT_HAS_HAPPENED) */
943+ int mt_events ; /* IO_POLL_READ or IO_POLL_WRITE */
944+ #endif
926945#if defined(CLOCK_INIT )
927946 CLOCK_T mt_at ; /* time to wake up when in threadDelay */
928947#endif
@@ -1114,6 +1133,19 @@ add_runq_tail(struct mthread *mt)
11141133 add_q_tail (& runq , mt );
11151134}
11161135
1136+ #if MHS_IO_POLL
1137+ /* this is the callback that is sent to the io_poll framework.
1138+ * It is invoked when an event a thread is waiting for becomes ready.
1139+ */
1140+ static void
1141+ io_thread_ready (void * ptr )
1142+ {
1143+ struct mthread * mt = (struct mthread * )ptr ;
1144+ mt -> mt_fd = IO_POLL_EVENT_HAS_HAPPENED ;
1145+ add_runq_tail (mt );
1146+ }
1147+ #endif
1148+
11171149struct mthread *
11181150remove_q_head (struct mqueue * q )
11191151{
@@ -1301,7 +1333,16 @@ yield(void)
13011333 check_timeq ();
13021334 check_thrown (false);
13031335 check_sigint ();
1304- // printf("yield %p %d\n", runq, (int)stack_ptr);
1336+
1337+ #if MHS_IO_POLL
1338+ if (io_waiter_count () > 0 ) {
1339+ /* Check if any threads blocked on IO can be scheduled. Since we pass in a delay of 0, checking
1340+ for the events should not block. */
1341+ io_poll (0 , io_thread_ready );
1342+ }
1343+ #endif
1344+
1345+ // printf("yield %p %d\n", runq, (int)stack_ptr);
13051346 /* if there is nothing after in the runq then there is no need to reschedule */
13061347 if (!runq .mq_head -> mt_queue ) {
13071348#if THREAD_DEBUG
@@ -1346,6 +1387,10 @@ new_thread(NODEPTR root)
13461387 mt -> mt_mark = false;
13471388 mt -> mt_num_slices = 0 ;
13481389 mt -> mt_id = num_thread_create ++ ;
1390+ #if MHS_IO_POLL
1391+ mt -> mt_fd = IO_POLL_WAITING_FOR_NONE ;
1392+ mt -> mt_events = -1 ;
1393+ #endif
13491394#if defined(CLOCK_INIT )
13501395 mt -> mt_at = 0 ; /* delay has not expired */
13511396#endif
@@ -1598,6 +1643,43 @@ thread_delay(uvalue_t usecs)
15981643void
15991644pause_exec (void )
16001645{
1646+ /* End up here if the run queue is empty. If there is no thread waiting for
1647+ * a delay to expire, we will never resume operation and we are deadlocked. However, if
1648+ * we compile with MHS_IO_POLL there might be threads waiting for IO events, so in
1649+ * that case we check for them as well. If there is no thread waiting for a delay or an
1650+ * IO event, we are deadlocked.
1651+ */
1652+ #if MHS_IO_POLL
1653+
1654+ /* Check for deadlock situation */
1655+ if (io_waiter_count () == 0
1656+ #if defined(CLOCK_INIT )
1657+ & & !timeq .mq_head
1658+ #endif
1659+ ) ERR ("deadlock" );
1660+
1661+ /* Loop until at least one thread is runnable.*/
1662+ while (!runq .mq_head ) {
1663+ int timeout_ms = -1 ; /* block indefinitely if only io_waiters */
1664+ #if defined(CLOCK_INIT )
1665+ /* If there are threads blocked on delays, recompute the timeout_ms to account
1666+ for that. */
1667+ if (timeq .mq_head ) {
1668+ CLOCK_T delta = timeq .mq_head -> mt_at - CLOCK_GET ();
1669+ /* +999 emulates a ceiling function, adding at most 0.999 ms. io_poll wants milliseconds,
1670+ not microseconds as delta represents. When delta is < 1000, without +999, we'll truncate
1671+ to zero and enter a loop at full CPU speed. */
1672+ timeout_ms = (delta > 0 ) ? (int )((delta + 999 ) / 1000 ) : 0 ;
1673+ }
1674+ #endif
1675+ io_poll (timeout_ms , io_thread_ready );
1676+ #if defined(CLOCK_INIT )
1677+ check_timeq ();
1678+ #endif
1679+ }
1680+
1681+ #else /* !MHS_IO_POLL */
1682+
16011683#if defined(CLOCK_INIT )
16021684 if (timeq .mq_head ) {
16031685 struct mthread * mt ;
@@ -1635,6 +1717,7 @@ pause_exec(void)
16351717#else /* CLOCK_INIT */
16361718 ERR ("no clock" );
16371719#endif /* CLOCK_INIT */
1720+ #endif /* MHS_IO_POLL */
16381721}
16391722
16401723/* Interrupt a sleeping thread in a throwTo/threadDelay */
@@ -1822,6 +1905,10 @@ new_ap(NODEPTR f, NODEPTR a)
18221905 return n ;
18231906}
18241907
1908+ #if MHS_IO_POLL
1909+ #include "io_poll_impl.c"
1910+ #endif
1911+
18251912NODEPTR evali (NODEPTR n );
18261913
18271914/* If this is non-0 it means that the threading system is active. */
@@ -1836,6 +1923,10 @@ start_exec(NODEPTR root)
18361923 mt -> mt_id = MAIN_THREAD ; /* make it the main thread in case this is foreign export calling */
18371924 main_thread = mt ;
18381925
1926+ #if MHS_IO_POLL
1927+ io_init ();
1928+ #endif
1929+
18391930 switch (setjmp (sched )) {
18401931 case mt_main :
18411932 break ;
@@ -2172,6 +2263,8 @@ struct {
21722263 { "binbs1" , T_BINBS1 },
21732264 { "unint1" , T_UNINT1 },
21742265 { "undbl1" , T_UNDBL1 },
2266+ { "IO.waitrdfd" , T_IO_WAITRDFD },
2267+ { "IO.waitwrfd" , T_IO_WAITWRFD },
21752268#if WANT_INT64
21762269 { "I+" , T_ADD64 , T_ADD64 },
21772270 { "I-" , T_SUB64 , T_SUBR64 },
@@ -5995,6 +6088,42 @@ evali(NODEPTR an)
59956088 POP (2 );
59966089 GOPAIR (mkInt (mt -> mt_state ));
59976090 }
6091+ case T_IO_WAITRDFD :
6092+ case T_IO_WAITWRFD : {
6093+ #if MHS_IO_POLL
6094+ CHKARG2NP ; /* x = the filedescriptor, y = RealWorld; no pop yet */
6095+
6096+ /* io_thread_ready sets mt_fd=IO_POLL_EVENT_HAS_HAPPENED when waking the thread.
6097+ If we did not do this check we would just register again.
6098+
6099+ This seems to be how T_IO_THREADDELAY works, with mt_at == -1.
6100+ */
6101+ if (runq .mq_head -> mt_fd == IO_POLL_EVENT_HAS_HAPPENED ) {
6102+ runq .mq_head -> mt_fd = IO_POLL_WAITING_FOR_NONE ;
6103+ POP (2 );
6104+ GOPAIR (mkInt (1 ));
6105+ }
6106+
6107+ POP (2 );
6108+ int fd = evalint (x );
6109+ int events = (tag == T_IO_WAITRDFD ) ? IO_POLL_READ : IO_POLL_WRITE ;
6110+
6111+ /* Set up the waiting thread's state, preparing it to leave the run queue
6112+ until an event is ready for it */
6113+ struct mthread * mt = remove_q_head (& runq );
6114+ mt -> mt_fd = fd ;
6115+ mt -> mt_events = events ;
6116+ mt -> mt_state = ts_wait_io ;
6117+
6118+ io_register (fd , events , mt );
6119+
6120+ resched (mt , ts_wait_io );
6121+ #else
6122+ CHKARG2NP ;
6123+ POP (2 );
6124+ GOPAIR (mkInt (-1 ));
6125+ #endif
6126+ }
59986127 case T_IO_GETMASKINGSTATE :
59996128 CHKARG1 ; /* x = ST */
60006129 GOPAIR (mkInt (runq .mq_head -> mt_mask ));
0 commit comments