Extension Development

ASR Extension 开发完整指南

从零开始创建、开发、测试并发布一个完整的 ASR 扩展

ASR Extension 开发完整指南

本教程涵盖 ASR Extension 从创建、开发、测试到发布的完整流程。

使用指南

  • Basic 部分:实现基本功能,让 ASR Extension 能正常工作并通过端到端验证
  • Advanced 部分:实现生产级特性(自动重连、finalize机制、规范化日志、音频dump等),完善测试并发布到商店

根据你的需求选择阅读相应部分。

前置条件

  • 了解 TEN Extension 的开发、测试流程,参考 TEN 扩展开发完整指南
  • 掌握 Python 异步编程(asyncioasync/await
  • 安装好 tman 命令行工具,并熟悉其基本使用
  • 准备好 ASR 服务商的 API 密钥(用于测试)

目录

Part 1: Basic - 实现基础功能

  1. ASR Extension 简介
  2. 创建项目
  3. 理解接口规范
  4. 实现核心功能
  5. 测试
  6. Basic 开发自检清单

Part 2: Advanced - 生产级质量

  1. 自动重连机制
  2. 优化 Finalize 机制
  3. 规范化日志
  4. 错误上报规范
  5. 音频 Dump 功能
  6. 音频缓冲策略
  7. 完善测试
  8. Advanced 开发自检清单

Part 3: 发布与贡献

  1. 发布到 TEN Store
  2. 适配其他 ASR 服务

附录


Part 1: Basic - 实现基础功能

ASR Extension 简介

ASR Extension 是 TEN Framework 的标准积木组件之一,负责实时地将音频流转写为对应语言的文字。

在对话流中的位置

[RTC Extension] ──音频流──> [ASR Extension] ──文字流──> [LLM Extension]

创建项目

2.0 准备工作

如果你还没有 TEN Framework 仓库,请先克隆:

git clone https://github.com/TEN-framework/ten-framework.git
cd ten-framework

本教程假设你在 TEN Framework 仓库根目录下进行开发。

2.1 使用模板创建

使用 tman 命令行工具,从 ASR 扩展模板创建项目:

cd ai_agents/agents/ten_packages/extension
tman create extension my_asr_extension --template default_asr_python --template-data class_name_prefix=MyAsr

命令参数说明

  • extension my_asr_extension:创建一个 extension,目录名和插件名为 my_asr_extension
  • --template default_asr_python:使用 ASR Python 扩展模板
  • --template-data class_name_prefix=MyAsr:设置 Python Extension class 的类名前缀为 MyAsr(生成的类名为 MyAsrExtension

2.2 安装TEN packages的依赖

cd my_asr_extension
tman install --standalone

这条命令使用 tman 工具的依赖计算和下载能力,根据 manifest.json 中声明的依赖计算出依赖树。由于是开发态的依赖,需要使用 --standalone 参数,依赖会被安装在 .ten 目录下,包括开发依赖的 Python module 和独立测试时依赖的系统包。

2.3 项目结构

my_asr_extension/
├── manifest.json       # 扩展元数据
├── property.json       # 默认配置
├── requirements.txt    # Python 依赖
├── extension.py        # 主实现文件
├── addon.py            # 扩展入口
├── __init__.py         # Python 包初始化
├── docs/               # 文档目录
├── .vscode/            # VS Code 调试配置
└── tests/              # 测试文件

文件说明

manifest.json 和 property.json 是 TEN Extension 的标准 metadata 文件:

  • manifest.json:包含插件的名称、版本、介绍、依赖以及 schema 定义
  • property.json:定义插件的默认属性值

详细了解 TEN Framework 的元数据系统,请参考 元数据系统文档

requirements.txt 是 Python 插件独有的文件,用于声明插件本身对第三方 pip 包的依赖

extension.py 是 extension 的核心源代码,包含所有业务逻辑实现。

tests/ 文件夹用于插件独立测试,包含单元测试和测试配置。

理解接口规范

ASR Extension 继承自 AsyncASRBaseExtension,需实现以下方法:

必须实现的方法

vendor() -> str                                          
    # 返回服务商名称(如 "deepgram", "azure")

start_connection() -> None                               
    # 建立与服务商的连接

stop_connection() -> None                                
    # 停止连接

send_audio(frame: AudioFrame, session_id: str | None) -> bool  
    # 发送音频帧给服务商
    # session_id: 音频来源标识(单人场景可忽略)
    # 返回 True 表示发送成功

finalize(session_id: str | None) -> None                 
    # VAD 检测到说话结束后调用,触发最终结果

is_connected() -> bool                                   
    # 返回当前连接状态

input_audio_sample_rate() -> int                         
    # 返回期望的音频采样率(如 16000)

基类自动处理

基类 AsyncASRBaseExtension 已经帮你处理了:

  • 音频帧接收和队列管理
  • 性能指标计算(TTFW、TTLW)
  • 会话管理和元数据传递

你只需要专注于对接具体的 ASR 服务商。

实现核心功能

4.1 配置管理

配置模型设计

from pydantic import BaseModel
from typing import Dict, Optional

class MyAsrConfig(BaseModel):
    # 服务商参数(透传设计)
    params: Dict[str, Optional[str]] = {}
    
    # 音频 dump 功能
    dump: bool = False
    dump_path: Optional[str] = None

params 透传设计的优势

params 是一个字典,用于存放所有服务商相关的参数。这些参数会直接透传给服务商 SDK,而不需要在 Extension 代码中枚举。

为什么这样设计?

灵活性:使用者可以通过 property.json 配置任何服务商支持的参数,不受 Extension 实现限制

可扩展:服务商新增参数时,Extension 代码无需修改

简洁性:避免为每个参数都定义一个配置字段

示例

property.json
{
  "params": {
    "api_key": "your_api_key_here",
    "language": "zh-CN",
    "model": "nova-2",
    "punctuate": "true",
    "custom_param": "any_value"  // 任何服务商支持的参数
  },
  "dump": false
}

使用时直接透传给服务商:

# 从 params 中读取并传给服务商
api_key = self.config.params.get("api_key")
language = self.config.params.get("language", "en-US")  # 带默认值

注意:模板生成的 property.json 文件内容为空 {},你需要手动添加配置。

4.2 读取配置

@override
async def on_init(self, ten_env: AsyncTenEnv) -> None:
    await super().on_init(ten_env)
    
    # 读取配置
    config_json, _ = await ten_env.get_property_to_json("")
    self.config = MyAsrConfig.model_validate_json(config_json)
    
    ten_env.log_info(f"Config loaded: {self.config.model_dump_json()}")

4.3 实现基础方法

class MyAsrExtension(AsyncASRBaseExtension):
    def __init__(self, name: str):
        super().__init__(name)
        self.config: MyAsrConfig = MyAsrConfig()
        self.client = None  # 服务商 SDK 客户端
        self.is_connected_flag = False
        
    @override
    def vendor(self) -> str:
        return "my_vendor"  # 改为你的服务商名称
    
    @override
    def input_audio_sample_rate(self) -> int:
        # 从 params 中读取,带默认值
        return int(self.config.params.get("sample_rate", "16000"))
    
    @override
    def is_connected(self) -> bool:
        return self.is_connected_flag

4.4 实现连接管理

重要提示:以下代码仅为示例,实际实现需要根据你的服务商而定。

不同服务商的连接方式差异很大:

  • 有些提供封装好的 SDK(如 Azure、Deepgram)
  • 有些需要直接使用 WebSocket 连接
  • 有些使用 HTTP 流式 API

请参考你的服务商文档,使用对应的连接方式。

建立连接

@override
async def start_connection(self) -> None:
    """建立与服务商的连接"""
    try:
        await self.stop_connection()  # 先停止已有连接
        
        # 1. 初始化服务商客户端
        # 从 params 透传参数给服务商 SDK
        # 示例(需根据实际服务商调整):
        self.client = VendorClient(
            api_key=self.config.params.get("api_key"),
            language=self.config.params.get("language", "en-US"),
            # ... 其他参数直接从 params 中读取并透传
        )
        
        # 2. 注册事件处理器
        # 不同服务商的事件机制不同,这里仅为示例
        self.client.on("connected", self._on_open)
        self.client.on("result", self._on_transcript)
        self.client.on("error", self._on_error)
        
        # 3. 启动连接
        await self.client.connect()
        
    except Exception as e:
        self.ten_env.log_error(f"Failed to connect: {e}")

@override
async def stop_connection(self) -> None:
    """停止连接"""
    if self.client:
        await self.client.disconnect()
        self.client = None
        self.is_connected_flag = False

params 透传的价值:所有参数都从 params 字典中读取并透传给服务商 SDK,这样用户可以灵活配置任何服务商支持的参数,不需要修改代码。

参考现有实现

  • azure_asr_python - 使用 Azure SDK
  • deepgram_asr_python - 使用 Deepgram SDK

4.5 发送音频

@override
async def send_audio(self, audio_frame: AudioFrame, session_id: str | None) -> bool:
    """发送音频数据
    
    Args:
        audio_frame: 音频帧
        session_id: 会话ID,用于标识不同的对话轮次
        
    Returns:
        bool: 发送成功返回 True,否则返回 False
    """
    if not self.is_connected() or not self.client:
        return False
        
    try:
        audio_buf = audio_frame.get_buf()
        if audio_buf:
            await self.client.send(bytes(audio_buf))
            return True
    except Exception as e:
        self.ten_env.log_error(f"Failed to send audio: {e}")
    
    return False

4.6 处理识别结果

服务商会通过回调函数返回识别结果,你需要转换为标准格式并发送:

async def _on_transcript(self, result):
    """处理识别结果"""
    # 1. 提取文本
    text = result.text.strip()
    if not text:
        return
    
    # 2. 转换为标准 ASR 结果
    asr_result = ASRResult(
        text=text,
        final=result.is_final,           # 是否为最终结果
        start_ms=result.start_time_ms,   # 开始时间
        duration_ms=result.duration_ms   # 持续时间
    )
    
    # 3. 发送给下游
    await self.send_asr_result(asr_result)

其他必需的事件处理:

async def _on_open(self):
    """连接建立"""
    self.is_connected_flag = True
    self.ten_env.log_info("Connection opened")

async def _on_error(self, error):
    """处理错误"""
    self.ten_env.log_error(f"Vendor error: {error}")

4.7 实现 finalize

当 VAD 检测到用户说话结束时,会调用 finalize() 方法,通知服务商尽快返回最终结果:

@override
async def finalize(self, session_id: str | None) -> None:
    """触发最终结果"""
    if self.client:
        # 不同服务商有不同的 finalize 方式:
        # 1. 调用 finalize API(推荐)
        await self.client.finalize()
        
        # 2. 或发送静音包
        # await self.client.send_silence()
        
        # 3. 或断连重连
        # await self.stop_connection()
        # await self.start_connection()
        
        # 通知完成
        await self.send_asr_finalize_end()

不同服务商的 finalize 机制不同,选择最适合你的服务商的方式。详细说明见 Advanced 部分

到这里,基础功能已经实现完成!接下来可以进行测试。

测试

5.1 单元测试

单元测试用于验证 ASR Extension 的基础功能是否正常工作。

测试目标

基础功能至少需要验证:

  1. 配置加载:能正确读取 property.json 中的配置
  2. 连接建立:能成功连接到服务商
  3. 音频处理:能接收并发送音频帧
  4. 结果输出:能输出标准的 ASRResult 格式结果

测试流程示例

一个典型的单元测试会:

  1. 准备测试音频:从文件加载 PCM 音频数据
  2. 逐帧发送:将音频按帧发送给 ASR Extension
  3. 验证结果:检查是否收到标准格式的 ASRResult

ASRResult 标准格式

ASRResult(
    text="识别出的文本",          # 必需:识别文本
    final=True,                   # 必需:是否为最终结果
    start_ms=0,                   # 可选:开始时间(毫秒)
    duration_ms=1000,             # 可选:持续时间(毫秒)
    language="zh-CN",             # 可选:语言
    words=[]                      # 可选:词级别信息
)

运行测试

cd my_asr_extension
./tests/bin/start

模板已经包含基础测试用例,你可以在 tests/test_basic.py 中添加更多测试。

参考现有实现的测试:

  • azure_asr_python/tests/test_asr_result.py - 测试结果输出
  • deepgram_asr_python/tests/test_basic.py - 基础功能测试

5.2 端到端测试

在 TEN Agent 项目中使用 TMan Designer 替换 ASR 扩展:

cd /path/to/your/ten-agent-project
tman designer

通过可视化界面:

  1. 选择现有 ASR 节点
  2. 替换为你的 my_asr_extension
  3. 配置 API Key 等参数
  4. 启动并进行真实对话测试

Basic 开发自检清单

完成以下检查项,确保基础功能正常:

  • 项目创建:使用模板创建项目,安装依赖成功
  • 配置管理:能从property中正确读取配置,并正确透传给服务商SDK
  • 连接建立start_connection 能成功连接到服务商
  • 音频发送:能通过 send_audio 发送音频帧给服务商
  • 结果接收:能正确接收服务商返回的识别结果,并转换为标准格式
  • 结果发送:能通过 send_asr_result 发送标准化结果
  • Finalize:实现 finalize 方法(即使是简单实现)
  • 连接清理stop_connection 能正确关闭连接
  • 单元测试:基础测试用例能通过
  • 端到端测试:在 TEN Agent 中能完成基本对话

完成以上检查项后,你的 ASR Extension 已具备基本功能,可以在实际场景中使用。


Part 2: Advanced - 生产级质量

进阶部分将帮助你实现生产级 ASR Extension,包括稳定性、可维护性和可调试性。

自动重连机制

7.1 为什么需要重连

网络环境复杂,ASR 服务可能出现:

  • 临时网络抖动
  • 服务端主动断连
  • 超时错误

实现自动重连确保服务稳定性。

7.2 使用 ReconnectManager

参考现有 ASR 扩展(如 azure_asr_python)中的 reconnect_manager.py

from .reconnect_manager import ReconnectManager

class MyAsrExtension(AsyncASRBaseExtension):
    def __init__(self, name: str):
        super().__init__(name)
        # 最多重连 5 次,基础延迟 0.5 秒(指数退避)
        self.reconnect_manager = ReconnectManager(max_attempts=5, base_delay=0.5)

7.3 在连接成功时重置

async def _on_open(self, *args, **kwargs) -> None:
    self.is_connected_flag = True
    self.ten_env.log_info("Connection opened", category=LOG_CATEGORY_VENDOR)
    
    if self.reconnect_manager:
        self.reconnect_manager.mark_connection_successful()

7.4 处理断连和错误

async def _on_close(self, *args, **kwargs) -> None:
    self.is_connected_flag = False
    self.ten_env.log_warn("Connection closed", category=LOG_CATEGORY_VENDOR)
    
    if self.client:  # 意外断连
        await self._handle_reconnect()

async def _on_error(self, *args, **kwargs) -> None:
    error = args[1] if len(args) > 1 else None
    self.ten_env.log_error(f"Vendor error: {error}", category=LOG_CATEGORY_VENDOR)
    
    await self.send_asr_error(
        ModuleError(module=MODULE_NAME_ASR, code=ModuleErrorCode.NON_FATAL_ERROR.value, message=str(error))
    )
    await self._handle_reconnect()

7.5 实现重连逻辑

async def _handle_reconnect(self) -> None:
    if not self.reconnect_manager or not self.reconnect_manager.can_retry():
        self.ten_env.log_error("Max reconnection attempts reached", category=LOG_CATEGORY_VENDOR)
        await self.send_asr_error(
            ModuleError(module=MODULE_NAME_ASR, code=ModuleErrorCode.FATAL_ERROR.value, message="Reconnection failed")
        )
        return
    
    self.ten_env.log_info(
        f"Attempting reconnection {self.reconnect_manager.current_attempts + 1}/{self.reconnect_manager.max_attempts}",
        category=LOG_CATEGORY_VENDOR
    )
    
    success = await self.reconnect_manager.handle_reconnect(connect_func=self.start_connection)
    
    if success:
        self.ten_env.log_info("Reconnection successful", category=LOG_CATEGORY_VENDOR)
    else:
        self.ten_env.log_error("Reconnection failed", category=LOG_CATEGORY_VENDOR)

优化 Finalize 机制

8.1 Finalize 的作用

当 VAD 检测到人声结束后,立即触发 ASR 返回 final 结果,降低对话延迟。

8.2 三种实现方式

不同服务商支持的方式不同:

  1. 调用 API(推荐,如 Deepgram)
  2. 断连重连(需处理好重连逻辑)
  3. 发送静音包(需注意时间戳计算)

8.3 通知完成

@override
async def finalize(self, session_id: str | None) -> None:
    """触发最终结果
    
    Args:
        session_id: 会话ID,用于标识不同的对话轮次
    """
    if self.client:
        # 记录时间戳用于延迟计算
        self.last_finalize_timestamp = asyncio.get_event_loop().time() * 1000
        
        # 方式1: 调用 API
        await self.client.finalize()
        
        # 通知完成
        await self.send_asr_finalize_end()

规范化日志

9.1 日志分类

使用 category 参数让日志清晰:

  • LOG_CATEGORY_KEY_POINT: 关键节点(配置、初始化)
  • LOG_CATEGORY_VENDOR: 服务商相关(连接、结果、错误)
from ten_ai_base.const import LOG_CATEGORY_KEY_POINT, LOG_CATEGORY_VENDOR

self.ten_env.log_info("Config loaded", category=LOG_CATEGORY_KEY_POINT)
self.ten_env.log_error("Connection failed", category=LOG_CATEGORY_VENDOR)

9.2 关键日志点

必须打印的日志:

# 配置加载
self.ten_env.log_info(f"Config: {self.config.to_json(sensitive_handling=True)}", category=LOG_CATEGORY_KEY_POINT)

# 连接状态变化
self.ten_env.log_info("Connection opened", category=LOG_CATEGORY_VENDOR)
self.ten_env.log_warn("Connection closed", category=LOG_CATEGORY_VENDOR)

# 错误信息
self.ten_env.log_error(f"Vendor error: {error}", category=LOG_CATEGORY_VENDOR)

# 重连尝试
self.ten_env.log_info(f"Reconnecting {attempt}/{max_attempts}", category=LOG_CATEGORY_VENDOR)

9.3 敏感信息脱敏

from ten_ai_base.utils import encrypt

class MyAsrConfig(BaseModel):
    params: Dict[str, Optional[str]] = {}
    
    def to_json(self, sensitive_handling: bool = False) -> str:
        if not sensitive_handling:
            return self.model_dump_json()
        
        config = self.model_copy(deep=True)
        if config.params:
            for key in ['api_key', 'key', 'token', 'secret']:
                if key in config.params and config.params[key]:
                    config.params[key] = encrypt(config.params[key])
        return config.model_dump_json()

错误上报规范

10.1 错误分类

致命错误 (FATAL_ERROR)

  • 配置解析失败
  • 无效的 API Key
  • 初始连接失败
  • 达到最大重连次数

非致命错误 (NON_FATAL_ERROR)

  • 临时网络问题
  • 服务暂时不可用
  • 音频处理错误

10.2 包含供应商信息

from ten_ai_base.message import ModuleError, ModuleErrorCode, ModuleErrorVendorInfo

await self.send_asr_error(
    ModuleError(
        module=MODULE_NAME_ASR,
        code=ModuleErrorCode.NON_FATAL_ERROR.value,
        message=f"Vendor error: {str(error)}"
    ),
    ModuleErrorVendorInfo(
        vendor="deepgram",
        code=getattr(error, 'code', 'unknown'),
        message=str(error)
    )
)

音频 Dump 功能

11.1 为什么需要 Dump

在出现识别问题时,保存原始音频用于:

  • 复现问题
  • 分析音频质量
  • 对比不同服务商

11.2 实现 Dump

import os
from ten_ai_base.dumper import Dumper

DUMP_FILE_NAME = "my_asr_in.pcm"

class MyAsrExtension(AsyncASRBaseExtension):
    def __init__(self, name: str):
        super().__init__(name)
        self.audio_dumper: Optional[Dumper] = None
    
    @override
    async def on_init(self, ten_env: AsyncTenEnv) -> None:
        await super().on_init(ten_env)
        
        if self.config.dump:
            dump_file_path = os.path.join(self.config.dump_path, DUMP_FILE_NAME)
            self.audio_dumper = Dumper(dump_file_path)
            await self.audio_dumper.start()
    
    @override
    async def on_deinit(self, ten_env: AsyncTenEnv) -> None:
        await super().on_deinit(ten_env)
        if self.audio_dumper:
            await self.audio_dumper.stop()
            self.audio_dumper = None
    
    @override
    async def send_audio(self, audio_frame: AudioFrame) -> bool:
        buf = audio_frame.get_buf()
        
        # Dump 音频
        if self.audio_dumper and buf:
            await self.audio_dumper.push_bytes(bytes(buf))
        
        # 发送音频
        if self.is_connected() and self.client:
            await self.client.send(bytes(buf))
            return True
        
        return False

音频缓冲策略

12.1 为什么推荐保持模式

默认使用丢弃模式:连接断开时丢弃音频帧。

推荐使用保持模式:缓存音频帧,连接恢复后发送。

原因:保证时间戳准确性。

如果丢弃音频:

  • 时间:0-10s(发送)→ 10-15s(断连丢弃)→ 15-20s(发送)
  • ASR 只收到 15s 音频,但实际时间跨度 20s
  • 时间戳偏差 5s,影响对话同步和打断检测

12.2 配置保持模式

from ten_ai_base.asr import ASRBufferConfig, ASRBufferConfigModeKeep

@override
def buffer_strategy(self) -> ASRBufferConfig:
    return ASRBufferConfig(
        mode=ASRBufferConfigModeKeep(byte_limit=10 * 1024 * 1024)  # 10MB 缓存上限
    )

完善测试

13.1 单元测试覆盖

Advanced 部分的单元测试需要覆盖生产级特性。使用 Mock 避免真实 API 调用,确保测试快速、稳定、可重复。

测试用例设计

参考 azure_asr_python/tests 的实现,高级测试应覆盖:

1. 重连能力测试 (test_reconnect.py)

测试目标:验证 Extension 能够自动重连并恢复服务

用例设计

# 模拟供应商断连场景
def test_reconnect():
    # 1. Mock 供应商:前3次连接会断开,第4次成功
    # 2. 验证 Extension 会自动重试
    # 3. 验证重连成功后能正常工作
    # 4. 检查错误上报次数(应该有3次 NON_FATAL_ERROR)

验证点

  • ✅ 断连后自动触发重连
  • ✅ 使用指数退避策略
  • ✅ 达到重连次数上限时上报 FATAL_ERROR
  • ✅ 重连成功后重置计数器

2. 非法参数测试 (test_invalid_params.py)

测试目标:验证配置错误时的错误处理

用例设计

# 使用非法参数启动 Extension
def test_invalid_params():
    # 1. 提供空的或无效的 params(如缺少 api_key)
    # 2. 启动 Extension
    # 3. 验证收到 FATAL_ERROR
    # 4. 检查错误信息是否包含有用信息

验证点

  • ✅ 配置验证失败时上报 FATAL_ERROR
  • ✅ 错误消息清晰,便于排查
  • ✅ Extension 不会崩溃

3. 音频 Dump 测试 (test_dump.py)

测试目标:验证音频 Dump 功能的完整性

用例设计

# 开启 dump 后发送音频
def test_dump():
    # 1. 配置 dump=True 和 dump_path
    # 2. 发送 N 帧音频(每帧有特定的字节模式)
    # 3. 测试结束后检查 dump 文件
    # 4. 验证文件大小 = N * 帧大小
    # 5. 验证每一帧的内容完全一致

验证点

  • ✅ Dump 文件被创建
  • ✅ 所有发送的音频都被完整 dump
  • ✅ Dump 内容与发送内容完全一致(逐字节验证)
  • ✅ 帧顺序正确

4. Finalize 延迟测试 (test_finalize.py)

测试目标:验证 Extension 能快速输出 final 结果

用例设计

# 测试 finalize 响应速度
def test_finalize():
    # 1. 持续发送音频
    # 2. 1.5秒后发送 asr_finalize 事件
    # 3. Mock 供应商在收到 finalize 后快速返回 final 结果
    # 4. 验证收到 asr_finalize_end 事件
    # 5. 检查 finalize_id 和 metadata 是否正确传递

验证点

  • ✅ 收到 asr_finalize 后触发供应商的 finalize
  • ✅ 快速收到 final 结果(< 300ms 典型)
  • ✅ 发送 asr_finalize_end 通知下游
  • finalize_idsession_id 正确传递

5. 结果格式测试 (test_asr_result.py)

测试目标:验证 ASR 结果的标准格式

用例设计

# 验证输出结果的数据结构
def test_asr_result():
    # 1. Mock 供应商返回识别结果
    # 2. 验证 ASRResult 包含所有必需字段
    # 3. 验证 interim 和 final 结果都正确
    # 4. 验证 metadata 正确传递(如 session_id)

验证点

  • ✅ 包含必需字段:text, final, start_ms, duration_ms, language
  • ✅ 可选字段正确填充:words, metadata
  • session_id 从输入正确传递到输出

6. 错误上报测试 (test_vendor_error.py)

测试目标:验证错误分类和供应商信息上报

用例设计

# 模拟供应商返回错误
def test_vendor_error():
    # 1. Mock 供应商返回不同类型的错误
    # 2. 验证 Extension 上报正确的错误类型
    # 3. 检查是否包含 ModuleErrorVendorInfo

验证点

  • ✅ 临时错误上报 NON_FATAL_ERROR
  • ✅ 严重错误上报 FATAL_ERROR
  • ✅ 包含供应商错误码和消息
  • ✅ 错误信息便于调试

7. 性能指标测试 (test_metrics.py)

测试目标:验证性能指标正确计算和上报

用例设计

# 测试 TTFW、TTLW 指标
def test_metrics():
    # 1. 发送音频
    # 2. Mock 供应商在特定时间返回首字和最后一字
    # 3. 验证计算出的 TTFW 和 TTLW 指标

验证点

  • ✅ TTFW(首字延迟)正确计算
  • ✅ TTLW(末字延迟)正确计算
  • ✅ 指标通过 metrics 消息上报

运行测试

cd my_asr_extension
./tests/bin/start

Mock 的重要性:使用 Mock 而不是真实 API 的原因:

  • 🚀 速度快:测试在几秒内完成
  • 💰 零成本:不消耗 API 配额
  • 🎯 可控性:可以精确模拟各种场景(断连、错误、延迟)
  • 🔁 可重复:结果稳定,适合 CI/CD

参考 azure_asr_python/tests/mock.py 了解如何实现 Mock。

13.2 集成测试(Guarder)

使用真实 API Key 运行:

cd ai_agents
task asr-guarder-test EXTENSION=my_asr_extension

测试内容:

  • ASR 识别准确性
  • Finalize 延迟
  • 多语言支持
  • 错误处理
  • 性能指标

13.3 VS Code 调试

使用预置的 .vscode/launch.json

  1. 在代码中设置断点
  2. F5 启动调试
  3. 选择 "Python: Test Extension"

Advanced 开发自检清单

完成以下检查项,确保生产级质量:

稳定性

  • 实现 ReconnectManager 自动重连
  • 指数退避策略(避免频繁重连)
  • 最大重连次数限制
  • 连接成功后重置重连计数器

Finalize 机制

  • 实现 VAD 触发后的快速 finalize
  • 调用 send_asr_finalize_end() 通知完成
  • 如使用断连方式,处理好重连逻辑
  • 如使用静音包,正确计算时间戳

日志规范

  • 使用 LOG_CATEGORY_KEY_POINTLOG_CATEGORY_VENDOR 分类
  • 记录连接状态变化
  • 记录所有错误信息
  • 敏感信息脱敏(API Key 等)

错误上报

  • 正确区分 FATAL_ERROR 和 NON_FATAL_ERROR
  • 包含 ModuleErrorVendorInfo 供应商信息
  • 配置错误上报 FATAL_ERROR
  • 临时网络错误上报 NON_FATAL_ERROR

音频 Dump

  • 实现 Dumper 集成
  • 支持 dumpdump_path 配置
  • on_init 初始化,on_deinit 清理

音频缓冲

  • 实现 buffer_strategy() 返回保持模式
  • 设置合理的缓存上限(如 10MB)

测试覆盖

  • 单元测试覆盖所有核心功能
  • 使用 Mock 避免真实 API 调用
  • Guarder 测试通过(真实 API)
  • 端到端测试验证稳定性

代码质量

  • 遵循项目代码风格
  • 添加必要的注释
  • 提供清晰的 README

完成以上检查项后,你的 ASR Extension 达到生产级质量,可以贡献到社区并发布到商店。


Part 3: 发布与贡献

发布到 TEN Store

15.1 提交到主仓库

# Fork TEN Framework 仓库
git clone https://github.com/your-username/ten-framework.git
cd ten-framework

# 复制扩展到正确位置
cp -r /path/to/your/my_asr_extension ai_agents/agents/ten_packages/extension/

# 创建分支
git checkout -b feat/add-my-asr-extension

# 提交
git add ai_agents/agents/ten_packages/extension/my_asr_extension/
git commit -m "feat: add my_asr_extension for [供应商名称] ASR service"
git push origin feat/add-my-asr-extension

15.2 创建 Pull Request

  1. 访问你的 fork 仓库
  2. 点击 "Compare & pull request"
  3. 填写标题和描述(功能、特性、测试情况)
  4. 提交等待审查

15.3 自动发布

PR 合并后:

  • ✅ 自动上传到 TEN Store
  • ✅ 自动处理版本号
  • ✅ 全球开发者可下载使用

适配其他 ASR 服务

参考 TEN Framework 中的其他成品 ASR 扩展:

ten-framework/ai_agents/agents/ten_packages/extension/
├── azure_asr_python/          # Azure Speech Services
├── deepgram_asr_python/       # Deepgram ASR
├── google_asr_python/         # Google Cloud Speech
├── xfyun_asr_python/          # 科大讯飞
└── ...                        # 更多扩展

所有扩展都遵循相同架构,可作为适配新服务的参考。


附录

A. ASR Interface 规范

A.1 Interface 继承

manifest.json 中声明:

{
  "api": {
    "interface": [
      {
        "import_uri": "../../system/ten_ai_base/api/asr-interface.json"
      }
    ]
  }
}

A.2 标准属性

asr-interface.json 定义的标准属性:

  • dump: 布尔值,是否开启音频 dump
  • dump_path: 字符串,音频 dump 存储路径

A.3 扩展属性

api.property 中声明特有属性:

{
  "api": {
    "property": {
      "properties": {
        "params": {
          "type": "object",
          "properties": {
            "key": { "type": "string" },
            "language": { "type": "string" }
          }
        }
      }
    }
  }
}

A.4 输入输出数据格式

输入

  • pcm_frame: PCM 音频帧
  • asr_finalize: VAD 检测到人声结束事件

输出

  • asr_result: 识别结果
  • asr_finalize_end: Finalize 完成通知
  • error: 错误信息
  • metrics: 性能指标

详见 asr-interface.json 文件。

B. 基类方法参考

B.1 AsyncASRBaseExtension

必须实现

  • vendor() -> str
  • start_connection() -> None
  • stop_connection() -> None
  • send_audio(frame: AudioFrame, session_id: str | None) -> bool
  • finalize(session_id: str | None) -> None
  • is_connected() -> bool
  • input_audio_sample_rate() -> int

可选实现

  • input_audio_channels() -> int
  • input_audio_sample_width() -> int
  • buffer_strategy() -> ASRBufferConfig
  • audio_actual_send_metrics_interval() -> int

工具方法

  • send_asr_result(asr_result: ASRResult)
  • send_asr_error(error: ModuleError, vendor_info: ModuleErrorVendorInfo | None)
  • send_asr_finalize_end()
  • send_connect_delay_metrics(connect_delay: int)
  • send_vendor_metrics(vendor_metrics: dict)

C. 完整示例代码

参考 deepgram_asr_pythonazure_asr_python 扩展的完整实现:

ten-framework/ai_agents/agents/ten_packages/extension/
├── deepgram_asr_python/
│   ├── extension.py           # 完整实现
│   ├── reconnect_manager.py   # 重连管理器
│   ├── manifest.json
│   ├── property.json
│   └── tests/                 # 完整测试用例

D. 调试工具

D.1 VS Code 调试配置

.vscode/launch.json

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Python: Test Extension",
      "type": "python",
      "request": "launch",
      "program": "${workspaceFolder}/tests/bin/start",
      "console": "integratedTerminal",
      "cwd": "${workspaceFolder}",
      "env": {
        "PYTHONPATH": "${workspaceFolder}:..."
      }
    }
  ]
}

D.2 调试特定测试

修改 args 参数:

{
  "args": [
    "tests/test_basic.py::test_asr_basic_functionality",
    "-v"
  ]
}

总结

本教程分为 Basic 和 Advanced 两部分:

Basic 部分

  • 快速创建项目
  • 实现核心功能
  • 通过基础测试
  • 完成端到端验证

Advanced 部分

  • 自动重连机制
  • 优化 finalize
  • 规范化日志
  • 错误上报
  • 音频 dump
  • 完善测试

按照自检清单逐项完成,即可开发出生产级 ASR Extension。

开发愉快! 遇到问题欢迎在 TEN Framework GitHub 提 Issue。

目录

ASR Extension 开发完整指南
使用指南
前置条件
目录
Part 1: Basic - 实现基础功能
Part 2: Advanced - 生产级质量
Part 3: 发布与贡献
附录
Part 1: Basic - 实现基础功能
ASR Extension 简介
在对话流中的位置
创建项目
2.0 准备工作
2.1 使用模板创建
2.2 安装TEN packages的依赖
2.3 项目结构
文件说明
理解接口规范
必须实现的方法
基类自动处理
实现核心功能
4.1 配置管理
配置模型设计
params 透传设计的优势
4.2 读取配置
4.3 实现基础方法
4.4 实现连接管理
建立连接
4.5 发送音频
4.6 处理识别结果
4.7 实现 finalize
测试
5.1 单元测试
测试目标
测试流程示例
运行测试
5.2 端到端测试
Basic 开发自检清单
Part 2: Advanced - 生产级质量
自动重连机制
7.1 为什么需要重连
7.2 使用 ReconnectManager
7.3 在连接成功时重置
7.4 处理断连和错误
7.5 实现重连逻辑
优化 Finalize 机制
8.1 Finalize 的作用
8.2 三种实现方式
8.3 通知完成
规范化日志
9.1 日志分类
9.2 关键日志点
9.3 敏感信息脱敏
错误上报规范
10.1 错误分类
10.2 包含供应商信息
音频 Dump 功能
11.1 为什么需要 Dump
11.2 实现 Dump
音频缓冲策略
12.1 为什么推荐保持模式
12.2 配置保持模式
完善测试
13.1 单元测试覆盖
测试用例设计
运行测试
13.2 集成测试(Guarder)
13.3 VS Code 调试
Advanced 开发自检清单
Part 3: 发布与贡献
发布到 TEN Store
15.1 提交到主仓库
15.2 创建 Pull Request
15.3 自动发布
适配其他 ASR 服务
附录
A. ASR Interface 规范
A.1 Interface 继承
A.2 标准属性
A.3 扩展属性
A.4 输入输出数据格式
B. 基类方法参考
B.1 AsyncASRBaseExtension
C. 完整示例代码
D. 调试工具
D.1 VS Code 调试配置
D.2 调试特定测试
总结
ASR Extension 开发完整指南 | TEN Framework