Skip to content

TimurSharipovv/PushOccurrence

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

115 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

PushOccurrence 🚀

PushOccurrence — это высокопроизводительный сервис на Go, предназначенный для надежной доставки уведомлений о событиях из PostgreSQL в очередь сообщений RabbitMQ.

Сервис реализует архитектурный паттерн Transactional Outbox, что гарантирует "at-least-once" доставку сообщений и предотвращает потерю данных даже при сбоях в сети или падении брокера сообщений.

🛡️ Ключевые возможности

  • Гарантированная доставка: Использование паттерна Transactional Outbox для атомарного сохранения данных и события в одной транзакции.
  • Высокая производительность: Мгновенная реакция на события в БД через механизм LISTEN/NOTIFY.
  • Масштабируемость: Безопасный запуск нескольких экземпляров сервиса благодаря использованию блокировок FOR UPDATE SKIP LOCKED, что предотвращает гонки данных (race conditions).
  • Отказоустойчивость:
    • Автоматическое переподключение к PostgreSQL: При разрыве соединения с БД слушатель (Listener) не падает, а пытается восстановить подключение с экспоненциальной задержкой (Exponential Backoff).
    • Fallback-механизм: При недоступности RabbitMQ сообщения сохраняются в MongoDB (Outbox-таблица) и отправляются позже фоновым процессом (Poller).
  • Подтверждение доставки: Используется механизм Publisher Confirms от RabbitMQ для уверенности в том, что сообщение принято брокером.

🏗️ Архитектура и принцип работы

  1. Запись в БД: Внешний сервис (или триггер) сохраняет данные в основную таблицу и одновременно в таблицу message_queue_log в рамках одной транзакции.
  2. Уведомление: После коммита транзакции выполняется команда NOTIFY, которая отправляет message_id в специальный канал PostgreSQL.
  3. Слушатель (Listener): Сервис PushOccurrence, подписанный на канал, мгновенно получает message_id.
  4. Обработчик (Handler):
    • Начинает транзакцию в БД.
    • Блокирует строку с полученным message_id с помощью FOR UPDATE SKIP LOCKED.
    • Пытается отправить сообщение в RabbitMQ.
    • Если RabbitMQ недоступен: Сохраняет сообщение в MongoDB для последующей отправки.
    • Помечает запись в message_queue_log как transferred = true.
    • Коммитит транзакцию.
  5. Poller: Отдельный фоновый процесс периодически опрашивает MongoDB на наличие неотправленных сообщений и пытается доставить их в RabbitMQ.

⚠️ Важное замечание по реализации Outbox В текущей архитектуре Outbox-таблица для fallback находится в MongoDB, а не в основной базе PostgreSQL. Это нарушает принцип атомарности паттерна, так как запись в Mongo не является частью основной транзакции. В идеальной реализации таблица outbox должна находиться в той же базе данных, что и основные бизнес-данные.

⚙️ Конфигурация

Конфигурация сервиса находится в файле config/config.json.

{
  "postgres": {
    "host": "localhost",
    "port": 5433,
    "database": "mydb",
    "user": "postgres",
    "password": "postgres",
    "ssl_mode": "disable"
  },
  "listener": {
    "channels": [
      "queue_message_log"
    ]
  },
  "rabbitmq": {
    "host": "localhost",
    "port": 5672,
    "user": "guest",
    "password": "guest",
    "queue": {
      "name": "message_queue",
      "durable": true,
      "auto_delete": false,
      "exclusive": false,
      "no_wait": false
    }
  },
  "mongo": {
    "host": "localhost",
    "port": 27017,
    "database": "outbox"
  }
}

Рекомендация: Для production-среды следует выносить конфигурацию в переменные окружения, а не хранить ее в файле.

📦 Структура базы данных

Сервис ожидает наличие схемы data_exchange и следующих таблиц в PostgreSQL.

CREATE SCHEMA IF NOT EXISTS data_exchange;

-- Таблица для отслеживания статуса сообщений
CREATE TABLE IF NOT EXISTS data_exchange.message_queue_log (
    message_id UUID PRIMARY KEY,
    transferred BOOLEAN DEFAULT FALSE,
    transfer_time TIMESTAMP
);

-- Таблица с телом сообщения
CREATE TABLE IF NOT EXISTS data_exchange.message_queue_log_data (
    message_id UUID REFERENCES data_exchange.message_queue_log(message_id),
    message_body BYTEA
);

▶️ Запуск сервиса

  1. Запустите инфраструктуру:
    docker-compose up -d
  2. Настройте config/config.json.
  3. Запустите приложение:
    go run ./cmd/app/main.go
  4. Или соберите и запустите бинарный файл:
    go build -o bin/PushOccurrence ./cmd/app/main.go
    ./bin/PushOccurrence

📂 Структура проекта

.
├── cmd/app/main.go         # Точка входа в приложение
├── config/                 # Загрузка конфигурации
├── internal/
│   ├── db/                 # Логика работы с PostgreSQL (подключение, слушатель) и mongoDB (outbox)
│   ├── handlers/           # Обработка уведомлений
│   ├── mq/                 # Логика работы с RabbitMQ
│   └── service/            # Инициализация и запуск сервиса
├── go.mod                  # Зависимости
├── docker-compose.yml      # Файл для запуска инфраструктуры (PG, Rabbit, Mongo)
└── README.md               # Этот файл
🧠 Детали реализации пакета `mq` (internal/mq)

Реализация пакета построена на взаимодействии нескольких горутин, управляемых через каналы.

Структура и Инициализация

Функция InitMq инициализирует структуру Mq и запускает горутины для управления соединением и повторной отправкой.

Управление соединением

Логика разделена на мониторинг и восстановление:

  • Monitor: Периодически проверяет статус соединения с RabbitMQ.
  • ConnectManager: При получении сигнала о разрыве соединения, ожидает некоторое время и пытается восстановить его.

Обработка сообщений

  • Если соединение активно, сообщение публикуется в RabbitMQ с ожиданием подтверждения (PublishWithDeferredConfirm).
  • Если соединение разорвано или получена ошибка, сообщение направляется в fallback-хранилище (MongoDB).
  • При восстановлении соединения, Poller начинает отправлять накопленные сообщения.

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors