鸿蒙开发:TaskPool 与 Worker 多线程详解

HarmonyOS NEXT 提供了两种多线程并发机制:TaskPoolWorker。两者都基于 Actor 并发模型,线程间不共享内存,通过消息传递通信。本文详细介绍两者的用法与区别。


一、TaskPool(任务池)

1.1 概念

TaskPool 是系统管理的线程池,开发者只需提交任务,无需关心线程的创建与销毁。系统会根据负载自动调度,适合短时、独立、高频的并发任务。

1.2 基本用法

import taskpool from '@ohos.taskpool';

// 定义一个可并发执行的函数(必须用 @Concurrent 装饰)
@Concurrent
function heavyCompute(input: number): number {
  let result = 0;
  for (let i = 0; i < input; i++) {
    result += i;
  }
  return result;
}

// 创建任务并执行
async function runTask() {
  const task = new taskpool.Task(heavyCompute, 1000000);
  const result = await taskpool.execute(task);
  console.log('计算结果:', result);
}

1.3 设置任务优先级

import taskpool from '@ohos.taskpool';

@Concurrent
function parseData(data: string): object {
  return JSON.parse(data);
}

async function runWithPriority() {
  const task = new taskpool.Task(parseData, '{"key":"value"}');

  // 优先级:HIGH / MEDIUM / LOW
  const result = await taskpool.execute(task, taskpool.Priority.HIGH);
  console.log('解析结果:', result);
}

1.4 任务取消

import taskpool from '@ohos.taskpool';

@Concurrent
function longTask(n: number): number {
  let sum = 0;
  for (let i = 0; i < n; i++) sum += i;
  return sum;
}

async function cancelDemo() {
  const task = new taskpool.Task(longTask, 999999999);

  // 异步执行,不等待结果
  taskpool.execute(task).catch((e: Error) => {
    console.log('任务被取消或出错:', e.message);
  });

  // 取消任务
  taskpool.cancel(task);
}

1.5 任务组(TaskGroup)

将多个任务打包,全部完成后统一返回结果:

import taskpool from '@ohos.taskpool';

@Concurrent
function multiply(a: number, b: number): number {
  return a * b;
}

async function runGroup() {
  const group = new taskpool.TaskGroup();
  group.addTask(multiply, 2, 3);
  group.addTask(multiply, 4, 5);
  group.addTask(multiply, 6, 7);

  // 返回所有任务结果的数组
  const results = await taskpool.execute(group) as number[];
  console.log('任务组结果:', results); // [6, 20, 42]
}

二、Worker(工作线程)

2.1 概念

Worker 是开发者手动创建和管理的独立线程,适合长时间运行、需要持续通信的任务,例如文件处理、持续监听、复杂状态维护等。

2.2 创建 Worker 文件

Worker 逻辑需要写在独立文件中,路径通常为 entry/src/main/ets/workers/MyWorker.ts

// entry/src/main/ets/workers/MyWorker.ts
import worker, { MessageEvents } from '@ohos.worker';

const workerPort = worker.workerPort;

// 接收主线程消息
workerPort.onmessage = (e: MessageEvents) => {
  const input = e.data as number;
  console.log('Worker 收到:', input);

  // 执行耗时操作
  let result = 0;
  for (let i = 0; i < input; i++) {
    result += i;
  }

  // 将结果发回主线程
  workerPort.postMessage(result);
};

// 错误处理
workerPort.onmessageerror = (e: MessageEvents) => {
  console.error('Worker 消息错误:', e);
};

2.3 主线程使用 Worker

import worker from '@ohos.worker';

function startWorker() {
  // 创建 Worker 实例,指向 Worker 文件路径
  const myWorker = new worker.ThreadWorker(
    'entry/ets/workers/MyWorker.ts'
  );

  // 接收 Worker 返回的消息
  myWorker.onmessage = (e) => {
    console.log('主线程收到结果:', e.data);
    // 任务完成后销毁 Worker
    myWorker.terminate();
  };

  // 错误处理
  myWorker.onerror = (e) => {
    console.error('Worker 出错:', e.message);
  };

  // 向 Worker 发送任务数据
  myWorker.postMessage(1000000);
}

2.4 主线程与 Worker 双向通信

// Worker 文件:持续接收并响应消息
workerPort.onmessage = (e: MessageEvents) => {
  const { type, data } = e.data;

  if (type === 'compute') {
    const result = data * data;
    workerPort.postMessage({ type: 'result', data: result });
  } else if (type === 'stop') {
    workerPort.close(); // Worker 主动关闭
  }
};
// 主线程:多次与 Worker 通信
const myWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');

myWorker.onmessage = (e) => {
  console.log('收到结果:', e.data);
};

// 发送多次消息
myWorker.postMessage({ type: 'compute', data: 5 });
myWorker.postMessage({ type: 'compute', data: 10 });
myWorker.postMessage({ type: 'stop', data: null });

2.5 传递可转移对象(ArrayBuffer)

通过转移所有权避免内存拷贝,提升大数据传输性能:

const buffer = new ArrayBuffer(1024 * 1024); // 1MB

// 第二个参数指定转移的对象,转移后主线程不可再访问该 buffer
myWorker.postMessage(buffer, [buffer]);

三、TaskPool vs Worker 对比

对比项 TaskPool Worker
线程管理 系统自动管理 开发者手动创建/销毁
适用场景 短时、独立、高频任务 长时运行、持续通信任务
任务粒度 单个函数 独立线程文件
通信方式 async/await 返回值 postMessage 消息传递
优先级控制 支持(HIGH/MEDIUM/LOW) 不支持
任务取消 支持 cancel 支持 terminate
代码复杂度 较高
线程数量 系统自动伸缩 每次 new 创建一个
状态维护 不适合(无状态) 适合(有状态)
数据传递 支持序列化对象 支持序列化 + 可转移对象

四、如何选择

任务是否需要长期运行或维护状态?
├── 是 → 使用 Worker
└── 否
    ├── 任务是否需要频繁提交?
    │   ├── 是 → 使用 TaskPool(系统自动复用线程)
    │   └── 否
    │       ├── 是否需要任务优先级控制?
    │       │   ├── 是 → 使用 TaskPool
    │       │   └── 否 → TaskPool 或 Worker 均可

简单原则:

  • 能用 TaskPool 就用 TaskPool,开销小、使用简单
  • 需要持续运行或复杂双向通信时,选 Worker

五、注意事项

  1. @Concurrent 装饰器:TaskPool 中执行的函数必须加此装饰器,且不能引用外部闭包变量。
  2. 数据隔离:两种方式线程间均不共享内存,传递的对象会被序列化(深拷贝),修改不会影响原对象。
  3. UI 操作:Worker 和 TaskPool 子线程中均不能操作 UI,需将结果传回主线程再更新。
  4. Worker 文件路径:创建 Worker 时路径需与实际文件位置一致,否则运行时报错。
  5. 及时销毁:Worker 使用完毕后应调用 terminate() 释放资源,避免内存泄漏。

六、总结

场景示例 推荐方案
图片压缩、JSON 解析 TaskPool
批量数学计算 TaskPool(TaskGroup)
文件持续读写监听 Worker
WebSocket 长连接维护 Worker
多个独立请求并发处理 TaskPool
需要与子线程反复交互 Worker

TaskPool 轻量易用,适合大多数短任务场景;Worker 灵活强大,适合需要精细控制的复杂场景。根据实际业务选择合适的方案,才能在鸿蒙平台上写出高性能的应用。

Logo

讨论HarmonyOS开发技术,专注于API与组件、DevEco Studio、测试、元服务和应用上架分发等。

更多推荐