-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
109 lines (86 loc) · 2.3 KB
/
main.py
File metadata and controls
109 lines (86 loc) · 2.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import yaml
from hashlib import sha256
from typing import Optional
from fastapi import FastAPI
from pydantic import BaseModel
class Source():
record = "record"
redirect = "redirect"
url = ""
class SourceProtocol():
auto = "automatic"
udp = "udp"
tcp = "tcp"
class PasswortManager:
def __init__(self, passwd):
self.passwd = passwd
def encrypt(self):
return "sha256:"+sha256(self.passwd.encode('utf-8')).hexdigest()
class Path(BaseModel):
source: str
sourceProtocol: str
sourceOnDemand: bool
sourceOnDemandStartTimeout: str
sourceOnDemandCloseAfter: str
sourceRedirect: str
disablePublisherOverride: bool
fallback: str
publishUser: str
publishPass: str
publishIps: list
readUser: str
readPass: str
readIps: list
runOnInit: str
runOnInitRestart: bool
runOnDemand: str
runOnDemandRestart: bool
runOnDemandStartTimeout: str
runOnDemandCloseAfter: str
runOnPublish: str
runOnPublishRestart: bool
runOnRead: str
runOnReadRestart: bool
class PathRequest(BaseModel):
name: str
path: Path
def read():
with open("rtsp-simple-server-test.yml", 'r') as stream:
try:
return yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
def write(data):
with open("rtsp-simple-server-test.yml", 'w') as stream:
try:
stream.write(yaml.safe_dump(data))
except yaml.YAMLError as exc:
print(exc)
def Convert(lst):
it = iter(lst)
res_dct = dict(zip(it, it))
return res_dct
app = FastAPI(
title="RTSP Server API",
description="This Service is to manage a RTSP Simple Server",
version="1",)
@app.get("/")
async def read_root():
return {"Hello": "World"}
@app.get("/config/server")
async def server_config():
return read()
@app.get("/config/paths")
async def server_paths():
return read()['paths']
@app.post("/config/paths")
async def server_path(PathRequest: PathRequest):
try:
config = read()
path = dict(PathRequest.path)
path['publishPass'] = PasswortManager(path['publishPass']).encrypt()
config['paths'][PathRequest.name] = path
write(config)
return {"success": True}
except Exception as exc:
return exc