Uploading and Verifying Large Files (Part 2)

On slicing and uploading large files in JavaScript, computing hashes for verification, retrying failed uploads, and resuming interrupted uploads.

The previous post covered slicing a large file for upload and computing hashes for verification. This one implements resumable uploads, along with handling and retrying failed uploads.

Resumable Uploads / Retries

Resumable upload means that when an upload fails, or the user closes the browser partway through, the next attempt can continue from where the last one stopped instead of re-uploading the entire file.

In chunked uploads of large files, retries and resumable uploads work the same way: upload the file in chunks, compute a hash for each chunk, and verify it on the server; any chunk that fails verification is simply uploaded again.

Overall Flow

  1. First, ask the server which chunks have already been uploaded, so we know where the last attempt stopped.
  2. Slice the file to be uploaded again and compute a hash for each chunk.
  3. Compare each chunk's hash against the server's records: if it matches, skip that chunk; if not, upload it (again). The shapes of the data exchanged are sketched below.
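
A minimal sketch of the shapes of the data exchanged in these steps; the field names match the code below, while the type names themselves are only illustrative:

typescript

interface UploadedChunk {
  chunk: string; // chunk index, as carried in the FormData
  hash: string; // md5 of the chunk's bytes
  size: string; // chunk size in bytes
}

interface FileStatusResponse {
  success: boolean;
  list: UploadedChunk[]; // chunks the server already holds
}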

Frontend

index.html

html

<!doctype html>
<html lang="en">

<head>
  <meta charset="UTF-8" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Document</title>
</head>

<body>
  <script type="module" src="./main.ts"></script>
  <input type="file" id="file" />
  <p id="progress"></p>
</body>

</html>

main.ts

typescript

import { fileStatus, upload } from './apis.ts';
import { AsyncQueue } from './async-queue.ts';
import { fileToFormData, retry } from './helper.ts';

const file$ = document.getElementById('file') as HTMLInputElement;
const progress$ = document.getElementById('progress') as HTMLParagraphElement;

file$.addEventListener('change', async (e) => {
  const file = file$.files?.[0];
  if (!file) return;
  const fds = await fileToFormData(file);
  const uploads = await unuploaded(fds, file.name);
  // If every chunk is already on the server, skip straight to merging.
  if (uploads.length === 0) {
    merge(file.name);
    return;
  }
  const asyncPool = new AsyncQueue({ maxRunning: 4 });
  let finished = 0;
  for (let i = 0; i < uploads.length; i++) {
    const fd = uploads[i];
    const req = retry({ promise: () => upload(fd), retry: 3 });
    asyncPool.enqueue(req).then(() => {
      finished++;
      progress$.innerHTML = `uploaded: ${finished} / remaining: ${uploads.length - finished} / total: ${uploads.length}`;
      if (finished === uploads.length) {
        console.log('finished');
        merge(file.name);
      }
    });
  }
});

const merge = async (filename: string) => {
  const res = await fetch('http://localhost:3001/merge?filename=' + filename);
  const data = await res.json();
  console.log(data);
};

const unuploaded = async (hashs: FormData[], filename: string) => {
  const uploaded = await fileStatus(filename);
  const { list } = uploaded;
  const un = hashs.filter((fd) => {
    const chunk = fd.get('chunk') as string;
    const hash = fd.get('hash') as string;
    return !list.find((item) => item.chunk === chunk && item.hash === hash);
  });
  return un;
};

The retry helper retries a Promise-returning function, and fileToFormData slices the file, hashes each chunk, and wraps everything into FormData objects for the later transfer.
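
For example, a hypothetical use of retry, wrapping a flaky request so it is attempted up to 3 times before the returned function finally rejects (the filename here is made up):

typescript

const fetchStatus = retry({
  promise: () =>
    fetch('http://localhost:3001/status?filename=video.mp4').then((res) => res.json()),
  retry: 3,
});

// Calling the wrapped function runs the request, retrying on failure.
const status = await fetchStatus();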

helper.ts

typescript

import { WorkerPool } from './worker-pool.ts';

const chunkSize = 4 * 1024 * 1024;
const pool = new WorkerPool<{ data: Blob }, { hash: string }>({
  maxRunning: navigator.hardwareConcurrency || 4,
  // Resolve the worker script relative to this module so the bundler can find it.
  url: new URL('./worker.ts', import.meta.url).toString(),
});

export const retry = <T>({ promise, retry }: { promise: () => PromiseLike<T>; retry: number }) => {
  return async function () {
    let retryCount = 0;
    const _retry = Math.max(retry, 0);
    while (retryCount < _retry) {
      try {
        return await promise();
      } catch (err) {
        retryCount++;
      }
    }
    throw new Error('retry failed');
  };
};

export const fileToFormData = (file: File) => {
  let finished = 0;
  const length = Math.ceil(file.size / chunkSize);
  return new Promise<FormData[]>((resolve, reject) => {
    const chunks: FormData[] = [];
    for (let i = 0; i < length; i++) {
      const formData = new FormData();
      const blob = file.slice(i * chunkSize, (i + 1) * chunkSize);
      // Pass the Blob to the worker directly; structured cloning keeps this cheap.
      pool
        .enqueue({
          data: blob,
        })
        .then(({ hash }) => {
          finished++;
          formData.append('file', blob, `${file.name}-${i}`);
          formData.append('name', file.name);
          formData.append('hash', hash);
          formData.append('length', blob.size.toString());
          formData.append('chunk', i.toString());
          chunks.push(formData);
          if (finished === length) {
            resolve(chunks);
            pool.close();
          }
        })
        .catch((err) => {
          reject(err);
        });
    }
  });
};

The Web Worker that hashes file chunks:

worker.ts

typescript

import { md5 } from 'hash-wasm';
function readAsArrayBuffer(blob: Blob) {
  const fr = new FileReader();
  return new Promise<ArrayBuffer>((resolve, reject) => {
    fr.onload = () => {
      resolve(fr.result as ArrayBuffer);
    };
    // Propagate read failures instead of leaving the promise pending forever.
    fr.onerror = () => {
      reject(fr.error);
    };
    fr.readAsArrayBuffer(blob);
  });
}
self.addEventListener('message', async (e) => {
  const { type, data } = e.data;
  if (type === 'validate') {
    // Warm-up ping sent by WorkerPool right after the worker is created.
    console.log('validate');
    return;
  }
  }
  readAsArrayBuffer(data).then((chunk) => {
    md5(new Uint8Array(chunk)).then((hash) => {
      self.postMessage({ hash });
    });
  });
});

WorkerPool manages creating, reusing, and tearing down Web Workers.

worker-pool.ts

typescript

export class WorkerPool<T, R> {
  private workers: Worker[] = [];
  private queue: Array<{ resolve: (result: R) => void; reject: (err: unknown) => void; data: T }> = [];
  private maxRunning = 4;

  constructor({ maxRunning, url }: { maxRunning?: number; url: string }) {
    this.maxRunning = maxRunning || 4;
    for (let i = 0; i < this.maxRunning; i++) {
      const worker = new Worker(url, {
        type: 'module',
      });
      // Warm-up message; the worker just logs it (the 'validate' branch in worker.ts).
      worker.postMessage({ type: 'validate' });
      this.workers.push(worker);
    }
  }
  async run() {
    if (this.queue.length === 0) {
      return;
    }
    if (this.workers.length === 0) {
      return;
    }
    while (this.queue.length > 0 && this.workers.length > 0) {
      const first = this.queue.shift();
      if (!first) {
        return;
      }
      const { data, resolve, reject } = first;
      const worker = this.workers.shift();
      if (!worker) {
        return;
      }
      worker.onerror = (e) => {
        worker.onmessage = null;
        worker.onerror = null;
        reject(e);
        // Return the worker to the pool so one failed job doesn't shrink it permanently.
        this.workers.push(worker);
        this.run();
      };
      worker.onmessage = (e) => {
        worker.onmessage = null;
        worker.onerror = null;
        resolve(e.data);
        this.workers.push(worker);
        this.run();
      };
      worker.postMessage(data);
    }
  }

  async enqueue(data: T) {
    return new Promise<R>((resolve, reject) => {
      this.queue.push({ resolve, reject, data });
      this.run();
    });
  }

  async close() {
    for (const worker of this.workers) {
      worker.terminate();
    }
  }
}
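
A quick standalone usage sketch of WorkerPool; the blob contents here are made up:

typescript

const pool = new WorkerPool<{ data: Blob }, { hash: string }>({
  maxRunning: 2,
  url: new URL('./worker.ts', import.meta.url).toString(),
});

// Both chunks are hashed in parallel by separate workers.
const [a, b] = await Promise.all([
  pool.enqueue({ data: new Blob(['hello']) }),
  pool.enqueue({ data: new Blob(['world']) }),
]);
console.log(a.hash, b.hash);
pool.close();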

AsyncQueue queues concurrent requests and limits how many run at once.

async-queue.ts

typescript

export class AsyncQueue<DataType, Type extends () => PromiseLike<DataType>> {
  private queue: Array<{ resolve: (result: DataType) => void; reject: (err: unknown) => void; promise: Type }> = [];
  private running = 0;
  private maxRunning = 4;

  constructor({ maxRunning }: { maxRunning?: number }) {
    this.maxRunning = maxRunning || 4;
  }

  async enqueue(promise: Type) {
    return new Promise<DataType>((resolve, reject) => {
      this.queue.push({ resolve, reject, promise });
      this.run();
    });
  }
  async run() {
    while (this.queue.length > 0 && this.running < this.maxRunning) {
      this.running++;
      const { resolve, reject, promise } = this.queue.shift()!;
      try {
        const result = await promise();
        resolve(result);
      } catch (err) {
        reject(err);
      } finally {
        this.running--;
        this.run();
      }
    }
  }
}
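
A small usage sketch of AsyncQueue with made-up tasks: four tasks are queued, but at most two run at any moment:

typescript

const queue = new AsyncQueue<number, () => Promise<number>>({ maxRunning: 2 });

const tasks = [1, 2, 3, 4].map(
  (n) => () => new Promise<number>((resolve) => setTimeout(() => resolve(n), 100)),
);

// enqueue resolves with each task's result once it has run.
const results = await Promise.all(tasks.map((task) => queue.enqueue(task)));
console.log(results); // [1, 2, 3, 4]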

Backend

Since Node.js is basically the only backend language I know, the server here uses Node.js with express and multer to handle the uploaded files.

app.mjs

javascript

import cors from 'cors';
import express from 'express';
import { createReadStream, createWriteStream, existsSync, mkdirSync, rmSync, unlinkSync, writeFileSync } from 'fs';
import { md5 } from 'hash-wasm';
import multer from 'multer';
import { join } from 'path';
import { fileURLToPath } from 'url';

const outputDir = fileURLToPath(new URL('./temp/output', import.meta.url));
const init = () => {
  const cleanDir = () => {
    // Clean up leftover temporary files from previous runs
    const exist = existsSync(fileURLToPath(new URL('./temp/multer', import.meta.url)));
    if (exist) {
      rmSync(fileURLToPath(new URL('./temp/multer', import.meta.url)), {
        recursive: true,
      });
    }
  };

  cleanDir();
  mkdirSync(outputDir, {
    recursive: true,
  });
  mkdirSync(fileURLToPath(new URL('./temp/multer', import.meta.url)), {
    recursive: true,
  });
};
init();
const filemap = {};

const server = express();

server.use(cors());
server.use(express.json());

const upload = multer({
  storage: multer.memoryStorage(),
});

const router = express.Router({
  caseSensitive: true,
});

router.post('/upload', upload.single('file'), async (req, res, next) => {
  const filename = req.body.name;
  const fileId = await md5(filename);
  const filecontent = req.file.buffer;
  const filehash = await md5(filecontent);
  if (filehash === req.body.hash) {
    const hashlist = filemap[fileId] || (filemap[fileId] = []);
    const filepath = join(fileURLToPath(new URL('./temp/multer', import.meta.url)), filehash);
    writeFileSync(filepath, filecontent);
    hashlist.push({
      size: req.body.length,
      chunk: req.body.chunk,
      hash: filehash,
      filepath,
    });
    res.json({
      success: true,
      hash: filehash,
    });
  } else {
    res.status(400).end();
  }
});

router.get('/merge', async (req, res, next) => {
  const filename = req.query.filename;
  const fileId = await md5(filename);
  if (!filemap[fileId]) {
    res.status(404).json({ success: false });
    return;
  }
  const files = filemap[fileId];
  const size = files.reduce((acc, cur) => acc + parseInt(cur.size), 0);
  console.log(size / 1024 / 1024, 'MB');
  const target = join(outputDir, filename);
  // Pre-create the target file so each chunk stream can write into it at its offset.
  writeFileSync(target, Buffer.alloc(0));
  let offset = 0;
  await Promise.all(
    files
      .sort((a, b) => a.chunk - b.chunk)
      .map(({ size, filepath }) => {
        const start = offset;
        offset += parseInt(size);
        return new Promise((resolve, reject) => {
          // flags: 'r+' writes at the offset without truncating what other streams wrote
          const ws = createWriteStream(target, { start, flags: 'r+' });
          createReadStream(filepath).pipe(ws);
          ws.on('finish', () => {
            unlinkSync(filepath);
            resolve();
          });
          ws.on('error', reject);
        });
      }),
  );
  delete filemap[fileId];
  res.json({ success: true });
});

router.get('/status', async (req, res, next) => {
  const filename = req.query.filename;
  const fileId = await md5(filename);
  res.json({
    success: true,
    list: filemap[fileId] || [],
  });
});

server.use(router);

server.listen(3001, () => {
  console.log('server is running on port 3001');
});

Real-world usage is more complex than this, including how files are stored and validated; for convenience this is only a brief implementation. Still missing are things like handling chunks that fail even after multiple retries, and cleaning up temporary files.

As for why the chunks are received into memory: partly to reduce disk I/O, and partly to make the md5 computation convenient; the right trade-off depends on the scenario. The records of already-uploaded chunks could be kept in Redis or a database so they are not lost if the process crashes.
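
For instance, here is a sketch of persisting the uploaded-chunk records (the in-memory filemap in app.mjs) to Redis, assuming the node-redis v4 client; the key layout and helper names are assumptions:

typescript

import { createClient } from 'redis';

const client = createClient();
await client.connect();

// One Redis hash per file: field = chunk index, value = serialized chunk metadata.
const recordChunk = async (
  fileId: string,
  chunk: { chunk: string; hash: string; size: string; filepath: string },
) => {
  await client.hSet(`upload:${fileId}`, chunk.chunk, JSON.stringify(chunk));
};

// Read back everything the server has for a file, e.g. for the /status route.
const uploadedChunks = async (fileId: string) => {
  const fields = await client.hGetAll(`upload:${fileId}`);
  return Object.values(fields).map((value) => JSON.parse(value));
};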

Summary

The whole pipeline uses Web Workers to hash chunks in parallel, so the main thread, and with it the UI, never blocks. hash-wasm computes the file hashes quickly, and AsyncQueue caps the number of concurrent requests so the server is not overwhelmed.

Failed uploads are retried, and a simple form of resumable upload is implemented. The code looks long, but the actual process is not complicated; it will still need adapting to your specific business scenario.

The whole upload-and-verify flow could have been finished in the previous post, but debugging monaco-editor ate far too much time, so this one came along slowly. I am not very familiar with backend work, so the server here is just a quick example; in practice Node.js is rarely (basically never) used for the backend.

That concludes uploading and verifying large files, covering retries for failed uploads and resumable uploads; this series ends here for now.
