Conversation
1. Implemented collection of stats of all deadlines of an LF application 2. Implemented generation of an exponential-based priority assignment function in the lf_start.c file (to be called by the runtime, not yet modified) 3. Implemented a debug-only function that writes the deadlines of an LF application to a text file
1. Implemented current thread priority set using POSIX APIs 2. Linux scheduling policy statically set to SCHED_FIFO (will need target property to change it) and single-core mode only 3. Implemented call to Platform method in the scheduler to adjust thread priority right before executing a reaction
1. Set maximum POSIX priority to TCP thread and to sleeping main thread 2. Added an exception when the runtime fails to set real-time scheduling with POSIX APIs (likely due to lack of sudo rights)
1. Implemented the selection of Linux schedulers using the "thread-policy" target property
1. Implemented the specification of core affinity for the main thread. The target property specifies the number of cores that shall be available to it, and the runtime assigns them starting from the highest index.
1. Added void implementation of priority-setting APIs for non-POSIX platforms (may be implemented in the future) 2. Fixed a bug causing maxwait not be checked when a federate has no internal triggers before stop tag but receives inputs from other federates 3. Improved debug output for STP violation errors to determine which bundle causes the violation 4. Fixed a bug causing the priority function fail on assertion with only one distinct deadline value 5. Priority function is now generated only when platform is Native 6. Priority function is now generated also with non-federated programs
1. The TCP thread has now always higher priority than the main thread, even when this is sleeping. The sleeping main thread has got the second highest priority. The range of priorities for the main thread when executing reactions is now [1,97]. 2.Fixed bug causing TCP messages not being sent immediately (Nagel's algorithm is now disabled).
1. Implemented an API call to allow reactions set the maxwait parameter of the decentralized coordination while the LF program executes.
1. Fixed a bug causing the code generator to crash when the LF program did not contain any deadlines 2. Removed an old debug function that wrote to a file the LF deadlines
1. Fixed a concurrency bug possibly causing access by TCP thread to not-yet-initialized platform object
1. Refactored the priority scheduler to comply with the object-oriented-like design style 2. Created a ThreadedPlatform structure that models hw platforms with threading capabilities 3. Created a PriorityScheduler structure that only works with ThreadedPlatforms and that executes reactions with appropriate priorities
1. Fixed a bug causing the priority function not being generated when no deadlines were specified, but the scheduler was set to Priority.
1. Changed API function name to dynamically set maxwait from lf_set_maxwait to lf_set_fed_maxwait
|
Benchmark results after merging this PR: Benchmark resultsPerformance:PingPongUc: PingPongC: ReactionLatencyUc: ReactionLatencyC: Memory usage:PingPongUc: PingPongC: ReactionLatencyUc: ReactionLatencyC: |
edwardalee
left a comment
There was a problem hiding this comment.
I've only been through part of this PR, but I found some issues, so I'm submitting a partial review in case these comments are useful. Most of the comments are insignificant... minor suggestions for documentation. But one is potentially a bug.
| interval_t global_max_wait; // The global maximum wait time for remote input ports for this federate. | ||
|
|
||
| /** | ||
| * @brief Set the global maximum wait time for remote input ports for this federate. |
There was a problem hiding this comment.
| * @brief Set the global maximum wait time for remote input ports for this federate. | |
| * @brief Set the global maximum wait time for this federate to assume remote input ports are absent. |
| * @param super The environment. | ||
| * @param max_wait The maximum wait time to be set. | ||
| * | ||
| * This function sets the global maximum wait time for remote input ports for this federate. |
There was a problem hiding this comment.
Redundant comment is not needed. @brief is sufficient.
| StartupCoordinator* startup_coordinator, ClockSynchronization* clock_sync); | ||
| void FederatedEnvironment_free(FederatedEnvironment* self); | ||
|
|
||
| #define lf_set_fed_maxwait(time) ((FederatedEnvironment *)env)->set_maxwait(env, time) |
There was a problem hiding this comment.
Since this is the user-facing function, perhaps it needs documentation?
| LF_SCHED_FAIR, // Non real-time scheduling policy. Corresponds to SCHED_OTHER | ||
| LF_SCHED_TIMESLICE, // Real-time, time-slicing priority-based policy. Corresponds to SCHED_RR. | ||
| LF_SCHED_PRIORITY, // Real-time, priority-only based scheduling. Corresponds to SCHED_FIFO. |
There was a problem hiding this comment.
| LF_SCHED_FAIR, // Non real-time scheduling policy. Corresponds to SCHED_OTHER | |
| LF_SCHED_TIMESLICE, // Real-time, time-slicing priority-based policy. Corresponds to SCHED_RR. | |
| LF_SCHED_PRIORITY, // Real-time, priority-only based scheduling. Corresponds to SCHED_FIFO. | |
| /** Non real-time scheduling policy. Corresponds to SCHED_OTHER in Linux. */ | |
| LF_SCHED_FAIR, | |
| /** Real-time, time-slicing priority-based policy. Corresponds to SCHED_RR in Linux. */ | |
| LF_SCHED_TIMESLICE, | |
| /** Real-time, priority-only based scheduling. Corresponds to SCHED_FIFO in Linux. */ | |
| LF_SCHED_PRIORITY |
|
|
||
|
|
||
| /** | ||
| * @brief The number of cores of the hardware platform to use. |
There was a problem hiding this comment.
What does the default 0 mean? The default and its meaning should be documented.
| abstract fun keepAlive(): Boolean | ||
|
|
||
| /** | ||
| * Collects all deadlines from federates, including nested reactors. |
There was a problem hiding this comment.
| * Collects all deadlines from federates, including nested reactors. | |
| * Collect all deadlines from federates, including nested reactors. |
| * to find all reactions with deadlines. | ||
| * | ||
| * @param federates List of federates to collect deadlines from | ||
| * @return List of all deadlines found in the federates and their nested reactors |
There was a problem hiding this comment.
Would it make more sense for the return value to be a Set rather than a List? A list has semantics to the ordering and has duplicate entries. Same for all related methods below.
| var delayExpr = instance!!.parameters.find { p -> p.lhs.name == parameterName }!!.rhs.expr | ||
| // Being a deadline, it must be a Time value | ||
| if (delayExpr is org.lflang.lf.Time) { | ||
| ASTUtils.toTimeValue(delayExpr).toNanoSeconds() | ||
| } else { | ||
| 0 | ||
| } |
There was a problem hiding this comment.
This doesn't look quite right to me. If delay is a parameter reference that is not overridden in the instantiation, then this will return 0 because delayExpr will be null. But the right return value is the default value of the parameter, which can be obtained using getDefaultAsTimeValue.
Also, rhs.expr could itself be a ParameterReference, in which case, the parameter referred to belongs to the parent Instantiation. I'm not sure how to get that parent Instantiation, however. I don't think it's even possible given the arguments. I think that instead of the instance argument, this needs to be an instances argument with type List<Instantiation>. This list needs to include all the Instantiations that contain this Instantiation. That list would need to be constructed in the collectDeadlinesFromNestedReactors below, which does the recursive visiting of instantiations.
Also, this looks to me like a function that should be provided in ASTUtils. I'm kind of surprised it isn't. I guess that is because there is an implementation of the same functionality in ReactorInstance.getTimeValue(Expression). reactor-uc is not using ReactorInstance or any of that infrastructure, however.
| * @return List of deadlines found in this reactor | ||
| */ | ||
| fun collectDeadlinesFromReactor(reactor: Reactor, instance: Instantiation?): List<Long> { | ||
| val deadlines = mutableListOf<Long>() |
There was a problem hiding this comment.
Should this be a Set rather than a list?
| * Returns stats on the deadlines of all federates (or the main reactor only if non federated) | ||
| * in the form of a list of three double values provided in this order: | ||
| * 1. the minimum deadline | ||
| * 2. the median deadline |
There was a problem hiding this comment.
I guess if you need the median deadline, then collecting the deadlines as a Set won't work. You need to know how many times a deadline appears to get the median.
1. Fixed bug that made the code generator crash when reaction deadlines were parametrized and the default value was used 2. Fixed bug that did not allow the code generator to collect deadlines of reactions that were parametrized with a parameter defined in the parent reactor
1. Implemented exception handling when user does not provide a number for the cores target property 2. Added and edited documentation for functions 3. Renamed enum types for thread scheduler property 4. Removed leading underscore from the name of functions that have become part of the public interface 5. Implemented exception handling when user does not provide a valid thread policy for the thread policy target property
|
@edwardalee I fixed the bug with nested parameters and implemented most of your minor fixes. Thanks for your review! |
There was a problem hiding this comment.
Firstly: Very Good Work! 🎉 👍 🔧
I left a lot of comments (sorry :D)
Couple of more big picture comments (it's okay if you hate me XD):
-
I would like to remove target properties from uC => turn them into annotations, this would allow us to have federates running on embedded (zephyr/riot) talking to a LinuxRT federate.
-
The code generator is right now is the ugliest part of uC: Put the Priority stuff into a
PriortyScheduler.ktand lets add a good integration -
I would recommend we add a
PosixRTplatform that implements theThreadedPlatform, we still want that @edwardalee can run uC on his Macbook where there is no LinuxRT scheduler :D
|
|
||
| StartupCoordinator* startup_coordinator; // A pointer to the startup coordinator, if the program has one. | ||
| ClockSynchronization* clock_sync; // A pointer to the clock synchronization module, if the program has one. | ||
| interval_t global_max_wait; // The global maximum wait time for remote input ports for this federate. |
There was a problem hiding this comment.
Why do we need the max_wait here? Because the max_wait is normally stored inside FederatedInputConnections and can be configured per (Bundle/Connection)
|
|
||
| typedef struct { | ||
| Platform super; | ||
| ThreadedPlatform super; |
There was a problem hiding this comment.
This would require that all POSIX platforms are RT. I don't think this is wise, for example this won't work on MacOS. I would say we create posix and a posixRT platform.
| #define PLATFORM_T PlatformPosix | ||
| #define MUTEX_T MutexPosix | ||
|
|
||
| lf_ret_t PlatformPosix_set_thread_priority(interval_t rel_deadline); |
There was a problem hiding this comment.
You dont need to declare the functions here, because the functions are only called through the function pointers stored in the objects. (Declaration is not needed)
|
|
||
| bool _Scheduler_check_and_handle_stp_violations(DynamicScheduler *self, Reaction *reaction); | ||
| bool _Scheduler_check_and_handle_deadline_violations(DynamicScheduler *self, Reaction *reaction); | ||
| void Scheduler_pop_system_events_and_handle(Scheduler *untyped_self, tag_t next_tag); |
There was a problem hiding this comment.
Yes the convention is <Class Name>_<method_name>
| EventQueue* system_event_queue, ReactionQueue* reaction_queue, interval_t duration, | ||
| bool keep_alive); | ||
|
|
||
| bool Scheduler_check_and_handle_stp_violations(DynamicScheduler *self, Reaction *reaction); |
There was a problem hiding this comment.
same here you can remove this.
| * @return true if a violation was detected and handled, false otherwise. | ||
| */ | ||
| static bool _Scheduler_check_and_handle_stp_violations(DynamicScheduler* self, Reaction* reaction) { | ||
| bool Scheduler_check_and_handle_stp_violations(DynamicScheduler* self, Reaction* reaction) { |
There was a problem hiding this comment.
the _ before the function name made clear that these functions where internal to this file.
| } | ||
| } | ||
|
|
||
| LF_WARN(SCHED, "Event from %s trying to schedule at tag " PRINTF_TAG " past stop tag " PRINTF_TAG, |
There was a problem hiding this comment.
For what do we need the FederatedInputConnection name? Except printing it :D
| const char* federate_info = "unknown source"; | ||
| if (event->trigger && event->trigger->parent) { | ||
| // Check if this is a federated input connection | ||
| if (event->trigger->type == TRIG_CONN_FEDERATED_INPUT) { |
There was a problem hiding this comment.
and maybe put into an internal function for reuse ;D
| } | ||
| } | ||
|
|
||
| validate(self->level_size[reaction->level] < (int)self->capacity); |
| super->super.run = PriorityScheduler_run; | ||
|
|
||
| Mutex_ctor(&super->mutex.super); | ||
| } |
There was a problem hiding this comment.
we made some changes in #288 to the scheduler especially when it comes to PollingNetwork channels.
This draft pull request is the work @edwardalee and I have been doing to introduce priority scheduling of reactions with deadline for federated applications with the reactor-uc runtime. It is still a draft, because testing is incomplete and the syntax for some customization options is to be discussed (more on this in a dedicated section). However, I am creating this pull request to get feedback on the priority assignment rule and the scheduler design that would certainly help improve the scheduler. Feel free to comment with thoughts and suggestions!
Introduction
The reactor-uc runtime features a single OS thread that schedules the reactions in the reaction queue. In federated applications, each federate has its own runtime and its own thread. When the federates run on the same machine, the threads compete for the CPUs of the platform, and a scheduling algorithm handles contention.
Federates may have different urgency of execution depending on what reaction they are executing at a given time. In fact, reactions can be assigned with a deadline. To guarantee that deadlines are met, a real-time thread scheduler is needed to prioritize the completion of reactions with more urgent deadlines.
This PR implements a priority-based scheduler in reactor-uc that assigns a priority value to a reaction as a function of its deadline. The rule is based on the deadline monotonic (DM) principle, but extended to support platforms like Linux that feature a finite number of assignable priority values.
Scheduler design
Control flow
The main thread of each federate serves one reaction at a time. The thread picks the first ready reaction in the reaction queue and sets its priority to a value that depends on the deadline of the picked reaction. The OS thread scheduler will use that priority value to schedule the thread together with the threads of the other federates on the same machine.
Once a federate is done with the reactions in the reaction queue and waits until its physical time catches up with the next tag, it sets its priority to a value that is higher than any possible main thread executing reactions. This is needed to handle the case of asynchronous events (e.g., physical connections) and guarantees that the thread is woken up as soon as the event arrives.
Federates rely on TCP connections to communicate, and the connections are handled by the TCP thread. Similarly to the previous case, the priority of the TCP thread is higher than any sleeping thread (and thus, any thread executing reactions), to be sure that the messages are received without delays. However, this might be a problem with large messages.
Priority function
The priority value for a thread executing a reaction with deadline$d$ is determined by the priority function $prio(d)$ , which maps the deadline range [$d_{min}, d_{max}$] to the priority range [${prio}{min}, {prio}{max}$]. $d_{min}$ and $d_{max}$ are the minimum and maximum deadlines found in a given federated application, while ${prio}{min}$ and ${prio}{max}$ are the minimum and maximum scheduling priorities that can be assigned to threads scheduled with a given OS thread scheduler. The function is decreasing exponential to subsume the deadline monotonic rule: tighter deadlines are mapped to higher priority values. This function is tailored to the specific federated application and generated by the code generator.
The analytical expression of the priority function is:
Since the priority space is finite, collisions may occur: two very close deadline values may be mapped to the same priority value. The priority function allows to control in what part of the deadline range collisions are more likely to occur by tuning the$\alpha$ parameter. A big $\alpha$ value increases the speed of decay of the function, which means that most of the priority space is used to map small deadlines. This is useful when most of the deadlines of the federated application are tight and we do not want collisions to occur with these. Collisions occurring with looser deadlines are likely to cause less harm. A small $\alpha$ value decreases the speed of decay, and the priority function performs a priority mapping that tends to a linear conversion. $\alpha$ is automatically selected in the range [$\alpha_{min}$, $\alpha_{max}$ ] (empirically derived) by the code generator as a function of the median deadline of the federated application, to make the priority function better fit its deadline distribution.
Implementation details
The priority scheduler is a sublcass of the dynamic scheduler and is called
PriorityScheduler. It only works with platforms having threading capabilities that are modeled by theThreadedPlatformclass.The only supported platform at the moment is Linux. Both
SCHED_FIFOandSCHED_RRcan be selected as underlying thread scheduling algorithms. The POSIX thread API is used to control the thread scheduler.Federate's main threads can be assigned with priority values in the range [2, 97]. 1 is reserved to reactions with no deadline; 98 is the sleeping thread's priority; 99 is the TCP thread's priority.
The
PriorityScheduler_run_timestepfunction sets the priority of the current thread right before executing the reaction body by calling the priority function.Example
Tasks.lf.txt
Attached is a simple example of a federated real-time application in which deadline violations occur when it is not executed with the priority scheduler. It is made of two periodic tasks both triggered by a 50 ms timer, one of which has a tighter deadline and a longer execution time than the other. The more urgent task starts at a slightly greater logical time, which means that when its timer fires, the loose-deadline task is already executing. This test case shows the ability of the priority scheduler to preempt the loose-deadline task to execute the more urgent one. With the default scheduler, preemption does not happen and the urgent task misses its deadline.
Future work
Currently, the priority scheduler is enabled when either
SCHED_FIFOorSCHED_RRare selected as Linux scheduling policy. The selection is done with the thread-policy target property that takes as values "rt-fifo", "rt-rr" and "normal" (for the default non-real-time Linux scheduler). Similarly, the cores target property restricts the number of cores that the application can use. In the future, these two target properties will become configuration properties specified in the main reactor with the '@' sign. The most appropriate syntax is yet to be defined.A more thorough testing is in the works to make sure all corner cases are handled correctly.