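SIP Server Version 3 in Practice
Version 3 can be described as follows.
Background
Building on the first two versions, the SIP Server has evolved step by step from "a minimal demo that runs end to end" into "a voice access service with basic protocol capabilities".
Version 1 mainly validated the call path. It delivered:
- SIP access over TCP/UDP on port 5060
- Call setup via INVITE
- Dynamic RTP port allocation
- 200 OK responses carrying SDP
- Echo verification by sending RTP packets back unchanged
That version proved that a self-built SIP/RTP access layer on top of tio-core is feasible: calls can be set up and media can flow.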
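Version 2 then solidified the protocol and session layers. Its main work was:
- Streaming SIP decoding over TCP and stronger message parsing
- A standalone SIP message parser / encoder
- CallSession lifecycle management
- Reclaiming sessions whose ACK times out
- SDP offer/answer negotiation
- Replacing the fixed PCMU reply with a valid answer based on the negotiation result
- Choosing PCMU or PCMA according to the remote SDP
At that point the system was no longer just a demo that "can place a call". It had the basic skeleton of a SIP Server.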
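Version 2's RTP handling already had session information, codec selection and the groundwork of SDP negotiation. Even so, the media side was still essentially "simple path validation":
- RTP handling was not yet a complete media pipeline
- Audio was not actually decoded into frames according to the codec before processing
- There was no unified audio processing interface reserved for later AI voice processing
This leads to the core question of Version 3:
Real-time voice capabilities such as ASR, TTS, LLMs or Gemini may be connected later. If so, the media layer cannot stay at "raw packet echo". It has to become an "audio frame processing architecture".
The background of Version 3 is therefore this: once SIP and SDP are largely stable, upgrade the RTP layer from "network packet level processing" to "media frame level processing". This prepares the system for real voice AI scenarios.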
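Goals of Version 3
Version 3 upgrades RTP from "raw packet echo" to "audio frame echo". The aim is a media processing chain that can actually be extended.
The work falls into four main directions.
1. RTP packet parsing and reassembly
Version 3 first fills in the minimal media capabilities at the RTP protocol layer. It no longer treats a UDP datagram as a black box and sends it back unchanged. Instead it must:
- Parse the RTP header
- Read version, payload type, sequence number, timestamp and SSRC
- Extract the actual audio payload
- Build and send a new RTP packet after processing
The goal is real RTP send/receive capability, not simply bouncing UDP packets back.
2. Codec decoding and encoding
The codec negotiated via SIP/SDP may be PCMU or PCMA. Version 3 must therefore convert the media payload from G.711 into a unified audio format, and encode it back before output.
Version 3 implements:
- PCMU -> PCM16
- PCMA -> PCM16
- PCM16 -> PCMU
- PCM16 -> PCMA
The goal is to turn the network-level audio payload into processable PCM samples. This paves the way for later audio algorithms and AI models.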
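The arithmetic behind these conversions is simple. At 8 kHz with the 20 ms ptime that CallSession defaults to, each packet carries 160 samples. G.711 stores one byte per sample, so the RTP payload is 160 bytes, while the decoded PCM16 frame takes 320 bytes. The helper below is a hypothetical sketch: `G711Sizing` and its methods are not part of the project and only spell out that arithmetic.

```java
/** Hypothetical helper: packet sizing for G.711 at a given sample rate and ptime. */
final class G711Sizing {
  private G711Sizing() {}

  /** Samples carried by one packet: sampleRate * ptime / 1000. */
  static int samplesPerPacket(int sampleRate, int ptimeMs) {
    return sampleRate * ptimeMs / 1000;
  }

  /** G.711 (PCMU/PCMA) encodes one sample per byte. */
  static int g711PayloadBytes(int samples) {
    return samples;
  }

  /** PCM16 uses two bytes per sample. */
  static int pcm16Bytes(int samples) {
    return samples * 2;
  }

  public static void main(String[] args) {
    int samples = samplesPerPacket(8000, 20);
    System.out.println(samples);                   // prints 160
    System.out.println(g711PayloadBytes(samples)); // prints 160
    System.out.println(pcm16Bytes(samples));       // prints 320
  }
}
```

The same 160 is also how far the RTP timestamp advances per packet, because the RTP timestamp counts samples rather than milliseconds.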
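3. A unified audio frame processing model
The key design change in Version 3 is a unified audio frame object and processing interface, namely:
- AudioFrame
- MediaProcessor
The RTP handler no longer just "sends back whatever it receives". It follows an explicit media processing chain instead:
- RTP packet -> codec decode -> AudioFrame
- AudioFrame -> MediaProcessor
- MediaProcessor outputs a new AudioFrame
- New AudioFrame -> codec encode -> RTP packet
In Version 3, the first MediaProcessor implementation is EchoMediaProcessor, which simply returns the input frame unchanged. That "return unchanged" now happens at the audio frame level instead of the network packet level.
The goal is to decouple the media layer from protocol logic. The processor could later be replaced with any of the following:
- an ASR processor
- a TTS processor
- a Gemini processor
- processors for recording, silence detection, noise reduction, barge-in control and so on
None of these replacements would require rewriting the RTP framework itself.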
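The sketch below illustrates how further processors could plug in without touching the RTP code. It is a minimal composite processor. `PcmFrame` and `FrameProcessor` are simplified stand-ins for AudioFrame and MediaProcessor, with the CallSession parameter dropped to keep the example self-contained. `GainProcessor` and `ChainProcessor` are hypothetical names, not classes from this project.

```java
import java.util.List;

/** Simplified stand-in for AudioFrame: mono PCM16 samples plus their sample rate. */
final class PcmFrame {
  final short[] samples;
  final int sampleRate;

  PcmFrame(short[] samples, int sampleRate) {
    this.samples = samples;
    this.sampleRate = sampleRate;
  }
}

/** Simplified stand-in for MediaProcessor (the CallSession parameter is omitted here). */
interface FrameProcessor {
  PcmFrame process(PcmFrame input);
}

/** Hypothetical processor: multiplies every sample by a gain, clamping to the 16-bit range. */
final class GainProcessor implements FrameProcessor {
  private final double gain;

  GainProcessor(double gain) {
    this.gain = gain;
  }

  @Override
  public PcmFrame process(PcmFrame input) {
    short[] out = new short[input.samples.length];
    for (int i = 0; i < out.length; i++) {
      long scaled = Math.round(input.samples[i] * gain);
      out[i] = (short) Math.max(Short.MIN_VALUE, Math.min(Short.MAX_VALUE, scaled));
    }
    return new PcmFrame(out, input.sampleRate);
  }
}

/** Hypothetical composite: runs the stages in order; a null result from any stage drops the frame. */
final class ChainProcessor implements FrameProcessor {
  private final List<FrameProcessor> stages;

  ChainProcessor(List<FrameProcessor> stages) {
    this.stages = List.copyOf(stages);
  }

  @Override
  public PcmFrame process(PcmFrame input) {
    PcmFrame current = input;
    for (FrameProcessor stage : stages) {
      if (current == null) {
        return null;
      }
      current = stage.process(current);
    }
    return current;
  }
}
```

The chain is itself just another processor, so the RTP handler would still see a single MediaProcessor. Returning null from any stage drops the frame, which matches how RtpUdpHandler already treats a null output.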
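4. A foundation for real-time AI audio interaction
On the surface Version 3 is still an echo. Its real value is not "doing echo again" but the media entry point it creates, which AI can later plug into.
After Version 3, the system has:
- Per-call codec negotiation information
- RTP packet parsing and reassembly
- A PCM audio frame abstraction
- An independent media processor interface
- Management of the local send sequence number, timestamp and SSRC
To add real-time voice AI later, you only need to replace EchoMediaProcessor with a new media processor. That processor can then gradually connect:
- 8k PCM -> 16k PCM resampling
- PCM -> ASR / Gemini input
- Audio output from LLM / TTS
- Encoding back into RTP
The goal is not to stop at echo. It is to give the current system the structure it needs to evolve into a "real-time two-way voice bot".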
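Under simplifying assumptions, the 8k -> 16k step can be sketched as 2x upsampling by linear interpolation. This is only an illustration and `LinearUpsampler` is a hypothetical name. A production resampler would normally apply a low-pass (anti-imaging) filter rather than plain interpolation.

```java
/**
 * Hypothetical 2x upsampler (e.g. 8 kHz -> 16 kHz) using linear interpolation.
 * Output sample 2i is input sample i; output 2i+1 is the midpoint of inputs i and i+1.
 */
final class LinearUpsampler {
  private LinearUpsampler() {}

  static short[] upsample2x(short[] in) {
    short[] out = new short[in.length * 2];
    for (int i = 0; i < in.length; i++) {
      int current = in[i];
      // The last sample has no successor inside this frame, so it is repeated
      int next = (i + 1 < in.length) ? in[i + 1] : current;
      out[2 * i] = (short) current;
      // The average of two 16-bit values always fits back into 16 bits
      out[2 * i + 1] = (short) ((current + next) / 2);
    }
    return out;
  }
}
```

Each frame is processed on its own, so the last output sample simply repeats the final input sample. A streaming version would carry the boundary sample over from one frame to the next, which avoids a small discontinuity at every frame boundary.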
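Results of Version 3
Once Version 3 is complete, the system's media capabilities take a clear step up.
It is no longer limited to:
- Setting up SIP calls
- Receiving RTP packets
- Echoing UDP data unchanged
It can now:
- Recognize and parse RTP
- Correctly decode audio according to the negotiated codec
- Convert audio into uniform PCM frames
- Process audio frames through a processor
- Re-encode the audio and assemble RTP packets to send back
This marks the project's move from "verifying signaling and media connectivity" to "building an extensible media processing platform".
Summary in one sentence
With SIP, SDP and session management largely stable, Version 3 goes deeper into the media layer: it upgrades RTP from "raw packet echo verification" to an "audio frame level processing architecture", laying the groundwork for real-time voice AI capabilities such as ASR, TTS and Gemini.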
I. Suggested package structure
com.litongjava.sip.rtp
├── codec
│ ├── AudioCodec.java
│ ├── PcmuCodec.java
│ └── PcmaCodec.java
│
├── media
│ ├── AudioFrame.java
│ ├── MediaProcessor.java
│ └── EchoMediaProcessor.java
│
├── packet
│ ├── RtpPacket.java
│ ├── RtpPacketParser.java
│ └── RtpPacketWriter.java
│
├── server
│ └── RtpUdpHandler.java
│
├── RtpUdpServer.java
└── RtpServerManager.java
II. Add fields to CallSession first
The RTP handler needs to know which codec the current call uses and where to send replies. For that, add a few more fields to CallSession.
package com.litongjava.sip.model;
import com.litongjava.sip.rtp.RtpUdpServer;
import com.litongjava.sip.sdp.CodecSpec;
public class CallSession {
private String callId;
private String fromTag;
private String toTag;
private String transport;
private String remoteSipIp;
private int remoteSipPort;
private String remoteRtpIp;
private int remoteRtpPort;
private int localRtpPort;
private long createdTime;
private long updatedTime;
private long ackDeadline;
private boolean ackReceived;
private boolean terminated;
private String last200Ok;
private RtpUdpServer rtpServer;
private CodecSpec selectedCodec;
private boolean telephoneEventSupported;
private int remoteTelephoneEventPayloadType = -1;
private int ptime = 20;
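// Added in Version 3: local RTP send state (SSRC, sequence number, timestamp)
private long localSsrc = System.nanoTime() & 0xFFFFFFFFL;
private int sendSequence = 0;
private long sendTimestamp = 0;
private boolean rtpInitialized = false;
// Returns the next 16-bit sequence number (the first packet gets 1); wraps from 65535 to 0
public synchronized int nextSendSequence() {
sendSequence = (sendSequence + 1) & 0xFFFF;
return sendSequence;
}
// Returns the next 32-bit RTP timestamp. The timestamp counts samples, so it advances by
// each packet's sample count (160 for 20 ms at 8 kHz) and wraps at 2^32
public synchronized long nextSendTimestamp(int sampleCount) {
if (!rtpInitialized) {
rtpInitialized = true;
sendTimestamp = sampleCount;
return sendTimestamp;
}
sendTimestamp = (sendTimestamp + sampleCount) & 0xFFFFFFFFL;
return sendTimestamp;
}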
public long getLocalSsrc() {
return localSsrc;
}
public void setLocalSsrc(long localSsrc) {
this.localSsrc = localSsrc;
}
public int getSendSequence() {
return sendSequence;
}
public void setSendSequence(int sendSequence) {
this.sendSequence = sendSequence;
}
public long getSendTimestamp() {
return sendTimestamp;
}
public void setSendTimestamp(long sendTimestamp) {
this.sendTimestamp = sendTimestamp;
}
public boolean isRtpInitialized() {
return rtpInitialized;
}
public void setRtpInitialized(boolean rtpInitialized) {
this.rtpInitialized = rtpInitialized;
}
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
public String getFromTag() {
return fromTag;
}
public void setFromTag(String fromTag) {
this.fromTag = fromTag;
}
public String getToTag() {
return toTag;
}
public void setToTag(String toTag) {
this.toTag = toTag;
}
public String getTransport() {
return transport;
}
public void setTransport(String transport) {
this.transport = transport;
}
public String getRemoteSipIp() {
return remoteSipIp;
}
public void setRemoteSipIp(String remoteSipIp) {
this.remoteSipIp = remoteSipIp;
}
public int getRemoteSipPort() {
return remoteSipPort;
}
public void setRemoteSipPort(int remoteSipPort) {
this.remoteSipPort = remoteSipPort;
}
public String getRemoteRtpIp() {
return remoteRtpIp;
}
public void setRemoteRtpIp(String remoteRtpIp) {
this.remoteRtpIp = remoteRtpIp;
}
public int getRemoteRtpPort() {
return remoteRtpPort;
}
public void setRemoteRtpPort(int remoteRtpPort) {
this.remoteRtpPort = remoteRtpPort;
}
public int getLocalRtpPort() {
return localRtpPort;
}
public void setLocalRtpPort(int localRtpPort) {
this.localRtpPort = localRtpPort;
}
public long getCreatedTime() {
return createdTime;
}
public void setCreatedTime(long createdTime) {
this.createdTime = createdTime;
}
public long getUpdatedTime() {
return updatedTime;
}
public void setUpdatedTime(long updatedTime) {
this.updatedTime = updatedTime;
}
public long getAckDeadline() {
return ackDeadline;
}
public void setAckDeadline(long ackDeadline) {
this.ackDeadline = ackDeadline;
}
public boolean isAckReceived() {
return ackReceived;
}
public void setAckReceived(boolean ackReceived) {
this.ackReceived = ackReceived;
}
public boolean isTerminated() {
return terminated;
}
public void setTerminated(boolean terminated) {
this.terminated = terminated;
}
public String getLast200Ok() {
return last200Ok;
}
public void setLast200Ok(String last200Ok) {
this.last200Ok = last200Ok;
}
public RtpUdpServer getRtpServer() {
return rtpServer;
}
public void setRtpServer(RtpUdpServer rtpServer) {
this.rtpServer = rtpServer;
}
public CodecSpec getSelectedCodec() {
return selectedCodec;
}
public void setSelectedCodec(CodecSpec selectedCodec) {
this.selectedCodec = selectedCodec;
}
public boolean isTelephoneEventSupported() {
return telephoneEventSupported;
}
public void setTelephoneEventSupported(boolean telephoneEventSupported) {
this.telephoneEventSupported = telephoneEventSupported;
}
public int getRemoteTelephoneEventPayloadType() {
return remoteTelephoneEventPayloadType;
}
public void setRemoteTelephoneEventPayloadType(int remoteTelephoneEventPayloadType) {
this.remoteTelephoneEventPayloadType = remoteTelephoneEventPayloadType;
}
public int getPtime() {
return ptime;
}
public void setPtime(int ptime) {
this.ptime = ptime;
}
}
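The sequence number and timestamp use 16-bit and 32-bit modular arithmetic. RFC 3550 recommends random initial values for both, as well as a random SSRC. CallSession instead starts from fixed values: the first packet gets sequence 1 and a timestamp equal to its sample count. The sketch below is a hypothetical variant (`RtpSendCounters` is not part of the project). It seeds both counters randomly and returns the current value before advancing. It is meant to show the wrap-around behavior, not to replace CallSession.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical per-call RTP send counters using the same wrap-around arithmetic as CallSession. */
final class RtpSendCounters {
  private int sequence;   // 16-bit, wraps at 65536
  private long timestamp; // 32-bit unsigned, wraps at 2^32

  /** Explicit starting values, useful for tests. */
  RtpSendCounters(int initialSequence, long initialTimestamp) {
    this.sequence = initialSequence & 0xFFFF;
    this.timestamp = initialTimestamp & 0xFFFFFFFFL;
  }

  /** Random starting values, as RFC 3550 recommends. */
  static RtpSendCounters randomlySeeded() {
    ThreadLocalRandom r = ThreadLocalRandom.current();
    return new RtpSendCounters(r.nextInt(0x10000), r.nextLong(0x100000000L));
  }

  /** Returns the sequence number for this packet, then advances by one. */
  synchronized int nextSequence() {
    int current = sequence;
    sequence = (sequence + 1) & 0xFFFF;
    return current;
  }

  /** Returns the timestamp for this packet, then advances by its sample count. */
  synchronized long nextTimestamp(int sampleCount) {
    long current = timestamp;
    timestamp = (timestamp + sampleCount) & 0xFFFFFFFFL;
    return current;
  }
}
```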
III. RTP packet object
1) RtpPacket
package com.litongjava.sip.rtp.packet;
public class RtpPacket {
private int version;
private boolean padding;
private boolean extension;
private int csrcCount;
private boolean marker;
private int payloadType;
private int sequenceNumber;
private long timestamp;
private long ssrc;
private byte[] payload;
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public boolean isPadding() {
return padding;
}
public void setPadding(boolean padding) {
this.padding = padding;
}
public boolean isExtension() {
return extension;
}
public void setExtension(boolean extension) {
this.extension = extension;
}
public int getCsrcCount() {
return csrcCount;
}
public void setCsrcCount(int csrcCount) {
this.csrcCount = csrcCount;
}
public boolean isMarker() {
return marker;
}
public void setMarker(boolean marker) {
this.marker = marker;
}
public int getPayloadType() {
return payloadType;
}
public void setPayloadType(int payloadType) {
this.payloadType = payloadType;
}
public int getSequenceNumber() {
return sequenceNumber;
}
public void setSequenceNumber(int sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}
public long getTimestamp() {
return timestamp;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public long getSsrc() {
return ssrc;
}
public void setSsrc(long ssrc) {
this.ssrc = ssrc;
}
public byte[] getPayload() {
return payload;
}
public void setPayload(byte[] payload) {
this.payload = payload;
}
}
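IV. Parsing and writing RTP
2) RtpPacketParser
This parser targets the standard 12-byte fixed header. CSRC identifiers and header extensions are skipped rather than interpreted, so packets that carry them still parse correctly.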
package com.litongjava.sip.rtp.packet;
public class RtpPacketParser {
public RtpPacket parse(byte[] data) {
if (data == null || data.length < 12) {
throw new IllegalArgumentException("rtp packet too short");
}
int b0 = data[0] & 0xFF;
int b1 = data[1] & 0xFF;
int version = (b0 >> 6) & 0x03;
if (version != 2) {
throw new IllegalArgumentException("unsupported rtp version: " + version);
}
boolean padding = ((b0 >> 5) & 0x01) == 1;
boolean extension = ((b0 >> 4) & 0x01) == 1;
int csrcCount = b0 & 0x0F;
boolean marker = ((b1 >> 7) & 0x01) == 1;
int payloadType = b1 & 0x7F;
int sequenceNumber = ((data[2] & 0xFF) << 8) | (data[3] & 0xFF);
long timestamp =
((long) (data[4] & 0xFF) << 24) |
((long) (data[5] & 0xFF) << 16) |
((long) (data[6] & 0xFF) << 8) |
((long) (data[7] & 0xFF));
long ssrc =
((long) (data[8] & 0xFF) << 24) |
((long) (data[9] & 0xFF) << 16) |
((long) (data[10] & 0xFF) << 8) |
((long) (data[11] & 0xFF));
int headerLen = 12 + csrcCount * 4;
if (data.length < headerLen) {
throw new IllegalArgumentException("invalid rtp header length");
}
if (extension) {
if (data.length < headerLen + 4) {
throw new IllegalArgumentException("invalid rtp extension header");
}
int extLenWords = ((data[headerLen + 2] & 0xFF) << 8) | (data[headerLen + 3] & 0xFF);
headerLen += 4 + extLenWords * 4;
if (data.length < headerLen) {
throw new IllegalArgumentException("invalid rtp extension payload");
}
}
int payloadLen = data.length - headerLen;
if (padding) {
int paddingCount = data[data.length - 1] & 0xFF;
payloadLen -= paddingCount;
if (payloadLen < 0) {
throw new IllegalArgumentException("invalid rtp padding");
}
}
byte[] payload = new byte[payloadLen];
System.arraycopy(data, headerLen, payload, 0, payloadLen);
RtpPacket packet = new RtpPacket();
packet.setVersion(version);
packet.setPadding(padding);
packet.setExtension(extension);
packet.setCsrcCount(csrcCount);
packet.setMarker(marker);
packet.setPayloadType(payloadType);
packet.setSequenceNumber(sequenceNumber);
packet.setTimestamp(timestamp);
packet.setSsrc(ssrc);
packet.setPayload(payload);
return packet;
}
}
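Here is a worked example of the header-length rule the parser applies. A first byte of 0x91 means V=2, X=1 and CC=1. The payload therefore starts after 12 bytes of fixed header, 4 bytes of CSRC and 4 bytes of extension header, followed by the extension body. The body's length, in 32-bit words, is read from the extension header. With an extension length of 1 word the header is 12 + 4 + 4 + 4 = 24 bytes. The hypothetical `RtpHeaderLength` helper below isolates that computation.

```java
/** Hypothetical helper: offset where the RTP payload starts, following the parser's rules. */
final class RtpHeaderLength {
  private RtpHeaderLength() {}

  static int headerLength(byte[] data) {
    if (data.length < 12) {
      throw new IllegalArgumentException("rtp packet too short");
    }
    int csrcCount = data[0] & 0x0F;            // CC: number of 4-byte CSRC entries
    boolean extension = (data[0] & 0x10) != 0; // X bit
    int len = 12 + 4 * csrcCount;              // fixed header + CSRC list
    if (extension) {
      if (data.length < len + 4) {
        throw new IllegalArgumentException("invalid rtp extension header");
      }
      // Extension header: 16-bit profile-defined id, then 16-bit length in 32-bit words
      int words = ((data[len + 2] & 0xFF) << 8) | (data[len + 3] & 0xFF);
      len += 4 + 4 * words;
    }
    if (data.length < len) {
      throw new IllegalArgumentException("invalid rtp header length");
    }
    return len;
  }
}
```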
3) RtpPacketWriter
package com.litongjava.sip.rtp.packet;
public class RtpPacketWriter {
public byte[] write(RtpPacket packet) {
byte[] payload = packet.getPayload();
if (payload == null) {
payload = new byte[0];
}
byte[] out = new byte[12 + payload.length];
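// This writer emits only the 12-byte fixed header: no padding bytes, no CSRC list and no
// header extension. P, X and CC are therefore always written as 0, even if set on the packet;
// otherwise a receiver would try to skip bytes that were never written.
int b0 = 2 << 6; // version=2, P=0, X=0, CC=0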
int b1 = 0;
if (packet.isMarker()) {
b1 |= 1 << 7;
}
b1 |= (packet.getPayloadType() & 0x7F);
out[0] = (byte) b0;
out[1] = (byte) b1;
int seq = packet.getSequenceNumber() & 0xFFFF;
out[2] = (byte) ((seq >> 8) & 0xFF);
out[3] = (byte) (seq & 0xFF);
long ts = packet.getTimestamp() & 0xFFFFFFFFL;
out[4] = (byte) ((ts >> 24) & 0xFF);
out[5] = (byte) ((ts >> 16) & 0xFF);
out[6] = (byte) ((ts >> 8) & 0xFF);
out[7] = (byte) (ts & 0xFF);
long ssrc = packet.getSsrc() & 0xFFFFFFFFL;
out[8] = (byte) ((ssrc >> 24) & 0xFF);
out[9] = (byte) ((ssrc >> 16) & 0xFF);
out[10] = (byte) ((ssrc >> 8) & 0xFF);
out[11] = (byte) (ssrc & 0xFF);
System.arraycopy(payload, 0, out, 12, payload.length);
return out;
}
}
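For reference, you can compute by hand the first two header bytes the writer produces for a plain audio packet. `RtpFirstBytes` is a hypothetical helper used only to illustrate the bit layout. Byte 0 is 0x80: version 2, no padding, no extension, no CSRC. Byte 1 holds the marker bit and the 7-bit payload type. PCMU (PT 0) without the marker therefore gives 0x00, and PCMA (PT 8) gives 0x08.

```java
/** Hypothetical helper: the first two bytes of a bare 12-byte RTP header. */
final class RtpFirstBytes {
  private RtpFirstBytes() {}

  /** Byte 0: V=2 in the top two bits; P, X and CC are 0 for a bare fixed header. */
  static int byte0() {
    return 2 << 6; // 0x80
  }

  /** Byte 1: marker in bit 7, payload type in the low 7 bits. */
  static int byte1(boolean marker, int payloadType) {
    return (marker ? 0x80 : 0) | (payloadType & 0x7F);
  }
}
```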
V. Audio frames and processors
4) AudioFrame
Audio is always represented as mono PCM16.
package com.litongjava.sip.rtp.media;
public class AudioFrame {
private short[] samples;
private int sampleRate;
private int channels;
private long rtpTimestamp;
public AudioFrame() {
}
public AudioFrame(short[] samples, int sampleRate, int channels, long rtpTimestamp) {
this.samples = samples;
this.sampleRate = sampleRate;
this.channels = channels;
this.rtpTimestamp = rtpTimestamp;
}
public short[] getSamples() {
return samples;
}
public void setSamples(short[] samples) {
this.samples = samples;
}
public int getSampleRate() {
return sampleRate;
}
public void setSampleRate(int sampleRate) {
this.sampleRate = sampleRate;
}
public int getChannels() {
return channels;
}
public void setChannels(int channels) {
this.channels = channels;
}
public long getRtpTimestamp() {
return rtpTimestamp;
}
public void setRtpTimestamp(long rtpTimestamp) {
this.rtpTimestamp = rtpTimestamp;
}
public int sampleCount() {
return samples == null ? 0 : samples.length;
}
}
5) MediaProcessor
package com.litongjava.sip.rtp.media;
import com.litongjava.sip.model.CallSession;
public interface MediaProcessor {
AudioFrame process(AudioFrame input, CallSession session);
}
6) EchoMediaProcessor
package com.litongjava.sip.rtp.media;
import com.litongjava.sip.model.CallSession;
public class EchoMediaProcessor implements MediaProcessor {
@Override
public AudioFrame process(AudioFrame input, CallSession session) {
return input;
}
}
VI. Codec interface and G.711 implementations
7) AudioCodec
package com.litongjava.sip.rtp.codec;
public interface AudioCodec {
String codecName();
int payloadType();
int sampleRate();
short[] decode(byte[] payload);
byte[] encode(short[] pcm16);
}
8) PcmuCodec
G.711 μ-law.
package com.litongjava.sip.rtp.codec;
public class PcmuCodec implements AudioCodec {
@Override
public String codecName() {
return "PCMU";
}
@Override
public int payloadType() {
return 0;
}
@Override
public int sampleRate() {
return 8000;
}
@Override
public short[] decode(byte[] payload) {
short[] out = new short[payload.length];
for (int i = 0; i < payload.length; i++) {
out[i] = ulawToLinear(payload[i]);
}
return out;
}
@Override
public byte[] encode(short[] pcm16) {
byte[] out = new byte[pcm16.length];
for (int i = 0; i < pcm16.length; i++) {
out[i] = linearToUlaw(pcm16[i]);
}
return out;
}
private short ulawToLinear(byte ulaw) {
int u = ~ulaw & 0xFF;
int sign = u & 0x80;
int exponent = (u >> 4) & 0x07;
int mantissa = u & 0x0F;
int sample = ((mantissa << 3) + 0x84) << exponent;
sample -= 0x84;
return (short) (sign != 0 ? -sample : sample);
}
private byte linearToUlaw(short sample) {
final int BIAS = 0x84;
final int CLIP = 32635;
int pcm = sample;
int sign = (pcm >> 8) & 0x80;
if (sign != 0) {
pcm = -pcm;
}
if (pcm > CLIP) {
pcm = CLIP;
}
pcm += BIAS;
int exponent = 7;
for (int expMask = 0x4000; (pcm & expMask) == 0 && exponent > 0; exponent--, expMask >>= 1) {
}
int mantissa = (pcm >> (exponent + 3)) & 0x0F;
int ulaw = ~(sign | (exponent << 4) | mantissa) & 0xFF;
return (byte) ulaw;
}
}
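μ-law has only 256 possible input bytes, so a common optimization is to decode through a precomputed table instead of recomputing the formula for every sample. The sketch below uses a hypothetical name, `UlawTable`. It builds the table from the same formula as `ulawToLinear`, which also makes it easy to spot-check known values: 0xFF decodes to 0, 0x80 to +32124 and 0x00 to -32124.

```java
/** Hypothetical variant: precompute all 256 μ-law decodes once, then decode by table lookup. */
final class UlawTable {
  private static final short[] DECODE = new short[256];

  static {
    for (int i = 0; i < 256; i++) {
      DECODE[i] = decodeOne(i);
    }
  }

  private UlawTable() {}

  /** Same formula as PcmuCodec.ulawToLinear: invert, split sign/exponent/mantissa, remove bias. */
  private static short decodeOne(int ulaw) {
    int u = ~ulaw & 0xFF;
    int sign = u & 0x80;
    int exponent = (u >> 4) & 0x07;
    int mantissa = u & 0x0F;
    int sample = (((mantissa << 3) + 0x84) << exponent) - 0x84;
    return (short) (sign != 0 ? -sample : sample);
  }

  static short[] decode(byte[] payload) {
    short[] out = new short[payload.length];
    for (int i = 0; i < payload.length; i++) {
      out[i] = DECODE[payload[i] & 0xFF]; // mask so negative bytes index 128..255
    }
    return out;
  }
}
```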
9) PcmaCodec
G.711 A-law. Unlike μ-law, a set sign bit (after the XOR with 0x55) marks a positive sample.
package com.litongjava.sip.rtp.codec;
public class PcmaCodec implements AudioCodec {
@Override
public String codecName() {
return "PCMA";
}
@Override
public int payloadType() {
return 8;
}
@Override
public int sampleRate() {
return 8000;
}
@Override
public short[] decode(byte[] payload) {
short[] out = new short[payload.length];
for (int i = 0; i < payload.length; i++) {
out[i] = alawToLinear(payload[i]);
}
return out;
}
@Override
public byte[] encode(short[] pcm16) {
byte[] out = new byte[pcm16.length];
for (int i = 0; i < pcm16.length; i++) {
out[i] = linearToAlaw(pcm16[i]);
}
return out;
}
private short alawToLinear(byte alaw) {
int a = (alaw ^ 0x55) & 0xFF;
int sign = a & 0x80;
int exponent = (a & 0x70) >> 4;
int mantissa = a & 0x0F;
int sample;
if (exponent == 0) {
sample = (mantissa << 4) + 8;
} else {
sample = ((mantissa << 4) + 0x108) << (exponent - 1);
}
// G.711 A-law: a set sign bit (after the 0x55 XOR) means a POSITIVE sample
return (short) (sign != 0 ? sample : -sample);
}
private byte linearToAlaw(short sample) {
int pcm = sample;
int mask;
if (pcm >= 0) {
mask = 0xD5; // sign bit 1 = positive, combined with the 0x55 even-bit inversion
} else {
mask = 0x55; // sign bit 0 = negative
pcm = -pcm - 1; // maps -32768 to 32767, so no clipping is needed
}
int alaw;
if (pcm >= 256) {
// Find the segment (exponent) from the highest set bit between bit 14 and bit 8
int exponent = 7;
for (int expMask = 0x4000; (pcm & expMask) == 0 && exponent > 0; exponent--, expMask >>= 1) {
}
int mantissa = (pcm >> (exponent + 3)) & 0x0F;
alaw = (exponent << 4) | mantissa;
} else {
alaw = pcm >> 4;
}
return (byte) (alaw ^ mask);
}
}
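One way to check the A-law sign convention is to decode a few reference codes. In G.711 A-law, 0xD5 is the smallest positive value (+8 on the 16-bit scale) and 0x55 the smallest negative value (-8). 0xD5 is also the usual A-law silence byte. The standalone `AlawSignCheck` sketch below is hypothetical and only pins down these values. A decoder that returns -8 for 0xD5 has its sign bit inverted. If the encoder is inverted the same way, the error can go unnoticed in a pure echo test, but the waveform polarity is flipped relative to other G.711 endpoints.

```java
/** Hypothetical sanity check for the G.711 A-law sign convention (sign bit set = positive). */
final class AlawSignCheck {
  private AlawSignCheck() {}

  /** A-law decode using the reference G.711 segment formula. */
  static short decode(byte alaw) {
    int a = (alaw ^ 0x55) & 0xFF; // undo the even-bit inversion
    int exponent = (a & 0x70) >> 4;
    int mantissa = a & 0x0F;
    int sample = (exponent == 0)
        ? (mantissa << 4) + 8
        : ((mantissa << 4) + 0x108) << (exponent - 1);
    // In A-law a set sign bit means a POSITIVE sample
    return (short) ((a & 0x80) != 0 ? sample : -sample);
  }
}
```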
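VII. The upgraded RtpUdpHandler
This class is the centerpiece of Version 3.
It does the following:
- Finds the CallSession by local RTP port
- Parses RTP
- Filters out telephone-event packets
- Chooses a decoder based on session.selectedCodec
- Decodes the payload into a PCM16 audio frame
- Passes the frame to the MediaProcessor
- Re-encodes the result
- Rebuilds the RTP packet
- Sends it to the remote RTP address and port
10) RtpUdpHandler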
package com.litongjava.sip.rtp.server;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.rtp.codec.AudioCodec;
import com.litongjava.sip.rtp.codec.PcmaCodec;
import com.litongjava.sip.rtp.codec.PcmuCodec;
import com.litongjava.sip.rtp.media.AudioFrame;
import com.litongjava.sip.rtp.media.EchoMediaProcessor;
import com.litongjava.sip.rtp.media.MediaProcessor;
import com.litongjava.sip.rtp.packet.RtpPacket;
import com.litongjava.sip.rtp.packet.RtpPacketParser;
import com.litongjava.sip.rtp.packet.RtpPacketWriter;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.Node;
import com.litongjava.tio.core.udp.UdpPacket;
import com.litongjava.tio.core.udp.intf.UdpHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RtpUdpHandler implements UdpHandler {
private final int localPort;
private final CallSessionManager sessionManager;
private final RtpPacketParser rtpPacketParser = new RtpPacketParser();
private final RtpPacketWriter rtpPacketWriter = new RtpPacketWriter();
private final MediaProcessor mediaProcessor;
private final AudioCodec pcmuCodec = new PcmuCodec();
private final AudioCodec pcmaCodec = new PcmaCodec();
public RtpUdpHandler(int localPort, CallSessionManager sessionManager) {
this(localPort, sessionManager, new EchoMediaProcessor());
}
public RtpUdpHandler(int localPort, CallSessionManager sessionManager, MediaProcessor mediaProcessor) {
this.localPort = localPort;
this.sessionManager = sessionManager;
this.mediaProcessor = mediaProcessor;
}
@Override
public void handler(UdpPacket udpPacket, DatagramSocket socket) {
try {
CallSession session = sessionManager.getByLocalRtpPort(localPort);
if (session == null || session.isTerminated()) {
return;
}
Node remote = udpPacket.getRemote();
byte[] data = udpPacket.getData();
if (data == null || data.length < 12) {
return;
}
RtpPacket in = rtpPacketParser.parse(data);
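// Learn the remote RTP address from the first packet, but only when SDP did not provide one.
// An address that is already set is never overwritten, so port changes (e.g. NAT rebinding) are not handled yet.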
if (session.getRemoteRtpIp() == null || session.getRemoteRtpIp().isEmpty()) {
session.setRemoteRtpIp(remote.getIp());
}
if (session.getRemoteRtpPort() <= 0) {
session.setRemoteRtpPort(remote.getPort());
}
// Ignore DTMF (telephone-event) packets for now; they are not echoed
if (session.isTelephoneEventSupported() &&
in.getPayloadType() == session.getRemoteTelephoneEventPayloadType()) {
session.setUpdatedTime(System.currentTimeMillis());
return;
}
AudioCodec codec = chooseCodec(session);
if (codec == null) {
return;
}
// Some endpoints may send a payload type that differs from the negotiated one; for now, always decode with the codec selected for the session
short[] pcm = codec.decode(in.getPayload());
AudioFrame inputFrame = new AudioFrame(pcm, codec.sampleRate(), 1, in.getTimestamp());
AudioFrame outputFrame = mediaProcessor.process(inputFrame, session);
if (outputFrame == null || outputFrame.getSamples() == null || outputFrame.getSamples().length == 0) {
return;
}
byte[] outPayload = codec.encode(outputFrame.getSamples());
RtpPacket out = new RtpPacket();
out.setVersion(2);
out.setPadding(false);
out.setExtension(false);
out.setCsrcCount(0);
out.setMarker(false);
out.setPayloadType(session.getSelectedCodec().getPayloadType());
out.setSequenceNumber(session.nextSendSequence());
out.setTimestamp(session.nextSendTimestamp(outputFrame.sampleCount()));
out.setSsrc(session.getLocalSsrc());
out.setPayload(outPayload);
byte[] outBytes = rtpPacketWriter.write(out);
DatagramPacket resp = new DatagramPacket(
outBytes,
outBytes.length,
new InetSocketAddress(session.getRemoteRtpIp(), session.getRemoteRtpPort()));
socket.send(resp);
session.setUpdatedTime(System.currentTimeMillis());
} catch (Exception e) {
log.error("rtp handler error, localPort={}", localPort, e);
}
}
private AudioCodec chooseCodec(CallSession session) {
if (session.getSelectedCodec() == null) {
return null;
}
String codecName = session.getSelectedCodec().getCodecName();
if ("PCMU".equalsIgnoreCase(codecName)) {
return pcmuCodec;
}
if ("PCMA".equalsIgnoreCase(codecName)) {
return pcmaCodec;
}
return null;
}
}
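The handler currently decodes every non-DTMF packet with the session's selected codec, whatever its payload type. A stricter policy would route packets by payload type first, assuming only the negotiated audio codec should be echoed. PT 13 is statically assigned to Comfort Noise in RFC 3551, and decoding it as G.711 would only produce noise. The sketch below shows such a router; `RtpPayloadRouting` and its `Action` values are hypothetical names.

```java
/** Hypothetical routing of incoming RTP packets by payload type. */
final class RtpPayloadRouting {
  enum Action { DECODE, DROP_DTMF, DROP_COMFORT_NOISE, DROP_UNKNOWN }

  /** Static payload type 13 is Comfort Noise (CN) in RFC 3551. */
  static final int PT_COMFORT_NOISE = 13;

  private RtpPayloadRouting() {}

  /**
   * @param pt               payload type of the received packet
   * @param negotiatedPt     payload type of the codec selected in SDP (0 = PCMU, 8 = PCMA)
   * @param telephoneEventPt negotiated telephone-event payload type, or -1 if none
   */
  static Action classify(int pt, int negotiatedPt, int telephoneEventPt) {
    if (telephoneEventPt >= 0 && pt == telephoneEventPt) {
      return Action.DROP_DTMF;
    }
    if (pt == negotiatedPt) {
      return Action.DECODE;
    }
    if (pt == PT_COMFORT_NOISE) {
      return Action.DROP_COMFORT_NOISE;
    }
    return Action.DROP_UNKNOWN;
  }
}
```

Whether to drop mismatched packets or decode them anyway is a trade-off: the handler's current comment deliberately tolerates endpoints that mislabel their payload type.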
VIII. Add an RTP-port lookup to CallSessionManager
RtpUdpHandler now has to find the session by local port, so CallSessionManager gets a lookup method for it.
11) Upgraded CallSessionManager
package com.litongjava.sip.server.session;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.litongjava.sip.model.CallSession;
public class CallSessionManager {
private final Map<String, CallSession> sessions = new ConcurrentHashMap<>();
public CallSession getByCallId(String callId) {
if (callId == null) {
return null;
}
return sessions.get(callId);
}
public CallSession getByLocalRtpPort(int localRtpPort) {
for (CallSession session : sessions.values()) {
if (session != null && session.getLocalRtpPort() == localRtpPort) {
return session;
}
}
return null;
}
public CallSession createOrUpdate(CallSession session) {
if (session == null || session.getCallId() == null) {
throw new IllegalArgumentException("call session or callId is null");
}
session.setUpdatedTime(System.currentTimeMillis());
sessions.put(session.getCallId(), session);
return session;
}
public void markAckReceived(String callId) {
CallSession session = sessions.get(callId);
if (session != null) {
session.setAckReceived(true);
session.setUpdatedTime(System.currentTimeMillis());
}
}
public void markTerminated(String callId) {
CallSession session = sessions.get(callId);
if (session != null) {
session.setTerminated(true);
session.setUpdatedTime(System.currentTimeMillis());
}
}
public void remove(String callId) {
sessions.remove(callId);
}
public Map<String, CallSession> snapshot() {
return Map.copyOf(sessions);
}
}
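getByLocalRtpPort walks every session for each incoming RTP packet, which at 20 ms ptime is about 50 times per second per call. That is fine for a demo. With many concurrent calls, one option is a secondary index from local RTP port to Call-ID, kept next to the main map. The sketch below is hypothetical: `RtpPortIndex`, `bind` and `unbind` are not part of the project. `bind` would be called when a session with an allocated port is stored, and `unbind` when it is removed.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical secondary index: local RTP port -> Call-ID, for O(1) lookup per packet. */
final class RtpPortIndex {
  private final Map<Integer, String> callIdByPort = new ConcurrentHashMap<>();

  /** Call after the RTP port has been allocated and the session stored. */
  void bind(int localRtpPort, String callId) {
    callIdByPort.put(localRtpPort, callId);
  }

  /** Removes the mapping only if the port is still bound to this call. */
  void unbind(int localRtpPort, String callId) {
    callIdByPort.remove(localRtpPort, callId);
  }

  /** Returns the Call-ID bound to this port, or null. */
  String callIdFor(int localRtpPort) {
    return callIdByPort.get(localRtpPort);
  }
}
```

`unbind` uses the two-argument `remove(key, value)`. If a port has already been recycled and bound to a new call, the old call's cleanup therefore cannot clear it by accident.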
IX. Upgrade RtpUdpServer
The original RtpUdpServer hard-coded RtpEchoUdpHandler. It now has to use the new RtpUdpHandler, so it takes the CallSessionManager as a constructor argument.
12) Upgraded RtpUdpServer
package com.litongjava.sip.rtp;
import java.net.SocketException;
import com.litongjava.sip.rtp.server.RtpUdpHandler;
import com.litongjava.sip.server.session.CallSessionManager;
import com.litongjava.tio.core.udp.UdpServer;
import com.litongjava.tio.core.udp.UdpServerConf;
public class RtpUdpServer {
private final int port;
private final CallSessionManager sessionManager;
private UdpServer udpServer;
public RtpUdpServer(int port, CallSessionManager sessionManager) {
this.port = port;
this.sessionManager = sessionManager;
}
public void start() throws SocketException {
UdpServerConf conf = new UdpServerConf(port, new RtpUdpHandler(port, sessionManager), 5000);
this.udpServer = new UdpServer(conf);
this.udpServer.start();
}
public void stop() {
if (udpServer != null) {
udpServer.stop();
}
}
public int port() {
return port;
}
}
X. Upgrade RtpServerManager
The RtpUdpServer constructor changed, so RtpServerManager must now hold a CallSessionManager as well.
13) Upgraded RtpServerManager
package com.litongjava.sip.rtp;
import com.litongjava.sip.model.CallSession;
import com.litongjava.sip.server.session.CallSessionManager;
public class RtpServerManager {
private final String localIp;
private final RtpPortAllocator allocator;
private final CallSessionManager sessionManager;
public RtpServerManager(String localIp, CallSessionManager sessionManager) {
this(localIp, new RtpPortAllocator(), sessionManager);
}
public RtpServerManager(String localIp, RtpPortAllocator allocator, CallSessionManager sessionManager) {
this.localIp = localIp;
this.allocator = allocator;
this.sessionManager = sessionManager;
}
public CallSession allocateAndStart(CallSession session) throws Exception {
int rtpPort = allocator.allocate();
RtpUdpServer rtpServer = new RtpUdpServer(rtpPort, sessionManager);
rtpServer.start();
session.setLocalRtpPort(rtpPort);
session.setRtpServer(rtpServer);
session.setUpdatedTime(System.currentTimeMillis());
return session;
}
public void stopAndRelease(CallSession session) {
if (session == null) {
return;
}
try {
if (session.getRtpServer() != null) {
session.getRtpServer().stop();
}
} finally {
if (session.getLocalRtpPort() > 0) {
allocator.release(session.getLocalRtpPort());
}
}
}
public String getLocalIp() {
return localIp;
}
}
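XI. Change the construction in SipServerConfig
RtpServerManager now needs a CallSessionManager. Pass the same instance to the SIP handlers and to RtpServerManager. Otherwise RtpUdpHandler cannot find the sessions created by the SIP handlers.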
CallSessionManager sessionManager = new CallSessionManager();
RtpServerManager rtpServerManager = new RtpServerManager(localIp, sessionManager);
SipInviteOnlyTcpHandler tcpHandler =
new SipInviteOnlyTcpHandler(localIp, sessionManager, rtpServerManager);
SipInviteOnlyUdpHandler udpHandler =
new SipInviteOnlyUdpHandler(localIp, sessionManager, rtpServerManager);
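XII. What this version achieves
With this version in place, your RTP layer no longer "returns the raw UDP packet unchanged". Instead it:
- Receives RTP
- Parses the RTP header
- Decodes the payload into PCM16 according to the negotiated codec
- Builds an AudioFrame
- Loops the audio frame back through EchoMediaProcessor
- Re-encodes it as a G.711 payload
- Generates a new RTP header
- Sends the packet back to the remote side
In other words, you have moved from:
- packet echo
to:
- audio frame echo
This matters for the later AI work. In the future you only need to replace the default processor created in RtpUdpHandler's two-argument constructor:
new EchoMediaProcessor()
with:
new GeminiMediaProcessor()
The overall media framework does not have to be torn down.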
