使用 SSE 流式传输生成进度的实现文档
在视频生成过程中,由于生成操作耗时较长,传统的阻塞式等待会严重影响用户体验。为此,本方案采用了 Server-Sent Events(SSE)技术,在视频生成过程中实时将进度数据推送给客户端。下文详细介绍了核心代码实现、设计思路以及工作流程。
1. 整体架构与设计思路
本方案主要由两个模块组成:
- 请求处理模块(Handler)
处理 HTTP 请求,判断是否需要采用 SSE 流式响应,并调用具体的业务服务。 - 业务服务模块(Service)
主要负责接收客户端提交的代码、生成脚本文件、启动 Python 脚本执行,同时在脚本运行过程中监控生成的视频文件夹。通过 Java NIO 的 WatchService 监听指定目录中文件的新增事件,将文件更新信息通过 SSE 事件实时发送给客户端。最终,当视频生成完成后,将整个生成结果以 JSON 格式通知客户端。
该设计思路使得用户能够在生成视频的整个过程中看到每一帧或部分视频文件的生成状态,从而大大提升用户体验。
2. 代码详细说明
下面介绍两个核心类的具体实现。
2.1. 请求处理类:ManimHanlder.java
该类位于包 com.litongjava.linux.handler
中,是 HTTP 请求的入口。在处理请求时,主要完成以下工作:
- 解析请求参数,判断是否启用流式传输(SSE)。
- 获取客户端请求中包含的代码内容。
- 若启用 SSE 流传输,则为响应添加 SSE 头,并设置响应为非自动发送状态。
- 调用
ManimService
进行业务处理,并根据执行结果设置响应内容。
完整代码如下:
package com.litongjava.linux.handler;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.linux.ProcessResult;
import com.litongjava.linux.service.ManimService;
import com.litongjava.tio.boot.http.TioRequestContext;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.common.HttpResponse;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ManimHanlder {
ManimService manimService = Aop.get(ManimService.class);
public HttpResponse index(HttpRequest request) {
ChannelContext channelContext = request.getChannelContext();
Boolean stream = request.getBoolean("stream");
if (stream == null) {
stream = false;
}
String code = request.getBodyString();
log.info("code:{}", code);
HttpResponse response = TioRequestContext.getResponse();
if (stream) {
response.addServerSentEventsHeader();
Tio.send(channelContext, response);
response.setSend(false);
}
try {
ProcessResult executeScript = manimService.executeCode(code, stream, channelContext);
if (executeScript != null) {
response.setJson(executeScript);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
response.setStatus(500);
response.setString(e.getMessage());
}
return response;
}
}
2.2. 业务服务类:ManimService.java
该类位于包 com.litongjava.linux.service
中,主要负责以下几部分工作:
脚本文件生成与目录创建
- 在
cache
文件夹下为每次生成创建一个唯一子目录,用于存储生成的输出文件。 - 将客户端提交的代码中的占位符
#(output_path)
替换为实际生成的视频存储路径。 - 在
scripts
文件夹下生成对应的 Python 脚本文件,作为视频生成的执行脚本。
- 在
执行 Python 脚本
- 根据操作系统判断使用
python
或python3
命令执行脚本。 - 捕获标准输出和错误输出,将执行结果封装到
ProcessResult
对象中。
- 根据操作系统判断使用
流式传输与文件监控
- 当客户端请求启用流式传输时,在生成视频的部分文件所在目录(
partial_movie_files/CombinedScene
)下创建文件夹,并启动一个独立线程使用 Java NIO 的WatchService
监控目录文件的新增情况。 - 监控线程每当捕获到新的文件创建事件时,会通过
Tio.send
将该文件路径封装为 SSE 事件(事件类型为part
)推送给客户端。 - 脚本执行完毕后,通过一个新的线程发送最终的执行结果(事件类型为
result
),并关闭 SSE 连接。
- 当客户端请求启用流式传输时,在生成视频的部分文件所在目录(
完整代码如下:
package com.litongjava.linux.service;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;
import com.litongjava.linux.ProcessResult;
import com.litongjava.tio.core.ChannelContext;
import com.litongjava.tio.core.Tio;
import com.litongjava.tio.http.common.sse.SsePacket;
import com.litongjava.tio.http.server.util.SseEmitter;
import com.litongjava.tio.utils.hutool.FileUtil;
import com.litongjava.tio.utils.json.JsonUtils;
import com.litongjava.tio.utils.snowflake.SnowflakeIdUtils;
import com.litongjava.tio.utils.thread.TioThreadUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ManimService {
public ProcessResult executeCode(String code, Boolean stream, ChannelContext channelContext) throws IOException, InterruptedException {
new File("cache").mkdirs();
long id = SnowflakeIdUtils.id();
String subFolder = "cache" + File.separator + id;
code = code.replace("#(output_path)", subFolder);
String folder = "scripts" + File.separator + id;
File fileFolder = new File(folder);
if (!fileFolder.exists()) {
fileFolder.mkdirs();
}
String scriptPath = folder + File.separator + "script.py";
FileUtil.writeString(code, scriptPath, StandardCharsets.UTF_8.toString());
// 执行脚本
String videoFolder = subFolder + File.separator + "videos" + File.separator + "1080p30";
// 定义需要监控的文件夹,注意此处为绝对路径或根据实际情况调整
String partVideoFolder = videoFolder + File.separator + "partial_movie_files" + File.separator + "CombinedScene";
// 如果需要流式发送,则启动文件夹监控线程
ProcessResult execute = null;
if (stream) {
File file = new File(partVideoFolder);
file.mkdirs();
log.info("watch:{}", file.getAbsolutePath());
Thread watcherThread = new Thread(() -> watchFolder(partVideoFolder, channelContext));
watcherThread.start();
TioThreadUtils.execute(() -> {
try {
ProcessResult execute2 = execute(scriptPath);
// 可以等待一段时间,以确保监控期间捕获到文件创建事件
Thread.sleep(2000);
if (watcherThread != null && watcherThread.isAlive()) {
watcherThread.interrupt();
}
String skipNullJson = JsonUtils.toSkipNullJson(execute2);
Tio.send(channelContext, new SsePacket("result", skipNullJson));
SseEmitter.closeSeeConnection(channelContext);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} else {
execute = execute(scriptPath);
execute.setTaskId(id);
String filePath = videoFolder + File.separator + "CombinedScene.mp4";
File file = new File(filePath);
if (file.exists()) {
execute.setOutput(filePath.replace("\\", "/"));
} else {
log.info("file is not exists:{}", filePath);
}
}
return execute;
}
public static ProcessResult execute(String scriptPath) throws IOException, InterruptedException {
String osName = System.getProperty("os.name");
ProcessBuilder pb = null;
if (osName.toLowerCase().contains("windows")) {
pb = new ProcessBuilder("python", scriptPath);
} else {
pb = new ProcessBuilder("python3", scriptPath);
}
pb.environment().put("PYTHONIOENCODING", "utf-8");
Process process = pb.start();
BufferedReader stdInput = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8));
BufferedReader stdError = new BufferedReader(new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8));
StringBuilder outputBuilder = new StringBuilder();
String line;
while ((line = stdInput.readLine()) != null) {
outputBuilder.append(line).append("\n");
}
StringBuilder errorBuilder = new StringBuilder();
while ((line = stdError.readLine()) != null) {
errorBuilder.append(line).append("\n");
}
int exitCode = process.waitFor();
ProcessResult result = new ProcessResult();
result.setExitCode(exitCode);
result.setStdOut(outputBuilder.toString());
result.setStdErr(errorBuilder.toString());
return result;
}
/**
* 监控指定目录中新建文件的变化,并发送给客户端
*/
private void watchFolder(String folderPath, ChannelContext channelContext) {
Path path = Paths.get(folderPath);
try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);
while (!Thread.currentThread().isInterrupted()) {
WatchKey key = null;
try {
key = watchService.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
if (key == null) {
continue;
}
for (WatchEvent<?> event : key.pollEvents()) {
if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
String newFileName = event.context().toString();
String fullPath = folderPath + File.separator + newFileName;
// 通过 Tio.send 发送新文件路径到客户端
Tio.send(channelContext, new SsePacket("part", fullPath));
}
}
key.reset();
}
} catch (IOException e) {
log.error("Error watching folder {}", folderPath, e);
}
}
}
3. 运行效果
在 SSE 流式传输模式下,客户端将收到如下形式的事件数据:
event:part
data:cache/496261712987107328/videos/1080p30/partial_movie_files/CombinedScene/uncached_00000.mp4
event:part
data:cache/496261712987107328/videos/1080p30/partial_movie_files/CombinedScene/uncached_00002.mp4
..
event:part
data:cache/496261712987107328/videos/1080p30/partial_movie_files/CombinedScene/uncached_00057.mp4
event:part
data:cache/496261712987107328/videos/1080p30/partial_movie_files/CombinedScene/uncached_00058.mp4
event:result
data:{"taskId":"496261712987107328","exitCode":0,"stdOut":"omit","output":"cache/496261712987107328/videos/1080p30/CombinedScene.mp4"}
解释:
event:part
:每当监控到新生成的部分视频文件时,会通过该事件推送文件路径,客户端可以据此更新生成进度。event:result
:脚本执行完毕后,最终结果(包括任务 ID、退出码、标准输出、生成视频路径等)以 JSON 格式发送给客户端,并关闭 SSE 连接。
4. 实现方法与技术亮点
4.1. 文件夹监控
使用 WatchService:
利用 Java NIO 提供的 WatchService 监控指定目录中文件的创建事件。该服务通过poll
方法定时检查新文件的产生,保证了实时性与资源效率。多线程处理:
为避免阻塞主线程,监控工作在独立线程中进行。监控到文件创建事件后,立即将文件路径以 SSE 消息的形式发送给客户端。
4.2. 流式传输
- Server-Sent Events(SSE):
通过 SSE 实现服务端向客户端的单向推送。在 HTTP 响应头中添加 SSE 所需的头信息,随后持续向客户端推送事件数据。 - 事件分离:
将文件生成进度(part
事件)和最终执行结果(result
事件)分离开来,客户端可根据事件类型分别处理。
4.3. 脚本执行
- 跨平台支持:
通过检测操作系统名称,决定使用python
或python3
命令执行脚本,保证在不同平台下的兼容性。 - 结果捕获:
同步捕获标准输出和错误输出,并等待进程结束后将结果封装为ProcessResult
对象返回给调用者。
5. 总结
本方案通过 SSE 技术和目录监控机制,实现了在视频生成过程中实时向客户端推送生成进度和最终结果。整个流程包括请求接收、脚本生成与执行、文件监控以及 SSE 消息发送,各个模块协同工作,从而显著提升了用户体验。此实现方案适合于需要长时间运行、实时反馈进度的场景,为用户提供了友好的交互方式。
以上即为本项目的完整实现文档,代码中每个部分都经过精心设计以保证高效、稳定和跨平台兼容性。