相关文章推荐
谦虚好学的脸盆  ·  2021-06-28 ...·  6 月前    · 
踢足球的茄子  ·  Amazon Live·  9 月前    · 
开朗的骆驼  ·  poi的XSSFWorkbook转SXSSF ...·  1 年前    · 

携手创作,共同成长!这是我参与「掘金日新计划 · 8 月更文挑战」的第1天, 点击查看活动详情

大家好,我是码农小余。很久不见,怪是想念。

今天给老表们分享 npm-run-all 源码实现, npm-run-all 是一个用来并行或者串行运行多个 npm 脚本的 CLI 工具。阅读完本文,你能收获到:

  • 了解整个流程概览;
  • 了解核心模块逻辑,入口分析、参数解析、任务流、任务执行等;
  • 直入主题,整个 npm-run-all 的整体执行流程如下:

    当我们在终端敲入命令,实际上会去调用 bin/xx/index.js 函数,然后调用 bootstrap 去分发不同命令不同参数的逻辑。help 和 version 比较简单,本文不做分析。任务控制方面,会先调用 npmRunAll 做参数解析,然后执行 runTasks 执行任务组中任务,全部任务执行后返回结果,结束整个流程。

    npm-run-all 包支持三条命令,我们看到源码根目录的 package.json 文件:

    "name" : "npm-run-all" , "version" : "4.1.5" , "description" : "A CLI tool to run multiple npm-scripts in parallel or sequential." , "bin" : { "run-p" : "bin/run-p/index.js" , "run-s" : "bin/run-s/index.js" , "npm-run-all" : "bin/npm-run-all/index.js" "main" : "lib/index.js" , "files" : [ "bin" , "lib" , "docs" "engines" : { "node" : ">= 4"

    bin 下面定义的命令脚本:

  • run-p,简化使用的脚本,代表并行执行脚本;
  • run-s,简化使用的脚本,代表串行执行脚本;
  • npm-run-all,复杂命令,通过 --serial 和 --parallel 参数实现前两者一样的效果。
  • 直接看到 bin/npm-run-all/index.js

    require("../common/bootstrap")("npm-run-all")
    

    上述代码中,如果是执行 run-p 这条命令,则函数传入的参数是 run-prun-s 同理。bootstrap 通过参数的不同,将任务分发到 bin 下不同目录中:

    ├── common │ ├── bootstrap.js │ ├── parse-cli-args.js │ └── version.js ├── npm-run-all │ ├── help.js │ ├── index.js │ └── main.js ├── run-p │ ├── help.js │ ├── index.js │ └── main.js └── run-s ├── help.js ├── index.js └── main.js

    照着上述代码结构,结合 ../common/bootstrap 代码:

    "use strict"
    module.exports = function bootstrap(name) {
        const argv = process.argv.slice(2)
        switch (argv[0]) {
            case undefined:
            case "-h":
            case "--help":
                return require(`../${name}/help`)(process.stdout)
            case "-v":
            case "--version":
                return require("./version")(process.stdout)
            default:
                // https://github.com/mysticatea/npm-run-all/issues/105
                // Avoid MaxListenersExceededWarnings.
                process.stdout.setMaxListeners(0)
                process.stderr.setMaxListeners(0)
                process.stdin.setMaxListeners(0)
                // Main
                return require(`../${name}/main`)(
                    argv,
                    process.stdout,
                    process.stderr
                ).then(
                    () => {
                        // I'm not sure why, but maybe the process never exits
                        // on Git Bash (MINGW64)
                        process.exit(0)
                    () => {
                        process.exit(1)
    

    bootstrap 函数依据调用的命令调用不同目录下的 help、version 或者调用 main 函数,达到了差异消除的效果。

    然后再把目光放在 process.stdout.setMaxListeners(0) 这是啥玩意?打开 issue 链接,通过报错信息和翻阅官方文档:

    By default EventEmitters will print a warning if more than 10 listeners are added for a particular event. This is a useful default that helps finding memory leaks. The emitter.setMaxListeners() method allows the limit to be modified for this specific EventEmitter instance. The value can be set to Infinity (or 0) to indicate an unlimited number of listeners.

    默认情况下,如果为特定事件添加了超过 10 个侦听器,EventEmitters 将发出警告。这是一个有用的默认值,有助于发现内存泄漏。 emitter.setMaxListeners() 方法允许为这个特定的 EventEmitter 实例修改限制。该值可以设置为 Infinity(或 0)以指示无限数量的侦听器。

    为什么要处理这个情况呢?因为用户可能这么使用:

    $ run-p a1 a2 a3 a4 a5 a6 a7 a8 a9 a10 a11
    

    你永远想象不到用户会怎么使用你的工具!

    分析完不同命令的控制逻辑,我们进入核心的 npmRunAll 函数,参数解析部分逻辑如下:

    module.exports = function npmRunAll(args, stdout, stderr) {
      try {
        const stdin = process.stdin
        const argv = parseCLIArgs(args)
      } catch () {
        // ...
    

    解析处理所有标准输入流参数,最终生成并返回 ArgumentSet 实例 set。parseCLIArgsCore 只看控制任务流执行的参数:

    function addGroup(groups, initialValues) {
        groups.push(Object.assign(
            { parallel: false, patterns: [] },
            initialValues || {}
    function parseCLIArgsCore(set, args) {
        LOOP:
        for (let i = 0; i < args.length; ++i) {
            const arg = args[i]
            switch (arg) {
    					  // ...
                case "-s":
                case "--sequential":
                case "--serial":
                    if (set.singleMode && arg === "-s") {
                        set.silent = true
                        break
                    if (set.singleMode) {
                        throw new Error(`Invalid Option: ${arg}`)
                    addGroup(set.groups)
                    break
                case "-p":
                case "--parallel":
                    if (set.singleMode) {
                        throw new Error(`Invalid Option: ${arg}`)
                    addGroup(set.groups, { parallel: true })
                    break
                default: {
                    // ...
                    break
        // ...
        return set
    

    将任务都装到 groups 数组中,如果是并行任务(传了 -p--parallel 参数),就给任务加上 { parallel: true } 标记。默认是 { parallel: false },即串行任务。

    执行任务组

    在进入这一小节之前,我们就 npm-run-all 源码在 scripts 下加一条 debug 命令:

    $ "node ./bin/npm-run-all/index.js lint test"
    

    解析完参数生成的 argv.groups 如下:

    paralles: false, patterns: ['lint', 'test']

    有了这个数组结果,我们再看任务执行流程会更加明朗。

    // bin/npm-run-all/main.js
    module.exports = function npmRunAll(args, stdout, stderr) {
        try {
        		// 省略解析参数
            // 执行任务
            const promise = argv.groups.reduce(
                (prev, group) => {
                    // 分组中没有任务,直接返回 null
                    if (group.patterns.length === 0) {
                        return prev
                    return prev.then(() => runAll(
                        group.patterns,	// ['lint', 'test']
                           	// ……
                          	// 是否并行执行
                            parallel: group.parallel,
                          	// 并行的最大数量
                            maxParallel: group.parallel ? argv.maxParallel : 1,
                            // 一个任务失败后继续执行其他任务
                          	// ……
                            arguments: argv.rest,
                            // 这个📌用于当任务以0码退出时,终止全部任务
                            race: group.parallel && argv.race,
                Promise.resolve(null)
    				// ...
            return promise
        catch (err) {	
            //eslint-disable-next-line no-console
            console.error("ERROR:", err.message)
            return Promise.reject(err)
    

    上述代码解析完命令行中的参数之后,通过 reduce 拼接所有任务组的结果。任务组就是 npm-run-all 支持同时配置并行和串行的任务,并生成多个任务组。例如:

    $ npm-run-all -p a b -s c d e
    

    上述命令会生成两个任务组,并行任务组是 ['a', 'b'],串行任务组是 ['c', 'd', 'e']。就会执行两次 runAll。接下来就看看单个任务组的执行逻辑:

    // lib/index.js
    module.exports = function npmRunAll(patternOrPatterns, options) { //eslint-disable-line complexity
       	// 省略一系列参数格式化和默认值处理……
        try {
            const patterns = parsePatterns(patternOrPatterns, args)
            // 省略非法参数报错和参数之间的校验……
            return Promise.resolve()
                .then(() => {
                    if (taskList != null) {
                        return { taskList, packageInfo: null }
                    return readPackageJson()
                .then(x => {
                    // 从 package.json 中匹配任务
              			// 🌰中 patterns 是 ['lint', 'tests'],所以 lint 和 test 这两个任务一定要从 package.json 的 scripts 中能查看到
              			const tasks = matchTasks(x.taskList, patterns)
                    return runTasks(tasks, {
                        // 省略上面格式化和校验后的参数……
        catch (err) {
            return Promise.reject(new Error(err.message))
    

    上述代码将参数的处理和校验全部省略掉了,看到核心的逻辑 matchTasksrunTasksmatchTasks 通过读取 package.jsonscripts 中的命令,然后判断任务组 patterns 中的任务是否都存在于 taskList 中。

    然后就来到了本小节的核心逻辑——调用 runTasks 依次执行每个任务组中的任务:

    module.exports = function runTasks(tasks, options) {
        return new Promise((resolve, reject) => {
          	// 任务组中不存在任务,直接返回空数组
          	if (tasks.length === 0) {
                resolve([])
                return
          	// 结果数组
            const results = tasks.map(task => ({ name: task, code: undefined }))
            // 任务队列
            const queue = tasks.map((task, index) => ({ name: task, index }))
            // 用于判断并行的时候任务是否全部完成
            const promises = []
            let error = null
            let aborted = false
             * Done.
             * @returns {void}
            function done() {
               	// ……
             * Aborts all tasks.
             * @returns {void}
            function abort() {
                // ……
             * Runs a next task.
             * @returns {void}
            function next() {
              	// 任务被终止了,则不需要再往下执行了
                if (aborted) {
                    return
              	// 并行时,只有满足 queue 和 promises 的长度都为 0,才可以判断任务组完成 
                if (queue.length === 0) {
                    if (promises.length === 0) {
                        done()
                    return
                const task = queue.shift()
                const promise = runTask(task.name, optionsClone)
                promises.push(promise)
                promise.then(
                    (result) => {
                      	// 完成一个任务,就将其从 promises 中删除
                        remove(promises, promise)
                        if (aborted) {
                            return
                        if (result.code) {
                            error = new NpmRunAllError(result, results)
                            // 失败后不继续执行后续的任务,则直接终止整个任务组
                            if (!options.continueOnError) {
                                abort()
                                return
                        // Aborts all tasks if options.race is true.
                        if (options.race && !result.code) {
                            abort()
                            return
                        // Call the next task.
                        next()
                    (thisError) => {
                        remove(promises, promise)
                        if (!options.continueOnError || options.race) {
                            error = thisError
                            abort()
                            return
                        next()
          	// 最大并发数
            const max = options.maxParallel
            // 对比任务数量、配置的并发数、取较小者。这是为了防止配置的 maxParallel 比实际执行的任务数量还大的情况
            const end = (typeof max === "number" && max > 0)
                ? Math.min(tasks.length, max)
                : tasks.length
            for (let i = 0; i < end; ++i) {
                next()
    

    通过队列这个数据结构,依次执行组中的每一条任务,任务成功后将结果存入 result,然后调用 next 执行下一个任务;可以通过 abort 终止全部任务;通过 done 完成整个队列的状态更新,并将结果返回。

    接下来,通过一张图和本小节的示例,来更好地理解串行机制:

    从左到右得流程,讲解一下:

  • 初始化时,根据任务组的 patterns,生成任务队列;
  • 计算完任务数量 end 之后,执行 next 函数。此时会从任务队列中取出 lint 任务,调用 runTask 去执行该任务(图2所示)。(runTask 的细节放到下一小节分析。)执行完成后,会执行以下子任务:
  • 如果配置了 aggregateOutput 参数,会将任务的输出流写入到内存流;
  • 更新 result.code,如果配置了失败不继续执行(!continueOnError) 或者 race 参数,就直接调用 abort 终止整个任务队列,返回结果;
  • 如果成功或配置失败了继续执行其他任务( continueOnError),就去任务队列中取出下一个任务(图3所示),会去执行 test 任务,重复上一步骤的逻辑。最终任务队列中没有其他任务了,此时也会执行 done 函数,结束整个任务组,并将 results 返回。
  • 并行机制的不同就在于初始的时候会调用多次 next 函数,并且会判断当前是否还有正在执行的任务。

    上图是我们执行以下命令的流程图:

    $ node ./bin/npm-run-all/index.js  -p lint test --max-parallel 2
    

    命令的意思是并行执行 lint 和 test 任务,并且最大并发数是 2。

    回到上面的流程图:

  • 初始时还是会创建一个任务队列,并将 lint 和 test 两个任务添加到队列中;
  • 然后在首次执行时,因为我们是并发执行,所以会调用两次 next 函数,promises 数组会保存两个 promise 实例;
  • 当 lint 任务先完成(此时 test 任务还在执行,即 test promise 还未结束),此时会再调用 next 函数。此时会判断任务队列和正在进行的任务队列是否为空,如果是的话就调用 done 返回结果,否则什么都不做,等待其他任务执行完成。
  • 当 test 任务也完成时(假设此时 lint 任务已经完成),同样也会再次执行 next。但此时 queue 和 promises 两个数组的长度都是 0,就执行 done 逻辑,输出任务组的结果。
  • 本节我们学习了任务组中的任务不管是串行机制还是并行机制,都通过任务队列依次执行。不同的是,串行是首次只执行一次 next,并行根据参数执行多次 next。当满足队列为空并且所有任务都完成,就结束当前任务组,并将缓存在 results 中的结果返回。

    单个任务如何执行

    了解完任务组的串行和并行机制,这一小节就来了解单个任务是如何被执行的。

    module.exports = function runTask(task, options) {
        let cp = null
        const promise = new Promise((resolve, reject) => {
            // 包装输出、输入、错误信息流……
            // 在输出流中写入任务名称
            if (options.printName && stdout != null) {
                stdout.write(createHeader(
                    task,
                    options.packageInfo,
                    options.stdout.isTTY
            // 执行命令的 npm 路径,npm-cli 的路径
            const npmPath = options.npmPath || process.env.npm_execpath
            const npmPathIsJs = typeof npmPath === "string" && /\.m?js/.test(path.extname(npmPath))
            // 执行路径,一般是全局的 bin/node 路径
            const execPath = (npmPathIsJs ? process.execPath : npmPath || "npm")
            // 判断是不是 yarn
            const isYarn = path.basename(npmPath || "npm").startsWith("yarn")
            const spawnArgs = ["run"]
            if (npmPathIsJs) {
                spawnArgs.unshift(npmPath)
            if (!isYarn) {
                Array.prototype.push.apply(spawnArgs, options.prefixOptions)
            else if (options.prefixOptions.indexOf("--silent") !== -1) {
                spawnArgs.push("--silent")
            Array.prototype.push.apply(spawnArgs, parseArgs(task).map(cleanTaskArg))
          	// 执行命令
            cp = spawn(execPath, spawnArgs, spawnOptions)
            // 省略输出流格式化……
            // Register
            cp.on("error", (err) => {
                cp = null
                reject(err)
            cp.on("close", (code, signal) => {
                cp = null
              	// 成功后返回任务名称、状态
                resolve({ task, code, signal })
        // 给当前的promise挂上静态方法,用于结束当前子进程任务
        promise.abort = function abort() {
            if (cp != null) {
                cp.kill()
                cp = null
        return promise
    

    runTask 做了四件事:

  • 格式化标准输入、输出流,添加一些任务名称头部信息之类的;
  • 获取任务的执行器,获取 npm-cli、node 等路径信息,然后拼接整个任务的执行命令;
  • 调用封装后的 spawn 执行命令,并监听 error 和 close 事件用于返回执行结果;因为系统的不一致,所以 spawn 通过 cross-spawn 做了一层封装。
  • 给当前任务挂上了 abort 的静态方法,用于结束当前进程;当在任务组执行 abort 方法时,实际会调用这个静态方法。
  • 有人会问为什么要去看一个 6 年前写的源码?语法老旧、甚至还能看到标记语法 。但我想表达的是,npm-run-all 这个包的核心逻辑——通过任务队列去实现串行和并行的任务流模型是非常经典的,类似 continueOnError、race 这样的逻辑控制节点也随处可见。例如当前非常时髦的构建系统 Turborepo ,它对 pipeline 的控制逻辑也能复用这个任务流模型。除此之外,通过 npm-run-all 的目录结构,我们可以从中总结出做一个开源项目所需的基本要求。

    关注小余,下一篇文章我们结合 npm-run-all 聊聊开源项目所需的基本要素。

  • 放弃 console.log 吧!用 Debugger 你能读懂各种源码
  •