Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 30 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,38 @@
name: regression
on: [push]
on:
push:
branches:
- master
pull_request:
branches:
- master
workflow_dispatch:

jobs:
pg-10:
test:
name: PostgreSQL ${{ matrix.pg_version }}
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
include:
- pg_version: 10
pg_tag: REL_10_20
- pg_version: 11
pg_tag: REL_11_15
- pg_version: 12
pg_tag: REL_12_10
- pg_version: 13
pg_tag: REL_13_6
- pg_version: 14
pg_tag: REL_14_2
- pg_version: 15
pg_tag: REL_15_0
- pg_version: 16
pg_tag: REL_16_1
env:
PG_TAG: REL_10_20
PG_VERSION: 10
PG_VERSION: ${{ matrix.pg_version }}
PG_TAG: ${{ matrix.pg_tag }}
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ PROGRAM := pg_backup

# pg_backup sources
OBJS := src/utils/configuration.o src/utils/json.o src/utils/logger.o \
src/utils/parray.o src/utils/pgut.o src/utils/thread.o src/utils/remote.o src/utils/file.o
src/utils/parray.o src/utils/pgut.o src/utils/thread.o src/utils/remote.o src/utils/file.o \
src/utils/direct_io.o
OBJS += src/archive.o src/backup.o src/catalog.o src/checkdb.o src/configure.o src/data.o \
src/delete.o src/dir.o src/fetch.o src/help.o src/init.o src/merge.o \
src/parsexlog.o src/ptrack.o src/pg_probackup.o src/restore.o src/show.o src/stream.o \
Expand Down
157 changes: 157 additions & 0 deletions src/utils/direct_io.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#define _GNU_SOURCE
#include "utils/direct_io.h"
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>

#ifdef __APPLE__
#define O_DIRECT 0
#endif

/* Sector size for O_DIRECT alignment. Usually 512 or 4096. 4096 is safe for both. */
#define DIO_ALIGN 4096

void
dio_buffer_init(DirectIOBuffer *dio, size_t size)
{
if (dio->initialized)
return;

/* size must be aligned to DIO_ALIGN for O_DIRECT */
dio->buffer_size = (size + (DIO_ALIGN - 1)) & ~(DIO_ALIGN - 1);
dio->cached_start = 0;
dio->cached_end = 0;
dio->fd = -1;

if (posix_memalign((void **)&dio->buffer, DIO_ALIGN, dio->buffer_size) != 0)
{
elog(ERROR, "posix_memalign failed: %s", strerror(errno));
}

dio->initialized = true;
}

int
dio_open_file(const char *path, int flags, mode_t mode)
{
int fd;
int open_flags = flags;

#ifndef __APPLE__
open_flags |= O_DIRECT;
#endif

fd = open(path, open_flags, mode);

#ifdef __APPLE__
if (fd != -1)
{
if (fcntl(fd, F_NOCACHE, 1) == -1)
{
/* Not fatal, but good to know */
elog(WARNING, "fcntl(F_NOCACHE) failed for %s: %s", path, strerror(errno));
}
}
#endif

return fd;
}

ssize_t
dio_buffer_read_block(DirectIOBuffer *dio, char *target, off_t offset, size_t size)
{
off_t chunk_start;
ssize_t read_bytes;

if (!dio->initialized)
elog(ERROR, "DirectIOBuffer not initialized");

if (dio->fd == -1)
elog(ERROR, "DirectIOBuffer file descriptor not set");

/* Check if the requested block is already in the cache */
if (offset >= dio->cached_start && offset + size <= dio->cached_end)
{
memcpy(target, dio->buffer + (offset - dio->cached_start), size);
return size;
}

/* Block not in cache, read new chunk */
/* Align to DIO_ALIGN (page size / sector size) for O_DIRECT */
dio->cached_start = (offset / DIO_ALIGN) * DIO_ALIGN;
chunk_start = dio->cached_start;

/*
* Try reading with O_DIRECT (already set on fd if possible).
* If offset or size are not aligned, pread might return EINVAL.
*/
read_bytes = pread(dio->fd, dio->buffer, dio->buffer_size, chunk_start);

/* Fallback if O_DIRECT failed due to alignment or other reasons */
if (read_bytes < 0 && (errno == EINVAL || errno == EOPNOTSUPP))
{
int flags = fcntl(dio->fd, F_GETFL);
if (flags != -1 && (flags & O_DIRECT))
{
/* Temporary disable O_DIRECT for this read */
if (fcntl(dio->fd, F_SETFL, flags & ~O_DIRECT) != -1)
{
read_bytes = pread(dio->fd, dio->buffer, dio->buffer_size, chunk_start);
/* Restore O_DIRECT flags */
fcntl(dio->fd, F_SETFL, flags);
}
}
}

if (read_bytes < 0)
return -1;

dio->cached_end = chunk_start + read_bytes;

/* If we read enough to cover the requested block */
if (offset >= dio->cached_start && offset + size <= dio->cached_end)
{
memcpy(target, dio->buffer + (offset - dio->cached_start), size);
return size;
}

/* We might have reached EOF and read less than requested */
if (offset < dio->cached_end)
{
size_t available = dio->cached_end - offset;
size_t to_copy = (available < size) ? available : size;
memcpy(target, dio->buffer + (offset - dio->cached_start), to_copy);

/*
* For PostgreSQL blocks (BLCKSZ), we often need the full block.
* If we didn't read enough, and it's not EOF, we might need to try harder.
* But dio_buffer_read_block is usually called for BLCKSZ which is 8KB,
* and buffer_size is 1MB, so we should have it.
* If we reached EOF, returning less than size is correct.
*/
return to_copy;
}

return 0; /* EOF */
}

void
dio_buffer_cleanup(DirectIOBuffer *dio)
{
if (!dio->initialized)
return;

if (dio->buffer)
free(dio->buffer);

if (dio->fd != -1)
{
close(dio->fd);
dio->fd = -1;
}

dio->buffer = NULL;
dio->initialized = false;
}
28 changes: 28 additions & 0 deletions src/utils/direct_io.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#ifndef DIRECT_IO_H
#define DIRECT_IO_H

#include <sys/types.h>
#include "pg_probackup.h"

typedef struct DirectIOBuffer {
char *buffer; /* aligned буфер */
size_t buffer_size; /* размер буфера */
off_t cached_start; /* начало кэшированных данных в файле */
off_t cached_end; /* конец кэшированных данных */
int fd; /* file descriptor */
bool initialized; /* флаг инициализации */
} DirectIOBuffer;

/* Инициализация буфера */
extern void dio_buffer_init(DirectIOBuffer *dio, size_t size);

/* Чтение блока через Direct I/O буфер */
extern ssize_t dio_buffer_read_block(DirectIOBuffer *dio, char *target, off_t offset, size_t size);

/* Очистка ресурсов */
extern void dio_buffer_cleanup(DirectIOBuffer *dio);

/* Открытие файла с использованием O_DIRECT или аналогов */
extern int dio_open_file(const char *path, int flags, mode_t mode);

#endif /* DIRECT_IO_H */
44 changes: 17 additions & 27 deletions src/utils/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <signal.h>

#include "pg_probackup.h"
#include "utils/direct_io.h"

#include "file.h"
#include "storage/checksum.h"
Expand Down Expand Up @@ -2221,13 +2222,11 @@ fio_copy_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
static void
fio_send_pages_impl(int out, char* buf)
{
FILE *in = NULL;
DirectIOBuffer dio = {0};
BlockNumber blknum = 0;
int current_pos = 0;
BlockNumber n_blocks_read = 0;
PageState page_st;
char read_buffer[BLCKSZ+1];
char in_buf[STDIO_BUFSIZE];
fio_header hdr;
fio_send_request *req = (fio_send_request*) buf;
char *from_fullpath = (char*) buf + sizeof(fio_send_request);
Expand All @@ -2242,9 +2241,11 @@ fio_send_pages_impl(int out, char* buf)
int32 cur_pos_out = 0;
BackupPageHeader2 *headers = NULL;

dio_buffer_init(&dio, 1024 * 1024); /* 1MB buffer */

/* open source file */
in = fopen(from_fullpath, PG_BINARY_R);
if (!in)
dio.fd = dio_open_file(from_fullpath, O_RDONLY, 0);
if (dio.fd < 0)
{
hdr.cop = FIO_ERROR;

Expand Down Expand Up @@ -2284,19 +2285,15 @@ fio_send_pages_impl(int out, char* buf)
/* get first block */
iter = datapagemap_iterate(map);
datapagemap_next(iter, &blknum);

setvbuf(in, NULL, _IONBF, BUFSIZ);
}
else
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);

/* TODO: what is this barrier for? */
read_buffer[BLCKSZ] = 1; /* barrier */

while (blknum < req->nblocks)
{
int rc = 0;
size_t read_len = 0;
ssize_t read_len = 0;
int retry_attempts = PAGE_READ_ATTEMPTS;

/* TODO: handle signals on the agent */
Expand All @@ -2307,23 +2304,15 @@ fio_send_pages_impl(int out, char* buf)
for (;;)
{
/*
* Optimize stdio buffer usage, fseek only when current position
* does not match the position of requested block.
* Pre-read logic: if we are using pagemap, we might want to
* ensure that the buffer is filled with useful data.
* However, dio_buffer_read_block already handles caching.
* We can just call it.
*/
if (current_pos != blknum*BLCKSZ)
{
current_pos = blknum*BLCKSZ;
if (fseek(in, current_pos, SEEK_SET) != 0)
elog(ERROR, "fseek to position %u is failed on remote file '%s': %s",
current_pos, from_fullpath, strerror(errno));
}

read_len = fread(read_buffer, 1, BLCKSZ, in);

current_pos += read_len;
read_len = dio_buffer_read_block(&dio, read_buffer, (off_t)blknum * BLCKSZ, BLCKSZ);

/* report error */
if (ferror(in))
if (read_len < 0)
{
hdr.cop = FIO_ERROR;
hdr.arg = READ_FAILED;
Expand Down Expand Up @@ -2353,7 +2342,7 @@ fio_send_pages_impl(int out, char* buf)
break;
}

if (feof(in))
if (read_len < BLCKSZ)
goto eof;
// else /* readed less than BLKSZ bytes, retry */

Expand Down Expand Up @@ -2474,8 +2463,9 @@ fio_send_pages_impl(int out, char* buf)
pg_free(iter);
pg_free(errormsg);
pg_free(headers);
if (in)
fclose(in);
if (dio.fd != -1)
close(dio.fd);
dio_buffer_cleanup(&dio);
return;
}

Expand Down