Skip to content

Commit be74f23

Browse files
committed
Gracefully handle OOM when cancelling a query (#1804)
Previosly, if a query fails under resource group manager due to a lack of memory, the query is cancelled due to an error. The coordinator would still try to receive tuples from segments upon cancellation. Retrieving tuples requires more memory allocations, which may cause the same OOM error again. The query will get cancelled due to an error, entering into a recursion and leading to an eventual crash. This patch implements proper query cancellation by explicitly ignoring any libpq data from segments without extra memory allocations. Ticket: ADBDEV-7690
1 parent 9c4f32b commit be74f23

6 files changed

Lines changed: 243 additions & 25 deletions

File tree

src/backend/cdb/dispatcher/cdbdisp_async.c

Lines changed: 103 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ static bool processResults(CdbDispatchResult *dispatchResult);
135135
static void
136136
signalQEs(CdbDispatchCmdAsync *pParms);
137137

138+
static void
139+
signalQE(CdbDispatchResult *dispatchResult, DispatchWaitMode waitMode);
140+
138141
static void
139142
checkSegmentAlive(CdbDispatchCmdAsync *pParms);
140143

@@ -822,6 +825,60 @@ handlePollError(CdbDispatchCmdAsync *pParms)
822825
return;
823826
}
824827

828+
/*
829+
* Try to completely discard results from a dispatchResult's connection without
830+
* extra memory allocations by abusing libpq state-machine hacks.
831+
*/
832+
static void
833+
tryToWipeResults(CdbDispatchResult *dispatchResult)
834+
{
835+
PGresult *res;
836+
PGnotify *notify;
837+
PGconn *conn = dispatchResult->segdbDesc->conn;
838+
839+
/* Free some memory and replace current result with a fatal error dummy. */
840+
pqSaveErrorResult(conn);
841+
842+
/* Make sure PQgetResult() calls are not blocking. */
843+
PQconsumeInput(conn);
844+
845+
/*
846+
* Discard anything that is unread. Since our result contains a fatal
847+
* error, we'll just consume the entire message without actually parsing
848+
* it.
849+
*/
850+
while (!PQisBusy(conn) && (res = PQgetResult(conn)) != NULL)
851+
{
852+
switch (PQresultStatus(res))
853+
{
854+
case PGRES_COPY_IN:
855+
case PGRES_COPY_OUT:
856+
case PGRES_COPY_BOTH:
857+
PQendcopy(conn);
858+
/* fallthrough */
859+
default:
860+
PQclear(res);
861+
}
862+
863+
pqSaveErrorResult(conn);
864+
}
865+
866+
/* Free notices. */
867+
while ((notify = PQnotifies(conn)) != NULL)
868+
PQfreemem(notify);
869+
870+
/* The result is not needed anymore. */
871+
pqClearAsyncResult(conn);
872+
873+
if (PQisBusy(conn) && PQstatus(conn) != CONNECTION_BAD)
874+
{
875+
/* Some work is still remaining until we can die. */
876+
return;
877+
}
878+
879+
dispatchResult->stillRunning = false;
880+
}
881+
825882
/*
826883
* Receive and process results from QEs.
827884
*/
@@ -868,6 +925,24 @@ handlePollSuccess(CdbDispatchCmdAsync *pParms,
868925
ELOG_DISPATCHER_DEBUG("PQsocket says there are results from %d of %d (%s)",
869926
i + 1, pParms->dispatchCount, segdbDesc->whoami);
870927

928+
929+
/*
930+
* Was dispatchCancel() the callee? We don't need to read the results then.
931+
*/
932+
if (pParms->waitMode == DISPATCH_WAIT_CANCEL)
933+
{
934+
/*
935+
* If we're cancelling the transaction due to an OOM, there
936+
* might not be enough memory to discard the result properly.
937+
* Let's get the big guns out.
938+
*/
939+
tryToWipeResults(dispatchResult);
940+
941+
forwardQENotices();
942+
943+
continue;
944+
}
945+
871946
/*
872947
* Receive and process results from this QE.
873948
*/
@@ -914,38 +989,41 @@ static void
914989
signalQEs(CdbDispatchCmdAsync *pParms)
915990
{
916991
int i;
917-
DispatchWaitMode waitMode = pParms->waitMode;
918992

919993
for (i = 0; i < pParms->dispatchCount; i++)
920-
{
921-
char errbuf[256];
922-
bool sent = false;
923-
CdbDispatchResult *dispatchResult = pParms->dispatchResultPtrArray[i];
994+
signalQE(pParms->dispatchResultPtrArray[i], pParms->waitMode);
995+
}
924996

925-
Assert(dispatchResult != NULL);
926-
SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;
997+
/*
998+
* Send finish or cancel signal to QE if needed.
999+
*/
1000+
static void
1001+
signalQE(CdbDispatchResult *dispatchResult, DispatchWaitMode waitMode)
1002+
{
1003+
Assert(dispatchResult != NULL);
1004+
SegmentDatabaseDescriptor *segdbDesc = dispatchResult->segdbDesc;
9271005

928-
/*
929-
* Don't send the signal if - QE is finished or canceled - the signal
930-
* was already sent - connection is dead
931-
*/
1006+
/*
1007+
* Don't send the signal if - QE is finished or canceled - the signal
1008+
* was already sent - connection is dead
1009+
*/
9321010

933-
if (!dispatchResult->stillRunning ||
934-
dispatchResult->wasCanceled ||
935-
(pParms->waitMode == DISPATCH_WAIT_ACK_ROOT &&
936-
dispatchResult->receivedAckMsg) ||
937-
cdbconn_isBadConnection(segdbDesc))
938-
continue;
1011+
if (!dispatchResult->stillRunning ||
1012+
dispatchResult->wasCanceled ||
1013+
(waitMode == DISPATCH_WAIT_ACK_ROOT &&
1014+
dispatchResult->receivedAckMsg) ||
1015+
cdbconn_isBadConnection(segdbDesc))
1016+
{
1017+
return;
1018+
}
9391019

940-
memset(errbuf, 0, sizeof(errbuf));
1020+
char errbuf[256] = {0};
9411021

942-
sent = cdbconn_signalQE(segdbDesc, errbuf, waitMode == DISPATCH_WAIT_CANCEL);
943-
if (sent)
944-
dispatchResult->sentSignal = waitMode;
945-
else
946-
elog(LOG, "Unable to cancel: %s",
947-
strlen(errbuf) == 0 ? "cannot allocate PGCancel" : errbuf);
948-
}
1022+
if (cdbconn_signalQE(segdbDesc, errbuf, waitMode == DISPATCH_WAIT_CANCEL))
1023+
dispatchResult->sentSignal = waitMode;
1024+
else
1025+
elog(LOG, "Unable to cancel: %s",
1026+
strlen(errbuf) == 0 ? "cannot allocate PGCancel" : errbuf);
9491027
}
9501028

9511029
/*

src/test/isolation2/input/parallel_retrieve_cursor/fault_inject.source

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,25 @@ SELECT gp_inject_fault('fetch_tuples_from_endpoint', 'resume', dbid)
249249
WHERE content=2 AND role='p';
250250

251251
DROP TABLE t2;
252+
253+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'reset', 2);
254+
SELECT gp_inject_fault('alloc_endpoint_slot_full_reset', 'reset', 2);
255+
256+
-- Test 8: QD shouldn't hang waiting.
257+
258+
DROP TABLE IF EXISTS t3;
259+
CREATE TABLE t3 AS SELECT generate_series(1, 10) AS a DISTRIBUTED by (a);
260+
261+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'error', dbid)
262+
FROM gp_segment_configuration WHERE role = 'p' AND content = 0;
263+
264+
BEGIN;
265+
-- QE encounters error and destroys the endpoint immediately. QD successfully
266+
-- resets the query, sends ACKs, and discards transaction.
267+
DECLARE c1 PARALLEL RETRIEVE CURSOR FOR SELECT * FROM t3;
268+
ROLLBACK;
269+
270+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'reset', dbid)
271+
FROM gp_segment_configuration WHERE role = 'p' AND content = 0;
272+
273+
DROP TABLE t3;

src/test/isolation2/input/resgroup/resgroup_oom.source

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ gp_segment_configuration WHERE role = 'p' AND content = -1;
4444

4545
1<:
4646

47+
-- And finally, make sure we don't enter error recursion on fail.
48+
ALTER RESOURCE GROUP rg_oom_test SET memory_shared_quota 0;
49+
1: SELECT count(*) FROM gp_mock_cdbdispatchcommand(10000000);
50+
4751
DROP FUNCTION gp_mock_cdbdispatchcommand(amount int);
4852
DROP ROLE role_oom_test;
4953
DROP RESOURCE GROUP rg_oom_test;

src/test/isolation2/output/parallel_retrieve_cursor/fault_inject.source

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -770,3 +770,45 @@ ROLLBACK
770770

771771
DROP TABLE t2;
772772
DROP
773+
774+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'reset', 2);
775+
gp_inject_fault
776+
-----------------
777+
Success:
778+
(1 row)
779+
SELECT gp_inject_fault('alloc_endpoint_slot_full_reset', 'reset', 2);
780+
gp_inject_fault
781+
-----------------
782+
Success:
783+
(1 row)
784+
785+
-- Test 8: QD shouldn't hang waiting.
786+
787+
DROP TABLE IF EXISTS t3;
788+
DROP
789+
CREATE TABLE t3 AS SELECT generate_series(1, 10) AS a DISTRIBUTED by (a);
790+
CREATE 10
791+
792+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'error', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content = 0;
793+
gp_inject_fault
794+
-----------------
795+
Success:
796+
(1 row)
797+
798+
BEGIN;
799+
BEGIN
800+
-- QE encounters error and destroys the endpoint immediately. QD successfully
801+
-- resets the query, sends ACKs, and discards transaction.
802+
DECLARE c1 PARALLEL RETRIEVE CURSOR FOR SELECT * FROM t3;
803+
ERROR: fault triggered, fault name:'alloc_endpoint_slot_full' fault type:'error' (seg0 127.0.0.1:6002 pid=8916)
804+
ROLLBACK;
805+
ROLLBACK
806+
807+
SELECT gp_inject_fault('alloc_endpoint_slot_full', 'reset', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content = 0;
808+
gp_inject_fault
809+
-----------------
810+
Success:
811+
(1 row)
812+
813+
DROP TABLE t3;
814+
DROP

src/test/isolation2/output/resgroup/resgroup_oom.source

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ SELECT gp_inject_fault('check_dispatch_result_end', 'reset', dbid) FROM gp_segme
5151
4000000
5252
(1 row)
5353

54+
-- And finally, make sure we don't enter error recursion on fail.
55+
ALTER RESOURCE GROUP rg_oom_test SET memory_shared_quota 0;
56+
ALTER
57+
1: SELECT count(*) FROM gp_mock_cdbdispatchcommand(10000000);
58+
ERROR: Out of memory
59+
DETAIL: Resource group memory limit reached
60+
5461
DROP FUNCTION gp_mock_cdbdispatchcommand(amount int);
5562
DROP
5663
DROP ROLE role_oom_test;
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
-- start_matchsubs
2+
-- m/ERROR: Out of memory.*/
3+
-- s/ERROR: Out of memory.*/ERROR: Out of memory/
4+
-- m/ERROR: Canceling query because of high VMEM usage.*/
5+
-- s/ERROR: Canceling query because of high VMEM usage.*/ERROR: Out of memory/
6+
-- end_matchsubs
7+
8+
CREATE RESOURCE GROUP rg_oom_test WITH (cpu_rate_limit=20, memory_limit=20, memory_shared_quota=100);
9+
CREATE
10+
CREATE ROLE role_oom_test RESOURCE GROUP rg_oom_test;
11+
CREATE
12+
CREATE OR REPLACE FUNCTION gp_mock_cdbdispatchcommand(amount int) RETURNS SETOF text AS '@abs_srcdir@/../regress/regress.so', 'gp_mock_cdbdispatchcommand' LANGUAGE C;
13+
CREATE
14+
15+
1: SET ROLE TO role_oom_test;
16+
SET
17+
18+
-- Freeze coordinator's session after it reads results from segments.
19+
SELECT gp_inject_fault('check_dispatch_result_end', 'suspend', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content = -1;
20+
gp_inject_fault
21+
-----------------
22+
Success:
23+
(1 row)
24+
25+
-- Send the heavy query.
26+
1&: SELECT count(*) FROM gp_mock_cdbdispatchcommand(1000000); <waiting ...>
27+
28+
-- Wait until we receive everything.
29+
SELECT gp_wait_until_triggered_fault('check_dispatch_result_end', 1, dbid) FROM gp_segment_configuration WHERE role = 'p' AND content = -1;
30+
gp_wait_until_triggered_fault
31+
-------------------------------
32+
Success:
33+
(1 row)
34+
35+
-- The query should've used ~135 MB of memory. Allow 15 MB error.
36+
WITH r AS ( SELECT (memory_usage->'-1'->'used')::text::int AS mb FROM gp_toolkit.gp_resgroup_status WHERE rsgname = 'rg_oom_test' ) SELECT r.mb < 150 AS "under 150 MB", r.mb > 120 AS "above 120 MB" FROM r;
37+
under 150 MB | above 120 MB
38+
--------------+--------------
39+
t | t
40+
(1 row)
41+
42+
SELECT gp_inject_fault('check_dispatch_result_end', 'reset', dbid) FROM gp_segment_configuration WHERE role = 'p' AND content = -1;
43+
gp_inject_fault
44+
-----------------
45+
Success:
46+
(1 row)
47+
48+
1<: <... completed>
49+
count
50+
---------
51+
4000000
52+
(1 row)
53+
54+
-- And finally, make sure we don't enter error recursion on fail.
55+
ALTER RESOURCE GROUP rg_oom_test SET memory_shared_quota 0;
56+
ALTER
57+
1: SELECT count(*) FROM gp_mock_cdbdispatchcommand(10000000);
58+
ERROR: Out of memory
59+
60+
DROP FUNCTION gp_mock_cdbdispatchcommand(amount int);
61+
DROP
62+
DROP ROLE role_oom_test;
63+
DROP
64+
DROP RESOURCE GROUP rg_oom_test;
65+
DROP

0 commit comments

Comments
 (0)