Skip to content

Commit 7e91f0c

Browse files
authored
Merge pull request #471 from Rewbert/epoll-foreign-io
Epoll foreign io
2 parents 9604052 + 58436cc commit 7e91f0c

15 files changed

Lines changed: 406 additions & 8 deletions

File tree

lib/Control/Concurrent.hs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,12 @@ threadStatus thr = do
119119
st <- primThreadStatus thr
120120
return $
121121
case st of
122-
0 -> ThreadRunning -- ts_runnable
123-
1 -> ThreadBlocked BlockedOnMVar -- ts_wait_mvar
124-
2 -> ThreadBlocked BlockedOnOther -- ts_wait_time
125-
3 -> ThreadFinished -- ts_finished
126-
4 -> ThreadDied -- ts_died
122+
0 -> ThreadRunning -- ts_runnable
123+
1 -> ThreadBlocked BlockedOnMVar -- ts_wait_mvar
124+
2 -> ThreadBlocked BlockedOnOther -- ts_wait_time
125+
3 -> ThreadFinished -- ts_finished
126+
4 -> ThreadDied -- ts_died
127+
5 -> ThreadBlocked BlockedOnForeignCall -- ts_wait_io
127128

128129
-------------------------------------------------------
129130
-- Just for GHC compatibility.

lib/Primitives.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,12 @@ primTryPutMVar = _primitive "IO.tryputmvar"
389389
primTryReadMVar :: MVar a -> IO b {-(Maybe a)-}
390390
primTryReadMVar = _primitive "IO.tryreadmvar"
391391

392+
primWaitWriteFD :: Int -> IO Int
393+
primWaitWriteFD = _primitive "IO.waitwrfd"
394+
395+
primWaitReadFD :: Int -> IO Int
396+
primWaitReadFD = _primitive "IO.waitrdfd"
397+
392398
primThreadDelay :: Int -> IO ()
393399
primThreadDelay = _primitive "IO.threaddelay"
394400

lib/System/IO/FD.hs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
module System.IO.FD (waitForReadFD, waitForWriteFD) where
2+
3+
import Primitives
4+
5+
waitForReadFD :: Int -> IO Int
6+
waitForReadFD = primWaitReadFD
7+
8+
waitForWriteFD :: Int -> IO Int
9+
waitForWriteFD = primWaitWriteFD

lib/base.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ library base
190190
System.Environment
191191
System.Exit
192192
System.IO
193+
System.IO.FD
193194
System.IO.Error
194195
System.IO.MD5
195196
System.IO.PrintOrRun

src/MicroHs/Translate.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,8 @@ primTable = [
272272
("bs2fp", _primitive "bs2fp"),
273273
("IO.fork", _primitive "IO.fork"),
274274
("IO.thid", _primitive "IO.thid"),
275+
("IO.waitrdfd", _primitive "IO.waitrdfd"),
276+
("IO.waitwrfd", _primitive "IO.waitwrfd"),
275277
("thnum", _primitive "thnum"),
276278
("IO.throwto", _primitive "IO.throwto"),
277279
("IO.yield", _primitive "IO.yield"),

src/runtime/eval.c

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
#if WANT_SIGINT
4343
#include <signal.h>
4444
#endif
45+
#if MHS_IO_POLL
46+
#include "io_poll.h"
47+
#endif
4548

4649
extern char **environ; /* should probably be behind some WANT_ */
4750

@@ -125,6 +128,7 @@ int num_ffi;
125128
#define THREAD_DEBUG 0
126129
#endif
127130

131+
128132
#define VERSION "v8.3\n"
129133

130134
#define PRIvalue PRIdPTR
@@ -541,6 +545,7 @@ enum node_tag { T_FREE, T_IND, T_AP, T_INT, T_INT64, T_DBL, T_FLT32, T_PTR, T_FU
541545
T_WKNEWFIN, T_WKNEW, T_WKDEREF, T_WKFINAL,
542546
T_IO_PP, /* for debugging */
543547
T_IO_STDIN, T_IO_STDOUT, T_IO_STDERR,
548+
T_IO_WAITRDFD, T_IO_WAITWRFD,
544549
T_LAST_TAG,
545550
};
546551

@@ -898,7 +903,14 @@ dump_tick_table(FILE *f)
898903

899904
enum th_sched { mt_main, mt_resched, mt_raise };
900905
/* The two enums below are known by the Haskell code. Do not change order */
901-
enum th_state { ts_runnable, ts_wait_mvar, ts_wait_time, ts_finished, ts_died };
906+
enum th_state {
907+
ts_runnable,
908+
ts_wait_mvar,
909+
ts_wait_time,
910+
ts_finished,
911+
ts_died,
912+
ts_wait_io, /* not visible to Haskell; must stay after ts_died */
913+
};
902914
enum mask_state { mask_unmasked, mask_interruptible, mask_uninterruptible };
903915

904916
/***************** HANDLER *****************/
@@ -911,6 +923,9 @@ struct handler {
911923

912924
/***************** THREAD ******************/
913925

926+
#define IO_POLL_WAITING_FOR_NONE (-1)
927+
#define IO_POLL_EVENT_HAS_HAPPENED (-2)
928+
914929
struct mthread {
915930
enum th_state mt_state; /* thread state */
916931
enum mask_state mt_mask; /* making state. */
@@ -923,6 +938,10 @@ struct mthread {
923938
NODEPTR mt_mval; /* filled after waiting for take/read */
924939
bool mt_mark; /* marked as accessible */
925940
uvalue_t mt_id; /* thread number, thread 1 is the main thread */
941+
#if MHS_IO_POLL
942+
int mt_fd; /* The file descriptor that we are waiting on (will be either IO_POLL_WAITING_FOR_NONE or IO_POLL_EVENT_HAS_HAPPENED) */
943+
int mt_events; /* IO_POLL_READ or IO_POLL_WRITE */
944+
#endif
926945
#if defined(CLOCK_INIT)
927946
CLOCK_T mt_at; /* time to wake up when in threadDelay */
928947
#endif
@@ -1114,6 +1133,19 @@ add_runq_tail(struct mthread *mt)
11141133
add_q_tail(&runq, mt);
11151134
}
11161135

1136+
#if MHS_IO_POLL
1137+
/* this is the callback that is sent to the io_poll framework.
1138+
* It is invoked when an event a thread is waiting for becomes ready.
1139+
*/
1140+
static void
1141+
io_thread_ready(void *ptr)
1142+
{
1143+
struct mthread *mt = (struct mthread *)ptr;
1144+
mt->mt_fd = IO_POLL_EVENT_HAS_HAPPENED;
1145+
add_runq_tail(mt);
1146+
}
1147+
#endif
1148+
11171149
struct mthread*
11181150
remove_q_head(struct mqueue *q)
11191151
{
@@ -1301,7 +1333,16 @@ yield(void)
13011333
check_timeq();
13021334
check_thrown(false);
13031335
check_sigint();
1304-
// printf("yield %p %d\n", runq, (int)stack_ptr);
1336+
1337+
#if MHS_IO_POLL
1338+
if (io_waiter_count() > 0) {
1339+
/* Check if any threads blocked on IO can be scheduled. Since we pass in a delay of 0, checking
1340+
for the events should not block. */
1341+
io_poll(0, io_thread_ready);
1342+
}
1343+
#endif
1344+
1345+
// printf("yield %p %d\n", runq, (int)stack_ptr);
13051346
/* if there is nothing after in the runq then there is no need to reschedule */
13061347
if (!runq.mq_head->mt_queue) {
13071348
#if THREAD_DEBUG
@@ -1346,6 +1387,10 @@ new_thread(NODEPTR root)
13461387
mt->mt_mark = false;
13471388
mt->mt_num_slices = 0;
13481389
mt->mt_id = num_thread_create++;
1390+
#if MHS_IO_POLL
1391+
mt->mt_fd = IO_POLL_WAITING_FOR_NONE;
1392+
mt->mt_events = -1;
1393+
#endif
13491394
#if defined(CLOCK_INIT)
13501395
mt->mt_at = 0; /* delay has not expired */
13511396
#endif
@@ -1598,6 +1643,43 @@ thread_delay(uvalue_t usecs)
15981643
void
15991644
pause_exec(void)
16001645
{
1646+
/* End up here if the run queue is empty. If there is no thread waiting for
1647+
* a delay to expire, we will never resume operation and we are deadlocked. However, if
1648+
* we compile with MHS_IO_POLL there might be threads waiting for IO events, so in
1649+
* that case we check for them as well. If there is no thread waiting for a delay or an
1650+
* IO event, we are deadlocked.
1651+
*/
1652+
#if MHS_IO_POLL
1653+
1654+
/* Check for deadlock situation */
1655+
if (io_waiter_count() == 0
1656+
#if defined(CLOCK_INIT)
1657+
&& !timeq.mq_head
1658+
#endif
1659+
) ERR("deadlock");
1660+
1661+
/* Loop until at least one thread is runnable.*/
1662+
while (!runq.mq_head) {
1663+
int timeout_ms = -1; /* block indefinitely if only io_waiters */
1664+
#if defined(CLOCK_INIT)
1665+
/* If there are threads blocked on delays, recompute the timeout_ms to account
1666+
for that. */
1667+
if (timeq.mq_head) {
1668+
CLOCK_T delta = timeq.mq_head->mt_at - CLOCK_GET();
1669+
/* +999 emulates a ceiling function, adding at most 0.999 ms. io_poll wants milliseconds,
1670+
not microseconds as delta represents. When delta is < 1000, without +999, we'll truncate
1671+
to zero and enter a loop at full CPU speed. */
1672+
timeout_ms = (delta > 0) ? (int)((delta + 999) / 1000) : 0;
1673+
}
1674+
#endif
1675+
io_poll(timeout_ms, io_thread_ready);
1676+
#if defined(CLOCK_INIT)
1677+
check_timeq();
1678+
#endif
1679+
}
1680+
1681+
#else /* !MHS_IO_POLL */
1682+
16011683
#if defined(CLOCK_INIT)
16021684
if (timeq.mq_head) {
16031685
struct mthread *mt;
@@ -1635,6 +1717,7 @@ pause_exec(void)
16351717
#else /* CLOCK_INIT */
16361718
ERR("no clock");
16371719
#endif /* CLOCK_INIT */
1720+
#endif /* MHS_IO_POLL */
16381721
}
16391722

16401723
/* Interrupt a sleeping thread in a throwTo/threadDelay */
@@ -1822,6 +1905,10 @@ new_ap(NODEPTR f, NODEPTR a)
18221905
return n;
18231906
}
18241907

1908+
#if MHS_IO_POLL
1909+
#include "io_poll_impl.c"
1910+
#endif
1911+
18251912
NODEPTR evali(NODEPTR n);
18261913

18271914
/* If this is non-0 it means that the threading system is active. */
@@ -1836,6 +1923,10 @@ start_exec(NODEPTR root)
18361923
mt->mt_id = MAIN_THREAD; /* make it the main thread in case this is foreign export calling */
18371924
main_thread = mt;
18381925

1926+
#if MHS_IO_POLL
1927+
io_init();
1928+
#endif
1929+
18391930
switch(setjmp(sched)) {
18401931
case mt_main:
18411932
break;
@@ -2172,6 +2263,8 @@ struct {
21722263
{ "binbs1", T_BINBS1 },
21732264
{ "unint1", T_UNINT1 },
21742265
{ "undbl1", T_UNDBL1 },
2266+
{ "IO.waitrdfd", T_IO_WAITRDFD},
2267+
{ "IO.waitwrfd", T_IO_WAITWRFD},
21752268
#if WANT_INT64
21762269
{ "I+", T_ADD64, T_ADD64 },
21772270
{ "I-", T_SUB64, T_SUBR64 },
@@ -5995,6 +6088,42 @@ evali(NODEPTR an)
59956088
POP(2);
59966089
GOPAIR(mkInt(mt->mt_state));
59976090
}
6091+
case T_IO_WAITRDFD:
6092+
case T_IO_WAITWRFD: {
6093+
#if MHS_IO_POLL
6094+
CHKARG2NP; /* x = the filedescriptor, y = RealWorld; no pop yet */
6095+
6096+
/* io_thread_ready sets mt_fd=IO_POLL_EVENT_HAS_HAPPENED when waking the thread.
6097+
If we did not do this check we would just register again.
6098+
6099+
This seems to be how T_IO_THREADDELAY works, with mt_at == -1.
6100+
*/
6101+
if (runq.mq_head->mt_fd == IO_POLL_EVENT_HAS_HAPPENED) {
6102+
runq.mq_head->mt_fd = IO_POLL_WAITING_FOR_NONE;
6103+
POP(2);
6104+
GOPAIR(mkInt(1));
6105+
}
6106+
6107+
POP(2);
6108+
int fd = evalint(x);
6109+
int events = (tag == T_IO_WAITRDFD) ? IO_POLL_READ : IO_POLL_WRITE;
6110+
6111+
/* Set up the waiting thread's state, preparing it to leave the run queue
6112+
until an event is ready for it */
6113+
struct mthread *mt = remove_q_head(&runq);
6114+
mt->mt_fd = fd;
6115+
mt->mt_events = events;
6116+
mt->mt_state = ts_wait_io;
6117+
6118+
io_register(fd, events, mt);
6119+
6120+
resched(mt, ts_wait_io);
6121+
#else
6122+
CHKARG2NP;
6123+
POP(2);
6124+
GOPAIR(mkInt(-1));
6125+
#endif
6126+
}
59986127
case T_IO_GETMASKINGSTATE:
59996128
CHKARG1; /* x = ST */
60006129
GOPAIR(mkInt(runq.mq_head->mt_mask));

src/runtime/io_poll.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/* Copyright 2026 Robert Krook
2+
* See LICENSE file for full license.
3+
*/
4+
5+
/*
6+
* OS-agnostic interface for non-blocking IO polling.
7+
* Platform-specific implementations live in <platform>/io_poll_impl.c and
8+
* are included into eval.c via #include "io_poll_impl.c".
9+
*
10+
*/
11+
12+
#define IO_POLL_READ 1
13+
#define IO_POLL_WRITE 2
14+
15+
void io_init(void);
16+
void io_poll(int timeout_ms, void (*on_ready)(void *cookie)); /* cookie is a struct mthread*, passed in from eval.c */
17+
void io_register(int fd, int events, void *cookie);
18+
int io_waiter_count(void);

src/runtime/unix/config.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,17 @@
8484
*/
8585
#define WANT_OVERFLOW 1
8686

87+
/*
88+
* Enable non-blocking IO polling.
89+
* Linux uses epoll
90+
* The backend is selected in unix/io_poll_impl.c.
91+
*/
92+
#if defined(__linux__)
93+
#define MHS_IO_POLL 1
94+
#else
95+
#define MHS_IO_POLL 0
96+
#endif
97+
8798
/*
8899
* Use CPU counters.
89100
* Only available on:

0 commit comments

Comments
 (0)