-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathheight_interval.py
More file actions
269 lines (213 loc) · 9.11 KB
/
height_interval.py
File metadata and controls
269 lines (213 loc) · 9.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
"""
Right-open block-height intervals [earliest, latest) and a sorted,
non-overlapping IntervalQueue with gap-aware claiming.
Ported 1:1 from github.com/alexesDev/cosmos-extractor3/internal/model/interval.go
"""
from __future__ import annotations
import bisect
from dataclasses import dataclass
from typing import List, Optional
@dataclass
class Interval:
earliest: int
latest: int # right-open: value v in interval iff earliest <= v < latest
def __repr__(self) -> str:
return f"[{self.earliest}, {self.latest}) S:{self.size}"
def __eq__(self, other: object) -> bool:
return isinstance(other, Interval) and self.earliest == other.earliest and self.latest == other.latest
def __hash__(self) -> int:
return hash((self.earliest, self.latest))
@property
def size(self) -> int:
return self.latest - self.earliest
@property
def valid(self) -> bool:
return self.size > 0
def chunks(self, size: int) -> List[Interval]:
result = []
i = self.earliest
while i < self.latest:
result.append(Interval(i, min(i + size, self.latest)))
i += size
return result
def clamp(self, bound: Interval) -> None:
self.earliest = max(self.earliest, bound.earliest)
self.latest = min(self.latest, bound.latest)
def left_limit(self, size: int) -> int:
"""Shrink interval from the left so it is at most `size` values. Returns actual size used."""
curr = self.size
if curr > size:
self.earliest = self.latest - size
return size
return curr
def contains(self, other: Interval) -> bool:
return self.earliest <= other.earliest and self.latest >= other.latest
def overlaped(self, other: Interval) -> bool:
return self.earliest < other.latest and other.earliest < self.latest
def to_dict(self) -> dict:
return {'earliest': self.earliest, 'latest': self.latest}
def to_list(self) -> list:
return [self.earliest, self.latest]
@classmethod
def from_dict(cls, d: dict) -> Interval:
return cls(d['earliest'], d['latest'])
@classmethod
def from_list(cls, data: list) -> Interval:
return cls(data[0], data[1])
class IntervalQueue:
"""
Sorted, non-overlapping list of Interval objects.
Invariant: items sorted by .earliest; no two items overlap or touch.
push() merges touching intervals automatically.
"""
def __init__(self, items: Optional[List[Interval]] = None):
self._items: List[Interval] = list(items) if items else []
def __repr__(self) -> str:
parts = [repr(it) for it in self._items]
return f"IQ[{', '.join(parts)}] total={self.total_size}"
def __eq__(self, other: object) -> bool:
if isinstance(other, IntervalQueue):
return self._items == other._items
if isinstance(other, list):
return self._items == other
return False
def __len__(self) -> int:
return len(self._items)
def __getitem__(self, idx: int) -> Interval:
return self._items[idx]
def copy(self) -> IntervalQueue:
return IntervalQueue([Interval(it.earliest, it.latest) for it in self._items])
def sort_by_latest(self) -> None:
self._items.sort(key=lambda it: it.latest)
@property
def total_size(self) -> int:
return sum(it.size for it in self._items)
def push(self, it: Interval) -> bool:
"""
Insert interval. Returns False for invalid or overlapping intervals.
Merges touching intervals: [1,2) + [2,3) → [1,3).
"""
if not it.valid:
return False
items = self._items
idx = bisect.bisect_right(items, it.earliest, key=lambda x: x.earliest)
if idx < len(items) and items[idx].overlaped(it):
return False
if idx > 0 and items[idx - 1].overlaped(it):
return False
touch_left = idx > 0 and items[idx - 1].latest == it.earliest
touch_right = idx < len(items) and items[idx].earliest == it.latest
if touch_left and touch_right:
items[idx - 1].latest = items[idx].latest
del items[idx]
return True
if touch_left:
items[idx - 1].latest = it.latest
return True
if touch_right:
items[idx].earliest = it.earliest
return True
items.insert(idx, Interval(it.earliest, it.latest))
return True
def pop_gaps(self, bound: Interval, max_size: int, max_count: int = 0) -> List[Interval]:
"""
Return unprocessed ranges within bound, newest-first, and claim each in
the same call so the same range is never returned twice.
Returns at most max_size values in total across all gaps, and — when
max_count > 0 — at most max_count gaps. Returns [] when bound is invalid,
max_size <= 0, or bound is fully covered.
"""
if max_size <= 0 or not bound.valid:
return []
gaps = self._find_gaps(bound, max_size, max_count)
for gap in gaps:
self.push(gap)
return gaps
def _find_gaps(self, bound: Interval, max_size: int, max_count: int) -> List[Interval]:
"""Walk the complement of the queue within bound, newest-first, read-only."""
gaps: List[Interval] = []
total = 0
ceil = bound.latest
def collect(gap: Interval) -> bool:
nonlocal total
gap.clamp(bound)
if not gap.valid:
return False
total += gap.left_limit(max_size - total)
gaps.append(Interval(gap.earliest, gap.latest))
return total >= max_size or (max_count > 0 and len(gaps) >= max_count)
for it in reversed(self._items):
if collect(Interval(it.latest, ceil)):
return gaps
if it.earliest < ceil:
ceil = it.earliest
collect(Interval(bound.earliest, ceil))
return gaps
def cut(self, it: Interval) -> bool:
"""Remove interval `it` from the queue, splitting items as needed."""
cut = False
new_items: List[Interval] = []
for current in self._items:
if current.overlaped(it):
cut = True
if current.earliest < it.earliest and current.latest > it.latest:
new_items.append(Interval(current.earliest, it.earliest))
new_items.append(Interval(it.latest, current.latest))
elif current.earliest < it.earliest:
new_items.append(Interval(current.earliest, it.earliest))
elif current.latest > it.latest:
new_items.append(Interval(it.latest, current.latest))
# else: fully covered, drop
else:
new_items.append(current)
self._items = new_items
return cut
def all_heights(self) -> List[int]:
result: List[int] = []
for it in self._items:
result.extend(range(it.earliest, it.latest))
return result
def contains_height(self, h: int) -> bool:
"""Binary search: is h covered by any interval?"""
idx = bisect.bisect_right(self._items, h, key=lambda x: x.earliest) - 1
return idx >= 0 and self._items[idx].latest > h
def fully_covered(self, bound: Interval) -> bool:
return self.remaining_size(bound) == 0
def peek_gaps(self, bound: Interval, max_size: int, max_count: int = 0) -> List[Interval]:
"""Like pop_gaps but read-only — nothing is claimed."""
if max_size <= 0 or not bound.valid:
return []
return self._find_gaps(bound, max_size, max_count)
def merge(self, other: 'IntervalQueue') -> None:
for it in other._items:
self.push(it)
def count(self) -> int:
"""Number of distinct intervals (not total values)."""
return len(self._items)
def frontier(self) -> Interval:
"""[earliest.earliest, latest.latest) spanning the full queue. Returns invalid Interval(0,0) when empty."""
if not self._items:
return Interval(0, 0)
return Interval(self._items[0].earliest, self._items[-1].latest)
def gap_count(self, bound: Interval) -> int:
if not bound.valid:
return 0
return len(self._find_gaps(bound, bound.size, 0))
def remaining_size(self, available: Interval) -> int:
remaining = available.size
for it in self._items:
clamped = Interval(it.earliest, it.latest)
clamped.clamp(available)
remaining -= max(0, clamped.size)
return remaining
def to_list(self) -> list:
return [[it.earliest, it.latest] for it in self._items]
def to_json(self) -> list:
return [it.to_dict() for it in self._items]
@classmethod
def from_list(cls, data: list) -> IntervalQueue:
return cls([Interval(e, l) for e, l in data])
@classmethod
def from_json(cls, data: list) -> IntervalQueue:
"""Load from wire format: [{"earliest": N, "latest": N}, ...]"""
return cls([Interval.from_dict(d) for d in (data or [])])