Skip to content

Commit 850db87

Browse files
authored
Merge pull request #122 from timlnx/add-windows-support
Add Windows device capacity support via DeviceIoControl, closes #52
2 parents 8c5565f + f272a2a commit 850db87

4 files changed

Lines changed: 108 additions & 15 deletions

File tree

.github/workflows/python.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ jobs:
1111
strategy:
1212
matrix:
1313
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
14-
os: ["macos-latest", "ubuntu-latest"]
14+
os: ["macos-latest", "ubuntu-latest", "windows-latest"]
1515
runs-on: ${{ matrix.os }}
1616
steps:
1717
- name: "GitHub Checks it out :sunglasses-face:"
@@ -24,6 +24,7 @@ jobs:
2424
cache: 'pip'
2525

2626
- name: Verify Test Case Names Unique
27+
if: runner.os != 'Windows'
2728
run: |
2829
./tests/test_unique_testcase_names.sh
2930
@@ -34,8 +35,8 @@ jobs:
3435
3536
- name: Pre-Tests code smell validation
3637
run: |
37-
pycodestyle -v --ignore=E501,E722 bitmath/__init__.py tests/*.py
38-
flake8 --select=F bitmath/__init__.py tests/*.py
38+
pycodestyle -v --ignore=E501,E722 bitmath/__init__.py tests/
39+
flake8 --select=F bitmath/__init__.py tests/
3940
4041
- name: Run Unit Tests
4142
run: |

bitmath/__init__.py

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,19 @@
5858
from collections.abc import Generator, Iterable, Iterator
5959
from typing import IO, Any
6060

61-
# For device capacity reading in query_device_capacity(). Only supported
62-
# on posix systems for now. Will be addressed in issue #52 on GitHub.
61+
# For device capacity reading in query_device_capacity().
6362
if os.name == 'posix':
6463
import stat
6564
import fcntl
6665
import struct
66+
elif os.name == 'nt':
67+
import ctypes
68+
import ctypes.wintypes
69+
import msvcrt
6770

71+
#: Platforms where :func:`query_device_capacity` is supported.
72+
#: Corresponds to possible values of :data:`os.name`.
73+
SUPPORTED_PLATFORMS = frozenset({'posix', 'nt'})
6874

6975
__all__ = ['Bit', 'Byte', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB',
7076
'kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB', 'Kib',
@@ -1285,6 +1291,56 @@ def best_prefix(bytes: Bitmath | int | float, system: int = NIST) -> Bitmath:
12851291
return Byte(value).best_prefix(system=system)
12861292

12871293

1294+
def _query_device_capacity_windows(device_fd: IO[Any]) -> int:
1295+
"""Return device capacity in bytes on Windows via DeviceIoControl.
1296+
1297+
Windows physical disk paths look like ``\\\\.\\PhysicalDrive0``.
1298+
Raises :class:`ValueError` if the file descriptor is not a physical device.
1299+
Raises :class:`OSError` if the DeviceIoControl call fails.
1300+
"""
1301+
if not device_fd.name.startswith('\\\\.\\'):
1302+
raise ValueError("The file descriptor provided is not of a device type")
1303+
1304+
IOCTL_DISK_GET_DRIVE_GEOMETRY_EX = 0x000700A0
1305+
1306+
class DISK_GEOMETRY(ctypes.Structure):
1307+
_fields_ = [
1308+
('Cylinders', ctypes.c_longlong),
1309+
('MediaType', ctypes.c_uint),
1310+
('TracksPerCylinder', ctypes.c_ulong),
1311+
('SectorsPerTrack', ctypes.c_ulong),
1312+
('BytesPerSector', ctypes.c_ulong),
1313+
]
1314+
1315+
class DISK_GEOMETRY_EX(ctypes.Structure):
1316+
_fields_ = [
1317+
('Geometry', DISK_GEOMETRY),
1318+
('DiskSize', ctypes.c_longlong),
1319+
('Data', ctypes.c_byte * 1),
1320+
]
1321+
1322+
geometry = DISK_GEOMETRY_EX()
1323+
bytes_returned = ctypes.wintypes.DWORD(0)
1324+
handle = msvcrt.get_osfhandle(device_fd.fileno())
1325+
1326+
result = ctypes.windll.kernel32.DeviceIoControl(
1327+
handle,
1328+
IOCTL_DISK_GET_DRIVE_GEOMETRY_EX,
1329+
None,
1330+
0,
1331+
ctypes.byref(geometry),
1332+
ctypes.sizeof(geometry),
1333+
ctypes.byref(bytes_returned),
1334+
None,
1335+
)
1336+
1337+
if not result:
1338+
error_code = ctypes.windll.kernel32.GetLastError()
1339+
raise OSError(f"DeviceIoControl failed with error code: {error_code}")
1340+
1341+
return geometry.DiskSize
1342+
1343+
12881344
def query_device_capacity(device_fd: IO[Any]) -> Byte:
12891345
"""Create bitmath instances of the capacity of a system block device
12901346
@@ -1300,13 +1356,16 @@ def query_device_capacity(device_fd: IO[Any]) -> Byte:
13001356
* http://stackoverflow.com/a/9764508/263969
13011357
13021358
:param file device_fd: A ``file`` object of the device to query the
1303-
capacity of (as in ``get_device_capacity(open("/dev/sda"))``).
1359+
capacity of. On Linux/macOS: ``open("/dev/sda", "rb")``. On Windows:
1360+
``open(r'\\\\.\\PhysicalDrive0', 'rb')`` (requires administrator privileges).
13041361
13051362
:return: a bitmath :class:`bitmath.Byte` instance equivalent to the
13061363
capacity of the target device in bytes.
13071364
"""
1308-
if os.name != 'posix':
1365+
if os.name not in SUPPORTED_PLATFORMS:
13091366
raise NotImplementedError(f"'bitmath.query_device_capacity' is not supported on this platform: {os.name}")
1367+
if os.name == 'nt':
1368+
return Byte(_query_device_capacity_windows(device_fd))
13101369

13111370
s = os.stat(device_fd.name).st_mode
13121371
if not stat.S_ISBLK(s):

tests/test_file_size.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from . import TestCase
3333
import bitmath
3434
import os
35+
import pathlib
3536

3637

3738
class TestFileSize(TestCase):
@@ -113,8 +114,8 @@ def test_listdir_nosymlinks(self):
113114

114115
# Ensure the returned paths match the expected paths
115116
discovered_paths = [
116-
contents[0][0],
117-
contents[1][0],
117+
pathlib.Path(contents[0][0]).as_posix(),
118+
pathlib.Path(contents[1][0]).as_posix(),
118119
]
119120
expected_paths = [
120121
'tests/listdir_nosymlinks/depth1/depth2/10_byte_file',
@@ -183,8 +184,8 @@ def test_listdir_symlinks_follow(self):
183184
'tests/listdir_symlinks/depth1/depth2/10_byte_file'
184185
]
185186
discovered_paths = [
186-
contents[0][0],
187-
contents[1][0]
187+
pathlib.Path(contents[0][0]).as_posix(),
188+
pathlib.Path(contents[1][0]).as_posix()
188189
]
189190
self.assertListEqualUnordered(discovered_paths, expected_paths)
190191

@@ -243,7 +244,7 @@ def test_listdir_filtering_nosymlinks(self):
243244
filter='1024*'))
244245

245246
# Ensure the returned path matches the expected path
246-
self.assertEqual(contents[0][0],
247+
self.assertEqual(pathlib.Path(contents[0][0]).as_posix(),
247248
'tests/listdir_nosymlinks/depth1/depth2/1024_byte_file')
248249

249250
# Ensure the measured size is what we expect

tests/test_query_device_capacity.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131

3232
from . import TestCase
3333
import bitmath
34-
from unittest import mock
34+
import os
35+
from unittest import mock, skipUnless
3536
import struct
3637
from contextlib import ExitStack, contextmanager
3738

@@ -50,8 +51,16 @@ def nested(*contexts):
5051
non_device_file = mock.MagicMock('file')
5152
non_device_file.name = "/home/"
5253

54+
windows_device = mock.MagicMock('file')
55+
windows_device.fileno = mock.Mock(return_value=4)
56+
windows_device.name = r'\\.\PhysicalDrive0'
57+
58+
windows_non_device = mock.MagicMock('file')
59+
windows_non_device.name = r'C:\somefile.txt'
60+
5361

5462
class TestQueryDeviceCapacity(TestCase):
63+
@skipUnless(os.name == 'posix', 'fcntl is POSIX only')
5564
def test_query_device_capacity_linux_everything_is_wonderful(self):
5665
"""query device capacity works on a happy Linux host"""
5766
with nested(
@@ -71,6 +80,7 @@ def test_query_device_capacity_linux_everything_is_wonderful(self):
7180
self.assertEqual(ioctl.call_count, 1)
7281
ioctl.assert_called_once_with(4, 0x80081272, struct.calcsize('L'))
7382

83+
@skipUnless(os.name == 'posix', 'fcntl is POSIX only')
7484
def test_query_device_capacity_mac_everything_is_wonderful(self):
7585
"""query device capacity works on a happy Mac OS X host"""
7686
with nested(
@@ -100,6 +110,7 @@ def side_effect(*args, **kwargs):
100110
self.assertEqual(bytes, 1000000000000)
101111
self.assertEqual(ioctl.call_count, 2)
102112

113+
@skipUnless(os.name == 'posix', 'fcntl is POSIX only')
103114
def test_query_device_capacity_device_not_block(self):
104115
"""query device capacity aborts if a non-block-device is provided"""
105116
with nested(
@@ -116,8 +127,29 @@ def test_query_device_capacity_device_not_block(self):
116127

117128
self.assertEqual(ioctl.call_count, 0)
118129

119-
def test_query_device_capacity_non_posix_system_fails(self):
120-
"""query device capacity fails on a non-posix host"""
130+
def test_query_device_capacity_windows_everything_is_wonderful(self):
131+
"""query device capacity works on a happy Windows host"""
132+
expected_bytes = 1_000_000_000_000 # 1 TB
133+
134+
with mock.patch('bitmath._query_device_capacity_windows', return_value=expected_bytes):
135+
with mock.patch('bitmath.os.name', 'nt'):
136+
result = bitmath.query_device_capacity(windows_device)
137+
138+
self.assertEqual(result, bitmath.Byte(expected_bytes))
139+
140+
def test_query_device_capacity_windows_non_device_fails(self):
141+
"""query device capacity rejects a non-device path on Windows"""
121142
with mock.patch('bitmath.os.name', 'nt'):
143+
with self.assertRaises(ValueError):
144+
bitmath.query_device_capacity(windows_non_device)
145+
146+
def test_query_device_capacity_unsupported_platform_fails(self):
147+
"""query device capacity fails on an unsupported platform"""
148+
# Derive a value that is guaranteed not to be in SUPPORTED_PLATFORMS.
149+
unsupported = next(
150+
p for p in ('os2', 'java', 'riscos', 'ce')
151+
if p not in bitmath.SUPPORTED_PLATFORMS
152+
)
153+
with mock.patch('bitmath.os.name', unsupported):
122154
with self.assertRaises(NotImplementedError):
123155
bitmath.query_device_capacity(device)

0 commit comments

Comments
 (0)