第 5 章 通信协议:main/protocols/
922 行代码、3 个 .cc 文件、2 套传输方式(WebSocket 单连接 / MQTT 控制信令 + UDP 媒体流),都实现同一个
Protocol抽象基类。本章把基类、两种实现、帧格式、加密细节、握手流程全部拆开。
5.1 通信协议鸟瞰
┌──────────────────────┐
│ Protocol (基类) │
│ 纯虚 Start/Send 等 │
│ 注册回调: │
│ on_incoming_audio │
│ on_incoming_json │
│ on_audio_channel_* │
│ on_connected │
│ on_network_error │
└──────────┬───────────┘
│ 多态
┌────────────┴────────────┐
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ WebsocketProt│ │ MqttProtocol │
│ │ │ │
│ TLS WS 单连接 │ │ MQTT (TLS) ─┐ │
│ 二进制 + 文本 │ │ │ │
│ 帧头 BP2/BP3 │ │ │ ◄─ 控制信令 (JSON)
└──────────────┘ │ UDP (AES-CTR)
│ │ │
│ │ ◄─ 媒体流 (Opus + 帧头)
└─────────────┴────┘
两种协议对比表:
| 维度 | WebSocket | MQTT + UDP |
|---|---|---|
| 连接数 | 1 条 WSS(TLS) | 1 条 MQTTS(TLS)+ 1 条 UDP(应用层加密) |
| 控制信令 | WSS 文本帧 | MQTT publish/subscribe |
| 音频数据 | WSS 二进制帧 + 帧头 | UDP + AES-128-CTR + 自定义帧头 |
| 加密 | TLS(端到端) | MQTTS(控制信令)+ AES-CTR(媒体流) |
| 延迟 | 较高(TCP/TLS 重传 + 头部) | UDP 媒体流低延迟 |
| 抗丢包 | TCP 重传,可能阻塞 | UDP 容忍丢包(每帧带序号) |
| 双向能力 | 全双工,自然 | 控制走 MQTT,媒体走 UDP,能并发 |
| 服务器实现复杂度 | 简单(一个 ws server) | 较复杂(mqtt broker + udp gateway) |
| 适用场景 | 普通互联网,对延迟要求一般 | 移动网络/有 NAT,对延迟敏感的实时语音 |
服务器端在握手时下发 mqtt 配置或 websocket 配置,设备就走对应路径。
5.2 protocol.h —— 抽象基类与帧头
5.2.1 AudioStreamPacket 结构
struct AudioStreamPacket {
int sample_rate = 0;
int frame_duration = 0;
uint32_t timestamp = 0;
std::vector<uint8_t> payload;
};
贯穿全项目的"一帧音频"基本单元:
sample_rate:当前帧的 Opus 采样率(决定解码器配置);frame_duration:帧长(毫秒,常用 60);timestamp:用于服务器 AEC 和 UDP 顺序追踪;payload:原始 Opus bytes(不含帧头)。
5.2.2 二进制帧头两版
struct BinaryProtocol2 {
uint16_t version; // = 2,网络字节序
uint16_t type; // 0=OPUS, 1=JSON
uint32_t reserved;
uint32_t timestamp; // 毫秒级时间戳,用于服务端 AEC
uint32_t payload_size;
uint8_t payload[];
} __attribute__((packed));
struct BinaryProtocol3 {
uint8_t type;
uint8_t reserved;
uint16_t payload_size;
uint8_t payload[];
} __attribute__((packed));
两版帧头:
- v2:16 字节头,含版本号、消息类型、时间戳、长度。适合 WebSocket 复用单连接(同一条 WS 流上跑多种消息);
- v3:4 字节头,只有类型 + 长度。MQTT+UDP 模式用,时间戳和顺序号放在 UDP 的 nonce 里更紧凑。
__attribute__((packed)):禁止编译器按对齐补齐字段——结构体直接覆盖网络上的字节流。再加上 uint8_t payload[] 这种"灵活数组成员"(C99 引入),实现"零拷贝指向连续 payload"。
5.2.3 AbortReason 和 ListeningMode
enum AbortReason {
kAbortReasonNone,
kAbortReasonWakeWordDetected
};
enum ListeningMode {
kListeningModeAutoStop, // VAD 静音超时自动结束
kListeningModeManualStop, // 显式 stop 才结束(按住说话)
kListeningModeRealtime // 全双工,需要 AEC
};
由协议在 SendStartListening / SendAbortSpeaking 时携带。
5.2.4 基类公开接口
class Protocol {
public:
void OnIncomingAudio(std::function<void(std::unique_ptr<AudioStreamPacket>)>);
void OnIncomingJson(std::function<void(const cJSON* root)>);
void OnAudioChannelOpened(std::function<void()>);
void OnAudioChannelClosed(std::function<void()>);
void OnNetworkError(std::function<void(const std::string&)>);
void OnConnected(std::function<void()>);
void OnDisconnected(std::function<void()>);
virtual bool Start() = 0;
virtual bool OpenAudioChannel() = 0;
virtual void CloseAudioChannel() = 0;
virtual bool IsAudioChannelOpened() const = 0;
virtual bool SendAudio(std::unique_ptr<AudioStreamPacket>) = 0;
virtual void SendWakeWordDetected(const std::string& wake_word);
virtual void SendStartListening(ListeningMode mode);
virtual void SendStopListening();
virtual void SendAbortSpeaking(AbortReason reason);
virtual void SendMcpMessage(const std::string& message);
...
protected:
std::function<...> on_incoming_audio_; // 等等 7 个回调
int server_sample_rate_ = 24000;
int server_frame_duration_ = 60;
bool error_occurred_ = false;
std::string session_id_;
std::chrono::time_point<std::chrono::steady_clock> last_incoming_time_;
virtual bool SendText(const std::string& text) = 0;
virtual void SetError(const std::string& message);
virtual bool IsTimeout() const;
};
设计要点:
- 6 个发送类方法是虚函数有默认实现(
SendWakeWordDetected/SendStartListening/SendStopListening/SendAbortSpeaking/SendMcpMessage+ 一个Send*),共用底层SendText(json_str); SendText才是纯虚——具体走 WS 文本还是 MQTT publish 由子类决定;- 7 个回调全部
std::function,子类只负责"在合适的时机调用",业务逻辑由Application通过OnXxx(...)注入; server_sample_rate_ / server_frame_duration_由握手时服务器 hello 消息确定,下行音频解码用;session_id_也是握手时服务器分配,所有消息都带;last_incoming_time_给IsTimeout()判断 120 秒无任何数据则视为掉线。
5.2.5 protocol.cc 基类实现 —— 通用控制消息生成
void Protocol::SendAbortSpeaking(AbortReason reason) {
std::string message = "{\"session_id\":\"" + session_id_ + "\",\"type\":\"abort\"";
if (reason == kAbortReasonWakeWordDetected) {
message += ",\"reason\":\"wake_word_detected\"";
}
message += "}";
SendText(message);
}
void Protocol::SendWakeWordDetected(const std::string& wake_word) {
std::string json = "{\"session_id\":\"" + session_id_ +
"\",\"type\":\"listen\",\"state\":\"detect\",\"text\":\"" + wake_word + "\"}";
SendText(json);
}
void Protocol::SendStartListening(ListeningMode mode) {
std::string message = "{\"session_id\":\"" + session_id_ + "\"";
message += ",\"type\":\"listen\",\"state\":\"start\"";
if (mode == kListeningModeRealtime) {
message += ",\"mode\":\"realtime\"";
} else if (mode == kListeningModeAutoStop) {
message += ",\"mode\":\"auto\"";
} else {
message += ",\"mode\":\"manual\"";
}
message += "}";
SendText(message);
}
void Protocol::SendStopListening() {
std::string message = "{\"session_id\":\"" + session_id_ + "\",\"type\":\"listen\",\"state\":\"stop\"}";
SendText(message);
}
void Protocol::SendMcpMessage(const std::string& payload) {
std::string message = "{\"session_id\":\"" + session_id_ + "\",\"type\":\"mcp\",\"payload\":" + payload + "}";
SendText(message);
}
bool Protocol::IsTimeout() const {
const int kTimeoutSeconds = 120;
auto now = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(now - last_incoming_time_);
bool timeout = duration.count() > kTimeoutSeconds;
if (timeout) {
ESP_LOGE(TAG, "Channel timeout %ld seconds", (long)duration.count());
}
return timeout;
}
字符串拼接 JSON 而不是用 cJSON:
- 这些消息字段固定,手写更快;
- 避免 cJSON 频繁 alloc/free 内存碎片;
- 对应文档:
docs/websocket.md和docs/mqtt-udp.md里有完整字段说明。
要点:
| 消息 | type | state | 附加字段 |
|---|---|---|---|
| Hello(子类各自写) | hello | — | version, transport, features, audio_params |
| Goodbye(mqtt 关闭通道时) | goodbye | — | — |
| Abort | abort | — | reason: wake_word_detected (可选) |
| WakeWordDetected | listen | detect | text: 唤醒词 |
| StartListening | listen | start | mode: realtime/auto/manual |
| StopListening | listen | stop | — |
| McpMessage | mcp | — | payload: 完整 MCP 报文(已 JSON 字符串) |
服务器下行 JSON 见第 2 章 2.6.7(tts/stt/llm/mcp/system/alert/custom)。
5.3 WebsocketProtocol 实现详解
5.3.1 构造与生命期
WebsocketProtocol::WebsocketProtocol() {
event_group_handle_ = xEventGroupCreate();
}
WebsocketProtocol::~WebsocketProtocol() {
vEventGroupDelete(event_group_handle_);
}
事件组里只用了一位 WEBSOCKET_PROTOCOL_SERVER_HELLO_EVENT——等服务器 hello 回包的同步。
5.3.2 Start() —— 懒连接
bool WebsocketProtocol::Start() {
// Only connect to server when audio channel is needed
return true;
}
WebSocket 不在 Start() 里建连——懒连接:等到 OpenAudioChannel() 才真正握手,避免空 idle 也持有 TCP 连接耗资源。
5.3.3 OpenAudioChannel() —— 完整握手流程
bool WebsocketProtocol::OpenAudioChannel() {
Settings settings("websocket", false);
std::string url = settings.GetString("url");
std::string token = settings.GetString("token");
int version = settings.GetInt("version");
if (version != 0) version_ = version;
error_occurred_ = false;
auto network = Board::GetInstance().GetNetwork();
websocket_ = network->CreateWebSocket(1);
if (websocket_ == nullptr) return false;
if (!token.empty()) {
if (token.find(" ") == std::string::npos) {
token = "Bearer " + token;
}
websocket_->SetHeader("Authorization", token.c_str());
}
websocket_->SetHeader("Protocol-Version", std::to_string(version_).c_str());
websocket_->SetHeader("Device-Id", SystemInfo::GetMacAddress().c_str());
websocket_->SetHeader("Client-Id", Board::GetInstance().GetUuid().c_str());
第一步:
- 从 NVS 命名空间
websocket取 url / token / version(OTA 阶段写进去的); - 通过板子的
network->CreateWebSocket()拿一个 WebSocket 句柄(network抽象了 WiFi 和 4G 两种栈); - 设三个标准头:Authorization(带 Bearer 前缀)、Protocol-Version、Device-Id(MAC)、Client-Id(UUID,软件生成)。
websocket_->OnData([this](const char* data, size_t len, bool binary) {
if (binary) {
// ... 见下 5.3.4 二进制收包
} else {
auto root = cJSON_Parse(data);
auto type = cJSON_GetObjectItem(root, "type");
if (cJSON_IsString(type)) {
if (strcmp(type->valuestring, "hello") == 0) {
ParseServerHello(root);
} else {
if (on_incoming_json_ != nullptr) {
on_incoming_json_(root);
}
}
} else {
ESP_LOGE(TAG, "Missing message type, data: %s", data);
}
cJSON_Delete(root);
}
last_incoming_time_ = std::chrono::steady_clock::now();
});
websocket_->OnDisconnected([this]() {
if (on_audio_channel_closed_ != nullptr) {
on_audio_channel_closed_();
}
});
挂回调:
- 文本帧 → 解 JSON,hello 内部处理,其它交给
on_incoming_json_(业务层装的大分发器); - 断开 → 通知业务层。
if (!websocket_->Connect(url.c_str())) {
SetError(Lang::Strings::SERVER_NOT_CONNECTED);
return false;
}
auto message = GetHelloMessage();
if (!SendText(message)) return false;
EventBits_t bits = xEventGroupWaitBits(event_group_handle_, WEBSOCKET_PROTOCOL_SERVER_HELLO_EVENT,
pdTRUE, pdFALSE, pdMS_TO_TICKS(10000));
if (!(bits & WEBSOCKET_PROTOCOL_SERVER_HELLO_EVENT)) {
SetError(Lang::Strings::SERVER_TIMEOUT);
return false;
}
if (on_audio_channel_opened_ != nullptr) on_audio_channel_opened_();
return true;
}
握手三步:
- TCP+TLS 连接(
websocket_->Connect); - 发 client hello;
- 同步等 10 秒收 server hello——超时报错。
为什么用 xEventGroupWaitBits 等而不是直接在 OnData 回调里走?因为 OpenAudioChannel 是被业务层在主循环上调用的,需要返回 true/false。把"server hello 收到"翻译成事件位,让调用者同步阻塞等。
5.3.4 OnData 二进制帧解析
if (version_ == 2) {
BinaryProtocol2* bp2 = (BinaryProtocol2*)data;
bp2->version = ntohs(bp2->version);
bp2->type = ntohs(bp2->type);
bp2->timestamp = ntohl(bp2->timestamp);
bp2->payload_size = ntohl(bp2->payload_size);
auto payload = (uint8_t*)bp2->payload;
on_incoming_audio_(std::make_unique<AudioStreamPacket>(AudioStreamPacket{
.sample_rate = server_sample_rate_,
.frame_duration = server_frame_duration_,
.timestamp = bp2->timestamp,
.payload = std::vector<uint8_t>(payload, payload + bp2->payload_size)
}));
} else if (version_ == 3) {
BinaryProtocol3* bp3 = (BinaryProtocol3*)data;
bp3->payload_size = ntohs(bp3->payload_size);
auto payload = (uint8_t*)bp3->payload;
on_incoming_audio_(std::make_unique<AudioStreamPacket>(AudioStreamPacket{
.sample_rate = server_sample_rate_,
.frame_duration = server_frame_duration_,
.timestamp = 0,
.payload = std::vector<uint8_t>(payload, payload + bp3->payload_size)
}));
} else {
// version 1:无帧头,整个 binary 就是 Opus payload
on_incoming_audio_(std::make_unique<AudioStreamPacket>(AudioStreamPacket{
.sample_rate = server_sample_rate_,
.frame_duration = server_frame_duration_,
.timestamp = 0,
.payload = std::vector<uint8_t>((uint8_t*)data, (uint8_t*)data + len)
}));
}
要点:
ntohs / ntohl把网络字节序(大端)转主机字节序(小端 = ESP32 默认);- 用 C 风格 cast
(BinaryProtocol2*)data直接覆盖结构体到字节流——配合packed属性,零拷贝读字段; - payload 还是要拷贝一份到
std::vector——原data缓冲属于 websocket 客户端栈,回调返回后失效; - v1 没有帧头,整段就是 Opus。
5.3.5 SendAudio() 发送(与 OnData 镜像)
bool WebsocketProtocol::SendAudio(std::unique_ptr<AudioStreamPacket> packet) {
if (websocket_ == nullptr || !websocket_->IsConnected()) return false;
if (version_ == 2) {
std::string serialized;
serialized.resize(sizeof(BinaryProtocol2) + packet->payload.size());
auto bp2 = (BinaryProtocol2*)serialized.data();
bp2->version = htons(version_);
bp2->type = 0;
bp2->reserved = 0;
bp2->timestamp = htonl(packet->timestamp);
bp2->payload_size = htonl(packet->payload.size());
memcpy(bp2->payload, packet->payload.data(), packet->payload.size());
return websocket_->Send(serialized.data(), serialized.size(), true);
} else if (version_ == 3) {
// ... 类似但用 BP3
} else {
return websocket_->Send(packet->payload.data(), packet->payload.size(), true);
}
}
Send(data, size, true) 第三参 true = binary 帧。
5.3.6 GetHelloMessage() —— 客户端 hello
std::string WebsocketProtocol::GetHelloMessage() {
cJSON* root = cJSON_CreateObject();
cJSON_AddStringToObject(root, "type", "hello");
cJSON_AddNumberToObject(root, "version", version_);
cJSON* features = cJSON_CreateObject();
#if CONFIG_USE_SERVER_AEC
cJSON_AddBoolToObject(features, "aec", true);
#endif
cJSON_AddBoolToObject(features, "mcp", true);
cJSON_AddItemToObject(root, "features", features);
cJSON_AddStringToObject(root, "transport", "websocket");
cJSON* audio_params = cJSON_CreateObject();
cJSON_AddStringToObject(audio_params, "format", "opus");
cJSON_AddNumberToObject(audio_params, "sample_rate", 16000);
cJSON_AddNumberToObject(audio_params, "channels", 1);
cJSON_AddNumberToObject(audio_params, "frame_duration", OPUS_FRAME_DURATION_MS);
cJSON_AddItemToObject(root, "audio_params", audio_params);
auto json_str = cJSON_PrintUnformatted(root);
std::string message(json_str);
cJSON_free(json_str);
cJSON_Delete(root);
return message;
}
发出去类似:
{
"type": "hello",
"version": 3,
"features": {"aec": false, "mcp": true},
"transport": "websocket",
"audio_params": {
"format": "opus",
"sample_rate": 16000,
"channels": 1,
"frame_duration": 60
}
}
服务器看了知道客户端能力 → 决定下行 Opus 帧用什么采样率,回 hello。
5.3.7 ParseServerHello() —— 协议参数同步
void WebsocketProtocol::ParseServerHello(const cJSON* root) {
auto transport = cJSON_GetObjectItem(root, "transport");
if (transport == nullptr || strcmp(transport->valuestring, "websocket") != 0) return;
auto session_id = cJSON_GetObjectItem(root, "session_id");
if (cJSON_IsString(session_id)) {
session_id_ = session_id->valuestring;
}
auto audio_params = cJSON_GetObjectItem(root, "audio_params");
if (cJSON_IsObject(audio_params)) {
auto sample_rate = cJSON_GetObjectItem(audio_params, "sample_rate");
if (cJSON_IsNumber(sample_rate)) server_sample_rate_ = sample_rate->valueint;
auto frame_duration = cJSON_GetObjectItem(audio_params, "frame_duration");
if (cJSON_IsNumber(frame_duration)) server_frame_duration_ = frame_duration->valueint;
}
xEventGroupSetBits(event_group_handle_, WEBSOCKET_PROTOCOL_SERVER_HELLO_EVENT);
}
记下 session_id 和服务器音频参数,触发事件位让 OpenAudioChannel 解除阻塞。
5.3.8 CloseAudioChannel() 和 IsAudioChannelOpened()
void WebsocketProtocol::CloseAudioChannel() {
websocket_.reset();
}
bool WebsocketProtocol::IsAudioChannelOpened() const {
return websocket_ != nullptr && websocket_->IsConnected() && !error_occurred_ && !IsTimeout();
}
WebSocket 直接 reset 句柄触发析构,连接随之关闭。Open 判断要四条都满足:句柄存在 + TCP 连着 + 没出错 + 没超时。
5.4 MqttProtocol 实现详解
5.4.1 构造 —— 设置重连 timer
MqttProtocol::MqttProtocol() {
event_group_handle_ = xEventGroupCreate();
esp_timer_create_args_t reconnect_timer_args = {
.callback = [](void* arg) {
MqttProtocol* protocol = (MqttProtocol*)arg;
auto& app = Application::GetInstance();
if (app.GetDeviceState() == kDeviceStateIdle) {
auto alive = protocol->alive_;
app.Schedule([protocol, alive]() {
if (*alive) {
protocol->StartMqttClient(false);
}
});
}
},
.arg = this,
};
esp_timer_create(&reconnect_timer_args, &reconnect_timer_);
}
MQTT 经常掉线(移动网络弱信号),自带重连机制:
- 一个一次性 timer;
- 触发时先检查是不是 idle(其它状态比如正在通话不重连,避免打断);
alive_是shared_ptr<atomic<bool>>哨兵——析构时 set false,避免 timer 调度的 lambda 在对象已销毁后还跑(dangling pointer 的经典防御)。- 真正的重连推到主线程 (
Schedule) 跑——单线程化协议状态。
5.4.2 析构 —— 优雅停机
MqttProtocol::~MqttProtocol() {
*alive_ = false; // ★ 先标记自己死了
if (reconnect_timer_ != nullptr) {
esp_timer_stop(reconnect_timer_);
esp_timer_delete(reconnect_timer_);
}
udp_.reset();
mqtt_.reset();
if (event_group_handle_ != nullptr) vEventGroupDelete(event_group_handle_);
}
*alive_ = false 是关键——后续任何已经在 Schedule 队列里 pending 的 lambda 检查 if (*alive) 都会跳过执行。
5.4.3 Start() + StartMqttClient()
bool MqttProtocol::Start() {
return StartMqttClient(false);
}
bool MqttProtocol::StartMqttClient(bool report_error) {
if (mqtt_ != nullptr) mqtt_.reset();
Settings settings("mqtt", false);
auto endpoint = settings.GetString("endpoint");
auto client_id = settings.GetString("client_id");
auto username = settings.GetString("username");
auto password = settings.GetString("password");
int keepalive_interval = settings.GetInt("keepalive", 240);
publish_topic_ = settings.GetString("publish_topic");
if (endpoint.empty()) {
if (report_error) SetError(Lang::Strings::SERVER_NOT_FOUND);
return false;
}
auto network = Board::GetInstance().GetNetwork();
mqtt_ = network->CreateMqtt(0);
mqtt_->SetKeepAlive(keepalive_interval);
跟 WS 不一样的是——MQTT 在 Start() 就建连,因为 MQTT 需要持续在线接受 publish。控制信令随时可能从服务器发来(如 OTA 强制重启)。
mqtt_->OnDisconnected([this]() {
if (on_disconnected_ != nullptr) on_disconnected_();
esp_timer_start_once(reconnect_timer_, MQTT_RECONNECT_INTERVAL_MS * 1000);
});
mqtt_->OnConnected([this]() {
if (on_connected_ != nullptr) on_connected_();
esp_timer_stop(reconnect_timer_);
});
- 断开 → 启动 60 秒倒计时重连 timer;
- 连上 → 取消重连 timer(如果有的话)。
mqtt_->OnMessage([this](const std::string& topic, const std::string& payload) {
cJSON* root = cJSON_Parse(payload.c_str());
if (root == nullptr) return;
cJSON* type = cJSON_GetObjectItem(root, "type");
if (!cJSON_IsString(type)) { cJSON_Delete(root); return; }
if (strcmp(type->valuestring, "hello") == 0) {
ParseServerHello(root);
} else if (strcmp(type->valuestring, "goodbye") == 0) {
auto session_id = cJSON_GetObjectItem(root, "session_id");
if (session_id == nullptr || session_id_ == session_id->valuestring) {
auto alive = alive_;
Application::GetInstance().Schedule([this, alive]() {
if (*alive) CloseAudioChannel();
});
}
} else if (on_incoming_json_ != nullptr) {
on_incoming_json_(root);
}
cJSON_Delete(root);
last_incoming_time_ = std::chrono::steady_clock::now();
});
MQTT 消息处理:
hello→ 本地处理(拿 UDP 端点);goodbye→ 服务端主动断开(比如会话结束或被踢),关闭 UDP;- 其它 → 透传给业务层。
注意 goodbye 用 Schedule() 延后到主线程,MQTT 回调线程不直接做关闭操作。
std::string broker_address;
int broker_port = 8883;
size_t pos = endpoint.find(':');
if (pos != std::string::npos) {
broker_address = endpoint.substr(0, pos);
broker_port = std::stoi(endpoint.substr(pos + 1));
} else {
broker_address = endpoint;
}
if (!mqtt_->Connect(broker_address, broker_port, client_id, username, password)) {
SetError(Lang::Strings::SERVER_NOT_CONNECTED);
return false;
}
return true;
}
解析 host:port,默认 8883(MQTTS),连接 broker。
5.4.4 OpenAudioChannel() —— 申请 UDP 通道
bool MqttProtocol::OpenAudioChannel() {
if (mqtt_ == nullptr || !mqtt_->IsConnected()) {
if (!StartMqttClient(true)) return false;
}
error_occurred_ = false;
session_id_ = "";
xEventGroupClearBits(event_group_handle_, MQTT_PROTOCOL_SERVER_HELLO_EVENT);
auto message = GetHelloMessage();
if (!SendText(message)) return false;
EventBits_t bits = xEventGroupWaitBits(event_group_handle_, MQTT_PROTOCOL_SERVER_HELLO_EVENT, pdTRUE, pdFALSE, pdMS_TO_TICKS(10000));
if (!(bits & MQTT_PROTOCOL_SERVER_HELLO_EVENT)) {
SetError(Lang::Strings::SERVER_TIMEOUT);
return false;
}
std::lock_guard<std::mutex> lock(channel_mutex_);
auto network = Board::GetInstance().GetNetwork();
udp_ = network->CreateUdp(2);
udp_->OnMessage([this](const std::string& data) { /* 见 5.4.6 */ });
udp_->Connect(udp_server_, udp_port_);
if (on_audio_channel_opened_ != nullptr) on_audio_channel_opened_();
return true;
}
握手流程:
- MQTT 控制信令通道已连(或重连);
- 通过 MQTT 发 client hello(type=hello, transport=udp);
- 等服务端 hello 回 UDP 服务器地址 + 加密 key + nonce;
- 用拿到的信息开 UDP 客户端、绑回调。
channel_mutex_ 保护 udp_ 指针——SendAudio 在另一个 task(主循环)调用。
5.4.5 ParseServerHello() —— 拿 UDP 凭证 + 初始化 AES
void MqttProtocol::ParseServerHello(const cJSON* root) {
auto transport = cJSON_GetObjectItem(root, "transport");
if (transport == nullptr || strcmp(transport->valuestring, "udp") != 0) return;
auto session_id = cJSON_GetObjectItem(root, "session_id");
if (cJSON_IsString(session_id)) session_id_ = session_id->valuestring;
auto audio_params = cJSON_GetObjectItem(root, "audio_params");
if (cJSON_IsObject(audio_params)) {
auto sample_rate = cJSON_GetObjectItem(audio_params, "sample_rate");
if (cJSON_IsNumber(sample_rate)) server_sample_rate_ = sample_rate->valueint;
auto frame_duration = cJSON_GetObjectItem(audio_params, "frame_duration");
if (cJSON_IsNumber(frame_duration)) server_frame_duration_ = frame_duration->valueint;
}
auto udp = cJSON_GetObjectItem(root, "udp");
if (!cJSON_IsObject(udp)) return;
udp_server_ = cJSON_GetObjectItem(udp, "server")->valuestring;
udp_port_ = cJSON_GetObjectItem(udp, "port")->valueint;
auto key = cJSON_GetObjectItem(udp, "key")->valuestring;
auto nonce = cJSON_GetObjectItem(udp, "nonce")->valuestring;
aes_nonce_ = DecodeHexString(nonce);
mbedtls_aes_init(&aes_ctx_);
mbedtls_aes_setkey_enc(&aes_ctx_, (const unsigned char*)DecodeHexString(key).c_str(), 128);
local_sequence_ = 0;
remote_sequence_ = 0;
xEventGroupSetBits(event_group_handle_, MQTT_PROTOCOL_SERVER_HELLO_EVENT);
}
服务器返回的 hello 类似:
{
"type": "hello",
"transport": "udp",
"session_id": "abc123",
"audio_params": { "sample_rate": 24000, "frame_duration": 60 },
"udp": {
"server": "1.2.3.4",
"port": 9000,
"key": "0011...AABB", // 32 hex chars = 16 bytes = AES-128 key
"nonce": "01...0000" // 32 hex chars = 16 bytes = AES nonce 初始模板
}
}
设备处理:
- 16 字节 AES-128 key 解 hex 后初始化
mbedtls_aes_setkey_enc(只 setkey_enc,因为 CTR 模式加密解密同一函数); - nonce 解 hex 后保存进
aes_nonce_; - 序号清零;
- 触发事件位让
OpenAudioChannel解除阻塞。
5.4.6 SendAudio() —— AES-128-CTR 加密
bool MqttProtocol::SendAudio(std::unique_ptr<AudioStreamPacket> packet) {
std::lock_guard<std::mutex> lock(channel_mutex_);
if (udp_ == nullptr) return false;
std::string nonce(aes_nonce_);
*(uint16_t*)&nonce[2] = htons(packet->payload.size());
*(uint32_t*)&nonce[8] = htonl(packet->timestamp);
*(uint32_t*)&nonce[12] = htonl(++local_sequence_);
std::string encrypted;
encrypted.resize(aes_nonce_.size() + packet->payload.size());
memcpy(encrypted.data(), nonce.data(), nonce.size());
size_t nc_off = 0;
uint8_t stream_block[16] = {0};
if (mbedtls_aes_crypt_ctr(&aes_ctx_, packet->payload.size(), &nc_off, (uint8_t*)nonce.c_str(), stream_block,
(uint8_t*)packet->payload.data(), (uint8_t*)&encrypted[nonce.size()]) != 0) {
return false;
}
return udp_->Send(encrypted) > 0;
}
UDP 加密音频帧结构(16 字节 nonce + payload):
nonce (16 字节):
┌─────┬─────┬─────────┬─────────┬──────────┬──────────┐
│type │flag │ size │ ssrc │timestamp │ sequence │
│ 1u │ 1u │ 2u │ 4u │ 4u │ 4u │
└─────┴─────┴─────────┴─────────┴──────────┴──────────┘
encrypted payload (变长):
AES-128-CTR(key, nonce, opus_bytes)
关键点:
- nonce 的高 8 字节由服务器固定(type/flag/ssrc 等),后 8 字节客户端每帧改(payload size、timestamp、sequence);
- 这样每帧 nonce 唯一,CTR 模式安全前提是 nonce 不重用;
- AES-CTR 是流密码,输出长度等于输入,不需要对齐 padding;
- nonce 也明文发出去——接收方需要它来解密;
local_sequence_严格递增——服务器可以检测乱序/重放。
mbedtls_aes_crypt_ctr 参数:
- ctx:AES 上下文(设了 128 位 key);
- length:加密多少字节;
- nc_off:内部偏移,连续多次调用时用;
- nonce_counter:CTR 计数器(也是 IV);
- stream_block:内部流块;
- input/output:源/目的。
5.4.7 udp_->OnMessage —— UDP 收包解密
udp_->OnMessage([this](const std::string& data) {
if (data.size() < sizeof(aes_nonce_)) return;
if (data[0] != 0x01) return; // 类型校验
uint32_t timestamp = ntohl(*(uint32_t*)&data[8]);
uint32_t sequence = ntohl(*(uint32_t*)&data[12]);
if (sequence < remote_sequence_) {
ESP_LOGW(TAG, "Received audio packet with old sequence: %lu, expected: %lu", sequence, remote_sequence_);
return;
}
if (sequence != remote_sequence_ + 1) {
ESP_LOGW(TAG, "Received audio packet with wrong sequence: %lu, expected: %lu", sequence, remote_sequence_ + 1);
}
size_t decrypted_size = data.size() - aes_nonce_.size();
size_t nc_off = 0;
uint8_t stream_block[16] = {0};
auto nonce = (uint8_t*)data.data();
auto encrypted = (uint8_t*)data.data() + aes_nonce_.size();
auto packet = std::make_unique<AudioStreamPacket>();
packet->sample_rate = server_sample_rate_;
packet->frame_duration = server_frame_duration_;
packet->timestamp = timestamp;
packet->payload.resize(decrypted_size);
if (mbedtls_aes_crypt_ctr(&aes_ctx_, decrypted_size, &nc_off, nonce, stream_block,
encrypted, (uint8_t*)packet->payload.data()) != 0) return;
if (on_incoming_audio_ != nullptr) on_incoming_audio_(std::move(packet));
remote_sequence_ = sequence;
last_incoming_time_ = std::chrono::steady_clock::now();
});
要点:
- 乱序丢弃:sequence 小于已收的最大值直接丢;
- 乱序但更大:打 warn 但仍然接收(可能丢了几帧但要继续,UDP 容忍丢包);
- 解密用同一个 AES 上下文(CTR 模式加解密同函数);
- 解密后 payload 包成
AudioStreamPacket推到上层 audio_service 的 decode 队列。
5.4.8 CloseAudioChannel() —— 发 goodbye + 关 UDP
void MqttProtocol::CloseAudioChannel() {
{
std::lock_guard<std::mutex> lock(channel_mutex_);
udp_.reset();
}
std::string message = "{";
message += "\"session_id\":\"" + session_id_ + "\",";
message += "\"type\":\"goodbye\"";
message += "}";
SendText(message);
if (on_audio_channel_closed_ != nullptr) on_audio_channel_closed_();
}
先关 UDP(释放本地资源),再通过 MQTT 发 goodbye 通知服务器(不强制等待 ack)。
5.4.9 IsAudioChannelOpened()
bool MqttProtocol::IsAudioChannelOpened() const {
return udp_ != nullptr && !error_occurred_ && !IsTimeout();
}
注意只看 udp_——MQTT 即使断了但 UDP 还在跑也可以发音频(控制信令暂时收不到罢了)。
5.4.10 DecodeHexString() —— hex 字符串转字节
static inline uint8_t CharToHex(char c) {
if (c >= '0' && c <= '9') return c - '0';
if (c >= 'A' && c <= 'F') return c - 'A' + 10;
if (c >= 'a' && c <= 'f') return c - 'a' + 10;
return 0;
}
std::string MqttProtocol::DecodeHexString(const std::string& hex_string) {
std::string decoded;
decoded.reserve(hex_string.size() / 2);
for (size_t i = 0; i < hex_string.size(); i += 2) {
char byte = (CharToHex(hex_string[i]) << 4) | CharToHex(hex_string[i + 1]);
decoded.push_back(byte);
}
return decoded;
}
把 "AB12CD" → \xAB\x12\xCD。简单的两位组合。
5.5 协议层 vs 业务层的"接线总表"
回看第 2 章 2.6.7 InitializeProtocol(),把它的所有挂钩列在这里方便整体回顾:
Application 注册的回调 协议层会在什么时候调
────────────────────────────── ─────────────────────────────────
OnConnected (DismissAlert) MQTT 重连成功 / 任意协议连上
OnNetworkError (set ERROR 位) 连接失败 / Send 失败 / Timeout
OnIncomingAudio (push decode 队列) WS binary 帧 / UDP 解密后
OnAudioChannelOpened (升性能档) OpenAudioChannel 握手成功
OnAudioChannelClosed (降功耗+清 UI) WS 断开 / udp_ 重置后
OnIncomingJson (大分发器) WS 文本帧 / MQTT publish(除 hello/goodbye)
Application 主动调协议 做什么
────────────────────────────── ─────────────────────────────────
protocol_->Start() MQTT: 立即连; WS: 不做事
protocol_->OpenAudioChannel() 建立音频通道(同步等 hello)
protocol_->CloseAudioChannel() 关音频通道
protocol_->IsAudioChannelOpened() 状态查询
protocol_->SendAudio(packet) 发 Opus 包
protocol_->SendStartListening(mode) 控制信令
protocol_->SendStopListening()
protocol_->SendAbortSpeaking(reason)
protocol_->SendWakeWordDetected(word)
protocol_->SendMcpMessage(payload)
5.6 一次完整对话的协议时序图
WebSocket 模式:
Device Server
│ │
│── TCP+TLS 握手 ────────────► │
│ │
│── WS Upgrade ───────────────► │ (HTTP 101)
│ │
│── {type:hello, ...} ───────► │
│ │
│ ◄── {type:hello, session_id} │ 保存 session
│ │
│── {type:listen,state:start} ► │
│ │
│ ──BP3 + opus ───────────────► │
│ ──BP3 + opus ───────────────► │
│ ──BP3 + opus ───────────────► │
│ │ (服务器做 ASR)
│ ◄── {type:stt, text:"...."} │
│ ◄── {type:llm, emotion:".."} │
│ ◄── {type:tts, state:start} │
│ ◄── {type:tts, sentence_start, text:"..."}
│ ◄──BP3 + opus ───────────── │
│ ◄──BP3 + opus ───────────── │
│ ◄── {type:tts, state:stop} │
│ │
│── {type:listen,state:start} ► │ (auto 模式回 listening)
│ ... │
MQTT+UDP 模式:
Device MQTT Broker UDP Gateway
│ │ │
│── MQTT Connect ────────────► │ │
│── subscribe topics ────────► │ │
│── publish hello (type=hello) │ │
│ │ ◄─ relay ──────────►│
│ │ │
│ ◄── publish hello (transport=udp, server, port, key, nonce)
│ │ (saves session) │
│ │ │
│── 用 UDP 连指定 server:port ──────────────────────► │
│ │ │
│── publish listen:start ────► │ ◄────────────────► │
│ │ │
│── UDP 加密 opus ────────────────────────────────► │ ASR
│ │ │
│ │ ◄─ publish tts:start (MQTT)
│ ◄── UDP 加密 opus ──────────────────────────────── │ TTS
│ │ │
│ │ ◄─ publish tts:stop
│ │ │
│── publish goodbye ─────────► │ │
│── UDP 关闭 ────────────────────────────────────── │
5.7 本章用到的核心技术汇总
| 技术 | 应用 |
|---|---|
| C++ 抽象基类 + 多态 | Protocol 基类 + WS/MQTT 两实现 |
std::function + lambda 回调 | 7 个回调挂钩业务层 |
__attribute__((packed)) + 灵活数组 | 帧头结构直接覆盖字节流 |
ntohs / htons / ntohl / htonl | 网络字节序转换 |
| C 风格 cast 直接读结构体 | 零拷贝读字段 |
| FreeRTOS EventGroup | 同步等服务器 hello(最多 10 秒) |
std::shared_ptr<std::atomic<bool>> 哨兵 | 防止延迟 timer 回调访问已销毁对象 |
esp_timer_start_once | 60 秒一次性重连 |
std::mutex 保护 udp_ | SendAudio 和 OnMessage 并发访问 |
mbedtls_aes_crypt_ctr | AES-128-CTR 加密音频 |
| Hex 字符串转字节 | 服务器下发 key/nonce 编码方式 |
| UDP 自定义序号 | 检测重放/乱序 |
| cJSON 解析/生成 | 控制信令 |
| 手写 JSON 字符串拼接 | 性能优化(高频小消息) |
std::chrono::steady_clock | 120 秒无活动超时 |
auto alive = alive_ 在 lambda 里捕获 shared_ptr | Schedule 队列里也安全 |
| NVS 配置存储 | url/token/version/endpoint/keepalive 等 |
| 板级 NetworkInterface 抽象 | WS/MQTT/UDP 各自由板子提供具体实现,WiFi 板和 4G 板共用上层代码 |
5.8 看完本章你应该掌握的
Protocol基类的 7 个回调和 5 个 send 方法- 两版二进制帧头 BP2/BP3 字段含义
- WebSocket 模式:单连接、懒连接、HTTP header 鉴权、Hello 协商音频参数
- MQTT+UDP 模式:MQTT 长在线 + UDP 一次性加密媒体流;hello 时拿 UDP key/nonce
- AES-128-CTR 在 nonce 里嵌入 size/timestamp/sequence 的设计
- UDP 序号检测乱序 + 重放
- Application 注册的 7 个回调 vs 主动调的 9 个方法 —— 完整的"协议接线表"
- MqttProtocol 用 shared_ptr<atomic
> 防止延迟回调悬空指针的技巧 - 一次对话从 hello 到 goodbye 的完整时序
下一章进入 mcp_server.cc/h——设备端 MCP 协议实现(让大模型直接调用设备功能的桥梁)。