Correct way to call `async` fn synchronously: `worker_threads`

2 months切换至中文

翻译出现错误

What

同步调用异步函数 即使用同步执行的方式调用执行异步函数,并正确获取异步函数执行的结果。

众所周知,Node.js单线程事件循环模型,异步函数需要在事件循环中经过调度后执行,因此正常情况下直接调用异步函数是无法同步获取到结果的即:

const result = fs.readFile(filePath, 'utf8', (err, content) => content)

console.log(result) // `undefined`

Why

但在某些场景下我们无法使用异步函数,如开发 ESLint 插件时,目前 processor / rule 只支持同步 API,因此在与其他库或上游依赖整合时就可能出现无法使用的尴尬,例如:

  1. eslint-mdx: mdx-js/eslint-mdx#203

  2. eslint-plugin-svelte3: sveltejs/eslint-plugin-svelte3#10 (comment)

How

异步转同步的方式至少有如下几种:

  1. child_process:通过 exexSync/spawnSync 同步 API 创建一个子进程,在子进程中完成异步任务,并想办法将异步任务的结果传递到 exexSync/spawnSync,如 make-synchronous, synckitchild_process 模式
  2. node bindings:使用 node-addon-api 实现阻塞事件循环的 C++ 扩展,如 deasync
  3. worker_threads:使用工作线程共享内存配合 Atomics 等待 worker 执行完成,如 sync-threads, synckitworker_threads 模式

其中 child_process 的效率最低,而目前实测 node bindingsworker_threads macOS 上差异不大,但 Ubuntu 上 deasyncsynckit 慢了 25 倍左右,参考 GitHub Actions 日志

Implementation

worker_threads 的完整文档具体可以查看官网,这里介绍一下 synckit 的大致实现方式。

本质上 synckit 的灵感来自于sync-threadsesbuild,而目前的基准测试显示 synckitsync-threads 快 20 倍左右,具体原因可能有以下几点:

  1. synckit 在调用 createSyncFn 时就创建了 Worker 实例,并在 runtime 实际调用时一直复用这个 Worker 实例,但 sync-threads 是在 runtime 实际调用时才每次去创建一个新的 Worker 实例,这导致
    1. synckit 的初始化过程会比 sync-threads 慢,因此使用 synckit 时我们可以将这一步优化为 getter 懒加载,如 mdx-js/eslint-mdx#324
    2. synckit 的执行过程比 sync-threads 快很多
  2. synckit 会缓存同一个 worker 文件创建的 syncFn 减少不必要的创建过程,当然这也会增加内存占用
  3. sync-threads 在数据传递过程中使用 v8 模块的序列化与反序列化功能,而 synckit 则是直接使用 worker_threads 原生的 message 传递,这一部分按照我的理解 sync-threads 应该占优,但 Noe.js 自身也是基于 v8worker_threads 的性能也会不断提升,所以使用 v8 模块进行序列化与反序列化的提升未知,在 sync-threads 侧也没有相关的基准测试

具体到代码,我们使用 MessageChannel 创建出两个 MessagePort 实例供主线程和工作线程使用:

const { port1: mainPort, port2: workerPort } = new MessageChannel()

然后我们创建一个 Worker 实例复用:

const worker = new Worker(workerPath, {
  workerData: { workerPort },
  transferList: [workerPort],
  execArgv: [],
})

然后在 worker 实现中我们使用 parentPort 来监听主线程传递进来的数据,我们的异步任务将在这里完成并在完成后通知主线程:

export async function runAsWorker<T extends AnyAsyncFn>(fn: T): Promise<void>
export async function runAsWorker<R, T extends AnyAsyncFn<R>>(fn: T) {
  const { workerPort } = workerData as WorkerData
  parentPort!.on(
    'message',
    ({ sharedBuffer, id, args }: MainToWorkerMessage<Parameters<T>>) => {
      ;(async () => {
        const sharedBufferView = new Int32Array(sharedBuffer)
        let msg: WorkerToMainMessage<R>
        try {
          msg = { id, result: await fn(...args) } // 执行异步任务
        } catch (error: unknown) {
          msg = {
            id,
            error,
          }
        }
        workerPort.postMessage(msg) // 通知主线程任务结果
        Atomics.add(sharedBufferView, 0, 1)
        Atomics.notify(sharedBufferView, 0) // 触发 Atomics 通知
      })()
    },
  )
}

我们继续看一下 syncFn 的创建过程:

const syncFn = (...args: Parameters<T>): R => {
  const id = nextID++

  const sharedBuffer = new SharedArrayBuffer(bufferSize)
  const sharedBufferView = new Int32Array(sharedBuffer)

  const msg: MainToWorkerMessage<Parameters<T>> = { sharedBuffer, id, args }
  worker.postMessage(msg) // 通知 `worker` 开始处理异步任务,传递参数

  // 关键点:我们在这里等待 `runAsWorker` 中 `Atomics.notify` 触发的通知,正常结束即代表异步任务已经完成
  const status = Atomics.wait(sharedBufferView, 0, 0, timeout) 

  if (!["ok", "not-equal"].includes(status)) {
    throw new Error("Internal error: Atomics.wait() failed: " + status)
  }

  const {
    id: id2,
    result,
    error,
  } = receiveMessageOnPort(mainPort)!.message as WorkerToMainMessage<R> // 获取子线程通知的异步任务结果

  if (id !== id2) {
    throw new Error(`Internal error: Expected id ${id} but got id ${id2}`)
  }

  if (error) {
    throw error
  }

  return result!
}

通过上面的两个关键步骤我们就成功将异步任务转化为同步任务了,而且相对性能很快,同时不需要『难以使用』的 C++ node bindings

Limitation

由于我们使用 Atomics.wait 等待工作线程的通知,默认没有超时时间,所以如果异常发生在 runAsWorker 以外,那么将永远无法收到通知,例如以下 worker 的实现:

const { runAsWorker } = require('synckit')

require('non-exist-package') // 这里请求一个不存在的包,将会抛出异常,但是 `Atomics.wait` 无法收到通知

runAsWorker(() => {
  // do some job
})

因此 synckit 提供了一个 SYNCKIT_TIMEOUT 环境变量方便调试类似的问题,即如果遇到任务长时间没有结束的情况可以尝试设置如 SYNCKIT_TIMEOUT=5000 以观察 runAsWorker 是否有异常,类似的问题在开发阶段都可以解决掉。

Next step

既然在 Node.js 中可以使用 worker_threads 将异步 API 转化为相对高性能的同步 API,那么在拥有 Web Workers 接口的浏览器中是否也同样可行?理论上应该是行得通的,毕竟 MessageChannelAtomics 同样可以使用,因此下一步 synckit 将开始支持浏览器端的使用。

Conclusion

Node.jsworker_threads 和浏览器中 Web Workers 的引入为我们提供了提供了相对高性能的转换代码执行顺序的功能,但相对地它的性能肯定不如原生的异步 API,因此我们还是应该使用避免类似的解决方案,转而推进如 ESLint 对异步 API 的支持。

Related

目前在使用 synckit 的 ESLint 相关的包有:

  1. eslint-plugin-mdx
  2. eslint-plugin-markup

本文首发于 知乎专栏 - 1stG 全栈之路