ASR Extension 开发完整指南
从零开始创建、开发、测试并发布一个完整的 ASR 扩展
ASR Extension 开发完整指南
本教程涵盖 ASR Extension 从创建、开发、测试到发布的完整流程。
使用指南
- Basic 部分:实现基本功能,让 ASR Extension 能正常工作并通过端到端验证
- Advanced 部分:实现生产级特性(自动重连、finalize机制、规范化日志、音频dump等),完善测试并发布到商店
根据你的需求选择阅读相应部分。
前置条件
- 了解 TEN Extension 的开发、测试流程,参考 TEN 扩展开发完整指南
- 掌握 Python 异步编程(
asyncio、async/await) - 安装好 tman 命令行工具,并熟悉其基本使用
- 准备好 ASR 服务商的 API 密钥(用于测试)
目录
Part 1: Basic - 实现基础功能
Part 2: Advanced - 生产级质量
Part 3: 发布与贡献
附录
Part 1: Basic - 实现基础功能
ASR Extension 简介
ASR Extension 是 TEN Framework 的标准积木组件之一,负责实时地将音频流转写为对应语言的文字。
在对话流中的位置
[RTC Extension] ──音频流──> [ASR Extension] ──文字流──> [LLM Extension]创建项目
2.0 准备工作
如果你还没有 TEN Framework 仓库,请先克隆:
git clone https://github.com/TEN-framework/ten-framework.git
cd ten-framework本教程假设你在 TEN Framework 仓库根目录下进行开发。
2.1 使用模板创建
使用 tman 命令行工具,从 ASR 扩展模板创建项目:
cd ai_agents/agents/ten_packages/extension
tman create extension my_asr_extension --template default_asr_python --template-data class_name_prefix=MyAsr命令参数说明:
extension my_asr_extension:创建一个 extension,目录名和插件名为my_asr_extension--template default_asr_python:使用 ASR Python 扩展模板--template-data class_name_prefix=MyAsr:设置 Python Extension class 的类名前缀为MyAsr(生成的类名为MyAsrExtension)
2.2 安装TEN packages的依赖
cd my_asr_extension
tman install --standalone这条命令使用 tman 工具的依赖计算和下载能力,根据 manifest.json 中声明的依赖计算出依赖树。由于是开发态的依赖,需要使用 --standalone 参数,依赖会被安装在 .ten 目录下,包括开发依赖的 Python module 和独立测试时依赖的系统包。
2.3 项目结构
my_asr_extension/
├── manifest.json # 扩展元数据
├── property.json # 默认配置
├── requirements.txt # Python 依赖
├── extension.py # 主实现文件
├── addon.py # 扩展入口
├── __init__.py # Python 包初始化
├── docs/ # 文档目录
├── .vscode/ # VS Code 调试配置
└── tests/ # 测试文件文件说明
manifest.json 和 property.json 是 TEN Extension 的标准 metadata 文件:
manifest.json:包含插件的名称、版本、介绍、依赖以及 schema 定义property.json:定义插件的默认属性值
详细了解 TEN Framework 的元数据系统,请参考 元数据系统文档。
requirements.txt 是 Python 插件独有的文件,用于声明插件本身对第三方 pip 包的依赖。
extension.py 是 extension 的核心源代码,包含所有业务逻辑实现。
tests/ 文件夹用于插件独立测试,包含单元测试和测试配置。
理解接口规范
ASR Extension 继承自 AsyncASRBaseExtension,需实现以下方法:
必须实现的方法
vendor() -> str
# 返回服务商名称(如 "deepgram", "azure")
start_connection() -> None
# 建立与服务商的连接
stop_connection() -> None
# 停止连接
send_audio(frame: AudioFrame, session_id: str | None) -> bool
# 发送音频帧给服务商
# session_id: 音频来源标识(单人场景可忽略)
# 返回 True 表示发送成功
finalize(session_id: str | None) -> None
# VAD 检测到说话结束后调用,触发最终结果
is_connected() -> bool
# 返回当前连接状态
input_audio_sample_rate() -> int
# 返回期望的音频采样率(如 16000)基类自动处理
基类 AsyncASRBaseExtension 已经帮你处理了:
- 音频帧接收和队列管理
- 性能指标计算(TTFW、TTLW)
- 会话管理和元数据传递
你只需要专注于对接具体的 ASR 服务商。
实现核心功能
4.1 配置管理
配置模型设计
from pydantic import BaseModel
from typing import Dict, Optional
class MyAsrConfig(BaseModel):
# 服务商参数(透传设计)
params: Dict[str, Optional[str]] = {}
# 音频 dump 功能
dump: bool = False
dump_path: Optional[str] = Noneparams 透传设计的优势
params 是一个字典,用于存放所有服务商相关的参数。这些参数会直接透传给服务商 SDK,而不需要在 Extension 代码中枚举。
为什么这样设计?
✅ 灵活性:使用者可以通过 property.json 配置任何服务商支持的参数,不受 Extension 实现限制
✅ 可扩展:服务商新增参数时,Extension 代码无需修改
✅ 简洁性:避免为每个参数都定义一个配置字段
示例:
{
"params": {
"api_key": "your_api_key_here",
"language": "zh-CN",
"model": "nova-2",
"punctuate": "true",
"custom_param": "any_value" // 任何服务商支持的参数
},
"dump": false
}使用时直接透传给服务商:
# 从 params 中读取并传给服务商
api_key = self.config.params.get("api_key")
language = self.config.params.get("language", "en-US") # 带默认值注意:模板生成的 property.json 文件内容为空 {},你需要手动添加配置。
4.2 读取配置
@override
async def on_init(self, ten_env: AsyncTenEnv) -> None:
await super().on_init(ten_env)
# 读取配置
config_json, _ = await ten_env.get_property_to_json("")
self.config = MyAsrConfig.model_validate_json(config_json)
ten_env.log_info(f"Config loaded: {self.config.model_dump_json()}")4.3 实现基础方法
class MyAsrExtension(AsyncASRBaseExtension):
def __init__(self, name: str):
super().__init__(name)
self.config: MyAsrConfig = MyAsrConfig()
self.client = None # 服务商 SDK 客户端
self.is_connected_flag = False
@override
def vendor(self) -> str:
return "my_vendor" # 改为你的服务商名称
@override
def input_audio_sample_rate(self) -> int:
# 从 params 中读取,带默认值
return int(self.config.params.get("sample_rate", "16000"))
@override
def is_connected(self) -> bool:
return self.is_connected_flag4.4 实现连接管理
重要提示:以下代码仅为示例,实际实现需要根据你的服务商而定。
不同服务商的连接方式差异很大:
- 有些提供封装好的 SDK(如 Azure、Deepgram)
- 有些需要直接使用 WebSocket 连接
- 有些使用 HTTP 流式 API
请参考你的服务商文档,使用对应的连接方式。
建立连接
@override
async def start_connection(self) -> None:
"""建立与服务商的连接"""
try:
await self.stop_connection() # 先停止已有连接
# 1. 初始化服务商客户端
# 从 params 透传参数给服务商 SDK
# 示例(需根据实际服务商调整):
self.client = VendorClient(
api_key=self.config.params.get("api_key"),
language=self.config.params.get("language", "en-US"),
# ... 其他参数直接从 params 中读取并透传
)
# 2. 注册事件处理器
# 不同服务商的事件机制不同,这里仅为示例
self.client.on("connected", self._on_open)
self.client.on("result", self._on_transcript)
self.client.on("error", self._on_error)
# 3. 启动连接
await self.client.connect()
except Exception as e:
self.ten_env.log_error(f"Failed to connect: {e}")
@override
async def stop_connection(self) -> None:
"""停止连接"""
if self.client:
await self.client.disconnect()
self.client = None
self.is_connected_flag = Falseparams 透传的价值:所有参数都从 params 字典中读取并透传给服务商 SDK,这样用户可以灵活配置任何服务商支持的参数,不需要修改代码。
参考现有实现:
azure_asr_python- 使用 Azure SDKdeepgram_asr_python- 使用 Deepgram SDK
4.5 发送音频
@override
async def send_audio(self, audio_frame: AudioFrame, session_id: str | None) -> bool:
"""发送音频数据
Args:
audio_frame: 音频帧
session_id: 会话ID,用于标识不同的对话轮次
Returns:
bool: 发送成功返回 True,否则返回 False
"""
if not self.is_connected() or not self.client:
return False
try:
audio_buf = audio_frame.get_buf()
if audio_buf:
await self.client.send(bytes(audio_buf))
return True
except Exception as e:
self.ten_env.log_error(f"Failed to send audio: {e}")
return False4.6 处理识别结果
服务商会通过回调函数返回识别结果,你需要转换为标准格式并发送:
async def _on_transcript(self, result):
"""处理识别结果"""
# 1. 提取文本
text = result.text.strip()
if not text:
return
# 2. 转换为标准 ASR 结果
asr_result = ASRResult(
text=text,
final=result.is_final, # 是否为最终结果
start_ms=result.start_time_ms, # 开始时间
duration_ms=result.duration_ms # 持续时间
)
# 3. 发送给下游
await self.send_asr_result(asr_result)其他必需的事件处理:
async def _on_open(self):
"""连接建立"""
self.is_connected_flag = True
self.ten_env.log_info("Connection opened")
async def _on_error(self, error):
"""处理错误"""
self.ten_env.log_error(f"Vendor error: {error}")4.7 实现 finalize
当 VAD 检测到用户说话结束时,会调用 finalize() 方法,通知服务商尽快返回最终结果:
@override
async def finalize(self, session_id: str | None) -> None:
"""触发最终结果"""
if self.client:
# 不同服务商有不同的 finalize 方式:
# 1. 调用 finalize API(推荐)
await self.client.finalize()
# 2. 或发送静音包
# await self.client.send_silence()
# 3. 或断连重连
# await self.stop_connection()
# await self.start_connection()
# 通知完成
await self.send_asr_finalize_end()不同服务商的 finalize 机制不同,选择最适合你的服务商的方式。详细说明见 Advanced 部分。
到这里,基础功能已经实现完成!接下来可以进行测试。
测试
5.1 单元测试
单元测试用于验证 ASR Extension 的基础功能是否正常工作。
测试目标
基础功能至少需要验证:
- 配置加载:能正确读取
property.json中的配置 - 连接建立:能成功连接到服务商
- 音频处理:能接收并发送音频帧
- 结果输出:能输出标准的
ASRResult格式结果
测试流程示例
一个典型的单元测试会:
- 准备测试音频:从文件加载 PCM 音频数据
- 逐帧发送:将音频按帧发送给 ASR Extension
- 验证结果:检查是否收到标准格式的
ASRResult
ASRResult 标准格式:
ASRResult(
text="识别出的文本", # 必需:识别文本
final=True, # 必需:是否为最终结果
start_ms=0, # 可选:开始时间(毫秒)
duration_ms=1000, # 可选:持续时间(毫秒)
language="zh-CN", # 可选:语言
words=[] # 可选:词级别信息
)运行测试
cd my_asr_extension
./tests/bin/start模板已经包含基础测试用例,你可以在 tests/test_basic.py 中添加更多测试。
参考现有实现的测试:
azure_asr_python/tests/test_asr_result.py- 测试结果输出deepgram_asr_python/tests/test_basic.py- 基础功能测试
5.2 端到端测试
在 TEN Agent 项目中使用 TMan Designer 替换 ASR 扩展:
cd /path/to/your/ten-agent-project
tman designer通过可视化界面:
- 选择现有 ASR 节点
- 替换为你的
my_asr_extension - 配置 API Key 等参数
- 启动并进行真实对话测试
Basic 开发自检清单
完成以下检查项,确保基础功能正常:
- 项目创建:使用模板创建项目,安装依赖成功
- 配置管理:能从property中正确读取配置,并正确透传给服务商SDK
- 连接建立:
start_connection能成功连接到服务商 - 音频发送:能通过
send_audio发送音频帧给服务商 - 结果接收:能正确接收服务商返回的识别结果,并转换为标准格式
- 结果发送:能通过
send_asr_result发送标准化结果 - Finalize:实现
finalize方法(即使是简单实现) - 连接清理:
stop_connection能正确关闭连接 - 单元测试:基础测试用例能通过
- 端到端测试:在 TEN Agent 中能完成基本对话
完成以上检查项后,你的 ASR Extension 已具备基本功能,可以在实际场景中使用。
Part 2: Advanced - 生产级质量
进阶部分将帮助你实现生产级 ASR Extension,包括稳定性、可维护性和可调试性。
自动重连机制
7.1 为什么需要重连
网络环境复杂,ASR 服务可能出现:
- 临时网络抖动
- 服务端主动断连
- 超时错误
实现自动重连确保服务稳定性。
7.2 使用 ReconnectManager
参考现有 ASR 扩展(如 azure_asr_python)中的 reconnect_manager.py。
from .reconnect_manager import ReconnectManager
class MyAsrExtension(AsyncASRBaseExtension):
def __init__(self, name: str):
super().__init__(name)
# 最多重连 5 次,基础延迟 0.5 秒(指数退避)
self.reconnect_manager = ReconnectManager(max_attempts=5, base_delay=0.5)7.3 在连接成功时重置
async def _on_open(self, *args, **kwargs) -> None:
self.is_connected_flag = True
self.ten_env.log_info("Connection opened", category=LOG_CATEGORY_VENDOR)
if self.reconnect_manager:
self.reconnect_manager.mark_connection_successful()7.4 处理断连和错误
async def _on_close(self, *args, **kwargs) -> None:
self.is_connected_flag = False
self.ten_env.log_warn("Connection closed", category=LOG_CATEGORY_VENDOR)
if self.client: # 意外断连
await self._handle_reconnect()
async def _on_error(self, *args, **kwargs) -> None:
error = args[1] if len(args) > 1 else None
self.ten_env.log_error(f"Vendor error: {error}", category=LOG_CATEGORY_VENDOR)
await self.send_asr_error(
ModuleError(module=MODULE_NAME_ASR, code=ModuleErrorCode.NON_FATAL_ERROR.value, message=str(error))
)
await self._handle_reconnect()7.5 实现重连逻辑
async def _handle_reconnect(self) -> None:
if not self.reconnect_manager or not self.reconnect_manager.can_retry():
self.ten_env.log_error("Max reconnection attempts reached", category=LOG_CATEGORY_VENDOR)
await self.send_asr_error(
ModuleError(module=MODULE_NAME_ASR, code=ModuleErrorCode.FATAL_ERROR.value, message="Reconnection failed")
)
return
self.ten_env.log_info(
f"Attempting reconnection {self.reconnect_manager.current_attempts + 1}/{self.reconnect_manager.max_attempts}",
category=LOG_CATEGORY_VENDOR
)
success = await self.reconnect_manager.handle_reconnect(connect_func=self.start_connection)
if success:
self.ten_env.log_info("Reconnection successful", category=LOG_CATEGORY_VENDOR)
else:
self.ten_env.log_error("Reconnection failed", category=LOG_CATEGORY_VENDOR)优化 Finalize 机制
8.1 Finalize 的作用
当 VAD 检测到人声结束后,立即触发 ASR 返回 final 结果,降低对话延迟。
8.2 三种实现方式
不同服务商支持的方式不同:
- 调用 API(推荐,如 Deepgram)
- 断连重连(需处理好重连逻辑)
- 发送静音包(需注意时间戳计算)
8.3 通知完成
@override
async def finalize(self, session_id: str | None) -> None:
"""触发最终结果
Args:
session_id: 会话ID,用于标识不同的对话轮次
"""
if self.client:
# 记录时间戳用于延迟计算
self.last_finalize_timestamp = asyncio.get_event_loop().time() * 1000
# 方式1: 调用 API
await self.client.finalize()
# 通知完成
await self.send_asr_finalize_end()规范化日志
9.1 日志分类
使用 category 参数让日志清晰:
LOG_CATEGORY_KEY_POINT: 关键节点(配置、初始化)LOG_CATEGORY_VENDOR: 服务商相关(连接、结果、错误)
from ten_ai_base.const import LOG_CATEGORY_KEY_POINT, LOG_CATEGORY_VENDOR
self.ten_env.log_info("Config loaded", category=LOG_CATEGORY_KEY_POINT)
self.ten_env.log_error("Connection failed", category=LOG_CATEGORY_VENDOR)9.2 关键日志点
必须打印的日志:
# 配置加载
self.ten_env.log_info(f"Config: {self.config.to_json(sensitive_handling=True)}", category=LOG_CATEGORY_KEY_POINT)
# 连接状态变化
self.ten_env.log_info("Connection opened", category=LOG_CATEGORY_VENDOR)
self.ten_env.log_warn("Connection closed", category=LOG_CATEGORY_VENDOR)
# 错误信息
self.ten_env.log_error(f"Vendor error: {error}", category=LOG_CATEGORY_VENDOR)
# 重连尝试
self.ten_env.log_info(f"Reconnecting {attempt}/{max_attempts}", category=LOG_CATEGORY_VENDOR)9.3 敏感信息脱敏
from ten_ai_base.utils import encrypt
class MyAsrConfig(BaseModel):
params: Dict[str, Optional[str]] = {}
def to_json(self, sensitive_handling: bool = False) -> str:
if not sensitive_handling:
return self.model_dump_json()
config = self.model_copy(deep=True)
if config.params:
for key in ['api_key', 'key', 'token', 'secret']:
if key in config.params and config.params[key]:
config.params[key] = encrypt(config.params[key])
return config.model_dump_json()错误上报规范
10.1 错误分类
致命错误 (FATAL_ERROR):
- 配置解析失败
- 无效的 API Key
- 初始连接失败
- 达到最大重连次数
非致命错误 (NON_FATAL_ERROR):
- 临时网络问题
- 服务暂时不可用
- 音频处理错误
10.2 包含供应商信息
from ten_ai_base.message import ModuleError, ModuleErrorCode, ModuleErrorVendorInfo
await self.send_asr_error(
ModuleError(
module=MODULE_NAME_ASR,
code=ModuleErrorCode.NON_FATAL_ERROR.value,
message=f"Vendor error: {str(error)}"
),
ModuleErrorVendorInfo(
vendor="deepgram",
code=getattr(error, 'code', 'unknown'),
message=str(error)
)
)音频 Dump 功能
11.1 为什么需要 Dump
在出现识别问题时,保存原始音频用于:
- 复现问题
- 分析音频质量
- 对比不同服务商
11.2 实现 Dump
import os
from ten_ai_base.dumper import Dumper
DUMP_FILE_NAME = "my_asr_in.pcm"
class MyAsrExtension(AsyncASRBaseExtension):
def __init__(self, name: str):
super().__init__(name)
self.audio_dumper: Optional[Dumper] = None
@override
async def on_init(self, ten_env: AsyncTenEnv) -> None:
await super().on_init(ten_env)
if self.config.dump:
dump_file_path = os.path.join(self.config.dump_path, DUMP_FILE_NAME)
self.audio_dumper = Dumper(dump_file_path)
await self.audio_dumper.start()
@override
async def on_deinit(self, ten_env: AsyncTenEnv) -> None:
await super().on_deinit(ten_env)
if self.audio_dumper:
await self.audio_dumper.stop()
self.audio_dumper = None
@override
async def send_audio(self, audio_frame: AudioFrame) -> bool:
buf = audio_frame.get_buf()
# Dump 音频
if self.audio_dumper and buf:
await self.audio_dumper.push_bytes(bytes(buf))
# 发送音频
if self.is_connected() and self.client:
await self.client.send(bytes(buf))
return True
return False音频缓冲策略
12.1 为什么推荐保持模式
默认使用丢弃模式:连接断开时丢弃音频帧。
推荐使用保持模式:缓存音频帧,连接恢复后发送。
原因:保证时间戳准确性。
如果丢弃音频:
- 时间:0-10s(发送)→ 10-15s(断连丢弃)→ 15-20s(发送)
- ASR 只收到 15s 音频,但实际时间跨度 20s
- 时间戳偏差 5s,影响对话同步和打断检测
12.2 配置保持模式
from ten_ai_base.asr import ASRBufferConfig, ASRBufferConfigModeKeep
@override
def buffer_strategy(self) -> ASRBufferConfig:
return ASRBufferConfig(
mode=ASRBufferConfigModeKeep(byte_limit=10 * 1024 * 1024) # 10MB 缓存上限
)完善测试
13.1 单元测试覆盖
Advanced 部分的单元测试需要覆盖生产级特性。使用 Mock 避免真实 API 调用,确保测试快速、稳定、可重复。
测试用例设计
参考 azure_asr_python/tests 的实现,高级测试应覆盖:
1. 重连能力测试 (test_reconnect.py)
测试目标:验证 Extension 能够自动重连并恢复服务
用例设计:
# 模拟供应商断连场景
def test_reconnect():
# 1. Mock 供应商:前3次连接会断开,第4次成功
# 2. 验证 Extension 会自动重试
# 3. 验证重连成功后能正常工作
# 4. 检查错误上报次数(应该有3次 NON_FATAL_ERROR)验证点:
- ✅ 断连后自动触发重连
- ✅ 使用指数退避策略
- ✅ 达到重连次数上限时上报 FATAL_ERROR
- ✅ 重连成功后重置计数器
2. 非法参数测试 (test_invalid_params.py)
测试目标:验证配置错误时的错误处理
用例设计:
# 使用非法参数启动 Extension
def test_invalid_params():
# 1. 提供空的或无效的 params(如缺少 api_key)
# 2. 启动 Extension
# 3. 验证收到 FATAL_ERROR
# 4. 检查错误信息是否包含有用信息验证点:
- ✅ 配置验证失败时上报 FATAL_ERROR
- ✅ 错误消息清晰,便于排查
- ✅ Extension 不会崩溃
3. 音频 Dump 测试 (test_dump.py)
测试目标:验证音频 Dump 功能的完整性
用例设计:
# 开启 dump 后发送音频
def test_dump():
# 1. 配置 dump=True 和 dump_path
# 2. 发送 N 帧音频(每帧有特定的字节模式)
# 3. 测试结束后检查 dump 文件
# 4. 验证文件大小 = N * 帧大小
# 5. 验证每一帧的内容完全一致验证点:
- ✅ Dump 文件被创建
- ✅ 所有发送的音频都被完整 dump
- ✅ Dump 内容与发送内容完全一致(逐字节验证)
- ✅ 帧顺序正确
4. Finalize 延迟测试 (test_finalize.py)
测试目标:验证 Extension 能快速输出 final 结果
用例设计:
# 测试 finalize 响应速度
def test_finalize():
# 1. 持续发送音频
# 2. 1.5秒后发送 asr_finalize 事件
# 3. Mock 供应商在收到 finalize 后快速返回 final 结果
# 4. 验证收到 asr_finalize_end 事件
# 5. 检查 finalize_id 和 metadata 是否正确传递验证点:
- ✅ 收到
asr_finalize后触发供应商的 finalize - ✅ 快速收到 final 结果(< 300ms 典型)
- ✅ 发送
asr_finalize_end通知下游 - ✅
finalize_id和session_id正确传递
5. 结果格式测试 (test_asr_result.py)
测试目标:验证 ASR 结果的标准格式
用例设计:
# 验证输出结果的数据结构
def test_asr_result():
# 1. Mock 供应商返回识别结果
# 2. 验证 ASRResult 包含所有必需字段
# 3. 验证 interim 和 final 结果都正确
# 4. 验证 metadata 正确传递(如 session_id)验证点:
- ✅ 包含必需字段:
text,final,start_ms,duration_ms,language - ✅ 可选字段正确填充:
words,metadata - ✅
session_id从输入正确传递到输出
6. 错误上报测试 (test_vendor_error.py)
测试目标:验证错误分类和供应商信息上报
用例设计:
# 模拟供应商返回错误
def test_vendor_error():
# 1. Mock 供应商返回不同类型的错误
# 2. 验证 Extension 上报正确的错误类型
# 3. 检查是否包含 ModuleErrorVendorInfo验证点:
- ✅ 临时错误上报 NON_FATAL_ERROR
- ✅ 严重错误上报 FATAL_ERROR
- ✅ 包含供应商错误码和消息
- ✅ 错误信息便于调试
7. 性能指标测试 (test_metrics.py)
测试目标:验证性能指标正确计算和上报
用例设计:
# 测试 TTFW、TTLW 指标
def test_metrics():
# 1. 发送音频
# 2. Mock 供应商在特定时间返回首字和最后一字
# 3. 验证计算出的 TTFW 和 TTLW 指标验证点:
- ✅ TTFW(首字延迟)正确计算
- ✅ TTLW(末字延迟)正确计算
- ✅ 指标通过
metrics消息上报
运行测试
cd my_asr_extension
./tests/bin/startMock 的重要性:使用 Mock 而不是真实 API 的原因:
- 🚀 速度快:测试在几秒内完成
- 💰 零成本:不消耗 API 配额
- 🎯 可控性:可以精确模拟各种场景(断连、错误、延迟)
- 🔁 可重复:结果稳定,适合 CI/CD
参考 azure_asr_python/tests/mock.py 了解如何实现 Mock。
13.2 集成测试(Guarder)
使用真实 API Key 运行:
cd ai_agents
task asr-guarder-test EXTENSION=my_asr_extension测试内容:
- ASR 识别准确性
- Finalize 延迟
- 多语言支持
- 错误处理
- 性能指标
13.3 VS Code 调试
使用预置的 .vscode/launch.json:
- 在代码中设置断点
- 按
F5启动调试 - 选择 "Python: Test Extension"
Advanced 开发自检清单
完成以下检查项,确保生产级质量:
稳定性:
- 实现
ReconnectManager自动重连 - 指数退避策略(避免频繁重连)
- 最大重连次数限制
- 连接成功后重置重连计数器
Finalize 机制:
- 实现 VAD 触发后的快速 finalize
- 调用
send_asr_finalize_end()通知完成 - 如使用断连方式,处理好重连逻辑
- 如使用静音包,正确计算时间戳
日志规范:
- 使用
LOG_CATEGORY_KEY_POINT和LOG_CATEGORY_VENDOR分类 - 记录连接状态变化
- 记录所有错误信息
- 敏感信息脱敏(API Key 等)
错误上报:
- 正确区分 FATAL_ERROR 和 NON_FATAL_ERROR
- 包含
ModuleErrorVendorInfo供应商信息 - 配置错误上报 FATAL_ERROR
- 临时网络错误上报 NON_FATAL_ERROR
音频 Dump:
- 实现
Dumper集成 - 支持
dump和dump_path配置 - 在
on_init初始化,on_deinit清理
音频缓冲:
- 实现
buffer_strategy()返回保持模式 - 设置合理的缓存上限(如 10MB)
测试覆盖:
- 单元测试覆盖所有核心功能
- 使用 Mock 避免真实 API 调用
- Guarder 测试通过(真实 API)
- 端到端测试验证稳定性
代码质量:
- 遵循项目代码风格
- 添加必要的注释
- 提供清晰的 README
完成以上检查项后,你的 ASR Extension 达到生产级质量,可以贡献到社区并发布到商店。
Part 3: 发布与贡献
发布到 TEN Store
15.1 提交到主仓库
# Fork TEN Framework 仓库
git clone https://github.com/your-username/ten-framework.git
cd ten-framework
# 复制扩展到正确位置
cp -r /path/to/your/my_asr_extension ai_agents/agents/ten_packages/extension/
# 创建分支
git checkout -b feat/add-my-asr-extension
# 提交
git add ai_agents/agents/ten_packages/extension/my_asr_extension/
git commit -m "feat: add my_asr_extension for [供应商名称] ASR service"
git push origin feat/add-my-asr-extension15.2 创建 Pull Request
- 访问你的 fork 仓库
- 点击 "Compare & pull request"
- 填写标题和描述(功能、特性、测试情况)
- 提交等待审查
15.3 自动发布
PR 合并后:
- ✅ 自动上传到 TEN Store
- ✅ 自动处理版本号
- ✅ 全球开发者可下载使用
适配其他 ASR 服务
参考 TEN Framework 中的其他成品 ASR 扩展:
ten-framework/ai_agents/agents/ten_packages/extension/
├── azure_asr_python/ # Azure Speech Services
├── deepgram_asr_python/ # Deepgram ASR
├── google_asr_python/ # Google Cloud Speech
├── xfyun_asr_python/ # 科大讯飞
└── ... # 更多扩展所有扩展都遵循相同架构,可作为适配新服务的参考。
附录
A. ASR Interface 规范
A.1 Interface 继承
在 manifest.json 中声明:
{
"api": {
"interface": [
{
"import_uri": "../../system/ten_ai_base/api/asr-interface.json"
}
]
}
}A.2 标准属性
asr-interface.json 定义的标准属性:
dump: 布尔值,是否开启音频 dumpdump_path: 字符串,音频 dump 存储路径
A.3 扩展属性
在 api.property 中声明特有属性:
{
"api": {
"property": {
"properties": {
"params": {
"type": "object",
"properties": {
"key": { "type": "string" },
"language": { "type": "string" }
}
}
}
}
}
}A.4 输入输出数据格式
输入:
pcm_frame: PCM 音频帧asr_finalize: VAD 检测到人声结束事件
输出:
asr_result: 识别结果asr_finalize_end: Finalize 完成通知error: 错误信息metrics: 性能指标
详见 asr-interface.json 文件。
B. 基类方法参考
B.1 AsyncASRBaseExtension
必须实现:
vendor() -> strstart_connection() -> Nonestop_connection() -> Nonesend_audio(frame: AudioFrame, session_id: str | None) -> boolfinalize(session_id: str | None) -> Noneis_connected() -> boolinput_audio_sample_rate() -> int
可选实现:
input_audio_channels() -> intinput_audio_sample_width() -> intbuffer_strategy() -> ASRBufferConfigaudio_actual_send_metrics_interval() -> int
工具方法:
send_asr_result(asr_result: ASRResult)send_asr_error(error: ModuleError, vendor_info: ModuleErrorVendorInfo | None)send_asr_finalize_end()send_connect_delay_metrics(connect_delay: int)send_vendor_metrics(vendor_metrics: dict)
C. 完整示例代码
参考 deepgram_asr_python 或 azure_asr_python 扩展的完整实现:
ten-framework/ai_agents/agents/ten_packages/extension/
├── deepgram_asr_python/
│ ├── extension.py # 完整实现
│ ├── reconnect_manager.py # 重连管理器
│ ├── manifest.json
│ ├── property.json
│ └── tests/ # 完整测试用例D. 调试工具
D.1 VS Code 调试配置
.vscode/launch.json:
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Test Extension",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/tests/bin/start",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}",
"env": {
"PYTHONPATH": "${workspaceFolder}:..."
}
}
]
}D.2 调试特定测试
修改 args 参数:
{
"args": [
"tests/test_basic.py::test_asr_basic_functionality",
"-v"
]
}总结
本教程分为 Basic 和 Advanced 两部分:
Basic 部分:
- 快速创建项目
- 实现核心功能
- 通过基础测试
- 完成端到端验证
Advanced 部分:
- 自动重连机制
- 优化 finalize
- 规范化日志
- 错误上报
- 音频 dump
- 完善测试
按照自检清单逐项完成,即可开发出生产级 ASR Extension。
开发愉快! 遇到问题欢迎在 TEN Framework GitHub 提 Issue。
最后更新