Skip to content

Commit ae9afa7

Browse files
committed
Add ready_ CAS protocol and restructure IOCP work counting
Implement a thread-safety protocol to coordinate between the I/O initiator thread and the GQCS dispatch thread:

- Add ready_ flag to overlapped_op with InterlockedCompareExchange handshake between do_one() and on_pending()/on_completion()
- Add on_pending() for async paths: CAS coordinates with do_one() to ensure the handler only dispatches after the initiator has returned
- Add on_completion() for sync error/noop paths: packs results into the op and posts with key_result_stored for deferred dispatch
- Move work_started() to the top of connect/read_some/write_some/accept unconditionally so one call covers all error and async paths
- Move work_finished() after complete() in do_one() so the handler runs while outstanding_work is still positive
1 parent 7354ab6 commit ae9afa7

4 files changed

Lines changed: 116 additions & 62 deletions

File tree

include/boost/corosio/native/detail/iocp/win_acceptor_service.hpp

Lines changed: 47 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,11 @@ win_socket_internal::connect(
397397
op.h = h;
398398
op.ex = d;
399399
op.ec_out = ec;
400-
op.target_endpoint = ep; // Store target for endpoint caching
400+
op.target_endpoint = ep;
401401
op.start(token);
402402

403+
svc_.work_started();
404+
403405
sockaddr_in bind_addr{};
404406
bind_addr.sin_family = AF_INET;
405407
bind_addr.sin_addr.s_addr = INADDR_ANY;
@@ -409,25 +411,19 @@ win_socket_internal::connect(
409411
socket_, reinterpret_cast<sockaddr*>(&bind_addr),
410412
sizeof(bind_addr)) == SOCKET_ERROR)
411413
{
412-
op.dwError = ::WSAGetLastError();
413-
svc_.post(&op);
414-
// completion is always posted to scheduler queue, never inline.
414+
svc_.on_completion(&op, ::WSAGetLastError(), 0);
415415
return std::noop_coroutine();
416416
}
417417

418418
auto connect_ex = svc_.connect_ex();
419419
if (!connect_ex)
420420
{
421-
op.dwError = WSAEOPNOTSUPP;
422-
svc_.post(&op);
423-
// completion is always posted to scheduler queue, never inline.
421+
svc_.on_completion(&op, WSAEOPNOTSUPP, 0);
424422
return std::noop_coroutine();
425423
}
426424

427425
sockaddr_in addr = detail::to_sockaddr_in(ep);
428426

429-
svc_.work_started();
430-
431427
BOOL result = connect_ex(
432428
socket_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr), nullptr, 0,
433429
nullptr, &op);
@@ -437,15 +433,12 @@ win_socket_internal::connect(
437433
DWORD err = ::WSAGetLastError();
438434
if (err != ERROR_IO_PENDING)
439435
{
440-
svc_.work_finished();
441-
op.dwError = err;
442-
svc_.post(&op);
443-
// completion is always posted to scheduler queue, never inline.
436+
svc_.on_completion(&op, err, 0);
444437
return std::noop_coroutine();
445438
}
446439
}
447-
// Synchronous completion: IOCP will deliver the completion packet
448-
// completion is always posted to scheduler queue, never inline.
440+
441+
svc_.on_pending(&op);
449442
return std::noop_coroutine();
450443
}
451444

@@ -470,18 +463,18 @@ win_socket_internal::read_some(
470463
op.bytes_out = bytes_out;
471464
op.start(token);
472465

473-
// Prepare buffers (must happen before initiator runs)
466+
svc_.work_started();
467+
468+
// Prepare buffers
474469
capy::mutable_buffer bufs[read_op::max_buffers];
475470
op.wsabuf_count =
476471
static_cast<DWORD>(param.copy_to(bufs, read_op::max_buffers));
477472

478-
// Handle empty buffer: complete with 0 bytes via post for consistency
473+
// Handle empty buffer: complete with 0 bytes
479474
if (op.wsabuf_count == 0)
480475
{
481-
op.bytes_transferred = 0;
482-
op.dwError = 0;
483-
op.empty_buffer = true;
484-
svc_.post(&op);
476+
op.empty_buffer = true;
477+
svc_.on_completion(&op, 0, 0);
485478
return std::noop_coroutine();
486479
}
487480

@@ -491,11 +484,8 @@ win_socket_internal::read_some(
491484
op.wsabufs[i].len = static_cast<ULONG>(bufs[i].size());
492485
}
493486

494-
// Issue WSARecv directly — caller is already suspended
495487
op.flags = 0;
496488

497-
svc_.work_started();
498-
499489
int result = ::WSARecv(
500490
socket_, op.wsabufs, op.wsabuf_count, nullptr, &op.flags, &op,
501491
nullptr);
@@ -505,15 +495,14 @@ win_socket_internal::read_some(
505495
DWORD err = ::WSAGetLastError();
506496
if (err != WSA_IO_PENDING)
507497
{
508-
svc_.work_finished();
509-
op.dwError = err;
510-
svc_.post(&op);
498+
svc_.on_completion(&op, err, 0);
511499
return std::noop_coroutine();
512500
}
513501
}
514-
// I/O is now pending. If stop was requested before WSARecv
515-
// started, the CancelIoEx in the stop callback had nothing
516-
// to cancel. Re-check and cancel the now-pending operation.
502+
503+
svc_.on_pending(&op);
504+
505+
// Re-check cancellation after I/O is pending
517506
if (op.cancelled.load(std::memory_order_acquire))
518507
::CancelIoEx(reinterpret_cast<HANDLE>(socket_), &op);
519508

@@ -540,17 +529,17 @@ win_socket_internal::write_some(
540529
op.bytes_out = bytes_out;
541530
op.start(token);
542531

543-
// Prepare buffers (must happen before initiator runs)
532+
svc_.work_started();
533+
534+
// Prepare buffers
544535
capy::mutable_buffer bufs[write_op::max_buffers];
545536
op.wsabuf_count =
546537
static_cast<DWORD>(param.copy_to(bufs, write_op::max_buffers));
547538

548539
// Handle empty buffer: complete immediately with 0 bytes
549540
if (op.wsabuf_count == 0)
550541
{
551-
op.bytes_transferred = 0;
552-
op.dwError = 0;
553-
svc_.post(&op);
542+
svc_.on_completion(&op, 0, 0);
554543
return std::noop_coroutine();
555544
}
556545

@@ -560,9 +549,6 @@ win_socket_internal::write_some(
560549
op.wsabufs[i].len = static_cast<ULONG>(bufs[i].size());
561550
}
562551

563-
// Issue WSASend directly — caller is already suspended
564-
svc_.work_started();
565-
566552
int result = ::WSASend(
567553
socket_, op.wsabufs, op.wsabuf_count, nullptr, 0, &op, nullptr);
568554

@@ -571,12 +557,13 @@ win_socket_internal::write_some(
571557
DWORD err = ::WSAGetLastError();
572558
if (err != WSA_IO_PENDING)
573559
{
574-
svc_.work_finished();
575-
op.dwError = err;
576-
svc_.post(&op);
560+
svc_.on_completion(&op, err, 0);
577561
return std::noop_coroutine();
578562
}
579563
}
564+
565+
svc_.on_pending(&op);
566+
580567
// Re-check cancellation after I/O is pending
581568
if (op.cancelled.load(std::memory_order_acquire))
582569
::CancelIoEx(reinterpret_cast<HANDLE>(socket_), &op);
@@ -1005,6 +992,18 @@ win_sockets::post(overlapped_op* op)
1005992
sched_.post(op);
1006993
}
1007994

995+
inline void
996+
win_sockets::on_pending(overlapped_op* op) noexcept
997+
{
998+
sched_.on_pending(op);
999+
}
1000+
1001+
inline void
1002+
win_sockets::on_completion(overlapped_op* op, DWORD error, DWORD bytes) noexcept
1003+
{
1004+
sched_.on_completion(op, error, bytes);
1005+
}
1006+
10081007
inline void
10091008
win_sockets::work_started() noexcept
10101009
{
@@ -1179,6 +1178,8 @@ win_acceptor_internal::accept(
11791178
op.impl_out = impl_out;
11801179
op.start(token);
11811180

1181+
svc_.work_started();
1182+
11821183
// Create wrapper for the peer socket (service owns it)
11831184
auto& peer_wrapper = static_cast<win_socket&>(*svc_.construct());
11841185

@@ -1189,9 +1190,7 @@ win_acceptor_internal::accept(
11891190
if (accepted == INVALID_SOCKET)
11901191
{
11911192
svc_.destroy(&peer_wrapper);
1192-
op.dwError = ::WSAGetLastError();
1193-
svc_.post(&op);
1194-
// completion is always posted to scheduler queue, never inline.
1193+
svc_.on_completion(&op, ::WSAGetLastError(), 0);
11951194
return std::noop_coroutine();
11961195
}
11971196

@@ -1203,9 +1202,7 @@ win_acceptor_internal::accept(
12031202
DWORD err = ::GetLastError();
12041203
::closesocket(accepted);
12051204
svc_.destroy(&peer_wrapper);
1206-
op.dwError = err;
1207-
svc_.post(&op);
1208-
// completion is always posted to scheduler queue, never inline.
1205+
svc_.on_completion(&op, err, 0);
12091206
return std::noop_coroutine();
12101207
}
12111208

@@ -1221,14 +1218,11 @@ win_acceptor_internal::accept(
12211218
svc_.destroy(&peer_wrapper);
12221219
op.peer_wrapper = nullptr;
12231220
op.accepted_socket = INVALID_SOCKET;
1224-
op.dwError = WSAEOPNOTSUPP;
1225-
svc_.post(&op);
1226-
// completion is always posted to scheduler queue, never inline.
1221+
svc_.on_completion(&op, WSAEOPNOTSUPP, 0);
12271222
return std::noop_coroutine();
12281223
}
12291224

12301225
DWORD bytes_received = 0;
1231-
svc_.work_started();
12321226

12331227
BOOL ok = accept_ex(
12341228
socket_, accepted, op.addr_buf, 0, sizeof(sockaddr_in) + 16,
@@ -1239,19 +1233,16 @@ win_acceptor_internal::accept(
12391233
DWORD err = ::WSAGetLastError();
12401234
if (err != ERROR_IO_PENDING)
12411235
{
1242-
svc_.work_finished();
12431236
::closesocket(accepted);
12441237
svc_.destroy(&peer_wrapper);
12451238
op.peer_wrapper = nullptr;
12461239
op.accepted_socket = INVALID_SOCKET;
1247-
op.dwError = err;
1248-
svc_.post(&op);
1249-
// completion is always posted to scheduler queue, never inline.
1240+
svc_.on_completion(&op, err, 0);
12501241
return std::noop_coroutine();
12511242
}
12521243
}
1253-
// Synchronous completion: IOCP will deliver the completion packet
1254-
// completion is always posted to scheduler queue, never inline.
1244+
1245+
svc_.on_pending(&op);
12551246
return std::noop_coroutine();
12561247
}
12571248

include/boost/corosio/native/detail/iocp/win_overlapped_op.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ struct overlapped_op
6060
/** Function pointer type for cancellation hook. */
6161
using cancel_func_type = void (*)(overlapped_op*) noexcept;
6262

63+
long ready_ = 0;
6364
std::coroutine_handle<> h;
6465
capy::executor_ref ex;
6566
std::error_code* ec_out = nullptr;
@@ -89,6 +90,7 @@ struct overlapped_op
8990
void reset() noexcept
9091
{
9192
reset_overlapped();
93+
ready_ = 0;
9294
dwError = 0;
9395
bytes_transferred = 0;
9496
empty_buffer = false;

include/boost/corosio/native/detail/iocp/win_scheduler.hpp

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ class BOOST_COROSIO_DECL win_scheduler final
8080
void work_started() noexcept override;
8181
void work_finished() noexcept override;
8282

83+
/** Signal that an overlapped I/O operation is now pending.
84+
Coordinates with do_one() via the ready_ CAS protocol. */
85+
void on_pending(overlapped_op* op) const;
86+
87+
/** Post an immediate completion with pre-stored results.
88+
Used for sync errors and noop paths. */
89+
void on_completion(overlapped_op* op, DWORD error, DWORD bytes) const;
90+
8391
// Timer service integration
8492
void set_timer_service(timer_service* svc);
8593
void update_timeout();
@@ -312,6 +320,42 @@ win_scheduler::work_finished() noexcept
312320
stop();
313321
}
314322

323+
inline void
324+
win_scheduler::on_pending(overlapped_op* op) const
325+
{
326+
// CAS: try to set ready_ from 0 to 1.
327+
// If the old value was 1, GQCS already grabbed this op and stored
328+
// results — we need to re-post so do_one() can dispatch it.
329+
if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
330+
{
331+
if (!::PostQueuedCompletionStatus(
332+
iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
333+
{
334+
std::lock_guard<win_mutex> lock(dispatch_mutex_);
335+
completed_ops_.push(op);
336+
::InterlockedExchange(&dispatch_required_, 1);
337+
}
338+
}
339+
}
340+
341+
inline void
342+
win_scheduler::on_completion(
343+
overlapped_op* op, DWORD error, DWORD bytes) const
344+
{
345+
// Sync completion: pack results into op and post for dispatch.
346+
op->ready_ = 1;
347+
op->dwError = error;
348+
op->bytes_transferred = bytes;
349+
350+
if (!::PostQueuedCompletionStatus(
351+
iocp_, 0, key_result_stored, static_cast<LPOVERLAPPED>(op)))
352+
{
353+
std::lock_guard<win_mutex> lock(dispatch_mutex_);
354+
completed_ops_.push(op);
355+
::InterlockedExchange(&dispatch_required_, 1);
356+
}
357+
}
358+
315359
inline void
316360
win_scheduler::stop()
317361
{
@@ -493,28 +537,39 @@ win_scheduler::do_one(unsigned long timeout_ms)
493537
case key_io:
494538
case key_result_stored:
495539
{
496-
// Actual I/O completion: overlapped is OVERLAPPED* (first base of overlapped_op)
497540
auto* ov_op = overlapped_to_op(overlapped);
498541

499-
// If key_result_stored, results are pre-stored in overlapped fields
542+
// If key_result_stored, results are pre-stored in op fields
500543
if (key == key_result_stored)
501544
{
502545
bytes = ov_op->bytes_transferred;
503546
err = ov_op->dwError;
504547
}
505548

549+
// Store GQCS results so on_pending() re-post has valid data
506550
ov_op->store_result(bytes, err);
507-
work_finished();
508-
ov_op->complete(this, bytes, err);
509-
return 1;
551+
552+
// CAS: try to set ready_ from 0 to 1.
553+
// If old value was 1, the initiator already returned
554+
// (on_pending/on_completion set it) — safe to dispatch.
555+
// If old value was 0, the initiator hasn't returned yet —
556+
// skip dispatch; on_pending() will re-post.
557+
if (::InterlockedCompareExchange(
558+
&ov_op->ready_, 1, 0) == 1)
559+
{
560+
ov_op->complete(this, bytes, err);
561+
work_finished();
562+
return 1;
563+
}
564+
continue;
510565
}
511566

512567
case key_posted:
513568
{
514569
// Posted scheduler_op*: overlapped is actually a scheduler_op*
515570
auto* op = reinterpret_cast<scheduler_op*>(overlapped);
516-
work_finished();
517571
op->complete(this, bytes, err);
572+
work_finished();
518573
return 1;
519574
}
520575

include/boost/corosio/native/detail/iocp/win_sockets.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@ class BOOST_COROSIO_DECL win_sockets final
129129
/** Post an overlapped operation for completion. */
130130
void post(overlapped_op* op);
131131

132+
/** Signal that an overlapped I/O is now pending (CAS protocol). */
133+
void on_pending(overlapped_op* op) noexcept;
134+
135+
/** Post an immediate completion with pre-stored results. */
136+
void on_completion(overlapped_op* op, DWORD error, DWORD bytes) noexcept;
137+
132138
/** Notify scheduler of pending I/O work. */
133139
void work_started() noexcept;
134140

0 commit comments

Comments
 (0)