
Create an ASR Extension - Complete Guide

Build, test, and publish a complete ASR extension from scratch

This guide covers the complete process of creating, developing, testing, and publishing an ASR Extension.

Usage Guide

  • Basic Part: Implement basic functionality to make the ASR Extension work and pass end-to-end verification.
  • Advanced Part: Implement production-grade features (auto-reconnection, finalize mechanism, standardized logging, audio dump, etc.), perfect testing, and publish to the store.

Choose the appropriate section to read based on your needs.

Prerequisites

  • Understand the development and testing process of TEN Extensions. Refer to How to Develop with Extensions.
  • Master Python asynchronous programming (asyncio, async/await).
  • Have the tman command-line tool installed and be familiar with its basic usage.
  • Have an API key from an ASR vendor ready (for testing).

Table of Contents

Part 1: Basic - Implement Basic Functionality

  1. Introduction to ASR Extension
  2. Create Project
  3. Understand Interface Specifications
  4. Implement Core Functionality
  5. Testing
  6. Basic Development Checklist

Part 2: Advanced - Production-Grade Quality

  1. Auto Reconnection Mechanism
  2. Optimize Finalize Mechanism
  3. Standardized Logging
  4. Error Reporting Specifications
  5. Audio Dump Functionality
  6. Audio Buffer Strategy
  7. Perfect Testing
  8. Advanced Development Checklist

Part 3: Publish and Contribute

  1. Publish to TEN Store
  2. Adapt Other ASR Services

Appendix


Part 1: Basic - Implement Basic Functionality

Introduction to ASR Extension

The ASR Extension is one of the standard building blocks of the TEN Framework. It transcribes an incoming audio stream into text in real time.

Position in the Conversation Flow

[RTC Extension] ──audio stream──> [ASR Extension] ──text stream──> [LLM Extension]

Create Project

2.0 Preparation

If you haven't cloned the TEN Framework repository yet, please clone it first:

git clone https://github.com/TEN-framework/ten-framework.git
cd ten-framework

This tutorial assumes you are developing in the root directory of the TEN Framework repository.

2.1 Create with Template

Use the tman command-line tool to create a project from the ASR extension template:

cd ai_agents/agents/ten_packages/extension
tman create extension my_asr_extension --template default_asr_python --template-data class_name_prefix=MyAsr

Command Parameter Explanation:

  • extension my_asr_extension: Create an extension named my_asr_extension, which is used as both the directory name and the plugin name.
  • --template default_asr_python: Use the ASR Python extension template.
  • --template-data class_name_prefix=MyAsr: Set the class name prefix for the Python Extension class to MyAsr (the generated class name will be MyAsrExtension).

2.2 Install TEN Package Dependencies

cd my_asr_extension
tman install --standalone

This command uses tman's dependency resolution to compute the dependency tree from the dependencies declared in manifest.json and download them. Because these are development dependencies, the --standalone flag is required. They are installed into the .ten directory and include Python modules for development as well as system packages for standalone testing.

2.3 Project Structure

my_asr_extension/
├── manifest.json       # Extension metadata
├── property.json       # Default configuration
├── requirements.txt    # Python dependencies
├── extension.py        # Main implementation file
├── addon.py            # Extension entry point
├── __init__.py         # Python package initialization
├── docs/               # Documentation directory
├── .vscode/            # VS Code debug configuration
└── tests/              # Test files

File Descriptions

manifest.json and property.json are standard metadata files for TEN Extensions:

  • manifest.json: Contains the extension's name, version, description, dependencies, and schema definitions.
  • property.json: Defines the extension's default property values.

For a detailed understanding of the TEN Framework metadata system, please refer to the Metadata System Documentation.

requirements.txt is unique to Python extensions and is used to declare the extension's dependencies on third-party pip packages.

extension.py is the core source code of the extension, containing all business logic implementation.

The tests/ folder is used for standalone testing of the extension and contains unit tests and test configurations.

Understand Interface Specifications

The ASR Extension inherits from AsyncASRBaseExtension and needs to implement the following methods:

Required Methods

def vendor(self) -> str
    # Return vendor name (e.g., "deepgram", "azure")

async def start_connection(self) -> None
    # Establish connection with the vendor

async def stop_connection(self) -> None
    # Stop connection

async def send_audio(self, frame: AudioFrame, session_id: str | None) -> bool
    # Send audio frame to the vendor
    # session_id: Audio source identifier (can be ignored for single-speaker scenarios)
    # Returns True if sent successfully

async def finalize(self, session_id: str | None) -> None
    # Called when VAD detects end of speech to trigger final results

def is_connected(self) -> bool
    # Return current connection status

def input_audio_sample_rate(self) -> int
    # Return expected audio sample rate (e.g., 16000)

Base Class Automatic Handling

The base class AsyncASRBaseExtension already handles:

  • Audio frame reception and queue management
  • Performance metric calculation (TTFW, TTLW)
  • Session management and metadata passing

You only need to focus on integrating with the specific ASR vendor.

Implement Core Functionality

4.1 Configuration Management

Config Model Design

from pydantic import BaseModel
from typing import Dict, Optional

class MyAsrConfig(BaseModel):
    # Vendor parameters (pass-through design)
    params: Dict[str, Optional[str]] = {}
    
    # Audio dump functionality
    dump: bool = False
    dump_path: Optional[str] = None

Advantages of Params Pass-Through Design

params is a dictionary used to store all vendor-related parameters. These parameters will be passed directly to the vendor SDK without needing to be enumerated in the Extension code.

Why design it this way?

Flexibility: Users can configure any parameter supported by the vendor via property.json, without being limited by the Extension implementation.

Extensibility: When vendors add new parameters, the Extension code does not need modification.

Simplicity: Avoids defining a configuration field for every single parameter.

Example (custom_param stands for any other parameter supported by the vendor; property.json must be valid JSON, so it cannot contain comments):

property.json
{
  "params": {
    "api_key": "your_api_key_here",
    "language": "zh-CN",
    "model": "nova-2",
    "punctuate": "true",
    "custom_param": "any_value"
  },
  "dump": false
}

Usage - directly pass through to the vendor:

# Read from params and pass to vendor
api_key = self.config.params.get("api_key")
language = self.config.params.get("language", "en-US")  # With default value

Note: The template generates an empty property.json ({}). You need to add the configuration manually.
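A minimal, framework-independent sketch of the pass-through idea: the extension supplies a few defaults of its own and forwards every other key in params unchanged. build_vendor_kwargs and DEFAULT_PARAMS are hypothetical illustrations, not part of ten_ai_base, and dropping null values assumes your vendor SDK treats a missing argument as "use the vendor default".

```python
import json
from typing import Dict, Optional

# Defaults the extension chooses to supply itself (hypothetical values)
DEFAULT_PARAMS: Dict[str, str] = {"language": "en-US", "sample_rate": "16000"}


def build_vendor_kwargs(params: Dict[str, Optional[str]]) -> Dict[str, str]:
    """Merge extension defaults with user params; user values win.

    Keys the extension has never heard of are forwarded unchanged,
    which is what makes the pass-through design extensible.
    """
    merged: Dict[str, Optional[str]] = {**DEFAULT_PARAMS, **params}
    # Drop explicit nulls so the vendor SDK falls back to its own defaults
    return {key: value for key, value in merged.items() if value is not None}


property_json = """
{
  "params": {"api_key": "your_api_key_here", "model": "nova-2",
             "custom_param": "any_value", "punctuate": null},
  "dump": false
}
"""
params = json.loads(property_json)["params"]
kwargs = build_vendor_kwargs(params)
print(sorted(kwargs))
# A real extension would then pass these on, e.g. VendorClient(**kwargs)
```

Because params values are strings in this design, an SDK that expects booleans or integers (such as punctuate or sample_rate) may need those values converted before the call.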

4.2 Read Configuration

@override
async def on_init(self, ten_env: AsyncTenEnv) -> None:
    await super().on_init(ten_env)
    
    # Read configuration
    config_json, _ = await ten_env.get_property_to_json("")
    self.config = MyAsrConfig.model_validate_json(config_json)
    
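    # Note: this logs params (including api_key) in plain text; mask secrets
    # before logging in production (see Sensitive Information Masking in Part 2)
    ten_env.log_info(f"Config loaded: {self.config.model_dump_json()}")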

4.3 Implement Basic Methods

class MyAsrExtension(AsyncASRBaseExtension):
    def __init__(self, name: str):
        super().__init__(name)
        self.config: MyAsrConfig = MyAsrConfig()
        self.client = None  # Vendor SDK client
        self.is_connected_flag = False
        
    @override
    def vendor(self) -> str:
        return "my_vendor"  # Change to your vendor name
    
    @override
    def input_audio_sample_rate(self) -> int:
        # Read from params with default value
        return int(self.config.params.get("sample_rate", "16000"))
    
    @override
    def is_connected(self) -> bool:
        return self.is_connected_flag

4.4 Implement Connection Management

Important: The following code is an example only. The actual implementation depends on your vendor.

Different vendors have very different connection methods:

  • Some provide ready-made SDKs (e.g., Azure, Deepgram).
  • Some require direct WebSocket connections.
  • Some use HTTP streaming APIs.

Please refer to your vendor's documentation and use the corresponding connection method.

Establish Connection

@override
async def start_connection(self) -> None:
    """Establish connection with the vendor"""
    try:
        await self.stop_connection()  # Stop existing connection first
        
        # 1. Initialize vendor client
        # Pass parameters from params to vendor SDK
        # Example (adjust according to actual vendor):
        self.client = VendorClient(
            api_key=self.config.params.get("api_key"),
            language=self.config.params.get("language", "en-US"),
            # ... Read other parameters directly from params and pass through
        )
        
        # 2. Register event handlers
        # Different vendors have different event mechanisms; this is just an example
        self.client.on("connected", self._on_open)
        self.client.on("result", self._on_transcript)
        self.client.on("error", self._on_error)
        
        # 3. Start connection
        await self.client.connect()
        
    except Exception as e:
        self.ten_env.log_error(f"Failed to connect: {e}")

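@override
async def stop_connection(self) -> None:
    """Stop connection"""
    # Clear state before disconnecting so that a close callback fired during
    # disconnect() is not mistaken for an unexpected drop
    client, self.client = self.client, None
    self.is_connected_flag = False
    if client:
        await client.disconnect()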

Value of Params Pass-Through: All parameters are read from the params dictionary and passed through to the vendor SDK. This allows users to flexibly configure any parameter supported by the vendor without modifying the code.

Reference Existing Implementations:

  • azure_asr_python - Uses Azure SDK
  • deepgram_asr_python - Uses Deepgram SDK

4.5 Send Audio

@override
async def send_audio(self, audio_frame: AudioFrame, session_id: str | None) -> bool:
    """Send audio data
    
    Args:
        audio_frame: Audio frame
        session_id: Session ID, used to identify different conversation turns
        
    Returns:
        bool: Returns True if sent successfully, otherwise False
    """
    if not self.is_connected() or not self.client:
        return False
        
    try:
        audio_buf = audio_frame.get_buf()
        if audio_buf:
            await self.client.send(bytes(audio_buf))
            return True
    except Exception as e:
        self.ten_env.log_error(f"Failed to send audio: {e}")
    
    return False

4.6 Handle Recognition Results

The vendor will return recognition results via callback functions. You need to convert them to the standard format and send them:

async def _on_transcript(self, result):
    """Handle recognition results"""
    # 1. Extract text
    text = result.text.strip()
    if not text:
        return
    
    # 2. Convert to standard ASR result
    asr_result = ASRResult(
        text=text,
        final=result.is_final,           # Whether it is the final result
        start_ms=result.start_time_ms,   # Start time
        duration_ms=result.duration_ms   # Duration
    )
    
    # 3. Send to downstream
    await self.send_asr_result(asr_result)
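Vendors rarely report timing in the same units as ASRResult. The sketch below assumes a hypothetical vendor payload with times in seconds and converts them to integer milliseconds before building the result. VendorResult and to_asr_fields are illustrative names, and the returned dict mirrors only the ASRResult fields used in this guide.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class VendorResult:
    """Hypothetical vendor payload: times are in seconds."""
    text: str
    is_final: bool
    start_s: float
    end_s: float


def to_asr_fields(result: VendorResult, language: str) -> Optional[Dict[str, Any]]:
    """Convert a vendor payload to ASRResult-style keyword arguments.

    Returns None for empty transcripts, mirroring the early return above.
    """
    text = result.text.strip()
    if not text:
        return None
    start_ms = round(result.start_s * 1000)
    # Clamp to 0 in case a vendor reports end < start
    duration_ms = max(0, round(result.end_s * 1000) - start_ms)
    return {
        "text": text,
        "final": result.is_final,
        "start_ms": start_ms,
        "duration_ms": duration_ms,
        "language": language,
    }


fields = to_asr_fields(VendorResult(" hello world ", True, 1.25, 2.5), "en-US")
print(fields)
# In the extension, assuming ASRResult accepts these keyword arguments:
# await self.send_asr_result(ASRResult(**fields))
```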

Other required event handlers:

async def _on_open(self):
    """Connection established"""
    self.is_connected_flag = True
    self.ten_env.log_info("Connection opened")

async def _on_error(self, error):
    """Handle error"""
    self.ten_env.log_error(f"Vendor error: {error}")

4.7 Implement Finalize

When VAD detects that the user has finished speaking, the extension receives an asr_finalize event and finalize() is called. Use it to ask the vendor to return the final result as soon as possible:

@override
async def finalize(self, session_id: str | None) -> None:
    """Trigger final result"""
    if self.client:
        # Different vendors have different finalize methods:
        # 1. Call finalize API (Recommended)
        await self.client.finalize()
        
        # 2. Or send silence packets
        # await self.client.send_silence()
        
        # 3. Or disconnect and reconnect
        # await self.stop_connection()
        # await self.start_connection()
        
        # Notify completion
        await self.send_asr_finalize_end()

Different vendors have different finalize mechanisms. Choose the one that best suits your vendor. For detailed explanation, see the Advanced Part.

Basic functionality is now implemented! Next, let's test it.

Testing

5.1 Unit Testing

Unit tests are used to verify that the basic functionality of the ASR Extension works correctly.

Test Objectives

Basic functionality needs to verify at least:

  1. Config Loading: Can correctly read configurations from property.json.
  2. Connection Establishment: Can successfully connect to the vendor.
  3. Audio Processing: Can receive and send audio frames.
  4. Result Output: Can output results in the standard ASRResult format.

Test Flow Example

A typical unit test will:

  1. Prepare Test Audio: Load PCM audio data from a file.
  2. Send Frame by Frame: Send audio to the ASR Extension frame by frame.
  3. Verify Results: Check if ASRResult in the standard format is received.
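For step 2, the test has to cut the PCM data into fixed-duration frames. A minimal sketch, assuming 16-bit (2-byte) mono PCM at 16 kHz and 10 ms frames; these values and the split_pcm and frame_size_bytes names are assumptions to adjust to your test audio and to input_audio_sample_rate():

```python
def frame_size_bytes(sample_rate: int, frame_ms: int, channels: int = 1, sample_width: int = 2) -> int:
    """Bytes in one frame of interleaved PCM."""
    return sample_rate * frame_ms // 1000 * channels * sample_width


def split_pcm(pcm: bytes, sample_rate: int = 16000, frame_ms: int = 10) -> list[bytes]:
    """Split raw PCM into equal frames; the last frame is zero-padded."""
    size = frame_size_bytes(sample_rate, frame_ms)
    frames = []
    for offset in range(0, len(pcm), size):
        chunk = pcm[offset:offset + size]
        if len(chunk) < size:
            # Pad with silence (zero samples) so every frame has the same length
            chunk += b"\x00" * (size - len(chunk))
        frames.append(chunk)
    return frames


# 1 second of 16 kHz mono 16-bit audio is 32000 bytes, i.e. 100 frames of 320 bytes
frames = split_pcm(b"\x01\x00" * 16000)
print(len(frames), len(frames[0]))
```

In a real test you would read the file with open(path, "rb").read() and wrap each chunk in an audio frame using your test harness before sending it.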

ASRResult Standard Format:

ASRResult(
    text="Recognized text",           # Required: Recognized text
    final=True,                       # Required: Whether it is the final result
    start_ms=0,                       # Optional: Start time (ms)
    duration_ms=1000,                 # Optional: Duration (ms)
    language="zh-CN",                 # Optional: Language
    words=[]                          # Optional: Word-level information
)

Run Tests

cd my_asr_extension
./tests/bin/start

The template already contains basic test cases. You can add more tests in tests/test_basic.py.

Reference tests from existing implementations:

  • azure_asr_python/tests/test_asr_result.py - Test result output
  • deepgram_asr_python/tests/test_basic.py - Basic functionality tests

5.2 End-to-End Testing

Use TMan Designer in the TEN Agent project to replace the ASR extension:

cd /path/to/your/ten-agent-project
tman designer

Via the visual interface:

  1. Select the existing ASR node.
  2. Replace it with your my_asr_extension.
  3. Configure the API Key and other parameters.
  4. Start and conduct real conversation testing.

Basic Development Checklist

Complete the following checks to ensure basic functionality works:

  • Project Creation: Successfully created the project from the template and installed dependencies.
  • Config Management: Can correctly read config from property.json and pass it through to the vendor SDK.
  • Connection Establishment: start_connection can successfully connect to the vendor.
  • Audio Sending: Can send audio frames to the vendor via send_audio.
  • Result Reception: Can correctly receive recognition results from the vendor and convert them to the standard format.
  • Result Sending: Can send standardized results via send_asr_result.
  • Finalize: Implemented the finalize method (even a simple implementation).
  • Connection Cleanup: stop_connection can correctly close the connection.
  • Unit Testing: Basic test cases pass.
  • End-to-End Testing: Can complete basic conversations in TEN Agent.

After completing the above checks, your ASR Extension has basic functionality and can be used in real scenarios.


Part 2: Advanced - Production-Grade Quality

The advanced part will help you implement a production-grade ASR Extension, covering stability, maintainability, and debuggability.

Auto Reconnection Mechanism

7.1 Why Reconnection is Needed

Network environments are complex, and ASR services may experience:

  • Temporary network jitter
  • Server-side active disconnection
  • Timeout errors

Implementing auto-reconnection ensures service stability.

7.2 Use ReconnectManager

Existing ASR extensions (e.g., azure_asr_python) include a reconnect_manager.py module. The relative import below assumes you have copied it into your own extension package.

from .reconnect_manager import ReconnectManager

class MyAsrExtension(AsyncASRBaseExtension):
    def __init__(self, name: str):
        super().__init__(name)
        # Retry up to 5 times, base delay 0.5 seconds (exponential backoff)
        self.reconnect_manager = ReconnectManager(max_attempts=5, base_delay=0.5)

7.3 Reset on Connection Success

async def _on_open(self, *args, **kwargs) -> None:
    self.is_connected_flag = True
    self.ten_env.log_info("Connection opened", category=LOG_CATEGORY_VENDOR)
    
    if self.reconnect_manager:
        self.reconnect_manager.mark_connection_successful()

7.4 Handle Disconnection and Errors

async def _on_close(self, *args, **kwargs) -> None:
    self.is_connected_flag = False
    self.ten_env.log_warn("Connection closed", category=LOG_CATEGORY_VENDOR)
    
    if self.client:  # Unexpected disconnection
        await self._handle_reconnect()

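async def _on_error(self, *args, **kwargs) -> None:
    # Callback arguments are vendor-specific; this assumes the SDK passes
    # (client, error), so the error is the second positional argument
    error = args[1] if len(args) > 1 else None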
    self.ten_env.log_error(f"Vendor error: {error}", category=LOG_CATEGORY_VENDOR)
    
    await self.send_asr_error(
        ModuleError(module=MODULE_NAME_ASR, code=ModuleErrorCode.NON_FATAL_ERROR.value, message=str(error))
    )
    await self._handle_reconnect()

7.5 Implement Reconnection Logic

async def _handle_reconnect(self) -> None:
    if not self.reconnect_manager or not self.reconnect_manager.can_retry():
        self.ten_env.log_error("Max reconnection attempts reached", category=LOG_CATEGORY_VENDOR)
        await self.send_asr_error(
            ModuleError(module=MODULE_NAME_ASR, code=ModuleErrorCode.FATAL_ERROR.value, message="Reconnection failed")
        )
        return
    
    self.ten_env.log_info(
        f"Attempting reconnection {self.reconnect_manager.current_attempts + 1}/{self.reconnect_manager.max_attempts}",
        category=LOG_CATEGORY_VENDOR
    )
    
    success = await self.reconnect_manager.handle_reconnect(connect_func=self.start_connection)
    
    if success:
        self.ten_env.log_info("Reconnection successful", category=LOG_CATEGORY_VENDOR)
    else:
        self.ten_env.log_error("Reconnection failed", category=LOG_CATEGORY_VENDOR)
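The ReconnectManager class itself is not shown above. The following is a hypothetical minimal sketch that satisfies the calls made in 7.2 to 7.5 (can_retry, mark_connection_successful, handle_reconnect, current_attempts, max_attempts). It is not the reconnect_manager.py shipped with azure_asr_python, whose details may differ.

```python
import asyncio
from typing import Awaitable, Callable


class ReconnectManager:
    """Hypothetical sketch: retries with exponential backoff.

    Delays are base_delay, 2 * base_delay, 4 * base_delay, ...
    """

    def __init__(self, max_attempts: int = 5, base_delay: float = 0.5) -> None:
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.current_attempts = 0

    def can_retry(self) -> bool:
        return self.current_attempts < self.max_attempts

    def mark_connection_successful(self) -> None:
        # Called from _on_open: a healthy connection resets the retry budget
        self.current_attempts = 0

    def next_delay(self) -> float:
        return self.base_delay * (2 ** self.current_attempts)

    async def handle_reconnect(self, connect_func: Callable[[], Awaitable[None]]) -> bool:
        """Wait for the backoff delay, then try once; True if connect_func did not raise."""
        if not self.can_retry():
            return False
        delay = self.next_delay()
        self.current_attempts += 1
        await asyncio.sleep(delay)
        try:
            await connect_func()
            return True
        except Exception:
            return False
```

With the defaults used in 7.2 (max_attempts=5, base_delay=0.5) the waits are 0.5, 1, 2, 4, and 8 seconds. The counter is reset only by mark_connection_successful() in _on_open, so an attempt that never actually opens still consumes budget. Note that the start_connection() in 4.4 catches its own exceptions, so with this sketch handle_reconnect() would return True for every attempt that was issued; re-raise from start_connection() if you want the return value to reflect failure.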

Optimize Finalize Mechanism

8.1 Role of Finalize

When VAD detects the end of the user's speech, finalize asks the vendor to return the final result immediately instead of waiting for the vendor's own end-of-speech detection, which reduces conversation latency.

8.2 Three Implementation Methods

Different vendors support different methods:

  1. Call API (Recommended, e.g., Deepgram)
  2. Disconnect and Reconnect (Need to handle reconnection logic well)
  3. Send Silence Packets (Need to pay attention to timestamp calculation)
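For method 3, the injected silence becomes part of the vendor's timeline, so timestamps of results that come after it run ahead of the real audio by the injected duration. A minimal sketch of tracking and undoing that offset, assuming 16-bit mono PCM; SilenceTracker and its methods are hypothetical, and how much silence a given vendor needs must come from its documentation or from testing.

```python
import bisect


class SilenceTracker:
    """Injects silence for finalize and maps vendor timestamps back to real audio time.

    Assumes 16-bit mono PCM; all positions are in milliseconds.
    """

    def __init__(self, sample_rate: int = 16000, sample_width: int = 2) -> None:
        self.sample_rate = sample_rate
        self.sample_width = sample_width
        self.sent_bytes = 0                  # everything the vendor received (real + silence)
        self._inject_ends: list[int] = []    # vendor-timeline ms where each injection ended
        self._inject_totals: list[int] = []  # cumulative injected ms at each injection

    def vendor_position_ms(self) -> int:
        return self.sent_bytes * 1000 // (self.sample_rate * self.sample_width)

    def on_audio_sent(self, n_bytes: int) -> None:
        """Call after sending each real audio frame."""
        self.sent_bytes += n_bytes

    def make_silence(self, duration_ms: int) -> bytes:
        """Return zero-valued PCM for duration_ms and record the injection."""
        samples = self.sample_rate * duration_ms // 1000
        n_bytes = samples * self.sample_width  # whole samples only
        self.sent_bytes += n_bytes
        previous = self._inject_totals[-1] if self._inject_totals else 0
        self._inject_ends.append(self.vendor_position_ms())
        self._inject_totals.append(previous + duration_ms)
        return b"\x00" * n_bytes

    def to_real_ms(self, vendor_ms: int) -> int:
        """Subtract all silence injected at or before vendor_ms."""
        i = bisect.bisect_right(self._inject_ends, vendor_ms)
        return vendor_ms - (self._inject_totals[i - 1] if i else 0)
```

Results that started before the injection (such as the final result the silence was meant to flush) keep their original timestamps; only later results are shifted back.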

8.3 Notify Completion

@override
async def finalize(self, session_id: str | None) -> None:
    """Trigger final result
    
    Args:
        session_id: Session ID, used to identify different conversation turns
    """
    if self.client:
        # Record timestamp (ms, event loop clock) for latency calculation; requires `import asyncio`
        self.last_finalize_timestamp = asyncio.get_running_loop().time() * 1000
        
        # Method 1: Call API
        await self.client.finalize()
        
        # Notify completion
        await self.send_asr_finalize_end()

Standardized Logging

9.1 Log Categories

Use the category parameter to classify logs:

  • LOG_CATEGORY_KEY_POINT: Key points (configuration, initialization)
  • LOG_CATEGORY_VENDOR: Vendor-related (connection, results, errors)

from ten_ai_base.const import LOG_CATEGORY_KEY_POINT, LOG_CATEGORY_VENDOR

self.ten_env.log_info("Config loaded", category=LOG_CATEGORY_KEY_POINT)
self.ten_env.log_error("Connection failed", category=LOG_CATEGORY_VENDOR)

9.2 Key Log Points

At minimum, log the following:

# Configuration Loading
self.ten_env.log_info(f"Config: {self.config.to_json(sensitive_handling=True)}", category=LOG_CATEGORY_KEY_POINT)

# Connection Status Changes
self.ten_env.log_info("Connection opened", category=LOG_CATEGORY_VENDOR)
self.ten_env.log_warn("Connection closed", category=LOG_CATEGORY_VENDOR)

# Error Information
self.ten_env.log_error(f"Vendor error: {error}", category=LOG_CATEGORY_VENDOR)

# Reconnection Attempts
self.ten_env.log_info(f"Reconnecting {attempt}/{max_attempts}", category=LOG_CATEGORY_VENDOR)

9.3 Sensitive Information Masking

from typing import Dict, Optional

from pydantic import BaseModel
from ten_ai_base.utils import encrypt

class MyAsrConfig(BaseModel):
    params: Dict[str, Optional[str]] = {}
    dump: bool = False
    dump_path: Optional[str] = None

    def to_json(self, sensitive_handling: bool = False) -> str:
        if not sensitive_handling:
            return self.model_dump_json()

        # Mask a deep copy so the live config keeps the real credentials
        config = self.model_copy(deep=True)
        if config.params:
            for key in ['api_key', 'key', 'token', 'secret']:
                if key in config.params and config.params[key]:
                    config.params[key] = encrypt(config.params[key])
        return config.model_dump_json()
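To see the idea without the framework, here is a stand-in masking helper. mask_secret and masked_params are hypothetical and are not the behavior of ten_ai_base.utils.encrypt. Unlike the exact-key list above, this sketch matches key names case-insensitively by substring (so subscription_key is also caught), deliberately erring toward masking too much.

```python
from typing import Dict, Optional

SENSITIVE_MARKERS = ("key", "token", "secret")


def mask_secret(value: str, visible: int = 2) -> str:
    """Keep a few leading/trailing characters so log lines stay correlatable."""
    if len(value) <= visible * 2:
        return "*" * len(value)
    return value[:visible] + "*" * (len(value) - visible * 2) + value[-visible:]


def masked_params(params: Dict[str, Optional[str]]) -> Dict[str, Optional[str]]:
    """Return a copy of params with sensitive values masked; the input is not modified."""
    return {
        k: mask_secret(v) if v and any(m in k.lower() for m in SENSITIVE_MARKERS) else v
        for k, v in params.items()
    }


print(masked_params({"api_key": "abcdef123456", "language": "en-US"}))
```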

Error Reporting Specifications

10.1 Error Classification

Fatal Error (FATAL_ERROR):

  • Configuration parsing failure
  • Invalid API Key
  • Initial connection failure
  • Reached maximum reconnection attempts

Non-Fatal Error (NON_FATAL_ERROR):

  • Temporary network issues
  • Service temporarily unavailable
  • Audio processing errors
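One way to apply this classification is to map vendor-reported error codes onto the two categories. The sketch assumes a hypothetical vendor that reports HTTP-style status codes; the specific codes, ErrorSeverity, and classify_vendor_error are illustrative, so substitute the codes your vendor documents. In the extension, the result would select ModuleErrorCode.FATAL_ERROR or ModuleErrorCode.NON_FATAL_ERROR.

```python
from enum import Enum
from typing import Optional


class ErrorSeverity(Enum):
    FATAL = "fatal"          # maps to ModuleErrorCode.FATAL_ERROR
    NON_FATAL = "non_fatal"  # maps to ModuleErrorCode.NON_FATAL_ERROR


# Hypothetical vendor codes for configuration/authentication problems,
# which will not fix themselves by retrying
FATAL_STATUS_CODES = {400, 401, 403}


def classify_vendor_error(status: Optional[int], initial_connect: bool = False) -> ErrorSeverity:
    """Decide whether an error should stop the extension or allow a retry."""
    if status in FATAL_STATUS_CODES or initial_connect:
        # Invalid API key, bad config, or a failed initial connection
        return ErrorSeverity.FATAL
    # Everything else (timeouts, rate limits, 5xx, dropped sockets with no
    # status) is treated as temporary; the reconnect limit escalates to
    # FATAL if the problem persists
    return ErrorSeverity.NON_FATAL


print(classify_vendor_error(401), classify_vendor_error(503))
```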

10.2 Include Vendor Information

from ten_ai_base.message import ModuleError, ModuleErrorCode, ModuleErrorVendorInfo

await self.send_asr_error(
    ModuleError(
        module=MODULE_NAME_ASR,
        code=ModuleErrorCode.NON_FATAL_ERROR.value,
        message=f"Vendor error: {str(error)}"
    ),
    ModuleErrorVendorInfo(
        vendor="deepgram",
        code=getattr(error, 'code', 'unknown'),
        message=str(error)
    )
)

Audio Dump Functionality

11.1 Why Dump is Needed

Save original audio when recognition issues occur for:

  • Reproducing issues
  • Analyzing audio quality
  • Comparing different vendors

11.2 Implement Dump

import os
from ten_ai_base.dumper import Dumper

DUMP_FILE_NAME = "my_asr_in.pcm"

class MyAsrExtension(AsyncASRBaseExtension):
    def __init__(self, name: str):
        super().__init__(name)
        self.audio_dumper: Optional[Dumper] = None
    
    @override
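    async def on_init(self, ten_env: AsyncTenEnv) -> None:
        await super().on_init(ten_env)
        # self.config must already be loaded here (see 4.2 Read Configuration)

        # dump_path is Optional; skip dumping if no path is configured
        if self.config.dump and self.config.dump_path:
            dump_file_path = os.path.join(self.config.dump_path, DUMP_FILE_NAME)
            self.audio_dumper = Dumper(dump_file_path)
            await self.audio_dumper.start()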
    
    @override
    async def on_deinit(self, ten_env: AsyncTenEnv) -> None:
        await super().on_deinit(ten_env)
        if self.audio_dumper:
            await self.audio_dumper.stop()
            self.audio_dumper = None
    
    @override
    async def send_audio(self, audio_frame: AudioFrame, session_id: str | None) -> bool:
        buf = audio_frame.get_buf()
        if not buf:
            return False
        data = bytes(buf)

        # Dump every received frame, even while disconnected
        if self.audio_dumper:
            await self.audio_dumper.push_bytes(data)

        # Send audio
        if self.is_connected() and self.client:
            await self.client.send(data)
            return True

        return False

Audio Buffer Strategy

12.1 Why Keep Mode Is Recommended

By default, Drop Mode is used: audio frames are dropped while the connection is down.

Recommend using Keep Mode: Cache audio frames and send them after the connection is restored.

Reason: Ensures timestamp accuracy.

If audio is dropped:

  • Time: 0-10s (Sent) → 10-15s (Disconnected & Dropped) → 15-20s (Sent)
  • ASR only receives 15s of audio, but the actual time span is 20s.
  • The resulting 5s timestamp offset affects conversation synchronization and interruption detection.

12.2 Configure Keep Mode

from ten_ai_base.asr import ASRBufferConfig, ASRBufferConfigModeKeep

@override
def buffer_strategy(self) -> ASRBufferConfig:
    return ASRBufferConfig(
        mode=ASRBufferConfigModeKeep(byte_limit=10 * 1024 * 1024)  # 10MB cache limit
    )
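To choose byte_limit, it helps to convert it into seconds of audio. A quick calculation, assuming 16-bit mono PCM (adjust sample_rate to your input_audio_sample_rate()):

```python
def buffered_seconds(byte_limit: int, sample_rate: int = 16000, channels: int = 1, sample_width: int = 2) -> float:
    """How many seconds of PCM audio fit in byte_limit bytes."""
    bytes_per_second = sample_rate * channels * sample_width
    return byte_limit / bytes_per_second


limit = 10 * 1024 * 1024  # the 10MB limit used above
print(f"{buffered_seconds(limit):.1f} s at 16 kHz")
print(f"{buffered_seconds(limit, sample_rate=48000):.1f} s at 48 kHz")
```

At 16 kHz (32,000 bytes per second) the 10MB limit covers an outage of roughly 328 seconds, about five and a half minutes; at 48 kHz the same limit covers about a third of that.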

Perfect Testing

13.1 Unit Test Coverage

Advanced unit tests need to cover production-grade features. Use mocks instead of real API calls so that tests are fast, stable, and repeatable.

Test Case Design

Referring to the implementation in azure_asr_python/tests, advanced tests should cover:

1. Reconnection Capability Test (test_reconnect.py)

Test Objective: Verify that the Extension can automatically reconnect and restore service.

Case Design:

# Simulate vendor disconnection scenario
def test_reconnect():
    # 1. Mock vendor: The first 3 connections will disconnect, the 4th succeeds
    # 2. Verify Extension automatically retries
    # 3. Verify normal operation after successful reconnection
    # 4. Check error report count (should have 3 NON_FATAL_ERROR)
    ...

Verification Points:

  • ✅ Automatically trigger reconnection after disconnection
  • ✅ Use exponential backoff strategy
  • ✅ Report FATAL_ERROR when max reconnection attempts reached
  • ✅ Reset counter after successful reconnection

2. Invalid Parameters Test (test_invalid_params.py)

Test Objective: Verify error handling when configuration is incorrect.

Case Design:

# Start Extension with invalid parameters
def test_invalid_params():
    # 1. Provide empty or invalid params (e.g., missing api_key)
    # 2. Start Extension
    # 3. Verify FATAL_ERROR received
    # 4. Check if error message contains useful information
    ...

Verification Points:

  • ✅ Report FATAL_ERROR when configuration validation fails
  • ✅ Error message is clear and easy to troubleshoot
  • ✅ Extension doesn't crash

3. Audio Dump Test (test_dump.py)

Test Objective: Verify the completeness of audio dump functionality.

Case Design:

# Send audio after enabling dump
def test_dump():
    # 1. Configure dump=True and dump_path
    # 2. Send N frames of audio (each frame has specific byte pattern)
    # 3. Check dump file after test ends
    # 4. Verify file size = N * frame size
    # 5. Verify each frame's content matches what was sent
    ...

Verification Points:

  • ✅ Dump file is created
  • ✅ All sent audio is completely dumped
  • ✅ Dump content is identical to sent content (byte-by-byte verification)
  • ✅ Frame order is correct
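The byte-pattern check in steps 2 to 5 can be written without the framework. In this sketch each frame is filled with its own index byte, so a missing, duplicated, or reordered frame shows up in the comparison. make_pattern_frames and verify_dump are hypothetical helpers, and the file is written directly here to stand in for the Dumper output.

```python
import os
import tempfile


def make_pattern_frames(n_frames: int, frame_size: int) -> list[bytes]:
    """Frame i is frame_size copies of the byte (i % 256)."""
    return [bytes([i % 256]) * frame_size for i in range(n_frames)]


def verify_dump(path: str, frames: list[bytes]) -> None:
    """Raise AssertionError if the dump does not match the frames byte for byte."""
    with open(path, "rb") as f:
        data = f.read()
    expected_size = sum(len(frame) for frame in frames)
    assert len(data) == expected_size, f"size {len(data)} != {expected_size}"
    offset = 0
    for index, frame in enumerate(frames):
        chunk = data[offset:offset + len(frame)]
        assert chunk == frame, f"frame {index} differs (order or content)"
        offset += len(frame)


frames = make_pattern_frames(n_frames=50, frame_size=320)
with tempfile.TemporaryDirectory() as dump_dir:
    dump_path = os.path.join(dump_dir, "my_asr_in.pcm")
    # In the real test, the extension's Dumper writes this file
    with open(dump_path, "wb") as f:
        for frame in frames:
            f.write(frame)
    verify_dump(dump_path, frames)
    print("dump verified")
```

Because the pattern repeats every 256 frames, keep N at or below 256 if you also want to catch reordering across long distances.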

4. Finalize Latency Test (test_finalize.py)

Test Objective: Verify Extension can quickly output final results.

Case Design:

# Test finalize response speed
def test_finalize():
    # 1. Continuously send audio
    # 2. Send asr_finalize event after 1.5 seconds
    # 3. Mock vendor quickly returns final result after receiving finalize
    # 4. Verify asr_finalize_end event received
    # 5. Check if finalize_id and metadata are correctly passed
    ...

Verification Points:

  • ✅ Trigger vendor's finalize after receiving asr_finalize
  • ✅ Quickly receive final result (< 300ms typical)
  • ✅ Send asr_finalize_end to notify downstream
  • ✅ finalize_id and session_id correctly passed

5. Result Format Test (test_asr_result.py)

Test Objective: Verify standard format of ASR results.

Case Design:

# Verify output result data structure
def test_asr_result():
    # 1. Mock vendor returns recognition results
    # 2. Verify ASRResult contains all required fields
    # 3. Verify both interim and final results are correct
    # 4. Verify metadata is correctly passed (e.g., session_id)
    ...

Verification Points:

  • ✅ Contains required fields: text, final, start_ms, duration_ms, language
  • ✅ Optional fields correctly filled: words, metadata
  • ✅ session_id correctly passed from input to output

6. Error Reporting Test (test_vendor_error.py)

Test Objective: Verify error classification and vendor information reporting.

Case Design:

# Simulate vendor returning errors
def test_vendor_error():
    # 1. Mock vendor returns different types of errors
    # 2. Verify Extension reports correct error types
    # 3. Check if ModuleErrorVendorInfo is included
    ...

Verification Points:

  • ✅ Temporary errors report NON_FATAL_ERROR
  • ✅ Serious errors report FATAL_ERROR
  • ✅ Contains vendor error code and message
  • ✅ Error information is helpful for debugging

7. Performance Metrics Test (test_metrics.py)

Test Objective: Verify performance metrics are correctly calculated and reported.

Case Design:

# Test TTFW, TTLW metrics
def test_metrics():
    # 1. Send audio
    # 2. Mock vendor returns first and last word at specific times
    # 3. Verify calculated TTFW and TTLW metrics
    ...

Verification Points:

  • ✅ TTFW (first word latency) correctly calculated
  • ✅ TTLW (last word latency) correctly calculated
  • ✅ Metrics reported through metrics message

Run Tests

cd my_asr_extension
./tests/bin/start

Why mock instead of calling the real API:

  • 🚀 Fast: Tests complete in seconds
  • 💰 Zero Cost: Doesn't consume API quota
  • 🎯 Controllable: Can precisely simulate various scenarios (disconnection, errors, latency)
  • 🔁 Repeatable: Stable results, suitable for CI/CD

Refer to azure_asr_python/tests/mock.py to learn how to implement Mock.
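A scripted mock in the spirit of test case 1: the first few connect() calls fail, later ones succeed, and every sent chunk is recorded for assertions. MockVendorClient and its methods are hypothetical; they mirror the placeholder VendorClient used in this guide, not the structure of azure_asr_python/tests/mock.py.

```python
import asyncio


class MockVendorClient:
    """Stand-in for the placeholder VendorClient used in this guide."""

    def __init__(self, fail_first: int = 3) -> None:
        self.fail_first = fail_first  # number of connect() calls that fail
        self.connect_calls = 0
        self.connected = False
        self.sent: list[bytes] = []
        self.finalize_calls = 0

    async def connect(self) -> None:
        self.connect_calls += 1
        if self.connect_calls <= self.fail_first:
            raise ConnectionError(f"simulated failure #{self.connect_calls}")
        self.connected = True

    async def send(self, data: bytes) -> None:
        if not self.connected:
            raise ConnectionError("not connected")
        self.sent.append(data)

    async def finalize(self) -> None:
        self.finalize_calls += 1

    async def disconnect(self) -> None:
        self.connected = False


async def demo() -> MockVendorClient:
    client = MockVendorClient(fail_first=3)
    for _ in range(5):  # naive retry loop standing in for ReconnectManager
        try:
            await client.connect()
            break
        except ConnectionError:
            await asyncio.sleep(0)  # no real backoff needed in tests
    await client.send(b"\x00" * 320)
    await client.finalize()
    return client


client = asyncio.run(demo())
print(client.connect_calls, len(client.sent))
```

In an actual test, one common way to inject such a mock is unittest.mock.patch on the name VendorClient in the module where your extension looks it up.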

13.2 Integration Testing (Guarder)

Run with real API Key:

cd ai_agents
task asr-guarder-test EXTENSION=my_asr_extension

Test content:

  • ASR recognition accuracy
  • Finalize latency
  • Multi-language support
  • Error handling
  • Performance metrics

13.3 VS Code Debugging

Use the preset .vscode/launch.json:

  1. Set breakpoints in the code.
  2. Press F5 to start debugging.
  3. Select "Python: Test Extension".

Advanced Development Checklist

Complete the following checks to ensure production-grade quality:

Stability:

  • Implement ReconnectManager auto-reconnection
  • Exponential backoff strategy (avoid frequent reconnection)
  • Maximum reconnection attempts limit
  • Reset reconnection counter after connection success

Finalize Mechanism:

  • Implement rapid finalize triggered by VAD
  • Call send_asr_finalize_end() to notify completion
  • Handle reconnection logic if using disconnect method
  • Correctly calculate timestamps if using silence packets

Logging Specifications:

  • Use LOG_CATEGORY_KEY_POINT and LOG_CATEGORY_VENDOR categories
  • Log connection status changes
  • Log all error information
  • Mask sensitive information (API Key, etc.)

Error Reporting:

  • Correctly distinguish FATAL_ERROR and NON_FATAL_ERROR
  • Include ModuleErrorVendorInfo vendor information
  • Report FATAL_ERROR for configuration errors
  • Report NON_FATAL_ERROR for temporary network errors

Audio Dump:

  • Implement Dumper integration
  • Support dump and dump_path configuration
  • Initialize in on_init, clean up in on_deinit

Audio Buffer:

  • Implement buffer_strategy() to return Keep Mode
  • Set reasonable cache limit (e.g., 10MB)

Test Coverage:

  • Unit tests cover all core functionality
  • Use Mock to avoid real API calls
  • Guarder tests pass (real API)
  • End-to-end tests verify stability

Code Quality:

  • Follow project code style
  • Add necessary comments
  • Provide clear README

After completing the above checks, your ASR Extension has reached production-grade quality and can be contributed to the community and published to the store.


Part 3: Publish and Contribute

Publish to TEN Store

15.1 Submit to Main Repository

# Fork the TEN Framework repository on GitHub first, then clone your fork
git clone https://github.com/your-username/ten-framework.git
cd ten-framework

# Copy extension to correct location
cp -r /path/to/your/my_asr_extension ai_agents/agents/ten_packages/extension/

# Create branch
git checkout -b feat/add-my-asr-extension

# Commit
git add ai_agents/agents/ten_packages/extension/my_asr_extension/
git commit -m "feat: add my_asr_extension for [Vendor Name] ASR service"
git push origin feat/add-my-asr-extension

15.2 Create Pull Request

  1. Visit your fork repository.
  2. Click "Compare & pull request".
  3. Fill in the title and description (functionality, features, test status).
  4. Submit and wait for review.

15.3 Auto Publish

After the PR is merged:

  • ✅ The extension is automatically uploaded to the TEN Store
  • ✅ Version numbers are handled automatically
  • ✅ Developers worldwide can download and use it

Adapt Other ASR Services

Refer to the existing ASR extensions in the TEN Framework:

ten-framework/ai_agents/agents/ten_packages/extension/
├── azure_asr_python/          # Azure Speech Services
├── deepgram_asr_python/       # Deepgram ASR
├── google_asr_python/         # Google Cloud Speech
├── xfyun_asr_python/          # iFlytek (XFYun)
└── ...                        # More extensions

All extensions follow the same architecture and can serve as references for adapting new services.


Appendix

A. ASR Interface Specifications

A.1 Interface Inheritance

Declare the interface in manifest.json:

{
  "api": {
    "interface": [
      {
        "import_uri": "../../system/ten_ai_base/api/asr-interface.json"
      }
    ]
  }
}

A.2 Standard Properties

Standard properties defined in asr-interface.json:

  • dump: Boolean, whether to enable audio dump.
  • dump_path: String, audio dump storage path.

A.3 Extended Properties

Declare extension-specific properties under api.property:

{
  "api": {
    "property": {
      "properties": {
        "params": {
          "type": "object",
          "properties": {
            "key": { "type": "string" },
            "language": { "type": "string" }
          }
        }
      }
    }
  }
}
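A config model that keeps `params` as a pass-through dictionary lets new vendor options flow through without code changes. A minimal sketch with a hypothetical `MyAsrConfig` dataclass; the `"."` default for `dump_path` and treating `key` as the only credential are assumptions.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class MyAsrConfig:
    """Hypothetical config model: standard properties plus a pass-through `params` dict."""

    dump: bool = False
    dump_path: str = "."
    params: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_property(cls, prop: dict[str, Any]) -> "MyAsrConfig":
        return cls(
            dump=bool(prop.get("dump", False)),
            dump_path=str(prop.get("dump_path", ".")),
            params=dict(prop.get("params", {})),
        )

    def vendor_options(self) -> dict[str, Any]:
        """Forward every param except the credential to the vendor unchanged."""
        return {k: v for k, v in self.params.items() if k != "key"}

    def validate(self) -> None:
        if not self.params.get("key"):
            raise ValueError("params.key is required")
```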

A.4 Input/Output Data Formats

Input:

  • pcm_frame: PCM audio frame
  • asr_finalize: end-of-speech event detected by VAD

Output:

  • asr_result: Recognition result
  • asr_finalize_end: Finalize completion notification
  • error: Error information
  • metrics: Performance metrics

See asr-interface.json for details.

B. Base Class Methods Reference

B.1 AsyncASRBaseExtension

Required Implementation:

  • vendor() -> str
  • start_connection() -> None
  • stop_connection() -> None
  • send_audio(frame: AudioFrame, session_id: str | None) -> bool
  • finalize(session_id: str | None) -> None
  • is_connected() -> bool
  • input_audio_sample_rate() -> int

Optional Implementation:

  • input_audio_channels() -> int
  • input_audio_sample_width() -> int
  • buffer_strategy() -> ASRBufferConfig
  • audio_actual_send_metrics_interval() -> int

Utility Methods:

  • send_asr_result(asr_result: ASRResult)
  • send_asr_error(error: ModuleError, vendor_info: ModuleErrorVendorInfo | None)
  • send_asr_finalize_end()
  • send_connect_delay_metrics(connect_delay: int)
  • send_vendor_metrics(vendor_metrics: dict)
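To see the required methods together, the sketch below defines a local abstract class mirroring the list above and a toy implementation. It does not import `AsyncASRBaseExtension`; audio frames are simplified to raw bytes, and declaring the connection, audio, and finalize methods as `async` is an assumption based on the base class being asyncio-based.

```python
from __future__ import annotations

import abc
import asyncio


class ASRContract(abc.ABC):
    """Local stand-in for the required methods of AsyncASRBaseExtension."""

    @abc.abstractmethod
    def vendor(self) -> str: ...

    @abc.abstractmethod
    async def start_connection(self) -> None: ...

    @abc.abstractmethod
    async def stop_connection(self) -> None: ...

    @abc.abstractmethod
    async def send_audio(self, frame: bytes, session_id: str | None) -> bool: ...

    @abc.abstractmethod
    async def finalize(self, session_id: str | None) -> None: ...

    @abc.abstractmethod
    def is_connected(self) -> bool: ...

    @abc.abstractmethod
    def input_audio_sample_rate(self) -> int: ...


class EchoASR(ASRContract):
    """Toy implementation that only counts the audio bytes it receives."""

    def __init__(self) -> None:
        self._connected = False
        self.received = 0
        self.finalized = False

    def vendor(self) -> str:
        return "echo"

    async def start_connection(self) -> None:
        self._connected = True

    async def stop_connection(self) -> None:
        self._connected = False

    async def send_audio(self, frame: bytes, session_id: str | None) -> bool:
        if not self._connected:
            return False  # not connected: report that the frame was not sent
        self.received += len(frame)
        return True

    async def finalize(self, session_id: str | None) -> None:
        # A real extension would flush the vendor, then call send_asr_finalize_end().
        self.finalized = True

    def is_connected(self) -> bool:
        return self._connected

    def input_audio_sample_rate(self) -> int:
        return 16000


async def demo() -> EchoASR:
    asr = EchoASR()
    await asr.start_connection()
    await asr.send_audio(b"\x00" * 320, None)
    await asr.finalize(None)
    await asr.stop_connection()
    return asr


asr = asyncio.run(demo())
```

Leaving out any required method makes the subclass impossible to instantiate, which surfaces a missing implementation immediately instead of at runtime.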

C. Full Code Example

Refer to the full implementation of the deepgram_asr_python or azure_asr_python extension:

ten-framework/ai_agents/agents/ten_packages/extension/
├── deepgram_asr_python/
│   ├── extension.py           # Full implementation
│   ├── reconnect_manager.py   # Reconnection manager
│   ├── manifest.json
│   ├── property.json
│   └── tests/                 # Full test cases

D. Debugging Tools

D.1 VS Code Debug Configuration

.vscode/launch.json:

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Python: Test Extension",
      "type": "python",
      "request": "launch",
      "program": "${workspaceFolder}/tests/bin/start",
      "console": "integratedTerminal",
      "cwd": "${workspaceFolder}",
      "env": {
        "PYTHONPATH": "${workspaceFolder}:..."
      }
    }
  ]
}

D.2 Debug Specific Test

To debug a single test, set the args parameter in the configuration:

{
  "args": [
    "tests/test_basic.py::test_asr_basic_functionality",
    "-v"
  ]
}

Summary

This tutorial is divided into Basic and Advanced parts:

Basic Part:

  • Quickly create a project
  • Implement core functionality
  • Pass basic tests
  • Complete end-to-end verification

Advanced Part:

  • Auto-reconnection mechanism
  • Optimize finalize
  • Standardized logging
  • Error reporting
  • Audio dump
  • Comprehensive testing

By completing the checklist items one by one, you can develop a production-grade ASR Extension.

Happy hacking! If you encounter any issues, feel free to open an issue on the TEN Framework GitHub repository.
