Skip to content

Commit abedb46

Browse files
kafka init (#13)
1 parent e652514 commit abedb46

14 files changed

Lines changed: 411 additions & 17 deletions

File tree

.github/workflows/build_and_publish.yml

Lines changed: 49 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ jobs:
5858
name: Testing
5959
url: https://api.test.profcomff.com/userdata
6060
env:
61-
CONTAINER_NAME: com_profcomff_api_userdata_test
61+
API_CONTAINER_NAME: com_profcomff_api_userdata_test
62+
WORKER_CONTAINER_NAME: com_profcomff_worker_userdata_test
6263
permissions:
6364
packages: read
6465

@@ -72,25 +73,43 @@ jobs:
7273
--rm \
7374
--network=web \
7475
--env DB_DSN=${{ secrets.DB_DSN }} \
75-
--name ${{ env.CONTAINER_NAME }}_migration \
76+
--name ${{ env.API_CONTAINER_NAME }}_migration \
7677
--workdir="/" \
7778
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:test \
7879
alembic upgrade head
7980
80-
- name: Run new version
81-
id: run_prod
81+
- name: Run new version API
82+
id: run_test_api
8283
run: |
83-
docker stop ${{ env.CONTAINER_NAME }} || true && docker rm ${{ env.CONTAINER_NAME }} || true
84+
docker stop ${{ env.API_CONTAINER_NAME }} || true && docker rm ${{ env.API_CONTAINER_NAME }} || true
8485
docker run \
8586
--detach \
8687
--restart always \
8788
--network=web \
8889
--env DB_DSN='${{ secrets.DB_DSN }}' \
8990
--env ROOT_PATH='/userdata' \
9091
--env GUNICORN_CMD_ARGS='--log-config logging_test.conf' \
91-
--name ${{ env.CONTAINER_NAME }} \
92+
--name ${{ env.API_CONTAINER_NAME }} \
9293
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:test
9394
95+
- name: Run new version worker
96+
id: run_test_worker
97+
run: |
98+
docker stop ${{ env.WORKER_CONTAINER_NAME }} || true && docker rm ${{ env.WORKER_CONTAINER_NAME }} || true
99+
docker run \
100+
--detach \
101+
--restart always \
102+
--network=web \
103+
--env DB_DSN='${{ secrets.DB_DSN }}' \
104+
--env KAFKA_DSN='${{ secrets.KAFKA_DSN }}' \
105+
--env KAFKA_LOGIN='${{ secrets.KAFKA_LOGIN }}' \
106+
--env KAFKA_PASSWORD='${{ secrets.KAFKA_PASSWORD }}' \
107+
--env KAFKA_GROUP_ID='${{ vars.KAFKA_GROUP_ID }}' \
108+
--env KAFKA_TOPICS='${{ vars.KAFKA_TOPICS }}' \
109+
--name ${{ env.WORKER_CONTAINER_NAME }} \
110+
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:test python -m userdata_api start --instance worker
111+
112+
94113
deploy-production:
95114
name: Deploy Production
96115
needs: build-and-push-image
@@ -100,7 +119,8 @@ jobs:
100119
name: Production
101120
url: https://api.profcomff.com/userdata
102121
env:
103-
CONTAINER_NAME: com_profcomff_api_userdata
122+
API_CONTAINER_NAME: com_profcomff_api_userdata
123+
WORKER_CONTAINER_NAME: com_profcomff_worker_userdata
104124
permissions:
105125
packages: read
106126

@@ -114,21 +134,38 @@ jobs:
114134
--rm \
115135
--network=web \
116136
--env DB_DSN=${{ secrets.DB_DSN }} \
117-
--name ${{ env.CONTAINER_NAME }}_migration \
137+
--name ${{ env.API_CONTAINER_NAME }}_migration \
118138
--workdir="/" \
119139
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest \
120140
alembic upgrade head
121141
122-
- name: Run new version
123-
id: run_test
142+
- name: Run new version API
143+
id: run_prod_api
124144
run: |
125-
docker stop ${{ env.CONTAINER_NAME }} || true && docker rm ${{ env.CONTAINER_NAME }} || true
145+
docker stop ${{ env.API_CONTAINER_NAME }} || true && docker rm ${{ env.API_CONTAINER_NAME }} || true
126146
docker run \
127147
--detach \
128148
--restart always \
129149
--network=web \
130150
--env DB_DSN='${{ secrets.DB_DSN }}' \
131151
--env ROOT_PATH='/userdata' \
132152
--env GUNICORN_CMD_ARGS='--log-config logging_prod.conf' \
133-
--name ${{ env.CONTAINER_NAME }} \
153+
--name ${{ env.API_CONTAINER_NAME }} \
134154
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
155+
156+
- name: Run new version worker
157+
id: run_prod_worker
158+
run: |
159+
docker stop ${{ env.WORKER_CONTAINER_NAME }} || true && docker rm ${{ env.WORKER_CONTAINER_NAME }} || true
160+
docker run \
161+
--detach \
162+
--restart always \
163+
--network=web \
164+
--env DB_DSN='${{ secrets.DB_DSN }}' \
165+
--env KAFKA_DSN='${{ secrets.KAFKA_DSN }}' \
166+
--env KAFKA_LOGIN='${{ secrets.KAFKA_LOGIN }}' \
167+
--env KAFKA_PASSWORD='${{ secrets.KAFKA_PASSWORD }}' \
168+
--env KAFKA_GROUP_ID='${{ vars.KAFKA_GROUP_ID }}' \
169+
--env KAFKA_TOPICS='${{ vars.KAFKA_TOPICS }}' \
170+
--name ${{ env.WORKER_CONTAINER_NAME }} \
171+
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest python -m userdata_api start --instance worker

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,19 @@
2121
```
2222
4. Запускайте приложение!
2323
```console
24-
foo@bar:~$ python -m userdata_api
24+
foo@bar:~$ python -m userdata_api start --instance api -- запустит АПИ
25+
foo@bar:~$ python -m userdata_api start --instance worker -- запустит Kafka worker
2526
```
27+
28+
Приложение состоит из двух частей - АПИ и Kafka worker'а.
29+
30+
АПИ нужно для управления структурой пользовательских данных -
31+
контроль над категориями данных, параметрами, источниками данных.
32+
Также, в АПИ пользовательские данные может слать
33+
сам пользователь(владелец этих данных), а также админ
34+
35+
Kafka worker нужен для того, чтобы разгребать поступающие от OAuth
36+
методов авторизации AuthAPI пользовательские данные
2637

2738
## ENV-file description
2839
- `DB_DSN=postgresql://postgres@localhost:5432/postgres` – Данные для подключения к БД

migrations/env.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
from alembic import context
44
from sqlalchemy import engine_from_config, pool
55

6+
from settings import get_settings
67
from userdata_api.models.base import Base
7-
from userdata_api.settings import get_settings
88

99

1010
# this is the Alembic Config object, which provides

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ pydantic[dotenv]
99
SQLAlchemy
1010
uvicorn
1111
pydantic-settings
12+
event_schema_profcomff
13+
confluent_kafka
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ class Settings(BaseSettings):
99
"""Application settings"""
1010

1111
DB_DSN: PostgresDsn = 'postgresql://postgres@localhost:5432/postgres'
12+
13+
KAFKA_DSN: str | None = None
14+
KAFKA_LOGIN: str | None = None
15+
KAFKA_PASSWORD: str | None = None
16+
KAFKA_TOPICS: list[str] | None = None
17+
KAFKA_GROUP_ID: str | None = None
18+
1219
ROOT_PATH: str = '/' + os.getenv("APP_NAME", "")
1320

1421
CORS_ALLOW_ORIGINS: list[str] = ['*']

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
from sqlalchemy import create_engine
44
from sqlalchemy.orm import sessionmaker
55

6+
from settings import get_settings
67
from userdata_api.models.db import *
78
from userdata_api.routes.base import app
8-
from userdata_api.settings import get_settings
99
from userdata_api.utils.utils import random_string
1010

1111

tests/test_worker/__init__.py

Whitespace-only changes.

tests/test_worker/test_worker.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import pytest
2+
import sqlalchemy.exc
3+
from event_schema.auth import UserLogin
4+
5+
from userdata_api.models.db import Category, Info, Param, Source
6+
from userdata_api.utils.utils import random_string
7+
from worker.user import patch_user_info
8+
9+
10+
@pytest.fixture()
11+
def category(dbsession):
12+
name = f"test{random_string()}"
13+
dbsession.add(
14+
_cat := Category(
15+
name=name, read_scope=f"testscope.{random_string()}", update_scope=f"testscope.{random_string()}"
16+
)
17+
)
18+
dbsession.commit()
19+
yield _cat
20+
dbsession.delete(_cat)
21+
dbsession.commit()
22+
23+
24+
@pytest.fixture()
25+
def param(dbsession, category):
26+
time_ = f"test{random_string()}"
27+
dbsession.add(
28+
_par := Param(name=f"test{time_}", category_id=category.id, type="last", changeable=True, is_required=True)
29+
)
30+
dbsession.commit()
31+
yield _par
32+
dbsession.delete(_par)
33+
dbsession.commit()
34+
35+
36+
@pytest.fixture()
37+
def source(dbsession):
38+
time_ = f"test{random_string()}"
39+
__source = Source(name=f"test{time_}", trust_level=8)
40+
dbsession.add(__source)
41+
dbsession.commit()
42+
yield __source
43+
dbsession.delete(__source)
44+
dbsession.commit()
45+
46+
47+
@pytest.fixture()
48+
def info(param, source, dbsession):
49+
time_ = f"test{random_string()}"
50+
__info = Info(value=f"test{time_}", source_id=source.id, param_id=param.id, owner_id=1)
51+
dbsession.add(__info)
52+
dbsession.commit()
53+
yield __info
54+
try:
55+
dbsession.delete(__info)
56+
dbsession.commit()
57+
except sqlalchemy.exc.Any:
58+
pass
59+
60+
61+
def test_create(param, source, dbsession):
62+
with pytest.raises(sqlalchemy.exc.NoResultFound):
63+
dbsession.query(Info).filter(Info.param_id == param.id, Info.source_id == source.id, Info.value == "test").one()
64+
patch_user_info(
65+
UserLogin.model_validate(
66+
{"items": [{"category": param.category.name, "param": param.name, "value": "test"}], "source": source.name}
67+
),
68+
1,
69+
session=dbsession,
70+
)
71+
info = (
72+
dbsession.query(Info).filter(Info.param_id == param.id, Info.source_id == source.id, Info.value == "test").one()
73+
)
74+
assert info
75+
dbsession.delete(info)
76+
dbsession.commit()
77+
78+
79+
def test_update(info, dbsession):
80+
assert info.value != "updated"
81+
patch_user_info(
82+
UserLogin.model_validate(
83+
{
84+
"items": [{"category": info.category.name, "param": info.param.name, "value": "updated"}],
85+
"source": info.source.name,
86+
}
87+
),
88+
1,
89+
session=dbsession,
90+
)
91+
92+
dbsession.expire(info)
93+
assert info.value == "updated"
94+
95+
96+
def test_delete(info, dbsession):
97+
assert info.is_deleted is False
98+
patch_user_info(
99+
UserLogin.model_validate(
100+
{
101+
"items": [{"category": info.category.name, "param": info.param.name, "value": None}],
102+
"source": info.source.name,
103+
}
104+
),
105+
1,
106+
session=dbsession,
107+
)
108+
109+
dbsession.expire(info)
110+
assert info.is_deleted is True

userdata_api/__main__.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,25 @@
1+
import argparse
2+
13
import uvicorn
24

35
from userdata_api.routes.base import app
6+
from worker.consumer import process
7+
8+
9+
def get_args():
10+
parser = argparse.ArgumentParser()
11+
subparsers = parser.add_subparsers(dest='command')
12+
13+
start = subparsers.add_parser("start")
14+
start.add_argument('--instance', type=str, required=True)
15+
16+
return parser.parse_args()
417

518

619
if __name__ == '__main__':
7-
uvicorn.run(app)
20+
args = get_args()
21+
match args.instance:
22+
case "api":
23+
uvicorn.run(app)
24+
case "worker":
25+
process()

userdata_api/routes/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
from fastapi.middleware.cors import CORSMiddleware
33
from fastapi_sqlalchemy import DBSessionMiddleware
44

5+
from settings import get_settings
56
from userdata_api import __version__
6-
from userdata_api.settings import get_settings
77

88
from .category import category
99
from .param import param

0 commit comments

Comments
 (0)