Tio Boot DocsTio Boot Docs
Home
  • java-db
  • api-table
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
Home
  • java-db
  • api-table
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
  • 01_tio-boot 简介

    • tio-boot:新一代高性能 Java Web 开发框架
    • tio-boot 入门示例
    • Tio-Boot 配置 : 现代化的配置方案
    • tio-boot 整合 Logback
    • tio-boot 整合 hotswap-classloader 实现热加载
    • 自行编译 tio-boot
    • 最新版本
    • 开发规范
  • 02_部署

    • 使用 Maven Profile 实现分环境打包 tio-boot 项目
    • Maven 项目配置详解:依赖与 Profiles 配置
    • tio-boot 打包成 FatJar
    • 使用 GraalVM 构建 tio-boot Native 程序
    • 使用 Docker 部署 tio-boot
    • 部署到 Fly.io
    • 部署到 AWS Lambda
    • 到阿里云云函数
    • 使用 Deploy 工具部署
    • 使用Systemctl启动项目
    • 使用 Jenkins 部署 Tio-Boot 项目
    • 使用 Nginx 反向代理 Tio-Boot
    • 使用 Supervisor 管理 Java 应用
    • 已过时
    • 胖包与瘦包的打包与部署
  • 03_配置

    • 配置参数
    • 服务器监听器
    • 内置缓存系统 AbsCache
    • 使用 Redis 作为内部 Cache
    • 静态文件处理器
    • 基于域名的静态资源隔离
    • DecodeExceptionHandler
    • 开启虚拟线程(Virtual Thread)
    • 框架级错误通知
  • 04_原理

    • 生命周期
    • 请求处理流程
    • 重要的类
  • 05_json

    • Json
    • 接受 JSON 和响应 JSON
    • 响应实体类
  • 06_web

    • 概述
    • 接收请求参数
    • 接收日期参数
    • 接收数组参数
    • 返回字符串
    • 返回文本数据
    • 返回网页
    • 请求和响应字节
    • 文件上传
    • 文件下载
    • 返回视频文件并支持断点续传
    • http Session
    • Cookie
    • HttpRequest
    • HttpResponse
    • Resps
    • RespBodyVo
    • Controller拦截器
    • 请求拦截器
    • LoggingInterceptor
    • 全局异常处理器
    • 异步处理
    • 动态 返回 CSS 实现
    • 返回图片
    • 跨域
    • 添加 Controller
    • Transfer-Encoding: chunked 实时音频播放
    • Server-Sent Events (SSE)
    • handler入门
    • 返回 multipart
    • 待定
    • 自定义 Handler 转发请求
    • 使用 HttpForwardHandler 转发所有请求
    • 常用工具类
    • HTTP Basic 认证
    • Http响应加密
    • 使用零拷贝发送大文件
    • 分片上传
    • 接口访问统计
    • 接口请求和响应数据记录
    • WebJars
    • JProtobuf
    • 测速
    • Gzip Bomb:使用压缩炸弹防御恶意爬虫
  • 07_validate

    • 数据紧校验规范
    • 参数校验
  • 08_websocket

    • 使用 tio-boot 搭建 WebSocket 服务
    • WebSocket 聊天室项目示例
  • 09_java-db

    • java‑db
    • 操作数据库入门示例
    • SQL 模板 (SqlTemplates)
    • 数据源配置与使用
    • ActiveRecord
    • Db 工具类
    • 批量操作
    • Model
    • Model生成器
    • 注解
    • 异常处理
    • 数据库事务处理
    • Cache 缓存
    • Dialect 多数据库支持
    • 表关联操作
    • 复合主键
    • Oracle 支持
    • Enjoy SQL 模板
    • 整合 Enjoy 模板最佳实践
    • 多数据源支持
    • 独立使用 ActiveRecord
    • 调用存储过程
    • java-db 整合 Guava 的 Striped 锁优化
    • 生成 SQL
    • 通过实体类操作数据库
    • java-db 读写分离
    • Spring Boot 整合 Java-DB
    • like 查询
    • 常用操作示例
    • Druid 监控集成指南
    • SQL 统计
  • 10_api-table

    • ApiTable 概述
    • 使用 ApiTable 连接 SQLite
    • 使用 ApiTable 连接 Mysql
    • 使用 ApiTable 连接 Postgres
    • 使用 ApiTable 连接 TDEngine
    • 使用 api-table 连接 oracle
    • 使用 api-table 连接 mysql and tdengine 多数据源
    • EasyExcel 导出
    • EasyExcel 导入
    • 预留
    • 预留
    • ApiTable 实现增删改查
    • 数组类型
    • 单独使用 ApiTable
    • TQL(Table SQL)前端输入规范
  • 11_aop

    • JFinal-aop
    • Aop 工具类
    • 配置
    • 配置
    • 独立使用 JFinal Aop
    • @AImport
    • 自定义注解拦截器
    • 原理解析
  • 12_cache

    • Caffine
    • Jedis-redis
    • hutool RedisDS
    • Redisson
    • Caffeine and redis
    • CacheUtils 工具类
    • 使用 CacheUtils 整合 caffeine 和 redis 实现的两级缓存
    • 使用 java-db 整合 ehcache
    • 使用 java-db 整合 redis
    • Java DB Redis 相关 Api
    • redis 使用示例
  • 13_认证和权限

    • FixedTokenInterceptor
    • TokenManager
    • 数据表
    • 匿名登录
    • 注册和登录
    • 个人中心
    • 重置密码
    • Google 登录
    • 短信登录
    • 移动端微信登录
    • 移动端重置密码
    • 微信登录
    • 移动端微信登录
    • 权限校验注解
    • Sa-Token
    • sa-token 登录注册
    • StpUtil.isLogin() 源码解析
  • 14_i18n

    • i18n
  • 15_enjoy

    • tio-boot 整合 Enjoy 模版引擎文档
    • Tio-Boot 整合 Java-DB 与 Enjoy 模板引擎示例
    • 引擎配置
    • 表达式
    • 指令
    • 注释
    • 原样输出
    • Shared Method 扩展
    • Shared Object 扩展
    • Extension Method 扩展
    • Spring boot 整合
    • 独立使用 Enjoy
    • tio-boot enjoy 自定义指令 localeDate
    • PromptEngine
    • Enjoy 入门示例-擎渲染大模型请求体
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
  • 16_定时任务

    • Quartz 定时任务集成指南
    • 分布式定时任务 xxl-jb
    • cron4j 使用指南
  • 17_tests

    • TioBootTest 类
  • 18_tio

    • TioBootServer
    • 独立端口启动 TCP 服务器
    • 内置 TCP 处理器
    • 独立启动 UDPServer
    • 使用内置 UDPServer
    • t-io 消息处理流程
    • tio-运行原理详解
    • TioConfig
    • ChannelContext
    • Tio 工具类
    • 业务数据绑定
    • 业务数据解绑
    • 发送数据
    • 关闭连接
    • Packet
    • 监控: 心跳
    • 监控: 客户端的流量数据
    • 监控: 单条 TCP 连接的流量数据
    • 监控: 端口的流量数据
    • 单条通道统计: ChannelStat
    • 所有通道统计: GroupStat
    • 资源共享
    • 成员排序
    • SSL
    • DecodeRunnable
    • 使用 AsynchronousSocketChannel 响应数据
    • 拉黑 IP
    • 深入解析 Tio 源码:构建高性能 Java 网络应用
  • 19_aio

    • ByteBuffer
    • AIO HTTP 服务器
    • 自定义和线程池和池化 ByteBuffer
    • AioHttpServer 应用示例 IP 属地查询
    • 手写 AIO Http 服务器
  • 20_netty

    • Netty TCP Server
    • Netty Web Socket Server
    • 使用 protoc 生成 Java 包文件
    • Netty WebSocket Server 二进制数据传输
    • Netty 组件详解
  • 21_netty-boot

    • Netty-Boot
    • 原理解析
    • 整合 Hot Reload
    • 整合 数据库
    • 整合 Redis
    • 整合 Elasticsearch
    • 整合 Dubbo
    • Listener
    • 文件上传
    • 拦截器
    • Spring Boot 整合 Netty-Boot
    • SSL 配置指南
    • ChannelInitializer
    • Reserve
  • 22_MQ

    • Mica-mqtt
    • EMQX
    • Disruptor
  • 23_tio-utils

    • tio-utils
    • HttpUtils
    • Notification
    • Email
    • JSON
    • File
    • Base64
    • 上传和下载
    • Http
    • Telegram
    • RsaUtils
    • EnvUtils 配置工具
    • 系统监控
    • 线程
    • 虚拟线程
    • 毫秒并发 ID (MCID) 生成方案
  • 24_tio-http-server

    • 使用 Tio-Http-Server 搭建简单的 HTTP 服务
    • tio-boot 添加 HttpRequestHandler
    • 在 Android 上使用 tio-boot 运行 HTTP 服务
    • tio-http-server-native
    • handler 常用操作
  • 25_tio-websocket

    • WebSocket 服务器
    • WebSocket Client
    • TCP数据转发
  • 26_tio-im

    • 通讯协议文档
    • ChatPacket.proto 文档
    • java protobuf
    • 数据表设计
    • 创建工程
    • 登录
    • 历史消息
    • 发消息
  • 27_mybatis

    • Tio-Boot 整合 MyBatis
    • 使用配置类方式整合 MyBatis
    • 整合数据源
    • 使用 mybatis-plus 整合 tdengine
    • 整合 mybatis-plus
  • 28_mongodb

    • tio-boot 使用 mongo-java-driver 操作 mongodb
  • 29_elastic-search

    • Elasticsearch
    • JavaDB 整合 ElasticSearch
    • Elastic 工具类使用指南
    • Elastic-search 注意事项
    • ES 课程示例文档
  • 30_magic-script

    • tio-boot 与 magic-script 集成指南
  • 31_groovy

    • tio-boot 整合 Groovy
  • 32_firebase

    • 整合 google firebase
    • Firebase Storage
    • Firebase Authentication
    • 使用 Firebase Admin SDK 进行匿名用户管理与自定义状态标记
    • 导出用户
    • 注册回调
    • 登录注册
  • 33_文件存储

    • 文件上传数据表
    • 本地存储
    • 存储文件到 亚马逊 S3
    • 存储文件到 腾讯 COS
    • 存储文件到 阿里云 OSS
  • 34_spider

    • jsoup
    • 爬取 z-lib.io 数据
    • 整合 WebMagic
    • WebMagic 示例:爬取学校课程数据
    • Playwright
    • Flexmark (Markdown 处理器)
    • tio-boot 整合 Playwright
    • 缓存网页数据
  • 36_integration_thirty_party

    • 整合 okhttp
    • 整合 GrpahQL
    • 集成 Mailjet
    • 整合 ip2region
    • 整合 GeoLite 离线库
    • 整合 Lark 机器人指南
    • 集成 Lark Mail 实现邮件发送
    • Thymeleaf
    • Swagger
    • Clerk 验证
  • 37_dubbo

    • 概述
    • dubbo 2.6.0
    • dubbo 2.6.0 调用过程
    • dubbo 3.2.0
  • 38_spring

    • Spring Boot Web 整合 Tio Boot
    • spring-boot-starter-webflux 整合 tio-boot
    • tio-boot 整合 spring-boot-starter
    • Tio Boot 整合 Spring Boot Starter db
    • Tio Boot 整合 Spring Boot Starter Data Redis 指南
  • 39_spring-cloud

    • tio-boot spring-cloud
  • 40_quarkus

    • Quarkus(无 HTTP)整合 tio-boot(有 HTTP)
    • tio-boot + Quarkus + Hibernate ORM Panache
  • 41_postgresql

    • PostgreSQL 安装
    • PostgreSQL 主键自增
    • PostgreSQL 日期类型
    • Postgresql 金融类型
    • PostgreSQL 数组类型
    • 索引
    • PostgreSQL 查询优化
    • 获取字段类型
    • PostgreSQL 全文检索
    • PostgreSQL 向量
    • PostgreSQL 优化向量查询
    • PostgreSQL 其他
  • 42_mysql

    • 使用 Docker 运行 MySQL
    • 常见问题
  • 43_oceanbase

    • 快速体验 OceanBase 社区版
    • 快速上手 OceanBase 数据库单机部署与管理
    • 诊断集群性能
    • 优化 SQL 性能指南
    • 待定
  • 49_jooq

    • 使用配置类方式整合 jOOQ
    • tio-boot + jOOQ 事务管理
    • 批量操作与性能优化
    • 代码生成(可选)与类型安全升级
    • JSONB、Upsert、窗口函数实战
    • 整合agroal
  • 50_media

    • JAVE 提取视频中的声音
    • Jave 提取视频中的图片
    • 待定
  • 51_asr

    • Whisper-JNI
  • 54_native-media

    • java-native-media
    • JNI 入门示例
    • mp3 拆分
    • mp4 转 mp3
    • 使用 libmp3lame 实现高质量 MP3 编码
    • Linux 编译
    • macOS 编译
    • 从 JAR 包中加载本地库文件
    • 支持的音频和视频格式
    • 任意格式转为 mp3
    • 通用格式转换
    • 通用格式拆分
    • 视频合并
    • VideoToHLS
    • split_video_to_hls 支持其他语言
    • 持久化 HLS 会话
    • 获取视频长度
    • 保存视频的最后一帧
    • 添加水印
    • linux版本
  • 55_cv

    • 使用 Java 运行 YOLOv8 ONNX 模型进行目标检测
    • tio-boot整合yolo
    • ONNX Runtime 推理说明
  • 58_telegram4j

    • 数据库设计
    • 基于 HTTP 协议开发 Telegram 翻译机器人
    • 基于 MTProto 协议开发 Telegram 翻译机器人
    • 过滤旧消息
    • 保存机器人消息
    • 定时推送
    • 增加命令菜单
    • 使用 telegram-Client
    • 使用自定义 StoreLayout
    • 延迟测试
    • Reactor 错误处理
    • Telegram4J 常见错误处理指南
  • 59_telegram-bots

    • TelegramBots 入门指南
    • 使用工具库 telegram-bot-base 开发翻译机器人
  • 60_LLM

    • 简介
    • 流式生成
    • 图片多模态输入
    • 协议自动转换 Google Gemini示例
    • 请求记录
    • 限流和错误处理
    • 整合Gemini realtime模型
    • Voice Agent 前端接入接口文档
    • 整合千问realtime模型
    • 增强检索(RAG)
    • 搜索+AI
    • AI 问答
    • 连接代码执行器
  • 61_ai_agent

    • 数据库设计
    • 示例问题管理
    • 会话管理
    • 历史记录
    • Perplexity API
    • 意图识别
    • 智能问答
    • 文件上传与解析文档
    • 翻译
    • 名人搜索功能实现
    • Ai studio gemini youbue 问答使用说明
    • 自建 YouTube 字幕问答系统
    • 自建 获取 youtube 字幕服务
    • 使用 OpenAI ASR 实现语音识别接口(Java 后端示例)
    • 定向搜索
    • 16
    • 17
    • 18
    • 在 tio-boot 应用中整合 ai-agent
    • 16
  • 63_knowlege_base

    • 数据库设计
    • 用户登录实现
    • 模型管理
    • 知识库管理
    • 文档拆分
    • 片段向量
    • 命中测试
    • 文档管理
    • 片段管理
    • 问题管理
    • 应用管理
    • 向量检索
    • 推理问答
    • 问答模块
    • 统计分析
    • 用户管理
    • api 管理
    • 存储文件到 S3
    • 文档解析优化
    • 片段汇总
    • 段落分块与检索
    • 多文档解析
    • 对话日志
    • 检索性能优化
    • Milvus
    • 文档解析方案和费用对比
    • 离线运行向量模型
  • 64_ai-search

    • ai-search 项目简介
    • ai-search 数据库文档
    • ai-search SearxNG 搜索引擎
    • ai-search Jina Reader API
    • ai-search Jina Search API
    • ai-search 搜索、重排与读取内容
    • ai-search PDF 文件处理
    • ai-search 推理问答
    • Google Custom Search JSON API
    • ai-search 意图识别
    • ai-search 问题重写
    • ai-search 系统 API 接口 WebSocket 版本
    • ai-search 搜索代码实现 WebSocket 版本
    • ai-search 生成建议问
    • ai-search 生成问题标题
    • ai-search 历史记录
    • Discover API
    • 翻译
    • Tavily Search API 文档
    • 对接 Tavily Search
    • 火山引擎 DeepSeek
    • 对接 火山引擎 DeepSeek
    • ai-search 搜索代码实现 SSE 版本
    • jar 包部署
    • Docker 部署
    • 爬取一个静态网站的所有数据
    • 网页数据预处理
    • 网页数据检索与问答流程整合
  • 65_ai-coding

    • Cline 提示词
    • Cline 提示词-中文版本
  • 66_java-uni-ai-server

    • 语音合成系统
    • Fish.audio TTS 接口说明文档与 Java 客户端封装
    • 整合 fishaudio 到 java-uni-ai-server 项目
    • 待定
  • 67_java-llm-proxy

    • 使用tio-boot搭建多模型LLM代理服务
  • 68_java-kit-server

    • Java 执行 python 代码
    • 通过大模型执行 Python 代码
    • 执行 Python (Manim) 代码
    • 待定
    • 待定
    • 待定
    • 视频下载增加水印说明文档
  • 69_ai-brower

    • AI Browser:基于用户指令的浏览器自动化系统
    • 提示词
    • dom构建- buildDomTree.js
    • dom构建- 将网页可点击元素提取与可视化
    • 提取网内容
    • 启动浏览器
    • 操作浏览器指令
  • 70_tio-boot-admin

    • 入门指南
    • 初始化数据
    • token 存储
    • 与前端集成
    • 文件上传
    • 网络请求
    • 多图片管理
    • 单图片管理(只读模式)
    • 布尔值管理
    • 字段联动
    • Word 管理
    • PDF 管理
    • 文章管理
    • 富文本编辑器
  • 73_tio-mail-wing

    • tio-mail-wing简介
    • 任务1:实现POP3系统
    • 使用 getmail 验证 tio-mail-wing POP3 服务
    • 任务2:实现 SMTP 服务
    • 数据库初始化文档
    • 用户管理
    • 邮件管理
    • 任务3:实现 SMTP 服务 数据库版本
    • 任务4:实现 POP3 服务(数据库版本)
    • IMAP 协议
    • 拉取多封邮件
    • 任务5:实现 IMAP 服务(数据库版本)
    • IMAP实现讲解
    • IMAP 手动测试脚本
    • IMAP 认证机制
    • 主动推送
  • 74_tio-mcp-server

    • 实现 MCP Server 开发指南
  • 75_tio-sip

    • SIP Server 第一版原理说明
    • SIP Server 第一版实战
    • 使用livekit-sip进行测试
    • SIP Server 第二版实战
  • 76_manim

    • Teach me anything - 基于大语言的知识点讲解视频生成系统
    • Manim 开发环境搭建
    • 生成场景提示词
    • 生成代码
    • 完整脚本示例
    • TTS服务端
    • 废弃
    • 废弃
    • 废弃
    • 使用 SSE 流式传输生成进度的实现文档
    • 整合全流程完整文档
    • HLS 动态推流技术文档
    • manim 分场景生成代码
    • 分场景运行代码及流式播放支持
    • 分场景业务端完整实现流程
    • Maiim布局管理器
    • 仅仅生成场景代码
    • 使用 modal 运行 manim 代码
    • Python 使用 Modal GPU 加速渲染
    • Modal 平台 GPU 环境下运行 Manim
    • Modal Manim OpenGL 安装与使用
    • 优化 GPU 加速
    • 生成视频封面流程
    • Java 调用 manim 命令 执行代码 生成封面
    • Manim 图像生成服务客户端文档
    • manim render help
    • 显示 中文公式
    • ManimGL(manimgl)
    • Manim 实战入门:用代码创造数学动画
    • 欢迎
  • 80_性能测试

    • 压力测试 - tio-http-serer
    • 压力测试 - tio-boot
    • 压力测试 - tio-boot-native
    • 压力测试 - netty-boot
    • 性能测试对比
    • TechEmpower FrameworkBenchmarks
    • 压力测试 - tio-boot 12 C 32G
    • HTTP/1.1 Pipelining 性能测试报告
    • tio-boot vs Quarkus 性能对比测试报告
  • 81_tio-boot

    • 简介
    • Swagger 整合到 Tio-Boot 中的指南
    • 待定
    • 待定
    • 高性能网络编程中的 ByteBuffer 分配与回收策略
    • TioBootServerHandler 源码解析
  • 99_案例

    • 封装 IP 查询服务
    • tio-boot 案例 - 全局异常捕获与企业微信群通知
    • tio-boot 案例 - 文件上传和下载
    • tio-boot 案例 - 整合 ant design pro 增删改查
    • tio-boot 案例 - 流失响应
    • tio-boot 案例 - 增强检索
    • tio-boot 案例 - 整合 function call
    • tio-boot 案例 - 定时任务 监控 PostgreSQL、Redis 和 Elasticsearch
    • Tio-Boot 案例:使用 SQLite 整合到登录注册系统
    • tio-boot 案例 - 执行 shell 命令

SIP Server 第三版实战

可以这样描述第三版本。

背景

在前两个版本的基础上,SIP Server 已经逐步从“可跑通的最小 demo”演进为“具备基本协议能力的语音接入服务”。

第一版主要解决的是链路验证问题,完成了:

  • SIP TCP/UDP 5060 接入
  • INVITE 建链
  • 动态 RTP 端口分配
  • 200 OK + SDP 返回
  • RTP 原包回声验证

这一版的意义是证明:基于 tio-core 自研 SIP/RTP 接入层是可行的,通话链路可以建立,媒体可以打通。

第二版则进一步把协议层和会话层补扎实,重点完成了:

  • SIP TCP 流式解码与报文解析能力增强
  • SIP message parser / encoder 独立封装
  • CallSession 生命周期管理
  • ACK 超时回收
  • SDP offer/answer 协商
  • 从固定 PCMU 回包升级为按协商结果返回合法 answer
  • 能根据远端 SDP 选择 PCMU/PCMA

到了这个阶段,系统已经不再只是“能拨通”的 demo,而是具备了一个 SIP Server 的基本骨架。

但第二版的 RTP 处理虽然已经有会话信息、codec 选择和 SDP 协商基础,媒体侧本质上仍然偏“简单验证链路”:

  • RTP 处理仍然不是完整的媒体管线
  • 还没有真正按 codec 解码成音频帧后处理
  • 也没有为后续 AI 语音处理预留统一的音频处理接口

这就引出了第三版的核心问题:

如果后面要接 ASR、TTS、LLM、Gemini 之类的实时语音能力,媒体层不能再停留在“原包回显”,而必须升级成“音频帧处理架构”。

所以第三版的背景,本质上是: 在 SIP 和 SDP 基础已经基本稳定之后,开始把 RTP 层从“网络包级处理”升级为“媒体帧级处理”,为后续真实语音 AI 场景做准备。


第三版本实现目标

第三版本的目标,是把 RTP 从“原包 echo”升级成“音频帧 echo”,建立一条真正可扩展的媒体处理链。

可以概括为四个重点方向。

1. 建立 RTP 包级解析与重组能力

第三版首先要补齐 RTP 协议层的最小媒体能力,不再把 UDP 数据包当成黑盒直接原样发回,而是要做到:

  • 解析 RTP header
  • 识别 version、payload type、sequence number、timestamp、ssrc
  • 提取真实 audio payload
  • 在处理完成后重新生成新的 RTP 包并发送

目标是让系统具备真正意义上的 RTP 收发能力,而不是简单的 UDP 回包能力。


2. 建立 codec 解码与编码能力

由于 SIP/SDP 协商出来的 codec 可能是 PCMU 或 PCMA,第三版需要真正把媒体负载从 G.711 转成统一音频格式,再在输出前编码回去。

因此第三版要实现:

  • PCMU -> PCM16
  • PCMA -> PCM16
  • PCM16 -> PCMU
  • PCM16 -> PCMA

目标是把网络层的音频 payload 变成可处理的 PCM 音频样本,为后续音频算法和 AI 模型接入铺平道路。


3. 建立统一的音频帧处理模型

第三版最关键的设计变化,是引入统一的音频帧对象和处理接口,例如:

  • AudioFrame
  • MediaProcessor

这样 RTP handler 不再直接做“收包就原样回发”,而是走一条明确的媒体处理链:

  • RTP packet -> codec decode -> AudioFrame
  • AudioFrame -> MediaProcessor
  • MediaProcessor 输出新的 AudioFrame
  • 新音频帧 -> codec encode -> RTP packet

在第三版里,MediaProcessor 的第一个实现是 EchoMediaProcessor,它只是把输入帧原样返回。 但这个“原样返回”的层次已经从“网络包级别”上升到了“音频帧级别”。

目标是让媒体层真正从协议逻辑中解耦出来,使后续替换为:

  • ASR processor
  • TTS processor
  • Gemini processor
  • 录音、静音检测、降噪、打断控制等处理器

都不需要推翻 RTP 框架本身。


4. 为实时 AI 音频交互打基础

第三版虽然表面上还是 echo,但它的真正价值不是“再做一次回声”,而是建立了未来可接 AI 的媒体入口。

因为经过第三版之后,系统已经具备了:

  • 通话级 codec 协商信息
  • RTP 包解析与重组能力
  • PCM 音频帧抽象
  • 独立媒体处理器接口
  • 本地发送序列号、时间戳、SSRC 管理

这意味着后续如果接入实时语音 AI,只需要把 EchoMediaProcessor 替换成新的媒体处理器,就可以逐步打通:

  • 8k PCM -> 16k PCM 重采样
  • PCM -> ASR / Gemini 输入
  • LLM / TTS 输出音频
  • 再编码回 RTP

目标不是停留在 echo,而是让当前系统具备向“实时双向语音机器人”演进的基础结构。


第三版本的阶段性结果

第三版完成后,系统在媒体层面的能力会有一个明显跃迁。

从能力上看,它不再只是:

  • 能建 SIP 呼叫
  • 能收 RTP 包
  • 能原样回显 UDP 数据

而是升级为:

  • 能识别和解析 RTP
  • 能根据协商 codec 正确解码音频
  • 能把音频转换成统一的 PCM 帧
  • 能通过处理器对音频帧做处理
  • 能重新编码并组装 RTP 回传

这标志着整个项目从“验证信令和媒体连通性”进入到了“构建可扩展媒体处理平台”的阶段。


一句话总结

第三版本的核心背景,是在 SIP、SDP、会话管理已经基本稳定之后,继续向媒体层深入,把 RTP 从“原包回声验证”升级为“音频帧级处理架构”,从而为后续接入 ASR、TTS、Gemini 等实时语音 AI 能力打下基础。


一、建议包结构

com.litongjava.sip.rtp
├── codec
│   ├── AudioCodec.java
│   ├── PcmuCodec.java
│   └── PcmaCodec.java
│
├── media
│   ├── AudioFrame.java
│   ├── MediaProcessor.java
│   └── EchoMediaProcessor.java
│
├── packet
│   ├── RtpPacket.java
│   ├── RtpPacketParser.java
│   └── RtpPacketWriter.java
│
├── server
│   ├── RtpUdpHandler.java
│   └── RtpUdpServer.java

二、先补 CallSession 字段

为了让 RTP handler 知道当前通话该用哪个 codec、该回给谁,建议给 CallSession 再加一点字段。

package com.litongjava.sip.model;

import com.litongjava.sip.rtp.RtpUdpServer;
import com.litongjava.sip.sdp.CodecSpec;

public class CallSession {

  private String callId;
  private String fromTag;
  private String toTag;

  private String transport;

  private String remoteSipIp;
  private int remoteSipPort;

  private String remoteRtpIp;
  private int remoteRtpPort;
  private int localRtpPort;

  private long createdTime;
  private long updatedTime;
  private long ackDeadline;

  private boolean ackReceived;
  private boolean terminated;

  private String last200Ok;
  private RtpUdpServer rtpServer;

  private CodecSpec selectedCodec;
  private boolean telephoneEventSupported;
  private int remoteTelephoneEventPayloadType = -1;
  private int ptime = 20;

  // 第三阶段新增
  private long localSsrc = System.nanoTime() & 0xFFFFFFFFL;
  private int sendSequence = 0;
  private long sendTimestamp = 0;
  private boolean rtpInitialized = false;

  public synchronized int nextSendSequence() {
    sendSequence = (sendSequence + 1) & 0xFFFF;
    return sendSequence;
  }

  public synchronized long nextSendTimestamp(int sampleCount) {
    if (!rtpInitialized) {
      rtpInitialized = true;
      sendTimestamp = sampleCount;
      return sendTimestamp;
    }
    sendTimestamp = (sendTimestamp + sampleCount) & 0xFFFFFFFFL;
    return sendTimestamp;
  }

  public long getLocalSsrc() {
    return localSsrc;
  }

  public void setLocalSsrc(long localSsrc) {
    this.localSsrc = localSsrc;
  }

  public int getSendSequence() {
    return sendSequence;
  }

  public void setSendSequence(int sendSequence) {
    this.sendSequence = sendSequence;
  }

  public long getSendTimestamp() {
    return sendTimestamp;
  }

  public void setSendTimestamp(long sendTimestamp) {
    this.sendTimestamp = sendTimestamp;
  }

  public boolean isRtpInitialized() {
    return rtpInitialized;
  }

  public void setRtpInitialized(boolean rtpInitialized) {
    this.rtpInitialized = rtpInitialized;
  }

  public String getCallId() {
    return callId;
  }

  public void setCallId(String callId) {
    this.callId = callId;
  }

  public String getFromTag() {
    return fromTag;
  }

  public void setFromTag(String fromTag) {
    this.fromTag = fromTag;
  }

  public String getToTag() {
    return toTag;
  }

  public void setToTag(String toTag) {
    this.toTag = toTag;
  }

  public String getTransport() {
    return transport;
  }

  public void setTransport(String transport) {
    this.transport = transport;
  }

  public String getRemoteSipIp() {
    return remoteSipIp;
  }

  public void setRemoteSipIp(String remoteSipIp) {
    this.remoteSipIp = remoteSipIp;
  }

  public int getRemoteSipPort() {
    return remoteSipPort;
  }

  public void setRemoteSipPort(int remoteSipPort) {
    this.remoteSipPort = remoteSipPort;
  }

  public String getRemoteRtpIp() {
    return remoteRtpIp;
  }

  public void setRemoteRtpIp(String remoteRtpIp) {
    this.remoteRtpIp = remoteRtpIp;
  }

  public int getRemoteRtpPort() {
    return remoteRtpPort;
  }

  public void setRemoteRtpPort(int remoteRtpPort) {
    this.remoteRtpPort = remoteRtpPort;
  }

  public int getLocalRtpPort() {
    return localRtpPort;
  }

  public void setLocalRtpPort(int localRtpPort) {
    this.localRtpPort = localRtpPort;
  }

  public long getCreatedTime() {
    return createdTime;
  }

  public void setCreatedTime(long createdTime) {
    this.createdTime = createdTime;
  }

  public long getUpdatedTime() {
    return updatedTime;
  }

  public void setUpdatedTime(long updatedTime) {
    this.updatedTime = updatedTime;
  }

  public long getAckDeadline() {
    return ackDeadline;
  }

  public void setAckDeadline(long ackDeadline) {
    this.ackDeadline = ackDeadline;
  }

  public boolean isAckReceived() {
    return ackReceived;
  }

  public void setAckReceived(boolean ackReceived) {
    this.ackReceived = ackReceived;
  }

  public boolean isTerminated() {
    return terminated;
  }

  public void setTerminated(boolean terminated) {
    this.terminated = terminated;
  }

  public String getLast200Ok() {
    return last200Ok;
  }

  public void setLast200Ok(String last200Ok) {
    this.last200Ok = last200Ok;
  }

  public RtpUdpServer getRtpServer() {
    return rtpServer;
  }

  public void setRtpServer(RtpUdpServer rtpServer) {
    this.rtpServer = rtpServer;
  }

  public CodecSpec getSelectedCodec() {
    return selectedCodec;
  }

  public void setSelectedCodec(CodecSpec selectedCodec) {
    this.selectedCodec = selectedCodec;
  }

  public boolean isTelephoneEventSupported() {
    return telephoneEventSupported;
  }

  public void setTelephoneEventSupported(boolean telephoneEventSupported) {
    this.telephoneEventSupported = telephoneEventSupported;
  }

  public int getRemoteTelephoneEventPayloadType() {
    return remoteTelephoneEventPayloadType;
  }

  public void setRemoteTelephoneEventPayloadType(int remoteTelephoneEventPayloadType) {
    this.remoteTelephoneEventPayloadType = remoteTelephoneEventPayloadType;
  }

  public int getPtime() {
    return ptime;
  }

  public void setPtime(int ptime) {
    this.ptime = ptime;
  }
}

三、RTP 包对象

1)RtpPacket

package com.litongjava.sip.rtp.packet;

public class RtpPacket {

  private int version;
  private boolean padding;
  private boolean extension;
  private int csrcCount;
  private boolean marker;
  private int payloadType;
  private int sequenceNumber;
  private long timestamp;
  private long ssrc;
  private byte[] payload;

  public int getVersion() {
    return version;
  }

  public void setVersion(int version) {
    this.version = version;
  }

  public boolean isPadding() {
    return padding;
  }

  public void setPadding(boolean padding) {
    this.padding = padding;
  }

  public boolean isExtension() {
    return extension;
  }

  public void setExtension(boolean extension) {
    this.extension = extension;
  }

  public int getCsrcCount() {
    return csrcCount;
  }

  public void setCsrcCount(int csrcCount) {
    this.csrcCount = csrcCount;
  }

  public boolean isMarker() {
    return marker;
  }

  public void setMarker(boolean marker) {
    this.marker = marker;
  }

  public int getPayloadType() {
    return payloadType;
  }

  public void setPayloadType(int payloadType) {
    this.payloadType = payloadType;
  }

  public int getSequenceNumber() {
    return sequenceNumber;
  }

  public void setSequenceNumber(int sequenceNumber) {
    this.sequenceNumber = sequenceNumber;
  }

  public long getTimestamp() {
    return timestamp;
  }

  public void setTimestamp(long timestamp) {
    this.timestamp = timestamp;
  }

  public long getSsrc() {
    return ssrc;
  }

  public void setSsrc(long ssrc) {
    this.ssrc = ssrc;
  }

  public byte[] getPayload() {
    return payload;
  }

  public void setPayload(byte[] payload) {
    this.payload = payload;
  }
}

四、RTP 解析与写回

2)RtpPacketParser

先支持标准 12 字节 header,不处理 extension 和 CSRC 扩展之外的复杂情况,但保留兼容。

package com.litongjava.sip.rtp.packet;

public class RtpPacketParser {

  public RtpPacket parse(byte[] data) {
    if (data == null || data.length < 12) {
      throw new IllegalArgumentException("rtp packet too short");
    }

    int b0 = data[0] & 0xFF;
    int b1 = data[1] & 0xFF;

    int version = (b0 >> 6) & 0x03;
    if (version != 2) {
      throw new IllegalArgumentException("unsupported rtp version: " + version);
    }

    boolean padding = ((b0 >> 5) & 0x01) == 1;
    boolean extension = ((b0 >> 4) & 0x01) == 1;
    int csrcCount = b0 & 0x0F;

    boolean marker = ((b1 >> 7) & 0x01) == 1;
    int payloadType = b1 & 0x7F;

    int sequenceNumber = ((data[2] & 0xFF) << 8) | (data[3] & 0xFF);

    long timestamp =
        ((long) (data[4] & 0xFF) << 24) |
        ((long) (data[5] & 0xFF) << 16) |
        ((long) (data[6] & 0xFF) << 8) |
        ((long) (data[7] & 0xFF));

    long ssrc =
        ((long) (data[8] & 0xFF) << 24) |
        ((long) (data[9] & 0xFF) << 16) |
        ((long) (data[10] & 0xFF) << 8) |
        ((long) (data[11] & 0xFF));

    int headerLen = 12 + csrcCount * 4;

    if (data.length < headerLen) {
      throw new IllegalArgumentException("invalid rtp header length");
    }

    if (extension) {
      if (data.length < headerLen + 4) {
        throw new IllegalArgumentException("invalid rtp extension header");
      }
      int extLenWords = ((data[headerLen + 2] & 0xFF) << 8) | (data[headerLen + 3] & 0xFF);
      headerLen += 4 + extLenWords * 4;
      if (data.length < headerLen) {
        throw new IllegalArgumentException("invalid rtp extension payload");
      }
    }

    int payloadLen = data.length - headerLen;
    if (padding) {
      int paddingCount = data[data.length - 1] & 0xFF;
      payloadLen -= paddingCount;
      if (payloadLen < 0) {
        throw new IllegalArgumentException("invalid rtp padding");
      }
    }

    byte[] payload = new byte[payloadLen];
    System.arraycopy(data, headerLen, payload, 0, payloadLen);

    RtpPacket packet = new RtpPacket();
    packet.setVersion(version);
    packet.setPadding(padding);
    packet.setExtension(extension);
    packet.setCsrcCount(csrcCount);
    packet.setMarker(marker);
    packet.setPayloadType(payloadType);
    packet.setSequenceNumber(sequenceNumber);
    packet.setTimestamp(timestamp);
    packet.setSsrc(ssrc);
    packet.setPayload(payload);
    return packet;
  }
}

3)RtpPacketWriter

package com.litongjava.sip.rtp.packet;

public class RtpPacketWriter {

  public byte[] write(RtpPacket packet) {
    byte[] payload = packet.getPayload();
    if (payload == null) {
      payload = new byte[0];
    }

    byte[] out = new byte[12 + payload.length];

    int b0 = 0;
    b0 |= (2 & 0x03) << 6; // version=2
    if (packet.isPadding()) {
      b0 |= 1 << 5;
    }
    if (packet.isExtension()) {
      b0 |= 1 << 4;
    }
    b0 |= (packet.getCsrcCount() & 0x0F);

    int b1 = 0;
    if (packet.isMarker()) {
      b1 |= 1 << 7;
    }
    b1 |= (packet.getPayloadType() & 0x7F);

    out[0] = (byte) b0;
    out[1] = (byte) b1;

    int seq = packet.getSequenceNumber() & 0xFFFF;
    out[2] = (byte) ((seq >> 8) & 0xFF);
    out[3] = (byte) (seq & 0xFF);

    long ts = packet.getTimestamp() & 0xFFFFFFFFL;
    out[4] = (byte) ((ts >> 24) & 0xFF);
    out[5] = (byte) ((ts >> 16) & 0xFF);
    out[6] = (byte) ((ts >> 8) & 0xFF);
    out[7] = (byte) (ts & 0xFF);

    long ssrc = packet.getSsrc() & 0xFFFFFFFFL;
    out[8] = (byte) ((ssrc >> 24) & 0xFF);
    out[9] = (byte) ((ssrc >> 16) & 0xFF);
    out[10] = (byte) ((ssrc >> 8) & 0xFF);
    out[11] = (byte) (ssrc & 0xFF);

    System.arraycopy(payload, 0, out, 12, payload.length);
    return out;
  }
}

五、音频帧和处理器

4)AudioFrame

统一用 PCM16 单声道。

package com.litongjava.sip.rtp.media;

public class AudioFrame {

  private short[] samples;
  private int sampleRate;
  private int channels;
  private long rtpTimestamp;

  public AudioFrame() {
  }

  public AudioFrame(short[] samples, int sampleRate, int channels, long rtpTimestamp) {
    this.samples = samples;
    this.sampleRate = sampleRate;
    this.channels = channels;
    this.rtpTimestamp = rtpTimestamp;
  }

  public short[] getSamples() {
    return samples;
  }

  public void setSamples(short[] samples) {
    this.samples = samples;
  }

  public int getSampleRate() {
    return sampleRate;
  }

  public void setSampleRate(int sampleRate) {
    this.sampleRate = sampleRate;
  }

  public int getChannels() {
    return channels;
  }

  public void setChannels(int channels) {
    this.channels = channels;
  }

  public long getRtpTimestamp() {
    return rtpTimestamp;
  }

  public void setRtpTimestamp(long rtpTimestamp) {
    this.rtpTimestamp = rtpTimestamp;
  }

  public int sampleCount() {
    return samples == null ? 0 : samples.length;
  }
}

5)MediaProcessor

package com.litongjava.sip.rtp.media;

import com.litongjava.sip.model.CallSession;

public interface MediaProcessor {
  AudioFrame process(AudioFrame input, CallSession session);
}

6)EchoMediaProcessor

package com.litongjava.sip.rtp.media;

import com.litongjava.sip.model.CallSession;

public class EchoMediaProcessor implements MediaProcessor {

  @Override
  public AudioFrame process(AudioFrame input, CallSession session) {
    return input;
  }
}

六、Codec 接口与 G.711 实现

7)AudioCodec

package com.litongjava.sip.rtp.codec;

public interface AudioCodec {

  String codecName();

  int payloadType();

  int sampleRate();

  short[] decode(byte[] payload);

  byte[] encode(short[] pcm16);
}

8)PcmuCodec

G.711 μ-law。

package com.litongjava.sip.rtp.codec;

public class PcmuCodec implements AudioCodec {

  @Override
  public String codecName() {
    return "PCMU";
  }

  @Override
  public int payloadType() {
    return 0;
  }

  @Override
  public int sampleRate() {
    return 8000;
  }

  @Override
  public short[] decode(byte[] payload) {
    short[] out = new short[payload.length];
    for (int i = 0; i < payload.length; i++) {
      out[i] = ulawToLinear(payload[i]);
    }
    return out;
  }

  @Override
  public byte[] encode(short[] pcm16) {
    byte[] out = new byte[pcm16.length];
    for (int i = 0; i < pcm16.length; i++) {
      out[i] = linearToUlaw(pcm16[i]);
    }
    return out;
  }

  private short ulawToLinear(byte ulaw) {
    int u = ~ulaw & 0xFF;
    int sign = u & 0x80;
    int exponent = (u >> 4) & 0x07;
    int mantissa = u & 0x0F;
    int sample = ((mantissa << 3) + 0x84) << exponent;
    sample -= 0x84;
    return (short) (sign != 0 ? -sample : sample);
  }

  private byte linearToUlaw(short sample) {
    final int BIAS = 0x84;
    final int CLIP = 32635;

    int pcm = sample;
    int sign = (pcm >> 8) & 0x80;
    if (sign != 0) {
      pcm = -pcm;
    }
    if (pcm > CLIP) {
      pcm = CLIP;
    }

    pcm += BIAS;

    int exponent = 7;
    for (int expMask = 0x4000; (pcm & expMask) == 0 && exponent > 0; exponent--, expMask >>= 1) {
    }

    int mantissa = (pcm >> (exponent + 3)) & 0x0F;
    int ulaw = ~(sign | (exponent << 4) | mantissa) & 0xFF;
    return (byte) ulaw;
  }
}

9)PcmaCodec

G.711 A-law。

package com.litongjava.sip.rtp.codec;

public class PcmaCodec implements AudioCodec {

  @Override
  public String codecName() {
    return "PCMA";
  }

  @Override
  public int payloadType() {
    return 8;
  }

  @Override
  public int sampleRate() {
    return 8000;
  }

  @Override
  public short[] decode(byte[] payload) {
    short[] out = new short[payload.length];
    for (int i = 0; i < payload.length; i++) {
      out[i] = alawToLinear(payload[i]);
    }
    return out;
  }

  @Override
  public byte[] encode(short[] pcm16) {
    byte[] out = new byte[pcm16.length];
    for (int i = 0; i < pcm16.length; i++) {
      out[i] = linearToAlaw(pcm16[i]);
    }
    return out;
  }

  private short alawToLinear(byte alaw) {
    int a = alaw ^ 0x55;
    int sign = a & 0x80;
    int exponent = (a & 0x70) >> 4;
    int mantissa = a & 0x0F;

    int sample;
    if (exponent == 0) {
      sample = (mantissa << 4) + 8;
    } else {
      sample = ((mantissa << 4) + 0x108) << (exponent - 1);
    }

    return (short) (sign == 0 ? sample : -sample);
  }

  private byte linearToAlaw(short sample) {
    int pcm = sample;
    int sign;
    int exponent;
    int mantissa;
    int alaw;

    sign = (pcm & 0x8000) >> 8;
    if (sign != 0) {
      pcm = -pcm;
    }

    if (pcm > 32767) {
      pcm = 32767;
    }

    if (pcm >= 256) {
      exponent = 7;
      for (int expMask = 0x4000; (pcm & expMask) == 0 && exponent > 0; exponent--, expMask >>= 1) {
      }
      mantissa = (pcm >> (exponent + 3)) & 0x0F;
      alaw = (exponent << 4) | mantissa;
    } else {
      alaw = pcm >> 4;
    }

    alaw ^= (sign ^ 0x55);
    return (byte) alaw;
  }
}

七、升级后的 RtpUdpHandler

这个类是重点。

它做这些事:

  • 根据本地 RTP 端口找到 CallSession
  • 解析 RTP
  • 过滤 telephone-event
  • 根据 session.selectedCodec 选择解码器
  • 解码为 PCM16 音频帧
  • 交给 MediaProcessor
  • 再编码
  • 重新组包
  • 发送给远端 RTP 地址端口

10)RtpUdpHandler

package com.litongjava.sip.rtp.server;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioCodec;
import com.litongjava.sip.rtp.codec.PcmaCodec;
import com.litongjava.sip.rtp.codec.PcmuCodec;
import com.litongjava.sip.rtp.media.AudioFrame;
import com.litongjava.sip.rtp.media.EchoMediaProcessor;
import com.litongjava.sip.rtp.media.MediaProcessor;
import com.litongjava.sip.rtp.packet.RtpPacket;
import com.litongjava.sip.rtp.packet.RtpPacketParser;
import com.litongjava.sip.rtp.packet.RtpPacketWriter;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.udp.UdpPacket;
import com.litongjava.tio.core.udp.intf.UdpHandler;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RtpUdpHandler implements UdpHandler {

  private final int localPort;
  private final CallSessionManager sessionManager;

  private final RtpPacketParser rtpPacketParser = new RtpPacketParser();
  private final RtpPacketWriter rtpPacketWriter = new RtpPacketWriter();
  private final MediaProcessor mediaProcessor;

  private final AudioCodec pcmuCodec = new PcmuCodec();
  private final AudioCodec pcmaCodec = new PcmaCodec();

  public RtpUdpHandler(int localPort, CallSessionManager sessionManager) {
    this(localPort, sessionManager, new EchoMediaProcessor());
  }

  public RtpUdpHandler(int localPort, CallSessionManager sessionManager, MediaProcessor mediaProcessor) {
    this.localPort = localPort;
    this.sessionManager = sessionManager;
    this.mediaProcessor = mediaProcessor;
  }

  @Override
  public void handler(UdpPacket udpPacket, DatagramSocket socket) {
    try {
      CallSession session = sessionManager.getByLocalRtpPort(localPort);
      if (session == null || session.isTerminated()) {
        return;
      }

      Node remote = udpPacket.getRemote();
      byte[] data = udpPacket.getData();
      if (data == null || data.length < 12) {
        return;
      }

      RtpPacket in = rtpPacketParser.parse(data);

      // 更新远端 RTP 地址,适配首次学习或端口漂移
      if (session.getRemoteRtpIp() == null || session.getRemoteRtpIp().isEmpty()) {
        session.setRemoteRtpIp(remote.getIp());
      }
      if (session.getRemoteRtpPort() <= 0) {
        session.setRemoteRtpPort(remote.getPort());
      }

      // DTMF event 先忽略,不做 echo
      if (session.isTelephoneEventSupported() &&
          in.getPayloadType() == session.getRemoteTelephoneEventPayloadType()) {
        session.setUpdatedTime(System.currentTimeMillis());
        return;
      }

      AudioCodec codec = chooseCodec(session);
      if (codec == null) {
        return;
      }

      // 有些终端可能发来的 payload type 和协商结果不一致,先只按 session 选中 codec 解码
      short[] pcm = codec.decode(in.getPayload());
      AudioFrame inputFrame = new AudioFrame(pcm, codec.sampleRate(), 1, in.getTimestamp());

      AudioFrame outputFrame = mediaProcessor.process(inputFrame, session);
      if (outputFrame == null || outputFrame.getSamples() == null || outputFrame.getSamples().length == 0) {
        return;
      }

      byte[] outPayload = codec.encode(outputFrame.getSamples());

      RtpPacket out = new RtpPacket();
      out.setVersion(2);
      out.setPadding(false);
      out.setExtension(false);
      out.setCsrcCount(0);
      out.setMarker(false);
      out.setPayloadType(session.getSelectedCodec().getPayloadType());
      out.setSequenceNumber(session.nextSendSequence());
      out.setTimestamp(session.nextSendTimestamp(outputFrame.sampleCount()));
      out.setSsrc(session.getLocalSsrc());
      out.setPayload(outPayload);

      byte[] outBytes = rtpPacketWriter.write(out);

      DatagramPacket resp = new DatagramPacket(
          outBytes,
          outBytes.length,
          new InetSocketAddress(session.getRemoteRtpIp(), session.getRemoteRtpPort()));
      socket.send(resp);

      session.setUpdatedTime(System.currentTimeMillis());
    } catch (Exception e) {
      log.error("rtp handler error, localPort={}", localPort, e);
    }
  }

  private AudioCodec chooseCodec(CallSession session) {
    if (session.getSelectedCodec() == null) {
      return null;
    }

    String codecName = session.getSelectedCodec().getCodecName();
    if ("PCMU".equalsIgnoreCase(codecName)) {
      return pcmuCodec;
    }
    if ("PCMA".equalsIgnoreCase(codecName)) {
      return pcmaCodec;
    }
    return null;
  }
}

八、补 CallSessionManager 查询 RTP 端口的方法

你现在的 RtpUdpHandler 需要通过本地端口找到 session,所以 CallSessionManager 加一个查询。

11)升级 CallSessionManager

package com.litongjava.sip.server.session;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.litongjava.sip.model.CallSession;

public class CallSessionManager {

  private final Map<String, CallSession> sessions = new ConcurrentHashMap<>();

  public CallSession getByCallId(String callId) {
    if (callId == null) {
      return null;
    }
    return sessions.get(callId);
  }

  public CallSession getByLocalRtpPort(int localRtpPort) {
    for (CallSession session : sessions.values()) {
      if (session != null && session.getLocalRtpPort() == localRtpPort) {
        return session;
      }
    }
    return null;
  }

  public CallSession createOrUpdate(CallSession session) {
    if (session == null || session.getCallId() == null) {
      throw new IllegalArgumentException("call session or callId is null");
    }
    session.setUpdatedTime(System.currentTimeMillis());
    sessions.put(session.getCallId(), session);
    return session;
  }

  public void markAckReceived(String callId) {
    CallSession session = sessions.get(callId);
    if (session != null) {
      session.setAckReceived(true);
      session.setUpdatedTime(System.currentTimeMillis());
    }
  }

  public void markTerminated(String callId) {
    CallSession session = sessions.get(callId);
    if (session != null) {
      session.setTerminated(true);
      session.setUpdatedTime(System.currentTimeMillis());
    }
  }

  public void remove(String callId) {
    sessions.remove(callId);
  }

  public Map<String, CallSession> snapshot() {
    return Map.copyOf(sessions);
  }
}

九、升级 RtpUdpServer

你原来 RtpUdpServer 是写死用 RtpEchoUdpHandler。 现在要换成新的 RtpUdpHandler,所以改成可注入 CallSessionManager。

12)升级 RtpUdpServer

package com.litongjava.sip.rtp;

import java.net.SocketException;

import com.litongjava.sip.rtp.server.RtpUdpHandler;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.udp.UdpServer;
import com.litongjava.tio.core.udp.UdpServerConf;

public class RtpUdpServer {

  private final int port;
  private final CallSessionManager sessionManager;
  private UdpServer udpServer;

  public RtpUdpServer(int port, CallSessionManager sessionManager) {
    this.port = port;
    this.sessionManager = sessionManager;
  }

  public void start() throws SocketException {
    UdpServerConf conf = new UdpServerConf(port, new RtpUdpHandler(port, sessionManager), 5000);
    this.udpServer = new UdpServer(conf);
    this.udpServer.start();
  }

  public void stop() {
    if (udpServer != null) {
      udpServer.stop();
    }
  }

  public int port() {
    return port;
  }
}

十、升级 RtpServerManager

因为 RtpUdpServer 构造变了,RtpServerManager 也要改成持有 CallSessionManager。

13)升级 RtpServerManager

package com.litongjava.sip.rtp;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.server.session.CallSessionManager;

public class RtpServerManager {

  private final String localIp;
  private final RtpPortAllocator allocator;
  private final CallSessionManager sessionManager;

  public RtpServerManager(String localIp, CallSessionManager sessionManager) {
    this(localIp, new RtpPortAllocator(), sessionManager);
  }

  public RtpServerManager(String localIp, RtpPortAllocator allocator, CallSessionManager sessionManager) {
    this.localIp = localIp;
    this.allocator = allocator;
    this.sessionManager = sessionManager;
  }

  public CallSession allocateAndStart(CallSession session) throws Exception {
    int rtpPort = allocator.allocate();
    RtpUdpServer rtpServer = new RtpUdpServer(rtpPort, sessionManager);
    rtpServer.start();

    session.setLocalRtpPort(rtpPort);
    session.setRtpServer(rtpServer);
    session.setUpdatedTime(System.currentTimeMillis());
    return session;
  }

  public void stopAndRelease(CallSession session) {
    if (session == null) {
      return;
    }

    try {
      if (session.getRtpServer() != null) {
        session.getRtpServer().stop();
      }
    } finally {
      if (session.getLocalRtpPort() > 0) {
        allocator.release(session.getLocalRtpPort());
      }
    }
  }

  public String getLocalIp() {
    return localIp;
  }
}

十一、SipServerConfig 里构造方式要改一下

因为 RtpServerManager 现在需要 CallSessionManager。

CallSessionManager sessionManager = new CallSessionManager();
RtpServerManager rtpServerManager = new RtpServerManager(localIp, sessionManager);

SipInviteOnlyTcpHandler tcpHandler =
    new SipInviteOnlyTcpHandler(localIp, sessionManager, rtpServerManager);

SipInviteOnlyUdpHandler udpHandler =
    new SipInviteOnlyUdpHandler(localIp, sessionManager, rtpServerManager);

十二、这版实现了什么

这版落完后,你的 RTP 不再是“把原始 UDP 包原样返回”,而是:

  1. 收到 RTP
  2. 解析 RTP 头
  3. 按协商 codec 解码成 PCM16
  4. 构造成 AudioFrame
  5. 通过 EchoMediaProcessor 做音频帧级回环
  6. 再编码成 G.711 payload
  7. 重新生成 RTP header
  8. 发回远端

也就是说,你已经从:

  • packet echo

升级成了:

  • audio frame echo

这对后面接 AI 很关键,因为未来你只需要把:

new EchoMediaProcessor()

替换成:

new GeminiMediaProcessor()

媒体总框架就不需要推翻。


Edit this page
Last Updated: 3/7/26, 10:11 AM
Contributors: litongjava