并发编程:TaskPool与Worker线程通信与数据共享(52)
在鸿蒙(HarmonyOS)的 ArkTS 并发编程中,TaskPool 和 Worker 是实现多线程的两大核心机制。由于 ArkTS 采用了 Actor 内存隔离模型,线程之间不存在共享内存,因此线程间的数据传递与通信是开发中的关键环节。
以下是关于 TaskPool 与 Worker 的线程通信与数据共享机制的详细解析:
一、 核心通信机制:从“深拷贝”到“转移控制权”
无论是 TaskPool 还是 Worker,默认情况下跨线程传递普通 JS 对象时,系统会采用标准的结构化克隆算法(Structured Clone),即深拷贝。这种方式安全但存在性能开销。为了优化大数据传输,两者均提供了以下优化机制:
-
转移控制权(Transferable Objects):
对于ArrayBuffer等支持转移的对象,可以在传输时直接转移其内存控制权给目标线程。转移后,原宿主线程中的该对象将失效不可用,但这种方式避免了数据拷贝,极大提升了传输效率。- TaskPool:通过
taskpool.setTransferList接口设置。 - Worker:通过
postMessage的 transfer 参数列表设置。
- TaskPool:通过
-
Sendable 对象共享(零拷贝):
对于需要在多线程间频繁交互且无需隔离的复杂对象,可以使用@Sendable装饰器。被标记为 Sendable 的类,其实例会分配在共享堆(SharedHeap)中,跨线程传递时采用引用传递而非拷贝。- TaskPool:支持直接传递 Sendable 对象。
- Worker:需要使用专门的
postMessageWithSharedSendable接口进行共享传输。 - 注意:Sendable 具有“传染性”,其关联属性也必须是 Sendable 类型,且为了保证线程安全,通常需要配合
AsyncLock(异步锁)来保护并发修改。
1、 转移控制权(Transferable Objects)
当需要跨线程传递 ArrayBuffer 等海量二进制数据(如图像像素、音视频流)时,使用转移控制权可以实现“零拷贝”传输,但代价是原线程将失去对该内存的访问权。
1. TaskPool 中的转移传输
在 TaskPool 中,默认传递 ArrayBuffer 时就会采用转移方式。如果确实需要保留原数据,可以通过 setTransferList([]) 强制切换为拷贝模式。
// 定义并发任务函数
@Concurrent
function adjustImageValue(arrayBuffer: ArrayBuffer): ArrayBuffer {
// 在子线程中对 arrayBuffer 进行耗时操作
return arrayBuffer; // 返回值默认也是转移回主线程
}
// 在主线程中执行任务
let task = new taskpool.Task(adjustImageValue, largeBuffer);
// 【注意】如果不调用此方法,默认就是转移模式。
// 如果希望原线程保留 largeBuffer 副本,则传入空数组强制拷贝:
// task.setTransferList([]);
taskpool.execute(task).then((result) => {
// 此时 largeBuffer 已不可用(byteLength 为 0),数据已转移至 result 中
});
2. Worker 中的转移传输
在 Worker 通信中,通过在 postMessage 的第二个参数中指定要转移的对象列表来实现。
// 主线程向 Worker 转移内存
const bigBuffer = new ArrayBuffer(100 * 1024 * 1024); // 100MB 数据
workerInstance.postMessage(bigBuffer, [bigBuffer]); // 第二个参数指定转移对象
// 转移后,主线程的 bigBuffer 将失效,Worker 接收后可直接使用
2、 Sendable 对象共享(零拷贝引用传递)
对于需要在多线程间频繁交互、且需要保持状态的复杂对象,使用 @Sendable 装饰器可以将其分配在共享堆(SharedHeap)中,跨线程传递时仅传递引用。
1. 定义 Sendable 类与并发任务
// 必须使用 @Sendable 装饰器,且属性需显式初始化
@Sendable
class DownloadTaskInfo {
url: string = '';
progress: number = 0;
constructor(url: string) {
this.url = url;
}
}
// 并发任务中接收 Sendable 对象
@Concurrent
function startDownload(taskInfo: DownloadTaskInfo) {
// 子线程直接修改共享对象的进度
taskInfo.progress = 50;
return taskInfo;
}
2. TaskPool 传递 Sendable 对象
TaskPool 执行时,直接传入 Sendable 对象实例即可实现共享传输。
const taskInfo = new DownloadTaskInfo('https://example.com/video.mp4');
let task = new taskpool.Task(startDownload, taskInfo);
taskpool.execute(task).then((res) => {
// 主线程与子线程操作的是同一块内存,res === taskInfo
console.info(`下载进度: ${res.progress}`);
});
3. Worker 传递 Sendable 对象
Worker 必须使用专用的 postMessageWithSharedSendable 方法,才能触发共享传输(若使用普通的 postMessage 则会退化为深拷贝)。
// 主线程向 Worker 发送共享对象
const taskInfo = new DownloadTaskInfo('https://example.com/image.png');
workerInstance.postMessageWithSharedSendable(taskInfo);
// Worker 内部接收
// entry/ets/workers/Worker.ets
workerPort.onmessage = (e: worker.MessageEvents) => {
const info = e.data as DownloadTaskInfo;
info.progress = 100; // 修改共享数据
};
二、 TaskPool 的通信特点
TaskPool 是系统级的任务池,适合处理短时、独立、无状态的耗时任务(如 JSON 解析、图片压缩)。
- 通信方式:采用“提交任务 -> 获取 Promise 结果”的单向异步模式。主线程通过
taskpool.execute()提交带有@Concurrent装饰器的函数及参数,任务执行完毕后通过Promise将结果序列化返回主线程。 - 数据限制:传递的参数和返回的结果必须是可序列化的对象。由于每次执行都是全新的上下文,TaskPool 无法在多次任务间保持状态。
- 适用场景:高并发的小任务调度,支持任务优先级(HIGH/NORMAL/LOW)和任务组(TaskGroup)管理。
1. 基础通信:提交任务与 Promise 结果返回
TaskPool 最基础的用法是将耗时计算封装在带有 @Concurrent 装饰器的函数中,通过 execute() 提交后,主线程通过 await 或 .then() 获取序列化后的结果。
import { taskpool } from '@kit.ArkTS';
// 1. 定义并发任务函数(必须使用 @Concurrent 装饰器)
@Concurrent
function parseLargeJson(jsonStr: string): object {
// 在子线程中执行耗时解析,不阻塞 UI
return JSON.parse(jsonStr);
}
// 2. 主线程提交任务并接收结果
async function loadData() {
const jsonData = '{"name": "HarmonyOS", "version": 5}';
const task = new taskpool.Task(parseLargeJson, jsonData);
try {
// 通过 Promise 获取子线程返回的结果
const result = await taskpool.execute(task) as object;
console.info('解析结果:', JSON.stringify(result));
} catch (error) {
console.error('任务执行失败:', error);
}
}
2. 任务优先级调度(HIGH / NORMAL / LOW)
TaskPool 支持根据任务的紧急程度设置优先级,系统底层会动态调度 CPU 核心,优先处理高优先级任务。
import { taskpool } from '@kit.ArkTS';
@Concurrent
function heavyComputation(data: number[]): number {
// 模拟耗时计算
return data.reduce((a, b) => a + b, 0);
}
// 设置任务优先级为 HIGH,确保优先被调度执行
const urgentTask = new taskpool.Task(heavyComputation, [1, 2, 3, 4, 5]);
urgentTask.setPriority(taskpool.Priority.HIGH);
taskpool.execute(urgentTask).then((res) => {
console.info('高优先级任务完成,结果:', res);
});
3. 任务组管理(TaskGroup)
当需要并行处理多个独立任务,并等待所有任务全部完成后再进行下一步操作时,可以使用 TaskGroup(类似于 Promise.all 的并发版本)。
import { taskpool } from '@kit.ArkTS';
@Concurrent
function fetchAndProcessData(id: number): string {
// 模拟多个独立的数据处理任务
return `Data_${id}_Processed`;
}
async function batchProcess() {
// 1. 创建任务组
const group = new taskpool.TaskGroup();
// 2. 将多个任务添加到任务组中
for (let i = 0; i < 5; i++) {
const task = new taskpool.Task(fetchAndProcessData, i);
group.addTask(task);
}
// 3. 执行任务组,等待所有任务完成后返回结果数组
const results = await taskpool.execute(group) as string[];
console.info('批量处理完成:', results);
}
4. 进阶通信:子线程向主线程持续发送数据(sendData)
虽然 TaskPool 默认是单次返回结果,但对于需要实时反馈进度的场景(如图片批量处理进度),可以通过 sendData 和 onReceiveData 实现子线程向主线程的持续通信。
import { taskpool } from '@kit.ArkTS';
@Concurrent
function processImages(count: number): string[] {
const results: string[] = [];
for (let i = 0; i < count; i++) {
// 模拟耗时操作
results.push(`Image_${i}`);
// 【关键】在子线程中持续向主线程发送进度数据
taskpool.Task.sendData(i + 1);
}
return results;
}
// 主线程监听进度并执行任务
const task = new taskpool.Task(processImages, 100);
// 必须在 execute 之前注册数据接收回调
task.onReceiveData((progress: number) => {
// 回调在宿主线程(UI线程)中执行,可安全更新 UI
console.info(`当前处理进度: ${progress}%`);
});
taskpool.execute(task).then((finalResult) => {
console.info('全部处理完成,共:', finalResult.length, '张');
});
三、 Worker 的通信特点
Worker 是独立的工作线程,适合处理长耗时、有状态、需要持续运行的任务(如 WebSocket 长连接、游戏主逻辑、持续的传感器数据采集)。
- 通信方式:采用基于事件的双向消息传递机制。
- 主线程 -> Worker:主线程通过
worker.postMessage()发送消息,Worker 内部通过onmessage监听接收。 - Worker -> 主线程:Worker 通过
workerPort.postMessage()发送消息,主线程通过监听 Worker 实例的onmessage事件接收。
- 主线程 -> Worker:主线程通过
- 数据限制:同样基于序列化通信,但 Worker 拥有独立的内存堆栈和事件循环,可以在多次消息交互之间持久维护内部状态(变量、对象)。
- 生命周期:Worker 需要开发者手动创建(
new worker.ThreadWorker)和销毁(terminate),且系统对同时运行的 Worker 数量有严格限制(通常最多 64 个),不可滥用。
1. 主线程:创建 Worker 与双向通信
在主线程中,我们通过 new worker.ThreadWorker 显式创建 Worker 实例,并通过监听 onmessage 接收子线程发来的消息。
import { worker } from '@kit.ArkTS';
// 1. 显式创建 Worker 实例(需传入 Worker 脚本路径)
const myWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ets');
// 2. 主线程向 Worker 发送消息
myWorker.postMessage({ type: 'START_TASK', payload: { id: 1001 } });
// 3. 监听并接收 Worker 发回的消息
myWorker.onmessage = (e: worker.MessageEvents) => {
const data = e.data;
console.info(`收到 Worker 消息: ${data.type}, 进度: ${data.progress}%`);
};
// 4. 生命周期管理:在不需要时手动销毁 Worker,防止内存泄漏
// myWorker.terminate();
2. Worker 侧:独立脚本与状态维护
Worker 拥有独立的 JS 执行环境和事件循环,可以维护全局状态,并通过 workerPort 主动向主线程推送消息。
// entry/ets/workers/MyWorker.ets
import { worker } from '@kit.ArkTS';
// Worker 内部的全局状态(独立内存堆栈,多次消息交互间持久存在)
let taskCounter = 0;
// 监听来自主线程的消息
workerPort.onmessage = (e: worker.MessageEvents) => {
const msg = e.data;
if (msg.type === 'START_TASK') {
taskCounter++;
// 模拟耗时任务后,主动向主线程推送进度
workerPort.postMessage({
type: 'PROGRESS',
progress: 50,
taskId: msg.payload.id,
totalTasks: taskCounter
});
}
};
3. 实战:模拟 WebSocket 长连接与实时数据流
Worker 非常适合处理需要持续运行、且需要高频双向通信的场景,例如后台持续采集传感器数据或维持网络长连接。
主线程侧:
const sensorWorker = new worker.ThreadWorker('entry/ets/workers/SensorWorker.ets');
// 持续接收传感器数据并更新 UI 状态
sensorWorker.onmessage = (e: worker.MessageEvents) => {
const sensorData = e.data;
// 安全地更新 UI 状态
this.currentSpeed = sensorData.speed;
};
// 发送指令:开始采集
sensorWorker.postMessage({ action: 'START_COLLECTING' });
Worker 侧:
// entry/ets/workers/SensorWorker.ets
let timerId: number = -1;
workerPort.onmessage = (e: worker.MessageEvents) => {
if (e.data.action === 'START_COLLECTING') {
// 启动周期性数据采集(持续运行状态)
timerId = setInterval(() => {
const speed = Math.random() * 100;
// 实时向主线程推送高频数据
workerPort.postMessage({ speed: speed });
}, 100);
} else if (e.data.action === 'STOP_COLLECTING') {
clearInterval(timerId);
}
};
四、 选型与案例
- UI 操作红线:只有主线程(UI 线程)能操作 ArkUI 组件(如更新
@State变量)。子线程(TaskPool/Worker)中严禁调用 UI 相关 API,否则会直接抛出异常。 - 避免频繁交互:在并发场景下,子线程任务应保持相对独立,尽量减少与主线程的数据交互频率。如果必须传递超大对象,优先考虑 Transferable 转移控制权或 Sendable 共享机制,避免深拷贝带来的性能瓶颈。
- 决策依据:如果任务是一次性计算且耗时较短(通常 ≤5 秒),优先使用 TaskPool;如果任务需要长期运行、维持内部状态或进行复杂的跨线程双向通信,则选择 Worker。
1、 极致共享:Sendable 与共享模块单例
当多线程需要频繁读写同一个复杂对象时,传统的深拷贝开销极大。通过 @Sendable 装饰器,可以实现跨线程的零拷贝引用传递。
实战场景:跨线程共享状态(单例模式)
利用 'use shared' 指令标记共享模块,并在其中导出 Sendable 对象。结合 AsyncLock(异步锁)保障线程安全,可实现主线程与 TaskPool 线程对同一对象的安全访问。
// SharedModule.ets (共享模块)
'use shared';
import { ArkTSUtils } from '@kit.ArkTS';
@Sendable
class SharedCounter {
private count_: number = 0;
private lock_: ArkTSUtils.locks.AsyncLock = new ArkTSUtils.locks.AsyncLock();
public async increase() {
await this.lock_.lockAsync(() => { this.count_++; });
}
public async getCount(): Promise<number> {
return this.lock_.lockAsync(() => this.count_);
}
}
export const sharedCounter = new SharedCounter();
2、 大数据传输:ArrayBuffer 与 SharedArrayBuffer
对于图像像素、音视频流等海量数据,除了使用 Transferable 转移所有权外,还可以使用 SharedArrayBuffer 实现真正的底层内存共享。
- 零拷贝共享:
SharedArrayBuffer内部的 Native 内存支持跨并发实例直接共享。 - 防竞争机制:由于多个线程可同时访问同一块内存,必须配合
Atomics类(如Atomics.add,Atomics.load)进行原子操作,防止数据竞争(Data Race)。
3、 长时任务处理:TaskPool.LongTask
虽然 TaskPool 适合短时任务,但对于执行周期长、但不频繁阻塞线程的任务(如定期传感器数据采集、Socket 端口监听),官方提供了 TaskPool.LongTask 机制。
- 核心优势:开发者无需像 Worker 那样手动管理线程的生命周期,避免了线程泛滥。
- 双向通信:在长时任务内部,可以通过
taskpool.Task.sendData()不定期将阶段性结果返回给宿主线程,宿主线程通过onData事件接收。 - 避坑指南:长时任务不等于阻塞任务。如果任务需要长时间独占线程(如产线硬件老化压测、游戏主逻辑线程),依然必须使用 Worker。
4、 跨语言并发:NAPI 与 TaskPool 的深度结合
在混合开发中,如果 Native 层的 C++ 计算极其耗时,可以将其封装后放入 ArkTS 的 TaskPool 中调度。
- Native 对象跨线程传递:当 Native 对象(如 C++ 类的实例)需要传递给 TaskPool 时,需使用
napi_coerce_to_native_binding_object接口。 - 生命周期接管:通过绑定
detach(序列化前执行)和attach(反序列化后执行)回调,安全地将 Native 对象的内存控制权从主线程转移至 TaskPool 的工作线程,处理完毕后再安全传回。
五、 架构演进
鸿蒙的并发模型正在从传统的“共享内存 + 锁”向“Actor 内存隔离 + 显式共享”演进。在实际工程中:
- 轻量级计算/解析:首选
TaskPool,享受系统自动调度与优先级管理。 - 海量数据处理:使用
SharedArrayBuffer+Atomics,或Transferable转移控制权。 - 复杂状态共享:使用
@Sendable+ 共享模块 +AsyncLock。 - 长驻后台服务:使用
Worker,但务必在组件销毁时调用terminate()释放资源,防止内存泄漏。
更多推荐


所有评论(0)