PushOccurrence — это высокопроизводительный сервис на Go, предназначенный для надежной доставки уведомлений о событиях из PostgreSQL в очередь сообщений RabbitMQ.
Сервис реализует архитектурный паттерн Transactional Outbox, что гарантирует "at-least-once" доставку сообщений и предотвращает потерю данных даже при сбоях в сети или падении брокера сообщений.
- Гарантированная доставка: Использование паттерна
Transactional Outboxдля атомарного сохранения данных и события в одной транзакции. - Высокая производительность: Мгновенная реакция на события в БД через механизм
LISTEN/NOTIFY. - Масштабируемость: Безопасный запуск нескольких экземпляров сервиса благодаря использованию блокировок
FOR UPDATE SKIP LOCKED, что предотвращает гонки данных (race conditions). - Отказоустойчивость:
- Автоматическое переподключение к PostgreSQL: При разрыве соединения с БД слушатель (
Listener) не падает, а пытается восстановить подключение с экспоненциальной задержкой (Exponential Backoff). - Fallback-механизм: При недоступности RabbitMQ сообщения сохраняются в MongoDB (Outbox-таблица) и отправляются позже фоновым процессом (Poller).
- Автоматическое переподключение к PostgreSQL: При разрыве соединения с БД слушатель (
- Подтверждение доставки: Используется механизм
Publisher Confirmsот RabbitMQ для уверенности в том, что сообщение принято брокером.
- Запись в БД: Внешний сервис (или триггер) сохраняет данные в основную таблицу и одновременно в таблицу
message_queue_logв рамках одной транзакции. - Уведомление: После коммита транзакции выполняется команда
NOTIFY, которая отправляетmessage_idв специальный канал PostgreSQL. - Слушатель (Listener): Сервис PushOccurrence, подписанный на канал, мгновенно получает
message_id. - Обработчик (Handler):
- Начинает транзакцию в БД.
- Блокирует строку с полученным
message_idс помощьюFOR UPDATE SKIP LOCKED. - Пытается отправить сообщение в RabbitMQ.
- Если RabbitMQ недоступен: Сохраняет сообщение в MongoDB для последующей отправки.
- Помечает запись в
message_queue_logкакtransferred = true. - Коммитит транзакцию.
- Poller: Отдельный фоновый процесс периодически опрашивает MongoDB на наличие неотправленных сообщений и пытается доставить их в RabbitMQ.
⚠️ Важное замечание по реализации Outbox В текущей архитектуре Outbox-таблица для fallback находится в MongoDB, а не в основной базе PostgreSQL. Это нарушает принцип атомарности паттерна, так как запись в Mongo не является частью основной транзакции. В идеальной реализации таблицаoutboxдолжна находиться в той же базе данных, что и основные бизнес-данные.
Конфигурация сервиса находится в файле config/config.json.
{
"postgres": {
"host": "localhost",
"port": 5433,
"database": "mydb",
"user": "postgres",
"password": "postgres",
"ssl_mode": "disable"
},
"listener": {
"channels": [
"queue_message_log"
]
},
"rabbitmq": {
"host": "localhost",
"port": 5672,
"user": "guest",
"password": "guest",
"queue": {
"name": "message_queue",
"durable": true,
"auto_delete": false,
"exclusive": false,
"no_wait": false
}
},
"mongo": {
"host": "localhost",
"port": 27017,
"database": "outbox"
}
}Рекомендация: Для production-среды следует выносить конфигурацию в переменные окружения, а не хранить ее в файле.
Сервис ожидает наличие схемы data_exchange и следующих таблиц в PostgreSQL.
CREATE SCHEMA IF NOT EXISTS data_exchange;
-- Таблица для отслеживания статуса сообщений
CREATE TABLE IF NOT EXISTS data_exchange.message_queue_log (
message_id UUID PRIMARY KEY,
transferred BOOLEAN DEFAULT FALSE,
transfer_time TIMESTAMP
);
-- Таблица с телом сообщения
CREATE TABLE IF NOT EXISTS data_exchange.message_queue_log_data (
message_id UUID REFERENCES data_exchange.message_queue_log(message_id),
message_body BYTEA
);- Запустите инфраструктуру:
docker-compose up -d
- Настройте
config/config.json. - Запустите приложение:
go run ./cmd/app/main.go
- Или соберите и запустите бинарный файл:
go build -o bin/PushOccurrence ./cmd/app/main.go ./bin/PushOccurrence
.
├── cmd/app/main.go # Точка входа в приложение
├── config/ # Загрузка конфигурации
├── internal/
│ ├── db/ # Логика работы с PostgreSQL (подключение, слушатель) и mongoDB (outbox)
│ ├── handlers/ # Обработка уведомлений
│ ├── mq/ # Логика работы с RabbitMQ
│ └── service/ # Инициализация и запуск сервиса
├── go.mod # Зависимости
├── docker-compose.yml # Файл для запуска инфраструктуры (PG, Rabbit, Mongo)
└── README.md # Этот файл
🧠 Детали реализации пакета `mq` (internal/mq)
Реализация пакета построена на взаимодействии нескольких горутин, управляемых через каналы.
Функция InitMq инициализирует структуру Mq и запускает горутины для управления соединением и повторной отправкой.
Логика разделена на мониторинг и восстановление:
- Monitor: Периодически проверяет статус соединения с RabbitMQ.
- ConnectManager: При получении сигнала о разрыве соединения, ожидает некоторое время и пытается восстановить его.
- Если соединение активно, сообщение публикуется в RabbitMQ с ожиданием подтверждения (
PublishWithDeferredConfirm). - Если соединение разорвано или получена ошибка, сообщение направляется в fallback-хранилище (MongoDB).
- При восстановлении соединения,
Pollerначинает отправлять накопленные сообщения.