Проект представляет собой полностью автоматизированную систему обработки данных, разворачиваемую одной командой. Конфигурация проекта облегчена для запуска на локальной машине.
- Airflow: Оркестрация.
- Spark: Обработка данных.
- PostgreSQL и MySQL: Реляционные базы данных.
- Kafka и Zookeeper: Системы обмена сообщениями.
- Python: Генераторы данных в PG и Kafka.
- Docker: Контейнеризация сервисов.
- Проект рекомендуется запускать на устройстве с RAM>=16GB. Из-за большого количества сервисов без предварительных ограничений Docker может занять практически всю свободную оперативную память, сильно замедляя локаль вплоть до необходимости перезапуска.
- Время сборки проекта: 10-20 минут в зависимости от доступных ресурсов и пропускной способности сети.
- Время развертывания проекта: 5-10 минут в зависимости от доступных ресурсов.
Система состоит из 12 контейнеров:
- PostgreSQL
- MySQL
- Spark Master
- Spark Worker
- Генератор данных для PostgreSQL (pg_datagen)
- Генератор данных для Kafka (kafka_datagen)
- Kafka Init
- Kafka
- Zookeeper
- Airflow Init
- Airflow Scheduler
- Airflow Webserver
Все контейнеры основаны на открытых Docker Image. Они автоматически настраиваются и проверяют свою готовность с использованием Healthcheck. Система обеспечена скриптами для автоматической генерации данных, их репликации, стриминга и создания аналитических витрин.
- Создание баз данных, пользователей, таблиц и выдача прав реализованы как идемпотентные операции, что гарантирует корректное повторное выполнение без дублирования.
- Генерация данных в PostgreSQL также идемпотентна: каждый запуск проверяет наличие ранее созданных данных и не выполняет повторную генерацию в случае их наличия.
- Все подключения и конфигурации в Airflow создаются автоматически при инициализации сервиса
airflow-init, включая соединения, пользователи и базовые настройки.
Каждая нода графа - отдельный сервис.
graph TD
PG[postgresql]
MY[mysql]
PGDG[pg-datagen]
KDG[kafka-datagen]
AI[airflow-init]
AS[airflow-scheduler]
AW[airflow-webserver]
SM[spark-master]
SW[spark-worker]
ZK[zookeeper]
KF[kafka]
KI[kafka-init]
subgraph Databases
PG
MY
PG -->|service_healthy| PGDG
end
subgraph Kafka
ZK -->|service_healthy| KF
KF -->|service_healthy| KI
KI -->|service_completed_successfully| KDG
end
subgraph Spark
SM -->|service_healthy| SW
end
subgraph Airflow
PG -->|service_healthy| AI
MY -->|service_healthy| AI
SM -->|service_healthy| AI
KF -->|service_healthy| AI
PGDG -->|service_completed_successfully| AI
KI -->|service_completed_successfully| AI
AI -->|service_completed_successfully| AS
AI -->|service_completed_successfully| AW
end
Вся конфигурация проекта управляется через единый файл .env, который содержит параметры для всех сервисов, включая логины, пароли, порты и настройки генерации данных.
- URL:
jdbc:postgresql://localhost:5432/postgres_finals_db?currentSchema=postgres_finals_schema - Логин:
finals_user - Пароль:
finals_pass
- URL:
jdbc:mysql://localhost:3306/mysql_finals_db - Логин:
finals_user - Пароль:
finals_pass
- URL:
http://localhost:8080 - Логин:
admin - Пароль:
admin
- URL:
http://localhost:8081
- Bootstrap Servers:
localhost:9092 - Топик:
new_user_events
Параметры генерации для PostgreSQL по умолчанию:
- Количество пользователей: 5000
- Количество товаров: 500
- Количество заказов: 10000
- Детали заказов: 40000
- Категории товаров: 5000
Параметры генерации для Kafka по умолчанию:
- Интервал генерации событий: 3 секунды
- Топик Kafka:
new_user_events - События генерируются в формате JSON с полями:
first_name,last_name,email,phone,registration_date,loyalty_status.
В Airflow реализована репликация данных из PostgreSQL в MySQL. DAG выполняет следующие задачи:
- Извлечение данных из PostgreSQL.
- Трансформация данных через Spark.
- Сохранение данных в MySQL.
В Airflow реализована обработка данных из Kafka. DAG выполняет следующие задачи:
- Получение данных из топика Kafka.
- Обработка данных с использованием Spark.
- Сохранение данных в PostgreSQL.
Витрина для последующего анализа поведения пользователей - количества заказов и общей суммы затрат, разбитых по статусам заказов.
| Поле | Описание |
|---|---|
| user_id | Идентификатор пользователя |
| first_name | Имя пользователя |
| last_name | Фамилия пользователя |
| status | Статус заказа |
| order_count | Количество заказов |
| total_spent | Общая сумма затрат |
Витрина для последующего анализа продаж товаров с учетом количества проданных единиц и общей выручки, разбитых по статусам заказов. Может быть использована, например, для исследования популярности товаров, их влияния на общий доход, эффективности категорий товаров и их соответствия ожиданиям клиентов.
| Поле | Описание |
|---|---|
| product_id | Идентификатор товара |
| name | Название товара |
| status | Статус заказа |
| total_quantity_sold | Общее количество проданных единиц |
| total_sales | Общая выручка |
Витрина для последующего анализа средних чеков с разбивкой по статусу заказа и статусу лояльности. Может быть использована, например, для оценки эффективностм маркетинговых стратегий и программ лояльности, или для настройки акций и предложений для разных групп клиентов.
| Поле | Описание |
|---|---|
| status | Статус заказа |
| loyalty_status | Статус лояльности пользователя |
| average_check | Средний чек для группы заказов |
Для создания витрин используется Spark. Скрипты загружают данные из MySQL, выполняют агрегации и сохраняют результаты обратно в базу данных. Шаги:
- Загрузка исходных данных из базы данных MySQL.
- Выполнение трансформаций (объединения, группировки, агрегации, etc.)
- Сохранение результирующих витрин в базу данных MySQL.
- Склонировать проект:
git clone git@github.com:avshapoval/python_de_finals.git - Перейти в корневую директорию проекта:
cd python_de_finals - Запустить команду:
docker compose up --build [-d] - После сборки проекта и его развертывания будут доступны интерфейсы PostgreSQL, MySQL, Airflow, Kafka и Spark по указанным выше URL.
- Все что остается сделать вручную после окончания деплоя - включить (переевсти в
unpaused) DAG в UI Airflow. Т.к. используется всего один executor (SequentialExecutor), включать необходимо в следующей последовательности:- replicate_from_pg_to_mysql
- create_analytical_marts
- stream_from_kafka_to_pg
Проект организован следующим образом:
python_de_finals/
├── .env # Переменные окружения для настройки всех сервисов
├── docker-compose.yml # Конфигурация Docker Compose
├── infra/ # Инфраструктура контейнеров
│ ├── airflow/ # Конфигурация Airflow
│ │ ├── init/
│ │ ├── scheduler/
│ │ └── webserver/
│ ├── datagen/ # Конфигурация для генераторов данных
│ │ ├── pg_datagen/
│ │ └── kafka_datagen/
│ ├── db/ # Конфигурация СУБД
│ │ ├── mysql/
│ │ └── postgresql/
│ ├── messaging/ # Конфигурация ZK&Kafka
│ │ ├── kafka/
│ │ ├── kafka_init/
│ │ └── zookeeper/
│ └── spark/ # Конфигурация Spark
│ ├── spark-master/
│ └── spark-worker/
├── code/ # Исходный код
│ ├── airflow/ # DAG и скрипты для Airflow
│ │ ├── dags/
│ │ └── scripts/
│ │ ├── helpers/
│ │ └── pyspark_scripts/
│ ├── datagen/ # Генераторы данных
│ │ ├── pg_datagen/ # Скрипты для генерации данных в PG
│ │ └── kafka_datagen/ # Скрипты для генерации данных в Kafka