
Python - Realtime V2V Main

Realtime V2V / MLLM Agent (extension.py)

The file extension.py is the entry point of the realtime voice-to-voice / multimodal-LLM app. It consumes Server→Client (S2C) events (transcripts, tool calls, interrupts, user join/leave), normalizes them into typed events, and routes them to the agent. It also provides Client→Server (C2S) primitives to set context, send messages, return tool results, and trigger responses.

Quick File Layout

.
├── extension.py      → Main extension: routing + context + trigger + interrupt
├── config.py         → Runtime config (e.g., greeting)
├── helper.py         → Cmd/Data send helpers
└── agent/
    ├── agent.py      → Event queue, function call handling
    └── events.py     → Typed events (UserJoined, Transcripts, FunctionCall, Interrupt, etc.)

Audio data is not handled here. RTC audio is sent directly to MLLM via TEN graph connections. extension.py only reacts to the S2C transcript events generated by the MLLM server.

Architecture Overview

[Figure: architecture diagram]

Event Routing

Unlike decorator-based handlers, this extension uses a single on_data method that dispatches events with a match/case statement (Python 3.10+). Each case matches on the class of the typed event.

async def on_data(self, ten_env: AsyncTenEnv, data: Data):
    event = parse_event(data)  # converted to a typed AgentEvent

    match event:
        case UserJoinedEvent():
            self._rtc_user_count += 1
            await self._greeting_if_ready()

        case UserLeftEvent():
            self._rtc_user_count -= 1

        case ToolRegisterEvent():
            await self.agent.register_tool(event.tool, event.source)

        case FunctionCallEvent():
            await self.agent.call_tool(event.call_id, event.function_name, event.arguments)

        case InputTranscriptEvent():
            self.current_metadata = {"session_id": event.metadata.get("session_id", "100")}
            self.session_ready = True
            await self._greeting_if_ready()

        case OutputTranscriptEvent():
            await self._send_transcript("assistant", event.text, event.is_final, event.stream_id)

        case ServerInterruptEvent():
            await self._interrupt()

        case _:
            self.ten_env.log_warn(f"[MainControlExtension] Unhandled event: {event}")

S2C events handled:

- UserJoinedEvent / UserLeftEvent: track how many users are in the RTC channel.
- InputTranscriptEvent: carries the user's speech text, transcribed from RTC audio.
- OutputTranscriptEvent: carries the assistant's response text, which is forwarded as a transcript.
- ToolRegisterEvent: registers a new tool with the agent.
- FunctionCallEvent: handles a tool request from the LLM.
- ServerInterruptEvent: stops ongoing output.

C2S Primitives (Sending to MLLM)

extension.py provides simple methods for sending instructions to the MLLM server:

Set context

async def _set_context_messages(self, messages: list[MLLMClientMessageItem]):
    await _send_data(
        self.ten_env,
        DATA_MLLM_IN_SET_MESSAGE_CONTEXT,
        "v2v",
        MLLMClientSetMessageContext(messages=messages).model_dump(),
    )

Send a message

async def _send_message_item(self, message: MLLMClientMessageItem):
    await _send_data(
        self.ten_env,
        DATA_MLLM_IN_SEND_MESSAGE_ITEM,
        "v2v",
        MLLMClientSendMessageItem(item=message).model_dump(),
    )

Trigger a response

async def _send_create_response(self):
    await _send_data(
        self.ten_env,
        DATA_MLLM_IN_CREATE_RESPONSE,
        "v2v",
        MLLMClientCreateResponse().model_dump(),
    )

Send function output

(Sent from agent.py after handling a FunctionCallEvent)

await _send_data(
    self.ten_env,
    DATA_MLLM_IN_FUNCTION_CALL_OUTPUT,
    "v2v",
    MLLMClientFunctionCallOutput(
        output=result,
        call_id=call_id,
    ).model_dump(),
)
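To show how these primitives might be combined, here is a small sketch that seeds context, appends a user message, and then requests a response. It does not use the TEN runtime: RecordingClient is a hypothetical test double that records what would be sent, MessageItem is a stand-in for MLLMClientMessageItem with only the role and content fields seen above, and the channel-name strings are placeholders rather than the real values of the DATA_MLLM_IN_* constants. The ordering is one plausible sequence, not a documented requirement.

```python
import asyncio
from dataclasses import asdict, dataclass


@dataclass
class MessageItem:
    """Stand-in for MLLMClientMessageItem (role and content only)."""
    role: str
    content: str


class RecordingClient:
    """Hypothetical test double: records (name, payload) instead of sending."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, dict]] = []

    async def _send(self, name: str, payload: dict) -> None:
        # In extension.py this role is played by _send_data(...).
        self.sent.append((name, payload))

    async def set_context_messages(self, messages: list[MessageItem]) -> None:
        payload = {"messages": [asdict(m) for m in messages]}
        await self._send("set_message_context", payload)

    async def send_message_item(self, message: MessageItem) -> None:
        await self._send("send_message_item", {"item": asdict(message)})

    async def send_create_response(self) -> None:
        await self._send("create_response", {})


async def start_turn(
    client: RecordingClient, history: list[MessageItem], prompt: str
) -> None:
    """Seed prior context, add the new user message, then request a reply."""
    await client.set_context_messages(history)
    await client.send_message_item(MessageItem(role="user", content=prompt))
    await client.send_create_response()


def run_demo() -> list[str]:
    """Run one turn and return the names of the recorded sends, in order."""
    client = RecordingClient()
    history = [MessageItem(role="assistant", content="Hello, how can I help?")]
    asyncio.run(start_turn(client, history, "What is the weather today?"))
    return [name for name, _ in client.sent]
```

Sending a message item alone does not appear to produce output in the recipes on this page; the explicit create-response call is what asks the model to speak.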

Common Implementation Patterns

Greeting Recipe

The greeting is sent by _greeting_if_ready(), which fires when exactly one user is present, a greeting is configured, and the session is ready:

async def _greeting_if_ready(self):
    if self._rtc_user_count == 1 and self.config.greeting and self.session_ready:
        await self._send_message_item(
            MLLMClientMessageItem(
                role="user",
                content=f"say {self.config.greeting} to me",
            )
        )
        await self._send_create_response()

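The greeting is injected as a user message asking the model to say the configured text, followed by an explicit response request, so the assistant produces the greeting as a normal spoken response.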
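Note that _greeting_if_ready() is called from both the UserJoinedEvent and InputTranscriptEvent branches, so its condition can hold again on later transcript events while one user remains in the channel. If repeated greetings are not wanted, one option is a one-shot flag. The sketch below models only the greeting condition; the GreetingState class and the _greeted attribute are hypothetical and are not part of extension.py.

```python
import asyncio


class GreetingState:
    """Hypothetical, stripped-down model of the greeting condition."""

    def __init__(self, greeting: str) -> None:
        self.greeting = greeting
        self.rtc_user_count = 0
        self.session_ready = False
        self._greeted = False        # assumption: not in the original code
        self.sent: list[str] = []    # records greetings instead of sending them

    async def greeting_if_ready(self) -> None:
        ready = (
            self.rtc_user_count == 1
            and bool(self.greeting)
            and self.session_ready
        )
        if not ready or self._greeted:
            return
        # Mark the greeting as done so later events do not repeat it.
        self._greeted = True
        self.sent.append(f"say {self.greeting} to me")

    async def on_user_joined(self) -> None:
        self.rtc_user_count += 1
        await self.greeting_if_ready()

    async def on_input_transcript(self) -> None:
        self.session_ready = True
        await self.greeting_if_ready()


async def simulate() -> list[str]:
    state = GreetingState("hello")
    await state.on_user_joined()        # session not ready yet: no greeting
    await state.on_input_transcript()   # all conditions hold: greet once
    await state.on_input_transcript()   # condition still holds: suppressed
    return state.sent


def run_simulation() -> list[str]:
    return asyncio.run(simulate())
```

Whether the flag should be reset when the last user leaves depends on the desired behavior for rejoining users; that choice is left open here.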

Function Call Handling

- S2C: a FunctionCallEvent means the model is requesting a tool.
- Agent: executes the tool logic.
- C2S: returns the result via DATA_MLLM_IN_FUNCTION_CALL_OUTPUT, using the same call_id.

The model may continue its response afterwards.
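The round trip described here can be sketched without the TEN runtime. In the example below, ToolRunner is a hypothetical stand-in for the tool handling in agent.py, the outputs list stands in for data sent back to the MLLM, and get_time is a toy tool. Treating arguments as a JSON string and reporting errors as JSON are assumptions for illustration, not confirmed behavior of the real agent.

```python
import asyncio
import json
from typing import Awaitable, Callable

ToolFn = Callable[[dict], Awaitable[str]]


class ToolRunner:
    """Hypothetical stand-in for the tool handling in agent.py."""

    def __init__(self) -> None:
        self._tools: dict[str, ToolFn] = {}
        self.outputs: list[dict] = []  # stands in for data sent to the MLLM

    def register_tool(self, name: str, fn: ToolFn) -> None:
        self._tools[name] = fn

    async def call_tool(
        self, call_id: str, function_name: str, arguments: str
    ) -> None:
        """Run the named tool and report the result under the same call_id."""
        tool = self._tools.get(function_name)
        if tool is None:
            result = json.dumps({"error": f"unknown tool: {function_name}"})
        else:
            try:
                result = await tool(json.loads(arguments or "{}"))
            except Exception as exc:
                # Report the failure so the call_id still gets an answer.
                result = json.dumps({"error": str(exc)})
        self.outputs.append({"call_id": call_id, "output": result})


async def get_time(args: dict) -> str:
    """Toy tool: returns a fixed time for the requested city."""
    return json.dumps({"city": args.get("city", "unknown"), "time": "12:00"})


def run_round_trip() -> list[dict]:
    """Issue one valid and one unknown tool call and return both outputs."""
    runner = ToolRunner()
    runner.register_tool("get_time", get_time)

    async def scenario() -> None:
        await runner.call_tool("call-1", "get_time", '{"city": "Paris"}')
        await runner.call_tool("call-2", "missing_tool", "{}")

    asyncio.run(scenario())
    return runner.outputs
```

Answering every call_id, even on failure, avoids leaving the model waiting for a tool result that never arrives.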

Interruption

async def _interrupt(self):
    await _send_cmd(self.ten_env, "flush", "agora_rtc")

Triggered on ServerInterruptEvent. It sends a flush command to the agora_rtc extension, which stops RTC playback/streaming.

Supporting Files

- agent.py: queues events, executes tools, and sends function outputs.
- events.py: defines the typed events UserJoinedEvent, UserLeftEvent, InputTranscriptEvent, OutputTranscriptEvent, ToolRegisterEvent, FunctionCallEvent, and ServerInterruptEvent.
- helper.py: provides the _send_cmd and _send_data wrappers.
- config.py: holds runtime config such as the greeting.
- Graph (property.json): wires RTC audio → MLLM.

Events/API Summary

| Direction | Event / Channel | Purpose |
| --- | --- | --- |
| S2C | UserJoinedEvent | Track users; trigger greeting |
| S2C | UserLeftEvent | Track users leaving |
| S2C | InputTranscriptEvent | User speech text from RTC audio |
| S2C | OutputTranscriptEvent | Assistant response text |
| S2C | ToolRegisterEvent | Register new tool |
| S2C | FunctionCallEvent | Model requests tool |
| S2C | ServerInterruptEvent | Stop ongoing output |
| C2S | _set_context_messages([...]) | Provide system/dev/user context |
| C2S | _send_message_item(...) | Send user/developer message |
| C2S | _send_create_response() | Trigger assistant response |
| C2S | Function output (from agent) | Return tool results |

For detailed event documentation and parameters, see the API Reference.
