交互后端文档
交互后端可实现与 BROS-GUI 的实时交互,修改参数并获取实时反馈。
项目结构
plaintext
your_project/
├── interactive_backend.py # 交互后端主程序
├── sensor/
│ └── camera.py # 摄像头相关功能
└── ... # 日志功能与其他
配置交互后端
修改和添加命令
定义新的命令处理函数:
- 在
command_handlers
中添加新的命令映射。 - 确保新的处理函数是异步函数 (
async def
) 并接受一个参数data
。
pythonasync def new_command(data): # 新命令处理逻辑 return "New command executed" command_handlers["new_command"] = new_command
- 在
更新命令调度函数:
handle_command
函数会自动处理新添加的命令,不需要额外修改。
配置摄像头
确保摄像头配置正确,ThreadedCamera
类初始化时可根据需求调整参数:
python
camera = ThreadedCamera('BROS-camera', fps=10, interactive_debug=True)
启动服务器
运行程序以启动 WebSocket 服务器:
bash
python interactive_backend.py
注意
确保 BROS-GUI 的配置文件中的 WebSocket 地址与此处的服务器地址一致。
且目前不支持修改,必须为 ws://localhost:10502
。
提示
完成后,可以为后端的启动也写一个执行器,以便 BROS-GUI 可以直接启动后端。
主要功能细节
交互后端主要功能包括:
- 接收来自 BROS-GUI 的命令。
- 执行相应的命令并返回结果。
- 修改摄像头阈值并返回实时图像。
代码解释
导入必要的库和模块
python
import asyncio
import base64
import time
import cv2
import websockets
import json
from sensor.camera import ThreadedCamera, ProcessedFrame
from logger_br import logger
命令处理函数
定义具体的命令处理逻辑,以下是设置摄像头阈值的处理函数示例:
python
async def set_camera_threshold(data):
threshold = data['threshold']
logger.info(f"Setting camera threshold to {threshold}")
camera.thresh = threshold
time.sleep(camera.fps_sleep * 5)
frame: ProcessedFrame = camera.processed_frame
if frame is not None:
_, img = cv2.imencode('.jpg', frame.frame)
img_base64 = base64.b64encode(img.tobytes()).decode('utf-8')
img_base64 = f"data:image/jpeg;base64,{img_base64}"
return {"status": "success", "img": img_base64}
return {"status": "error", "img": None}
命令到函数的映射
将命令字符串映射到相应的处理函数:
python
command_handlers = {
"set_camera_threshold": set_camera_threshold,
"another_command": another_command,
}
命令处理调度函数
根据接收到的命令调用相应的处理函数:
python
async def handle_command(data):
command = data.get('cmd')
if command in command_handlers:
response = await command_handlers[command](data['data'])
return response
else:
return "Unknown command"
WebSocket 服务器
定义 WebSocket 服务器,处理接收到的消息并返回处理结果:
python
async def server(websocket, path):
async for message in websocket:
data = json.loads(message)
logger.debug(f"Received: {data}")
response = await handle_command(data)
await websocket.send(json.dumps({"cmd": data.get('cmd'), "data": response}))
主程序
初始化摄像头并启动 WebSocket 服务器:
python
if __name__ == "__main__":
camera = ThreadedCamera('BROS-camera', fps=10, interactive_debug=True)
start_server = websockets.serve(server, "localhost", 10502)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()