diff --git a/runtime/include/priority_queue.h b/runtime/include/priority_queue.h index 7b1650cc..600426dd 100644 --- a/runtime/include/priority_queue.h +++ b/runtime/include/priority_queue.h @@ -19,9 +19,21 @@ */ typedef uint64_t (*priority_queue_get_priority_fn_t)(void *element); +/** + * How to get/set an element's current 1-based index within the heap's backing array. + * Returns a pointer to a size_t slot stored inside the element itself. When provided, the + * queue keeps this slot in sync on every move, allowing priority_queue_delete to locate the + * element in O(1) and repair the heap in O(log n) instead of scanning in O(n). Pass NULL to + * priority_queue_initialize to keep the legacy linear-scan delete. + * @param element + * @returns pointer to the element's index slot + */ +typedef size_t *(*priority_queue_index_ptr_fn_t)(void *element); + /* We assume that priority is expressed in terms of a 64 bit unsigned integral */ struct priority_queue { priority_queue_get_priority_fn_t get_priority_fn; + priority_queue_index_ptr_fn_t get_index_fn; /* NULL => O(n) delete; non-NULL => O(log n) delete */ bool use_lock; lock_t lock; uint64_t highest_priority; @@ -49,6 +61,46 @@ priority_queue_update_highest_priority(struct priority_queue *priority_queue, co priority_queue->highest_priority = priority; } +/** + * Records an element's current slot inside the element itself, when index tracking is enabled. + * No-op (single branch) for queues initialized without an index accessor. + * @param priority_queue the priority queue + * @param index the 1-based slot whose occupant should remember its position + */ +static inline void +priority_queue_record_index(struct priority_queue *priority_queue, size_t index) +{ + if (priority_queue->get_index_fn != NULL) + *priority_queue->get_index_fn(priority_queue->items[index]) = index; +} + +/** + * Marks an element as no longer enqueued (slot 0), when index tracking is enabled + * @param priority_queue the priority queue + * @param element the departing element + */ +static inline void +priority_queue_clear_index(struct priority_queue *priority_queue, void *element) +{ + if (priority_queue->get_index_fn != NULL) *priority_queue->get_index_fn(element) = 0; +} + +/** + * Swaps two heap slots, keeping any tracked indices in sync + * @param priority_queue the priority queue + * @param a 1-based slot + * @param b 1-based slot + */ +static inline void +priority_queue_swap(struct priority_queue *priority_queue, size_t a, size_t b) +{ + void *temp = priority_queue->items[a]; + priority_queue->items[a] = priority_queue->items[b]; + priority_queue->items[b] = temp; + priority_queue_record_index(priority_queue, a); + priority_queue_record_index(priority_queue, b); +} + /** * Adds a value to the end of the binary heap * @param priority_queue the priority queue @@ -67,6 +119,7 @@ priority_queue_append(struct priority_queue *priority_queue, void *new_item) if (unlikely(priority_queue->size > priority_queue->capacity)) panic("PQ overflow"); if (unlikely(priority_queue->size == priority_queue->capacity)) goto err_enospc; priority_queue->items[++priority_queue->size] = new_item; + priority_queue_record_index(priority_queue, priority_queue->size); rc = 0; done: @@ -90,6 +143,32 @@ priority_queue_is_empty(struct priority_queue *priority_queue) return priority_queue->size == 0; } +/** + * Shifts the value at start_index upwards to restore the heap structure property, keeping any + * tracked indices and the memoized highest priority in sync + * @param priority_queue the priority queue + * @param start_index the 1-based slot to percolate up from + */ +static inline void +priority_queue_percolate_up_from(struct priority_queue *priority_queue, int start_index) +{ + assert(priority_queue != NULL); + assert(priority_queue->get_priority_fn != NULL); + assert(!priority_queue->use_lock || lock_is_locked(&priority_queue->lock)); + + for (int i = start_index; i / 2 != 0 + && priority_queue->get_priority_fn(priority_queue->items[i]) + < priority_queue->get_priority_fn(priority_queue->items[i / 2]); + i /= 2) { + assert(priority_queue->get_priority_fn(priority_queue->items[i]) != ULONG_MAX); + priority_queue_swap(priority_queue, i / 2, i); + /* If percolated to highest priority, update highest priority */ + if (i / 2 == 1) + priority_queue_update_highest_priority(priority_queue, priority_queue->get_priority_fn( + priority_queue->items[1])); + } +} + /** * Shifts an appended value upwards to restore heap structure property * @param priority_queue the priority queue @@ -108,19 +187,7 @@ priority_queue_percolate_up(struct priority_queue *priority_queue) return; } - for (int i = priority_queue->size; i / 2 != 0 - && priority_queue->get_priority_fn(priority_queue->items[i]) - < priority_queue->get_priority_fn(priority_queue->items[i / 2]); - i /= 2) { - assert(priority_queue->get_priority_fn(priority_queue->items[i]) != ULONG_MAX); - void *temp = priority_queue->items[i / 2]; - priority_queue->items[i / 2] = priority_queue->items[i]; - priority_queue->items[i] = temp; - /* If percolated to highest priority, update highest priority */ - if (i / 2 == 1) - priority_queue_update_highest_priority(priority_queue, priority_queue->get_priority_fn( - priority_queue->items[1])); - } + priority_queue_percolate_up_from(priority_queue, priority_queue->size); } /** @@ -180,9 +247,7 @@ priority_queue_percolate_down(struct priority_queue *priority_queue, int parent_ <= priority_queue->get_priority_fn(priority_queue->items[smallest_child_index])) break; /* Otherwise, swap and continue down the tree */ - void *temp = priority_queue->items[smallest_child_index]; - priority_queue->items[smallest_child_index] = priority_queue->items[parent_index]; - priority_queue->items[parent_index] = temp; + priority_queue_swap(priority_queue, smallest_child_index, parent_index); parent_index = smallest_child_index; left_child_index = 2 * parent_index; @@ -225,9 +290,11 @@ priority_queue_dequeue_if_earlier_nolock(struct priority_queue *priority_queue, if (priority_queue_is_empty(priority_queue) || priority_queue->highest_priority >= target_deadline) goto err_enoent; - *dequeued_element = priority_queue->items[1]; + *dequeued_element = priority_queue->items[1]; + priority_queue_clear_index(priority_queue, *dequeued_element); priority_queue->items[1] = priority_queue->items[priority_queue->size]; priority_queue->items[priority_queue->size--] = NULL; + if (priority_queue->size >= 1) priority_queue_record_index(priority_queue, 1); priority_queue_percolate_down(priority_queue, 1); return_code = 0; @@ -264,10 +331,13 @@ priority_queue_dequeue_if_earlier(struct priority_queue *priority_queue, void ** * @param capacity the number of elements to store in the data structure * @param use_lock indicates that we want a concurrent data structure * @param get_priority_fn pointer to a function that returns the priority of an element + * @param get_index_fn pointer to a function exposing an element's in-struct index slot, enabling + * O(log n) priority_queue_delete; pass NULL to keep the legacy O(n) linear-scan delete * @return priority queue */ static inline struct priority_queue * -priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn) +priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn, + priority_queue_index_ptr_fn_t get_index_fn) { assert(get_priority_fn != NULL); @@ -280,6 +350,7 @@ priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_pri priority_queue->size = 0; priority_queue->capacity = capacity; priority_queue->get_priority_fn = get_priority_fn; + priority_queue->get_index_fn = get_index_fn; priority_queue->use_lock = use_lock; if (use_lock) lock_init(&priority_queue->lock); @@ -408,16 +479,41 @@ priority_queue_delete_nolock(struct priority_queue *priority_queue, void *value) assert(value != NULL); assert(!priority_queue->use_lock || lock_is_locked(&priority_queue->lock)); - for (int i = 1; i <= priority_queue->size; i++) { - if (priority_queue->items[i] == value) { - priority_queue->items[i] = priority_queue->items[priority_queue->size]; - priority_queue->items[priority_queue->size--] = NULL; + int i; + if (priority_queue->get_index_fn != NULL) { + /* O(log n): the element remembers its own slot. Validate it still points back to value + * (guards against a value that was never enqueued or already removed). */ + i = (int)*priority_queue->get_index_fn(value); + if (i < 1 || (size_t)i > priority_queue->size || priority_queue->items[i] != value) return -1; + } else { + /* O(n) fallback: linear scan for the element */ + for (i = 1; (size_t)i <= priority_queue->size && priority_queue->items[i] != value; i++) {} + if ((size_t)i > priority_queue->size) return -1; + } + + priority_queue_clear_index(priority_queue, value); + + /* Fill the hole with the last element and shrink */ + priority_queue->items[i] = priority_queue->items[priority_queue->size]; + priority_queue->items[priority_queue->size--] = NULL; + + if ((size_t)i <= priority_queue->size) { + priority_queue_record_index(priority_queue, i); + /* Restore the heap property from the hole. Unlike a pop, the replacement can be smaller + * than the hole's parent, so it may need to rise rather than sink. */ + if (i > 1 + && priority_queue->get_priority_fn(priority_queue->items[i]) + < priority_queue->get_priority_fn(priority_queue->items[i / 2])) { + priority_queue_percolate_up_from(priority_queue, i); + } else { priority_queue_percolate_down(priority_queue, i); - return 0; } + } else if (priority_queue->size == 0) { + /* Removed the final element */ + priority_queue_update_highest_priority(priority_queue, ULONG_MAX); } - return -1; + return 0; } /** diff --git a/runtime/include/sandbox_functions.h b/runtime/include/sandbox_functions.h index 0f28516d..9d306665 100644 --- a/runtime/include/sandbox_functions.h +++ b/runtime/include/sandbox_functions.h @@ -51,6 +51,12 @@ sandbox_get_priority(void *element) return sandbox->absolute_deadline; } +static inline size_t * +sandbox_get_index_ptr(void *element) +{ + return &((struct sandbox *)element)->pq_idx; +} + static inline void sandbox_process_scheduler_updates(struct sandbox *sandbox) { diff --git a/runtime/include/sandbox_types.h b/runtime/include/sandbox_types.h index 4c4dd70b..9f16c555 100644 --- a/runtime/include/sandbox_types.h +++ b/runtime/include/sandbox_types.h @@ -64,6 +64,7 @@ struct sandbox { uint64_t remaining_exec; uint64_t absolute_deadline; + size_t pq_idx; /* 1-based slot in its scheduling priority_queue (0 = not enqueued) */ uint64_t admissions_estimate; /* estimated execution time (cycles) * runtime_admissions_granularity / relative deadline (cycles) */ uint64_t total_time; /* Total time from Request to Response */ diff --git a/runtime/include/tenant.h b/runtime/include/tenant.h index 615bcead..ac8d0c1c 100644 --- a/runtime/include/tenant.h +++ b/runtime/include/tenant.h @@ -16,6 +16,7 @@ struct tenant_timeout { uint64_t timeout; struct tenant *tenant; struct perworker_tenant_sandbox_queue *pwt; + size_t pq_idx; /* 1-based slot in its timeout priority_queue (0 = not enqueued) */ }; struct perworker_tenant_sandbox_queue { @@ -23,6 +24,7 @@ struct perworker_tenant_sandbox_queue { struct tenant *tenant; // to be able to find the RB/MB/RP/RT. struct tenant_timeout tenant_timeout; enum MULTI_TENANCY_CLASS mt_class; // check whether the corresponding PWM has been demoted + size_t pq_idx; /* 1-based slot in the local runqueue priority_queue (0 = not enqueued) */ } __attribute__((aligned(CACHE_PAD))); struct tenant_global_request_queue { @@ -30,6 +32,7 @@ struct tenant_global_request_queue { struct tenant *tenant; struct tenant_timeout tenant_timeout; _Atomic volatile enum MULTI_TENANCY_CLASS mt_class; + size_t pq_idx; /* 1-based slot in the global scheduler PQ (0 = not enqueued) */ }; struct tenant { diff --git a/runtime/include/tenant_functions.h b/runtime/include/tenant_functions.h index 18f96ce3..af1de7cf 100644 --- a/runtime/include/tenant_functions.h +++ b/runtime/include/tenant_functions.h @@ -49,7 +49,8 @@ tenant_policy_specific_init(struct tenant *tenant, struct tenant_config *config) for (int i = 0; i < runtime_worker_threads_count; i++) { tenant->pwt_sandboxes[i].sandboxes = priority_queue_initialize(RUNTIME_TENANT_QUEUE_SIZE, false, - sandbox_get_priority); + sandbox_get_priority, + sandbox_get_index_ptr); tenant->pwt_sandboxes[i].tenant = tenant; tenant->pwt_sandboxes[i].mt_class = (tenant->replenishment_period == 0) ? MT_DEFAULT : MT_GUARANTEED; @@ -59,8 +60,9 @@ tenant_policy_specific_init(struct tenant *tenant, struct tenant_config *config) /* Initialize the tenant's global request queue */ tenant->tgrq_requests = malloc(sizeof(struct tenant_global_request_queue)); + /* sandbox_requests is never delete()'d from directly, so it keeps the legacy O(n) delete (NULL) */ tenant->tgrq_requests->sandbox_requests = priority_queue_initialize(RUNTIME_TENANT_QUEUE_SIZE, false, - sandbox_get_priority); + sandbox_get_priority, NULL); tenant->tgrq_requests->tenant = tenant; tenant->tgrq_requests->mt_class = (tenant->replenishment_period == 0) ? MT_DEFAULT : MT_GUARANTEED; tenant->tgrq_requests->tenant_timeout.tenant = tenant; @@ -173,6 +175,12 @@ tenant_timeout_get_priority(void *element) return ((struct tenant_timeout *)element)->timeout; } +static inline size_t * +tenant_timeout_get_index_ptr(void *element) +{ + return &((struct tenant_timeout *)element)->pq_idx; +} + /** * Compute the next timeout given a tenant's replenishment period * @param m_replenishment_period diff --git a/runtime/src/global_request_scheduler_minheap.c b/runtime/src/global_request_scheduler_minheap.c index 5f0b894f..f47dfb7d 100644 --- a/runtime/src/global_request_scheduler_minheap.c +++ b/runtime/src/global_request_scheduler_minheap.c @@ -77,7 +77,8 @@ sandbox_get_priority_fn(void *element) void global_request_scheduler_minheap_initialize() { - global_request_scheduler_minheap = priority_queue_initialize(4096, true, sandbox_get_priority_fn); + /* This queue is only enqueued/dequeued (never delete()'d from), so it keeps the legacy delete (NULL) */ + global_request_scheduler_minheap = priority_queue_initialize(4096, true, sandbox_get_priority_fn, NULL); struct global_request_scheduler_config config = {.add_fn = global_request_scheduler_minheap_add, .remove_fn = global_request_scheduler_minheap_remove, diff --git a/runtime/src/global_request_scheduler_mtds.c b/runtime/src/global_request_scheduler_mtds.c index 5d4c1b7f..36f5ebce 100644 --- a/runtime/src/global_request_scheduler_mtds.c +++ b/runtime/src/global_request_scheduler_mtds.c @@ -22,6 +22,12 @@ tenant_request_queue_get_priority(void *element) return (sandbox) ? sandbox->absolute_deadline : UINT64_MAX; } +static inline size_t * +tenant_request_queue_get_index_ptr(void *element) +{ + return &((struct tenant_global_request_queue *)element)->pq_idx; +} + /** * Demotes the given tenant's request queue, which means deletes the TGRQ from the Guaranteed queue * and adds to the Default queue. @@ -221,12 +227,14 @@ void global_request_scheduler_mtds_initialize() { global_request_scheduler_mtds_guaranteed = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false, - tenant_request_queue_get_priority); + tenant_request_queue_get_priority, + tenant_request_queue_get_index_ptr); global_request_scheduler_mtds_default = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false, - tenant_request_queue_get_priority); + tenant_request_queue_get_priority, + tenant_request_queue_get_index_ptr); global_tenant_timeout_queue = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false, - tenant_timeout_get_priority); + tenant_timeout_get_priority, tenant_timeout_get_index_ptr); lock_init(&global_lock); diff --git a/runtime/src/local_runqueue_minheap.c b/runtime/src/local_runqueue_minheap.c index 7d4c3126..4d416872 100644 --- a/runtime/src/local_runqueue_minheap.c +++ b/runtime/src/local_runqueue_minheap.c @@ -81,7 +81,8 @@ void local_runqueue_minheap_initialize() { /* Initialize local state */ - local_runqueue_minheap = priority_queue_initialize(RUNTIME_RUNQUEUE_SIZE, false, sandbox_get_priority); + local_runqueue_minheap = priority_queue_initialize(RUNTIME_RUNQUEUE_SIZE, false, sandbox_get_priority, + sandbox_get_index_ptr); /* Register Function Pointers for Abstract Scheduling API */ struct local_runqueue_config config = {.add_fn = local_runqueue_minheap_add, diff --git a/runtime/src/local_runqueue_mtds.c b/runtime/src/local_runqueue_mtds.c index 318339cf..564e4c61 100644 --- a/runtime/src/local_runqueue_mtds.c +++ b/runtime/src/local_runqueue_mtds.c @@ -32,6 +32,12 @@ perworker_tenant_get_priority(void *element) return (sandbox) ? sandbox->absolute_deadline : UINT64_MAX; } +static inline size_t * +perworker_tenant_get_index_ptr(void *element) +{ + return &((struct perworker_tenant_sandbox_queue *)element)->pq_idx; +} + /** * Checks if the run queue is empty * @returns true if empty. false otherwise @@ -167,9 +173,11 @@ local_runqueue_mtds_initialize() { /* Initialize local state */ local_runqueue_mtds_guaranteed = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false, - perworker_tenant_get_priority); + perworker_tenant_get_priority, + perworker_tenant_get_index_ptr); local_runqueue_mtds_default = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false, - perworker_tenant_get_priority); + perworker_tenant_get_priority, + perworker_tenant_get_index_ptr); /* Register Function Pointers for Abstract Scheduling API */ struct local_runqueue_config config = {.add_fn = local_runqueue_mtds_add, diff --git a/runtime/src/worker_thread.c b/runtime/src/worker_thread.c index ae993ec4..13a84c91 100644 --- a/runtime/src/worker_thread.c +++ b/runtime/src/worker_thread.c @@ -59,7 +59,8 @@ worker_thread_main(void *argument) if (scheduler == SCHEDULER_MTDS) { worker_thread_timeout_queue = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false, - tenant_timeout_get_priority); + tenant_timeout_get_priority, + tenant_timeout_get_index_ptr); } software_interrupt_unmask_signal(SIGFPE);