Tio Boot DocsTio Boot Docs
Home
  • java-db
  • api-table
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
Home
  • java-db
  • api-table
  • mysql
  • postgresql
  • oceanbase
  • Enjoy
  • Tio Boot Admin
  • ai_agent
  • translator
  • knowlege_base
  • ai-search
  • 案例
Abount
  • Github
  • Gitee
  • 01_tio-boot 简介

    • tio-boot:新一代高性能 Java Web 开发框架
    • tio-boot 入门示例
    • Tio-Boot 配置 : 现代化的配置方案
    • tio-boot 整合 Logback
    • tio-boot 整合 hotswap-classloader 实现热加载
    • 自行编译 tio-boot
    • 最新版本
    • 开发规范
  • 02_部署

    • 使用 Maven Profile 实现分环境打包 tio-boot 项目
    • Maven 项目配置详解:依赖与 Profiles 配置
    • tio-boot 打包成 FatJar
    • 使用 GraalVM 构建 tio-boot Native 程序
    • 使用 Docker 部署 tio-boot
    • 部署到 Fly.io
    • 部署到 AWS Lambda
    • 到阿里云云函数
    • 使用 Deploy 工具部署
    • 使用Systemctl启动项目
    • 使用 Jenkins 部署 Tio-Boot 项目
    • 使用 Nginx 反向代理 Tio-Boot
    • 使用 Supervisor 管理 Java 应用
    • 已过时
    • 胖包与瘦包的打包与部署
  • 03_配置

    • 配置参数
    • 服务器监听器
    • 内置缓存系统 AbsCache
    • 使用 Redis 作为内部 Cache
    • 静态文件处理器
    • 基于域名的静态资源隔离
    • DecodeExceptionHandler
    • 开启虚拟线程(Virtual Thread)
    • 框架级错误通知
  • 04_原理

    • 生命周期
    • 请求处理流程
    • 重要的类
  • 05_json

    • Json
    • 接受 JSON 和响应 JSON
    • 响应实体类
  • 06_web

    • 概述
    • 接收请求参数
    • 接收日期参数
    • 接收数组参数
    • 返回字符串
    • 返回文本数据
    • 返回网页
    • 请求和响应字节
    • 文件上传
    • 文件下载
    • 返回视频文件并支持断点续传
    • http Session
    • Cookie
    • HttpRequest
    • HttpResponse
    • Resps
    • RespBodyVo
    • Controller拦截器
    • 请求拦截器
    • LoggingInterceptor
    • 全局异常处理器
    • 异步处理
    • 动态 返回 CSS 实现
    • 返回图片
    • 跨域
    • 添加 Controller
    • Transfer-Encoding: chunked 实时音频播放
    • Server-Sent Events (SSE)
    • handler入门
    • 返回 multipart
    • 待定
    • 自定义 Handler 转发请求
    • 使用 HttpForwardHandler 转发所有请求
    • 常用工具类
    • HTTP Basic 认证
    • Http响应加密
    • 使用零拷贝发送大文件
    • 分片上传
    • 接口访问统计
    • 接口请求和响应数据记录
    • WebJars
    • JProtobuf
    • 测速
    • Gzip Bomb:使用压缩炸弹防御恶意爬虫
  • 07_validate

    • 数据紧校验规范
    • 参数校验
  • 08_websocket

    • 使用 tio-boot 搭建 WebSocket 服务
    • WebSocket 聊天室项目示例
  • 09_java-db

    • java‑db
    • 操作数据库入门示例
    • SQL 模板 (SqlTemplates)
    • 数据源配置与使用
    • ActiveRecord
    • Db 工具类
    • 批量操作
    • Model
    • Model生成器
    • 注解
    • 异常处理
    • 数据库事务处理
    • Cache 缓存
    • Dialect 多数据库支持
    • 表关联操作
    • 复合主键
    • Oracle 支持
    • Enjoy SQL 模板
    • 整合 Enjoy 模板最佳实践
    • 多数据源支持
    • 独立使用 ActiveRecord
    • 调用存储过程
    • java-db 整合 Guava 的 Striped 锁优化
    • 生成 SQL
    • 通过实体类操作数据库
    • java-db 读写分离
    • Spring Boot 整合 Java-DB
    • like 查询
    • 常用操作示例
    • Druid 监控集成指南
    • SQL 统计
  • 10_api-table

    • ApiTable 概述
    • 使用 ApiTable 连接 SQLite
    • 使用 ApiTable 连接 Mysql
    • 使用 ApiTable 连接 Postgres
    • 使用 ApiTable 连接 TDEngine
    • 使用 api-table 连接 oracle
    • 使用 api-table 连接 mysql and tdengine 多数据源
    • EasyExcel 导出
    • EasyExcel 导入
    • 预留
    • 预留
    • ApiTable 实现增删改查
    • 数组类型
    • 单独使用 ApiTable
    • TQL(Table SQL)前端输入规范
  • 11_aop

    • JFinal-aop
    • Aop 工具类
    • 配置
    • 配置
    • 独立使用 JFinal Aop
    • @AImport
    • 自定义注解拦截器
    • 原理解析
  • 12_cache

    • Caffine
    • Jedis-redis
    • hutool RedisDS
    • Redisson
    • Caffeine and redis
    • CacheUtils 工具类
    • 使用 CacheUtils 整合 caffeine 和 redis 实现的两级缓存
    • 使用 java-db 整合 ehcache
    • 使用 java-db 整合 redis
    • Java DB Redis 相关 Api
    • redis 使用示例
  • 13_认证和权限

    • FixedTokenInterceptor
    • TokenManager
    • 数据表
    • 匿名登录
    • 注册和登录
    • 个人中心
    • 重置密码
    • Google 登录
    • 短信登录
    • 移动端微信登录
    • 移动端重置密码
    • 微信登录
    • 移动端微信登录
    • 权限校验注解
    • Sa-Token
    • sa-token 登录注册
    • StpUtil.isLogin() 源码解析
  • 14_i18n

    • i18n
  • 15_enjoy

    • tio-boot 整合 Enjoy 模版引擎文档
    • Tio-Boot 整合 Java-DB 与 Enjoy 模板引擎示例
    • 引擎配置
    • 表达式
    • 指令
    • 注释
    • 原样输出
    • Shared Method 扩展
    • Shared Object 扩展
    • Extension Method 扩展
    • Spring boot 整合
    • 独立使用 Enjoy
    • tio-boot enjoy 自定义指令 localeDate
    • PromptEngine
    • Enjoy 入门示例-擎渲染大模型请求体
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
    • Tio Boot + Enjoy:分页与 SEO 实战指南
  • 16_定时任务

    • Quartz 定时任务集成指南
    • 分布式定时任务 xxl-jb
    • cron4j 使用指南
  • 17_tests

    • TioBootTest 类
  • 18_tio

    • TioBootServer
    • 独立端口启动 TCP 服务器
    • 内置 TCP 处理器
    • 独立启动 UDPServer
    • 使用内置 UDPServer
    • t-io 消息处理流程
    • tio-运行原理详解
    • TioConfig
    • ChannelContext
    • Tio 工具类
    • 业务数据绑定
    • 业务数据解绑
    • 发送数据
    • 关闭连接
    • Packet
    • 监控: 心跳
    • 监控: 客户端的流量数据
    • 监控: 单条 TCP 连接的流量数据
    • 监控: 端口的流量数据
    • 单条通道统计: ChannelStat
    • 所有通道统计: GroupStat
    • 资源共享
    • 成员排序
    • SSL
    • DecodeRunnable
    • 使用 AsynchronousSocketChannel 响应数据
    • 拉黑 IP
    • 深入解析 Tio 源码:构建高性能 Java 网络应用
  • 19_aio

    • ByteBuffer
    • AIO HTTP 服务器
    • 自定义和线程池和池化 ByteBuffer
    • AioHttpServer 应用示例 IP 属地查询
    • 手写 AIO Http 服务器
  • 20_netty

    • Netty TCP Server
    • Netty Web Socket Server
    • 使用 protoc 生成 Java 包文件
    • Netty WebSocket Server 二进制数据传输
    • Netty 组件详解
  • 21_netty-boot

    • Netty-Boot
    • 原理解析
    • 整合 Hot Reload
    • 整合 数据库
    • 整合 Redis
    • 整合 Elasticsearch
    • 整合 Dubbo
    • Listener
    • 文件上传
    • 拦截器
    • Spring Boot 整合 Netty-Boot
    • SSL 配置指南
    • ChannelInitializer
    • Reserve
  • 22_MQ

    • Mica-mqtt
    • EMQX
    • Disruptor
  • 23_tio-utils

    • tio-utils
    • HttpUtils
    • Notification
    • Email
    • JSON
    • File
    • Base64
    • 上传和下载
    • Http
    • Telegram
    • RsaUtils
    • EnvUtils 配置工具
    • 系统监控
    • 线程
    • 虚拟线程
    • 毫秒并发 ID (MCID) 生成方案
  • 24_tio-http-server

    • 使用 Tio-Http-Server 搭建简单的 HTTP 服务
    • tio-boot 添加 HttpRequestHandler
    • 在 Android 上使用 tio-boot 运行 HTTP 服务
    • tio-http-server-native
    • handler 常用操作
  • 25_tio-websocket

    • WebSocket 服务器
    • WebSocket Client
    • TCP数据转发
  • 26_tio-im

    • 通讯协议文档
    • ChatPacket.proto 文档
    • java protobuf
    • 数据表设计
    • 创建工程
    • 登录
    • 历史消息
    • 发消息
  • 27_mybatis

    • Tio-Boot 整合 MyBatis
    • 使用配置类方式整合 MyBatis
    • 整合数据源
    • 使用 mybatis-plus 整合 tdengine
    • 整合 mybatis-plus
  • 28_mongodb

    • tio-boot 使用 mongo-java-driver 操作 mongodb
  • 29_elastic-search

    • Elasticsearch
    • JavaDB 整合 ElasticSearch
    • Elastic 工具类使用指南
    • Elastic-search 注意事项
    • ES 课程示例文档
  • 30_magic-script

    • tio-boot 与 magic-script 集成指南
  • 31_groovy

    • tio-boot 整合 Groovy
  • 32_firebase

    • 整合 google firebase
    • Firebase Storage
    • Firebase Authentication
    • 使用 Firebase Admin SDK 进行匿名用户管理与自定义状态标记
    • 导出用户
    • 注册回调
    • 登录注册
  • 33_文件存储

    • 文件上传数据表
    • 本地存储
    • 存储文件到 亚马逊 S3
    • 存储文件到 腾讯 COS
    • 存储文件到 阿里云 OSS
  • 34_spider

    • jsoup
    • 爬取 z-lib.io 数据
    • 整合 WebMagic
    • WebMagic 示例:爬取学校课程数据
    • Playwright
    • Flexmark (Markdown 处理器)
    • tio-boot 整合 Playwright
    • 缓存网页数据
  • 36_integration_thirty_party

    • 整合 okhttp
    • 整合 GrpahQL
    • 集成 Mailjet
    • 整合 ip2region
    • 整合 GeoLite 离线库
    • 整合 Lark 机器人指南
    • 集成 Lark Mail 实现邮件发送
    • Thymeleaf
    • Swagger
    • Clerk 验证
  • 37_dubbo

    • 概述
    • dubbo 2.6.0
    • dubbo 2.6.0 调用过程
    • dubbo 3.2.0
  • 38_spring

    • Spring Boot Web 整合 Tio Boot
    • spring-boot-starter-webflux 整合 tio-boot
    • tio-boot 整合 spring-boot-starter
    • Tio Boot 整合 Spring Boot Starter db
    • Tio Boot 整合 Spring Boot Starter Data Redis 指南
  • 39_spring-cloud

    • tio-boot spring-cloud
  • 40_quarkus

    • Quarkus(无 HTTP)整合 tio-boot(有 HTTP)
    • tio-boot + Quarkus + Hibernate ORM Panache
  • 41_postgresql

    • PostgreSQL 安装
    • PostgreSQL 主键自增
    • PostgreSQL 日期类型
    • Postgresql 金融类型
    • PostgreSQL 数组类型
    • 索引
    • PostgreSQL 查询优化
    • 获取字段类型
    • PostgreSQL 全文检索
    • PostgreSQL 向量
    • PostgreSQL 优化向量查询
    • PostgreSQL 其他
  • 42_mysql

    • 使用 Docker 运行 MySQL
    • 常见问题
  • 43_oceanbase

    • 快速体验 OceanBase 社区版
    • 快速上手 OceanBase 数据库单机部署与管理
    • 诊断集群性能
    • 优化 SQL 性能指南
    • 待定
  • 49_jooq

    • 使用配置类方式整合 jOOQ
    • tio-boot + jOOQ 事务管理
    • 批量操作与性能优化
    • 代码生成(可选)与类型安全升级
    • JSONB、Upsert、窗口函数实战
    • 整合agroal
  • 50_media

    • JAVE 提取视频中的声音
    • Jave 提取视频中的图片
    • 待定
  • 51_asr

    • Whisper-JNI
  • 54_native-media

    • java-native-media
    • JNI 入门示例
    • mp3 拆分
    • mp4 转 mp3
    • 使用 libmp3lame 实现高质量 MP3 编码
    • Linux 编译
    • macOS 编译
    • 从 JAR 包中加载本地库文件
    • 支持的音频和视频格式
    • 任意格式转为 mp3
    • 通用格式转换
    • 通用格式拆分
    • 视频合并
    • VideoToHLS
    • split_video_to_hls 支持其他语言
    • 持久化 HLS 会话
    • 获取视频长度
    • 保存视频的最后一帧
    • 添加水印
    • linux版本
  • 55_cv

    • 使用 Java 运行 YOLOv8 ONNX 模型进行目标检测
    • tio-boot整合yolo
    • ONNX Runtime 推理说明
  • 58_telegram4j

    • 数据库设计
    • 基于 HTTP 协议开发 Telegram 翻译机器人
    • 基于 MTProto 协议开发 Telegram 翻译机器人
    • 过滤旧消息
    • 保存机器人消息
    • 定时推送
    • 增加命令菜单
    • 使用 telegram-Client
    • 使用自定义 StoreLayout
    • 延迟测试
    • Reactor 错误处理
    • Telegram4J 常见错误处理指南
  • 59_telegram-bots

    • TelegramBots 入门指南
    • 使用工具库 telegram-bot-base 开发翻译机器人
  • 60_LLM

    • 简介
    • 流式生成
    • 图片多模态输入
    • 协议自动转换 Google Gemini示例
    • 请求记录
    • 限流和错误处理
    • 整合Gemini realtime模型
    • Voice Agent 前端接入接口文档
    • 整合千问realtime模型
    • 增强检索(RAG)
    • 搜索+AI
    • AI 问答
    • 连接代码执行器
  • 61_ai_agent

    • 数据库设计
    • 示例问题管理
    • 会话管理
    • 历史记录
    • Perplexity API
    • 意图识别
    • 智能问答
    • 文件上传与解析文档
    • 翻译
    • 名人搜索功能实现
    • Ai studio gemini youbue 问答使用说明
    • 自建 YouTube 字幕问答系统
    • 自建 获取 youtube 字幕服务
    • 使用 OpenAI ASR 实现语音识别接口(Java 后端示例)
    • 定向搜索
    • 16
    • 17
    • 18
    • 在 tio-boot 应用中整合 ai-agent
    • 16
  • 63_knowlege_base

    • 数据库设计
    • 用户登录实现
    • 模型管理
    • 知识库管理
    • 文档拆分
    • 片段向量
    • 命中测试
    • 文档管理
    • 片段管理
    • 问题管理
    • 应用管理
    • 向量检索
    • 推理问答
    • 问答模块
    • 统计分析
    • 用户管理
    • api 管理
    • 存储文件到 S3
    • 文档解析优化
    • 片段汇总
    • 段落分块与检索
    • 多文档解析
    • 对话日志
    • 检索性能优化
    • Milvus
    • 文档解析方案和费用对比
    • 离线运行向量模型
  • 64_ai-search

    • ai-search 项目简介
    • ai-search 数据库文档
    • ai-search SearxNG 搜索引擎
    • ai-search Jina Reader API
    • ai-search Jina Search API
    • ai-search 搜索、重排与读取内容
    • ai-search PDF 文件处理
    • ai-search 推理问答
    • Google Custom Search JSON API
    • ai-search 意图识别
    • ai-search 问题重写
    • ai-search 系统 API 接口 WebSocket 版本
    • ai-search 搜索代码实现 WebSocket 版本
    • ai-search 生成建议问
    • ai-search 生成问题标题
    • ai-search 历史记录
    • Discover API
    • 翻译
    • Tavily Search API 文档
    • 对接 Tavily Search
    • 火山引擎 DeepSeek
    • 对接 火山引擎 DeepSeek
    • ai-search 搜索代码实现 SSE 版本
    • jar 包部署
    • Docker 部署
    • 爬取一个静态网站的所有数据
    • 网页数据预处理
    • 网页数据检索与问答流程整合
  • 65_ai-coding

    • Cline 提示词
    • Cline 提示词-中文版本
  • 66_java-uni-ai-server

    • 语音合成系统
    • Fish.audio TTS 接口说明文档与 Java 客户端封装
    • 整合 fishaudio 到 java-uni-ai-server 项目
    • 待定
  • 67_java-llm-proxy

    • 使用tio-boot搭建多模型LLM代理服务
  • 68_java-kit-server

    • Java 执行 python 代码
    • 通过大模型执行 Python 代码
    • 执行 Python (Manim) 代码
    • 待定
    • 待定
    • 待定
    • 视频下载增加水印说明文档
  • 69_ai-brower

    • AI Browser:基于用户指令的浏览器自动化系统
    • 提示词
    • dom构建- buildDomTree.js
    • dom构建- 将网页可点击元素提取与可视化
    • 提取网内容
    • 启动浏览器
    • 操作浏览器指令
  • 70_tio-boot-admin

    • 入门指南
    • 初始化数据
    • token 存储
    • 与前端集成
    • 文件上传
    • 网络请求
    • 多图片管理
    • 单图片管理(只读模式)
    • 布尔值管理
    • 字段联动
    • Word 管理
    • PDF 管理
    • 文章管理
    • 富文本编辑器
  • 73_tio-mail-wing

    • tio-mail-wing简介
    • 任务1:实现POP3系统
    • 使用 getmail 验证 tio-mail-wing POP3 服务
    • 任务2:实现 SMTP 服务
    • 数据库初始化文档
    • 用户管理
    • 邮件管理
    • 任务3:实现 SMTP 服务 数据库版本
    • 任务4:实现 POP3 服务(数据库版本)
    • IMAP 协议
    • 拉取多封邮件
    • 任务5:实现 IMAP 服务(数据库版本)
    • IMAP实现讲解
    • IMAP 手动测试脚本
    • IMAP 认证机制
    • 主动推送
  • 74_tio-mcp-server

    • 实现 MCP Server 开发指南
  • 75_tio-sip

    • SIP Server 第一版原理说明
    • SIP Server 第一版实战
    • 使用livekit-sip进行测试
    • SIP Server 第二版实战
  • 76_manim

    • Teach me anything - 基于大语言的知识点讲解视频生成系统
    • Manim 开发环境搭建
    • 生成场景提示词
    • 生成代码
    • 完整脚本示例
    • TTS服务端
    • 废弃
    • 废弃
    • 废弃
    • 使用 SSE 流式传输生成进度的实现文档
    • 整合全流程完整文档
    • HLS 动态推流技术文档
    • manim 分场景生成代码
    • 分场景运行代码及流式播放支持
    • 分场景业务端完整实现流程
    • Maiim布局管理器
    • 仅仅生成场景代码
    • 使用 modal 运行 manim 代码
    • Python 使用 Modal GPU 加速渲染
    • Modal 平台 GPU 环境下运行 Manim
    • Modal Manim OpenGL 安装与使用
    • 优化 GPU 加速
    • 生成视频封面流程
    • Java 调用 manim 命令 执行代码 生成封面
    • Manim 图像生成服务客户端文档
    • manim render help
    • 显示 中文公式
    • ManimGL(manimgl)
    • Manim 实战入门:用代码创造数学动画
    • 欢迎
  • 80_性能测试

    • 压力测试 - tio-http-serer
    • 压力测试 - tio-boot
    • 压力测试 - tio-boot-native
    • 压力测试 - netty-boot
    • 性能测试对比
    • TechEmpower FrameworkBenchmarks
    • 压力测试 - tio-boot 12 C 32G
    • HTTP/1.1 Pipelining 性能测试报告
    • tio-boot vs Quarkus 性能对比测试报告
  • 81_tio-boot

    • 简介
    • Swagger 整合到 Tio-Boot 中的指南
    • 待定
    • 待定
    • 高性能网络编程中的 ByteBuffer 分配与回收策略
    • TioBootServerHandler 源码解析
  • 99_案例

    • 封装 IP 查询服务
    • tio-boot 案例 - 全局异常捕获与企业微信群通知
    • tio-boot 案例 - 文件上传和下载
    • tio-boot 案例 - 整合 ant design pro 增删改查
    • tio-boot 案例 - 流失响应
    • tio-boot 案例 - 增强检索
    • tio-boot 案例 - 整合 function call
    • tio-boot 案例 - 定时任务 监控 PostgreSQL、Redis 和 Elasticsearch
    • Tio-Boot 案例:使用 SQLite 整合到登录注册系统
    • tio-boot 案例 - 执行 shell 命令

SIP Server 第二版实战

可以这么描述。

背景

当前项目基于 tio-core 自研了一个轻量级 SIP Server,用来承接语音呼叫接入,并为后续语音机器人、ASR/TTS、LLM 实时对话打基础。

第一版已经完成了最小链路验证,核心能力包括:

  • 监听 SIP TCP 5060 和 SIP UDP 5060
  • 收到 INVITE 后分配动态 RTP UDP 端口
  • 返回 200 OK + SDP
  • 接收对端 RTP 音频
  • 通过 RTP 回声方式完成通话闭环验证

也就是说,第一版已经证明了两件事:

  • tio-core 可以承载 SIP 信令收发
  • Java 侧可以完成动态 RTP 端口分配和媒体链路建立

但第一版的实现更偏“最小可跑通”,还存在几个明显不足:

  • SIP 报文解析偏粗糙,TCP 粘包拆包能力不够扎实
  • INVITE 的 SDP 应答还是固定写死,不是真正的 offer/answer 协商
  • 会话管理较弱,缺少 ACK 超时、异常中断、资源释放等生命周期控制
  • RTP 处理仍是“原包回显”,还不是后续可接 AI 音频处理链的媒体架构

因此,需要推进第二版,把系统从“能验证链路的 demo”升级成“具备持续演进能力的 SIP/RTP 服务骨架”。


第二版实现目标

SIP Server 第二版的目标,不再只是“拨通后能听到回声”,而是把协议层、会话层、媒体层的基础能力补扎实,为后续真正接入语音机器人场景做准备。

具体目标有四个方向。

1. 把 SIP 解析和 TCP 解码做扎实

第二版需要把 SIP 处理从字符串拼接和简单 split,升级为更规范的协议解析能力,包括:

  • 基于 \r\n\r\n + Content-Length 的 TCP 流式切包
  • 正确处理 SIP 请求行、响应行、Header、Body
  • 支持 Content-Length 与 compact header 的解析
  • 为后续多方法、更多 SIP 头、事务扩展打基础

目标是让 SIP Server 在 TCP/UDP 两种传输下都具备稳定的信令解析能力,而不只是满足当前测试报文。


2. 建立真正的会话生命周期管理

第二版需要从“收到包就回”升级为“有状态的呼叫会话管理”,至少要覆盖:

  • 按 Call-ID 维护呼叫会话
  • INVITE 建会话
  • ACK 标记已建立
  • BYE 结束会话
  • 已发 200 OK 但长时间未收到 ACK 时自动回收
  • 异常结束时释放 RTP 资源和端口

目标是避免会话泄漏、RTP 端口泄漏、长时间挂死等问题,使系统具备连续拨测和长期运行能力。


3. 把 SDP 从固定回包升级为真正协商

第二版需要真正解析对端 INVITE 里的 SDP offer,并根据本端支持能力生成合法 answer,而不是固定返回单一模板。

主要包括:

  • 解析远端 RTP IP、RTP 端口
  • 解析 m=audio 中的 payload type 列表
  • 识别 PCMU/8000、PCMA/8000
  • 识别 telephone-event
  • 根据双方共同支持的 codec 选择最终音频编解码
  • 若没有共同 codec,则返回 488 Not Acceptable Here

目标是让 SIP Server 从“写死一个能跑通的 SDP”变成“真正具备 SIP/SDP 协商能力的媒体接入端”。


4. 为后续 RTP 媒体处理链升级打基础

虽然第二版媒体层还没有完全从“原包回显”升级到“解码-处理-重组包”,但会先把上层会话和协商信息补齐,包括:

  • 记录远端 RTP 地址和端口
  • 记录本次通话选中的 codec
  • 记录 ptime、telephone-event 等能力
  • 将这些信息挂到 CallSession 中

这样第三版就可以进一步实现:

  • RTP header 解析
  • PCMU/PCMA 解码
  • PCM 音频帧处理
  • 再编码和 RTP 重组
  • 接入 ASR / TTS / Gemini 等音频处理流程

目标是让当前 SIP Server 不只是一个“回声 demo”,而是未来 AI 语音交互服务的信令接入层和媒体入口。


第二版的落地结果

经过第二版改造,系统达到的阶段性结果可以概括为:

  • 信令层:SIP TCP/UDP 接入结构更清晰,解析与编码职责分离
  • 协议层:支持更规范的 SIP 报文解析和 SDP offer/answer 协商
  • 会话层:具备基本的呼叫状态维护、ACK 超时回收、BYE 资源释放能力
  • 媒体层:为后续 RTP 解码与 AI 音频处理保留了清晰扩展点

一、先补 CallSession 字段

第二阶段要做 SDP 协商,所以你先给 CallSession 增加这些字段。

package com.litongjava.sip.model;

import com.litongjava.sip.rtp.RtpUdpServer;
import com.litongjava.sip.sdp.CodecSpec;

public class CallSession {

  private String callId;
  private String fromTag;
  private String toTag;

  private String transport; // TCP / UDP

  private String remoteSipIp;
  private int remoteSipPort;

  private String remoteRtpIp;
  private int remoteRtpPort;
  private int localRtpPort;

  private long createdTime;
  private long updatedTime;
  private long ackDeadline;

  private boolean ackReceived;
  private boolean terminated;

  private String last200Ok;

  private RtpUdpServer rtpServer;

  // 第二阶段新增
  private CodecSpec selectedCodec;
  private boolean telephoneEventSupported;
  private int remoteTelephoneEventPayloadType = -1;
  private int ptime = 20;

  public String getCallId() {
    return callId;
  }

  public void setCallId(String callId) {
    this.callId = callId;
  }

  public String getFromTag() {
    return fromTag;
  }

  public void setFromTag(String fromTag) {
    this.fromTag = fromTag;
  }

  public String getToTag() {
    return toTag;
  }

  public void setToTag(String toTag) {
    this.toTag = toTag;
  }

  public String getTransport() {
    return transport;
  }

  public void setTransport(String transport) {
    this.transport = transport;
  }

  public String getRemoteSipIp() {
    return remoteSipIp;
  }

  public void setRemoteSipIp(String remoteSipIp) {
    this.remoteSipIp = remoteSipIp;
  }

  public int getRemoteSipPort() {
    return remoteSipPort;
  }

  public void setRemoteSipPort(int remoteSipPort) {
    this.remoteSipPort = remoteSipPort;
  }

  public String getRemoteRtpIp() {
    return remoteRtpIp;
  }

  public void setRemoteRtpIp(String remoteRtpIp) {
    this.remoteRtpIp = remoteRtpIp;
  }

  public int getRemoteRtpPort() {
    return remoteRtpPort;
  }

  public void setRemoteRtpPort(int remoteRtpPort) {
    this.remoteRtpPort = remoteRtpPort;
  }

  public int getLocalRtpPort() {
    return localRtpPort;
  }

  public void setLocalRtpPort(int localRtpPort) {
    this.localRtpPort = localRtpPort;
  }

  public long getCreatedTime() {
    return createdTime;
  }

  public void setCreatedTime(long createdTime) {
    this.createdTime = createdTime;
  }

  public long getUpdatedTime() {
    return updatedTime;
  }

  public void setUpdatedTime(long updatedTime) {
    this.updatedTime = updatedTime;
  }

  public long getAckDeadline() {
    return ackDeadline;
  }

  public void setAckDeadline(long ackDeadline) {
    this.ackDeadline = ackDeadline;
  }

  public boolean isAckReceived() {
    return ackReceived;
  }

  public void setAckReceived(boolean ackReceived) {
    this.ackReceived = ackReceived;
  }

  public boolean isTerminated() {
    return terminated;
  }

  public void setTerminated(boolean terminated) {
    this.terminated = terminated;
  }

  public String getLast200Ok() {
    return last200Ok;
  }

  public void setLast200Ok(String last200Ok) {
    this.last200Ok = last200Ok;
  }

  public RtpUdpServer getRtpServer() {
    return rtpServer;
  }

  public void setRtpServer(RtpUdpServer rtpServer) {
    this.rtpServer = rtpServer;
  }

  public CodecSpec getSelectedCodec() {
    return selectedCodec;
  }

  public void setSelectedCodec(CodecSpec selectedCodec) {
    this.selectedCodec = selectedCodec;
  }

  public boolean isTelephoneEventSupported() {
    return telephoneEventSupported;
  }

  public void setTelephoneEventSupported(boolean telephoneEventSupported) {
    this.telephoneEventSupported = telephoneEventSupported;
  }

  public int getRemoteTelephoneEventPayloadType() {
    return remoteTelephoneEventPayloadType;
  }

  public void setRemoteTelephoneEventPayloadType(int remoteTelephoneEventPayloadType) {
    this.remoteTelephoneEventPayloadType = remoteTelephoneEventPayloadType;
  }

  public int getPtime() {
    return ptime;
  }

  public void setPtime(int ptime) {
    this.ptime = ptime;
  }
}

二、CodecSpec

这个类表示一个 codec。

package com.litongjava.sip.sdp;

public class CodecSpec {

  private final int payloadType;
  private final String codecName;
  private final int clockRate;

  public CodecSpec(int payloadType, String codecName, int clockRate) {
    this.payloadType = payloadType;
    this.codecName = codecName;
    this.clockRate = clockRate;
  }

  public int getPayloadType() {
    return payloadType;
  }

  public String getCodecName() {
    return codecName;
  }

  public int getClockRate() {
    return clockRate;
  }

  public boolean isSameCodec(String codecName, int clockRate) {
    if (codecName == null) {
      return false;
    }
    return this.codecName.equalsIgnoreCase(codecName) && this.clockRate == clockRate;
  }

  public boolean isStaticPcmu() {
    return payloadType == 0 && "PCMU".equalsIgnoreCase(codecName) && clockRate == 8000;
  }

  public boolean isStaticPcma() {
    return payloadType == 8 && "PCMA".equalsIgnoreCase(codecName) && clockRate == 8000;
  }

  @Override
  public String toString() {
    return "CodecSpec{" +
        "payloadType=" + payloadType +
        ", codecName='" + codecName + '\'' +
        ", clockRate=" + clockRate +
        '}';
  }
}

三、SdpNegotiationResult

协商结果。

package com.litongjava.sip.sdp;

public class SdpNegotiationResult {

  private boolean success;
  private String failureReason;

  private String remoteRtpIp;
  private int remoteRtpPort;

  private CodecSpec selectedCodec;

  private boolean telephoneEventSupported;
  private int remoteTelephoneEventPayloadType = -1;

  private int ptime = 20;

  public boolean isSuccess() {
    return success;
  }

  public void setSuccess(boolean success) {
    this.success = success;
  }

  public String getFailureReason() {
    return failureReason;
  }

  public void setFailureReason(String failureReason) {
    this.failureReason = failureReason;
  }

  public String getRemoteRtpIp() {
    return remoteRtpIp;
  }

  public void setRemoteRtpIp(String remoteRtpIp) {
    this.remoteRtpIp = remoteRtpIp;
  }

  public int getRemoteRtpPort() {
    return remoteRtpPort;
  }

  public void setRemoteRtpPort(int remoteRtpPort) {
    this.remoteRtpPort = remoteRtpPort;
  }

  public CodecSpec getSelectedCodec() {
    return selectedCodec;
  }

  public void setSelectedCodec(CodecSpec selectedCodec) {
    this.selectedCodec = selectedCodec;
  }

  public boolean isTelephoneEventSupported() {
    return telephoneEventSupported;
  }

  public void setTelephoneEventSupported(boolean telephoneEventSupported) {
    this.telephoneEventSupported = telephoneEventSupported;
  }

  public int getRemoteTelephoneEventPayloadType() {
    return remoteTelephoneEventPayloadType;
  }

  public void setRemoteTelephoneEventPayloadType(int remoteTelephoneEventPayloadType) {
    this.remoteTelephoneEventPayloadType = remoteTelephoneEventPayloadType;
  }

  public int getPtime() {
    return ptime;
  }

  public void setPtime(int ptime) {
    this.ptime = ptime;
  }

  public static SdpNegotiationResult fail(String reason) {
    SdpNegotiationResult r = new SdpNegotiationResult();
    r.setSuccess(false);
    r.setFailureReason(reason);
    return r;
  }

  public static SdpNegotiationResult ok() {
    SdpNegotiationResult r = new SdpNegotiationResult();
    r.setSuccess(true);
    return r;
  }
}

四、SdpParser

这个类做两件事:

  • 解析 INVITE 里的 SDP offer
  • 选出双方都支持的 codec

先支持:

  • PCMU/8000
  • PCMA/8000
  • telephone-event/8000
  • ptime
  • 只处理 audio
package com.litongjava.sip.sdp;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SdpParser {

  private final List<CodecSpec> localSupportedCodecs;

  public SdpParser() {
    this.localSupportedCodecs = defaultSupportedCodecs();
  }

  public SdpParser(List<CodecSpec> localSupportedCodecs) {
    this.localSupportedCodecs = localSupportedCodecs;
  }

  public SdpNegotiationResult negotiate(byte[] sdpBytes) {
    if (sdpBytes == null || sdpBytes.length == 0) {
      return SdpNegotiationResult.fail("missing sdp offer");
    }

    String sdp = new String(sdpBytes, StandardCharsets.US_ASCII);
    String[] lines = sdp.split("\r\n");

    String sessionConnectionIp = null;
    String mediaConnectionIp = null;
    int remoteAudioPort = 0;
    int ptime = 20;

    List<Integer> offeredPayloadTypes = new ArrayList<>();
    Map<Integer, CodecSpec> offeredCodecMap = new HashMap<>();

    boolean inAudioMedia = false;
    boolean telephoneEventSupported = false;
    int telephoneEventPt = -1;

    for (String line : lines) {
      if (line == null || line.isEmpty()) {
        continue;
      }

      if (line.startsWith("c=")) {
        String[] parts = line.split(" ");
        if (parts.length >= 3) {
          String ip = parts[2].trim();
          if (inAudioMedia) {
            mediaConnectionIp = ip;
          } else {
            sessionConnectionIp = ip;
          }
        }
        continue;
      }

      if (line.startsWith("m=")) {
        inAudioMedia = false;

        String[] parts = line.split(" ");
        if (parts.length >= 4 && parts[0].startsWith("m=audio")) {
          inAudioMedia = true;
          try {
            remoteAudioPort = Integer.parseInt(parts[1].trim());
          } catch (Exception e) {
            return SdpNegotiationResult.fail("invalid remote audio port");
          }

          for (int i = 3; i < parts.length; i++) {
            try {
              offeredPayloadTypes.add(Integer.parseInt(parts[i].trim()));
            } catch (Exception ignore) {
            }
          }
        }
        continue;
      }

      if (!inAudioMedia) {
        continue;
      }

      if (line.startsWith("a=rtpmap:")) {
        // a=rtpmap:0 PCMU/8000
        // a=rtpmap:101 telephone-event/8000
        try {
          int colon = line.indexOf(':');
          int space = line.indexOf(' ');
          if (colon < 0 || space < 0 || space <= colon) {
            continue;
          }

          int pt = Integer.parseInt(line.substring(colon + 1, space).trim());
          String[] enc = line.substring(space + 1).trim().split("/");
          if (enc.length < 2) {
            continue;
          }

          String codecName = enc[0].trim();
          int clockRate = Integer.parseInt(enc[1].trim());

          CodecSpec spec = new CodecSpec(pt, codecName, clockRate);
          offeredCodecMap.put(pt, spec);

          if ("telephone-event".equalsIgnoreCase(codecName) && clockRate == 8000) {
            telephoneEventSupported = true;
            telephoneEventPt = pt;
          }
        } catch (Exception ignore) {
        }
        continue;
      }

      if (line.startsWith("a=ptime:")) {
        try {
          ptime = Integer.parseInt(line.substring("a=ptime:".length()).trim());
        } catch (Exception ignore) {
        }
      }
    }

    if (remoteAudioPort <= 0) {
      return SdpNegotiationResult.fail("missing audio media");
    }

    String remoteIp = mediaConnectionIp != null ? mediaConnectionIp : sessionConnectionIp;
    if (remoteIp == null || remoteIp.isEmpty()) {
      return SdpNegotiationResult.fail("missing connection address");
    }

    CodecSpec selected = chooseCodec(offeredPayloadTypes, offeredCodecMap);
    if (selected == null) {
      return SdpNegotiationResult.fail("no supported audio codec");
    }

    SdpNegotiationResult result = SdpNegotiationResult.ok();
    result.setRemoteRtpIp(remoteIp);
    result.setRemoteRtpPort(remoteAudioPort);
    result.setSelectedCodec(selected);
    result.setTelephoneEventSupported(telephoneEventSupported);
    result.setRemoteTelephoneEventPayloadType(telephoneEventPt);
    result.setPtime(ptime);

    return result;
  }

  private CodecSpec chooseCodec(List<Integer> offeredPayloadTypes, Map<Integer, CodecSpec> offeredCodecMap) {
    // 优先顺序按本地支持列表
    for (CodecSpec local : localSupportedCodecs) {
      for (Integer pt : offeredPayloadTypes) {
        CodecSpec offered = offeredCodecMap.get(pt);

        if (offered != null) {
          if (local.isSameCodec(offered.getCodecName(), offered.getClockRate())) {
            return new CodecSpec(pt, offered.getCodecName(), offered.getClockRate());
          }
        } else {
          // 静态 payload type 可能没有 rtpmap,也要能识别
          if (pt == 0 && local.isStaticPcmu()) {
            return new CodecSpec(0, "PCMU", 8000);
          }
          if (pt == 8 && local.isStaticPcma()) {
            return new CodecSpec(8, "PCMA", 8000);
          }
        }
      }
    }
    return null;
  }

  public static List<CodecSpec> defaultSupportedCodecs() {
    List<CodecSpec> codecs = new ArrayList<>();
    codecs.add(new CodecSpec(0, "PCMU", 8000));
    codecs.add(new CodecSpec(8, "PCMA", 8000));
    return codecs;
  }
}

五、SdpAnswerBuilder

这个类按协商结果生成 200 OK 里的 SDP answer。

package com.litongjava.sip.sdp;

public class SdpAnswerBuilder {

  public String buildAnswer(String localIp, int localRtpPort, SdpNegotiationResult result) {
    CodecSpec codec = result.getSelectedCodec();
    int ptime = result.getPtime() > 0 ? result.getPtime() : 20;

    StringBuilder sb = new StringBuilder();
    sb.append("v=0\r\n");
    sb.append("o=- 1 1 IN IP4 ").append(localIp).append("\r\n");
    sb.append("s=JavaSip\r\n");
    sb.append("c=IN IP4 ").append(localIp).append("\r\n");
    sb.append("t=0 0\r\n");

    sb.append("m=audio ")
        .append(localRtpPort)
        .append(" RTP/AVP ")
        .append(codec.getPayloadType());

    if (result.isTelephoneEventSupported()) {
      sb.append(" 101");
    }
    sb.append("\r\n");

    sb.append("a=rtpmap:")
        .append(codec.getPayloadType())
        .append(" ")
        .append(codec.getCodecName())
        .append("/")
        .append(codec.getClockRate())
        .append("\r\n");

    if (result.isTelephoneEventSupported()) {
      sb.append("a=rtpmap:101 telephone-event/8000\r\n");
      sb.append("a=fmtp:101 0-15\r\n");
    }

    sb.append("a=ptime:").append(ptime).append("\r\n");
    sb.append("a=sendrecv\r\n");

    return sb.toString();
  }
}

说明一下: 这里 answer 里把 telephone-event 固定写成 101,这是常见做法。第一阶段够用。后面如果你想更严谨,可以把本地 DTMF PT 也做成配置。


六、CallSessionReaper

这个类负责定时清理:

  • 已发 200 但 ACK 超时
  • 已 terminate 的会话
  • 可选地清理长时间空闲会话

先做最小版本。

package com.litongjava.sip.server.session;

import java.util.Map;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.RtpServerManager;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CallSessionReaper implements Runnable {

  private final CallSessionManager sessionManager;
  private final RtpServerManager rtpServerManager;

  // 已发 200 OK 但未 ACK,超时回收
  private final long ackTimeoutMs;

  // 已 terminated 的会话多留一小会儿再删
  private final long terminatedRetentionMs;

  public CallSessionReaper(CallSessionManager sessionManager, RtpServerManager rtpServerManager) {
    this(sessionManager, rtpServerManager, 32000L, 5000L);
  }

  public CallSessionReaper(CallSessionManager sessionManager, RtpServerManager rtpServerManager,
      long ackTimeoutMs, long terminatedRetentionMs) {
    this.sessionManager = sessionManager;
    this.rtpServerManager = rtpServerManager;
    this.ackTimeoutMs = ackTimeoutMs;
    this.terminatedRetentionMs = terminatedRetentionMs;
  }

  @Override
  public void run() {
    long now = System.currentTimeMillis();
    Map<String, CallSession> snapshot = sessionManager.snapshot();

    for (Map.Entry<String, CallSession> entry : snapshot.entrySet()) {
      String callId = entry.getKey();
      CallSession session = entry.getValue();
      if (session == null) {
        continue;
      }

      try {
        reapAckTimeout(callId, session, now);
        reapTerminated(callId, session, now);
      } catch (Exception e) {
        log.error("reap session error, callId={}", callId, e);
      }
    }
  }

  private void reapAckTimeout(String callId, CallSession session, long now) {
    if (session.isTerminated()) {
      return;
    }

    if (session.isAckReceived()) {
      return;
    }

    long deadline = session.getAckDeadline();
    if (deadline <= 0) {
      return;
    }

    // 支持沿用传入 session 的 deadline,也支持兜底 createdTime + ackTimeoutMs
    long actualDeadline = deadline > 0 ? deadline : session.getCreatedTime() + ackTimeoutMs;
    if (now < actualDeadline) {
      return;
    }

    log.info("ACK timeout, release callId={}, localRtpPort={}", callId, session.getLocalRtpPort());
    rtpServerManager.stopAndRelease(session);
    session.setTerminated(true);
    session.setUpdatedTime(now);
  }

  private void reapTerminated(String callId, CallSession session, long now) {
    if (!session.isTerminated()) {
      return;
    }

    long base = session.getUpdatedTime() > 0 ? session.getUpdatedTime() : session.getCreatedTime();
    if (now - base < terminatedRetentionMs) {
      return;
    }

    sessionManager.remove(callId);
    log.info("session removed, callId={}", callId);
  }
}

七、CallSessionManager 小补丁

为了配合 CallSessionReaper,你可以把第一阶段的 terminate() 调整一下,避免重复 stop 引发混乱。 建议改成下面这样。

package com.litongjava.sip.server.session;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.litongjava.sip.model.CallSession;

public class CallSessionManager {

  private final Map<String, CallSession> sessions = new ConcurrentHashMap<>();

  public CallSession getByCallId(String callId) {
    if (callId == null) {
      return null;
    }
    return sessions.get(callId);
  }

  public CallSession createOrUpdate(CallSession session) {
    if (session == null || session.getCallId() == null) {
      throw new IllegalArgumentException("call session or callId is null");
    }
    session.setUpdatedTime(System.currentTimeMillis());
    sessions.put(session.getCallId(), session);
    return session;
  }

  public void markAckReceived(String callId) {
    CallSession session = sessions.get(callId);
    if (session != null) {
      session.setAckReceived(true);
      session.setUpdatedTime(System.currentTimeMillis());
    }
  }

  public void markTerminated(String callId) {
    CallSession session = sessions.get(callId);
    if (session != null) {
      session.setTerminated(true);
      session.setUpdatedTime(System.currentTimeMillis());
    }
  }

  public void remove(String callId) {
    sessions.remove(callId);
  }

  public Map<String, CallSession> snapshot() {
    return Map.copyOf(sessions);
  }
}

八、升级 SipInviteOnlyTcpHandler

关键变化:

  • 新增 SdpParser
  • 新增 SdpAnswerBuilder
  • INVITE 先解析 offer
  • 不支持 codec 时返回 488 Not Acceptable Here
  • 成功协商后把 codec、RTP 地址、DTMF 能力写到 CallSession

下面是完整 TCP 版。

package com.litongjava.sip.server.handler;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.litongjava.aio.ByteBufferPacket;
import com.litongjava.aio.Packet;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.model.SipMessage;
import com.litongjava.sip.model.SipRequest;
import com.litongjava.sip.model.SipResponse;
import com.litongjava.sip.parser.SipMessageEncoder;
import com.litongjava.sip.parser.SipMessageParser;
import com.litongjava.sip.parser.SipTcpFrameDecoder;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.sdp.SdpAnswerBuilder;
import com.litongjava.sip.sdp.SdpNegotiationResult;
import com.litongjava.sip.sdp.SdpParser;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.core.TioConfig;
import com.litongjava.tio.server.intf.ServerAioHandler;

public class SipInviteOnlyTcpHandler implements ServerAioHandler {

  private final String localIp;
  private final SipTcpFrameDecoder frameDecoder = new SipTcpFrameDecoder();
  private final SipMessageParser messageParser = new SipMessageParser();
  private final SipMessageEncoder messageEncoder = new SipMessageEncoder();
  private final CallSessionManager sessionManager;
  private final RtpServerManager rtpServerManager;

  private final SdpParser sdpParser = new SdpParser();
  private final SdpAnswerBuilder sdpAnswerBuilder = new SdpAnswerBuilder();

  public SipInviteOnlyTcpHandler(String localIp) {
    this(localIp, new CallSessionManager(), new RtpServerManager(localIp));
  }

  public SipInviteOnlyTcpHandler(String localIp, CallSessionManager sessionManager, RtpServerManager rtpServerManager) {
    this.localIp = localIp;
    this.sessionManager = sessionManager;
    this.rtpServerManager = rtpServerManager;
  }

  @Override
  public Packet decode(ByteBuffer buffer, int limit, int position, int readableLength, ChannelContext ctx)
      throws Exception {
    byte[] frame = frameDecoder.decode(buffer, readableLength, ctx);
    if (frame == null) {
      return null;
    }
    return new ByteBufferPacket(ByteBuffer.wrap(frame));
  }

  @Override
  public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext ctx) {
    ByteBufferPacket p = (ByteBufferPacket) packet;
    ByteBuffer bb = p.getByteBuffer();
    if (bb.position() != 0) {
      bb.rewind();
    }
    return bb;
  }

  @Override
  public void handler(Packet packet, ChannelContext ctx) throws Exception {
    ByteBufferPacket p = (ByteBufferPacket) packet;
    ByteBuffer bb = p.getByteBuffer();

    byte[] bytes = new byte[bb.remaining()];
    bb.get(bytes);

    SipMessage msg = messageParser.parse(bytes);
    if (!(msg instanceof SipRequest)) {
      return;
    }

    SipRequest req = (SipRequest) msg;
    String method = req.getMethod();

    if ("INVITE".equalsIgnoreCase(method)) {
      handleInvite(req, ctx);
      return;
    }

    if ("ACK".equalsIgnoreCase(method)) {
      handleAck(req);
      return;
    }

    if ("BYE".equalsIgnoreCase(method)) {
      handleBye(req, ctx);
      return;
    }

    SipResponse resp = buildSimpleResponse(req, 200, "OK", null);
    send(ctx, resp);
  }

  private void handleInvite(SipRequest req, ChannelContext ctx) throws Exception {
    String callId = req.getHeader("Call-ID");
    CallSession exist = sessionManager.getByCallId(callId);

    if (exist != null && exist.getLast200Ok() != null) {
      sendRaw(ctx, exist.getLast200Ok());
      return;
    }

    SdpNegotiationResult negotiation = sdpParser.negotiate(req.getBody());
    if (!negotiation.isSuccess()) {
      SipResponse fail = buildSimpleResponse(req, 488, "Not Acceptable Here", null);
      send(ctx, fail);
      return;
    }

    String remoteIp = ctx.getClientNode() != null ? ctx.getClientNode().getIp() : null;
    int remotePort = ctx.getClientNode() != null ? ctx.getClientNode().getPort() : 0;

    String toTag = "java" + System.nanoTime();

    CallSession session = new CallSession();
    session.setCallId(callId);
    session.setFromTag(parseTag(req.getHeader("From")));
    session.setToTag(toTag);
    session.setTransport("TCP");
    session.setRemoteSipIp(remoteIp);
    session.setRemoteSipPort(remotePort);
    session.setCreatedTime(System.currentTimeMillis());
    session.setUpdatedTime(System.currentTimeMillis());
    session.setAckDeadline(System.currentTimeMillis() + 32000);

    session.setRemoteRtpIp(negotiation.getRemoteRtpIp());
    session.setRemoteRtpPort(negotiation.getRemoteRtpPort());
    session.setSelectedCodec(negotiation.getSelectedCodec());
    session.setTelephoneEventSupported(negotiation.isTelephoneEventSupported());
    session.setRemoteTelephoneEventPayloadType(negotiation.getRemoteTelephoneEventPayloadType());
    session.setPtime(negotiation.getPtime());

    rtpServerManager.allocateAndStart(session);

    SipResponse resp = buildInvite200Ok(req, session, negotiation);
    byte[] encoded = messageEncoder.encodeResponse(resp);
    String raw200 = new String(encoded, StandardCharsets.US_ASCII);

    session.setLast200Ok(raw200);
    sessionManager.createOrUpdate(session);

    Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(encoded)));
  }

  private void handleAck(SipRequest req) {
    String callId = req.getHeader("Call-ID");
    sessionManager.markAckReceived(callId);
  }

  private void handleBye(SipRequest req, ChannelContext ctx) throws Exception {
    String callId = req.getHeader("Call-ID");
    CallSession session = sessionManager.getByCallId(callId);

    SipResponse resp = buildSimpleResponse(req, 200, "OK", session != null ? session.getToTag() : null);
    send(ctx, resp);

    if (session != null) {
      rtpServerManager.stopAndRelease(session);
      sessionManager.markTerminated(callId);
    }
  }

  private void send(ChannelContext ctx, SipResponse response) {
    byte[] bytes = messageEncoder.encodeResponse(response);
    Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(bytes)));
  }

  private void sendRaw(ChannelContext ctx, String text) {
    byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
    Tio.send(ctx, new ByteBufferPacket(ByteBuffer.wrap(bytes)));
  }

  private SipResponse buildInvite200Ok(SipRequest req, CallSession session, SdpNegotiationResult negotiation) {
    SipResponse resp = new SipResponse();
    resp.setStatusCode(200);
    resp.setReasonPhrase("OK");

    copyIfPresent(req, resp, "Via");
    copyIfPresent(req, resp, "From");

    String to = req.getHeader("To");
    if (to != null && !to.toLowerCase().contains("tag=")) {
      to = to + ";tag=" + session.getToTag();
    }
    if (to != null) {
      resp.addHeader("To", to);
    }

    copyIfPresent(req, resp, "Call-ID");
    copyIfPresent(req, resp, "CSeq");
    resp.addHeader("Contact", "<sip:java@" + localIp + ":5060>");
    resp.addHeader("Content-Type", "application/sdp");

    String sdp = sdpAnswerBuilder.buildAnswer(localIp, session.getLocalRtpPort(), negotiation);
    resp.setBody(sdp.getBytes(StandardCharsets.US_ASCII));
    return resp;
  }

  private SipResponse buildSimpleResponse(SipRequest req, int code, String reason, String toTag) {
    SipResponse resp = new SipResponse();
    resp.setStatusCode(code);
    resp.setReasonPhrase(reason);

    copyIfPresent(req, resp, "Via");
    copyIfPresent(req, resp, "From");

    String to = req.getHeader("To");
    if (toTag != null && to != null && !to.toLowerCase().contains("tag=")) {
      to = to + ";tag=" + toTag;
    }
    if (to != null) {
      resp.addHeader("To", to);
    }

    copyIfPresent(req, resp, "Call-ID");
    copyIfPresent(req, resp, "CSeq");
    resp.setBody(new byte[0]);
    return resp;
  }

  private void copyIfPresent(SipRequest req, SipResponse resp, String headerName) {
    for (String v : req.getHeaders(headerName)) {
      resp.addHeader(headerName, v);
    }
  }

  private String parseTag(String headerValue) {
    if (headerValue == null) {
      return null;
    }

    String lower = headerValue.toLowerCase();
    int idx = lower.indexOf("tag=");
    if (idx < 0) {
      return null;
    }

    String sub = headerValue.substring(idx + 4);
    int semi = sub.indexOf(';');
    if (semi >= 0) {
      sub = sub.substring(0, semi);
    }
    return sub.trim();
  }
}

九、升级 SipInviteOnlyUdpHandler

UDP 版同理。

package com.litongjava.sip.server.handler;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.model.SipMessage;
import com.litongjava.sip.model.SipRequest;
import com.litongjava.sip.model.SipResponse;
import com.litongjava.sip.parser.SipMessageEncoder;
import com.litongjava.sip.parser.SipMessageParser;
import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.sdp.SdpAnswerBuilder;
import com.litongjava.sip.sdp.SdpNegotiationResult;
import com.litongjava.sip.sdp.SdpParser;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.udp.UdpPacket;
import com.litongjava.tio.core.udp.intf.UdpHandler;

public class SipInviteOnlyUdpHandler implements UdpHandler {

  private final String localIp;
  private final SipMessageParser messageParser = new SipMessageParser();
  private final SipMessageEncoder messageEncoder = new SipMessageEncoder();
  private final CallSessionManager sessionManager;
  private final RtpServerManager rtpServerManager;

  private final SdpParser sdpParser = new SdpParser();
  private final SdpAnswerBuilder sdpAnswerBuilder = new SdpAnswerBuilder();

  public SipInviteOnlyUdpHandler(String localIp) {
    this(localIp, new CallSessionManager(), new RtpServerManager(localIp));
  }

  public SipInviteOnlyUdpHandler(String localIp, CallSessionManager sessionManager, RtpServerManager rtpServerManager) {
    this.localIp = localIp;
    this.sessionManager = sessionManager;
    this.rtpServerManager = rtpServerManager;
  }

  @Override
  public void handler(UdpPacket udpPacket, DatagramSocket socket) {
    try {
      Node remote = udpPacket.getRemote();
      byte[] data = udpPacket.getData();

      SipMessage msg = messageParser.parse(data);
      if (!(msg instanceof SipRequest)) {
        return;
      }

      SipRequest req = (SipRequest) msg;
      String method = req.getMethod();

      if ("INVITE".equalsIgnoreCase(method)) {
        handleInvite(req, remote, socket);
        return;
      }

      if ("ACK".equalsIgnoreCase(method)) {
        handleAck(req);
        return;
      }

      if ("BYE".equalsIgnoreCase(method)) {
        handleBye(req, remote, socket);
        return;
      }

      SipResponse resp = buildSimpleResponse(req, 200, "OK", null);
      send(socket, remote, resp);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  private void handleInvite(SipRequest req, Node remote, DatagramSocket socket) throws Exception {
    String callId = req.getHeader("Call-ID");
    CallSession exist = sessionManager.getByCallId(callId);

    if (exist != null && exist.getLast200Ok() != null) {
      sendRaw(socket, remote, exist.getLast200Ok());
      return;
    }

    SdpNegotiationResult negotiation = sdpParser.negotiate(req.getBody());
    if (!negotiation.isSuccess()) {
      SipResponse fail = buildSimpleResponse(req, 488, "Not Acceptable Here", null);
      send(socket, remote, fail);
      return;
    }

    String toTag = "java" + System.nanoTime();

    CallSession session = new CallSession();
    session.setCallId(callId);
    session.setFromTag(parseTag(req.getHeader("From")));
    session.setToTag(toTag);
    session.setTransport("UDP");
    session.setRemoteSipIp(remote.getIp());
    session.setRemoteSipPort(remote.getPort());
    session.setCreatedTime(System.currentTimeMillis());
    session.setUpdatedTime(System.currentTimeMillis());
    session.setAckDeadline(System.currentTimeMillis() + 32000);

    session.setRemoteRtpIp(negotiation.getRemoteRtpIp());
    session.setRemoteRtpPort(negotiation.getRemoteRtpPort());
    session.setSelectedCodec(negotiation.getSelectedCodec());
    session.setTelephoneEventSupported(negotiation.isTelephoneEventSupported());
    session.setRemoteTelephoneEventPayloadType(negotiation.getRemoteTelephoneEventPayloadType());
    session.setPtime(negotiation.getPtime());

    rtpServerManager.allocateAndStart(session);

    SipResponse trying = buildSimpleResponse(req, 100, "Trying", null);
    send(socket, remote, trying);

    SipResponse ok = buildInvite200Ok(req, session, negotiation);
    byte[] encoded = messageEncoder.encodeResponse(ok);
    String raw200 = new String(encoded, StandardCharsets.US_ASCII);

    session.setLast200Ok(raw200);
    sessionManager.createOrUpdate(session);

    sendBytes(socket, remote, encoded);
  }

  private void handleAck(SipRequest req) {
    String callId = req.getHeader("Call-ID");
    sessionManager.markAckReceived(callId);
  }

  private void handleBye(SipRequest req, Node remote, DatagramSocket socket) throws Exception {
    String callId = req.getHeader("Call-ID");
    CallSession session = sessionManager.getByCallId(callId);

    SipResponse resp = buildSimpleResponse(req, 200, "OK", session != null ? session.getToTag() : null);
    send(socket, remote, resp);

    if (session != null) {
      rtpServerManager.stopAndRelease(session);
      sessionManager.markTerminated(callId);
    }
  }

  private void send(DatagramSocket socket, Node remote, SipResponse response) throws Exception {
    byte[] bytes = messageEncoder.encodeResponse(response);
    sendBytes(socket, remote, bytes);
  }

  private void sendRaw(DatagramSocket socket, Node remote, String text) throws Exception {
    byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
    sendBytes(socket, remote, bytes);
  }

  private void sendBytes(DatagramSocket socket, Node remote, byte[] bytes) throws Exception {
    DatagramPacket packet = new DatagramPacket(
        bytes, bytes.length, new InetSocketAddress(remote.getIp(), remote.getPort()));
    socket.send(packet);
  }

  private SipResponse buildInvite200Ok(SipRequest req, CallSession session, SdpNegotiationResult negotiation) {
    SipResponse resp = new SipResponse();
    resp.setStatusCode(200);
    resp.setReasonPhrase("OK");

    copyIfPresent(req, resp, "Via");
    copyIfPresent(req, resp, "From");

    String to = req.getHeader("To");
    if (to != null && !to.toLowerCase().contains("tag=")) {
      to = to + ";tag=" + session.getToTag();
    }
    if (to != null) {
      resp.addHeader("To", to);
    }

    copyIfPresent(req, resp, "Call-ID");
    copyIfPresent(req, resp, "CSeq");
    resp.addHeader("Contact", "<sip:java@" + localIp + ":5060>");
    resp.addHeader("Content-Type", "application/sdp");

    String sdp = sdpAnswerBuilder.buildAnswer(localIp, session.getLocalRtpPort(), negotiation);
    resp.setBody(sdp.getBytes(StandardCharsets.US_ASCII));
    return resp;
  }

  private SipResponse buildSimpleResponse(SipRequest req, int code, String reason, String toTag) {
    SipResponse resp = new SipResponse();
    resp.setStatusCode(code);
    resp.setReasonPhrase(reason);

    copyIfPresent(req, resp, "Via");
    copyIfPresent(req, resp, "From");

    String to = req.getHeader("To");
    if (toTag != null && to != null && !to.toLowerCase().contains("tag=")) {
      to = to + ";tag=" + toTag;
    }
    if (to != null) {
      resp.addHeader("To", to);
    }

    copyIfPresent(req, resp, "Call-ID");
    copyIfPresent(req, resp, "CSeq");
    resp.setBody(new byte[0]);
    return resp;
  }

  private void copyIfPresent(SipRequest req, SipResponse resp, String headerName) {
    for (String v : req.getHeaders(headerName)) {
      resp.addHeader(headerName, v);
    }
  }

  private String parseTag(String headerValue) {
    if (headerValue == null) {
      return null;
    }

    String lower = headerValue.toLowerCase();
    int idx = lower.indexOf("tag=");
    if (idx < 0) {
      return null;
    }

    String sub = headerValue.substring(idx + 4);
    int semi = sub.indexOf(';');
    if (semi >= 0) {
      sub = sub.substring(0, semi);
    }
    return sub.trim();
  }
}

十、SipServerConfig 里把 CallSessionReaper 跑起来

你需要加一个定时器。

package com.jobright.study.voice.agent.config;

import java.io.IOException;
import java.net.SocketException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.litongjava.sip.rtp.RtpServerManager;
import com.litongjava.sip.server.handler.SipInviteOnlyTcpHandler;
import com.litongjava.sip.server.handler.SipInviteOnlyUdpHandler;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.sip.server.session.CallSessionReaper;
import com.litongjava.tio.core.udp.UdpServer;
import com.litongjava.tio.core.udp.UdpServerConf;
import com.litongjava.tio.server.ServerTioConfig;
import com.litongjava.tio.server.TioServer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SipServerConfig {

  private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

  public void config() {
    String localIp = "192.168.3.219";

    CallSessionManager sessionManager = new CallSessionManager();
    RtpServerManager rtpServerManager = new RtpServerManager(localIp);

    SipInviteOnlyTcpHandler tcpHandler = new SipInviteOnlyTcpHandler(localIp, sessionManager, rtpServerManager);

    ServerTioConfig serverTioConfig = new ServerTioConfig("sip-server");
    serverTioConfig.setServerAioHandler(tcpHandler);
    serverTioConfig.setHeartbeatTimeout(-1L);

    TioServer tioServer = new TioServer(serverTioConfig);

    int port = 5060;
    try {
      tioServer.start(null, port);
      log.info("独立 TCP 服务器已成功启动,监听端口: {}", port);
    } catch (IOException e) {
      log.error("启动 TCP 服务器失败", e);
    }

    SipInviteOnlyUdpHandler udpHandler = new SipInviteOnlyUdpHandler(localIp, sessionManager, rtpServerManager);

    UdpServerConf udpServerConf = new UdpServerConf(5060, udpHandler, 5000);

    try {
      UdpServer udpServer = new UdpServer(udpServerConf);
      udpServer.start();
      log.info("UDP 服务器已成功启动,监听端口: {}", port);
    } catch (SocketException e) {
      log.error("启动 UDP 服务器失败", e);
    }

    scheduledExecutor.scheduleAtFixedRate(new CallSessionReaper(sessionManager, rtpServerManager), 5, 5,
        TimeUnit.SECONDS);
  }
}

十一、这版已经实现了什么

这版第二阶段落完后,你得到的是:

已完成

  • INVITE 不再固定写死 PCMU
  • 会解析远端 SDP offer
  • 会在 PCMU/PCMA 里选共同支持的 codec
  • 不支持时返回 488 Not Acceptable Here
  • 会记录远端 RTP 地址、端口、选中的 codec、ptime、telephone-event 能力
  • 会清理 ACK 超时会话

还没做

  • 真正的 Call-ID + FromTag + ToTag dialog key
  • CANCEL
  • OPTIONS
  • 180 Ringing
  • Record-Route / Route
  • re-INVITE
  • RTP 真正按 codec 解码-处理-重组

Edit this page
Last Updated: 3/7/26, 10:11 AM
Contributors: litongjava
Prev
使用livekit-sip进行测试