之前已经学习过web-worker了,传送门。但是使用方法较为繁琐,今天来看下比较简洁的使用web-worker的方法。
web-worker
先来简单的回忆下使用方法
// main.ts
// 创建一个新的 Web Worker
const worker = new Worker('worker.js');
// 监听来自 Web Worker 的消息
worker.addEventListener('message', (event) => {
console.log('主线程收到消息:', event.data);
});
// 向 Web Worker 发送消息
worker.postMessage('你好,Web Worker!');
// worker.ts
// 监听来自主线程的消息
self.addEventListener('message', (event) => {
console.log('Web Worker 收到消息:', event.data);
// 在工作线程中执行一些任务
const result = performTask();
// 向主线程发送消息
self.postMessage(result);
});
// 模拟在工作线程中执行的任务
function performTask() {
let sum = 0;
for (let i = 1; i <= 5; i++) {
sum += i;
}
return sum;
}
PS:果然还是那么繁琐……
Comlink
Comlink 是由 Google Chrome Labs 开源出的项目,提供了前端多线程编程的 PRC 能力,是一个简化 Web Workers 通信的库,它使得在主线程和 Worker 之间传递对象变得更加容易,而不需要手动处理消息传递和序列化。
Comlink最主要的两个函数是wrap和expose:
- wrap(endpoint):用于主线程访问worker中的属性和方法,返回一个代理,对worker API的消息机制进行了封装,可以直接在主线程基于Promise进行通信;
- expose(value, endpoint?, allowedOrigins?):用于worker中暴露属性和方法,endpoint代表通信双方,这里就代表主线程,普通的worker可以忽略,在sharedWorker中用于连接时绑定port。
代码示例
代码示例均基于Vite环境中。
import { expose } from 'comlink';
export class ComlinkTest {
private _count: number = 0;
get count() {
return this._count;
}
add() {
this._count++;
}
performTask() {
let sum = 0;
for (let i = 1; i <= 5; i++) {
sum += i;
}
return sum;
}
}
expose(ComlinkTest)
// 或者暴露实例
// expose(new ComlinkTest())
主线程中
import {wrap} from 'comlink'
import comlinkWorker from './worker/comlink?worker'
import { ComlinkTest } from './worker/comlink'
const worker = new comlinkWorker();
const MyClass = wrap<new () => ComlinkTest>(worker);
const obj = await new MyClass();
const res = await obj.performTask()
console.log(res); // 15
// 实例引入
const worker = new comlinkWorker();
const obj = wrap<ComlinkTest>(worker);
const res = await obj.performTask()
console.log(res); // 15
worker可以暴露类或者对象示例,如果暴露实例则所有的引用都共享同一份数据,如果暴露类则每个实例化对象都是独立的。
另外,主线程获取的构造函数实例化是异步的。
原理探秘
Comlink通过封装,让使用者可以省略Web Wroker API中繁杂的通讯,让worker的使用可以和普通代码一样。
下面来“解剖”一下它的内部构造,看一下用了什么“黑魔法”。(为了便于理解,删减边界条件判断的代码)
先来看一些wrap函数,作为主线程调用worker实例的包装器,通过Proxy API建立了主线程和worker之间通信的桥梁。
wrap函数主要是创建了一个代理实例,对worker中的一些操作进行了代理
export function wrap<T>(ep: Endpoint, target?: any): Remote<T> {
return createProxy<T>(ep, [], target) as any;
}
function createProxy<T>(
ep: Endpoint,
path: (string | number | symbol)[] = [],
target: object = function () {}
): Remote<T> {
let isProxyReleased = false;
const proxy = new Proxy(target, {
get(_target, prop) {
// ...
},
set(_target, prop, rawValue) {
// ...
},
apply(_target, _thisArg, rawArgumentList) {
// ...
},
construct(_target, rawArgumentList) {
// ...
},
});
registerProxy(proxy, ep);
return proxy as any;
}
get,对实例的读取操作进行拦截,当属性为then时(异步执行)获取值并返回,否则返回当前属性的代理对象。
get(_target, prop) {
if (prop === "then") {
if (path.length === 0) {
return { then: () => proxy };
}
const r = requestResponseMessage(ep, {
type: MessageType.GET,
path: path.map((p) => p.toString()),
}).then(fromWireValue);
return r.then.bind(r);
}
return createProxy(ep, [...path, prop]);
}
为什么要返回的值要继续使用createProxy包装?
目的是为了到最后的操作时能够保留属性路径,例如a.b.c
的path为["a", "b", "c"]
。
Proxy的默认对象是一个匿名函数,可以看一下createProxy的参数列表,在使用await和Promise.then时拦截到的属性值为
then
。
set,拦截赋值操作,没有花哨的东西,就是发送修改值的消息。
set(_target, prop, rawValue) {
const [value, transferables] = toWireValue(rawValue);
return requestResponseMessage(
ep,
{
type: MessageType.SET,
path: [...path, prop].map((p) => p.toString()),
value,
},
transferables
).then(fromWireValue) as any;
}
apply,拦截函数调用,向worker发送apply数据,额外处理了创建ep和bind
apply(_target, _thisArg, rawArgumentList) {
const last = path[path.length - 1];
if ((last as any) === createEndpoint) {
return requestResponseMessage(ep, {
type: MessageType.ENDPOINT,
}).then(fromWireValue);
}
if (last === "bind") {
return createProxy(ep, path.slice(0, -1));
}
const [argumentList, transferables] = processArguments(rawArgumentList);
return requestResponseMessage(
ep,
{
type: MessageType.APPLY,
path: path.map((p) => p.toString()),
argumentList,
},
transferables
).then(fromWireValue);
}
construct,拦截 new 操作符,向worker发送构造消息。
construct(_target, rawArgumentList) {
const [argumentList, transferables] = processArguments(rawArgumentList);
return requestResponseMessage(
ep,
{
type: MessageType.CONSTRUCT,
path: path.map((p) => p.toString()),
argumentList,
},
transferables
).then(fromWireValue);
}
可以看到创建proxy实例的时候,requestResponseMessage这个函数出现频率挺高,它是用来和worker进行通信的关键,将不同的操作通过消息机制发送到worker中。
function requestResponseMessage(
ep: Endpoint,
msg: Message,
transfers?: Transferable[]
): Promise<WireValue> {
return new Promise((resolve) => {
const id = generateUUID();
ep.addEventListener("message", function l(ev: MessageEvent) {
if (!ev.data || !ev.data.id || ev.data.id !== id) {
return;
}
ep.removeEventListener("message", l as any);
resolve(ev.data);
} as any);
if (ep.start) {
ep.start();
}
ep.postMessage({ id, ...msg }, transfers);
});
}
function generateUUID(): string {
return new Array(4)
.fill(0)
.map(() => Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString(16))
.join("-");
}
每次requestResponseMessage执行都会生成一个UUID,用来区分响应对应的原始请求。执行requestResponseMessage时添加一个message的回调函数,收到响应数据时检验id是否对应,如果是当前操作的响应则删除当前一次执行添加message回调。我们使用worker时消息通信的postMessage和onmessage就是通过这里进行隐藏,造成本地调用的“错觉”。
再来看expose函数,可以看到type对应前面wrap中发送的type了,根据类型对传入的对象进行处理。
export function expose(
obj: any,
ep: Endpoint = globalThis as any,
allowedOrigins: (string | RegExp)[] = ["*"]
) {
ep.addEventListener("message", function callback(ev: MessageEvent) {
const { id, type, path } = {
path: [] as string[],
...(ev.data as Message),
};
const argumentList = (ev.data.argumentList || []).map(fromWireValue);
let returnValue;
try {
const parent = path.slice(0, -1).reduce((obj, prop) => obj[prop], obj);
const rawValue = path.reduce((obj, prop) => obj[prop], obj);
switch (type) {
case MessageType.GET:
// ...
case MessageType.SET:
// ...
case MessageType.APPLY:
// ...
case MessageType.CONSTRUCT:
// ...
case MessageType.ENDPOINT:
// ...
case MessageType.RELEASE:
// ...
default:
return;
}
} catch (value) {
returnValue = { value, [throwMarker]: 0 };
}
Promise.resolve(returnValue)
.catch((value) => {
return { value, [throwMarker]: 0 };
})
.then((returnValue) => {
const [wireValue, transferables] = toWireValue(returnValue);
ep.postMessage({ ...wireValue, id }, transferables);
});
} as any);
if (ep.start) {
ep.start();
}
}