Skip to content

Commit 9ee2243

Browse files
authored
Fiber scheduler: invoke #io_write hook on IO flush (ruby#15609)
Previously, calling IO#flush or closing an IO with unflushed buffered writes would just invoke `#blocking_operation_wait` and flush the write buffer using a `write` syscall. This change adds flushing through the fiber scheduler by invoking the `#io_write` hook. * Prefer IO::Buffer#write in IOScheduler * Use Dir.tmpdir for test file * Correctly handle errors in io_flush_buffer_fiber_scheduler
1 parent dd2f7d6 commit 9ee2243

3 files changed

Lines changed: 136 additions & 0 deletions

File tree

io.c

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,10 +1418,34 @@ io_flush_buffer_sync(void *arg)
14181418
return (VALUE)-1;
14191419
}
14201420

1421+
static inline VALUE
1422+
io_flush_buffer_fiber_scheduler(VALUE scheduler, rb_io_t *fptr)
1423+
{
1424+
VALUE ret = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, fptr->wbuf.ptr+fptr->wbuf.off, fptr->wbuf.len, 0);
1425+
if (!UNDEF_P(ret)) {
1426+
ssize_t result = rb_fiber_scheduler_io_result_apply(ret);
1427+
if (result > 0) {
1428+
fptr->wbuf.off += result;
1429+
fptr->wbuf.len -= result;
1430+
}
1431+
return result >= 0 ? (VALUE)0 : (VALUE)-1;
1432+
}
1433+
return ret;
1434+
}
1435+
14211436
static VALUE
14221437
io_flush_buffer_async(VALUE arg)
14231438
{
14241439
rb_io_t *fptr = (rb_io_t *)arg;
1440+
1441+
VALUE scheduler = rb_fiber_scheduler_current();
1442+
if (scheduler != Qnil) {
1443+
VALUE result = io_flush_buffer_fiber_scheduler(scheduler, fptr);
1444+
if (!UNDEF_P(result)) {
1445+
return result;
1446+
}
1447+
}
1448+
14251449
return rb_io_blocking_region_wait(fptr, io_flush_buffer_sync, fptr, RUBY_IO_WRITABLE);
14261450
}
14271451

test/fiber/scheduler.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,29 @@ def blocking(&block)
488488
end
489489
end
490490

491+
class IOScheduler < Scheduler
492+
def __io_ops__
493+
@__io_ops__ ||= []
494+
end
495+
496+
def io_write(io, buffer, length, offset)
497+
fd = io.fileno
498+
str = buffer.get_string
499+
__io_ops__ << [:io_write, fd, str]
500+
Fiber.blocking { buffer.write(IO.for_fd(fd), 0, offset) }
501+
end
502+
end
503+
504+
class IOErrorScheduler < Scheduler
505+
def io_read(io, buffer, length, offset)
506+
return -Errno::EBADF::Errno
507+
end
508+
509+
def io_write(io, buffer, length, offset)
510+
return -Errno::EINVAL::Errno
511+
end
512+
end
513+
491514
# This scheduler has a broken implementation of `unblock`` in the sense that it
492515
# raises an exception. This is used to test the behavior of the scheduler when
493516
# unblock raises an exception.

test/fiber/test_scheduler.rb

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# frozen_string_literal: true
22
require 'test/unit'
3+
require 'securerandom'
4+
require 'fileutils'
35
require_relative 'scheduler'
46

57
class TestFiberScheduler < Test::Unit::TestCase
@@ -283,4 +285,91 @@ def test_post_fork_fiber_blocking
283285
ensure
284286
thread.kill rescue nil
285287
end
288+
289+
def test_io_write_on_flush
290+
fn = File.join(Dir.tmpdir, "ruby_test_io_write_on_flush_#{SecureRandom.hex}")
291+
write_fd = nil
292+
io_ops = nil
293+
thread = Thread.new do
294+
scheduler = IOScheduler.new
295+
Fiber.set_scheduler scheduler
296+
297+
Fiber.schedule do
298+
File.open(fn, 'w+') do |f|
299+
write_fd = f.fileno
300+
f << 'foo'
301+
f.flush
302+
f << 'bar'
303+
end
304+
end
305+
io_ops = scheduler.__io_ops__
306+
end
307+
thread.join
308+
assert_equal [
309+
[:io_write, write_fd, 'foo'],
310+
[:io_write, write_fd, 'bar']
311+
], io_ops
312+
313+
assert_equal 'foobar', IO.read(fn)
314+
ensure
315+
thread.kill rescue nil
316+
FileUtils.rm_f(fn)
317+
end
318+
319+
def test_io_read_error
320+
fn = File.join(Dir.tmpdir, "ruby_test_io_read_error_#{SecureRandom.hex}")
321+
exception = nil
322+
thread = Thread.new do
323+
scheduler = IOErrorScheduler.new
324+
Fiber.set_scheduler scheduler
325+
Fiber.schedule do
326+
File.open(fn, 'w+') { it.read }
327+
rescue => e
328+
exception = e
329+
end
330+
end
331+
thread.join
332+
assert_kind_of Errno::EBADF, exception
333+
ensure
334+
thread.kill rescue nil
335+
FileUtils.rm_f(fn)
336+
end
337+
338+
def test_io_write_error
339+
fn = File.join(Dir.tmpdir, "ruby_test_io_write_error_#{SecureRandom.hex}")
340+
exception = nil
341+
thread = Thread.new do
342+
scheduler = IOErrorScheduler.new
343+
Fiber.set_scheduler scheduler
344+
Fiber.schedule do
345+
File.open(fn, 'w+') { it.sync = true; it << 'foo' }
346+
rescue => e
347+
exception = e
348+
end
349+
end
350+
thread.join
351+
assert_kind_of Errno::EINVAL, exception
352+
ensure
353+
thread.kill rescue nil
354+
FileUtils.rm_f(fn)
355+
end
356+
357+
def test_io_write_flush_error
358+
fn = File.join(Dir.tmpdir, "ruby_test_io_write_flush_error_#{SecureRandom.hex}")
359+
exception = nil
360+
thread = Thread.new do
361+
scheduler = IOErrorScheduler.new
362+
Fiber.set_scheduler scheduler
363+
Fiber.schedule do
364+
File.open(fn, 'w+') { it << 'foo' }
365+
rescue => e
366+
exception = e
367+
end
368+
end
369+
thread.join
370+
assert_kind_of Errno::EINVAL, exception
371+
ensure
372+
thread.kill rescue nil
373+
FileUtils.rm_f(fn)
374+
end
286375
end

0 commit comments

Comments
 (0)