之前已经学习过web-worker了,传送门。但是使用方法较为繁琐,今天来看下比较简洁的使用web-worker的方法。

web-worker

先来简单的回忆下使用方法

// main.ts
// 创建一个新的 Web Worker
const worker = new Worker('worker.js');

// 监听来自 Web Worker 的消息
worker.addEventListener('message', (event) => {
  console.log('主线程收到消息:', event.data);
});

// 向 Web Worker 发送消息
worker.postMessage('你好,Web Worker!');

// worker.ts
// 监听来自主线程的消息
self.addEventListener('message', (event) => {
  console.log('Web Worker 收到消息:', event.data);

  // 在工作线程中执行一些任务
  const result = performTask();

  // 向主线程发送消息
  self.postMessage(result);
});

// 模拟在工作线程中执行的任务
function performTask() {
  let sum = 0;
  for (let i = 1; i <= 5; i++) {
    sum += i;
  }
  return sum;
}

PS:果然还是那么繁琐……

Comlink 是由 Google Chrome Labs 开源出的项目,提供了前端多线程编程的 PRC 能力,是一个简化 Web Workers 通信的库,它使得在主线程和 Worker 之间传递对象变得更加容易,而不需要手动处理消息传递和序列化。

Comlink最主要的两个函数是wrap和expose:

  • wrap(endpoint):用于主线程访问worker中的属性和方法,返回一个代理,对worker API的消息机制进行了封装,可以直接在主线程基于Promise进行通信;
  • expose(value, endpoint?, allowedOrigins?):用于worker中暴露属性和方法,endpoint代表通信双方,这里就代表主线程,普通的worker可以忽略,在sharedWorker中用于连接时绑定port。

代码示例

代码示例均基于Vite环境中。

import { expose } from 'comlink';

export class ComlinkTest {
  private _count: number = 0;
  
  get count() {
    return this._count;
  }

  add() {
    this._count++;
  }

  performTask() {
    let sum = 0;
    for (let i = 1; i <= 5; i++) {
      sum += i;
    }
    return sum;
  }
}

expose(ComlinkTest)
// 或者暴露实例
// expose(new ComlinkTest())

主线程中

import {wrap} from 'comlink'
import comlinkWorker from './worker/comlink?worker'
import { ComlinkTest } from './worker/comlink'

const worker = new comlinkWorker();
const MyClass = wrap<new () => ComlinkTest>(worker);

const obj = await new MyClass();
const res = await obj.performTask()
console.log(res); // 15

// 实例引入
const worker = new comlinkWorker();
const obj = wrap<ComlinkTest>(worker);

const res = await obj.performTask()
console.log(res); // 15

worker可以暴露类或者对象示例,如果暴露实例则所有的引用都共享同一份数据,如果暴露类则每个实例化对象都是独立的。

另外,主线程获取的构造函数实例化是异步的。

原理探秘

Comlink通过封装,让使用者可以省略Web Wroker API中繁杂的通讯,让worker的使用可以和普通代码一样。

下面来“解剖”一下它的内部构造,看一下用了什么“黑魔法”。(为了便于理解,删减边界条件判断的代码)

先来看一些wrap函数,作为主线程调用worker实例的包装器,通过Proxy API建立了主线程和worker之间通信的桥梁。

wrap函数主要是创建了一个代理实例,对worker中的一些操作进行了代理

export function wrap<T>(ep: Endpoint, target?: any): Remote<T> {
  return createProxy<T>(ep, [], target) as any;
}

function createProxy<T>(
  ep: Endpoint,
  path: (string | number | symbol)[] = [],
  target: object = function () {}
): Remote<T> {
  let isProxyReleased = false;
  const proxy = new Proxy(target, {
    get(_target, prop) {
      // ...
    },
    set(_target, prop, rawValue) {
      // ...
    },
    apply(_target, _thisArg, rawArgumentList) {
      // ...
    },
    construct(_target, rawArgumentList) {
      // ...
    },
  });
  registerProxy(proxy, ep);
  return proxy as any;
}

get,对实例的读取操作进行拦截,当属性为then时(异步执行)获取值并返回,否则返回当前属性的代理对象。

get(_target, prop) {
  if (prop === "then") {
    if (path.length === 0) {
      return { then: () => proxy };
    }
    const r = requestResponseMessage(ep, {
      type: MessageType.GET,
      path: path.map((p) => p.toString()),
    }).then(fromWireValue);
    return r.then.bind(r);
  }
  return createProxy(ep, [...path, prop]);
}

为什么要返回的值要继续使用createProxy包装?

目的是为了到最后的操作时能够保留属性路径,例如a.b.c的path为["a", "b", "c"]

Proxy的默认对象是一个匿名函数,可以看一下createProxy的参数列表,在使用await和Promise.then时拦截到的属性值为thenimage-20231221202954233

set,拦截赋值操作,没有花哨的东西,就是发送修改值的消息。

set(_target, prop, rawValue) {
  const [value, transferables] = toWireValue(rawValue);
  return requestResponseMessage(
    ep,
    {
      type: MessageType.SET,
      path: [...path, prop].map((p) => p.toString()),
      value,
    },
    transferables
  ).then(fromWireValue) as any;
}

apply,拦截函数调用,向worker发送apply数据,额外处理了创建ep和bind

apply(_target, _thisArg, rawArgumentList) {
  const last = path[path.length - 1];
  if ((last as any) === createEndpoint) {
    return requestResponseMessage(ep, {
      type: MessageType.ENDPOINT,
    }).then(fromWireValue);
  }
  if (last === "bind") {
    return createProxy(ep, path.slice(0, -1));
  }
  const [argumentList, transferables] = processArguments(rawArgumentList);
  return requestResponseMessage(
    ep,
    {
      type: MessageType.APPLY,
      path: path.map((p) => p.toString()),
      argumentList,
    },
    transferables
  ).then(fromWireValue);
}

construct,拦截 new 操作符,向worker发送构造消息。

construct(_target, rawArgumentList) {
  const [argumentList, transferables] = processArguments(rawArgumentList);
  return requestResponseMessage(
    ep,
    {
      type: MessageType.CONSTRUCT,
      path: path.map((p) => p.toString()),
      argumentList,
    },
    transferables
  ).then(fromWireValue);
}

可以看到创建proxy实例的时候,requestResponseMessage这个函数出现频率挺高,它是用来和worker进行通信的关键,将不同的操作通过消息机制发送到worker中。

function requestResponseMessage(
  ep: Endpoint,
  msg: Message,
  transfers?: Transferable[]
): Promise<WireValue> {
  return new Promise((resolve) => {
    const id = generateUUID();
    ep.addEventListener("message", function l(ev: MessageEvent) {
      if (!ev.data || !ev.data.id || ev.data.id !== id) {
        return;
      }
      ep.removeEventListener("message", l as any);
      resolve(ev.data);
    } as any);
    if (ep.start) {
      ep.start();
    }
    ep.postMessage({ id, ...msg }, transfers);
  });
}

function generateUUID(): string {
  return new Array(4)
    .fill(0)
    .map(() => Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString(16))
    .join("-");
}

每次requestResponseMessage执行都会生成一个UUID,用来区分响应对应的原始请求。执行requestResponseMessage时添加一个message的回调函数,收到响应数据时检验id是否对应,如果是当前操作的响应则删除当前一次执行添加message回调。我们使用worker时消息通信的postMessage和onmessage就是通过这里进行隐藏,造成本地调用的“错觉”。

再来看expose函数,可以看到type对应前面wrap中发送的type了,根据类型对传入的对象进行处理。

export function expose(
  obj: any,
  ep: Endpoint = globalThis as any,
  allowedOrigins: (string | RegExp)[] = ["*"]
) {
  ep.addEventListener("message", function callback(ev: MessageEvent) {
    const { id, type, path } = {
      path: [] as string[],
      ...(ev.data as Message),
    };
    const argumentList = (ev.data.argumentList || []).map(fromWireValue);
    let returnValue;
    try {
      const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], obj);
      const rawValue = path.reduce((obj, prop) => obj[prop], obj);
      switch (type) {
        case MessageType.GET:
          // ...
        case MessageType.SET:
          // ...
        case MessageType.APPLY:
          // ...
        case MessageType.CONSTRUCT:
          // ...
        case MessageType.ENDPOINT:
          // ...
        case MessageType.RELEASE:
          // ...
        default:
          return;
      }
    } catch (value) {
      returnValue = { value, [throwMarker]: 0 };
    }
    Promise.resolve(returnValue)
      .catch((value) => {
        return { value, [throwMarker]: 0 };
      })
      .then((returnValue) => {
        const [wireValue, transferables] = toWireValue(returnValue);
        ep.postMessage({ ...wireValue, id }, transferables);
      });
  } as any);
  if (ep.start) {
    ep.start();
  }
}

前端小白