Skip to content

Commit 7e7d2f2

Browse files
authored
fix(kqueue): call update_now in kqueue timer_next (#214)
This fixes some behavior I observed using libxev on macOS. Setting a timer fires before the expected time, apparently because `timer_next` uses the existing cached time (`self.cached_now`). If that cached time is outdated because of a long wait, the timer will fire almost immediately rather than after the desired time. I've included a test which fails on `main` that shows this behavior. This test delays loop wakeup, arms a timer from the callback, and asserts elapsed time is at least `timer_delay_ms`. The timer API describes `next_ms` as “from now”, so scheduling should use current loop time, not the potentially stale cached time. I looked through the existing issues and PRs and didn't see this referenced, so I'm not sure if this in fact expected behavior. If it is, am I misunderstanding the behavior of the timer API on Kqueue? This is solved in the `io_uring` backend with this line: `if (self.flags.now_outdated) self.update_now();` https://github.com/mitchellh/libxev/blob/78781eef639f9eb9935b067b4bbc1e69254bda3d/src/backend/io_uring.zig#L337 which updates the cached time if we're `now_outdated`. For simplicity, I opted to not add this flag and unconditionally update the cached time on Kqueue.
2 parents 2bef1ba + ddc8f58 commit 7e7d2f2

1 file changed

Lines changed: 113 additions & 1 deletion

File tree

src/backend/kqueue.zig

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ pub const Loop = struct {
659659
}
660660
}
661661

662-
fn timer_next(self: Loop, next_ms: u64) posix.timespec {
662+
fn timer_next(self: *Loop, next_ms: u64) posix.timespec {
663663
// Get the timestamp of the absolute time that we'll execute this timer.
664664
// There are lots of failure scenarios here in math. If we see any
665665
// of them we just use the maximum value.
@@ -675,6 +675,8 @@ pub const Loop = struct {
675675
(next_ms % std.time.ms_per_s) * std.time.ns_per_ms,
676676
) orelse return max;
677677

678+
self.update_now();
679+
678680
return .{
679681
.sec = std.math.add(isize, self.cached_now.sec, next_s) catch
680682
return max,
@@ -2674,6 +2676,116 @@ test "kqueue: mach port" {
26742676
try testing.expect(!called);
26752677
}
26762678

2679+
test "kqueue: timer armed from delayed callback must not fire early" {
2680+
if (builtin.os.tag != .macos) return error.SkipZigTest;
2681+
2682+
const testing = std.testing;
2683+
2684+
const send_delay_ms: u64 = 50;
2685+
const timer_delay_ms: u64 = 20;
2686+
2687+
var loop = try Loop.init(.{});
2688+
defer loop.deinit();
2689+
2690+
// Allocate the port used to wake the loop after a delayed send.
2691+
const mach_self = posix.system.mach_task_self();
2692+
var mach_port: posix.system.mach_port_name_t = undefined;
2693+
try testing.expectEqual(
2694+
darwin.KernE.SUCCESS,
2695+
darwin.getKernError(posix.system.mach_port_allocate(
2696+
mach_self,
2697+
@intFromEnum(posix.system.MACH_PORT_RIGHT.RECEIVE),
2698+
&mach_port,
2699+
)),
2700+
);
2701+
defer _ = posix.system.mach_port_deallocate(mach_self, mach_port);
2702+
2703+
const State = struct {
2704+
timer_started_ns: i128 = 0,
2705+
timer_fired_ns: i128 = 0,
2706+
timer_trigger: ?TimerTrigger = null,
2707+
timer_completion: Completion = undefined,
2708+
};
2709+
2710+
var state: State = .{};
2711+
2712+
const timer_cb: Callback = (struct {
2713+
fn callback(
2714+
ud: ?*anyopaque,
2715+
_: *Loop,
2716+
_: *Completion,
2717+
r: Result,
2718+
) CallbackAction {
2719+
const s: *State = @ptrCast(@alignCast(ud.?));
2720+
s.timer_fired_ns = std.time.nanoTimestamp();
2721+
s.timer_trigger = r.timer catch unreachable;
2722+
return .disarm;
2723+
}
2724+
}).callback;
2725+
2726+
var c_wait: Completion = .{
2727+
.op = .{
2728+
.machport = .{
2729+
.port = mach_port,
2730+
.buffer = .{ .array = undefined },
2731+
},
2732+
},
2733+
2734+
.userdata = &state,
2735+
.callback = (struct {
2736+
fn callback(
2737+
ud: ?*anyopaque,
2738+
l: *Loop,
2739+
_: *Completion,
2740+
r: Result,
2741+
) CallbackAction {
2742+
_ = r.machport catch unreachable;
2743+
const s: *State = @ptrCast(@alignCast(ud.?));
2744+
s.timer_started_ns = std.time.nanoTimestamp();
2745+
l.timer(&s.timer_completion, timer_delay_ms, s, timer_cb);
2746+
return .disarm;
2747+
}
2748+
}).callback,
2749+
};
2750+
loop.add(&c_wait);
2751+
2752+
// Send to the mach port only after the loop has been blocked for a while.
2753+
const sender = try std.Thread.spawn(.{}, (struct {
2754+
fn run(port: posix.system.mach_port_name_t) void {
2755+
std.Thread.sleep(send_delay_ms * std.time.ns_per_ms);
2756+
2757+
var msg: darwin.mach_msg_header_t = .{
2758+
.msgh_bits = @intFromEnum(posix.system.MACH_MSG_TYPE.MAKE_SEND_ONCE),
2759+
.msgh_size = @sizeOf(darwin.mach_msg_header_t),
2760+
.msgh_remote_port = port,
2761+
.msgh_local_port = darwin.MACH_PORT_NULL,
2762+
.msgh_voucher_port = undefined,
2763+
.msgh_id = undefined,
2764+
};
2765+
2766+
const rc = darwin.mach_msg(
2767+
&msg,
2768+
darwin.MACH_SEND_MSG,
2769+
msg.msgh_size,
2770+
0,
2771+
darwin.MACH_PORT_NULL,
2772+
darwin.MACH_MSG_TIMEOUT_NONE,
2773+
darwin.MACH_PORT_NULL,
2774+
);
2775+
assert(darwin.getMachMsgError(rc) == darwin.MachMsgE.SUCCESS);
2776+
}
2777+
}).run, .{mach_port});
2778+
defer sender.join();
2779+
2780+
try loop.run(.until_done);
2781+
2782+
try testing.expect(state.timer_trigger.? == .expiration);
2783+
2784+
const elapsed_ns = state.timer_fired_ns - state.timer_started_ns;
2785+
const elapsed_ms: i128 = @divFloor(elapsed_ns, std.time.ns_per_ms);
2786+
try testing.expect(elapsed_ms >= @as(i128, @intCast(timer_delay_ms)));
2787+
}
2788+
26772789
test "kqueue: socket accept/cancel cancellation should decrease active count" {
26782790
const mem = std.mem;
26792791
const net = std.net;

0 commit comments

Comments
 (0)