|
2 | 2 | import typing as t |
3 | 3 |
|
4 | 4 |
|
| 5 | +def _make_pairs(order: t.Sequence[int], size: int): |
| 6 | + if not all(i < size for i in order) or not all(i >= 0 for i in order): |
| 7 | + raise ValueError( |
| 8 | + f"some indices in the reordering are out-of-bounds" |
| 9 | + ) |
| 10 | + |
| 11 | + order_set = frozenset(order) |
| 12 | + |
| 13 | + if len(order_set) != len(order): |
| 14 | + raise ValueError( |
| 15 | + f"duplicate indices in reordering" |
| 16 | + ) |
| 17 | + |
| 18 | + return zip( |
| 19 | + range(size), |
| 20 | + (*order, *(i for i in range(size) if i not in order_set)) |
| 21 | + ) |
| 22 | + |
| 23 | + |
| 24 | +def reorder_bits(data: t.Sequence[bool], order: t.Sequence[int]) -> t.Tuple[bool, ...]: |
| 25 | + if not order: |
| 26 | + return tuple(data) |
| 27 | + |
| 28 | + pairs = _make_pairs(order, len(data)) |
| 29 | + |
| 30 | + return tuple(data[i] for _, i in pairs) |
| 31 | + |
| 32 | + |
| 33 | +def unreorder_bits(data: t.Sequence[bool], order: t.Sequence[int]) -> t.Tuple[bool, ...]: |
| 34 | + if not order: |
| 35 | + return tuple(data) |
| 36 | + |
| 37 | + pairs = sorted(_make_pairs(order, len(data)), key=lambda x: x[1]) |
| 38 | + |
| 39 | + return tuple(data[i] for i, _ in pairs) |
| 40 | + |
| 41 | + |
5 | 42 | def bytes_to_bits(data: t.ByteString) -> t.Tuple[bool, ...]: |
6 | 43 | return tuple( |
7 | 44 | bit for byte in data for bit in uint_to_bits(byte, 8) |
@@ -63,6 +100,9 @@ def as_bits(self) -> t.Tuple[bool, ...]: |
63 | 100 | def as_bytes(self) -> bytes: |
64 | 101 | return bits_to_bytes(self._bits) |
65 | 102 |
|
| 103 | + def unreorder(self, order: t.Sequence[int]) -> BitstreamWriter: |
| 104 | + return BitstreamWriter(unreorder_bits(self._bits, order)) |
| 105 | + |
66 | 106 |
|
67 | 107 | class BitstreamReader: |
68 | 108 | _bits: t.Tuple[bool, ...] |
@@ -118,6 +158,9 @@ def as_bits(self) -> t.Tuple[bool, ...]: |
118 | 158 | def as_bytes(self) -> bytes: |
119 | 159 | return self.take_bytes(self.bytes_remaining())[0] |
120 | 160 |
|
| 161 | + def reorder(self, order: t.Sequence[int]) -> BitstreamReader: |
| 162 | + return BitstreamReader(reorder_bits(self._bits, order)) |
| 163 | + |
121 | 164 |
|
122 | 165 | class AttrProxy(t.Mapping[str, t.Any]): |
123 | 166 | _data: t.Dict[str, t.Any] |
|
0 commit comments