-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
168 lines (127 loc) · 4.73 KB
/
app.py
File metadata and controls
168 lines (127 loc) · 4.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# -*- coding: utf-8 -*-
"""Flask 应用主入口 - 模块化结构"""
from flask import Flask, send_from_directory
from flask_socketio import SocketIO, emit, join_room
from flask_cors import CORS
import threading
import queue
import time
import logging
import os
# 获取项目根目录
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# 数据库配置
DATABASE = os.path.join(BASE_DIR, 'database', 'ink.db')
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Flask 应用初始化
app = Flask(__name__,
static_folder=os.path.join(BASE_DIR, 'art-design-pro', 'dist', 'assets'),
static_url_path='/assets',
template_folder=os.path.join(BASE_DIR, 'art-design-pro', 'dist'))
app.config['SECRET_KEY'] = 'ink-secret-key-pingan'
CORS(app)
# SocketIO 初始化
socketio = SocketIO(app, cors_allowed_origins="*", async_mode='threading')
# 消息队列 - 用于 ROS2 与 Flask 之间通信
ros_to_flask_queue = queue.Queue()
flask_to_ros_queue = queue.Queue()
# 全局状态
robot_state = {
"connected": False,
"battery": 0,
"position": {"x": 0, "y": 0, "z": 0},
"status": "disconnected"
}
# ============== WebSocket 事件处理 ==============
@socketio.on('connect')
def handle_connect():
"""客户端连接事件"""
logger.info(f"Client connected: {request.sid}")
emit('connection_status', {'status': 'connected', 'sid': request.sid})
# 发送当前机器人状态
emit('robot_state', robot_state)
@socketio.on('disconnect')
def handle_disconnect():
"""客户端断开连接事件"""
logger.info(f"Client disconnected: {request.sid}")
@socketio.on('join_room')
def handle_join_room(data):
"""加入房间 - 用于分组管理"""
room = data.get('room', 'default')
join_room(room)
logger.info(f"Client {request.sid} joined room: {room}")
emit('room_joined', {'room': room})
@socketio.on('control_command')
def handle_control_command(data):
"""接收前端控制命令,转发给 ROS2"""
logger.info(f"Received control command: {data}")
# 将命令放入队列,由 ROS2 桥接线程处理
flask_to_ros_queue.put({
'type': 'control',
'data': data,
'timestamp': time.time()
})
# 确认命令已接收
emit('command_ack', {'status': 'received', 'command': data.get('command')})
@socketio.on('get_robot_state')
def handle_get_state():
"""前端请求机器人状态"""
emit('robot_state', robot_state)
@socketio.on('message')
def handle_message(data):
"""接收客户端消息并响应"""
text = data.get('text', '')
logger.info(f"Received message: {text}")
# 回显消息给客户端
emit('message_response', {
'text': f'收到: {text}',
'timestamp': time.time()
})
# ============== ROS2 消息推送 ==============
def ros_message_publisher():
"""后台线程:从队列读取 ROS2 消息并推送给前端"""
while True:
try:
# 非阻塞获取消息
msg = ros_to_flask_queue.get(timeout=1)
if msg['type'] == 'state_update':
# 更新全局状态
global robot_state
robot_state.update(msg['data'])
# 广播给所有客户端
socketio.emit('robot_state', robot_state)
logger.info(f"Published robot state: {robot_state}")
elif msg['type'] == 'sensor_data':
# 传感器数据推送
socketio.emit('sensor_data', msg['data'])
elif msg['type'] == 'log':
# 日志消息
socketio.emit('log_message', msg['data'])
except queue.Empty:
continue
except Exception as e:
logger.error(f"Error in ROS publisher: {e}")
# ============== 主页路由 ==============
@app.route('/')
def index():
"""Serve the homepage"""
dist_folder = os.path.join(BASE_DIR, 'art-design-pro', 'dist')
return send_from_directory(dist_folder, 'index.html')
# ============== 主程序入口 ==============
if __name__ == '__main__':
# 确保数据库目录存在
os.makedirs(os.path.dirname(DATABASE), exist_ok=True)
# 初始化数据库
from database import init_db
init_db()
# 注册路由
from routes.register import register_routes
register_routes(app)
# 启动 ROS 消息发布线程
ros_thread = threading.Thread(target=ros_message_publisher, daemon=True)
ros_thread.start()
logger.info("🚀 Flask-SocketIO Server Starting on http://localhost:5000")
# 启动服务器
socketio.run(app, host='0.0.0.0', port=5000, debug=False, allow_unsafe_werkzeug=True)