Skip to content

Commit 07d91d0

Browse files
committed
pg_rewind: Handle partial writes with remote source
1 parent 76c67d9 commit 07d91d0

7 files changed

Lines changed: 201 additions & 7 deletions

File tree

fetools/pg18/pg_rewind/libpq_source.c

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
#include "pg_rewind.h"
1818
#include "port/pg_bswap.h"
1919
#include "rewind_source.h"
20+
#include "tde_ops.h"
21+
22+
#include "pg_tde.h"
2023

2124
/*
2225
* Files are fetched MAX_CHUNK_SIZE bytes at a time, and with a
@@ -31,6 +34,7 @@ typedef struct
3134
const char *path; /* path relative to data directory root */
3235
off_t offset;
3336
size_t length;
37+
bool encrypt;
3438
} fetch_range_request;
3539

3640
typedef struct
@@ -71,6 +75,10 @@ static char *libpq_fetch_file(rewind_source *source, const char *path,
7175
static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
7276
static void libpq_destroy(rewind_source *source);
7377

78+
static void libpq_queue_process_fetch_range(rewind_source *source, const char *path,
79+
bool needs_encrypt, off_t off, size_t len);
80+
static void libpq_fetch_tde_keys(rewind_source *source);
81+
7482
/*
7583
* Create a new libpq source.
7684
*
@@ -100,6 +108,8 @@ init_libpq_source(PGconn *conn)
100108
initStringInfo(&src->offsets);
101109
initStringInfo(&src->lengths);
102110

111+
libpq_fetch_tde_keys(&src->common);
112+
103113
return &src->common;
104114
}
105115

@@ -345,7 +355,7 @@ libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
345355
* fetch-requests are for a whole file.
346356
*/
347357
open_target_file(path, true);
348-
libpq_queue_fetch_range(source, path, 0, Max(len, MAX_CHUNK_SIZE));
358+
libpq_queue_process_fetch_range(source, path, false, 0, Max(len, MAX_CHUNK_SIZE));
349359
}
350360

351361
/*
@@ -354,6 +364,17 @@ libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
354364
static void
355365
libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
356366
size_t len)
367+
{
368+
libpq_queue_process_fetch_range(source, path, true, off, len);
369+
}
370+
371+
/*
372+
* A workhorse for libpq_queue_fetch_range.
373+
* `needs_encrypt` indicates if file's blocks may need re-encryption.
374+
*/
375+
static void
376+
libpq_queue_process_fetch_range(rewind_source *source, const char *path,
377+
bool needs_encrypt, off_t off, size_t len)
357378
{
358379
libpq_source *src = (libpq_source *) source;
359380

@@ -406,6 +427,7 @@ libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
406427
src->request_queue[src->num_requests].path = path;
407428
src->request_queue[src->num_requests].offset = off;
408429
src->request_queue[src->num_requests].length = thislen;
430+
src->request_queue[src->num_requests].encrypt = needs_encrypt;
409431
src->num_requests++;
410432

411433
off += thislen;
@@ -420,6 +442,7 @@ static void
420442
libpq_finish_fetch(rewind_source *source)
421443
{
422444
process_queued_fetch_requests((libpq_source *) source);
445+
flush_current_key();
423446
}
424447

425448
static void
@@ -592,6 +615,19 @@ process_queued_fetch_requests(libpq_source *src)
592615

593616
open_target_file(filename, false);
594617

618+
if (rq->encrypt)
619+
{
620+
Assert(chunksize % BLCKSZ == 0);
621+
622+
ensure_tde_keys(filename);
623+
624+
for (int i = 0; i < chunksize / BLCKSZ; i++)
625+
{
626+
unsigned char *data = (unsigned char *) chunk + BLCKSZ * i;
627+
628+
encrypt_block(data, chunkoff + BLCKSZ * i);
629+
}
630+
}
595631
write_target_range(chunk, chunkoff, chunksize);
596632
}
597633

@@ -682,3 +718,52 @@ libpq_destroy(rewind_source *source)
682718

683719
/* NOTE: we don't close the connection here, as it was not opened by us. */
684720
}
721+
722+
static void
723+
libpq_fetch_tde_keys(rewind_source *source)
724+
{
725+
PGconn *conn = ((libpq_source *) source)->conn;
726+
PGresult *res;
727+
728+
res = PQexec(conn, "SELECT pg_ls_dir('" PG_TDE_DATA_DIR "', true, false)");
729+
730+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
731+
pg_fatal("could not fetch file list: %s",
732+
PQresultErrorMessage(res));
733+
734+
/* no tde dir, nothing to do */
735+
if (PQnfields(res) == 0)
736+
{
737+
PQclear(res);
738+
return;
739+
}
740+
741+
init_tde();
742+
743+
for (int i = 0; i < PQntuples(res); i++)
744+
{
745+
char *path;
746+
char *tde_file_buf;
747+
size_t size;
748+
char target_path[MAXPGPATH];
749+
750+
if (PQgetisnull(res, i, 0))
751+
{
752+
/*
753+
* The file was removed from the server while the query was
754+
* running. Ignore it.
755+
*/
756+
continue;
757+
}
758+
759+
path = PQgetvalue(res, i, 0);
760+
761+
snprintf(target_path, MAXPGPATH, "%s/%s", PG_TDE_DATA_DIR, path);
762+
tde_file_buf = libpq_fetch_file(source, target_path, &size);
763+
764+
write_tmp_source_file(path, tde_file_buf, size);
765+
pg_free(tde_file_buf);
766+
}
767+
768+
PQclear(res);
769+
}

fetools/pg18/pg_rewind/local_source.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ local_fetch_tde_keys(rewind_source *source)
210210
if (!directory_exists(tde_source_dir))
211211
return;
212212

213-
create_tde_tmp_dir();
213+
init_tde();
214214
copy_tmp_tde_files(tde_source_dir);
215215
}
216216

fetools/pg18/pg_rewind/tde_ops.c

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "pg_tde.h"
1616

1717
static void copy_dir(const char *src, const char *dst);
18+
static void create_tde_tmp_dir(void);
1819

1920
typedef struct
2021
{
@@ -32,6 +33,7 @@ static current_file_data current_tde_file =
3233

3334
/* Dir for an operational copy of source's tde files (_keys, etc) */
3435
static char tde_tmp_scource[MAXPGPATH] = "/tmp/pg_tde_rewindXXXXXX";
36+
static bool source_has_tde = false;
3537

3638
void
3739
flush_current_key(void)
@@ -55,13 +57,18 @@ ensure_tde_keys(const char *relpath)
5557
RelFileLocator rlocator;
5658
unsigned int segNo;
5759

60+
/* no TDE on source, nothing to do */
61+
if (!source_has_tde)
62+
return;
63+
5864
/* the same file, nothing to do */
5965
if (strcmp(current_tde_file.path, relpath) == 0)
6066
return;
6167

6268
flush_current_key();
6369

64-
path_rlocator(relpath, &rlocator, &segNo);
70+
if (!path_rlocator(relpath, &rlocator, &segNo))
71+
return;
6572

6673
pg_tde_set_data_dir(tde_tmp_scource);
6774
current_tde_file.source_key = pg_tde_get_smgr_key(rlocator);
@@ -107,14 +114,12 @@ encrypt_block(unsigned char *buf, off_t file_offset)
107114
}
108115

109116

110-
void
117+
static void
111118
create_tde_tmp_dir(void)
112119
{
113120
if (mkdtemp(tde_tmp_scource) == NULL)
114121
pg_fatal("could not create temporary directory \"%s\": %m", tde_tmp_scource);
115122

116-
atexit(destroy_tde_tmp_dir);
117-
118123
pg_log_debug("created temporary pg_tde directory: %s", tde_tmp_scource);
119124
}
120125

@@ -195,6 +200,14 @@ copy_dir(const char *src, const char *dst)
195200
pg_fatal("could not close directory \"%s\": %m", src);
196201
}
197202

203+
void
204+
init_tde(void)
205+
{
206+
source_has_tde = true;
207+
create_tde_tmp_dir();
208+
atexit(destroy_tde_tmp_dir);
209+
}
210+
198211
void
199212
copy_tmp_tde_files(const char *from)
200213
{
@@ -209,6 +222,9 @@ fetch_tde_dir(void)
209222
if (dry_run)
210223
return;
211224

225+
if (!source_has_tde)
226+
return;
227+
212228
snprintf(target_tde_dir, MAXPGPATH, "%s/%s", datadir_target, PG_TDE_DATA_DIR);
213229

214230
rmtree(target_tde_dir, false);

fetools/pg18/pg_rewind/tde_ops.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ extern void flush_current_key(void);
55
extern void ensure_tde_keys(const char *relpath);
66
extern void encrypt_block(unsigned char *buf, off_t file_offset);
77

8-
extern void create_tde_tmp_dir(void);
98
extern void destroy_tde_tmp_dir(void);
109
extern void write_tmp_source_file(const char *fname, char *buf, size_t size);
1110
extern void fetch_tde_dir(void);
1211
extern void copy_tmp_tde_files(const char *from);
12+
extern void init_tde(void);
1313

1414
#endif /* PG_REWIND_TDE_FILE_H */

meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ tap_tests = [
319319
't/pg_resetwal_corrupted.pl',
320320
't/pg_rewind_basic.pl',
321321
't/pg_rewind_databases.pl',
322+
't/pg_rewind_enc_copy_blocks.pl',
322323
't/pg_rewind_extrafiles.pl',
323324
't/pg_rewind_growing_files.pl',
324325
't/pg_rewind_keep_recycled_wals.pl',

t/RewindTest.pm

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,16 @@ shared_preload_libraries = 'pg_tde'
165165
"SELECT pg_tde_set_server_key_using_global_key_provider('global-db-principal-key', 'file-keyring-wal');"
166166
);
167167

168+
$node_primary->safe_psql('postgres',
169+
"SELECT pg_tde_add_database_key_provider_file('file-keyring','${tde_keyring_file}');"
170+
);
171+
$node_primary->safe_psql('postgres',
172+
"SELECT pg_tde_create_key_using_database_key_provider('test-db-key', 'file-keyring');"
173+
);
174+
$node_primary->safe_psql('postgres',
175+
"SELECT pg_tde_set_key_using_database_key_provider('test-db-key', 'file-keyring');"
176+
);
177+
168178
$node_primary->append_conf(
169179
'postgresql.conf', q{
170180
pg_tde.wal_encrypt = on

t/pg_rewind_enc_copy_blocks.pl

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
2+
# Copyright (c) 2021-2024, PostgreSQL Global Development Group
3+
4+
use strict;
5+
use warnings FATAL => 'all';
6+
use PostgreSQL::Test::Utils;
7+
use Test::More;
8+
9+
use FindBin;
10+
use lib $FindBin::RealBin;
11+
12+
use RewindTest;
13+
14+
sub run_test
15+
{
16+
my $test_mode = shift;
17+
my $extra_name = shift;
18+
my $extra_conf = shift;
19+
20+
my $cluster_name = $test_mode;
21+
22+
$cluster_name = $cluster_name . $extra_name if defined $extra_name;
23+
24+
RewindTest::setup_cluster($cluster_name, [], $extra_conf);
25+
RewindTest::start_primary();
26+
RewindTest::create_standby($cluster_name);
27+
28+
primary_psql(
29+
"CREATE TABLE tail_t (id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, f1 TEXT) USING tde_heap"
30+
);
31+
primary_psql(
32+
"INSERT INTO tail_t (f1) SELECT repeat('abcdeF', 1000) FROM generate_series(1, 1000)"
33+
);
34+
primary_psql(
35+
"CREATE TABLE block_t (id INTEGER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, f1 TEXT) USING tde_heap"
36+
);
37+
primary_psql(
38+
"INSERT INTO block_t (f1) SELECT repeat('abcdeF', 1000) FROM generate_series(1, 1000)"
39+
);
40+
primary_psql("CHECKPOINT");
41+
42+
RewindTest::promote_standby();
43+
44+
# Makes pg_rewind to copy some blocks of the relation
45+
# (mixing data encrypted with different keys on the target).
46+
primary_psql("UPDATE block_t SET f1='YYYYYYY' WHERE id % 10 = 0;");
47+
48+
# Insert some data making rewind to copy the tail of this relation
49+
# (mixing data encrypted with different keys on the target).
50+
standby_psql(
51+
"INSERT INTO tail_t (f1) SELECT repeat('ghijk', 100) FROM generate_series(1, 1000)"
52+
);
53+
standby_psql("CHECKPOINT");
54+
55+
56+
RewindTest::run_pg_rewind($test_mode);
57+
58+
check_query(
59+
'SELECT count(*) FROM tail_t',
60+
qq(2000
61+
),
62+
'tail-copy');
63+
64+
check_query(
65+
'SELECT count(*) FROM block_t',
66+
qq(1000
67+
),
68+
'blocks-copy');
69+
70+
RewindTest::clean_rewind_test();
71+
return;
72+
}
73+
74+
# Run the test in both modes
75+
run_test('local');
76+
run_test('remote');
77+
run_test('archive');
78+
79+
my @conf_params = ("pg_tde.cipher = 'aes_256'");
80+
run_test('local', "_aes_256", \@conf_params);
81+
82+
done_testing();

0 commit comments

Comments
 (0)