lastModified: 2024/08/01
背景
在公司某个项目和某个盆友的面试中, 遇到了一个关于大文件上传的问题.
单纯的上传文件本身是非常简单的, 但是对于大文件来说, 因为上传的时间较长, 过程中较容易出现一些异常状态, 导致上传不完整(丢包)或者其中的切片上传失败.
故需要考虑以下问题:
- 如何切片上传大文件. 一次上传过大的文件, 如果中途失败, 就需要从头开始重新上传, 所以需要将大文件切成多个切片.
- 如何验证文件是否完整. 文件传输过程中, 有可能因为网络丢包/服务出现错误/浏览器中断崩溃等异常情况, 导致上传不完整, 所以需要验证文件是否完整.
- 如何进行重传. 发现某个切片未成功或不完整, 需要重新上传.
- 如何进行断点续传. 如果大文件上传中途, 用户或者程序崩溃, 需要继续上传未上传成功的切片, 其实和重传类似.
基本思路
前端
- 根据文件大小, 将文件分割成N个切片, 切片大小可以自定义.
- 对切片进行md5或者其他哈希算法的计算哈希值, 用来给后端进行切片校验.
- 对切片, 切片哈希值, 切片顺序, 切片大小等信息进行上传, 并记录上传成功的切片.
- 对于上传失败的情况, 重试N次.
- 在上传完成后, 调用服务端接口, 进行文件合并.
后端
- 接收切片和哈希值.
- 根据哈希值判断切片是否完整.
- 如果切片完整, 将切片写入文件.
- 如果切片不完整, 返回错误信息.
- 等待前端调用合并接口.
- 根据顺序合并切片, 生成文件.
- 删除切片.
代码实现
前端
index.html
html
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Document</title>
</head>
<body>
<script type="module" src="./main.ts"></script>
<input type="file" id="file" />
<p id="time"></p>
</body>
</html>
main.ts
typescript
import SparkMD5 from 'spark-md5';
import { readAsArrayBuffer, sliceFile } from './helper.ts';
// import { md5 } from 'hash-wasm'; 或者 hash-wasm
const chunkSize = 4 * 1024 * 1024;
const file$ = document.getElementById('file') as HTMLInputElement;
file$.addEventListener('change', async (e) => {
const file = file$.files?.[0];
if (!file) return;
const chunks = sliceFile(file, chunkSize);
const startTime = performance.now();
console.group();
for (const chunkIndex in chunks) {
const chunk = chunks[chunkIndex];
const arrayBuffer = await readAsArrayBuffer(chunk);
const spark = new SparkMD5.ArrayBuffer();
const chunkHash = await SparkMD5.ArrayBuffer.hash(arrayBuffer, true);
}
console.groupEnd();
const endTime = performance.now();
document.getElementById('time')!.innerHTML = `总耗时: ${endTime - startTime}ms`;
});
helper.ts
typescript
export function sliceFile(file: File, chunkSize: number) {
const chunks: Blob[] = [];
let start = 0;
while (start < file.size) {
chunks.push(file.slice(start, start + chunkSize));
start += chunkSize;
}
return chunks;
}
export function readAsArrayBuffer(blob: Blob) {
const fr = new FileReader();
return new Promise<ArrayBuffer>((resolve, reject) => {
fr.onload = () => {
resolve(fr.result as ArrayBuffer);
};
fr.readAsArrayBuffer(blob);
});
}
以上是切片文件及其哈希值的计算. 其中使用了spark-md5库用来计算文件的md5
.
这里使用了 hash-wasm 库, 该库使用C
编写, 然后转成wasm
使用.
对479MB
的文件切成119份
, 单hash
部分的计算, hash-wasm比spark-md5快2x
左右(单chunk
, hash-wasm 2+ms, spark-md5 4+ms).
加上读取文件的readAsArrayBuffer
部分, 总耗时在 3 - 5s左右.
符合hash-wasm的测试结果. 当然, 不同的客户机结果不一定相同. 而且hash-wasm体积上比spark-md5大, 在需要精细大小的场景, 需要考虑使用.
问题
以上代码虽然能够完成切片和计算, 但是计算过程中, 页面就直接卡死了. 因此就需要一个很少使用的功能: web worker.
web worker是js
的多线程实现方式, 可以在js
中开一个线程, 在线程中执行计算, 主线程可以继续执行其他任务.
main.ts
typescript
import { readAsArrayBuffer, sliceFile } from './helper.ts';
import { WorkerPool } from './worker-pool.ts';
const chunkSize = 4 * 1024 * 1024;
const file$ = document.getElementById('file') as HTMLInputElement;
const pool = new WorkerPool<ArrayBuffer, string>({
maxRunning: navigator.hardwareConcurrency || 4,
url: defineWorkerUrl('./worker.ts'),
});
file$.addEventListener('change', async (e) => {
const file = file$.files?.[0];
if (!file) return;
const chunks = sliceFile(file, chunkSize);
let finished = 0;
const startTime = performance.now();
for (const chunkIndex in chunks) {
const chunk = chunks[chunkIndex];
readAsArrayBuffer(chunk).then((buffer) => {
pool.enqueue({ chunk: buffer }).then((hash) => {
finished++;
if (finished === chunks.length) {
const endTime = performance.now();
document.getElementById('time')!.innerHTML = `总耗时: ${endTime - startTime}ms`;
pool.close();
}
});
});
}
});
worker-pool.ts
typescript
export class WorkerPool<T, R> {
private workers: Worker[] = [];
private queue: Array<{ resolve: (result: R) => void; reject: (err: unknown) => void; data: T }> = [];
private maxRunning = 4;
constructor({ maxRunning, url }: { maxRunning?: number; url: string }) {
this.maxRunning = maxRunning || 4;
for (let i = 0; i < this.maxRunning; i++) {
const worker = new Worker(url, {
type: 'module',
});
worker.postMessage({ type: 'validate' });
this.workers.push(worker);
}
}
async run() {
if (this.queue.length === 0) {
return;
}
if (this.workers.length === 0) {
return;
}
while (this.queue.length > 0 && this.workers.length > 0) {
const first = this.queue.shift();
if (!first) {
return;
}
const { data, resolve, reject } = first;
const worker = this.workers.shift();
if (!worker) {
return;
}
worker.onerror = (e) => {
worker.onmessage = null;
worker.onerror = null;
reject(e);
this.run();
};
worker.onmessage = (e) => {
worker.onmessage = null;
worker.onerror = null;
resolve(e.data);
this.workers.push(worker);
this.run();
};
worker.postMessage(data);
}
}
async enqueue(data: T) {
return new Promise<R>((resolve, reject) => {
this.queue.push({ resolve, reject, data });
this.run();
});
}
async close() {
for (const worker of this.workers) {
worker.terminate();
}
}
}
worker.ts
typescript
import { md5 } from 'hash-wasm';
self.addEventListener('message', async (e) => {
const { type, chunk } = e.data;
if (type === 'validate') {
console.log('validate');
return;
}
md5(new Uint8Array(chunk)).then((hash) => {
self.postMessage({ hash });
});
});
以上是通过创建一个WorkerPool
, 将任务放入队列, 然后从队列中取出任务, 并且在Worker
中执行计算.
本机CPU AMD Ryzen 7 5800H with Radeon Graphics
479MB
整个过程运算用了 1.4s 上下, 效率提升的非常大.
上面的步骤都是通过主线程读取ArrayBuffer
, 然后传递给Worker
计算, 这是因为向Worker
传递数据时, 只有Blob``ArrayBuffer``TypedArray``DataView
类型是共享内存的, 其他类型都是需要复制给 Worker
线程.
所以也可以改成下面这样.
main.ts
typescript
import { WorkerPool } from './worker-pool.ts';
const chunkSize = 4 * 1024 * 1024;
const file$ = document.getElementById('file') as HTMLInputElement;
const pool = new WorkerPool<{ data: Blob }, string>({
maxRunning: navigator.hardwareConcurrency || 4,
url: defineWorkerUrl('./worker.ts'),
});
file$.addEventListener('change', async (e) => {
const file = file$.files?.[0];
if (!file) return;
let finished = 0;
const startTime = performance.now();
const length = Math.ceil(file.size / chunkSize);
for (let i = 0; i < length; i++) {
// 直接传递blob
pool
.enqueue({
data: file.slice(i * chunkSize, (i + 1) * chunkSize),
})
.then((hash) => {
finished++;
if (finished === length) {
const endTime = performance.now();
document.getElementById('time')!.innerHTML = `总耗时: ${endTime - startTime}ms`;
pool.close();
}
});
}
});
worker.ts
typescript
import { md5 } from 'hash-wasm';
function readAsArrayBuffer(blob: Blob) {
const fr = new FileReader();
return new Promise<ArrayBuffer>((resolve, reject) => {
fr.onload = () => {
resolve(fr.result as ArrayBuffer);
};
fr.readAsArrayBuffer(blob);
});
}
self.addEventListener('message', async (e) => {
const { type, data } = e.data;
if (type === 'validate') {
console.log('validate');
return;
}
readAsArrayBuffer(data).then((chunk) => {
md5(new Uint8Array(chunk)).then((hash) => {
self.postMessage({ hash });
});
});
});
结尾
到这里文件的切片和计算MD5就结束了, 然后就还有上传以及重试, 断点续传, 放在下一篇再说.