diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 3f0a3ed7..ceeb8d44 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -1,12 +1,38 @@
 name: regression
-on: [push]
+on:
+  push:
+    branches:
+      - master
+  pull_request:
+    branches:
+      - master
+  workflow_dispatch:
 
 jobs:
-  pg-10:
+  test:
+    name: PostgreSQL ${{ matrix.pg_version }}
     runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        include:
+          - pg_version: 10
+            pg_tag: REL_10_20
+          - pg_version: 11
+            pg_tag: REL_11_15
+          - pg_version: 12
+            pg_tag: REL_12_10
+          - pg_version: 13
+            pg_tag: REL_13_6
+          - pg_version: 14
+            pg_tag: REL_14_2
+          - pg_version: 15
+            pg_tag: REL_15_0
+          - pg_version: 16
+            pg_tag: REL_16_1
     env:
-      PG_TAG: REL_10_20
-      PG_VERSION: 10
+      PG_VERSION: ${{ matrix.pg_version }}
+      PG_TAG: ${{ matrix.pg_tag }}
     steps:
       - name: Checkout
         uses: actions/checkout@v3
diff --git a/Makefile b/Makefile
index c9361952..2f4e0f93 100644
--- a/Makefile
+++ b/Makefile
@@ -32,7 +32,8 @@ PROGRAM := pg_backup
 
 # pg_backup sources
 OBJS := src/utils/configuration.o src/utils/json.o src/utils/logger.o \
-	src/utils/parray.o src/utils/pgut.o src/utils/thread.o src/utils/remote.o src/utils/file.o
+	src/utils/parray.o src/utils/pgut.o src/utils/thread.o src/utils/remote.o src/utils/file.o \
+	src/utils/direct_io.o
 OBJS += src/archive.o src/backup.o src/catalog.o src/checkdb.o src/configure.o src/data.o \
 	src/delete.o src/dir.o src/fetch.o src/help.o src/init.o src/merge.o \
 	src/parsexlog.o src/ptrack.o src/pg_probackup.o src/restore.o src/show.o src/stream.o \
diff --git a/src/utils/direct_io.c b/src/utils/direct_io.c
new file mode 100644
index 00000000..eee0b369
--- /dev/null
+++ b/src/utils/direct_io.c
@@ -0,0 +1,263 @@
+#define _GNU_SOURCE
+#include "utils/direct_io.h"
+#include <errno.h>
+#include <fcntl.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#ifdef __APPLE__
+#define O_DIRECT 0
+#endif
+
+/*
+ * Alignment applied to the buffer address, file offsets and read lengths.
+ * Sector sizes are usually 512 or 4096 bytes; 4096 is safe for both.
+ */
+#define DIO_ALIGN 4096
+
+/*
+ * pread() that restarts after EINTR and, when the read is rejected with
+ * EINVAL or EOPNOTSUPP on an O_DIRECT descriptor, repeats it once with
+ * O_DIRECT switched off.  Returns what pread() returns; on failure errno
+ * is the one set by the failed read.
+ */
+static ssize_t
+dio_pread(int fd, char *buf, size_t len, off_t pos)
+{
+	ssize_t		rc;
+	int			saved_errno;
+	int			fl;
+
+	do
+	{
+		rc = pread(fd, buf, len, pos);
+	} while (rc < 0 && errno == EINTR);
+
+	if (rc >= 0 || (errno != EINVAL && errno != EOPNOTSUPP))
+		return rc;
+
+	saved_errno = errno;
+	fl = fcntl(fd, F_GETFL);
+	if (fl != -1 && (fl & O_DIRECT) != 0 &&
+		fcntl(fd, F_SETFL, fl & ~O_DIRECT) != -1)
+	{
+		do
+		{
+			rc = pread(fd, buf, len, pos);
+		} while (rc < 0 && errno == EINTR);
+		saved_errno = errno;
+
+		/* Put O_DIRECT back for the aligned reads that follow. */
+		(void) fcntl(fd, F_SETFL, fl);
+	}
+
+	/* fcntl() may have overwritten errno; report the read's own error. */
+	errno = saved_errno;
+	return rc;
+}
+
+/*
+ * Replace the cached chunk with file data starting at the DIO_ALIGN
+ * boundary at or below "offset".  Reading continues until the buffer is
+ * full or pread() reports end of file, so a chunk shorter than buffer_size
+ * always means EOF.  Returns 0 on success; on a read error returns -1 with
+ * errno set and the cache left empty.
+ */
+static int
+dio_fill_cache(DirectIOBuffer *dio, off_t offset)
+{
+	off_t		chunk_start = offset - (offset % DIO_ALIGN);
+	size_t		filled = 0;
+
+	/*
+	 * Drop the old bounds before touching the buffer.  Publishing them only
+	 * after a successful read keeps a failed read from leaving a range that
+	 * no longer describes the buffer contents.
+	 */
+	dio->cached_start = 0;
+	dio->cached_end = 0;
+
+	while (filled < dio->buffer_size)
+	{
+		ssize_t		rc = dio_pread(dio->fd, dio->buffer + filled,
+								   dio->buffer_size - filled,
+								   chunk_start + (off_t) filled);
+
+		if (rc < 0)
+			return -1;
+		if (rc == 0)
+			break;				/* end of file */
+		filled += (size_t) rc;
+	}
+
+	dio->cached_start = chunk_start;
+	dio->cached_end = chunk_start + (off_t) filled;
+	return 0;
+}
+
+/*
+ * Prepare a zero-initialized DirectIOBuffer: allocate a DIO_ALIGN-aligned
+ * buffer of "size" bytes rounded up to a multiple of DIO_ALIGN (at least
+ * DIO_ALIGN), empty the cache and mark the descriptor as unset (-1).
+ * Does nothing if the buffer is already initialized.
+ */
+void
+dio_buffer_init(DirectIOBuffer *dio, size_t size)
+{
+	void	   *mem = NULL;
+	int			rc;
+
+	if (dio->initialized)
+		return;
+
+	/* size must be a non-zero multiple of DIO_ALIGN for O_DIRECT */
+	if (size == 0)
+		size = DIO_ALIGN;
+	dio->buffer_size = (size + (DIO_ALIGN - 1)) & ~((size_t) DIO_ALIGN - 1);
+	dio->buffer = NULL;
+	dio->cached_start = 0;
+	dio->cached_end = 0;
+	dio->fd = -1;
+
+	/* posix_memalign() returns the error number and leaves errno alone. */
+	rc = posix_memalign(&mem, DIO_ALIGN, dio->buffer_size);
+	if (rc != 0)
+		elog(ERROR, "posix_memalign failed: %s", strerror(rc));
+
+	dio->buffer = mem;
+	dio->initialized = true;
+}
+
+/*
+ * open() "path" for direct I/O: with O_DIRECT where the platform has it,
+ * with F_NOCACHE on macOS.  Returns the descriptor, or -1 with errno set.
+ */
+int
+dio_open_file(const char *path, int flags, mode_t mode)
+{
+	int			fd;
+
+	fd = open(path, flags | O_DIRECT, mode);
+
+	/*
+	 * open(2) fails with EINVAL when the filesystem does not support
+	 * O_DIRECT.  Direct I/O is only an optimization here, so fall back to
+	 * an ordinary open instead of failing the whole read.
+	 */
+	if (fd == -1 && errno == EINVAL && O_DIRECT != 0)
+		fd = open(path, flags, mode);
+
+#ifdef __APPLE__
+	if (fd != -1)
+	{
+		if (fcntl(fd, F_NOCACHE, 1) == -1)
+		{
+			/* Not fatal, but good to know */
+			elog(WARNING, "fcntl(F_NOCACHE) failed for %s: %s", path, strerror(errno));
+		}
+	}
+#endif
+
+	return fd;
+}
+
+/*
+ * Forget the cached chunk so that the next dio_buffer_read_block() call
+ * reads from the file again.
+ */
+void
+dio_buffer_invalidate(DirectIOBuffer *dio)
+{
+	dio->cached_start = 0;
+	dio->cached_end = 0;
+}
+
+/*
+ * Copy "size" bytes found at "offset" of dio->fd into "target", going
+ * through the cache.
+ *
+ * Returns the number of bytes copied; a value smaller than "size"
+ * (including 0) means the file ended before the request was satisfied.
+ * Returns -1 with errno set if reading failed.
+ */
+ssize_t
+dio_buffer_read_block(DirectIOBuffer *dio, char *target, off_t offset, size_t size)
+{
+	size_t		copied = 0;
+
+	if (!dio->initialized)
+		elog(ERROR, "DirectIOBuffer not initialized");
+
+	if (dio->fd == -1)
+		elog(ERROR, "DirectIOBuffer file descriptor not set");
+
+	if (offset < 0)
+	{
+		errno = EINVAL;
+		return -1;
+	}
+
+	/* Fast path: the whole request is already in the cache */
+	if (offset >= dio->cached_start && offset < dio->cached_end &&
+		size <= (size_t) (dio->cached_end - offset))
+	{
+		memcpy(target, dio->buffer + (offset - dio->cached_start), size);
+		return (ssize_t) size;
+	}
+
+	/*
+	 * Not (entirely) cached: reload starting at the requested position.  A
+	 * single chunk covers the request in the usual case; the loop only goes
+	 * around again for requests that extend past the end of a full buffer.
+	 */
+	while (copied < size)
+	{
+		off_t		pos = offset + (off_t) copied;
+		size_t		avail;
+		size_t		chunk;
+
+		if (dio_fill_cache(dio, pos) < 0)
+			return -1;
+
+		if (pos >= dio->cached_end)
+			break;				/* nothing at or after pos: EOF */
+
+		avail = (size_t) (dio->cached_end - pos);
+		chunk = (avail < size - copied) ? avail : size - copied;
+		memcpy(target + copied, dio->buffer + (pos - dio->cached_start), chunk);
+		copied += chunk;
+
+		/* A chunk that did not fill the buffer ended at EOF */
+		if ((size_t) (dio->cached_end - dio->cached_start) < dio->buffer_size)
+			break;
+	}
+
+	return (ssize_t) copied;
+}
+
+/*
+ * Release the buffer and close dio->fd if it is set.  The descriptor is
+ * closed here, so callers must not close it themselves.  Safe to call on
+ * a zero-initialized buffer or one that was already cleaned up.
+ */
+void
+dio_buffer_cleanup(DirectIOBuffer *dio)
+{
+	if (!dio->initialized)
+		return;
+
+	free(dio->buffer);
+	dio->buffer = NULL;
+	dio->buffer_size = 0;
+	dio->cached_start = 0;
+	dio->cached_end = 0;
+
+	if (dio->fd != -1)
+	{
+		close(dio->fd);
+		dio->fd = -1;
+	}
+
+	dio->initialized = false;
+}
diff --git a/src/utils/direct_io.h b/src/utils/direct_io.h
new file mode 100644
index 00000000..38407ae0
--- /dev/null
+++ b/src/utils/direct_io.h
@@ -0,0 +1,37 @@
+#ifndef DIRECT_IO_H
+#define DIRECT_IO_H
+
+#include <sys/types.h>
+#include "pg_probackup.h"
+
+/*
+ * Read-ahead cache over one file descriptor opened for direct I/O.
+ * Start from a zero-initialized struct, call dio_buffer_init(), store the
+ * descriptor in "fd", and finish with dio_buffer_cleanup(), which also
+ * closes "fd".
+ */
+typedef struct DirectIOBuffer {
+	char	   *buffer;			/* aligned buffer */
+	size_t		buffer_size;	/* size of the buffer */
+	off_t		cached_start;	/* file offset where the cached data starts */
+	off_t		cached_end;		/* file offset just past the cached data */
+	int			fd;				/* file descriptor */
+	bool		initialized;	/* initialization flag */
+} DirectIOBuffer;
+
+/* Initialize the buffer */
+extern void dio_buffer_init(DirectIOBuffer *dio, size_t size);
+
+/* Read a block through the Direct I/O buffer */
+extern ssize_t dio_buffer_read_block(DirectIOBuffer *dio, char *target, off_t offset, size_t size);
+
+/* Drop the cached data so that the next read goes to the file */
+extern void dio_buffer_invalidate(DirectIOBuffer *dio);
+
+/* Release resources */
+extern void dio_buffer_cleanup(DirectIOBuffer *dio);
+
+/* Open a file using O_DIRECT or an equivalent */
+extern int dio_open_file(const char *path, int flags, mode_t mode);
+
+#endif /* DIRECT_IO_H */
diff --git a/src/utils/file.c b/src/utils/file.c
index 9577940e..e3e69413 100644
--- a/src/utils/file.c
+++ b/src/utils/file.c
@@ -6,3 +6,4 @@
 #include "pg_probackup.h"
+#include "utils/direct_io.h"
 #include "file.h"
 #include "storage/checksum.h"
@@ -2221,13 +2222,11 @@ fio_copy_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
 static void
 fio_send_pages_impl(int out, char* buf)
 {
-	FILE	   *in = NULL;
+	DirectIOBuffer dio = {0};
 	BlockNumber	blknum = 0;
-	int			current_pos = 0;
 	BlockNumber	n_blocks_read = 0;
 	PageState	page_st;
 	char		read_buffer[BLCKSZ+1];
-	char		in_buf[STDIO_BUFSIZE];
 	fio_header	hdr;
 	fio_send_request *req = (fio_send_request*) buf;
 	char	   *from_fullpath = (char*) buf + sizeof(fio_send_request);
@@ -2242,9 +2241,11 @@ fio_send_pages_impl(int out, char* buf)
 	int32		cur_pos_out = 0;
 	BackupPageHeader2 *headers = NULL;
 
+	dio_buffer_init(&dio, 1024 * 1024); /* 1MB buffer */
+
 	/* open source file */
-	in = fopen(from_fullpath, PG_BINARY_R);
-	if (!in)
+	dio.fd = dio_open_file(from_fullpath, O_RDONLY, 0);
+	if (dio.fd < 0)
 	{
 		hdr.cop = FIO_ERROR;
 
@@ -2284,11 +2285,7 @@ fio_send_pages_impl(int out, char* buf)
 		/* get first block */
 		iter = datapagemap_iterate(map);
 		datapagemap_next(iter, &blknum);
-
-		setvbuf(in, NULL, _IONBF, BUFSIZ);
 	}
-	else
-		setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
 
 	/* TODO: what is this barrier for? */
 	read_buffer[BLCKSZ] = 1; /* barrier */
@@ -2296,7 +2293,8 @@ fio_send_pages_impl(int out, char* buf)
 	while (blknum < req->nblocks)
 	{
 		int    rc = 0;
-		size_t read_len = 0;
+		ssize_t read_len = 0;
 		int    retry_attempts = PAGE_READ_ATTEMPTS;
+		bool   is_retry = false;
 
 		/* TODO: handle signals on the agent */
@@ -2307,23 +2305,20 @@ fio_send_pages_impl(int out, char* buf)
 		for (;;)
 		{
 			/*
-			 * Optimize stdio buffer usage, fseek only when current position
-			 * does not match the position of requested block.
+			 * Blocks are served from the DirectIOBuffer read-ahead cache.
+			 * Any pass after the first one is a retry of the same block, so
+			 * drop the cached chunk to make the retry go back to the file
+			 * instead of being answered with the very same bytes.
 			 */
-			if (current_pos != blknum*BLCKSZ)
-			{
-				current_pos = blknum*BLCKSZ;
-				if (fseek(in, current_pos, SEEK_SET) != 0)
-					elog(ERROR, "fseek to position %u is failed on remote file '%s': %s",
-							current_pos, from_fullpath, strerror(errno));
-			}
-
-			read_len = fread(read_buffer, 1, BLCKSZ, in);
-
-			current_pos += read_len;
+			if (is_retry)
+				dio_buffer_invalidate(&dio);
+			is_retry = true;
+
+			read_len = dio_buffer_read_block(&dio, read_buffer,
+											 (off_t) blknum * BLCKSZ, BLCKSZ);
 
 			/* report error */
-			if (ferror(in))
+			if (read_len < 0)
 			{
 				hdr.cop = FIO_ERROR;
 				hdr.arg = READ_FAILED;
@@ -2353,7 +2348,7 @@ fio_send_pages_impl(int out, char* buf)
 					break;
 			}
 
-			if (feof(in))
+			if (read_len < BLCKSZ)
 				goto eof;
 //		  	else /* readed less than BLKSZ bytes, retry */
 
@@ -2474,8 +2469,7 @@ fio_send_pages_impl(int out, char* buf)
 	pg_free(iter);
 	pg_free(errormsg);
 	pg_free(headers);
-	if (in)
-		fclose(in);
+	dio_buffer_cleanup(&dio);
 	return;
 }
 