文件的切片上传、断点续传和并发控制
前端上传文件时候,如果文件很大,上传时会出现各种问题,比如连接超时,网断了,都会导致上传失败。为了避免上传大文件时超时,就需要用到切片断点上传。
# 整体思路
客户端:
- 使用 Blob.prototype.slice 方法对文件切片
- 使用 spark-md5 库计算文件 hash,并创建 web worker 开启新的线程执行
- 将各个分片文件上传到服务器,并携带 hash
- 上传进度计算
- 当所有文件上传完成后,发起合并请求,携带文件 hash、后缀名、分片大小
服务端:
- 根据接收到的 hash 创建文件夹,将分片文件存储到文件夹中
- 收到合并请求后,读取各个分片文件。根据 hash 和后缀名,合并生成完整文件
- 删除存储分片文件的文件夹及其内容
# 服务端部分
技术选型:Koa2 + @koa/multer 文件上传中间键
uploadController
公共部分:
// uploadController.js
const path = require('path');
const fse = require('fs-extra');
const { domain, port } = require('../../config');
// 大文件存储目录路径
const UPLOAD_DIR = path.resolve(__dirname, '../../', 'upload');
// 获取切片目录路径函数
const getChunkDir = (fileHash) => path.resolve(UPLOAD_DIR, `${fileHash}-chunks`);
// 提取文件后缀名函数
const extractExt = (filename) => filename.slice(filename.lastIndexOf('.'), filename.length);
# 切片上传接口
// uploadController.js
exports.uploadChunk = async (ctx) => {
// 文件哈希,切片哈希=${文件哈希}-${切片索引}
const { fileHash, hash } = ctx.request.body;
// 切片文件
const chunk = ctx.request.file;
// 切片目录路径
const chunkDir = getChunkDir(fileHash);
// 切片目录不存在,创建切片目录
await fse.ensureDir(chunkDir);
// 将切片文件移动到切片文件的目录中并重命名为切片哈希(即fileHash-index)
await fse.move(chunk.path, `${chunkDir}/${hash}`);
ctx.success("上传成功")
}
# 切片上传前的验证接口
在上传切片前,我们需要验证这个文件是否已经被上传过,上传过直接返回为文件引用地址,即文件秒传。若没有被上传过,则还需要进一步验证是否存在相应的切片文件(即之前可能没有全部上传完切片),如果存在的话,则后续上传需要跳过这些已经存在的切片,尽可能的减少不必要的请求,即断点续传。
思路:由于切片文件存储的目录,和合并后的文件都是根据整个文件的哈希进行命名的。所以后端可以通过前端传递的 fileHash 来判断这个文件是否已经被上传过。
- 存在上传文件,则通知前端不需要再上传并且返回文件的引用地址。
- 不存在上传的文件,但存在该文件的切片目录。则通知前端需要继续上传,并且返回已上传的切片文件列表,让前端可以跳过这些切片文件的上传
- 不存在该文件的切片目录,通知前端需要继续上传,已上传的文件列表返回一个空数组就行。
// uploadController.js
exports.verifyUpload = async (ctx) => {
// 请求体参数
const { fileHash, fileName } = ctx.request.body;
// 读取文件的后缀名 eg: .mp4
const ext = extractExt(fileName);
// 文件路径
const targetPath = path.resolve(UPLOAD_DIR, fileHash + ext);
// 判断文件是否存在
if (fse.existsSync(targetPath)) {
// 合成后的文件存在,不需要上传
ctx.success({ shouldUpload: false, url: `${domain}:${port}/upload/${fileHash}${ext}` })
} else {
// 切片目录路径
const chunkDir = getChunkDir(fileHash);
// 合成后的文件不存在
const uploadedList =
// 切片目录存在
fse.existsSync(chunkDir)
// 返回切片目录下已上传的切片文件列表
? await fse.readdir(chunkDir)
// 切片目录不存在,返回空数组
: [];
// 返回已经上传的切片列表
ctx.success({
shouldUpload: true,
uploadedList: uploadedList
})
}
}
# 切片合并接口
在切片上传完成后,前端需要调用合并接口,来合并上传的所有切片。合并成功后,返回相应的文件引用地址给前端。
// uploadController.js
exports.mergeChunk = async (ctx) => {
// 请求体参数
const { fileHash, fileName, splitSize, total } = ctx.request.body;
// 获取该文件的切片的文件夹路径
const chunkDir = getChunkDir(fileHash);
// 获取切片的文件列表
const chunkPaths = await fse.readdir(chunkDir);
// 已存在的切片数量与切片的总数不符,不能直接合并
if (chunkPaths.length !== total) {
ctx.fail("切片文件数量不符合");
}
// 根据切片下标进行排序,直接读取目录获得的顺序可能会错乱
chunkPaths.sort((a, b) => a.split("-")[1] - b.split("-")[1]);
// 获取该文件的后缀名 eg: .mp4
const ext = extractExt(fileName);
// 计算需要最终的存储的文件路径
const targetPath = path.resolve(UPLOAD_DIR, fileHash + ext);
// 开始合并切片
await Promise.all(
chunkPaths.map((chunkPath, index) => {
return pipeStream(
path.resolve(chunkDir, chunkPath),
// 指定位置创建可写流
fse.createWriteStream(targetPath, {
start: index * splitSize
})
);
})
);
// 合并后删除保存切片的目录
fse.rmdirSync(chunkDir);
// 返回文件地址给前端
ctx.success({ url: `${domain}:${port}/upload/${fileHash}${ext}` })
}
// 读流 -> 写流
const pipeStream = (path, writeStream) =>
new Promise((resolve) => {
// 创建可读流
const readStream = fse.createReadStream(path);
// 监听读取完成
readStream.on('end', () => {
// 删除 chunk 文件
fse.unlinkSync(path);
resolve();
});
readStream.pipe(writeStream);
});
# 客户端部分
技术选型:Vue3 + Axios + Vite
# 配置请求Api
// 导入配置的 axios 实例
import request from "/@/service/request";
// 上传切片接口
export const uploadChunk = (data, { onUploadProgress, cancelToken }) => request({
url: "/upload/chunk",
// 文件上传的请求头
header: { "Content-Type": "multipart/form-data" },
// 监控上传进度的回调
onUploadProgress,
// 配置取消请求令牌
cancelToken,
timeout: 1000 * 60 * 5,
method: "post",
data
});
// 合并切片接口
export const mergeChunk = (data) => request({
url: "/upload/merge",
method: "post",
data
});
// 上传切片前的验证接口
export const uploadVerify = (data) => request({
url: "/upload/verify",
method: "post",
data
});
# 前端状态
// 文件切片大小 2M
const defaultChunkSize = 1024 * 1024 * 2;
// 上传的文件
const uploadFile = ref(null);
// 切片数组
const chunks = ref([]);
// 文件哈希
const hash = ref("");
// 取消请求源 数组
const sources = ref([]);
// 暂停
const pasue = ref(false);
// 预览地址
const url = ref("");
# 文件切片
点击上传按钮时,先调用 handleFileSlice
方法将文件切片。具体实现过程是:设置默认单个切片文件的大小,通过while循环和slice方法将文件进行切割,并将生成的一个个 chunk 切片数据放到数组中:
// 开始切片
async function startSlice() {
// 切片文件是否存在
if (!isEmpty(chunks.value)) {
return ElMessage.warning("切片已存在");
}
// 进行切片
const _chunks = handleFileSlice(uploadFile.value, defaultChunkSize);
// 计算哈希
hash.value = await calculateHash(_chunks);
// 切片属性增强
chunks.value = enhanceChunks(_chunks, {
file: uploadFile.value,
hash: hash.value,
chunkSize: defaultChunkSize
});
}
// utils/index.js
export function handleFileSlice(file, chunkSize) {
// 切片数组
const chunks = [];
// 切片开始位置
let start = 0;
// 循环切片
while (start < file.size) {
// 切片结束位置
const end = Math.min(start + chunkSize, file.size);
// 切片
const chunk = file.slice(start, end);
// 添加切片
chunks.push({ chunk });
// 前进
start += chunkSize;
}
// 返回切片数组
return chunks;
}
# 生成hash(spark-md5)
无论是客户端还是服务端,都要用到文件和切片的 hash,生成 hash 最简单的方法是 文件名 + 切片下标,但是如果文件名一旦修改,生成的 hash 就会失效。事实上只要文件内容不变, hash 就不应该变化,所以我们根据文件内容生成 hash。
这里我们选用 spark-md5
库,它可以根据文件内容计算出文件的hash值。
如果上传的文件过大时,读取文件内容计算hash非常耗时,并且会引起 UI 阻塞,导致页面假死,所以我们使用 web-worker
在worker 线程计算 hash,这样仍可以在主界面正常做交互。具体做法如下:
// utils/index.js
const worker = new Worker(new URL("./worker.js", import.meta.url), { type: "module" });
// 计算文件哈希
export function calculateHash(chunks) {
return new Promise((resolve, reject) => {
// 发送消息
worker.postMessage({ chunks });
// 接收消息
worker.onmessage = (e) => {
// 哈希
const { hash } = e.data;
if (hash) {
// 哈希计算完成
resolve(hash);
}
};
// 错误处理
worker.onerror = reject;
});
}
// utils/worker.js
import SparkMD5 from "spark-md5";
// 接受主进程发送过来的数据
self.onmessage = function (e) {
const { chunks } = e.data;
// 计算整个文件哈希
getFileHash(chunks)
.then((hash) => {
// 发送哈希值给主进程
self.postMessage({ hash });
})
.catch((error) => {
// 发送错误信息给主进程
self.postMessage({ error });
});
};
// 计算整个文件哈希
async function getFileHash(chunks) {
// 创建 SparkMD5 对象
const spark = new SparkMD5.ArrayBuffer();
// 读取每个 chunk 内容
const promises = chunks.map(({ chunk }) => {
return getChunkContent(chunk);
});
try {
// 等待所有 chunk 内容读取完成
const contents = await Promise.all(promises);
// 将所有 chunk 内容添加到 spark 对象中
contents.forEach((content) => {
spark.append(content);
});
// 计算哈希值
const hash = spark.end();
// 返回哈希值
return hash;
} catch (error) {
console.log(error);
}
}
// 读取 chunk 内容
function getChunkContent(chunk) {
return new Promise((resolve, reject) => {
// 创建 FileReader 对象
const reader = new FileReader();
// 读取文件内容
reader.readAsArrayBuffer(chunk);
// 读取成功后的回调
reader.onload = function (e) {
resolve(e.target.result);
};
// 读取失败后的回调
reader.onerror = function () {
reject(reader.error);
reader.abort();
};
});
}
# 收集切片信息
生成文件切片后,将计算出来的hash、切片索引、文件个数、当前切片的hash、切片内容等信息都收集起来,发送给服务端。上传索引的目的是为了让后端可以知道当前切片是第几个切片,方便服务端合并切片。
// 增强 chunks 的信息
function enhanceChunks(chunks, meta) {
// 元数据
const { hash, file, chunkSize } = meta;
// 给切片对象添加属性
return chunks.map(({ chunk }, index) => {
return {
// 1. 文件哈希
fileHash: hash,
// 2. 切片索引
index,
// 3. 切片总数
total: chunks.length,
// 4. 文件总大小
totalSize: file.size,
// 5. 当前切片的哈希
hash: `${hash}-${index}`,
// 6. 切片对象
chunk,
// 7. 切片大小
size: chunk.size,
// 8. 分割大小
splitSize: chunkSize,
// 9. 文件名
fileName: file.name,
// 10. 上传进度
percentage: 0
};
});
}
# 创建切片请求
// 切片请求数组
function getChunkReqs(chunks) {
return chunks.map((enhancedChunk) => {
// chunk 属性
const { fileHash, hash, chunk } = enhancedChunk;
// 创建 FormData 对象
const formData = new FormData();
// 文件哈希
formData.append("fileHash", fileHash);
// 切片哈希
formData.append("hash", hash);
// 切片
formData.append("chunk", chunk);
// 取消令牌源
const source = axios.CancelToken.source();
// 收集取消令牌源
sources.value.push(source);
// 上传请求
return () => service.comm.uploadChunk(formData, {
// 取消令牌
cancelToken: source.token,
// 监听上传进度
onUploadProgress: (progressEvent) => {
if (progressEvent.lengthComputable) {
// 上传进度
const progress = floor((progressEvent.loaded / progressEvent.total) * 100 | 0);
// 更新进度
enhancedChunk.percentage = progress;
}
}
}).then(() => {
// 上传成功,查找请求成功的<取消令牌源>的索引
const sourceIndex = sources.value.findIndex((item) => item === source);
// 移除请求成功的<取消令牌源>
sources.value.splice(sourceIndex, 1);
});
});
}
# 上传切片
使用上面创建切片请求的方法,创建好一个执行切片请求的函数数组,调用 Promise.all
并发上传所有的切片。
// 开始上传
async function startUpload() {
const promises = getChunkReqs(chunks.value)
// 全部上传
await Promise.all(promises);
// 合并切片
const { url: fileURL } = await service.comm.mergeChunk({
fileHash: hash.value,
fileName: uploadFile.value.name,
splitSize: defaultChunkSize,
total: chunks.value.length
});
url.value = fileURL;
ElMessage.success("上传成功");
}
# 文件秒传
需要在开始上传前,调文件验证接口,验证文件是否已经上传过。
// 验证文件是否已经上传过
function verify() {
return service.comm
.uploadVerify({
fileHash: hash.value,
fileName: uploadFile.value.name
});
}
// 开始上传
async function startUpload() {
// 验证文件是否已经上传过
const { shouldUpload, url: fileURL } = await verify();
// 文件秒传
if (!shouldUpload) {
// 直接修改每个 chunk 的上传进度为100
changePercentage(chunks.value, get100());
url.value = fileURL;
ElMessage.success("上传成功");
return;
}
// ...
}
# 断点续传
文件验证接口会返回一个 uploadedList,根据这个数据判断需要过滤哪些切片,即续传剩下的切片。
// 开始上传
async function startUpload() {
// 验证文件是否已经上传过
const { shouldUpload, url: fileURL, uploadedList } = await verify();
let promises = [];
// 文件秒传
if (!shouldUpload) {
// ...
} else { // 断点续传
// 存在上传的切片
if (!isEmpty(uploadedList)) {
const unUploaded = [];
// 过滤出未上传的切片
chunks.value.forEach((chunk) => {
if (uploadedList.includes(chunk.hash)) {
// 这里需要将已上传的切片进度重置为100%,保证上传进度的准确性(有些切片上传上去了,但是请求取消了,导致上传进度监听没有被及时触发,从而导致切片上传进度没有更新,例如最终进度可能停留在73%,但实际切片已上传完成了)
chunk.percentage = get100();
} else {
// 未上传的切片
unUploaded.push(chunk);
}
});
// 未上传切片请求
promises = getChunkReqs(unUploaded);
} else { // 从头开始上传
// ...
}
// 全部上传
await Promise.all(promises);
// ...
}
}
# 并发控制
我们知道 http1.1 一个域名最多可以建立 6 个 TCP 连接,如果同时上传这么多切片的话,会导致网络阻塞,以至于其他的请求无法被处理。我们需要保证切片上传的请求数量不能同时占满 6 个 TCP 连接。
// utils/index.js
// 并发控制器
export function taskController(list, max = 4, cb, isAbort) {
let runIdx = 0;
let finished = 0;
const run = () => {
while(max && runIdx < list.length) {
// 中断(引用值可以保证取到变化后的布尔值)
if (isAbort.value) return;
const task = list[runIdx];
runIdx++;
if (task) {
max--;
task().finally(() => {
max++;
finished++;
if (finished === list.length) {
cb();
} else {
run();
}
});
}
}
};
run();
}
加入并发控制器后的上传函数。
// 开始上传
async function startUpload() {
// ...
// 并发控制上传
taskController(promises, 4, async () => {
// 合并文件
const { url: fileURL } = await service.comm.mergeChunk({
fileHash: hash.value,
fileName: uploadFile.value.name,
splitSize: defaultChunkSize,
total: chunks.value.length
});
url.value = fileURL;
ElMessage.success("上传成功");
}, pasue);
}
# 暂停上传
我们在创建切片请求时,创建了取消令牌源 const source = axios.CancelToken.source()
,并且收集了取消令牌源 sources.value.push(source)
,针对每一个切片请求都配置了 cancelToken: source.token
。
// 切片请求数组
function getChunkReqs(chunks) {
return chunks.map((enhancedChunk) => {
// ...
// 取消令牌源
const source = axios.CancelToken.source();
// 收集取消令牌源
sources.value.push(source);
// 上传请求
return () => service.comm.uploadChunk(formData, {
// 取消令牌
cancelToken: source.token,
// ...
}).then(() => {
// 上传成功,查找请求成功的<取消令牌源>的索引
const sourceIndex = sources.value.findIndex((item) => item === source);
// 移除请求成功的<取消令牌源>
sources.value.splice(sourceIndex, 1);
});
});
}
所以在修改为暂停状态时,我们直接取消正在进行的请求即可。
// 暂停上传
function handlePause() {
// 切换暂停状态
pasue.value = !pasue.value;
if (pasue.value) {
// 暂停
sources.value.forEach((source) => source.cancel());
sources.value = [];
} else {
// 继续
startUpload();
}
}
# 上传进度
前面我们在创建切片请求时,配置了上传进度监听的回调函数,我们根据监听触发更新了每个 chunk 的进度。
function getChunkReqs(chunks) {
return chunks.map((enhancedChunk) => {
// ...
// 上传请求
return () => service.comm.uploadChunk(formData, {
// ...
// 监听上传进度
onUploadProgress: (progressEvent) => {
if (progressEvent.lengthComputable) {
// 上传进度
const progress = floor((progressEvent.loaded / progressEvent.total) * 100 | 0);
// 更新进度
enhancedChunk.percentage = progress;
}
}
}).then(() => {
// ...
});
});
}
我们已经知道了每个切片上传的进度,所以可以得到每个切片已上传的字节数,即:切片大小 * 上传进度。将这些字节数累计加起来,我们可以得到总的已上传的字节数,所以总的上传进度为:已上传的字节数 / 文件的大小。
在这里 chunks
切片列表是一个 ref
数组,所以我们可以使用 Vue 中的计算属性来实现总进度的计算。
// 判断一下 chunks 是否被增强(即计算了 fileHash 以后),增强过后的 chunk 才会被额外添加的属性,例如下面计算需要的 percentage 进度属性
const isEnhanced = computed(() => {
return !isEmpty(chunks.value) && !isEmpty(chunks.value[0].fileHash);
});
// 总进度
const totalProgress = computed(() => {
if (isEnhanced.value) {
const loaded = chunks.value
.map(({ size, percentage }) => size * percentage)
.reduce((prev, curr) => prev + curr, 0);
// 总进度
return floor(loaded / uploadFile.value.size);
}
return 0;
});