对接 火山引擎 DeepSeek
下面是一篇完整的文档,详细介绍了如何对接火山引擎 DeepSeek 推理 API,将其集成到问答系统中作为推理接口。文档中包含所有代码,并逐步解释各个部分的作用、调用流程和关键实现逻辑,帮助读者深入理解系统的工作原理。
对接火山引擎 DeepSeek 推理 API
在本系统中,我们将火山引擎 DeepSeek 推理 API 集成到问答流程中,作为生成回答的后端推理接口。注意,此处对接的是 DeepSeek 的推理 API,而非搜索 API。在业务逻辑中,当需要调用 DeepSeek 推理功能时,会在 AiSearchService
中调用如下接口:
return deepSeekPredictService.predict(channelContext, reqMessageVo, chatParamVo);
下面我们详细介绍 DeepSeek 推理接口的两个关键实现类:
- DeepSeekPredictService:封装了构造 DeepSeek 请求、调用推理 API 以及通知前端开始推理的逻辑。
- DeepSeekSseCallback:实现了 OkHttp 的 Callback 接口,用于处理 DeepSeek 推理 API 的流式响应,将生成的回答实时返回给客户端,并在响应结束后进行后续处理。
1. DeepSeekPredictService
该类负责将对话上下文、系统提示、历史记录及当前用户问题整合成一个请求,调用火山引擎 DeepSeek 推理 API,并返回用于流式处理回答的 Call 对象。
package com.litongjava.perplexica.services;
import java.util.ArrayList;
import java.util.List;
import com.litongjava.openai.chat.OpenAiChatMessage;
import com.litongjava.openai.chat.OpenAiChatRequestVo;
import com.litongjava.openai.client.OpenAiClient;
import com.litongjava.perplexica.callback.DeepSeekSseCallback;
import com.litongjava.perplexica.vo.ChatParamVo;
import com.litongjava.perplexica.vo.ChatWsReqMessageVo;
import com.litongjava.perplexica.vo.ChatWsRespVo;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.websocket.common.WebSocketResponse;
import com.litongjava.volcengine.VolcEngineConst;
import com.litongjava.volcengine.VolcEngineModels;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.Callback;
@Slf4j
public class DeepSeekPredictService {
/**
* 调用火山引擎 DeepSeek 推理 API 进行回答生成
*
* @param channelContext WebSocket 通道上下文,用于与客户端实时通信
* @param reqMessageVo 用户请求消息对象,包含用户输入、历史记录等信息
* @param chatParamVo 对话参数对象,包含系统提示、问题重写结果、回答消息 ID 等
* @return 返回一个 Call 对象,表示流式推理请求
*/
public Call predict(ChannelContext channelContext, ChatWsReqMessageVo reqMessageVo, ChatParamVo chatParamVo) {
// 获取系统提示,如果存在则作为系统消息添加到对话内容中
String systemPrompt = chatParamVo.getSystemPrompt();
Long sessionId = reqMessageVo.getMessage().getChatId();
Long questionMessageId = reqMessageVo.getMessage().getMessageId();
String content = reqMessageVo.getMessage().getContent();
Long answerMessageId = chatParamVo.getAnswerMessageId();
// 构造对话消息列表
List<OpenAiChatMessage> contents = new ArrayList<>();
if (systemPrompt != null) {
contents.add(new OpenAiChatMessage("system", systemPrompt));
log.info("deepkseek:{}", systemPrompt);
}
// 如果存在历史记录,则将其添加到对话列表中(转换角色:"human" -> "user",其他转换为 "assistant")
List<List<String>> history = reqMessageVo.getHistory();
if (history != null && history.size() > 0) {
for (int i = 0; i < history.size(); i++) {
String role = history.get(i).get(0);
String message = history.get(i).get(1);
if ("human".equals(role)) {
role = "user";
} else {
role = "assistant";
}
contents.add(new OpenAiChatMessage(role, message));
}
}
// 添加当前用户输入的消息
contents.add(new OpenAiChatMessage("user", content));
// 构造请求对象,设置使用火山引擎 DeepSeek 模型
OpenAiChatRequestVo chatRequestVo = new OpenAiChatRequestVo().setModel(VolcEngineModels.DEEPSEEK_R1_250120)
.setMessages(contents);
chatRequestVo.setStream(true);
// 向前端发送空消息及 "start thinking..." 消息,通知客户端进入推理阶段
ChatWsRespVo<String> chatVo = ChatWsRespVo.message(answerMessageId, "");
WebSocketResponse websocketResponse = WebSocketResponse.fromJson(chatVo);
if (channelContext != null) {
Tio.bSend(channelContext, websocketResponse);
chatVo = ChatWsRespVo.message(answerMessageId, "start thinking...");
websocketResponse = WebSocketResponse.fromJson(chatVo);
Tio.bSend(channelContext, websocketResponse);
}
long start = System.currentTimeMillis();
// 构造 DeepSeekSseCallback 回调对象,用于处理 DeepSeek 推理 API 的流式响应
Callback callback = new DeepSeekSseCallback(channelContext, sessionId, questionMessageId, answerMessageId, start);
String apiKey = EnvUtils.getStr("VOLCENGINE_API_KEY");
// 调用火山引擎 DeepSeek 推理 API,返回 Call 对象
Call call = OpenAiClient.chatCompletions(VolcEngineConst.BASE_URL, apiKey, chatRequestVo, callback);
return call;
}
}
详细解释
对话构造
- 如果存在
systemPrompt
(通常包含预定义的系统指令或上下文信息),则将其添加为角色为 "system" 的消息。 - 遍历历史记录,将每条历史消息转换为
OpenAiChatMessage
,其中 "human" 转换为 "user",其他则设为 "assistant"。 - 最后,将当前用户输入的消息以 "user" 角色添加到消息列表中。
- 如果存在
请求对象设置
使用OpenAiChatRequestVo
构造请求,并设置 DeepSeek 模型(通过VolcEngineModels.DEEPSEEK_R1_250120
),同时启用流式返回。客户端通知
在发起推理请求前,通过 WebSocket 向客户端发送两个消息:- 一个空消息,标识搜索阶段结束。
- 消息 "start thinking...",告知客户端推理已开始。
回调与请求发起
构造DeepSeekSseCallback
对象作为回调处理响应;从环境变量中获取 API KEY,调用OpenAiClient.chatCompletions
发起请求,返回一个 Call 对象以便后续管理或取消请求。
2. DeepSeekSseCallback
该回调类实现了 OkHttp 的 Callback 接口,用于处理火山引擎 DeepSeek 推理 API 的响应数据。其主要职责包括:
- 处理响应失败情况,通知客户端错误信息。
- 解析响应流数据,并实时通过 WebSocket 分段发送给客户端。
- 在响应结束后,将完整的生成回答保存到数据库,并通知客户端消息结束。
package com.litongjava.perplexica.callback;
import java.io.IOException;
import java.util.List;
import com.jfinal.kit.Kv;
import com.litongjava.openai.chat.ChatResponseDelta;
import com.litongjava.openai.chat.Choice;
import com.litongjava.openai.chat.OpenAiChatResponseVo;
import com.litongjava.perplexica.can.ChatWsStreamCallCan;
import com.litongjava.perplexica.model.MaxSearchChatMessage;
import com.litongjava.perplexica.vo.ChatWsRespVo;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.server.util.SseEmitter;
import com.litongjava.tio.utils.json.FastJson2Utils;
import com.litongjava.tio.websocket.common.WebSocketResponse;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.BufferedSource;
@Slf4j
public class DeepSeekSseCallback implements Callback {
private ChannelContext channelContext;
private Long sessionId;
private Long messageId;
private Long answerMessageId;
private Long start;
public DeepSeekSseCallback(ChannelContext channelContext, Long sessionId, Long messageId, Long answerMessageId, Long start) {
this.channelContext = channelContext;
this.sessionId = sessionId;
this.messageId = messageId;
this.answerMessageId = answerMessageId;
this.start = start;
}
@Override
public void onFailure(Call call, IOException e) {
ChatWsRespVo<String> error = ChatWsRespVo.error("CHAT_ERROR", e.getMessage());
WebSocketResponse packet = WebSocketResponse.fromJson(error);
Tio.bSend(channelContext, packet);
ChatWsStreamCallCan.remove(sessionId + "");
SseEmitter.closeSeeConnection(channelContext);
}
@Override
public void onResponse(Call call, Response response) throws IOException {
if (!response.isSuccessful()) {
String string = response.body().string();
String message = "Chat model response an unsuccessful message:" + string;
log.error("message:{}", message);
ChatWsRespVo<String> data = ChatWsRespVo.error("STREAM_ERROR", message);
WebSocketResponse webSocketResponse = WebSocketResponse.fromJson(data);
Tio.bSend(channelContext, webSocketResponse);
return;
}
try (ResponseBody responseBody = response.body()) {
if (responseBody == null) {
String message = "response body is null";
log.error(message);
ChatWsRespVo<String> data = ChatWsRespVo.progress(message);
WebSocketResponse webSocketResponse = WebSocketResponse.fromJson(data);
Tio.bSend(channelContext, webSocketResponse);
return;
}
StringBuffer completionContent = onResponseSuccess(channelContext, answerMessageId, start, responseBody);
// 保存生成的回答消息到数据库
new MaxSearchChatMessage().setId(answerMessageId).setChatId(sessionId)
.setRole("assistant").setContent(completionContent.toString())
.save();
Kv end = Kv.by("type", "messageEnd").set("messageId", answerMessageId);
Tio.bSend(channelContext, WebSocketResponse.fromJson(end));
// 记录推理完成所花费的时间
long endTime = System.currentTimeMillis();
log.info("finish llm in {} (ms)", (endTime - start));
// 可在此处进行后续函数调用处理
}
ChatWsStreamCallCan.remove(sessionId + "");
}
/**
* 处理 ChatGPT 成功响应
*
* @param channelContext 通道上下文
* @param responseBody 响应体
* @return 完整的生成内容
* @throws IOException
*/
public StringBuffer onResponseSuccess(ChannelContext channelContext, Long answerMessageId, Long start, ResponseBody responseBody) throws IOException {
StringBuffer completionContent = new StringBuffer();
BufferedSource source = responseBody.source();
String line;
while ((line = source.readUtf8Line()) != null) {
if (line.length() < 1) {
continue;
}
// 处理每一行数据
if (line.length() > 6) {
String data = line.substring(6);
if (data.endsWith("}")) {
OpenAiChatResponseVo chatResponse = FastJson2Utils.parse(data, OpenAiChatResponseVo.class);
List<Choice> choices = chatResponse.getChoices();
if (!choices.isEmpty()) {
ChatResponseDelta delta = choices.get(0).getDelta();
String part = delta.getContent();
if (part != null && !part.isEmpty()) {
completionContent.append(part);
ChatWsRespVo<String> vo = ChatWsRespVo.message(answerMessageId, part);
Tio.bSend(channelContext, WebSocketResponse.fromJson(vo));
}
String reasoning_content = delta.getReasoning_content();
if (reasoning_content != null && !reasoning_content.isEmpty()) {
ChatWsRespVo<String> vo = ChatWsRespVo.message(answerMessageId, reasoning_content);
Tio.bSend(channelContext, WebSocketResponse.fromJson(vo));
}
}
} else if (": keep-alive".equals(line)) {
ChatWsRespVo<String> vo = ChatWsRespVo.keepAlive(answerMessageId);
WebSocketResponse websocketResponse = WebSocketResponse.fromJson(vo);
if (channelContext != null) {
Tio.bSend(channelContext, websocketResponse);
}
} else {
log.info("Data does not end with }:{}", line);
// 例如:{"type":"messageEnd","messageId":"654b8bdb25e853"}
}
}
}
return completionContent;
}
}
详细解释
onFailure 方法
当 DeepSeek 推理 API 调用失败时,构造错误响应(使用 ChatWsRespVo.error),通过 WebSocket 向客户端发送错误信息,同时移除该 sessionId 对应的流式调用记录,并关闭 SSE 连接。onResponse 方法
- 若响应不成功,则读取响应体字符串并构造错误响应,通知客户端错误信息。
- 若响应成功,则调用
onResponseSuccess
方法处理响应流数据,逐行解析数据,将生成的文本和推理内容实时通过 WebSocket 发送给客户端。 - 完整回答生成后保存到数据库(这里使用
MaxSearchChatMessage
模型保存回答内容),并发送消息结束信号给客户端。 - 记录整个推理过程的耗时。
onResponseSuccess 方法
该方法从响应流中逐行读取数据(去掉前 6 个字符),判断是否为有效的 JSON 数据,若是,则解析为OpenAiChatResponseVo
对象,从中提取出生成的文本片段(以及附带的 reasoning_content),实时发送给客户端;同时处理 keep-alive 信号,返回完整生成内容。
总结
本文档详细介绍了如何对接火山引擎 DeepSeek 推理 API,包括两个关键类的完整实现和详细说明:
DeepSeekPredictService
负责构造请求数据(包含系统提示、历史记录和当前问题),调用火山引擎 DeepSeek 推理 API,并在调用前通过 WebSocket 通知客户端推理开始。返回一个用于流式处理回答的 Call 对象。DeepSeekSseCallback
负责处理 DeepSeek 推理 API 的流式响应,逐行读取响应数据并解析生成内容,实时通过 WebSocket 分段返回给客户端,并在响应结束后保存回答到数据库和发送结束信号。