Skip to content

Commit 935aae8

Browse files
bushidocodesclaude
andcommitted
perf: make priority_queue delete O(log n) via in-element index tracking (#263)
The min-heap priority_queue located elements for deletion with an O(n) linear scan. This makes delete O(log n) by having each element store its own 1-based heap index, which the queue keeps in sync on every move (append, swaps during percolate up/down, dequeue, delete). delete then reads the element's slot directly and repairs the heap in O(log n). Mirrors the approach in the composite codebase referenced in the issue. - priority_queue.h: * Add an optional `get_index_fn` accessor (returns a pointer to the element's in-struct size_t index slot). NULL preserves the legacy O(n) linear-scan delete, so the change is opt-in per queue and behavior-preserving where not wired up. * Maintain the index slot on every element move via small record/clear/swap helpers; generalize percolate_up into percolate_up_from(start_index). * Rewrite delete_nolock: O(1) lookup via the slot (with a defensive items[i]==value validation that still returns -1 for absent elements), then repair the heap. Also fixes a latent correctness bug: arbitrary-position delete must percolate the replacement UP or DOWN, not down-only — the old code could leave the heap invalid when the moved last element was smaller than the hole's parent. - Add a `size_t pq_idx` slot + accessor to the four element types that are delete()'d (sandbox, tenant_global_request_queue, perworker_tenant_sandbox_queue, tenant_timeout) and wire the accessor into the 7 queues that call delete. Queues that are only enqueued/dequeued (global_request_scheduler_minheap, tgrq->sandbox_requests) keep the NULL/legacy path. Each tracked element is a member of at most one tracked queue at a time, so a single slot is sufficient. Verified with a deterministic unit test against the real header (heap-invariant checks, the percolate-up-on-delete case, absent-element delete, and 200k random ops proving the tracked O(log n) path is behaviorally identical to the legacy O(n) path), clean under ASan+UBSan. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 1e7bd47 commit 935aae8

10 files changed

Lines changed: 167 additions & 34 deletions

runtime/include/priority_queue.h

Lines changed: 120 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,21 @@
1919
*/
2020
typedef uint64_t (*priority_queue_get_priority_fn_t)(void *element);
2121

22+
/**
23+
* How to get/set an element's current 1-based index within the heap's backing array.
24+
* Returns a pointer to a size_t slot stored inside the element itself. When provided, the
25+
* queue keeps this slot in sync on every move, allowing priority_queue_delete to locate the
26+
* element in O(1) and repair the heap in O(log n) instead of scanning in O(n). Pass NULL to
27+
* priority_queue_initialize to keep the legacy linear-scan delete.
28+
* @param element
29+
* @returns pointer to the element's index slot
30+
*/
31+
typedef size_t *(*priority_queue_index_ptr_fn_t)(void *element);
32+
2233
/* We assume that priority is expressed in terms of a 64 bit unsigned integral */
2334
struct priority_queue {
2435
priority_queue_get_priority_fn_t get_priority_fn;
36+
priority_queue_index_ptr_fn_t get_index_fn; /* NULL => O(n) delete; non-NULL => O(log n) delete */
2537
bool use_lock;
2638
lock_t lock;
2739
uint64_t highest_priority;
@@ -49,6 +61,46 @@ priority_queue_update_highest_priority(struct priority_queue *priority_queue, co
4961
priority_queue->highest_priority = priority;
5062
}
5163

64+
/**
65+
* Records an element's current slot inside the element itself, when index tracking is enabled.
66+
* No-op (single branch) for queues initialized without an index accessor.
67+
* @param priority_queue the priority queue
68+
* @param index the 1-based slot whose occupant should remember its position
69+
*/
70+
static inline void
71+
priority_queue_record_index(struct priority_queue *priority_queue, size_t index)
72+
{
73+
if (priority_queue->get_index_fn != NULL)
74+
*priority_queue->get_index_fn(priority_queue->items[index]) = index;
75+
}
76+
77+
/**
78+
* Marks an element as no longer enqueued (slot 0), when index tracking is enabled
79+
* @param priority_queue the priority queue
80+
* @param element the departing element
81+
*/
82+
static inline void
83+
priority_queue_clear_index(struct priority_queue *priority_queue, void *element)
84+
{
85+
if (priority_queue->get_index_fn != NULL) *priority_queue->get_index_fn(element) = 0;
86+
}
87+
88+
/**
89+
* Swaps two heap slots, keeping any tracked indices in sync
90+
* @param priority_queue the priority queue
91+
* @param a 1-based slot
92+
* @param b 1-based slot
93+
*/
94+
static inline void
95+
priority_queue_swap(struct priority_queue *priority_queue, size_t a, size_t b)
96+
{
97+
void *temp = priority_queue->items[a];
98+
priority_queue->items[a] = priority_queue->items[b];
99+
priority_queue->items[b] = temp;
100+
priority_queue_record_index(priority_queue, a);
101+
priority_queue_record_index(priority_queue, b);
102+
}
103+
52104
/**
53105
* Adds a value to the end of the binary heap
54106
* @param priority_queue the priority queue
@@ -67,6 +119,7 @@ priority_queue_append(struct priority_queue *priority_queue, void *new_item)
67119
if (unlikely(priority_queue->size > priority_queue->capacity)) panic("PQ overflow");
68120
if (unlikely(priority_queue->size == priority_queue->capacity)) goto err_enospc;
69121
priority_queue->items[++priority_queue->size] = new_item;
122+
priority_queue_record_index(priority_queue, priority_queue->size);
70123

71124
rc = 0;
72125
done:
@@ -90,6 +143,32 @@ priority_queue_is_empty(struct priority_queue *priority_queue)
90143
return priority_queue->size == 0;
91144
}
92145

146+
/**
147+
* Shifts the value at start_index upwards to restore the heap structure property, keeping any
148+
* tracked indices and the memoized highest priority in sync
149+
* @param priority_queue the priority queue
150+
* @param start_index the 1-based slot to percolate up from
151+
*/
152+
static inline void
153+
priority_queue_percolate_up_from(struct priority_queue *priority_queue, int start_index)
154+
{
155+
assert(priority_queue != NULL);
156+
assert(priority_queue->get_priority_fn != NULL);
157+
assert(!priority_queue->use_lock || lock_is_locked(&priority_queue->lock));
158+
159+
for (int i = start_index; i / 2 != 0
160+
&& priority_queue->get_priority_fn(priority_queue->items[i])
161+
< priority_queue->get_priority_fn(priority_queue->items[i / 2]);
162+
i /= 2) {
163+
assert(priority_queue->get_priority_fn(priority_queue->items[i]) != ULONG_MAX);
164+
priority_queue_swap(priority_queue, i / 2, i);
165+
/* If percolated to highest priority, update highest priority */
166+
if (i / 2 == 1)
167+
priority_queue_update_highest_priority(priority_queue, priority_queue->get_priority_fn(
168+
priority_queue->items[1]));
169+
}
170+
}
171+
93172
/**
94173
* Shifts an appended value upwards to restore heap structure property
95174
* @param priority_queue the priority queue
@@ -108,19 +187,7 @@ priority_queue_percolate_up(struct priority_queue *priority_queue)
108187
return;
109188
}
110189

111-
for (int i = priority_queue->size; i / 2 != 0
112-
&& priority_queue->get_priority_fn(priority_queue->items[i])
113-
< priority_queue->get_priority_fn(priority_queue->items[i / 2]);
114-
i /= 2) {
115-
assert(priority_queue->get_priority_fn(priority_queue->items[i]) != ULONG_MAX);
116-
void *temp = priority_queue->items[i / 2];
117-
priority_queue->items[i / 2] = priority_queue->items[i];
118-
priority_queue->items[i] = temp;
119-
/* If percolated to highest priority, update highest priority */
120-
if (i / 2 == 1)
121-
priority_queue_update_highest_priority(priority_queue, priority_queue->get_priority_fn(
122-
priority_queue->items[1]));
123-
}
190+
priority_queue_percolate_up_from(priority_queue, priority_queue->size);
124191
}
125192

126193
/**
@@ -180,9 +247,7 @@ priority_queue_percolate_down(struct priority_queue *priority_queue, int parent_
180247
<= priority_queue->get_priority_fn(priority_queue->items[smallest_child_index]))
181248
break;
182249
/* Otherwise, swap and continue down the tree */
183-
void *temp = priority_queue->items[smallest_child_index];
184-
priority_queue->items[smallest_child_index] = priority_queue->items[parent_index];
185-
priority_queue->items[parent_index] = temp;
250+
priority_queue_swap(priority_queue, smallest_child_index, parent_index);
186251

187252
parent_index = smallest_child_index;
188253
left_child_index = 2 * parent_index;
@@ -225,9 +290,11 @@ priority_queue_dequeue_if_earlier_nolock(struct priority_queue *priority_queue,
225290
if (priority_queue_is_empty(priority_queue) || priority_queue->highest_priority >= target_deadline)
226291
goto err_enoent;
227292

228-
*dequeued_element = priority_queue->items[1];
293+
*dequeued_element = priority_queue->items[1];
294+
priority_queue_clear_index(priority_queue, *dequeued_element);
229295
priority_queue->items[1] = priority_queue->items[priority_queue->size];
230296
priority_queue->items[priority_queue->size--] = NULL;
297+
if (priority_queue->size >= 1) priority_queue_record_index(priority_queue, 1);
231298

232299
priority_queue_percolate_down(priority_queue, 1);
233300
return_code = 0;
@@ -264,10 +331,13 @@ priority_queue_dequeue_if_earlier(struct priority_queue *priority_queue, void **
264331
* @param capacity the number of elements to store in the data structure
265332
* @param use_lock indicates that we want a concurrent data structure
266333
* @param get_priority_fn pointer to a function that returns the priority of an element
334+
* @param get_index_fn pointer to a function exposing an element's in-struct index slot, enabling
335+
* O(log n) priority_queue_delete; pass NULL to keep the legacy O(n) linear-scan delete
267336
* @return priority queue
268337
*/
269338
static inline struct priority_queue *
270-
priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn)
339+
priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn,
340+
priority_queue_index_ptr_fn_t get_index_fn)
271341
{
272342
assert(get_priority_fn != NULL);
273343

@@ -280,6 +350,7 @@ priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_pri
280350
priority_queue->size = 0;
281351
priority_queue->capacity = capacity;
282352
priority_queue->get_priority_fn = get_priority_fn;
353+
priority_queue->get_index_fn = get_index_fn;
283354
priority_queue->use_lock = use_lock;
284355

285356
if (use_lock) lock_init(&priority_queue->lock);
@@ -408,16 +479,41 @@ priority_queue_delete_nolock(struct priority_queue *priority_queue, void *value)
408479
assert(value != NULL);
409480
assert(!priority_queue->use_lock || lock_is_locked(&priority_queue->lock));
410481

411-
for (int i = 1; i <= priority_queue->size; i++) {
412-
if (priority_queue->items[i] == value) {
413-
priority_queue->items[i] = priority_queue->items[priority_queue->size];
414-
priority_queue->items[priority_queue->size--] = NULL;
482+
int i;
483+
if (priority_queue->get_index_fn != NULL) {
484+
/* O(log n): the element remembers its own slot. Validate it still points back to value
485+
* (guards against a value that was never enqueued or already removed). */
486+
i = (int)*priority_queue->get_index_fn(value);
487+
if (i < 1 || (size_t)i > priority_queue->size || priority_queue->items[i] != value) return -1;
488+
} else {
489+
/* O(n) fallback: linear scan for the element */
490+
for (i = 1; (size_t)i <= priority_queue->size && priority_queue->items[i] != value; i++) {}
491+
if ((size_t)i > priority_queue->size) return -1;
492+
}
493+
494+
priority_queue_clear_index(priority_queue, value);
495+
496+
/* Fill the hole with the last element and shrink */
497+
priority_queue->items[i] = priority_queue->items[priority_queue->size];
498+
priority_queue->items[priority_queue->size--] = NULL;
499+
500+
if ((size_t)i <= priority_queue->size) {
501+
priority_queue_record_index(priority_queue, i);
502+
/* Restore the heap property from the hole. Unlike a pop, the replacement can be smaller
503+
* than the hole's parent, so it may need to rise rather than sink. */
504+
if (i > 1
505+
&& priority_queue->get_priority_fn(priority_queue->items[i])
506+
< priority_queue->get_priority_fn(priority_queue->items[i / 2])) {
507+
priority_queue_percolate_up_from(priority_queue, i);
508+
} else {
415509
priority_queue_percolate_down(priority_queue, i);
416-
return 0;
417510
}
511+
} else if (priority_queue->size == 0) {
512+
/* Removed the final element */
513+
priority_queue_update_highest_priority(priority_queue, ULONG_MAX);
418514
}
419515

420-
return -1;
516+
return 0;
421517
}
422518

423519
/**

runtime/include/sandbox_functions.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ sandbox_get_priority(void *element)
5151
return sandbox->absolute_deadline;
5252
}
5353

54+
static inline size_t *
55+
sandbox_get_index_ptr(void *element)
56+
{
57+
return &((struct sandbox *)element)->pq_idx;
58+
}
59+
5460
static inline void
5561
sandbox_process_scheduler_updates(struct sandbox *sandbox)
5662
{

runtime/include/sandbox_types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ struct sandbox {
6464

6565
uint64_t remaining_exec;
6666
uint64_t absolute_deadline;
67+
size_t pq_idx; /* 1-based slot in its scheduling priority_queue (0 = not enqueued) */
6768
uint64_t admissions_estimate; /* estimated execution time (cycles) * runtime_admissions_granularity / relative
6869
deadline (cycles) */
6970
uint64_t total_time; /* Total time from Request to Response */

runtime/include/tenant.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,23 @@ struct tenant_timeout {
1616
uint64_t timeout;
1717
struct tenant *tenant;
1818
struct perworker_tenant_sandbox_queue *pwt;
19+
size_t pq_idx; /* 1-based slot in its timeout priority_queue (0 = not enqueued) */
1920
};
2021

2122
struct perworker_tenant_sandbox_queue {
2223
struct priority_queue *sandboxes;
2324
struct tenant *tenant; // to be able to find the RB/MB/RP/RT.
2425
struct tenant_timeout tenant_timeout;
2526
enum MULTI_TENANCY_CLASS mt_class; // check whether the corresponding PWM has been demoted
27+
size_t pq_idx; /* 1-based slot in the local runqueue priority_queue (0 = not enqueued) */
2628
} __attribute__((aligned(CACHE_PAD)));
2729

2830
struct tenant_global_request_queue {
2931
struct priority_queue *sandbox_requests;
3032
struct tenant *tenant;
3133
struct tenant_timeout tenant_timeout;
3234
_Atomic volatile enum MULTI_TENANCY_CLASS mt_class;
35+
size_t pq_idx; /* 1-based slot in the global scheduler PQ (0 = not enqueued) */
3336
};
3437

3538
struct tenant {

runtime/include/tenant_functions.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ tenant_policy_specific_init(struct tenant *tenant, struct tenant_config *config)
4949

5050
for (int i = 0; i < runtime_worker_threads_count; i++) {
5151
tenant->pwt_sandboxes[i].sandboxes = priority_queue_initialize(RUNTIME_TENANT_QUEUE_SIZE, false,
52-
sandbox_get_priority);
52+
sandbox_get_priority,
53+
sandbox_get_index_ptr);
5354
tenant->pwt_sandboxes[i].tenant = tenant;
5455
tenant->pwt_sandboxes[i].mt_class = (tenant->replenishment_period == 0) ? MT_DEFAULT
5556
: MT_GUARANTEED;
@@ -59,8 +60,9 @@ tenant_policy_specific_init(struct tenant *tenant, struct tenant_config *config)
5960

6061
/* Initialize the tenant's global request queue */
6162
tenant->tgrq_requests = malloc(sizeof(struct tenant_global_request_queue));
63+
/* sandbox_requests is never delete()'d from directly, so it keeps the legacy O(n) delete (NULL) */
6264
tenant->tgrq_requests->sandbox_requests = priority_queue_initialize(RUNTIME_TENANT_QUEUE_SIZE, false,
63-
sandbox_get_priority);
65+
sandbox_get_priority, NULL);
6466
tenant->tgrq_requests->tenant = tenant;
6567
tenant->tgrq_requests->mt_class = (tenant->replenishment_period == 0) ? MT_DEFAULT : MT_GUARANTEED;
6668
tenant->tgrq_requests->tenant_timeout.tenant = tenant;
@@ -173,6 +175,12 @@ tenant_timeout_get_priority(void *element)
173175
return ((struct tenant_timeout *)element)->timeout;
174176
}
175177

178+
static inline size_t *
179+
tenant_timeout_get_index_ptr(void *element)
180+
{
181+
return &((struct tenant_timeout *)element)->pq_idx;
182+
}
183+
176184
/**
177185
* Compute the next timeout given a tenant's replenishment period
178186
* @param m_replenishment_period

runtime/src/global_request_scheduler_minheap.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ sandbox_get_priority_fn(void *element)
7777
void
7878
global_request_scheduler_minheap_initialize()
7979
{
80-
global_request_scheduler_minheap = priority_queue_initialize(4096, true, sandbox_get_priority_fn);
80+
/* This queue is only enqueued/dequeued (never delete()'d from), so it keeps the legacy delete (NULL) */
81+
global_request_scheduler_minheap = priority_queue_initialize(4096, true, sandbox_get_priority_fn, NULL);
8182

8283
struct global_request_scheduler_config config = {.add_fn = global_request_scheduler_minheap_add,
8384
.remove_fn = global_request_scheduler_minheap_remove,

runtime/src/global_request_scheduler_mtds.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ tenant_request_queue_get_priority(void *element)
2222
return (sandbox) ? sandbox->absolute_deadline : UINT64_MAX;
2323
}
2424

25+
static inline size_t *
26+
tenant_request_queue_get_index_ptr(void *element)
27+
{
28+
return &((struct tenant_global_request_queue *)element)->pq_idx;
29+
}
30+
2531
/**
2632
* Demotes the given tenant's request queue, which means deletes the TGRQ from the Guaranteed queue
2733
* and adds to the Default queue.
@@ -221,12 +227,14 @@ void
221227
global_request_scheduler_mtds_initialize()
222228
{
223229
global_request_scheduler_mtds_guaranteed = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
224-
tenant_request_queue_get_priority);
230+
tenant_request_queue_get_priority,
231+
tenant_request_queue_get_index_ptr);
225232
global_request_scheduler_mtds_default = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
226-
tenant_request_queue_get_priority);
233+
tenant_request_queue_get_priority,
234+
tenant_request_queue_get_index_ptr);
227235

228236
global_tenant_timeout_queue = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
229-
tenant_timeout_get_priority);
237+
tenant_timeout_get_priority, tenant_timeout_get_index_ptr);
230238

231239
lock_init(&global_lock);
232240

runtime/src/local_runqueue_minheap.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ void
8181
local_runqueue_minheap_initialize()
8282
{
8383
/* Initialize local state */
84-
local_runqueue_minheap = priority_queue_initialize(RUNTIME_RUNQUEUE_SIZE, false, sandbox_get_priority);
84+
local_runqueue_minheap = priority_queue_initialize(RUNTIME_RUNQUEUE_SIZE, false, sandbox_get_priority,
85+
sandbox_get_index_ptr);
8586

8687
/* Register Function Pointers for Abstract Scheduling API */
8788
struct local_runqueue_config config = {.add_fn = local_runqueue_minheap_add,

runtime/src/local_runqueue_mtds.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@ perworker_tenant_get_priority(void *element)
3232
return (sandbox) ? sandbox->absolute_deadline : UINT64_MAX;
3333
}
3434

35+
static inline size_t *
36+
perworker_tenant_get_index_ptr(void *element)
37+
{
38+
return &((struct perworker_tenant_sandbox_queue *)element)->pq_idx;
39+
}
40+
3541
/**
3642
* Checks if the run queue is empty
3743
* @returns true if empty. false otherwise
@@ -167,9 +173,11 @@ local_runqueue_mtds_initialize()
167173
{
168174
/* Initialize local state */
169175
local_runqueue_mtds_guaranteed = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
170-
perworker_tenant_get_priority);
176+
perworker_tenant_get_priority,
177+
perworker_tenant_get_index_ptr);
171178
local_runqueue_mtds_default = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
172-
perworker_tenant_get_priority);
179+
perworker_tenant_get_priority,
180+
perworker_tenant_get_index_ptr);
173181

174182
/* Register Function Pointers for Abstract Scheduling API */
175183
struct local_runqueue_config config = {.add_fn = local_runqueue_mtds_add,

runtime/src/worker_thread.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ worker_thread_main(void *argument)
5959

6060
if (scheduler == SCHEDULER_MTDS) {
6161
worker_thread_timeout_queue = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
62-
tenant_timeout_get_priority);
62+
tenant_timeout_get_priority,
63+
tenant_timeout_get_index_ptr);
6364
}
6465

6566
software_interrupt_unmask_signal(SIGFPE);

0 commit comments

Comments
 (0)