2525 */
2626
2727#include "rti_remote.h"
28+ #include "clock.h" // For lf_clock_cond_timedwait()
2829#include "net_util.h"
2930#include <string.h>
30- #include "clock.h" // For lf_clock_cond_timedwait()
3131
3232// Global variables defined in tag.c:
3333extern instant_t start_time ;
@@ -227,8 +227,8 @@ static int get_num_absent_upstream_transients(federate_info_t* fed) {
227227}
228228
229229/**
230- * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified `destination` if it is connected to the RTI,
231- * telling it that the specified `upstream` federate is also now connected.
230+ * @brief Send MSG_TYPE_UPSTREAM_CONNECTED to the specified `destination` if it is connected to the
231+ * RTI, telling it that the specified `upstream` federate is also now connected.
232232 *
233233 * This function assumes that the mutex lock is already held.
234234 * @param destination The destination federate.
@@ -651,9 +651,9 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
651651 // issue a TAG before this message has been forwarded.
652652 LF_MUTEX_LOCK (& rti_mutex );
653653
654- // If the destination federate is no longer connected, or it is a transient that has not started executing yet
655- // (the delayed intended tag is less than the effective start tag of the destination), issue a warning, remove the
656- // message from the socket, and return.
654+ // If the destination federate is no longer connected, or it is a transient that has not started
655+ // executing yet (the delayed intended tag is less than the effective start tag of the
656+ // destination), issue a warning, remove the message from the socket, and return.
657657 federate_info_t * fed = GET_FED_INFO (federate_id );
658658 interval_t delay = NEVER ;
659659 for (int i = 0 ; i < fed -> enclave .num_immediate_upstreams ; i ++ ) {
@@ -771,7 +771,8 @@ void handle_next_event_tag(federate_info_t* fed) {
771771 // Acquire a mutex lock to ensure that this state does not change while a
772772 // message is in transport or being used to determine a TAG.
773773 LF_MUTEX_LOCK (& rti_mutex ); // FIXME: Instead of using a mutex, it might be more efficient to use a
774- // select() mechanism to read and process federates' buffers in an orderly fashion.
774+ // select() mechanism to read and process federates' buffers in an
775+ // orderly fashion.
775776
776777 tag_t intended_tag = extract_tag (buffer );
777778 if (rti_remote -> base .tracing_enabled ) {
@@ -1004,7 +1005,7 @@ void handle_address_query(uint16_t fed_id) {
10041005 // outbound_transients array (if not already present).
10051006 if (remote_is_transient ) {
10061007 LF_MUTEX_LOCK (& rti_mutex );
1007- bool already_registered = false;
1008+ bool already_registered = false;
10081009 int32_t i = 0 ;
10091010 for (; i < fed -> number_of_outbound_transients ; i ++ ) {
10101011 if (fed -> outbound_transients [i ] == (int32_t )remote_fed_id ) {
@@ -1076,17 +1077,18 @@ void handle_address_ad(uint16_t federate_id) {
10761077}
10771078
10781079/**
1079- * @brief Send the global federation start time and the federate-specific starting tag to the specified federate.
1080+ * @brief Send the global federation start time and the federate-specific starting tag to the
1081+ * specified federate.
10801082 *
1081- * For persistent federates and transient federates that happen to join during federation startup, the
1082- * `federation_start_time` will match the time in the `federate_start_tag`, and the microstep will be 0.
1083- * For a transient federate that joins later, the time in the `federate_start_tag` will be greater than the
1084- * federation_start_time`.
1083+ * For persistent federates and transient federates that happen to join during federation startup,
1084+ * the `federation_start_time` will match the time in the `federate_start_tag`, and the microstep
1085+ * will be 0. For a transient federate that joins later, the time in the `federate_start_tag` will
1086+ * be greater than the federation_start_time`.
10851087 *
10861088 *
1087- * Before sending the start time and tag, this function notifies my_fed of all upstream transient federates that are
1088- * connected. After sending the start time and tag, and if my_fed is transient, notify federates downstream of its
1089- * connection, ensuring proper handling of zero-delay cycles.
1089+ * Before sending the start time and tag, this function notifies my_fed of all upstream transient
1090+ * federates that are connected. After sending the start time and tag, and if my_fed is transient,
1091+ * notify federates downstream of its connection, ensuring proper handling of zero-delay cycles.
10901092 *
10911093 * This function assumes that the mutex lock is already held.
10921094 *
@@ -1096,16 +1098,16 @@ void handle_address_ad(uint16_t federate_id) {
10961098 */
10971099static void send_start_tag_locked (federate_info_t * my_fed , instant_t federation_start_time , tag_t federate_start_tag ) {
10981100 // Notify my_fed of any upstream transient federates that are connected.
1099- // This has to occur before sending the start tag so that my_fed does not begin executing thinking that these
1100- // upstream federates are not connected.
1101+ // This has to occur before sending the start tag so that my_fed does not begin executing thinking
1102+ // that these upstream federates are not connected.
11011103 for (int i = 0 ; i < my_fed -> enclave .num_immediate_upstreams ; i ++ ) {
11021104 federate_info_t * fed = GET_FED_INFO (my_fed -> enclave .immediate_upstreams [i ]);
11031105 if (fed -> is_transient && fed -> enclave .state == GRANTED ) {
11041106 send_upstream_connected_locked (my_fed , fed );
11051107 }
11061108 }
11071109 send_outbound_connected_locked (my_fed );
1108-
1110+
11091111 // Send back to the federate the maximum time plus an offset on a TIMESTAMP_START
11101112 // message.
11111113 // If it is a persistent federate, only the start_time is sent. If, however, it is a transient
@@ -1266,7 +1268,8 @@ void handle_timestamp(federate_info_t* my_fed) {
12661268 // effective_start_time of the federate, cancel it.
12671269 // FIXME: Should this be higher-than or equal to?
12681270 // FIXME: Also, won't the grant simply be lost?
1269- // If the joining federate doesn't send anything, the downstream federate won't issue another NET.
1271+ // If the joining federate doesn't send anything, the downstream federate won't issue another
1272+ // NET.
12701273 for (int j = 0 ; j < my_fed -> enclave .num_immediate_downstreams ; j ++ ) {
12711274 federate_info_t * downstream = GET_FED_INFO (my_fed -> enclave .immediate_downstreams [j ]);
12721275
@@ -1467,7 +1470,8 @@ static void handle_federate_failed(federate_info_t* my_fed) {
14671470 // Check downstream federates to see whether they should now be granted a TAG.
14681471 // To handle cycles, need to create a boolean array to keep
14691472 // track of which upstream federates have been visited.
1470- bool * visited = (bool * )calloc (rti_remote -> base .number_of_scheduling_nodes , sizeof (bool )); // Initializes to 0.
1473+ bool * visited = (bool * )calloc (rti_remote -> base .number_of_scheduling_nodes ,
1474+ sizeof (bool )); // Initializes to 0.
14711475 notify_downstream_advance_grant_if_safe (& (my_fed -> enclave ), visited );
14721476 free (visited );
14731477
@@ -1508,7 +1512,8 @@ static void handle_federate_resign(federate_info_t* my_fed) {
15081512 // Check downstream federates to see whether they should now be granted a TAG.
15091513 // To handle cycles, need to create a boolean array to keep
15101514 // track of which upstream federates have been visited.
1511- bool * visited = (bool * )calloc (rti_remote -> base .number_of_scheduling_nodes , sizeof (bool )); // Initializes to 0.
1515+ bool * visited = (bool * )calloc (rti_remote -> base .number_of_scheduling_nodes ,
1516+ sizeof (bool )); // Initializes to 0.
15121517 notify_downstream_advance_grant_if_safe (& (my_fed -> enclave ), visited );
15131518 free (visited );
15141519
@@ -1775,7 +1780,8 @@ static int32_t receive_and_check_fed_id_message(int* socket_id) {
17751780
17761781 // The MSG_TYPE_FED_IDS message has the right federation ID.
17771782
1778- // Get the peer address from the connected socket_id. Then assign it as the federate's socket server.
1783+ // Get the peer address from the connected socket_id. Then assign it as the federate's socket
1784+ // server.
17791785 struct sockaddr_in peer_addr ;
17801786 socklen_t addr_len = sizeof (peer_addr );
17811787 if (getpeername (* socket_id , (struct sockaddr * )& peer_addr , & addr_len ) != 0 ) {
@@ -2191,10 +2197,11 @@ void send_stop(federate_info_t* fed) {
21912197}
21922198
21932199void * lf_connect_to_transient_federates_thread (void * nothing ) {
2194- // This loop will continue to accept connections of transient federates, as soon as there is room, or enable hot swap
2200+ // This loop will continue to accept connections of transient federates, as soon as there is room,
2201+ // or enable hot swap
21952202 while (!rti_remote -> all_persistent_federates_exited ) {
2196- // Continue waiting for an incoming connection requests from transients to join, or for hot swap.
2197- // Wait for an incoming connection request.
2203+ // Continue waiting for an incoming connection requests from transients to join, or for hot
2204+ // swap. Wait for an incoming connection request.
21982205 int socket_id = accept_socket (rti_remote -> socket_descriptor_TCP , -1 );
21992206
22002207 // If accept failed (e.g., socket was shut down), exit the loop.
@@ -2410,8 +2417,8 @@ void reset_transient_federate(federate_info_t* fed) {
24102417 for (int32_t i = 0 ; i < num_transients ; i ++ ) {
24112418 fed -> outbound_transients [i ] = -1 ;
24122419 }
2413- // Whenver a transient resigns or leaves, invalidate all federates, so that all min_delays_upstream
2414- // get re-computed.
2420+ // Whenver a transient resigns or leaves, invalidate all federates, so that all
2421+ // min_delays_upstream get re-computed.
24152422 // FIXME: Maybe optimize it to only invalidate those affected by the transient
24162423 invalidate_min_delays ();
24172424}
0 commit comments