Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ RWOBJS = \
$(FETOOLS)/pg_rewind/local_source.o \
$(FETOOLS)/pg_rewind/parsexlog.o \
$(FETOOLS)/pg_rewind/pg_rewind.o \
$(FETOOLS)/pg_rewind/tde_ops.o \
$(FETOOLS)/pg_rewind/timeline.o

RMGRDESCSOURCES = $(sort $(wildcard $(FETOOLS)/rmgrdesc/*desc*.c))
Expand Down
96 changes: 60 additions & 36 deletions fetools/pg18/pg_rewind/filemap.c
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,8 @@ action_to_str(file_action_t action)
return "CREATE";
case FILE_ACTION_REMOVE:
return "REMOVE";
case FILE_ACTION_ENSURE_TDE_KEY:
return "ENSURE_KEY";

default:
return "unknown";
Expand Down Expand Up @@ -572,9 +574,40 @@ isRelDataFile(const char *path)
{
RelFileLocator rlocator;
unsigned int segNo;
int nmatch;
bool matched;

matched = path_rlocator(path, &rlocator, &segNo);

/*
* path_rlocator() above can match files that have extra characters at the
* end. To eliminate such cases, cross-check that GetRelationPath creates
* the exact same filename, when passed the RelFileLocator information we
* extracted from the filename.
*/
if (matched)
{
char *check_path = datasegpath(rlocator, MAIN_FORKNUM, segNo);

if (strcmp(check_path, path) != 0)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like you accidentally removed the comment explaining this part of the code.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment is in inside path_rlocator where the path parsing is

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No?

	/*
	 * The sscanf tests above can match files that have extra characters at
	 * the end. To eliminate such cases, cross-check that GetRelationPath
	 * creates the exact same filename, when passed the RelFileLocator
	 * information we extracted from the filename.
	 */

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I thought you are talking about another comment. Yes, indeed, this one got disappeared somehow. I'll fix, thanks!

matched = false;

pfree(check_path);
}

return matched;
}

/*
* Sets rlocator and segNo based on the given path. Returns false if no match
* is found.
*
* Only concerned with files belonging to the main fork.
*/
bool
path_rlocator(const char *path, RelFileLocator *rlocator, unsigned int *segNo)
{
int nmatch;

/*----
* Relation data files can be in one of the following directories:
*
Expand All @@ -594,55 +627,38 @@ isRelDataFile(const char *path)
*
*----
*/
rlocator.spcOid = InvalidOid;
rlocator.dbOid = InvalidOid;
rlocator.relNumber = InvalidRelFileNumber;
segNo = 0;
matched = false;
rlocator->spcOid = InvalidOid;
rlocator->dbOid = InvalidOid;
rlocator->relNumber = InvalidRelFileNumber;
*segNo = 0;

nmatch = sscanf(path, "global/%u.%u", &rlocator.relNumber, &segNo);
nmatch = sscanf(path, "global/%u.%u", &rlocator->relNumber, segNo);
if (nmatch == 1 || nmatch == 2)
{
rlocator.spcOid = GLOBALTABLESPACE_OID;
rlocator.dbOid = 0;
matched = true;
rlocator->spcOid = GLOBALTABLESPACE_OID;
rlocator->dbOid = 0;
return true;
}
else
{
nmatch = sscanf(path, "base/%u/%u.%u",
&rlocator.dbOid, &rlocator.relNumber, &segNo);
&rlocator->dbOid, &rlocator->relNumber, segNo);
if (nmatch == 2 || nmatch == 3)
{
rlocator.spcOid = DEFAULTTABLESPACE_OID;
matched = true;
rlocator->spcOid = DEFAULTTABLESPACE_OID;
return true;
}
else
{
nmatch = sscanf(path, "pg_tblspc/%u/" TABLESPACE_VERSION_DIRECTORY "/%u/%u.%u",
&rlocator.spcOid, &rlocator.dbOid, &rlocator.relNumber,
&segNo);
&rlocator->spcOid, &rlocator->dbOid, &rlocator->relNumber,
segNo);
if (nmatch == 3 || nmatch == 4)
matched = true;
return true;
}
}

/*
* The sscanf tests above can match files that have extra characters at
* the end. To eliminate such cases, cross-check that GetRelationPath
* creates the exact same filename, when passed the RelFileLocator
* information we extracted from the filename.
*/
if (matched)
{
char *check_path = datasegpath(rlocator, MAIN_FORKNUM, segNo);

if (strcmp(check_path, path) != 0)
matched = false;

pfree(check_path);
}

return matched;
return false;
}

/*
Expand Down Expand Up @@ -712,6 +728,13 @@ decide_file_action(file_entry_t *entry)
if (strstr(path, ".DS_Store") != NULL)
return FILE_ACTION_NONE;

/*
* Skip pg_tde key data. This is handled separately by combining the
* source and target keys when processing relation files.
*/
if (strstr(path, "pg_tde/") != NULL)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change? Is it in the righyt commit? If so it is a bit confusing the comment did not change.

return FILE_ACTION_NONE;

/*
* Remove all files matching the exclusion filters in the target.
*/
Expand Down Expand Up @@ -754,7 +777,7 @@ decide_file_action(file_entry_t *entry)
if (keepwal_entry_exists(path))
{
pg_log_debug("Not removing file \"%s\" because it is required for recovery", path);
return FILE_ACTION_NONE;
return FILE_ACTION_ENSURE_WAL_SEG;
}
return FILE_ACTION_REMOVE;
}
Expand Down Expand Up @@ -831,14 +854,15 @@ decide_file_action(file_entry_t *entry)
* in the target will be copied based on parsing the target
* system's WAL, and any blocks modified in the source will be
* updated after rewinding, when the source system's WAL is
* replayed.
* replayed. But we still have to sync source/target keys in
* case it is encrypted.
*/
if (entry->target_size < entry->source_size)
return FILE_ACTION_COPY_TAIL;
else if (entry->target_size > entry->source_size)
return FILE_ACTION_TRUNCATE;
else
return FILE_ACTION_NONE;
return FILE_ACTION_ENSURE_TDE_KEY;
}
break;

Expand Down
6 changes: 6 additions & 0 deletions fetools/pg18/pg_rewind/filemap.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ typedef enum
* blocks based on the parsed WAL) */
FILE_ACTION_TRUNCATE, /* truncate local file to 'newsize' bytes */
FILE_ACTION_REMOVE, /* remove local file / directory / symlink */
FILE_ACTION_ENSURE_TDE_KEY, /* data file with no action, but we to check
* if it is encrypted and sync source/target
* keys */
FILE_ACTION_ENSURE_WAL_SEG /* kept WAL segment might need reencryption */
} file_action_t;

typedef enum
Expand Down Expand Up @@ -113,4 +117,6 @@ extern void print_filemap(filemap_t *filemap);
extern void keepwal_init(void);
extern void keepwal_add_entry(const char *path);

extern bool path_rlocator(const char *path, RelFileLocator *rlocator, unsigned int *segNo);

#endif /* FILEMAP_H */
87 changes: 86 additions & 1 deletion fetools/pg18/pg_rewind/libpq_source.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include "pg_rewind.h"
#include "port/pg_bswap.h"
#include "rewind_source.h"
#include "tde_ops.h"

#include "pg_tde.h"

/*
* Files are fetched MAX_CHUNK_SIZE bytes at a time, and with a
Expand All @@ -31,6 +34,7 @@ typedef struct
const char *path; /* path relative to data directory root */
off_t offset;
size_t length;
bool encrypt;
} fetch_range_request;

typedef struct
Expand Down Expand Up @@ -71,6 +75,10 @@ static char *libpq_fetch_file(rewind_source *source, const char *path,
static XLogRecPtr libpq_get_current_wal_insert_lsn(rewind_source *source);
static void libpq_destroy(rewind_source *source);

static void libpq_queue_process_fetch_range(rewind_source *source, const char *path,
bool needs_encrypt, off_t off, size_t len);
static void libpq_fetch_tde_keys(rewind_source *source);

/*
* Create a new libpq source.
*
Expand Down Expand Up @@ -100,6 +108,8 @@ init_libpq_source(PGconn *conn)
initStringInfo(&src->offsets);
initStringInfo(&src->lengths);

libpq_fetch_tde_keys(&src->common);

return &src->common;
}

Expand Down Expand Up @@ -345,7 +355,7 @@ libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
* fetch-requests are for a whole file.
*/
open_target_file(path, true);
libpq_queue_fetch_range(source, path, 0, Max(len, MAX_CHUNK_SIZE));
libpq_queue_process_fetch_range(source, path, false, 0, Max(len, MAX_CHUNK_SIZE));
}

/*
Expand All @@ -354,6 +364,17 @@ libpq_queue_fetch_file(rewind_source *source, const char *path, size_t len)
static void
libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have this wrapper instead of just changing all callsites to pass either true or false?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what rewind_source interface demands. For the "local source", queue_fetch_range and queue_fetch_range are actually two different functions. I'm not eager to change this interface tbh, as it might bring additional pain with upstream changes.

size_t len)
{
libpq_queue_process_fetch_range(source, path, true, off, len);
}

/*
* A workhorse for libpq_queue_fetch_range.
* `needs_encrypt` indicates if file's blocks may need re-encryption.
*/
static void
libpq_queue_process_fetch_range(rewind_source *source, const char *path,
bool needs_encrypt, off_t off, size_t len)
{
libpq_source *src = (libpq_source *) source;

Expand Down Expand Up @@ -406,6 +427,7 @@ libpq_queue_fetch_range(rewind_source *source, const char *path, off_t off,
src->request_queue[src->num_requests].path = path;
src->request_queue[src->num_requests].offset = off;
src->request_queue[src->num_requests].length = thislen;
src->request_queue[src->num_requests].encrypt = needs_encrypt;
src->num_requests++;

off += thislen;
Expand All @@ -420,6 +442,7 @@ static void
libpq_finish_fetch(rewind_source *source)
{
process_queued_fetch_requests((libpq_source *) source);
flush_current_tde_rel_key();
}

static void
Expand Down Expand Up @@ -592,6 +615,19 @@ process_queued_fetch_requests(libpq_source *src)

open_target_file(filename, false);

if (rq->encrypt)
{
Assert(chunksize % BLCKSZ == 0);

ensure_tde_keys(filename);

for (int i = 0; i < chunksize / BLCKSZ; i++)
{
unsigned char *data = (unsigned char *) chunk + BLCKSZ * i;

tde_reencrypt_block(data, chunkoff + BLCKSZ * i, MAIN_FORKNUM);
}
}
write_target_range(chunk, chunkoff, chunksize);
}

Expand Down Expand Up @@ -682,3 +718,52 @@ libpq_destroy(rewind_source *source)

/* NOTE: we don't close the connection here, as it was not opened by us. */
}

static void
libpq_fetch_tde_keys(rewind_source *source)
{
PGconn *conn = ((libpq_source *) source)->conn;
PGresult *res;

res = PQexec(conn, "SELECT pg_ls_dir('" PG_TDE_DATA_DIR "', true, false)");

if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("could not fetch file list: %s",
PQresultErrorMessage(res));

/* no tde dir, nothing to do */
if (PQntuples(res) == 0)
{
PQclear(res);
return;
}

init_tde();

for (int i = 0; i < PQntuples(res); i++)
{
char *path;
char *tde_file_buf;
size_t size;
char target_path[MAXPGPATH];

if (PQgetisnull(res, i, 0))
{
/*
* The file was removed from the server while the query was
* running. Ignore it.
*/
continue;
}

path = PQgetvalue(res, i, 0);

snprintf(target_path, MAXPGPATH, "%s/%s", PG_TDE_DATA_DIR, path);
tde_file_buf = libpq_fetch_file(source, target_path, &size);

write_tmp_source_file(path, tde_file_buf, size);
pg_free(tde_file_buf);
}

PQclear(res);
}
Loading
Loading