Coze 平台 websocket quecpython 接入

Coze 平台 websocket quecpython 接入

该Coze聊天机器人方案具有以下特性:

  • 支持音色切换。
  • 支持语音中断/打断。
  • 支持语音唤醒。
  • 使用 Python 语言,便于二次开发。
  • 开发资源
  • 快速上手
  • 软件设计

硬件资料

软件资料

开发工具

  • QPYcom - QuecPython 调试工具
    • 版本:V3.9.0
    • 下载 QPYcom
  • VSCode - 代码编辑器

实验源码

  • 从 Github 仓库获取源代码步骤如下:
    git clone https://github.com/QuecPython/CozeWebSocket.git
    
  • 压缩包下载

硬件准备

  • Windows 电脑一台,建议 Win10 系统。

  • 一套 EC800MCNLE 开发板 (含天线、Type-C 数据线等)。

  • 一张可正常使用的 Nano SIM 卡。

  • 一个 2-5W 功率的喇叭。

环境搭建

硬件连接

按照下图进行硬件连接:

  1. WiFi天线连接至图中标识有WiFi字样的天线连接座上。
  2. 将天线连接至标识有LTE字样的天线连接座上。
  3. 在图示位置插入可用的 Nano SIM 卡。
  4. 将喇叭连接至图中标识有SPK+SPK-的排针上。
  5. 在图示的位置接入电池。
  6. 使用 Type-C 数据线连接开发板和电脑。

设备开发

开机

完成硬件连接的工作后,电脑设备管理器的端口列表中出现包含 Quectel USB 字样的 COM 口,表示开机成功。

comport.png

烧录固件包

参考此章节,烧录对应型号固件包至开发板。

脚本导入与运行

  1. 参考此章节,将源码目录下 src 文件夹中的所有文件导入到模组文件系统,如下图所示:

  2. 参考此章节,执行主程序文件 _main.py

  3. 参考此章节,停止程序运行。

业务调试

程序启动

执行 _main.py 脚本后,程序开始运行。

注册Coze

如果Coze没有被注册,那么无法进行正常交互,需要到主页 - 扣子进行设备注册,注册完成后我们需要复制AI智能体

复制AI智能体

a. 进入模板

进入模板->聊天陪伴->选择智能体。 可以根据自己需要选择智能体。

b. 复制模板

选择复制模板

c. 发布模板

选择语音通话,发布模板。

选中API后, 点击发布即可。

 

配置服务器信息到模组

a. 配置个人令牌

创建个人令牌, 并配置到模组demo中。具体过程参考下图。

 

b. 配置智能体botid

进入项目开发界面,点击进入刚刚复制得智能体。访问得url中携带botid信息,讲bot后面得字符串配置到模组demo中,具体信息参考下图说明。

  

运行代码

运行日志如下,开场白结束后,即可进行实时对话。

软件框架

框架设计图

代码讲解

类初始化(init

def __init__(self, url, auth, callback=None):
    # 初始化PCMA媒体实例(单例),检查占用状态
    self.media = singleton_media('pcma', 4)
    if self.media is None:
        print('media is busy, please stop it first')
        return
    # 初始化音频下行队列(存储服务端推送的音频数据)
    self.audio_queue = Queue()

    # 配置WS连接信息(地址+认证头)
    self.url = url
    self.headers = {"Authorization": "Bearer " + auth}

    # 初始化线程ID(用于后续停止线程)、活跃状态、音量配置
    self.ws_recv_task_id = None
    self.ws_audio_uplink_handler_id = None
    self.ws_audio_downlink_handler_id = None
    self.isactive = False
    self.volume = 8
    self.callback = callback

    # 若传入回调函数,初始化事件队列并启动回调处理线程
    if self.callback:
        self.event_queue = Queue()
        self.ws_callback_event_id = _thread.start_new_thread(self.ws_server_event_handler, ())

WS 连接启动(start)

建立 WebSocket 连接,发送初始化消息,启动消息接收线程。

def start(self):
    # 检查媒体实例空闲状态,避免资源冲突
    if self.media.is_idle() is False:
        print('media is busy, please stop it first')
        return
    # 创建WS客户端连接(基于uwebsocket.Client)
    self.client = uwebsocket.Client.connect(self.url, self.headers)
    # 发送update初始化数据包
    msg = ujson.dumps(packet.update)
    self.client.send(msg)

    # 启动WS消息接收线程,独立处理服务端推送数据
    self.ws_recv_task_id = _thread.start_new_thread(self.ws_recv_task, ())

资源停止释放(stop + stop_audio_stream)

停止音频流线程、释放媒体资源、关闭 WS 连接,完成全生命周期收尾。

  1. 音频流停止(stop_audio_stream)
def stop_audio_stream(self):
    # 停止音频上行线程并释放线程ID
    if self.ws_audio_uplink_handler_id:
        _thread.stop_thread(self.ws_audio_uplink_handler_id)
        self.ws_audio_uplink_handler_id = None
    # 停止音频下行线程并释放线程ID
    if self.ws_audio_downlink_handler_id:
        _thread.stop_thread(self.ws_audio_downlink_handler_id)
        self.ws_audio_downlink_handler_id = None
    # 停止媒体实例,释放音频硬件
    self.media.stop()
  1. 整体停止(stop)
def stop(self):
    # 先停止音频流
    self.stop_audio_stream()

    # 停止WS消息接收线程
    if self.ws_recv_task_id:
        _thread.stop_thread(self.ws_recv_task_id)
        self.ws_recv_task_id = None

    # 关闭WS连接,标记非活跃状态
    self.client.close()
    self.isactive = False

WS 消息接收(ws_recv_task)

持续监听 WS 服务端消息,分流音频数据和事件消息,处理断连异常。

def ws_recv_task(self):
    while True:
        try:
            # 接收WS消息(最大4096字节)
            recv_data = self.client.recv(4096)
            if recv_data is None or len(recv_data) <= 1:
                print('illegal data {}'.format(recv_data))
                continue
            # 分流:音频Delta消息 → 音频队列;其他消息 → 事件队列(回调处理)
            if  packet.EventType.CONVERSATION_AUDIO_DELTA in recv_data:
                self.audio_queue.put(recv_data)
            else:
                if self.callback:
                    self.event_queue.put(recv_data)
        except Exception as e:
            # 断连异常(EIO):停止音频流、关闭连接、触发回调通知
            if "EIO" in str(e):
                if self.isactive:
                    self.stop_audio_stream()
                    self.client.close()
                    self.isactive = False
                msg = '{"event_type": "client.disconnected"}'
                if self.callback:
                    self.event_queue.put(msg)
                break
            else:
                # 其他异常:打印错误日志
                if recv_data is not None:
                    print('recv error[{}] |{}|'.format(len(recv_data), recv_data))
                print('ws error |{}|'.format(e))
        utime.sleep_ms(1)

回调事件处理(ws_server_event_handler)

消费事件队列中的非音频消息,调用外部传入的回调函数处理。

def ws_server_event_handler(self):
    while True:
        # 阻塞获取事件队列中的消息
        recv_data = self.event_queue.get()
        # 调用外部回调函数,传递当前实例和消息数据
        self.callback(self, recv_data)
        utime.sleep_ms(1)

音频流启停(start_audio_stream)

启动音频硬件,配置音量,启动音频上下行线程,标记活跃状态

def start_audio_stream(self):
    # 启动媒体实例,设置音量
    self.media.start()
    self.media.set_volume(self.volume)

    # 启动音频上行/下行线程
    self.ws_audio_uplink_handler_id = _thread.start_new_thread(self.ws_audio_uplink_handler, ())
    self.ws_audio_downlink_handler_id = _thread.start_new_thread(self.ws_audio_downlink_handler, ())
    # 标记模块为活跃状态
    self.isactive = True

沟通无界,服务无限

了解更多关于移远公司、产品和技术支持的信息。