Wenzi

JavaScript 中的 Promise 异步并发控制

蚊子前端博客
发布于 2022/08/03 14:26
前端开发或Node.js开发中,经常会遇到并发请求的场景,针对这些场景,我们怎么进行限制呢?

我们在开发的过程中,经常会遇到一些并发的情况,而如果并发量比较大时,需要进行限制。比如可能出现的场景:

  • 传入多个异步请求,但最多只能触发 limit 个请求;额外的功能,所有的请求都执行完后,返回成功;
  • 生成一个新函数,调用多次发起请求,但有并发限制;
  • 多个 Promise 按顺序执行,这个其实可以认为并发数限制是 1,但我们可以用另一种方式来实现;
  • 1. 多个异步请求的并发限制 #

    有一系列的异步请求,比如爬虫抓取、后端接口请求、图片加载等场景,需要限制下并发请求的数量。这里要考虑下结果的处理,是每个请求完成后就可以了,还是要收集到所有的结果,类似于 Promise.all() 的效果。

    1.1 递归的方式 #

    思路大概是:首先发起 limit 个的请求,哪个完成了就递归发起下一个异步请求,所有的请求都完成后,则整体返回一个 Promise。如果不需要收集所有的数据,则不用写这个 Promise。

    * 递归方式实现异步并发控制 * @param arr 所有的数据集合,如请求的url等 * @param limit 限制并发的个数 * @param iteratorFn 对每个数据的处理 const promiseLimitByDepth = <T> ( arr : T[], limit : number , iteratorFn : (item: T, urls?: T[]) => Promise < any > ) => { const { length } = arr; const result = []; let i = 0 ; let finishedNum = 0 ; // 完成的个数 // 若不考虑最后数据的收集,可以不写这个Promise return new Promise ( ( resolve ) => { const request = async ( index : number ) => { kk. show (arr[index], index); const p = Promise . resolve (). then ( iteratorFn (arr[index])); const res = await p; result[index] = res; finishedNum++; if (finishedNum === length) { resolve (result); if (i < length) { request (i); for (; i < limit; i++) { request (i);

    这里我们用 setTimeout 来模拟下异步请求。

    const newFetch = (delay) => {
      return new Promise((resolve) => {
        setTimeout(() => {
          resolve(delay);
        }, delay);
    

    调用方式:

    promiseLimitByDepth([2000, 1000, 3000, 2500, 1200, 5000, 3500, 2300], 2, (num) => {
      return newFetch(num);
    }).then(console.log);
    

    您可以查看 demo:递归实现的异步并发控制。在 demo 中可以看到,控制着同一时刻的请求个数,某一个请求结束后,再启动下一个请求。

    上面的代码还可以用来控制图片的加载:

    const arr = [];
    let i = 10;
    while (i--) {
      arr.push(`https://www.xiabingbao.com/upload/368662d904df5cbe4.jpg?t=${Math.random()}`);
    promiseLimitByDepth(arr, 2, (url) => {
      return new Promise((resolve) => {
        const img = new Image();
        img.src = url;
        // 这里暂时只考虑成功的情况
        img.onload = resolve;
    }).then(console.log);
    

    我们从图片加载的瀑布流里可以看到,每次最多只加载 2 张图片:

    1.2 循环的方式 #

    使用循环的方式,肯定得用到 async-await 了。

    const promiseLimitByCycle = async <T>(arr: T[], limit: number, iteratorFn: (item: T, arr?: T[]) => Promise<any>) => {
      const { length } = arr;
      const result: Promise<any>[] = [];
      const runningList: Promise<any>[] = []; // 正在执行的异步任务
      for (const url of arr) {
        const p = Promise.resolve().then(iteratorFn(url)); // 转为promise
        result.push(p);
        // 若limit大于length,则不再进行控制,直接用Promise.all()即可
        if (limit <= length) {
          const e = p.then(() => {
            // promise p 执行完毕时,会触发这个,这个是后执行的,先执行的是下面的push操作
            const index = runningList.indexOf(e);
            // 当p执行成功的时候,从runningList中删除该Promise,同时也会触发下面的Promise.race()
            return runningList.splice(index, 1);
          // promise e 是 p执行的过程,若p执行成功,则e.value就是p.then()里的return的值
          runningList.push(e);
          // 超过限制,则先存储起来
          if (runningList.length >= limit) {
            // 哪个先完成,都会触发race,然后进入下一层循环
            await Promise.race(runningList);
      // 所有的都完成了,才最后返回结果
      return Promise.all(result);
    

    上面有段代码比较绕,我们再单独拿出来讲解下:

    // Promise是可以链式调用的,then()本身返回的就是Promise
    // 因此e是p.then()的返回值,e自己也是Promise
    // e.then()什么时候执行,取决于p.then()什么执行,又再取决于p什么时候执行
    // const e = p.then()是同步执行的,因此先得到的变量e,再执行的p.then()里的操作
    // 当p执行完成后,则就执行p.then()里的操作,找出e所在的位置并进行删除
    // e.then()回调里的值据说splice()的返回值,其实就是e,但这里我们并不用关心他的返回值是什么
    const e = p.then(() => {
      const index = runningList.indexOf(e);
      return runningList.splice(index, 1);
    runningList.push(e);
    // 这里监听的是runningList,即里面的某个e完成了,就会触发Promise.race()
    // 若e完成了,必然p也是完成了的
    await Promise.race(runningList);
    

    这里充分用到了Promise.all()Promise.race()的特性,来实现的。

    2. 新函数的并发限制 #

    我们来简单描述下题目:创建返回一个新函数,在调用这个新函数产生异步请求时,有并发的限制。

    // 创建返回一个新函数,在调用这个新函数产生异步请求时,限制并发的数量
    // 问,如何实现这个create方法?
    const createFetch = (limit) => {
      return () => {};
    const newFetch = createFetch(2); // 最多只能并发2个
    newFetch(url);
    newFetch(url);
    newFetch(url);
    newFetch(url);
    

    这里参考了 npm 包 p-queue 的源码,并对其进行了精简。新函数 newFetch() 每次都是要返回一个 Promise 的,就看什么时候执行 resolve(),并启动下一个。

    const createFetch = (limit) => {
      let runningNum = 0; // 当前正在进行的数量
      const queue = []; // 所有将要执行的任务队列
      // 尝试启动下一个任务
      const tryNextOne = () => {
        if (queue.length === 0) {
          return false;
        if (runningNum < limit) {
          // 若没有达到限制,则直接启动
          const job = queue.shift();
          if (!job) {
            return false;
          job();
          return true;
        return false;
      // 返回一个新函数,新函数里直接返回一个Promise
      return (url, iteratorFn) => {
        return new Promise((resolve) => {
          // 定义一个函数,但不立即执行
          const run = async () => {
            runningNum++; // 启动一个任务,数量+1
            const result = await Promise.resolve(iteratorFn(url));
            resolve(result);
            runningNum--; // 完成一个任务,数量-1
            tryNextOne(); // 启动下一个任务
          queue.push(run); // 将所有的任务,都推送到队列中
          tryNextOne(); // 启动队列中任务的入口
    

    我们用 sleep() 函数模拟下:

    const sleep = (delay) => {
      return new Promise((resolve) => {
        setTimeout(() => {
          resolve(delay);
        }, delay);
    const newFetch = createFetch(2);
    for (let i = 0; i < 10; i++) {
      console.log(`${i} start`);
      newFetch(i, async (i) => {
        await sleep(600 + 10 * i);
        return `${i}`;
      }).then((i) => {
        console.log(`${i} end`);
    

    3. 多个异步任务的顺序执行 #

    我们其实把上面实现的一些函数,并发数量设置为 1,就是多个异步任务的顺序执行了。不过我们这里还有一些其他的方式。

    3.1 async-wait #

    把所有的异步任务都放到数组中,然后用 async-wait 的方式来控制:

    const arr = [600, 500, 400, 700, 300, 450];
    const asyncLoop = async (arr, iteratorFn) => {
      const result = [];
      for (const item of arr) {
        console.log(`${item} start`);
        const res = await Promise.resolve(iteratorFn(item));
        console.log(`${res} end`);
        result.push(res);
      return result;
    asyncLoop(arr, (item) => {
      return sleep(item);
    

    3.2 纯 Promise #

    如果不使用 async-await,用 Promise 可以实现吗?

    Promise 是异步的,在一个同步流程中,是无法等待这个 Promise 完成的,因此这里我用递归的方式来实现的。

    const promiseLoop = (arr, iteratorFn) => {
      const result = [];
      return new Promise((allResolve) => {
        const run = (index = 0) => {
          if (index < arr.length) {
            return new Promise((resolve) => {
              const p = Promise.resolve(iteratorFn(arr[index]));
              p.then((res) => {
                console.log(res);
                result.push(res);
                resolve(res);
                if (index + 1 < arr.length) {
                  // 上一个Promise完成后,启动下一个
                  run(index + 1);
                } else {
                  // 若全部都完成了,则执行最外层的Promise
                  allResolve(result);
        run();
    

    使用方式与上面的一样:

    promiseLoop(arr, (item) => {
      return sleep(item);
    }).then(console.log);
    

    4. 同时请求,但按顺序尽快输出 #

    如并发请求一些数据,结果按照请求顺序依次输出,而且要尽可能早的输出结果。

    如 a,b,c 三个请求并发请求:

  • a 需要 200ms;
  • b 需要 100ms;
  • c 需要 300ms;
  • 即使 b 先完成,也得等着 a 完成输出结果后,b 再输出,c 稍后完成后,再输出 c 的结果。等所有的请求都执行完毕后,再整体按照顺序返回请求的结果。

    我实现的思路是在后面的请求先完成的,则将结果先存储起来,等前面的请求完成后,再一并输出。

    // 并发请求但顺序输出
    const concurrentAndSyncLog = (arr, iteratorFn) => {
      const { length } = arr;
      const list = new Array(length).fill({ fulfilled: false, value: null }); // fulfilled表示数据是否已准备好
      let showStart = 0; // 开始输出的位置
      let fulfilledNum = 0; // 完成的个数
      return new Promise((resolve) => {
        for (let i = 0; i < length; i++) {
          const p = Promise.resolve(iteratorFn(arr[i]));
          p.then((result) => {
            list[i] = { fulfilled: true, value: result };
            fulfilledNum++;
            if (i === showStart) {
              let j = showStart;
              while (j < length) {
                if (list[j].fulfilled) {
                  // 输出所有完成的数据
                  console.log(list[j].value);
                } else {
                  // 当前位置的数据还没准备好,直接停止,并设置下次输出的位置
                  showStart = j;
                  break;
            if (fulfilledNum >= length) {
              resolve(list.map((item) => item.value));
    
    concurrentAndSyncLog([200, 100, 300], sleep).then(console.log);
    // 200, 100, 300