Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 120 additions & 24 deletions runtime/include/priority_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,21 @@
*/
typedef uint64_t (*priority_queue_get_priority_fn_t)(void *element);

/**
* How to get/set an element's current 1-based index within the heap's backing array.
* Returns a pointer to a size_t slot stored inside the element itself. When provided, the
* queue keeps this slot in sync on every move, allowing priority_queue_delete to locate the
* element in O(1) and repair the heap in O(log n) instead of scanning in O(n). Pass NULL to
* priority_queue_initialize to keep the legacy linear-scan delete.
* @param element
* @returns pointer to the element's index slot
*/
typedef size_t *(*priority_queue_index_ptr_fn_t)(void *element);

/* We assume that priority is expressed in terms of a 64 bit unsigned integral */
struct priority_queue {
priority_queue_get_priority_fn_t get_priority_fn;
priority_queue_index_ptr_fn_t get_index_fn; /* NULL => O(n) delete; non-NULL => O(log n) delete */
bool use_lock;
lock_t lock;
uint64_t highest_priority;
Expand Down Expand Up @@ -49,6 +61,46 @@ priority_queue_update_highest_priority(struct priority_queue *priority_queue, co
priority_queue->highest_priority = priority;
}

/**
* Records an element's current slot inside the element itself, when index tracking is enabled.
* No-op (single branch) for queues initialized without an index accessor.
* @param priority_queue the priority queue
* @param index the 1-based slot whose occupant should remember its position
*/
static inline void
priority_queue_record_index(struct priority_queue *priority_queue, size_t index)
{
if (priority_queue->get_index_fn != NULL)
*priority_queue->get_index_fn(priority_queue->items[index]) = index;
}

/**
* Marks an element as no longer enqueued (slot 0), when index tracking is enabled
* @param priority_queue the priority queue
* @param element the departing element
*/
static inline void
priority_queue_clear_index(struct priority_queue *priority_queue, void *element)
{
if (priority_queue->get_index_fn != NULL) *priority_queue->get_index_fn(element) = 0;
}

/**
* Swaps two heap slots, keeping any tracked indices in sync
* @param priority_queue the priority queue
* @param a 1-based slot
* @param b 1-based slot
*/
static inline void
priority_queue_swap(struct priority_queue *priority_queue, size_t a, size_t b)
{
void *temp = priority_queue->items[a];
priority_queue->items[a] = priority_queue->items[b];
priority_queue->items[b] = temp;
priority_queue_record_index(priority_queue, a);
priority_queue_record_index(priority_queue, b);
}

/**
* Adds a value to the end of the binary heap
* @param priority_queue the priority queue
Expand All @@ -67,6 +119,7 @@ priority_queue_append(struct priority_queue *priority_queue, void *new_item)
if (unlikely(priority_queue->size > priority_queue->capacity)) panic("PQ overflow");
if (unlikely(priority_queue->size == priority_queue->capacity)) goto err_enospc;
priority_queue->items[++priority_queue->size] = new_item;
priority_queue_record_index(priority_queue, priority_queue->size);

rc = 0;
done:
Expand All @@ -90,6 +143,32 @@ priority_queue_is_empty(struct priority_queue *priority_queue)
return priority_queue->size == 0;
}

/**
* Shifts the value at start_index upwards to restore the heap structure property, keeping any
* tracked indices and the memoized highest priority in sync
* @param priority_queue the priority queue
* @param start_index the 1-based slot to percolate up from
*/
static inline void
priority_queue_percolate_up_from(struct priority_queue *priority_queue, int start_index)
{
assert(priority_queue != NULL);
assert(priority_queue->get_priority_fn != NULL);
assert(!priority_queue->use_lock || lock_is_locked(&priority_queue->lock));

for (int i = start_index; i / 2 != 0
&& priority_queue->get_priority_fn(priority_queue->items[i])
< priority_queue->get_priority_fn(priority_queue->items[i / 2]);
i /= 2) {
assert(priority_queue->get_priority_fn(priority_queue->items[i]) != ULONG_MAX);
priority_queue_swap(priority_queue, i / 2, i);
/* If percolated to highest priority, update highest priority */
if (i / 2 == 1)
priority_queue_update_highest_priority(priority_queue, priority_queue->get_priority_fn(
priority_queue->items[1]));
}
}

/**
* Shifts an appended value upwards to restore heap structure property
* @param priority_queue the priority queue
Expand All @@ -108,19 +187,7 @@ priority_queue_percolate_up(struct priority_queue *priority_queue)
return;
}

for (int i = priority_queue->size; i / 2 != 0
&& priority_queue->get_priority_fn(priority_queue->items[i])
< priority_queue->get_priority_fn(priority_queue->items[i / 2]);
i /= 2) {
assert(priority_queue->get_priority_fn(priority_queue->items[i]) != ULONG_MAX);
void *temp = priority_queue->items[i / 2];
priority_queue->items[i / 2] = priority_queue->items[i];
priority_queue->items[i] = temp;
/* If percolated to highest priority, update highest priority */
if (i / 2 == 1)
priority_queue_update_highest_priority(priority_queue, priority_queue->get_priority_fn(
priority_queue->items[1]));
}
priority_queue_percolate_up_from(priority_queue, priority_queue->size);
}

/**
Expand Down Expand Up @@ -180,9 +247,7 @@ priority_queue_percolate_down(struct priority_queue *priority_queue, int parent_
<= priority_queue->get_priority_fn(priority_queue->items[smallest_child_index]))
break;
/* Otherwise, swap and continue down the tree */
void *temp = priority_queue->items[smallest_child_index];
priority_queue->items[smallest_child_index] = priority_queue->items[parent_index];
priority_queue->items[parent_index] = temp;
priority_queue_swap(priority_queue, smallest_child_index, parent_index);

parent_index = smallest_child_index;
left_child_index = 2 * parent_index;
Expand Down Expand Up @@ -225,9 +290,11 @@ priority_queue_dequeue_if_earlier_nolock(struct priority_queue *priority_queue,
if (priority_queue_is_empty(priority_queue) || priority_queue->highest_priority >= target_deadline)
goto err_enoent;

*dequeued_element = priority_queue->items[1];
*dequeued_element = priority_queue->items[1];
priority_queue_clear_index(priority_queue, *dequeued_element);
priority_queue->items[1] = priority_queue->items[priority_queue->size];
priority_queue->items[priority_queue->size--] = NULL;
if (priority_queue->size >= 1) priority_queue_record_index(priority_queue, 1);

priority_queue_percolate_down(priority_queue, 1);
return_code = 0;
Expand Down Expand Up @@ -264,10 +331,13 @@ priority_queue_dequeue_if_earlier(struct priority_queue *priority_queue, void **
* @param capacity the number of elements to store in the data structure
* @param use_lock indicates that we want a concurrent data structure
* @param get_priority_fn pointer to a function that returns the priority of an element
* @param get_index_fn pointer to a function exposing an element's in-struct index slot, enabling
* O(log n) priority_queue_delete; pass NULL to keep the legacy O(n) linear-scan delete
* @return priority queue
*/
static inline struct priority_queue *
priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn)
priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_priority_fn_t get_priority_fn,
priority_queue_index_ptr_fn_t get_index_fn)
{
assert(get_priority_fn != NULL);

Expand All @@ -280,6 +350,7 @@ priority_queue_initialize(size_t capacity, bool use_lock, priority_queue_get_pri
priority_queue->size = 0;
priority_queue->capacity = capacity;
priority_queue->get_priority_fn = get_priority_fn;
priority_queue->get_index_fn = get_index_fn;
priority_queue->use_lock = use_lock;

if (use_lock) lock_init(&priority_queue->lock);
Expand Down Expand Up @@ -408,16 +479,41 @@ priority_queue_delete_nolock(struct priority_queue *priority_queue, void *value)
assert(value != NULL);
assert(!priority_queue->use_lock || lock_is_locked(&priority_queue->lock));

for (int i = 1; i <= priority_queue->size; i++) {
if (priority_queue->items[i] == value) {
priority_queue->items[i] = priority_queue->items[priority_queue->size];
priority_queue->items[priority_queue->size--] = NULL;
int i;
if (priority_queue->get_index_fn != NULL) {
/* O(log n): the element remembers its own slot. Validate it still points back to value
* (guards against a value that was never enqueued or already removed). */
i = (int)*priority_queue->get_index_fn(value);
if (i < 1 || (size_t)i > priority_queue->size || priority_queue->items[i] != value) return -1;
} else {
/* O(n) fallback: linear scan for the element */
for (i = 1; (size_t)i <= priority_queue->size && priority_queue->items[i] != value; i++) {}
if ((size_t)i > priority_queue->size) return -1;
}

priority_queue_clear_index(priority_queue, value);

/* Fill the hole with the last element and shrink */
priority_queue->items[i] = priority_queue->items[priority_queue->size];
priority_queue->items[priority_queue->size--] = NULL;

if ((size_t)i <= priority_queue->size) {
priority_queue_record_index(priority_queue, i);
/* Restore the heap property from the hole. Unlike a pop, the replacement can be smaller
* than the hole's parent, so it may need to rise rather than sink. */
if (i > 1
&& priority_queue->get_priority_fn(priority_queue->items[i])
< priority_queue->get_priority_fn(priority_queue->items[i / 2])) {
priority_queue_percolate_up_from(priority_queue, i);
} else {
priority_queue_percolate_down(priority_queue, i);
return 0;
}
} else if (priority_queue->size == 0) {
/* Removed the final element */
priority_queue_update_highest_priority(priority_queue, ULONG_MAX);
}

return -1;
return 0;
}

/**
Expand Down
6 changes: 6 additions & 0 deletions runtime/include/sandbox_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ sandbox_get_priority(void *element)
return sandbox->absolute_deadline;
}

static inline size_t *
sandbox_get_index_ptr(void *element)
{
return &((struct sandbox *)element)->pq_idx;
}

static inline void
sandbox_process_scheduler_updates(struct sandbox *sandbox)
{
Expand Down
1 change: 1 addition & 0 deletions runtime/include/sandbox_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ struct sandbox {

uint64_t remaining_exec;
uint64_t absolute_deadline;
size_t pq_idx; /* 1-based slot in its scheduling priority_queue (0 = not enqueued) */
uint64_t admissions_estimate; /* estimated execution time (cycles) * runtime_admissions_granularity / relative
deadline (cycles) */
uint64_t total_time; /* Total time from Request to Response */
Expand Down
3 changes: 3 additions & 0 deletions runtime/include/tenant.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@ struct tenant_timeout {
uint64_t timeout;
struct tenant *tenant;
struct perworker_tenant_sandbox_queue *pwt;
size_t pq_idx; /* 1-based slot in its timeout priority_queue (0 = not enqueued) */
};

struct perworker_tenant_sandbox_queue {
struct priority_queue *sandboxes;
struct tenant *tenant; // to be able to find the RB/MB/RP/RT.
struct tenant_timeout tenant_timeout;
enum MULTI_TENANCY_CLASS mt_class; // check whether the corresponding PWM has been demoted
size_t pq_idx; /* 1-based slot in the local runqueue priority_queue (0 = not enqueued) */
} __attribute__((aligned(CACHE_PAD)));

struct tenant_global_request_queue {
struct priority_queue *sandbox_requests;
struct tenant *tenant;
struct tenant_timeout tenant_timeout;
_Atomic volatile enum MULTI_TENANCY_CLASS mt_class;
size_t pq_idx; /* 1-based slot in the global scheduler PQ (0 = not enqueued) */
};

struct tenant {
Expand Down
12 changes: 10 additions & 2 deletions runtime/include/tenant_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ tenant_policy_specific_init(struct tenant *tenant, struct tenant_config *config)

for (int i = 0; i < runtime_worker_threads_count; i++) {
tenant->pwt_sandboxes[i].sandboxes = priority_queue_initialize(RUNTIME_TENANT_QUEUE_SIZE, false,
sandbox_get_priority);
sandbox_get_priority,
sandbox_get_index_ptr);
tenant->pwt_sandboxes[i].tenant = tenant;
tenant->pwt_sandboxes[i].mt_class = (tenant->replenishment_period == 0) ? MT_DEFAULT
: MT_GUARANTEED;
Expand All @@ -59,8 +60,9 @@ tenant_policy_specific_init(struct tenant *tenant, struct tenant_config *config)

/* Initialize the tenant's global request queue */
tenant->tgrq_requests = malloc(sizeof(struct tenant_global_request_queue));
/* sandbox_requests is never delete()'d from directly, so it keeps the legacy O(n) delete (NULL) */
tenant->tgrq_requests->sandbox_requests = priority_queue_initialize(RUNTIME_TENANT_QUEUE_SIZE, false,
sandbox_get_priority);
sandbox_get_priority, NULL);
tenant->tgrq_requests->tenant = tenant;
tenant->tgrq_requests->mt_class = (tenant->replenishment_period == 0) ? MT_DEFAULT : MT_GUARANTEED;
tenant->tgrq_requests->tenant_timeout.tenant = tenant;
Expand Down Expand Up @@ -173,6 +175,12 @@ tenant_timeout_get_priority(void *element)
return ((struct tenant_timeout *)element)->timeout;
}

static inline size_t *
tenant_timeout_get_index_ptr(void *element)
{
return &((struct tenant_timeout *)element)->pq_idx;
}

/**
* Compute the next timeout given a tenant's replenishment period
* @param m_replenishment_period
Expand Down
3 changes: 2 additions & 1 deletion runtime/src/global_request_scheduler_minheap.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ sandbox_get_priority_fn(void *element)
void
global_request_scheduler_minheap_initialize()
{
global_request_scheduler_minheap = priority_queue_initialize(4096, true, sandbox_get_priority_fn);
/* This queue is only enqueued/dequeued (never delete()'d from), so it keeps the legacy delete (NULL) */
global_request_scheduler_minheap = priority_queue_initialize(4096, true, sandbox_get_priority_fn, NULL);

struct global_request_scheduler_config config = {.add_fn = global_request_scheduler_minheap_add,
.remove_fn = global_request_scheduler_minheap_remove,
Expand Down
14 changes: 11 additions & 3 deletions runtime/src/global_request_scheduler_mtds.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ tenant_request_queue_get_priority(void *element)
return (sandbox) ? sandbox->absolute_deadline : UINT64_MAX;
}

static inline size_t *
tenant_request_queue_get_index_ptr(void *element)
{
return &((struct tenant_global_request_queue *)element)->pq_idx;
}

/**
* Demotes the given tenant's request queue, which means deletes the TGRQ from the Guaranteed queue
* and adds to the Default queue.
Expand Down Expand Up @@ -221,12 +227,14 @@ void
global_request_scheduler_mtds_initialize()
{
global_request_scheduler_mtds_guaranteed = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
tenant_request_queue_get_priority);
tenant_request_queue_get_priority,
tenant_request_queue_get_index_ptr);
global_request_scheduler_mtds_default = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
tenant_request_queue_get_priority);
tenant_request_queue_get_priority,
tenant_request_queue_get_index_ptr);

global_tenant_timeout_queue = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
tenant_timeout_get_priority);
tenant_timeout_get_priority, tenant_timeout_get_index_ptr);

lock_init(&global_lock);

Expand Down
3 changes: 2 additions & 1 deletion runtime/src/local_runqueue_minheap.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void
local_runqueue_minheap_initialize()
{
/* Initialize local state */
local_runqueue_minheap = priority_queue_initialize(RUNTIME_RUNQUEUE_SIZE, false, sandbox_get_priority);
local_runqueue_minheap = priority_queue_initialize(RUNTIME_RUNQUEUE_SIZE, false, sandbox_get_priority,
sandbox_get_index_ptr);

/* Register Function Pointers for Abstract Scheduling API */
struct local_runqueue_config config = {.add_fn = local_runqueue_minheap_add,
Expand Down
12 changes: 10 additions & 2 deletions runtime/src/local_runqueue_mtds.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ perworker_tenant_get_priority(void *element)
return (sandbox) ? sandbox->absolute_deadline : UINT64_MAX;
}

static inline size_t *
perworker_tenant_get_index_ptr(void *element)
{
return &((struct perworker_tenant_sandbox_queue *)element)->pq_idx;
}

/**
* Checks if the run queue is empty
* @returns true if empty. false otherwise
Expand Down Expand Up @@ -167,9 +173,11 @@ local_runqueue_mtds_initialize()
{
/* Initialize local state */
local_runqueue_mtds_guaranteed = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
perworker_tenant_get_priority);
perworker_tenant_get_priority,
perworker_tenant_get_index_ptr);
local_runqueue_mtds_default = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
perworker_tenant_get_priority);
perworker_tenant_get_priority,
perworker_tenant_get_index_ptr);

/* Register Function Pointers for Abstract Scheduling API */
struct local_runqueue_config config = {.add_fn = local_runqueue_mtds_add,
Expand Down
3 changes: 2 additions & 1 deletion runtime/src/worker_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ worker_thread_main(void *argument)

if (scheduler == SCHEDULER_MTDS) {
worker_thread_timeout_queue = priority_queue_initialize(RUNTIME_MAX_TENANT_COUNT, false,
tenant_timeout_get_priority);
tenant_timeout_get_priority,
tenant_timeout_get_index_ptr);
}

software_interrupt_unmask_signal(SIGFPE);
Expand Down
Loading