Mkdir700's Note

Mkdir700's Note

什么是 WebRTC 及简单实现

16
2025-03-24

WebRTC 基础概念

WebRTC (Web Real-Time Communication) 是一项开源技术,允许网页浏览器和移动应用程序通过简单的 API 实现实时语音、视频通话和点对点数据传输,无需安装任何插件或第三方软件。

WebRTC 连接建立流程

WebRTC 连接建立过程涉及多个步骤,下面是一个典型的连接建立流程:

sequenceDiagram
    participant Client1
    participant 信令服务器
    participant Client2
    
    %% 初始化阶段
    Client1->>信令服务器: 连接到信令服务器 (WebSocket)
    信令服务器-->>Client1: 连接确认
    Client2->>信令服务器: 连接到信令服务器 (WebSocket)
    信令服务器-->>Client2: 连接确认
    
    %% Offer/Answer 交换
    Client1->>Client1: 创建 RTCPeerConnection
    Client1->>Client1: 创建 DataChannel
    Client1->>Client1: 创建 Offer
    Client1->>Client1: setLocalDescription(offer)
    Client1->>信令服务器: 发送 Offer
    信令服务器->>Client2: 转发 Offer
    
    Client2->>Client2: 创建 RTCPeerConnection
    Client2->>Client2: setRemoteDescription(offer)
    Client2->>Client2: 创建 Answer
    Client2->>Client2: setLocalDescription(answer)
    Client2->>信令服务器: 发送 Answer
    信令服务器->>Client1: 转发 Answer
    
    Client1->>Client1: setRemoteDescription(answer)
    
    %% ICE 候选交换 (可能同时发生)
    par ICE 候选收集与交换
        Client1->>Client1: onicecandidate 事件触发
        Client1->>信令服务器: 发送 ICE candidate
        信令服务器->>Client2: 转发 ICE candidate
        Client2->>Client2: addIceCandidate()
    and
        Client2->>Client2: onicecandidate 事件触发
        Client2->>信令服务器: 发送 ICE candidate
        信令服务器->>Client1: 转发 ICE candidate
        Client1->>Client1: addIceCandidate()
    end
    
    %% 连接建立
    Client1->>Client1: ICE 连接状态变为 "connected"
    Client2->>Client2: ICE 连接状态变为 "connected"
    
    %% 数据通道通信
    Client2->>Client2: ondatachannel 事件触发
    Client1->>Client1: 数据通道状态变为 "open"
    Client2->>Client2: 数据通道状态变为 "open"
    
    %% 后续通信
    Client1->>Client2: 通过数据通道直接发送消息
    Client2->>Client1: 通过数据通道直接发送消息

WebRTC 核心组件

1. 信令服务器 (Signaling Server)

信令服务器是 WebRTC 连接建立过程中的中介,它不传输媒体数据,而是帮助对等方交换建立连接所需的信息。信令服务器的主要职责包括:

  • 帮助对等方发现彼此
  • 协调通信会话的建立
  • 交换网络和媒体信息
  • 传递 Offer/Answer 和 ICE 候选者

信令服务器可以使用任何通信协议实现,常见的选择包括 WebSocket、HTTP 长轮询或 Socket.IO

2. RTCPeerConnection

RTCPeerConnection 是 WebRTC 的核心 API,负责创建和管理与远程对等方的连接。它处理所有的信号处理、编解码协商、点对点通信和安全性。主要功能包括:

  • 管理完整的 ICE 工作流程
  • 发送和接收媒体流
  • 应用编解码器和其他媒体处理
  • 实现安全的数据传输(DTLS/SRTP)
  • 提供连接状态和统计信息

3. Offer 和 Answer

Offer 是 WebRTC 连接建立过程中的第一步会话描述。它是由连接的发起方创建的,包含了发起方的媒体能力、编解码器支持、传输参数等信息。
Offer 是一种 SDP(会话描述协议)格式的数据,包含了建立连接所需的配置信息,如:

  • 支持的音频/视频编解码器
  • ICE 候选者信息
  • 安全参数
  • 传输协议细节

发起方创建 Offer 后,通过信令服务器将它传递给接收方,接收方据此了解发起方的通信能力。

Answer 是接收方对 Offer 的响应,同样使用 SDP 格式,包含接收方的媒体能力和参数选择。通过 Offer/Answer 模型,双方协商出兼容的通信参数。

4. 会话描述 (Description)

Description 是对连接端点通信能力和参数的描述,使用 SDP 格式表示。在 WebRTC 中有两种描述:

  1. 本地描述(LocalDescription):表示本地端点的通信能力和参数。在代码中通过 setLocalDescription() 设置。

  2. 远程描述(RemoteDescription):表示远程端点的通信能力和参数。在代码中通过 setRemoteDescription() 设置。

描述的交换是 WebRTC 连接建立的核心步骤,它允许两个端点协商出兼容的通信参数,并最终建立直接的点对点连接。

5. 通道 (Channel)

在 WebRTC 中,有几种不同类型的通道:

  1. 数据通道(DataChannel):用于在对等连接之间传输任意数据。数据通道提供了一种在对等方之间直接传输文本或二进制数据的方式,无需经过服务器。它可以用于聊天消息、文件传输、游戏状态同步等应用。

  2. 媒体通道:用于传输音频和视频流,是 WebRTC 的核心功能之一。

通道建立后,客户端之间可以直接通信而无需继续使用信令服务器。

ICE (Interactive Connectivity Establishment)

ICE (交互式连接建立) 是 WebRTC 中用于解决网络连接问题的核心技术。它解决了在不同网络环境下如何建立点对点连接的复杂问题。

ICE 的主要作用

ICE 解决了以下关键问题:

  1. NAT 穿透:大多数设备位于 NAT(网络地址转换)后面,没有公网 IP,ICE 提供机制使这些设备能够互相发现和通信。

  2. 最佳路径选择:ICE 会尝试多种可能的连接路径,选择最优的一条用于通信。

  3. 连接恢复:当网络条件改变时,ICE 可以寻找新的连接路径来保持通信。

ICE 的组成部分

  1. ICE 候选者(Candidates)

    • 代表可能的连接端点(IP 地址和端口组合)
    • 有不同类型:主机候选者(本地地址)、反射候选者(STUN 获取的公网地址)、中继候选者(TURN 服务器地址)
    • 候选者通过 onicecandidate 事件收集并交换
  2. STUN 服务器

    • 帮助设备确定其公网 IP 地址和端口
    • 使设备了解其在 NAT 后的映射情况
  3. TURN 服务器

    • 当直接连接失败时作为通信中继
    • 提供一种后备方案,确保连接总是可以建立(代价是增加延迟)

ICE 工作流程

  1. 候选者收集

    • 收集所有可能的连接端点信息
    • 包括本地网络接口、反射地址和中继地址
  2. 候选者交换

    • 通过信令服务器将候选者信息发送给远程对等方
  3. 连接检查

    • 对每对候选者(本地+远程)进行连通性测试
    • 尝试建立 UDP 或 TCP 连接
  4. 选择最佳路径

    • 基于延迟、网络类型等因素选择最佳连接
    • 优先选择直接连接,只有在必要时才使用中继

ICE 是 WebRTC 能够在复杂网络环境中工作的关键。没有 ICE,WebRTC 连接在大多数真实网络环境中无法建立,尤其是当通信双方都位于不同的私有网络中时。

例子:基于 WebRTC 的点对点聊天

信令服务器

from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
import json
import logging
from typing import Dict

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# 配置 CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 存储连接的客户端
connected_clients: Dict[str, WebSocket] = {}


@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await websocket.accept()
    connected_clients[client_id] = websocket
    logger.info(f"客户端 {client_id} 已连接")

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            logger.info(f"收到来自 {client_id} 的消息类型: {message['type']}")

            # 处理不同类型的信令消息
            if message["type"] == "offer":
                # 转发 offer 到目标客户端
                target_id = message["target"]
                if target_id in connected_clients:
                    await connected_clients[target_id].send_json(
                        {"type": "offer", "offer": message["offer"], "from": client_id}
                    )
                    logger.info(f"转发 offer 从 {client_id} 到 {target_id}")

            elif message["type"] == "answer":
                # 转发 answer 到目标客户端
                target_id = message["target"]
                if target_id in connected_clients:
                    await connected_clients[target_id].send_json(
                        {
                            "type": "answer",
                            "answer": message["answer"],
                            "from": client_id,
                        }
                    )
                    logger.info(f"转发 answer 从 {client_id} 到 {target_id}")

            elif message["type"] == "ice-candidate":
                # 转发 ICE candidate 到目标客户端
                target_id = message["target"]
                if target_id in connected_clients:
                    await connected_clients[target_id].send_json(
                        {
                            "type": "ice-candidate",
                            "candidate": message["candidate"],
                            "from": client_id,
                        }
                    )
                    logger.info(f"转发 ICE candidate 从 {client_id} 到 {target_id}")

    except Exception as e:
        logger.error(f"错误: {e}")
    finally:
        # 清理断开的连接
        if client_id in connected_clients:
            del connected_clients[client_id]
            logger.info(f"客户端 {client_id} 已断开连接")


@app.get("/")
async def root():
    return {"message": "WebRTC 信令服务器正在运行"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

客户端

import argparse
import asyncio
import json
import logging

import websockets
from aiortc import RTCIceCandidate, RTCPeerConnection, RTCSessionDescription

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
host = "1.15.174.7"
port = 21115


class WebRTCClient:
    def __init__(self, client_id):
        self.client_id = client_id
        self.peers = {}  # 保存与其他客户端的连接
        self.ws = None

    async def connect_to_signaling_server(self, server_url):
        """连接到信令服务器"""
        logger.info(f"{self.client_id} 正在连接到信令服务器...")
        self.ws = await websockets.connect(f"{server_url}/ws/{self.client_id}")
        logger.info(f"{self.client_id} 已连接到信令服务器")
        return self.ws

    async def connect_to_peer(self, target_id):
        """连接到指定的对等方"""
        if target_id in self.peers:
            logger.info(f"已经与 {target_id} 建立连接")
            return self.peers[target_id]["channel"]

        logger.info(f"正在连接到 {target_id}...")
        pc = RTCPeerConnection()
        self.peers[target_id] = {"pc": pc, "channel": None}

        # 设置 ICE 事件处理
        @pc.on("icecandidate")
        def on_icecandidate(candidate):
            if candidate:
                asyncio.create_task(self.send_ice_candidate(candidate, target_id))

        @pc.on("connectionstatechange")
        def on_connectionstatechange():
            logger.info(f"与 {target_id} 的连接状态: {pc.connectionState}")
            if pc.connectionState == "failed":
                logger.error(f"与 {target_id} 的连接失败")

        # 设置数据通道处理
        @pc.on("datachannel")
        def on_datachannel(channel):
            logger.info(f"收到来自 {target_id} 的数据通道")
            self.peers[target_id]["channel"] = channel

            @channel.on("open")
            def on_open():
                logger.info(f"与 {target_id} 的数据通道已打开")

            @channel.on("message")
            def on_message(message):
                print(f"\n[来自 {target_id} 的消息] {message}")
                print(f"请输入发送给谁的消息 (格式: 目标ID:消息): ", end="", flush=True)

        # 创建数据通道
        channel = pc.createDataChannel("chat")
        self.peers[target_id]["channel"] = channel

        @channel.on("open")
        def on_open():
            logger.info(f"与 {target_id} a的数据通道已打开")

        @channel.on("message")
        def on_message(message):
            print(f"\n[来自 {target_id} 的消息] {message}")
            print(f"请输入发送给谁的消息 (格式: 目标ID:消息): ", end="", flush=True)

        # 创建并发送 offer
        offer = await pc.createOffer()
        await pc.setLocalDescription(offer)

        await self.ws.send(
            json.dumps(
                {
                    "type": "offer",
                    "offer": pc.localDescription.sdp,
                    "target": target_id,
                }
            )
        )
        logger.info(f"已向 {target_id} 发送连接请求")
        return channel

    async def send_ice_candidate(self, candidate, target_id):
        """发送 ICE candidate 到指定的对等方"""
        if self.ws:
            await self.ws.send(
                json.dumps(
                    {
                        "type": "ice-candidate",
                        "candidate": {
                            "component": candidate.component,
                            "foundation": candidate.foundation,
                            "ip": candidate.ip,
                            "port": candidate.port,
                            "priority": candidate.priority,
                            "protocol": candidate.protocol,
                            "type": candidate.type,
                            "sdpMid": candidate.sdpMid,
                            "sdpMLineIndex": candidate.sdpMLineIndex,
                        },
                        "target": target_id,
                    }
                )
            )

    async def send_message(self, target_id, message):
        """向指定的对等方发送消息"""
        if target_id not in self.peers or not self.peers[target_id]["channel"]:
            channel = await self.connect_to_peer(target_id)
            if not channel:
                logger.error(f"无法连接到 {target_id}")
                return False
        else:
            channel = self.peers[target_id]["channel"]

        if channel.readyState != "open":
            logger.warning(f"与 {target_id} 的通道未打开,正在等待...")
            await asyncio.sleep(1)
            if channel.readyState != "open":
                logger.error(f"与 {target_id} 的通道无法打开")
                return False

        channel.send(message)
        logger.info(f"已向 {target_id} 发送消息: {message}")
        return True

    async def handle_signaling_messages(self):
        """处理从信令服务器接收的消息"""
        while True:
            message = await self.ws.recv()
            data = json.loads(message)
            message_type = data["type"]
            from_id = data.get("from")

            logger.info(f"收到来自 {from_id} 的 {message_type} 消息")

            if message_type == "offer":
                # 收到 offer,建立连接
                if from_id not in self.peers:
                    pc = RTCPeerConnection()
                    self.peers[from_id] = {"pc": pc, "channel": None}

                    # 设置 ICE 事件处理
                    @pc.on("icecandidate")
                    def on_icecandidate(candidate):
                        if candidate:
                            asyncio.create_task(
                                self.send_ice_candidate(candidate, from_id)
                            )

                    @pc.on("connectionstatechange")
                    def on_connectionstatechange():
                        logger.info(f"与 {from_id} 的连接状态: {pc.connectionState}")

                    @pc.on("datachannel")
                    def on_datachannel(channel):
                        logger.info(f"收到来自 {from_id} 的数据通道")
                        self.peers[from_id]["channel"] = channel

                        @channel.on("open")
                        def on_open():
                            logger.info(f"与 {from_id} 的数据通道已打开")

                        @channel.on("message")
                        def on_message(message):
                            print(f"\n[来自 {from_id} 的消息] {message}")
                            print(
                                f"请输入发送给谁的消息 (格式: 目标ID:消息): ",
                                end="",
                                flush=True,
                            )

                pc = self.peers[from_id]["pc"]

                # 设置远程描述
                await pc.setRemoteDescription(
                    RTCSessionDescription(sdp=data["offer"], type="offer")
                )

                # 创建 answer
                answer = await pc.createAnswer()
                await pc.setLocalDescription(answer)

                # 发送 answer
                await self.ws.send(
                    json.dumps(
                        {
                            "type": "answer",
                            "answer": pc.localDescription.sdp,
                            "target": from_id,
                        }
                    )
                )
                logger.info(f"已向 {from_id} 发送 answer")

            elif message_type == "answer":
                # 收到 answer,完成连接
                pc = self.peers[from_id]["pc"]
                await pc.setRemoteDescription(
                    RTCSessionDescription(sdp=data["answer"], type="answer")
                )
                logger.info(f"已设置来自 {from_id} 的远程描述")

            elif message_type == "ice-candidate":
                # 收到 ICE candidate
                pc = self.peers[from_id]["pc"]
                candidate_data = data["candidate"]
                await pc.addIceCandidate(
                    RTCIceCandidate(
                        component=candidate_data["component"],
                        foundation=candidate_data["foundation"],
                        ip=candidate_data["ip"],
                        port=candidate_data["port"],
                        priority=candidate_data["priority"],
                        protocol=candidate_data["protocol"],
                        type=candidate_data["type"],
                        sdpMid=candidate_data["sdpMid"],
                        sdpMLineIndex=candidate_data["sdpMLineIndex"],
                    )
                )
                logger.info(f"已添加来自 {from_id} 的 ICE candidate")

    async def close_all_connections(self):
        """关闭所有连接"""
        for peer_id, peer_data in self.peers.items():
            if peer_data["channel"]:
                peer_data["channel"].close()
            await peer_data["pc"].close()
            logger.info(f"已关闭与 {peer_id} 的连接")

        if self.ws:
            await self.ws.close()
            logger.info("已断开与信令服务器的连接")


async def interactive_client():
    parser = argparse.ArgumentParser(description="交互式 WebRTC 客户端")
    parser.add_argument("--id", type=str, help="客户端 ID")
    parser.add_argument(
        "--server", type=str, default=f"ws://{host}:{port}", help="信令服务器 URL"
    )
    args = parser.parse_args()

    client_id = args.id
    if not client_id:
        client_id = input("请输入你的客户端 ID: ")

    client = WebRTCClient(client_id)
    await client.connect_to_signaling_server(args.server)

    # 启动处理信令消息的任务
    signaling_task = asyncio.create_task(client.handle_signaling_messages())

    try:
        print(f"你的客户端 ID 是: {client_id}")
        print("你可以使用 '目标ID:消息' 的格式发送消息")
        print("例如: client2:你好!")
        print("输入 'exit' 退出")

        while True:
            message_input = await asyncio.get_event_loop().run_in_executor(
                None, input, "请输入发送给谁的消息 (格式: 目标ID:消息): "
            )

            if message_input.lower() == "exit":
                break

            try:
                target_id, message = message_input.split(":", 1)
                target_id = target_id.strip()
                message = message.strip()

                if not message:
                    print("消息不能为空")
                    continue

                await client.send_message(target_id, message)
            except ValueError:
                print("格式错误,请使用 '目标ID:消息' 的格式")

    except KeyboardInterrupt:
        print("\n正在退出...")
    except Exception as e:
        logger.error(f"发生错误: {e}")
    finally:
        # 取消信令任务
        signaling_task.cancel()
        try:
            await signaling_task
        except asyncio.CancelledError:
            pass

        # 关闭所有连接
        await client.close_all_connections()


if __name__ == "__main__":
    asyncio.run(interactive_client())