Continued

The previous article covered uploading a large file in slices and computing hash values for verification; this one implements resumable uploads, along with handling and retrying failed uploads.
Resumable upload and retry
Resumable upload means that when an upload fails partway through, or the user closes the browser, the next attempt can pick up from where the previous one stopped instead of re-uploading the entire file.

With chunked uploads of large files, retransmission and resumption work the same way: the file is uploaded slice by slice, each slice's hash is computed and verified, and any slice that fails verification is simply uploaded again.
The general flow (a sketch of the server's status response follows the list):

- First, find out where the previous upload left off by asking the server which chunks it already holds.
- Slice the file again on the client and compute each slice's hash.
- Compare each slice against the server's list: if its index and hash match an already-uploaded chunk, skip it; otherwise upload (or re-upload) it.
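The resume check only requires the server to report which chunks it already holds. A hypothetical example of what the backend's `/status` route (shown later in this article) returns:

```typescript
// Hypothetical example of the GET /status?filename=... response the resume
// check relies on; the shape matches the backend's /status route below.
const statusResponse = {
  success: true,
  list: [
    // one entry per chunk the server already has on disk
    { size: '4194304', chunk: '0', hash: '<md5-of-chunk-0>', filepath: './temp/multer/<md5-of-chunk-0>' },
  ],
};
```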
Frontend
index.html
```html
<!doctype html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Document</title>
  </head>
  <body>
    <script type="module" src="./main.ts"></script>
    <input type="file" id="file" />
    <p id="progress"></p>
  </body>
</html>
```
main.ts
```typescript
import { fileStatus, upload } from './apis.ts';
import { AsyncQueue } from './async-queue.ts';
import { fileToFormData, retry } from './helper.ts';

const file$ = document.getElementById('file') as HTMLInputElement;
// the progress indicator is a plain <p>, not a <progress> element
const progress$ = document.getElementById('progress') as HTMLParagraphElement;

file$.addEventListener('change', async () => {
  const file = file$.files?.[0];
  if (!file) return;
  // slice the file and compute each chunk's hash
  const fds = await fileToFormData(file);
  // keep only the chunks the server does not have yet
  const uploads = await unuploaded(fds, file.name);
  if (uploads.length === 0) {
    // everything was already uploaded in a previous session
    merge(file.name);
    return;
  }
  const asyncPool = new AsyncQueue({ maxRunning: 4 });
  let finished = 0;
  for (let i = 0; i < uploads.length; i++) {
    const fd = uploads[i];
    const req = retry({ promise: () => upload(fd), retry: 3 });
    asyncPool.enqueue(req).then(() => {
      finished++;
      progress$.innerHTML = `uploaded: ${finished} / pending: ${uploads.length - finished} / total: ${uploads.length}`;
      if (finished === uploads.length) {
        console.log('finished');
        merge(file.name);
      }
    });
  }
});

const merge = async (filename: string) => {
  const res = await fetch('http://localhost:3001/merge?filename=' + filename);
  const data = await res.json();
  console.log(data);
};

// drop the chunks whose index and hash the server already knows
const unuploaded = async (hashs: FormData[], filename: string) => {
  const { list } = await fileStatus(filename);
  return hashs.filter((fd) => {
    const chunk = fd.get('chunk') as string;
    const hash = fd.get('hash') as string;
    return !list.find((item) => item.chunk === chunk && item.hash === hash);
  });
};
```
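main.ts imports `upload` and `fileStatus` from an apis.ts that isn't shown in the original; the sketch below is a minimal assumed implementation whose paths and response shapes follow the Express routes in the backend section.

```typescript
// apis.ts — a minimal sketch (the original article does not include this
// file); endpoints and shapes follow the backend routes defined below.
const BASE = 'http://localhost:3001';

export interface ChunkStatus {
  size: string;
  chunk: string;
  hash: string;
  filepath: string;
}

// POST one chunk as multipart/form-data
export const upload = async (fd: FormData): Promise<{ success: boolean; hash: string }> => {
  const res = await fetch(`${BASE}/upload`, { method: 'POST', body: fd });
  if (!res.ok) throw new Error(`upload failed with status ${res.status}`);
  return res.json();
};

// ask the server which chunks of this file it already holds
export const fileStatus = async (filename: string): Promise<{ success: boolean; list: ChunkStatus[] }> => {
  const res = await fetch(`${BASE}/status?filename=` + encodeURIComponent(filename));
  return res.json();
};
```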
The `retry` helper retries a Promise-returning function a limited number of times; `fileToFormData` slices the file, hashes each slice, and wraps the results into `FormData` objects for the upload step.
helper.ts
```typescript
import { WorkerPool } from './worker-pool.ts';

const chunkSize = 4 * 1024 * 1024;

const pool = new WorkerPool<{ data: Blob }, { hash: string }>({
  maxRunning: navigator.hardwareConcurrency || 4,
  // resolve the worker script URL relative to this module
  url: new URL('./worker.ts', import.meta.url).href,
});

export const retry = <T>({ promise, retry }: { promise: () => PromiseLike<T>; retry: number }) => {
  return async function () {
    let retryCount = 0;
    const _retry = Math.max(retry, 0);
    // try up to `retry` times; throw only after every attempt has failed
    while (retryCount < _retry) {
      try {
        return await promise();
      } catch (err) {
        retryCount++;
      }
    }
    throw new Error('retry failed');
  };
};

export const fileToFormData = (file: File) => {
  let finished = 0;
  const length = Math.ceil(file.size / chunkSize);
  return new Promise<FormData[]>((resolve, reject) => {
    const chunks: FormData[] = [];
    for (let i = 0; i < length; i++) {
      const formData = new FormData();
      const blob = file.slice(i * chunkSize, (i + 1) * chunkSize);
      // pass the Blob to the worker directly; structured clone handles it
      pool
        .enqueue({ data: blob })
        .then(({ hash }) => {
          finished++;
          formData.append('file', blob, `${file.name}-${i}`);
          formData.append('name', file.name);
          formData.append('hash', hash);
          formData.append('length', blob.size.toString());
          formData.append('chunk', i.toString());
          // chunks are collected in completion order, not index order; the
          // `chunk` field carries the index, so order does not matter here
          chunks.push(formData);
          if (finished === length) {
            resolve(chunks);
            pool.close();
          }
        })
        .catch((err) => {
          reject(err);
        });
    }
  });
};
```
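As a standalone illustration, `retry` turns a request factory into a thunk that keeps retrying before finally rejecting (the URL here is hypothetical):

```typescript
// hypothetical standalone usage of retry(): wrap a flaky request into a
// thunk that attempts it up to 3 times before rejecting
const request = () => fetch('http://localhost:3001/status?filename=big.bin');
const requestWithRetry = retry({ promise: request, retry: 3 });
requestWithRetry().then((res) => console.log(res.status));
```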
The Web Worker that hashes each file slice:
worker.ts
```typescript
import { md5 } from 'hash-wasm';

function readAsArrayBuffer(blob: Blob) {
  const fr = new FileReader();
  return new Promise<ArrayBuffer>((resolve, reject) => {
    fr.onload = () => {
      resolve(fr.result as ArrayBuffer);
    };
    // reject instead of hanging forever if the read fails
    fr.onerror = () => {
      reject(fr.error);
    };
    fr.readAsArrayBuffer(blob);
  });
}

self.addEventListener('message', async (e) => {
  const { type, data } = e.data;
  // 'validate' is a warm-up ping sent when the pool creates the worker
  if (type === 'validate') {
    console.log('validate');
    return;
  }
  readAsArrayBuffer(data).then((chunk) => {
    md5(new Uint8Array(chunk)).then((hash) => {
      self.postMessage({ hash });
    });
  });
});
```
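Reading a whole 4 MB slice into memory is fine here, but hash-wasm also has an incremental API (`createMD5`); a sketch of streaming a Blob through it, in case slices ever get large:

```typescript
import { createMD5 } from 'hash-wasm';

// a sketch of incremental hashing: stream the Blob instead of reading it
// into one ArrayBuffer, feeding each piece to the hasher as it arrives
async function md5OfBlob(blob: Blob): Promise<string> {
  const hasher = await createMD5();
  hasher.init();
  const reader = blob.stream().getReader();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    hasher.update(value);
  }
  return hasher.digest(); // hex string
}
```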
`WorkerPool` manages creating, reusing, and tearing down the Web Workers.
worker-pool.ts
```typescript
export class WorkerPool<T, R> {
  // workers currently idle and available for a task
  private workers: Worker[] = [];
  private queue: Array<{ resolve: (result: R) => void; reject: (err: unknown) => void; data: T }> = [];
  private maxRunning = 4;

  constructor({ maxRunning, url }: { maxRunning?: number; url: string }) {
    this.maxRunning = maxRunning || 4;
    for (let i = 0; i < this.maxRunning; i++) {
      const worker = new Worker(url, { type: 'module' });
      // warm the worker up so module loading happens before the first task
      worker.postMessage({ type: 'validate' });
      this.workers.push(worker);
    }
  }

  private run() {
    // pair queued tasks with idle workers until one of them runs out
    while (this.queue.length > 0 && this.workers.length > 0) {
      const { data, resolve, reject } = this.queue.shift()!;
      const worker = this.workers.shift()!;
      worker.onerror = (e) => {
        worker.onmessage = null;
        worker.onerror = null;
        reject(e);
        // return the worker to the pool even on failure, so the pool
        // does not shrink after errors
        this.workers.push(worker);
        this.run();
      };
      worker.onmessage = (e) => {
        worker.onmessage = null;
        worker.onerror = null;
        resolve(e.data);
        this.workers.push(worker);
        this.run();
      };
      worker.postMessage(data);
    }
  }

  async enqueue(data: T) {
    return new Promise<R>((resolve, reject) => {
      this.queue.push({ resolve, reject, data });
      this.run();
    });
  }

  close() {
    for (const worker of this.workers) {
      worker.terminate();
    }
  }
}
```
`AsyncQueue` caps request concurrency and queues the overflow.
async-queue.ts
```typescript
export class AsyncQueue<DataType, Type extends () => PromiseLike<DataType>> {
  private queue: Array<{ resolve: (result: DataType) => void; reject: (err: unknown) => void; promise: Type }> = [];
  private running = 0;
  private maxRunning = 4;

  constructor({ maxRunning }: { maxRunning?: number }) {
    this.maxRunning = maxRunning || 4;
  }

  async enqueue(promise: Type): Promise<DataType> {
    return new Promise<DataType>((resolve, reject) => {
      this.queue.push({ resolve, reject, promise });
      this.run();
    });
  }

  private async run() {
    // each call drains the queue while staying under the concurrency cap;
    // concurrent enqueue() calls give us up to maxRunning parallel loops
    while (this.queue.length > 0 && this.running < this.maxRunning) {
      this.running++;
      const { resolve, reject, promise } = this.queue.shift()!;
      try {
        resolve(await promise());
      } catch (err) {
        reject(err);
      } finally {
        this.running--;
        this.run();
      }
    }
  }
}
```
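One detail: the type parameters cannot be inferred from the constructor alone, so standalone use reads better with them spelled out (a hypothetical example):

```typescript
// hypothetical usage: cap concurrent fetches at 2; the type parameters are
// given explicitly since the constructor carries no type information
const queue = new AsyncQueue<Response, () => Promise<Response>>({ maxRunning: 2 });
const res = await queue.enqueue(() => fetch('http://localhost:3001/status?filename=big.bin'));
console.log(res.status);
```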
Backend
Since Node.js is about the only backend runtime I know, the upload handling here uses Express and multer.
app.mjs
```javascript
import cors from 'cors';
import express from 'express';
import { createReadStream, createWriteStream, existsSync, mkdirSync, rmSync, unlinkSync, writeFileSync } from 'fs';
import { md5 } from 'hash-wasm';
import multer from 'multer';
import { join } from 'path';
import { fileURLToPath } from 'url';

const outputDir = fileURLToPath(new URL('./temp/output', import.meta.url));
const multerDir = fileURLToPath(new URL('./temp/multer', import.meta.url));

const init = () => {
  // clean up temp chunk files left over from previous runs
  if (existsSync(multerDir)) {
    rmSync(multerDir, { recursive: true });
  }
  mkdirSync(outputDir, { recursive: true });
  mkdirSync(multerDir, { recursive: true });
};
init();

// fileId -> list of uploaded chunk records (in-memory only)
const filemap = {};

const server = express();
server.use(cors());
server.use(express.json());

const upload = multer({
  storage: multer.memoryStorage(),
});

const router = express.Router({ caseSensitive: true });

router.post('/upload', upload.single('file'), async (req, res, next) => {
  const filename = req.body.name;
  const fileId = await md5(filename);
  const filecontent = req.file.buffer;
  const filehash = await md5(filecontent);
  // verify the chunk against the hash the client computed
  if (filehash === req.body.hash) {
    const hashlist = filemap[fileId] || (filemap[fileId] = []);
    const filepath = join(multerDir, filehash);
    writeFileSync(filepath, filecontent);
    hashlist.push({
      size: req.body.length,
      chunk: req.body.chunk,
      hash: filehash,
      filepath,
    });
    res.json({ success: true, hash: filehash });
  } else {
    res.status(400).end();
  }
});

router.get('/merge', async (req, res, next) => {
  const filename = req.query.filename;
  const fileId = await md5(filename);
  if (!filemap[fileId]) {
    res.status(404).json({ success: false });
    return;
  }
  const files = filemap[fileId];
  const size = files.reduce((acc, cur) => acc + parseInt(cur.size), 0);
  console.log(size / 1024 / 1024, 'MB');
  // append the chunks sequentially in index order through a single write
  // stream, so writes cannot race or truncate each other
  const ws = createWriteStream(join(outputDir, filename));
  for (const { filepath } of files.sort((a, b) => parseInt(a.chunk) - parseInt(b.chunk))) {
    await new Promise((resolve, reject) => {
      const file = createReadStream(filepath);
      file.pipe(ws, { end: false });
      file.on('end', () => {
        unlinkSync(filepath);
        resolve();
      });
      file.on('error', reject);
    });
  }
  ws.end();
  // the chunk files are gone from disk now, so drop their records too
  delete filemap[fileId];
  res.json({ success: true });
});

router.get('/status', async (req, res, next) => {
  const filename = req.query.filename;
  const fileId = await md5(filename);
  res.json({
    success: true,
    list: filemap[fileId] || [],
  });
});

server.use(router);
server.listen(3001, () => {
  console.log('server is running on port 3001');
});
```
Real-world usage is more involved than this, for example around how files are stored and validated; this is a deliberately brief implementation. Still to be handled are things like chunks that keep failing after several retries, and cleaning up temporary files.

As for why uploaded chunks are received into memory here: it reduces disk I/O and makes the `md5` computation convenient. Which trade-off to take depends on the scenario. The records of already-uploaded chunks could also go into Redis or a database, so they are not lost if the process crashes.
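As a rough sketch of that last point, assuming the ioredis client, the in-memory `filemap` could become one Redis list per file (all names below are hypothetical):

```typescript
import Redis from 'ioredis';

// hypothetical replacement for the in-memory filemap: one Redis list per
// fileId, so uploaded-chunk records survive a process restart
const redis = new Redis();

export const recordChunk = async (fileId: string, record: { size: string; chunk: string; hash: string; filepath: string }) => {
  await redis.rpush(`chunks:${fileId}`, JSON.stringify(record));
};

export const uploadedChunks = async (fileId: string) => {
  const raw = await redis.lrange(`chunks:${fileId}`, 0, -1);
  return raw.map((item) => JSON.parse(item));
};
```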
Summary
The whole pipeline uses Web Workers to hash the slices in parallel, so the main thread, and with it the UI, never blocks; `hash-wasm` to compute file hashes quickly; and `AsyncQueue` to cap concurrent requests and keep the load on the server down. On top of that, failed uploads are retried, and resumable upload gets a simple implementation. The code looks long, but the actual process is not complicated; it does still need adapting to each concrete business scenario.
The upload and verification could actually have been finished entirely in the previous article, but debugging monaco-editor ate up so much time that writing this one went slowly. I am not very familiar with the backend side, so the server here is only a toy example; in practice Node.js is rarely (if ever) the choice for this kind of backend anyway. That covers uploading and verifying large files, retrying failed uploads, and resuming interrupted ones; this article ends here for now.