Skip to content

Commit 6b390d8

Browse files
committed
Change the wait for IO on a descriptor to be more like the timeq.
This also switches from using epoll() to using poll(). This considerably more portable since it is POSIX. Using poll() still will not quite work on Windows, since it can only poll() on sockets.
1 parent d6917af commit 6b390d8

4 files changed

Lines changed: 92 additions & 609 deletions

File tree

src/runtime/eval.c

Lines changed: 92 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
#include <signal.h>
4444
#endif
4545
#if WANT_IO_POLL
46-
#include "io_poll.h"
46+
#include <poll.h>
4747
#endif
4848

4949
extern char **environ; /* should probably be behind some WANT_ */
@@ -923,9 +923,6 @@ struct handler {
923923

924924
/***************** THREAD ******************/
925925

926-
#define IO_POLL_WAITING_FOR_NONE (-1)
927-
#define IO_POLL_EVENT_HAS_HAPPENED (-2)
928-
929926
struct mthread {
930927
enum th_state mt_state; /* thread state */
931928
enum mask_state mt_mask; /* making state. */
@@ -939,10 +936,12 @@ struct mthread {
939936
bool mt_mark; /* marked as accessible */
940937
uvalue_t mt_id; /* thread number, thread 1 is the main thread */
941938
#if WANT_IO_POLL
942-
int mt_fd; /* The file descriptor that we are waiting on
943-
(will be either IO_POLL_WAITING_FOR_NONE or IO_POLL_EVENT_HAS_HAPPENED) */
944-
int mt_events; /* IO_POLL_READ or IO_POLL_WRITE */
945-
#endif
939+
int mt_fd; /* The file descriptor that we are waiting on,
940+
* IO_POLL_WAITING_FOR_NONE, or IO_POLL_EVENT_HAS_HAPPENED */
941+
#define IO_POLL_WAITING_FOR_NONE (-1)
942+
#define IO_POLL_EVENT_HAS_HAPPENED (-2)
943+
int mt_events; /* POLLIN or POLLOUT */
944+
#endif /* WANT_IO_POLL */
946945
#if defined(CLOCK_INIT)
947946
CLOCK_T mt_at; /* time to wake up when in threadDelay */
948947
#endif
@@ -953,8 +952,9 @@ struct mqueue {
953952
struct mthread *mq_head;
954953
struct mthread *mq_tail;
955954
};
956-
struct mqueue runq = { 0, 0 };;
957-
struct mqueue timeq = { 0, 0 };
955+
struct mqueue runq = { 0, 0 }; /* runnable threads */
956+
struct mqueue timeq = { 0, 0 }; /* waiting for a timer to expire, sorted in time order */
957+
struct mqueue pollq = { 0, 0 }; /* waiting for I/O on a file descriptor */
958958

959959
struct mvar {
960960
struct mvar *mv_next; /* all mvars linked together */
@@ -975,7 +975,7 @@ NODEPTR the_exn; /* Used to propagate the exception for longjmp(s
975975

976976
/****** StablePtr ******/
977977

978-
size_t sp_capacity = 4; /* size of stable pointer table */
978+
size_t sp_capacity = 4; /* initial size of stable pointer table */
979979
NODEPTR *sp_table; /* stable pointer table */
980980

981981
static void
@@ -1134,19 +1134,6 @@ add_runq_tail(struct mthread *mt)
11341134
add_q_tail(&runq, mt);
11351135
}
11361136

1137-
#if WANT_IO_POLL
1138-
/* This is the callback that is sent to the io_poll framework.
1139-
* It is invoked when an event a thread is waiting for becomes ready.
1140-
*/
1141-
static void
1142-
io_thread_ready(void *ptr)
1143-
{
1144-
struct mthread *mt = (struct mthread *)ptr;
1145-
mt->mt_fd = IO_POLL_EVENT_HAS_HAPPENED;
1146-
add_runq_tail(mt);
1147-
}
1148-
#endif
1149-
11501137
struct mthread*
11511138
remove_q_head(struct mqueue *q)
11521139
{
@@ -1257,6 +1244,52 @@ check_timeq(void)
12571244
#endif
12581245
}
12591246

1247+
void
1248+
check_pollq(int timeout)
1249+
{
1250+
#if WANT_IO_POLL
1251+
#define MAX_POLL_FDS 100
1252+
struct pollfd fds[MAX_POLL_FDS];
1253+
int nfds = 0;
1254+
for(struct mthread *mt = pollq.mq_head; mt; mt = mt->mt_queue) {
1255+
if (nfds >= MAX_POLL_FDS)
1256+
ERR("check_pollq: too many FDs");
1257+
fds[nfds].fd = mt->mt_fd;
1258+
fds[nfds].events = mt->mt_events;
1259+
nfds++;
1260+
}
1261+
#if THREAD_DEBUG
1262+
if (thread_trace)
1263+
printf("check_pollq: enter poll(_, %d, %d)\n", nfds, timeout);
1264+
#endif /* THREAD_DEBUG */
1265+
int r = poll(fds, nfds, timeout);
1266+
if (r < 0)
1267+
return; /* silently ignore errors */
1268+
nfds = 0;
1269+
struct mthread *next;
1270+
for(struct mthread *mt = pollq.mq_head; mt; mt = next) {
1271+
next = mt->mt_queue;
1272+
if (fds[nfds].revents & (mt->mt_events | POLLHUP)) {
1273+
/* Some event has happened, move the thread back to the runq. */
1274+
find_and_unlink(&pollq, mt); /* remove from I/O queue */
1275+
add_runq_tail(mt);
1276+
#if THREAD_DEBUG
1277+
if (thread_trace)
1278+
printf("check_pollq: FD=%d thread=%d done\n", mt->mt_fd, (int)mt->mt_id);
1279+
#endif /* THREAD_DEBUG */
1280+
mt->mt_fd = IO_POLL_EVENT_HAS_HAPPENED;
1281+
}
1282+
nfds++;
1283+
}
1284+
#if THREAD_DEBUG
1285+
if (thread_trace) {
1286+
printf("check_pollq: exit\n");
1287+
dump_q("runq", runq);
1288+
}
1289+
#endif /* THREAD_DEBUG */
1290+
#endif /* WANT_IO_POLL */
1291+
}
1292+
12601293
void
12611294
throwto(struct mthread *mt, NODEPTR exn)
12621295
{
@@ -1335,13 +1368,11 @@ yield(void)
13351368
check_thrown(false);
13361369
check_sigint();
13371370

1338-
#if WANT_IO_POLL
1339-
if (io_waiter_count() > 0) {
1371+
if (pollq.mq_head) {
13401372
/* Check if any threads blocked on IO can be scheduled. Since we pass in a delay of 0, checking
13411373
* for the events will not block. */
1342-
io_poll(0, io_thread_ready);
1374+
check_pollq(0);
13431375
}
1344-
#endif
13451376

13461377
// printf("yield %p %d\n", runq, (int)stack_ptr);
13471378
/* if there is nothing after in the runq then there is no need to reschedule */
@@ -1390,7 +1421,7 @@ new_thread(NODEPTR root)
13901421
mt->mt_id = num_thread_create++;
13911422
#if WANT_IO_POLL
13921423
mt->mt_fd = IO_POLL_WAITING_FOR_NONE;
1393-
mt->mt_events = -1;
1424+
mt->mt_events = 0;
13941425
#endif
13951426
#if defined(CLOCK_INIT)
13961427
mt->mt_at = 0; /* delay has not expired */
@@ -1652,9 +1683,8 @@ pause_exec(void)
16521683
* IO event, we are deadlocked.
16531684
*/
16541685
#if WANT_IO_POLL
1655-
16561686
/* Check for deadlock situation */
1657-
if (io_waiter_count() == 0
1687+
if (!pollq.mq_head
16581688
#if defined(CLOCK_INIT)
16591689
&& !timeq.mq_head
16601690
#endif
@@ -1664,20 +1694,20 @@ pause_exec(void)
16641694
while (!runq.mq_head) {
16651695
int timeout_ms = -1; /* block indefinitely if only io_waiters */
16661696
#if defined(CLOCK_INIT)
1667-
/* If there are threads blocked on delays, recompute the timeout_ms to account
1668-
* for that. */
1697+
/* If there are threads blocked on delays, compute the timeout_ms to account for that. */
16691698
if (timeq.mq_head) {
1670-
CLOCK_T delta = timeq.mq_head->mt_at - CLOCK_GET();
1671-
/* +999 emulates a ceiling function, adding at most 0.999 ms. io_poll wants milliseconds,
1672-
* not microseconds as delta represents. When delta is < 1000, without +999, we'll truncate
1673-
* to zero and enter a loop at full CPU speed. */
1674-
timeout_ms = (delta > 0) ? (int)((delta + 999) / 1000) : 0;
1699+
CLOCK_T dly = timeq.mq_head->mt_at - CLOCK_GET();
1700+
if (dly > 0) {
1701+
/* poll() can be unreliable, so sleep shorter than the delay */
1702+
dly /= 1100; /* 1.1=sleep shorter, 1000=convert us to ms */
1703+
timeout_ms = dly == 0 ? 1 : dly; /* sleep at least 1ms to avoid busy wait */
1704+
} else {
1705+
timeout_ms = 0; /* delay has already expired */
1706+
}
16751707
}
1676-
#endif
1677-
io_poll(timeout_ms, io_thread_ready);
1678-
#if defined(CLOCK_INIT)
16791708
check_timeq();
1680-
#endif
1709+
#endif /* defined(CLOCK_INIT) */
1710+
check_pollq(timeout_ms);
16811711
}
16821712

16831713
#else /* !WANT_IO_POLL */
@@ -1719,7 +1749,7 @@ pause_exec(void)
17191749
#else /* CLOCK_INIT */
17201750
ERR("no clock");
17211751
#endif /* CLOCK_INIT */
1722-
#endif /* WANT_IO_POLL */
1752+
#endif /* !WANT_IO_POLL */
17231753
}
17241754

17251755
/* Interrupt a sleeping thread in a throwTo/threadDelay */
@@ -1907,10 +1937,6 @@ new_ap(NODEPTR f, NODEPTR a)
19071937
return n;
19081938
}
19091939

1910-
#if WANT_IO_POLL
1911-
#include "io_poll_impl.c"
1912-
#endif
1913-
19141940
NODEPTR evali(NODEPTR n);
19151941

19161942
/* If this is non-0 it means that the threading system is active. */
@@ -1925,10 +1951,6 @@ start_exec(NODEPTR root)
19251951
mt->mt_id = MAIN_THREAD; /* make it the main thread in case this is foreign export calling */
19261952
main_thread = mt;
19271953

1928-
#if WANT_IO_POLL
1929-
io_init();
1930-
#endif
1931-
19321954
switch(setjmp(sched)) {
19331955
case mt_main:
19341956
break;
@@ -6093,37 +6115,43 @@ evali(NODEPTR an)
60936115
case T_IO_WAITRDFD:
60946116
case T_IO_WAITWRFD: {
60956117
#if WANT_IO_POLL
6096-
CHKARG2NP; /* x = the filedescriptor, y = RealWorld; no pop yet */
6118+
CHKARG2NP; /* x = filedescriptor, y = RealWorld; no pop yet */
60976119

6098-
/* io_thread_ready sets mt_fd=IO_POLL_EVENT_HAS_HAPPENED when waking the thread.
6099-
* If we did not do this check we would just register again.
6120+
/*
6121+
* When the thread wakes up again it will re-execute that last op.
6122+
* check_pollq() sets mt_fd=IO_POLL_EVENT_HAS_HAPPENED when waking the thread.
6123+
* If we did not do this check we would just sleep again.
61006124
*/
61016125
if (runq.mq_head->mt_fd == IO_POLL_EVENT_HAS_HAPPENED) {
61026126
runq.mq_head->mt_fd = IO_POLL_WAITING_FOR_NONE;
61036127
POP(2);
61046128
GOPAIR(mkInt(1));
61056129
}
61066130

6107-
POP(2);
6131+
check_thrown(true); /* check if we have a thrown exception */
6132+
61086133
int fd = evalint(x);
6109-
int events = (tag == T_IO_WAITRDFD) ? IO_POLL_READ : IO_POLL_WRITE;
6134+
int events = tag == T_IO_WAITRDFD ? POLLIN : POLLOUT;
61106135

61116136
/* Set up the waiting thread's state, preparing it to leave the run queue
61126137
* until an event is ready for it.
61136138
*/
61146139
struct mthread *mt = remove_q_head(&runq);
61156140
mt->mt_fd = fd;
61166141
mt->mt_events = events;
6117-
mt->mt_state = ts_wait_io;
6118-
6119-
io_register(fd, events, mt);
6142+
add_q_tail(&pollq, mt); /* put it on the q of I/O waiters */
6143+
#if THREAD_DEBUG
6144+
if (thread_trace) {
6145+
printf("T_IO_WAITxxFD: wait for FD=%d, events=%x, thread=%d\n", fd, events, (int)mt->mt_id);
6146+
}
6147+
#endif /* THREAD_DEBUG */
61206148

6121-
resched(mt, ts_wait_io);
6122-
#else
6123-
CHKARG2NP;
61246149
POP(2);
6125-
GOPAIR(mkInt(-1));
6126-
#endif
6150+
resched(mt, ts_wait_io); /* set the thread state and reschedule */
6151+
#else /* WANT_IO_POLL */
6152+
CHKARG2;
6153+
GOPAIR(mkInt(-1)); /* cannot poll */
6154+
#endif /* WANT_IO_POLL */
61276155
}
61286156
case T_IO_GETMASKINGSTATE:
61296157
CHKARG1; /* x = ST */

src/runtime/io_poll.h

Lines changed: 0 additions & 17 deletions
This file was deleted.

0 commit comments

Comments
 (0)