Skip to content

Commit 1fcfb07

Browse files
rustyconoverclaude
andcommitted
ci: make Windows green — skip POSIX-only paths and use msvcrt locks
- tests/test_shm.py: skip the whole module on win32. Shared-memory transport is POSIX-only (relies on mmap semantics that don't carry over to Windows page handling); the user does not need shm there. - tests/test_benchmarks_java.py: replace `socket.AF_UNIX` with a runtime `getattr` so mypy on Windows stops choking. The benchmark itself never runs without an explicitly built worker, so the RuntimeError on Windows is acceptable. - vgi_rpc/conformance/_types.py: factor `_probe_file_lock` so the cross-process cancel-probe counter file uses `fcntl.flock` on POSIX and `msvcrt.locking` on Windows. Same advisory semantics on both. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 37d7555 commit 1fcfb07

3 files changed

Lines changed: 60 additions & 31 deletions

File tree

tests/test_benchmarks_java.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,13 @@ def _wait_for_http(port: int, timeout: float = 10.0) -> None:
5757

5858

5959
def _wait_for_unix(path: str, timeout: float = 10.0) -> None:
60+
af_unix = getattr(socket, "AF_UNIX", None)
61+
if af_unix is None:
62+
raise RuntimeError("Unix domain sockets are not supported on this platform")
6063
deadline = time.monotonic() + timeout
6164
while time.monotonic() < deadline:
6265
try:
63-
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
66+
with socket.socket(af_unix, socket.SOCK_STREAM) as s:
6467
s.connect(path)
6568
return
6669
except (FileNotFoundError, ConnectionRefusedError):

tests/test_shm.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import gc
99
import logging
1010
import struct
11+
import sys
1112
import threading
1213
from dataclasses import dataclass
1314
from typing import Protocol
@@ -42,6 +43,8 @@
4243
)
4344
from vgi_rpc.utils import ArrowSerializableDataclass
4445

46+
pytestmark = pytest.mark.skipif(sys.platform == "win32", reason="shared-memory transport is POSIX-only")
47+
4548
# ---------------------------------------------------------------------------
4649
# Test schema & helpers
4750
# ---------------------------------------------------------------------------

vgi_rpc/conformance/_types.py

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@
99

1010
from __future__ import annotations
1111

12+
import contextlib
1213
import datetime as _dt
14+
import sys
15+
import time
16+
from collections.abc import Iterator
1317
from dataclasses import dataclass
1418
from decimal import Decimal
1519
from enum import Enum
16-
from typing import Annotated, Any, cast
20+
from typing import IO, Annotated, Any, cast
1721

1822
import pyarrow as pa
1923
import pyarrow.compute as pc
@@ -411,6 +415,40 @@ def exchange(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext
411415
# ---------------------------------------------------------------------------
412416

413417

418+
@contextlib.contextmanager
419+
def _probe_file_lock(f: IO[str], *, exclusive: bool) -> Iterator[None]:
420+
"""Cross-platform advisory file lock for the conformance probe file.
421+
422+
Uses ``fcntl.flock`` on POSIX and ``msvcrt.locking`` on Windows so the
423+
cross-process probe-counter file works on both platforms.
424+
"""
425+
if sys.platform == "win32":
426+
import msvcrt
427+
428+
f.seek(0)
429+
while True:
430+
try:
431+
msvcrt.locking(f.fileno(), msvcrt.LK_LOCK, 1)
432+
break
433+
except OSError:
434+
time.sleep(0.01)
435+
try:
436+
yield
437+
finally:
438+
with contextlib.suppress(OSError):
439+
f.seek(0)
440+
msvcrt.locking(f.fileno(), msvcrt.LK_UNLCK, 1)
441+
else:
442+
import fcntl
443+
444+
flag = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
445+
fcntl.flock(f.fileno(), flag)
446+
try:
447+
yield
448+
finally:
449+
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
450+
451+
414452
class _CancelProbe:
415453
"""Process-wide counters observed by cancel conformance tests.
416454
@@ -440,41 +478,31 @@ def _bump(cls, field: str) -> None:
440478
if path is None:
441479
setattr(cls, field, getattr(cls, field) + 1)
442480
return
443-
import fcntl
444481
import json
445482

446-
with open(path, "a+") as f:
447-
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
448-
try:
449-
f.seek(0)
450-
raw = f.read()
451-
data = json.loads(raw) if raw else {"produce_calls": 0, "exchange_calls": 0, "on_cancel_calls": 0}
452-
data[field] = data.get(field, 0) + 1
453-
f.seek(0)
454-
f.truncate()
455-
f.write(json.dumps(data))
456-
f.flush()
457-
finally:
458-
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
483+
with open(path, "a+") as f, _probe_file_lock(f, exclusive=True):
484+
f.seek(0)
485+
raw = f.read()
486+
data = json.loads(raw) if raw else {"produce_calls": 0, "exchange_calls": 0, "on_cancel_calls": 0}
487+
data[field] = data.get(field, 0) + 1
488+
f.seek(0)
489+
f.truncate()
490+
f.write(json.dumps(data))
491+
f.flush()
459492

460493
@classmethod
461494
def snapshot(cls) -> tuple[int, int, int]:
462495
"""Return ``(produce, exchange, on_cancel)`` from the active store."""
463496
path = cls._shared_path()
464497
if path is None:
465498
return cls.produce_calls, cls.exchange_calls, cls.on_cancel_calls
466-
import fcntl
467499
import json
468500
import os
469501

470502
if not os.path.exists(path):
471503
return 0, 0, 0
472-
with open(path) as f:
473-
fcntl.flock(f.fileno(), fcntl.LOCK_SH)
474-
try:
475-
raw = f.read()
476-
finally:
477-
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
504+
with open(path) as f, _probe_file_lock(f, exclusive=False):
505+
raw = f.read()
478506
if not raw:
479507
return 0, 0, 0
480508
data = json.loads(raw)
@@ -488,16 +516,11 @@ def reset(cls) -> None:
488516
cls.on_cancel_calls = 0
489517
path = cls._shared_path()
490518
if path is not None:
491-
import fcntl
492519
import json
493520

494-
with open(path, "w") as f:
495-
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
496-
try:
497-
f.write(json.dumps({"produce_calls": 0, "exchange_calls": 0, "on_cancel_calls": 0}))
498-
f.flush()
499-
finally:
500-
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
521+
with open(path, "w") as f, _probe_file_lock(f, exclusive=True):
522+
f.write(json.dumps({"produce_calls": 0, "exchange_calls": 0, "on_cancel_calls": 0}))
523+
f.flush()
501524

502525

503526
@dataclass

0 commit comments

Comments
 (0)