Build a Real-Time Transcription with TEN

Walk through the TEN transcription example that streams speech, cleans it with an LLM, and renders polished text in the browser.

Elliot Chen · September 22, 2025

Need a quick way to turn live speech into readable transcripts? The TEN transcription example pairs Agora RTC, a speech-to-text add-on, and a lightweight LLM polish pass to deliver clean captions in a single flow. This post breaks down the moving parts and shows how to run—and extend—the sample yourself.

Why TEN? The runtime abstracts graph orchestration, so you can swap providers or add post-processing without rewriting your whole stack.

Architecture at a Glance

  1. Agora ingress captures microphone audio and pushes PCM frames and stream-message data into the graph.
  2. Stream ID adapter normalises Agora payload IDs so downstream nodes stay in sync with the UI.
  3. Speech-to-Text (STT) converts audio frames into partial and final transcripts.
  4. Main control fans out ASR events, queues LLM prompts for optional clean-up, and forwards results to the collector.
  5. Message collector batches payloads and emits them over Agora's data channel.
  6. Next.js UI starts the graph, joins the channel, and renders raw vs polished text in real time.

Alongside the runtime graph you also get:

  • ten_packages/extension/main_python: the orchestration extension that implements the transcription flow.
  • web/: a minimal Next.js client that starts the graph, joins Agora, and displays transcripts.
  • manifest.json: the app manifest that pins runtime and extension versions.

Graph Wiring

The graph is declared in ai_agents/agents/examples/transcription/property.json and wires the whole loop together. A trimmed view highlights the core nodes:

ai_agents/agents/examples/transcription/property.json
{
  "nodes": [
    {
      "name": "agora_rtc",
      "addon": "agora_rtc",
      "property": {
        "app_id": "${env:AGORA_APP_ID}",
        "channel": "ten_transcription",
        "publish_data": true
      }
    },
    {
      "name": "streamid_adapter",
      "addon": "streamid_adapter"
    },
    {
      "name": "stt",
      "addon": "deepgram_asr_python",
      "property": {
        "params": {
          "api_key": "${env:DEEPGRAM_API_KEY}",
          "language": "en-US"
        }
      }
    },
    {
      "name": "main_control",
      "addon": "main_python",
      "property": {
        "enable_llm_correction": true
      }
    },
    {
      "name": "llm",
      "addon": "openai_llm2_python"
    },
    {
      "name": "message_collector",
      "addon": "message_collector2"
    }
  ]
}

Connections bind each stage: Agora PCM frames flow through the streamid_adapter into the STT node; the resulting asr_result data stream triggers main_control; and the message collector publishes concatenated transcripts back into Agora so the UI can subscribe without extra transports.
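
The connections block in the same property.json spells those bindings out. The full file is longer, but a rough sketch of its shape looks like the following; the pcm_frame and asr_result names come from the prose above, while the collector's outbound stream name is illustrative, so check the example file for the exact entries:

{
  "connections": [
    {
      "extension": "agora_rtc",
      "audio_frame": [
        { "name": "pcm_frame", "dest": [{ "extension": "streamid_adapter" }] }
      ]
    },
    {
      "extension": "streamid_adapter",
      "audio_frame": [
        { "name": "pcm_frame", "dest": [{ "extension": "stt" }] }
      ]
    },
    {
      "extension": "stt",
      "data": [
        { "name": "asr_result", "dest": [{ "extension": "main_control" }] }
      ]
    },
    {
      "extension": "message_collector",
      "data": [
        { "name": "data", "dest": [{ "extension": "agora_rtc" }] }
      ]
    }
  ]
}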

Inside main_control

The Python extension (ten_packages/extension/main_python/extension.py) is where ASR and LLM events are orchestrated. It registers handlers for each agent event, forwards partial transcripts immediately, and optionally calls the LLM when an utterance is final:

ten_packages/extension/main_python/extension.py
@agent_event_handler(ASRResultEvent)
async def _on_asr_result(self, event: ASRResultEvent):
    self.session_id = event.metadata.get("session_id", "100")
    stream_id = int(self.session_id)
    if not event.text:
        return
 
    await self._send_transcript("user", event.text, event.final, stream_id)
 
    if event.final and self.config.enable_llm_correction:
        self.turn_id += 1
        prompt = self.config.correction_prompt.replace("{text}", event.text)
        await self.agent.flush_llm()
        await self.agent.queue_llm_input(prompt)

When the LLM streams back, _send_transcript republishes the polished text as an assistant role. Setting enable_llm_correction to false in the graph properties skips the clean-up pass and surfaces raw ASR output only.
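
For instance, overriding the main_control node in property.json is all it takes to ship raw transcripts only:

{
  "name": "main_control",
  "addon": "main_python",
  "property": {
    "enable_llm_correction": false
  }
}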

Event Queues and Backpressure

main_control delegates most orchestration to the lightweight Agent (agent/agent.py). Two asyncio queues guarantee ordered delivery and isolate ASR vs LLM work:

ten_packages/extension/main_python/agent/agent.py
self._asr_queue: asyncio.Queue[ASRResultEvent] = asyncio.Queue()
self._llm_queue: asyncio.Queue[LLMResponseEvent] = asyncio.Queue()
 
async def _consume_asr(self):
    while not self.stopped:
        event = await self._asr_queue.get()
        await self._dispatch(event)
 
async def _consume_llm(self):
    while not self.stopped:
        event = await self._llm_queue.get()
        self._llm_active_task = asyncio.create_task(self._dispatch(event))
        try:
            await self._llm_active_task
        except asyncio.CancelledError:
            self.ten_env.log_info("[Agent] Active LLM task cancelled")

Because LLM responses are processed in their own queue, the extension can cancel or flush in-flight prompts whenever a new final transcript arrives. Tool registration events (tool_register) also pass through the same dispatcher, so the graph can expose extra functions without touching the core logic.
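
The flush itself is not shown above, but given the queue and task handle from the snippet, a minimal sketch of what it could look like is:

async def flush_llm(self):
    # Drop prompts that are still waiting in the queue.
    while not self._llm_queue.empty():
        self._llm_queue.get_nowait()
    # Cancel the LLM response that is currently streaming, if any.
    if self._llm_active_task and not self._llm_active_task.done():
        self._llm_active_task.cancel()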

LLM Correction Loop

agent/llm_exec.py wraps streaming chat completions and handles reasoning traces. Each transcript chunk is queued as a user message, and the class manages cancellation plus context updates:

ten_packages/extension/main_python/agent/llm_exec.py
async def _send_to_llm(self, ten_env, new_message):
    messages = self.contexts.copy()
    messages.append(new_message)
    request_id = str(uuid.uuid4())
    llm_input = LLMRequest(
        request_id=request_id,
        messages=messages,
        streaming=True,
        tools=self.available_tools,
    )
    async for cmd_result, _ in _send_cmd_ex(
        ten_env,
        "chat_completion",
        "llm",
        llm_input.model_dump(),
    ):
        ...

Reasoning deltas stream back as a separate channel, letting the UI present chain-of-thought data or hide it depending on the audience. If your model already returns clean text, disable correction and the loop simply passes through raw ASR events.

Message Flow to the Browser

message_collector2 emits segmented payloads using Agora stream-message. The Next.js client (web/src/app/page.tsx) reconstructs them, decodes the base64 payload, and pushes messages into local state:

web/src/app/page.tsx
const handleStreamMessage = useCallback((data: any) => {
  // Each stream message arrives as "<message_id>|<partIndex>|<totalParts>|<base64 payload>"
  const ascii = String.fromCharCode(...new Uint8Array(data))
  const [message_id, partIndexStr, totalPartsStr, content] = ascii.split('|')
  ...
  // Once every part has been collected into `msg`, decode it and append to local state
  const payload = JSON.parse(base64ToUtf8(msg))
  const { text, is_final, role } = payload
  appendItem({ id: message_id, text, isFinal: !!is_final, role: role || 'assistant', ts: payload.text_ts })
}, [])
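
The trimmed middle of that handler caches fragments until every part of a message has arrived; the base64ToUtf8 helper it calls is also omitted. A minimal sketch of that helper, assuming only standard browser APIs, would be:

function base64ToUtf8(base64: string): string {
  // atob yields a binary string; map each char to a byte, then decode the bytes as UTF-8
  const binary = atob(base64)
  const bytes = Uint8Array.from(binary, (c) => c.charCodeAt(0))
  return new TextDecoder('utf-8').decode(bytes)
}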

Because everything flows through Agora, you can ship both audio and transcript data over the same connection, which is handy when you would rather not maintain a separate WebSocket just for captions.

Running the Example

  1. Configure credentials in the TEN repo root .env (a sample layout follows these steps):

    • AGORA_APP_ID, AGORA_APP_CERTIFICATE (if tokens are required)
    • DEEPGRAM_API_KEY (or swap in another STT addon and its credentials)
    • OPENAI_API_KEY plus optional OPENAI_MODEL, OPENAI_PROXY_URL
  2. Select the agent:

    cd ai_agents
    task use AGENT=transcription
  3. Start the runtime and leave it running:

    task run
  4. Launch the UI:

    cd agents/examples/transcription/web
    cp .env.example .env  # set AGENT_SERVER_URL
    pnpm install
    pnpm dev
  5. Visit http://localhost:3000, click Start, allow microphone access, and watch transcripts stream in.
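
For reference, a minimal root .env could look like the following; the values are placeholders, and the optional OpenAI overrides can stay commented out:

.env
AGORA_APP_ID=your_agora_app_id
AGORA_APP_CERTIFICATE=your_agora_app_certificate
DEEPGRAM_API_KEY=your_deepgram_api_key
OPENAI_API_KEY=your_openai_api_key
# OPENAI_MODEL=
# OPENAI_PROXY_URL=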

When the graph spins up, it stays idle until the Agora client joins, so you're only billed for STT/LLM usage while someone is actively publishing audio.

Where to Take It Next

  • Swap providers: point the STT node at Azure, Deepgram multilingual, or a self-hosted model; the graph contract stays intact (see the sketch after this list).
  • Add summarisation: queue an extra LLM node that occasionally summarises the conversation and stream it back as a different message type.
  • Persist transcripts: mirror the collector payloads into a database or send them to a webhook alongside Agora stream metadata.
  • Expose tools: register custom LLM tools (via tool_register) so corrections can call domain-specific APIs before presenting text.
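
The provider swap in the first bullet, for example, comes down to editing the stt node. The addon name and params below are placeholders; the real keys come from whichever extension's manifest you pick:

{
  "name": "stt",
  "addon": "your_asr_addon_python",
  "property": {
    "params": {
      "api_key": "${env:YOUR_ASR_API_KEY}",
      "language": "en-US"
    }
  }
}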

The transcription example is intentionally lean, but every piece is production-ready: the runtime graph keeps latency predictable, the Python control extension handles post-processing, and the browser client demonstrates a simple, vendor-agnostic UI surface. Use it as a template for live captioning, meeting notes, or any workflow where polished text needs to trail the spoken word by only a heartbeat.