@@ -193,17 +193,6 @@ struct ConnHashTable
193193 (a)->srcPid == (b)->srcPid && \
194194 (a)->dstPid == (b)->dstPid && (a)->icId == (b)->icId))
195195
196-
197- /*
198- * Cursor IC table definition.
199- *
200- * For cursor case, there may be several concurrent interconnect
201- * instances on QD. The table is used to track the status of the
202- * instances, which is quite useful for "ACK the past and NAK the future" paradigm.
203- *
204- */
205- #define CURSOR_IC_TABLE_SIZE (128)
206-
207196/*
208197 * CursorICHistoryEntry
209198 *
@@ -236,8 +225,9 @@ struct CursorICHistoryEntry
236225typedef struct CursorICHistoryTable CursorICHistoryTable ;
237226struct CursorICHistoryTable
238227{
228+ uint32 size ;
239229 uint32 count ;
240- CursorICHistoryEntry * table [ CURSOR_IC_TABLE_SIZE ] ;
230+ CursorICHistoryEntry * * table ;
241231};
242232
243233/*
@@ -287,6 +277,13 @@ struct ReceiveControlInfo
287277
288278 /* Cursor history table. */
289279 CursorICHistoryTable cursorHistoryTable ;
280+
281+ /*
282+ * Last distributed transaction id when SetupUDPInterconnect is called.
283+ * Coupled with cursorHistoryTable, it is used to handle multiple
284+ * concurrent cursor cases.
285+ */
286+ DistributedTransactionId lastDXatId ;
290287};
291288
292289/*
@@ -925,8 +922,13 @@ dumpTransProtoStats()
925922static void
926923initCursorICHistoryTable (CursorICHistoryTable * t )
927924{
925+ MemoryContext old ;
928926 t -> count = 0 ;
929- memset (t -> table , 0 , sizeof (t -> table ));
927+ t -> size = Gp_interconnect_cursor_ic_table_size ;
928+
929+ old = MemoryContextSwitchTo (ic_control_info .memContext );
930+ t -> table = palloc0 (sizeof (struct CursorICHistoryEntry * ) * t -> size );
931+ MemoryContextSwitchTo (old );
930932}
931933
932934/*
@@ -938,7 +940,7 @@ addCursorIcEntry(CursorICHistoryTable *t, uint32 icId, uint32 cid)
938940{
939941 MemoryContext old ;
940942 CursorICHistoryEntry * p ;
941- uint32 index = icId % CURSOR_IC_TABLE_SIZE ;
943+ uint32 index = icId % t -> size ;
942944
943945 old = MemoryContextSwitchTo (ic_control_info .memContext );
944946 p = palloc0 (sizeof (struct CursorICHistoryEntry ));
@@ -968,7 +970,7 @@ static void
968970updateCursorIcEntry (CursorICHistoryTable * t , uint32 icId , uint8 status )
969971{
970972 struct CursorICHistoryEntry * p ;
971- uint8 index = icId % CURSOR_IC_TABLE_SIZE ;
973+ uint8 index = icId % t -> size ;
972974
973975 for (p = t -> table [index ]; p ; p = p -> next )
974976 {
@@ -989,7 +991,7 @@ static CursorICHistoryEntry *
989991getCursorIcEntry (CursorICHistoryTable * t , uint32 icId )
990992{
991993 struct CursorICHistoryEntry * p ;
992- uint8 index = icId % CURSOR_IC_TABLE_SIZE ;
994+ uint8 index = icId % t -> size ;
993995
994996 for (p = t -> table [index ]; p ; p = p -> next )
995997 {
@@ -1011,7 +1013,7 @@ pruneCursorIcEntry(CursorICHistoryTable *t, uint32 icId)
10111013{
10121014 uint8 index ;
10131015
1014- for (index = 0 ; index < CURSOR_IC_TABLE_SIZE ; index ++ )
1016+ for (index = 0 ; index < t -> size ; index ++ )
10151017 {
10161018 struct CursorICHistoryEntry * p ,
10171019 * q ;
@@ -1060,7 +1062,7 @@ purgeCursorIcEntry(CursorICHistoryTable *t)
10601062{
10611063 uint8 index ;
10621064
1063- for (index = 0 ; index < CURSOR_IC_TABLE_SIZE ; index ++ )
1065+ for (index = 0 ; index < t -> size ; index ++ )
10641066 {
10651067 struct CursorICHistoryEntry * trash ;
10661068
@@ -1435,6 +1437,7 @@ InitMotionUDPIFC(int *listenerSocketFd, int32 *listenerPort)
14351437
14361438 /* allocate a buffer for sending disorder messages */
14371439 rx_control_info .disorderBuffer = palloc0 (MIN_PACKET_SIZE );
1440+ rx_control_info .lastDXatId = InvalidTransactionId ;
14381441 rx_control_info .lastTornIcId = 0 ;
14391442 initCursorICHistoryTable (& rx_control_info .cursorHistoryTable );
14401443
@@ -3053,34 +3056,66 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable)
30533056 set_test_mode ();
30543057#endif
30553058
3059+ /* Prune the QD's history table if it is too large */
30563060 if (Gp_role == GP_ROLE_DISPATCH )
30573061 {
3058- /*
3059- * Prune the history table if it is too large
3060- *
3061- * We only keep history of constant length so that
3062- * - The history table takes only constant amount of memory.
3063- * - It is long enough so that it is almost impossible to receive
3064- * packets from an IC instance that is older than the first one
3065- * in the history.
3066- */
3067- if (rx_control_info .cursorHistoryTable .count > (2 * CURSOR_IC_TABLE_SIZE ))
3068- {
3069- uint32 prune_id = sliceTable -> ic_instance_id - CURSOR_IC_TABLE_SIZE ;
3062+ CursorICHistoryTable * ich_table = & rx_control_info .cursorHistoryTable ;
3063+ DistributedTransactionId distTransId = getDistributedTransactionId ();
30703064
3071- /*
3072- * Only prune if we didn't underflow -- also we want the prune id
3073- * to be newer than the limit (hysteresis)
3065+ if (ich_table -> count > (2 * ich_table -> size ))
3066+ {
3067+ /*
3068+ * distTransId != lastDXatId
3069+ * Means the last transaction has finished, so it is safe to prune.
30743070 */
3075- if (prune_id < sliceTable -> ic_instance_id )
3071+ if (distTransId != rx_control_info . lastDXatId )
30763072 {
30773073 if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG )
3078- elog (DEBUG1 , "prune cursor history table (count %d), icid %d" , rx_control_info .cursorHistoryTable .count , sliceTable -> ic_instance_id );
3079- pruneCursorIcEntry (& rx_control_info .cursorHistoryTable , prune_id );
3074+ elog (DEBUG1 , "prune cursor history table (count %d), icid %d, prune_id %d" ,
3075+ ich_table -> count , sliceTable -> ic_instance_id , sliceTable -> ic_instance_id );
3076+ pruneCursorIcEntry (ich_table , sliceTable -> ic_instance_id );
3077+ }
3078+ /*
3079+ * distTransId == lastDXatId and they are not InvalidTransactionId(0)
3080+ * Means the current (non-read-only) transaction has not finished, so we must not prune.
3081+ */
3082+ else if (rx_control_info .lastDXatId != InvalidTransactionId )
3083+ {
3084+ ;
3085+ }
3086+ /*
3087+ * distTransId == lastDXatId and they are InvalidTransactionId(0)
3088+ * Means they are the same transaction or different Read-Only transactions.
3089+ *
3090+ * For the latter, it's hard to find a perfect point in time to prune: pruning
3091+ * eagerly may cause problems (pruning the current Txn's Ic instances), but
3092+ * pruning too infrequently causes a memory leak.
3093+ *
3094+ * So, we choose a simple algorithm to prune here. If it mistakenly prunes a
3095+ * still-in-use Ic instance (with a lower id), the query may hang forever.
3096+ * As a workaround, the user then has to set a bigger
3097+ * gp_interconnect_cursor_ic_table_size value and try the query again.
3098+ *
3099+ * More backgrounds please see: https://github.com/greenplum-db/gpdb/pull/16458
3100+ */
3101+ else
3102+ {
3103+ if (sliceTable -> ic_instance_id > ich_table -> size )
3104+ {
3105+ uint32 prune_id = sliceTable -> ic_instance_id - ich_table -> size ;
3106+ Assert (prune_id < sliceTable -> ic_instance_id );
3107+
3108+ if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG )
3109+ elog (DEBUG1 , "prune cursor history table (count %d), icid %d, prune_id %d" ,
3110+ ich_table -> count , sliceTable -> ic_instance_id , prune_id );
3111+ pruneCursorIcEntry (ich_table , prune_id );
3112+ }
30803113 }
30813114 }
30823115
3083- addCursorIcEntry (& rx_control_info .cursorHistoryTable , sliceTable -> ic_instance_id , gp_command_count );
3116+ addCursorIcEntry (ich_table , sliceTable -> ic_instance_id , gp_command_count );
3117+ /* save the latest transaction id */
3118+ rx_control_info .lastDXatId = distTransId ;
30843119 }
30853120
30863121 /* now we'll do some setup for each of our Receiving Motion Nodes. */
0 commit comments