Skip to content

Commit 1f134c3

Browse files
committed
Stable index for _fed.inbound_net_listeners
1 parent fd5c157 commit 1f134c3

2 files changed

Lines changed: 29 additions & 8 deletions

File tree

core/federated/federate.c

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1905,7 +1905,10 @@ void lf_terminate_execution(environment_t* env) {
19051905
if (_fed.number_of_inbound_p2p_connections > 0 && _fed.inbound_net_listeners != NULL) {
19061906
LF_PRINT_LOG("Waiting for %zu threads listening for incoming messages to exit.",
19071907
_fed.number_of_inbound_p2p_connections);
1908-
for (size_t i = 0; i < _fed.number_of_inbound_p2p_connections; i++) {
1908+
// inbound_net_listeners is indexed by federate ID (size NUMBER_OF_FEDERATES).
1909+
// Entries for federates that never connected remain zero-initialized; lf_thread_join
1910+
// on a zero handle returns an error which we ignore.
1911+
for (int i = 0; i < NUMBER_OF_FEDERATES; i++) {
19091912
// Ignoring errors here.
19101913
lf_thread_join(_fed.inbound_net_listeners[i], NULL);
19111914
}
@@ -2279,8 +2282,13 @@ void lf_enqueue_port_absent_reactions(environment_t* env) {
22792282
void* lf_handle_p2p_connections_from_federates(void* env_arg) {
22802283
LF_ASSERT_NON_NULL(env_arg);
22812284
size_t received_federates = 0;
2282-
// Allocate memory to store thread IDs.
2283-
_fed.inbound_net_listeners = (lf_thread_t*)calloc(_fed.number_of_inbound_p2p_connections, sizeof(lf_thread_t));
2285+
// Allocate memory to store thread IDs, indexed by federate ID.
2286+
// Using NUMBER_OF_FEDERATES (not number_of_inbound_p2p_connections) so that transient
2287+
// federates that reconnect can reuse their slot without overflowing the array.
2288+
_fed.inbound_net_listeners = (lf_thread_t*)calloc(NUMBER_OF_FEDERATES, sizeof(lf_thread_t));
2289+
// Track which federate IDs have connected at least once, to count unique connections.
2290+
bool seen_fed[NUMBER_OF_FEDERATES];
2291+
memset(seen_fed, 0, sizeof(seen_fed));
22842292
while (!_lf_termination_executed) {
22852293
// Case where all inbound connections are to persistent federates
22862294
if (received_federates == _fed.number_of_inbound_p2p_connections && _fed.number_of_inbound_p2p_transients == 0) {
@@ -2369,7 +2377,14 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
23692377
// Start a thread to listen for incoming messages from other federates.
23702378
// The fed_id is a uint16_t, which we assume can be safely cast to and from void*.
23712379
void* fed_id_arg = (void*)(uintptr_t)remote_fed_id;
2372-
int result = lf_thread_create(&_fed.inbound_net_listeners[received_federates], listen_to_federates, fed_id_arg);
2380+
// For a transient federate that is reconnecting, join its previous listener thread
2381+
// before overwriting the slot, to avoid leaking the thread handle.
2382+
if (remote_fed_is_transient && seen_fed[remote_fed_id]) {
2383+
lf_thread_join(_fed.inbound_net_listeners[remote_fed_id], NULL);
2384+
}
2385+
// Index by remote_fed_id so that reconnecting transient federates reuse the same slot
2386+
// rather than incrementing past the end of the array.
2387+
int result = lf_thread_create(&_fed.inbound_net_listeners[remote_fed_id], listen_to_federates, fed_id_arg);
23732388
if (result != 0) {
23742389
// Failed to create a listening thread.
23752390
shutdown_net(_fed.net_for_inbound_p2p_connections[remote_fed_id], false);
@@ -2379,7 +2394,11 @@ void* lf_handle_p2p_connections_from_federates(void* env_arg) {
23792394
}
23802395
LF_MUTEX_UNLOCK(&lf_outbound_net_mutex);
23812396

2382-
received_federates++;
2397+
// Count each unique federate only once (not each reconnection).
2398+
if (!seen_fed[remote_fed_id]) {
2399+
seen_fed[remote_fed_id] = true;
2400+
received_federates++;
2401+
}
23832402
}
23842403

23852404
LF_PRINT_LOG("All %zu remote federates are connected.", _fed.number_of_inbound_p2p_connections);

include/core/federated/federate.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,11 @@ typedef struct federate_instance_t {
6363
size_t number_of_inbound_p2p_transients;
6464

6565
/**
66-
* Array of thread IDs for threads that listen for incoming messages.
67-
* This is NULL if there are none and otherwise has size given by
68-
* number_of_inbound_p2p_connections.
66+
* Array of thread IDs for threads that listen for incoming messages,
67+
* indexed by federate ID. This is NULL if there are none and otherwise
68+
* has size NUMBER_OF_FEDERATES. Using federate ID as the index allows
69+
* transient federates that reconnect to reuse the same slot without
70+
* overflowing the array.
6971
*/
7072
lf_thread_t* inbound_net_listeners;
7173

0 commit comments

Comments
 (0)