Skip to content

交互后端文档

交互后端可实现与 BROS-GUI 的实时交互,修改参数并获取实时反馈。

项目结构

plaintext
your_project/
├── interactive_backend.py  # 交互后端主程序
├── sensor/
│   └── camera.py  # 摄像头相关功能
└── ...  # 日志功能与其他

配置交互后端

修改和添加命令

  1. 定义新的命令处理函数

    • command_handlers 中添加新的命令映射。
    • 确保新的处理函数是异步函数 (async def) 并接受一个参数 data
    python
    async def new_command(data):
        # 新命令处理逻辑
        return "New command executed"
    
    command_handlers["new_command"] = new_command
  2. 更新命令调度函数

    • handle_command 函数会自动处理新添加的命令,不需要额外修改。

配置摄像头

确保摄像头配置正确,ThreadedCamera 类初始化时可根据需求调整参数:

python
camera = ThreadedCamera('BROS-camera', fps=10, interactive_debug=True)

启动服务器

运行程序以启动 WebSocket 服务器:

bash
python interactive_backend.py

注意

确保 BROS-GUI 的配置文件中的 WebSocket 地址与此处的服务器地址一致。

且目前不支持修改,必须为 ws://localhost:10502

提示

完成后,可以为后端的启动也写一个执行器,以便 BROS-GUI 可以直接启动后端。

主要功能细节

交互后端主要功能包括:

  1. 接收来自 BROS-GUI 的命令。
  2. 执行相应的命令并返回结果。
  3. 修改摄像头阈值并返回实时图像。

代码解释

导入必要的库和模块

python
import asyncio
import base64
import time
import cv2
import websockets
import json
from sensor.camera import ThreadedCamera, ProcessedFrame
from logger_br import logger

命令处理函数

定义具体的命令处理逻辑,以下是设置摄像头阈值的处理函数示例:

python
async def set_camera_threshold(data):
    threshold = data['threshold']
    logger.info(f"Setting camera threshold to {threshold}")
    camera.thresh = threshold
    time.sleep(camera.fps_sleep * 5)
    frame: ProcessedFrame = camera.processed_frame

    if frame is not None:
        _, img = cv2.imencode('.jpg', frame.frame)
        img_base64 = base64.b64encode(img.tobytes()).decode('utf-8')
        img_base64 = f"data:image/jpeg;base64,{img_base64}"
        return {"status": "success", "img": img_base64}

    return {"status": "error", "img": None}

命令到函数的映射

将命令字符串映射到相应的处理函数:

python
command_handlers = {
    "set_camera_threshold": set_camera_threshold,
    "another_command": another_command,
}

命令处理调度函数

根据接收到的命令调用相应的处理函数:

python
async def handle_command(data):
    command = data.get('cmd')
    if command in command_handlers:
        response = await command_handlers[command](data['data'])
        return response
    else:
        return "Unknown command"

WebSocket 服务器

定义 WebSocket 服务器,处理接收到的消息并返回处理结果:

python
async def server(websocket, path):
    async for message in websocket:
        data = json.loads(message)
        logger.debug(f"Received: {data}")
        response = await handle_command(data)
        await websocket.send(json.dumps({"cmd": data.get('cmd'), "data": response}))

主程序

初始化摄像头并启动 WebSocket 服务器:

python
if __name__ == "__main__":
    camera = ThreadedCamera('BROS-camera', fps=10, interactive_debug=True)

    start_server = websockets.serve(server, "localhost", 10502)

    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()