Skip to content

Commit d440cb1

Browse files
committed
Fix recovery slot handling before pg_basebackup
1 parent 32373aa commit d440cb1

5 files changed

Lines changed: 135 additions & 92 deletions

File tree

gpMgmt/bin/gppylib/commands/pg.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
#!/usr/bin/env python3
22
#
3-
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
3+
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
44
#
55

6+
from contextlib import closing
67
import os
78
import pipes
89

910
from gppylib.gplog import *
1011
from gppylib.gparray import *
12+
from gppylib.db import dbconn
1113
from .base import *
1214
from .unix import *
1315
from gppylib.commands.base import *
@@ -17,6 +19,31 @@
1719

1820
GPHOME=os.environ.get('GPHOME')
1921

22+
23+
def ensure_replication_slot_exists(source_host, source_port,
24+
replication_slot_name):
25+
if not replication_slot_name:
26+
return False
27+
28+
escaped_slot_name = replication_slot_name.replace("'", "''")
29+
dburl = dbconn.DbURL(hostname=source_host, port=source_port,
30+
dbname='template1')
31+
with closing(dbconn.connect(dburl, utility=True)) as conn:
32+
slot_exists = dbconn.querySingleton(
33+
conn,
34+
"SELECT count(*) FROM pg_catalog.pg_replication_slots "
35+
"WHERE slot_name = '{}'".format(escaped_slot_name))
36+
if slot_exists > 0:
37+
return False
38+
39+
dbconn.querySingleton(
40+
conn,
41+
"SELECT slot_name "
42+
"FROM pg_catalog.pg_create_physical_replication_slot('{}')"
43+
.format(escaped_slot_name))
44+
45+
return True
46+
2047
class DbStatus(Command):
2148
def __init__(self,name,db,ctxt=LOCAL,remoteHost=None):
2249
self.db=db

gpMgmt/bin/gppylib/commands/test/unit/test_unit_pg_base_backup.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
from unittest.mock import Mock, patch
23
from gppylib.commands import pg
34

45

@@ -44,6 +45,57 @@ def test_base_backup_does_not_pass_conflicting_xlog_method_argument_when_given_r
4445
self.assertNotIn("-x", base_backup.command_tokens)
4546
self.assertNotIn("--xlog", base_backup.command_tokens)
4647

48+
@patch('gppylib.commands.pg.dbconn.querySingleton', return_value=1)
49+
@patch('gppylib.commands.pg.dbconn.connect')
50+
@patch('gppylib.commands.pg.dbconn.DbURL')
51+
def test_ensure_replication_slot_exists_returns_false_when_slot_exists(self, mock_dburl,
52+
mock_connect,
53+
mock_query_singleton):
54+
mock_conn = Mock()
55+
mock_connect.return_value = mock_conn
56+
57+
created = pg.ensure_replication_slot_exists('source-host', 5432, 'slot_name')
58+
59+
self.assertFalse(created)
60+
mock_dburl.assert_called_once_with(hostname='source-host', port=5432, dbname='template1')
61+
mock_connect.assert_called_once_with(mock_dburl.return_value, utility=True)
62+
self.assertEqual(1, mock_query_singleton.call_count)
63+
self.assertIn("FROM pg_catalog.pg_replication_slots", mock_query_singleton.call_args[0][1])
64+
mock_conn.close.assert_called_once_with()
65+
66+
@patch('gppylib.commands.pg.dbconn.querySingleton', side_effect=[0, 'slot_name'])
67+
@patch('gppylib.commands.pg.dbconn.connect')
68+
@patch('gppylib.commands.pg.dbconn.DbURL')
69+
def test_ensure_replication_slot_exists_creates_missing_slot(self, mock_dburl,
70+
mock_connect,
71+
mock_query_singleton):
72+
mock_conn = Mock()
73+
mock_connect.return_value = mock_conn
74+
75+
created = pg.ensure_replication_slot_exists('source-host', 5432, 'slot_name')
76+
77+
self.assertTrue(created)
78+
mock_dburl.assert_called_once_with(hostname='source-host', port=5432, dbname='template1')
79+
mock_connect.assert_called_once_with(mock_dburl.return_value, utility=True)
80+
self.assertEqual(2, mock_query_singleton.call_count)
81+
self.assertIn("FROM pg_catalog.pg_replication_slots", mock_query_singleton.call_args_list[0][0][1])
82+
self.assertIn("pg_create_physical_replication_slot('slot_name')",
83+
mock_query_singleton.call_args_list[1][0][1])
84+
mock_conn.close.assert_called_once_with()
85+
86+
@patch('gppylib.commands.pg.dbconn.querySingleton')
87+
@patch('gppylib.commands.pg.dbconn.connect')
88+
@patch('gppylib.commands.pg.dbconn.DbURL')
89+
def test_ensure_replication_slot_exists_skips_empty_slot_name(self, mock_dburl,
90+
mock_connect,
91+
mock_query_singleton):
92+
created = pg.ensure_replication_slot_exists('source-host', 5432, None)
93+
94+
self.assertFalse(created)
95+
mock_dburl.assert_not_called()
96+
mock_connect.assert_not_called()
97+
mock_query_singleton.assert_not_called()
98+
4799

48100
if __name__ == '__main__':
49101
unittest.main()

gpMgmt/bin/gppylib/test/unit/test_unit_gpsegrecovery.py

Lines changed: 42 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,12 @@ def setUp(self):
106106
self.maxDiff = None
107107
self.mock_logger = Mock(spec=['log', 'info', 'debug', 'error', 'warn', 'exception'])
108108
self.apply_patches([
109+
patch('gpsegrecovery.ensure_replication_slot_exists'),
109110
patch('gpsegrecovery.start_segment', return_value=Mock()),
110111
patch('gpsegrecovery.PgBaseBackup.__init__', return_value=None),
111112
patch('gpsegrecovery.PgBaseBackup.run')
112113
])
114+
self.mock_ensure_slot = self.get_mock_from_apply_patch('ensure_replication_slot_exists')
113115
self.mock_pgbasebackup_run = self.get_mock_from_apply_patch('run')
114116
self.mock_pgbasebackup_init = self.get_mock_from_apply_patch('__init__')
115117

@@ -130,6 +132,7 @@ def tearDown(self):
130132
super(FullRecoveryTestCase, self).tearDown()
131133

132134
def _assert_basebackup_runs(self, expected_init_args):
135+
self.mock_ensure_slot.assert_called_once_with('sdw1', 40000, 'internal_wal_replication_slot')
133136
self.assertEqual(1, self.mock_pgbasebackup_init.call_count)
134137
self.assertEqual(expected_init_args, self.mock_pgbasebackup_init.call_args)
135138
self.assertEqual(1, self.mock_pgbasebackup_run.call_count)
@@ -172,66 +175,51 @@ def test_basebackup_run_no_forceoverwrite_passes(self):
172175
self._assert_basebackup_runs(expected_init_args1)
173176
self._assert_cmd_passed()
174177

175-
def test_basebackup_run_one_exception(self):
176-
self.mock_pgbasebackup_run.side_effect = [Exception('backup failed once'), Mock()]
178+
def test_basebackup_slot_check_exception(self):
179+
self.mock_ensure_slot.side_effect = [Exception('slot check failed')]
177180

178181
self.full_recovery_cmd.run()
179182

180-
expected_init_args1 = call("/data/mirror0", "sdw1", '40000', create_slot=False,
181-
replication_slot_name='internal_wal_replication_slot',
182-
forceoverwrite=True, target_gp_dbid=2, progress_file='/tmp/test_progress_file')
183-
expected_init_args2 = call("/data/mirror0", "sdw1", '40000', create_slot=True,
184-
replication_slot_name='internal_wal_replication_slot',
185-
forceoverwrite=True, target_gp_dbid=2, progress_file='/tmp/test_progress_file')
186-
self.assertEqual(2, self.mock_pgbasebackup_init.call_count)
187-
self.assertEqual([expected_init_args1, expected_init_args2] , self.mock_pgbasebackup_init.call_args_list)
188-
self.assertEqual(2, self.mock_pgbasebackup_run.call_count)
189-
self.assertEqual([call(validateAfter=True),call(validateAfter=True)], self.mock_pgbasebackup_run.call_args_list)
190-
gpsegrecovery.start_segment.assert_called_once_with(self.seg_recovery_info, self.mock_logger, self.era)
191-
self._assert_cmd_passed()
183+
self.assertEqual(0, self.mock_pgbasebackup_init.call_count)
184+
self.assertEqual(0, self.mock_pgbasebackup_run.call_count)
185+
self.assertEqual(0, gpsegrecovery.start_segment.call_count)
186+
self._assert_cmd_failed('{"error_type": "full", "error_msg": "slot check failed", "dbid": 2, '
187+
'"datadir": "/data/mirror0", "port": 50000, '
188+
'"progress_file": "/tmp/test_progress_file"}')
192189

193-
def test_basebackup_run_two_exceptions(self):
194-
self.mock_pgbasebackup_run.side_effect=[Exception('backup failed once'),
195-
Exception('backup failed twice')]
190+
def test_basebackup_run_exception(self):
191+
self.mock_pgbasebackup_run.side_effect=[Exception('backup failed once')]
196192

197193
self.full_recovery_cmd.run()
198194

199195
expected_init_args1 = call("/data/mirror0", "sdw1", '40000', create_slot=False,
200196
replication_slot_name='internal_wal_replication_slot',
201197
forceoverwrite=True, target_gp_dbid=2, progress_file='/tmp/test_progress_file')
202-
expected_init_args2 = call("/data/mirror0", "sdw1", '40000', create_slot=True,
203-
replication_slot_name='internal_wal_replication_slot',
204-
forceoverwrite=True, target_gp_dbid=2, progress_file='/tmp/test_progress_file')
205-
self.assertEqual(2, self.mock_pgbasebackup_init.call_count)
206-
self.assertEqual([expected_init_args1, expected_init_args2], self.mock_pgbasebackup_init.call_args_list)
207-
self.assertEqual(2, self.mock_pgbasebackup_run.call_count)
208-
self.assertEqual([call(validateAfter=True),call(validateAfter=True)], self.mock_pgbasebackup_run.call_args_list)
209-
self.mock_logger.info.any_call('Running pg_basebackup failed: backup failed once')
210-
self.mock_logger.info.assert_called_with("Re-running pg_basebackup, creating the slot this time")
198+
self.mock_ensure_slot.assert_called_once_with('sdw1', 40000, 'internal_wal_replication_slot')
199+
self.assertEqual(1, self.mock_pgbasebackup_init.call_count)
200+
self.assertEqual([expected_init_args1], self.mock_pgbasebackup_init.call_args_list)
201+
self.assertEqual(1, self.mock_pgbasebackup_run.call_count)
202+
self.assertEqual([call(validateAfter=True)], self.mock_pgbasebackup_run.call_args_list)
211203
self.assertEqual(0, gpsegrecovery.start_segment.call_count)
212-
self._assert_cmd_failed('{"error_type": "full", "error_msg": "backup failed twice", "dbid": 2, ' \
204+
self._assert_cmd_failed('{"error_type": "full", "error_msg": "backup failed once", "dbid": 2, ' \
213205
'"datadir": "/data/mirror0", "port": 50000, "progress_file": "/tmp/test_progress_file"}')
214206

215-
def test_basebackup_run_no_forceoverwrite_two_exceptions(self):
216-
self.mock_pgbasebackup_run.side_effect = [Exception('backup failed once'),
217-
Exception('backup failed twice')]
207+
def test_basebackup_run_no_forceoverwrite_exception(self):
208+
self.mock_pgbasebackup_run.side_effect = [Exception('backup failed once')]
218209
self.full_recovery_cmd.forceoverwrite = False
219210

220211
self.full_recovery_cmd.run()
221212

222213
expected_init_args1 = call("/data/mirror0", "sdw1", '40000', create_slot=False,
223214
replication_slot_name='internal_wal_replication_slot',
224215
forceoverwrite=False, target_gp_dbid=2, progress_file='/tmp/test_progress_file')
225-
# regardless of the passed in value, second call to pg_basebackup will always have forceoverwrite=True
226-
expected_init_args2 = call("/data/mirror0", "sdw1", '40000', create_slot=True,
227-
replication_slot_name='internal_wal_replication_slot',
228-
forceoverwrite=True, target_gp_dbid=2, progress_file='/tmp/test_progress_file')
229-
self.assertEqual(2, self.mock_pgbasebackup_init.call_count)
230-
self.assertEqual([expected_init_args1, expected_init_args2], self.mock_pgbasebackup_init.call_args_list)
231-
self.assertEqual(2, self.mock_pgbasebackup_run.call_count)
232-
self.assertEqual([call(validateAfter=True),call(validateAfter=True)], self.mock_pgbasebackup_run.call_args_list)
216+
self.mock_ensure_slot.assert_called_once_with('sdw1', 40000, 'internal_wal_replication_slot')
217+
self.assertEqual(1, self.mock_pgbasebackup_init.call_count)
218+
self.assertEqual([expected_init_args1], self.mock_pgbasebackup_init.call_args_list)
219+
self.assertEqual(1, self.mock_pgbasebackup_run.call_count)
220+
self.assertEqual([call(validateAfter=True)], self.mock_pgbasebackup_run.call_args_list)
233221
self.assertEqual(0, gpsegrecovery.start_segment.call_count)
234-
self._assert_cmd_failed('{"error_type": "full", "error_msg": "backup failed twice", "dbid": 2, ' \
222+
self._assert_cmd_failed('{"error_type": "full", "error_msg": "backup failed once", "dbid": 2, ' \
235223
'"datadir": "/data/mirror0", "port": 50000, "progress_file": "/tmp/test_progress_file"}')
236224

237225
def test_basebackup_init_exception(self):
@@ -287,7 +275,10 @@ def tearDown(self):
287275
@patch('gppylib.commands.pg.PgRewind.run')
288276
@patch('gpsegrecovery.PgBaseBackup.__init__', return_value=None)
289277
@patch('gpsegrecovery.PgBaseBackup.run')
290-
def test_complete_workflow(self, mock_pgbasebackup_run, mock_pgbasebackup_init, mock_pgrewind_run, mock_pgrewind_init):
278+
@patch('gpsegrecovery.ensure_replication_slot_exists')
279+
def test_complete_workflow(self, mock_ensure_slot, mock_pgbasebackup_run,
280+
mock_pgbasebackup_init, mock_pgrewind_run,
281+
mock_pgrewind_init):
291282
mix_confinfo = gppylib.recoveryinfo.serialize_list([
292283
self.full_r1, self.incr_r2])
293284
sys.argv = ['gpsegrecovery', '-l', '/tmp/logdir', '--era', '{}'.format(self.era), '-c {}'.format(mix_confinfo)]
@@ -301,17 +292,21 @@ def test_complete_workflow(self, mock_pgbasebackup_run, mock_pgbasebackup_init,
301292
self.assertEqual(1, mock_pgrewind_init.call_count)
302293
self.assertEqual(1, mock_pgbasebackup_run.call_count)
303294
self.assertEqual(1, mock_pgbasebackup_init.call_count)
295+
mock_ensure_slot.assert_called_once_with('source_hostname1', 6001, 'internal_wal_replication_slot')
304296
self.assertRegex(gplog.get_logfile(), '/gpsegrecovery.py_\d+\.log')
305297

306298
@patch('gppylib.commands.pg.PgRewind.__init__', return_value=None)
307299
@patch('gppylib.commands.pg.PgRewind.run')
308300
@patch('gpsegrecovery.PgBaseBackup.__init__', return_value=None)
309301
@patch('gpsegrecovery.PgBaseBackup.run')
310-
def test_complete_workflow_exception(self, mock_pgbasebackup_run, mock_pgbasebackup_init, mock_pgrewind_run,
302+
@patch('gpsegrecovery.ensure_replication_slot_exists')
303+
def test_complete_workflow_exception(self, mock_ensure_slot,
304+
mock_pgbasebackup_run,
305+
mock_pgbasebackup_init,
306+
mock_pgrewind_run,
311307
mock_pgrewind_init):
312308
mock_pgrewind_run.side_effect = [Exception('pg_rewind failed')]
313-
mock_pgbasebackup_run.side_effect = [Exception('pg_basebackup failed once'),
314-
Exception('pg_basebackup failed twice')]
309+
mock_pgbasebackup_run.side_effect = [Exception('pg_basebackup failed once')]
315310
mix_confinfo = gppylib.recoveryinfo.serialize_list([
316311
self.full_r1, self.incr_r2])
317312
sys.argv = ['gpsegrecovery', '-l', '/tmp/logdir', '--era={}'.format(self.era), '-c {}'.format(mix_confinfo)]
@@ -322,14 +317,15 @@ def test_complete_workflow_exception(self, mock_pgbasebackup_run, mock_pgbasebac
322317

323318
self.assertCountEqual('[{"error_type": "incremental", "error_msg": "pg_rewind failed", "dbid": 4, "datadir": "target_data_dir4", '
324319
'"port": 5004, "progress_file": "/tmp/progress_file4"} , '
325-
'{"error_type": "full", "error_msg": "pg_basebackup failed twice", "dbid": 1,'
320+
'{"error_type": "full", "error_msg": "pg_basebackup failed once", "dbid": 1,'
326321
'"datadir": "target_data_dir1", "port": 5001, "progress_file": "/tmp/progress_file1"}]',
327322
buf.getvalue().strip())
328323
self.assertEqual(1, ex.exception.code)
329324
self.assertEqual(1, mock_pgrewind_run.call_count)
330325
self.assertEqual(1, mock_pgrewind_init.call_count)
331-
self.assertEqual(2, mock_pgbasebackup_run.call_count)
332-
self.assertEqual(2, mock_pgbasebackup_init.call_count)
326+
self.assertEqual(1, mock_pgbasebackup_run.call_count)
327+
self.assertEqual(1, mock_pgbasebackup_init.call_count)
328+
mock_ensure_slot.assert_called_once_with('source_hostname1', 6001, 'internal_wal_replication_slot')
333329
self.assertRegex(gplog.get_logfile(), '/gpsegrecovery.py_\d+\.log')
334330

335331
@patch('recovery_base.gplog.setup_tool_logging')

gpMgmt/bin/lib/gpconfigurenewsegment

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ from optparse import Option, OptionGroup, OptionParser, OptionValueError, SUPPRE
1010

1111
from gppylib.gpparseopts import OptParser, OptChecker
1212
from gppylib.commands.gp import ModifyConfSetting, SegmentStart, SegmentStop
13-
from gppylib.commands.pg import PgBaseBackup
13+
from gppylib.commands.pg import PgBaseBackup, ensure_replication_slot_exists
1414
from gppylib.db import dbconn
1515
from gppylib.commands import unix
1616
from gppylib.commands.pg import DbStatus
@@ -134,6 +134,10 @@ class ConfExpSegCmd(Command):
134134
self.progressFile = '%s/pg_basebackup.%s.dbid%s.out' % (gplog.get_logger_dir(),
135135
datetime.datetime.today().strftime('%Y%m%d_%H%M%S'),
136136
self.dbid)
137+
ensure_replication_slot_exists(
138+
self.syncWithSegmentHostname,
139+
self.syncWithSegmentPort,
140+
self.replicationSlotName)
137141
# Create a mirror based on the primary
138142
cmd = PgBaseBackup(target_datadir=self.datadir,
139143
source_host=self.syncWithSegmentHostname,
@@ -149,30 +153,9 @@ class ConfExpSegCmd(Command):
149153
self.set_results(CommandResult(0, b'', b'', True, False))
150154
if shouldDeleteProgressFile:
151155
os.remove(self.progressFile)
152-
153156
except Exception as e:
154-
# If the cluster never has mirrors, cmd will fail
155-
# quickly because the internal slot doesn't exist.
156-
# Re-run with `create_slot`.
157-
# GPDB_12_MERGE_FIXME could we check it before? or let
158-
# pg_basebackup create slot if not exists.
159-
cmd = PgBaseBackup(target_datadir=self.datadir,
160-
source_host=self.syncWithSegmentHostname,
161-
source_port=str(self.syncWithSegmentPort),
162-
create_slot = True,
163-
replication_slot_name=self.replicationSlotName,
164-
forceoverwrite=True,
165-
target_gp_dbid=self.dbid,
166-
logfile=self.progressFile)
167-
try:
168-
logger.info("Re-running pg_basebackup, creating the slot this time")
169-
cmd.run(validateAfter=True)
170-
self.set_results(CommandResult(0, b'', b'', True, False))
171-
if shouldDeleteProgressFile:
172-
os.remove(self.progressFile)
173-
except Exception as e:
174-
self.set_results(CommandResult(1, b'', str(e).encode(), True, False))
175-
raise
157+
self.set_results(CommandResult(1, b'', str(e).encode(), True, False))
158+
raise
176159

177160
logger.info("Successfully ran pg_basebackup: %s" % cmd.cmdStr)
178161
return

0 commit comments

Comments
 (0)