Skip to content

Commit 7a31b62

Browse files
authored
BF: CS-1735 qrsh doing massive output reports a commlib read error and truncates output
* BF: CS-1735 qrsh doing massive output reports a commlib read error and truncates output
* Reverted a performance improvement, as it seems to cause issues with DRMAA clients
* Marked disabled code with the corresponding Jira ID
1 parent 44d5659 commit 7a31b62

9 files changed

Lines changed: 171 additions & 58 deletions

File tree

source/clients/qsh/sge_client_ijs.cc

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@
2929
*
3030
* Portions of this code are Copyright 2011 Univa Inc.
3131
*
32-
* Portions of this software are Copyright (c) 2023-2025 HPC-Gridware GmbH
32+
* Portions of this software are Copyright (c) 2024-2025 HPC-Gridware GmbH
3333
*
3434
************************************************************************/
3535
/*___INFO__MARK_END__*/
@@ -291,7 +291,7 @@ static void client_check_window_change(COMM_HANDLE *handle)
291291
*/
292292
received_window_change_signal = 0;
293293
if (ioctl(fileno(stdin), TIOCGWINSZ, &ws) >= 0) {
294-
DPRINTF("sendig WINDOW_SIZE_CTRL_MSG with new window size: %d, %d, %d, %d to shepherd\n",
294+
DPRINTF("sending WINDOW_SIZE_CTRL_MSG with new window size: %d, %d, %d, %d to shepherd\n",
295295
ws.ws_row, ws.ws_col, ws.ws_xpixel, ws.ws_ypixel);
296296

297297
snprintf(buf, sizeof(buf), "WS %d %d %d %d", ws.ws_row, ws.ws_col, ws.ws_xpixel, ws.ws_ypixel);

source/common/sge_ijs_comm.cc

Lines changed: 36 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@
2929
*
3030
* Portions of this software are Copyright (c) 2011 Univa Corporation.
3131
*
32-
* Portions of this software are Copyright (c) 2023-2025 HPC-Gridware GmbH
32+
* Portions of this software are Copyright (c) 2024-2025 HPC-Gridware GmbH
3333
*
3434
************************************************************************/
3535
/*___INFO__MARK_END__*/
@@ -65,7 +65,6 @@
6565
#include "sge_ijs_comm.h"
6666

6767
extern sig_atomic_t received_signal;
68-
6968
/*
7069
* TODO: Cleanup / Headers
7170
* This is just slightly modified copy of the gdi commlib error handling,
@@ -309,6 +308,7 @@ int my_log_list_flush_list(cl_raw_list_t* list_p) {
309308
*
310309
* INPUTS
311310
* dstring *err_msg - Gets the error reason in case of error.
311+
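* cl_log_func_t commlib_log_func - commlib logging function that prints the CL_LOG messages; if nullptr, commlib logging stays switched off (CL_LOG_OFF), otherwise the log level is CL_LOG_INFO.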
312312
*
313313
* RESULT
314314
* int - COMM_RETVAL_OK:
@@ -324,19 +324,20 @@ int my_log_list_flush_list(cl_raw_list_t* list_p) {
324324
* SEE ALSO
325325
* communication/comm_cleanup_lib()
326326
*******************************************************************************/
327-
int comm_init_lib(dstring *err_msg)
327+
int comm_init_lib(dstring *err_msg, cl_log_func_t commlib_log_func)
328328
{
329329
int ret, ret_val = COMM_RETVAL_OK;
330330

331331
DENTER(TOP_LAYER);
332332

333-
/*
334-
* To enable commlib logging to a file (see my_log_list_flush_list()
335-
* for the file path), exchange this line with the one below.
336-
* Caution: On some architectures, logging causes problems!
337-
*/
338-
/*ret = cl_com_setup_commlib(CL_RW_THREAD, CL_LOG_DEBUG, my_log_list_flush_list);*/
339-
ret = cl_com_setup_commlib(CL_RW_THREAD, CL_LOG_OFF, nullptr);
333+
// When we pass a logging function to see commlib logging
334+
// (in sge_shepherd, when compiled with EXTENSIVE_TRACING)
335+
// we want to see INFO logging.
336+
cl_log_type debug_level = CL_LOG_OFF;
337+
if (commlib_log_func != nullptr) {
338+
debug_level = CL_LOG_INFO;
339+
}
340+
ret = cl_com_setup_commlib(CL_RW_THREAD, debug_level, commlib_log_func);
340341
if (ret != CL_RETVAL_OK) {
341342
sge_dstring_sprintf(err_msg, cl_get_error_text(ret));
342343
DPRINTF("cl_com_setup_commlib() failed: %s (%d)\n", sge_dstring_get_string(err_msg), ret);
@@ -768,9 +769,9 @@ int comm_ignore_timeouts(bool b_ignore, dstring *err_msg)
768769

769770
cl_com_ignore_timeouts(b_ignore);
770771
if (ret != CL_RETVAL_OK) {
771-
sge_dstring_sprintf(err_msg, cl_get_error_text(ret));
772-
DPRINTF("cl_com_ignore_timeouts() failed: %s (%d)\n", sge_dstring_get_string(err_msg), ret);
773-
ret_val = COMM_CANT_SET_IGNORE_TIMEOUTS;
772+
sge_dstring_sprintf(err_msg, cl_get_error_text(ret));
773+
DPRINTF("cl_com_ignore_timeouts() failed: %s (%d)\n", sge_dstring_get_string(err_msg), ret);
774+
ret_val = COMM_CANT_SET_IGNORE_TIMEOUTS;
774775
}
775776
DRETURN(ret_val);
776777
}
@@ -1223,40 +1224,44 @@ unsigned long comm_write_message(COMM_HANDLE *handle,
12231224
*******************************************************************************/
12241225
/*
 * Drives the commlib until the send queue of 'handle' is empty.
 *
 * handle  - the commlib handle whose send queue shall be flushed
 * err_msg - receives the commlib error text plus the number of retries done
 *           so far when cl_commlib_trigger() fails with a fatal error
 *
 * Returns
 *    0    if the queue was empty or got flushed without any retry,
 *    < 0  the negated number of retries that were needed to flush the queue,
 *    else the return value of cl_commlib_trigger() that made us give up
 *         (callers treat values > 0 as error).
 */
int comm_flush_write_messages(COMM_HANDLE *handle, dstring *err_msg)
{
   int retries = 0;

   unsigned long elems = cl_com_messages_in_send_queue(handle);
   while (elems > 0) {
      /*
       * Don't set the cl_commlib_trigger()-call to be blocking and
       * get rid of the usleep() - it's much slower!
       * The last cl_commlib_trigger()-call will take 1 s.
       */
      int trigger_ret = cl_commlib_trigger(handle, 0);

      /*
       * Bail out if trigger fails with an error that indicates that we
       * won't be able to send the messages in the near future.
       */
      if (trigger_ret != CL_RETVAL_OK &&
          trigger_ret != CL_RETVAL_SELECT_TIMEOUT &&
          trigger_ret != CL_RETVAL_SELECT_INTERRUPT &&
          trigger_ret != CL_RETVAL_THREADS_ENABLED) {
         /*
          * Never pass the error text as the format string itself: a '%' in
          * the text would be interpreted as a conversion specification.
          */
         sge_dstring_sprintf(err_msg, "%s - after %d retries",
                             cl_get_error_text(trigger_ret), retries);
         return trigger_ret;
      }

      elems = cl_com_messages_in_send_queue(handle);
      /*
       * We just tried to send the messages and it wasn't possible to send
       * all messages - give the network some time to recover.
       * @todo CS-1739 cl_commlib_trigger() does *not* wait until all messages are sent!
       * @todo Shall we have a maximum number of retries? A timeout?
       *       But if the qrsh client is suspended, we probably need to wait
       *       until it is unsuspended again.
       */
      /* TODO (NEW): make this work correctly by calling check_client_alive */
      if (elems > 0) {
         usleep(10000);
         retries++;
      }
   }

   /* Success: report the retry count as a non-positive value. */
   return -retries;
}
12611266

12621267
/****** sge_ijs_comm/comm_recv_message() **************************************
@@ -1323,7 +1328,7 @@ int comm_recv_message(COMM_HANDLE *handle, bool b_synchron,
13231328
nullptr, /* unresolved_hostname, */
13241329
nullptr, /* component_name, */
13251330
0, /* component_id, */
1326-
false,
1331+
b_synchron,
13271332
0,
13281333
&message,
13291334
&sender);
@@ -1356,7 +1361,7 @@ int comm_recv_message(COMM_HANDLE *handle, bool b_synchron,
13561361
}
13571362
}
13581363

1359-
if(sender != nullptr) {
1364+
if (sender != nullptr) {
13601365
cl_com_free_endpoint(&sender);
13611366
}
13621367

@@ -1407,6 +1412,10 @@ int comm_recv_message(COMM_HANDLE *handle, bool b_synchron,
14071412
}
14081413
}
14091414
} else {
1415+
// @todo CS-1739 do we need the cl_commlib_trigger, when we are using multi-threaded commlib?
1416+
// if b_synchron is 0, then it does essentially nothing
1417+
// otherwise it waits, until a message is available - the same which is done by cl_commlib_receive_message()
1418+
// itself
14101419
cl_commlib_trigger(handle, b_synchron);
14111420
}
14121421
DRETURN(ret_val);

source/common/sge_ijs_comm.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,7 @@
3030
*
3131
* Portions of this code are Copyright 2011 Univa Inc.
3232
*
33-
* Portions of this software are Copyright (c) 2023-2024 HPC-Gridware GmbH
33+
* Portions of this software are Copyright (c) 2024-2025 HPC-Gridware GmbH
3434
*
3535
************************************************************************/
3636
/*___INFO__MARK_END__*/
@@ -96,7 +96,7 @@ typedef struct recv_message_s {
9696
} recv_message_t;
9797

9898

99-
int comm_init_lib(dstring *err_msg);
99+
int comm_init_lib(dstring *err_msg, cl_log_func_t commlib_log_func = nullptr);
100100
int comm_cleanup_lib(dstring *err_msg);
101101

102102
int comm_open_connection(bool b_server,

source/daemons/qmaster/sge_thread_listener.cc

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@
2727
*
2828
* All Rights Reserved.
2929
*
30-
* Portions of this software are Copyright (c) 2023-2025 HPC-Gridware GmbH
30+
* Portions of this software are Copyright (c) 2024-2025 HPC-Gridware GmbH
3131
*
3232
************************************************************************/
3333
/*___INFO__MARK_END__*/
@@ -113,7 +113,8 @@ sge_listener_terminate() {
113113
#if 0
114114
// signal (broadcast) the commlib handle app_condition variable
115115
// this will make it leave waiting for new messages in sge_qmaster_process_message->sge_gdi_get_any_request()
116-
// @todo that's the theory, but it doesn't work
116+
// @todo CS-982 that's the theory, but it doesn't work
117+
// => should work now with the fix of CS-1735
117118
// as it is now shutting down the listener threads takes some time (> 1 second)
118119
cl_thread_trigger_thread_condition(handle->app_condition, 1);
119120
#endif

source/daemons/shepherd/sge_shepherd_ijs.cc

Lines changed: 74 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -126,6 +126,41 @@ extern int received_signal; /* defined in shepherd.c */
126126
* SEE ALSO
127127
*******************************************************************************/
128128
#ifdef EXTENSIVE_TRACING
129+
// This function will output commlib logging to the shepherd trace file
130+
// when EXTENSIVE_TRACING is enabled.
131+
static int
132+
shepherd_log_list_flush_list(cl_raw_list_t* list_p) {
133+
134+
if (list_p == nullptr) {
135+
return CL_RETVAL_LOG_NO_LOGLIST;
136+
}
137+
138+
int ret_val;
139+
if ((ret_val = cl_raw_list_lock(list_p)) != CL_RETVAL_OK) {
140+
return ret_val;
141+
}
142+
143+
// Log all entries of the log list via shepherd_trace().
144+
cl_log_list_elem_t *elem = nullptr;
145+
while ((elem = cl_log_list_get_first_elem(list_p)) != nullptr) {
146+
if (elem->log_parameter == nullptr) {
147+
shepherd_trace("COMMLIB|%s|%s|%s",
148+
cl_thread_convert_state_id(elem->log_thread_state),
149+
cl_log_list_convert_type_id(elem->log_type),
150+
elem->log_message);
151+
} else {
152+
shepherd_trace("COMMLIB|%s|%s|%s %s",
153+
cl_thread_convert_state_id(elem->log_thread_state),
154+
cl_log_list_convert_type_id(elem->log_type),
155+
elem->log_message, elem->log_parameter);
156+
}
157+
158+
cl_log_list_del_log(list_p);
159+
}
160+
161+
return cl_raw_list_unlock(list_p);
162+
}
163+
129164
static int trace_buf(const char *buffer, int length, const char *format, ...)
130165
{
131166
int ret;
@@ -467,7 +502,17 @@ static void* pty_to_commlib(void *t_conf)
467502
shepherd_trace("pty_to_commlib: send_buf() failed -> exiting");
468503
do_exit = 1;
469504
}
470-
comm_flush_write_messages(g_comm_handle, &err_msg);
505+
int flush_ret = comm_flush_write_messages(g_comm_handle, &err_msg);
506+
if (flush_ret > 0) {
507+
// comm_flush_write_messages reported an error - log to trace file
508+
shepherd_trace("pty_to_commlib: comm_flush_write_messages() returned error %d: %s",
509+
flush_ret, sge_dstring_get_string(&err_msg));
510+
511+
} else if (flush_ret < 0) {
512+
#ifdef EXTENSIVE_TRACING
513+
shepherd_trace("pty_to_commlib: comm_flush_write_messages() did %d retries", -flush_ret);
514+
#endif
515+
}
471516
}
472517
}
473518

@@ -537,32 +582,39 @@ static void* commlib_to_pty(void *t_conf)
537582
shepherd_trace("commlib_to_pty: no valid handle for stdin available. Exiting!");
538583
}
539584

585+
// Set timeout for synchronous receiving of messages.
540586
cl_com_set_synchron_receive_timeout(g_comm_handle, 1);
541587

542588
while (do_exit == 0) {
543-
/* wait blocking for a message from commlib */
589+
// We wait synchronously (blocking) for a message from commlib, timeout is 1s.
544590
recv_mess.cl_message = nullptr;
545591
recv_mess.data = nullptr;
546592
sge_dstring_free(&err_msg);
547593
sge_dstring_sprintf(&err_msg, "");
548594

595+
#ifdef EXTENSIVE_TRACING
596+
shepherd_trace("commlib_to_pty: calling comm_recv_message() synchronously, timeout %d",
597+
g_comm_handle->synchron_receive_timeout);
598+
#endif
599+
549600
ret = comm_recv_message(g_comm_handle, true, &recv_mess, &err_msg);
550601

551-
/*
552-
* Check if the thread was cancelled. Exit thread if it was.
553-
* It shouldn't be neccessary to do the check here, as the cancel state
554-
* of the thread is 1, i.e. the thread may be cancelled at any time,
602+
#ifdef EXTENSIVE_TRACING
603+
shepherd_trace("commlib_to_pty: comm_recv_message() returned %d, err_msg: %s",
604+
ret, sge_dstring_get_string(&err_msg));
605+
#endif
606+
607+
/*
608+
* Check if the thread was canceled. Exit the thread if it was.
609+
* It shouldn't be necessary to do the check here, as the cancel state
610+
* of the thread is 1, i.e., the thread may be canceled at any time,
555611
* but this doesn't work on some architectures (Darwin, older Solaris).
556612
*/
557613
thread_testcancel(t_conf);
558614
if (g_raised_event > 0) {
559615
do_exit = 1;
560616
continue;
561617
}
562-
#ifdef EXTENSIVE_TRACING
563-
shepherd_trace("commlib_to_pty: comm_recv_message() returned %d, err_msg: %s",
564-
ret, sge_dstring_get_string(&err_msg));
565-
#endif
566618

567619
if (ret != COMM_RETVAL_OK) {
568620
/* handle error cases */
@@ -788,7 +840,12 @@ parent_loop(int job_pid, const char *childname, int timeout, ckpt_info_t *p_ckpt
788840
*/
789841
sge_dstring_sprintf(err_msg, "");
790842

843+
844+
#ifdef EXTENSIVE_TRACING
845+
ret = comm_init_lib(err_msg, shepherd_log_list_flush_list);
846+
#else
791847
ret = comm_init_lib(err_msg);
848+
#endif
792849
if (ret != COMM_RETVAL_OK) {
793850
shepherd_trace("parent: init comm lib failed: %d", ret);
794851
return 1;
@@ -935,8 +992,13 @@ parent_loop(int job_pid, const char *childname, int timeout, ckpt_info_t *p_ckpt
935992
/*
936993
* This will wake up all threads waiting for a message
937994
*/
995+
#ifdef EXTENSIVE_TRACING
996+
shepherd_trace("parent: calling cl_thread_trigger_thread_condition()");
997+
#endif
938998
cl_thread_trigger_thread_condition(g_comm_handle->app_condition, 1);
939-
999+
#ifdef EXTENSIVE_TRACING
1000+
shepherd_trace("parent: after cl_thread_trigger_thread_condition()");
1001+
#endif
9401002

9411003
close(g_p_ijs_fds->pty_master);
9421004

@@ -952,7 +1014,6 @@ parent_loop(int job_pid, const char *childname, int timeout, ckpt_info_t *p_ckpt
9521014
cl_thread_cleanup(thread_pty_to_commlib);
9531015
cl_thread_cleanup(thread_commlib_to_pty);
9541016

955-
9561017
#if 0
9571018
{
9581019
struct timeb ts;
@@ -1046,7 +1107,7 @@ int close_parent_loop(int exit_status)
10461107

10471108
sge_free(&g_hostname);
10481109
sge_dstring_free(&err_msg);
1049-
shepherd_trace("parent: leaving closinge_parent_loop()");
1110+
shepherd_trace("parent: leaving close_parent_loop()");
10501111
return 0;
10511112
}
10521113

0 commit comments

Comments
 (0)