HarmonyOS ArkTS 并发编程实战:TaskPool 与 Worker 深度解析## 一、前言随着移动应用功能的日益复杂,异步任务处理和并发编程成为开发者必须掌握的核心技能。HarmonyOS 的 ArkTS 语言在 TypeScript 的基础上,对并发编程能力进行了大幅增强,提供了 TaskPoolWorker 两种并发 API,帮助开发者高效处理耗时任务,避免主线程阻塞。本文将以 HarmonyOS 5.0.0(API 12)为基础,结合华为官方文档,深入解析 ArkTS 并发编程的核心概念和实战技巧。## 二、并发编程基础概念### 2.1 为什么需要并发?在 ArkUI 应用中,主线程负责 UI 渲染和事件响应。如果在主线程执行耗时操作(如网络请求、数据库查询、复杂计算),会导致:- UI 卡顿,用户体验下降- 超过 3 秒未响应触发 ANR(Application Not Responding)- 丢帧,动画不流畅### 2.2 ArkTS 并发模型ArkTS 提供了两种并发模型:| 特性 | TaskPool | Worker ||------|----------|--------|| 生命周期 | 任务级,按需创建和销毁 | 长期运行,可复用 || 适用场景 | 大量独立的短期任务 | 需要与主线程频繁通信的长期任务 || 任务数量 | 不受限制 | 最多 8 个(API 12) || 优先级 | 支持 HIGH/MEDIUM/LOW | 不支持 || API 复杂度 | 简单易用 | 较复杂 |## 三、TaskPool:轻量级任务池TaskPool 是 HarmonyOS 推荐的首选并发方案。它是一个内置的任务调度器,自动管理线程的创建和回收。### 3.1 基本用法import { taskpool } from ‘@kit.ArkTS’;

// 定义要在子线程执行的任务函数
@Concurrent
function calculateSum(start: number, end: number): number {
let sum = 0;
for (let i = start; i <= end; i++) {
sum += i;
}
return sum;
}

@Entry
@Component
struct TaskPoolDemo {
@State result: string = ‘等待计算…’;
@State isRunning: boolean = false;

build() {
Column({ space: 20 }) {
Text(‘TaskPool 并发计算示例’)
.fontSize(24)
.fontWeight(FontWeight.Bold)

  Text(this.result)
    .fontSize(18)
    .fontColor('#007AFF')

  Button('开始计算 1 到 1 亿的和')
    .fontSize(16)
    .enabled(!this.isRunning)
    .onClick(async () => {
      this.isRunning = true;
      this.result = '计算中...';

      try {
        const task = new taskpool.Task(calculateSum, 1, 100000000);
        const sum = await taskpool.execute(task) as number;
        this.result = `计算结果: ${sum}`;
      } catch (error) {
        this.result = `计算失败: ${error}`;
      } finally {
        this.isRunning = false;
      }
    })
}
.width('100%')
.height('100%')
.justifyContent(FlexAlign.Center)
.padding(20)

}
}

3.2 关键注解:@Concurrent@Concurrent 是 ArkTS 中用于标记可并发执行函数的装饰器。被标记的函数需要满足以下约束:- 只能使用 Sendable 类型的数据- 不能使用 import 导入的模块级变量- 不能访问外层作用域的变量(闭包)// ✅ 正确:使用 @Concurrent 标记

@Concurrent
function processData(input: string): string {
return input.toUpperCase();
}

// ❌ 错误:访问了外层变量
let externalData: string = ‘hello’;

@Concurrent
function badFunction(): string {
return externalData; // 编译错误!
}

3.3 TaskPool 任务优先级TaskPool 支持设置任务优先级,让重要的任务优先执行:import { taskpool } from ‘@kit.ArkTS’;

@Concurrent
function importantTask(data: string): string {
return 处理完成: ${data};
}

@Concurrent
function backgroundTask(data: string): string {
return 后台完成: ${data};
}

async function executeTasks() {
const highPriorityTask = new taskpool.Task(importantTask, ‘重要数据’);
const lowPriorityTask = new taskpool.Task(backgroundTask, ‘后台数据’);

const [result1, result2] = await Promise.all([
taskpool.execute(highPriorityTask, taskpool.Priority.HIGH),
taskpool.execute(lowPriorityTask, taskpool.Priority.LOW)
]);

console.info(高优先级结果: ${result1});
console.info(低优先级结果: ${result2});
}

3.4 实战:并行图片处理import { taskpool } from ‘@kit.ArkTS’;

import { image } from ‘@kit.ImageKit’;

@Concurrent
function compressImage(imagePath: string, quality: number): ArrayBuffer {
const imageSource = image.createImageSource(imagePath);
const pixelMap = imageSource.createPixelMapSync();
const compressed = packImage(pixelMap, quality);
return compressed;
}

四、Worker:长期运行的子线程Worker 适合需要与主线程频繁通信的长期任务场景。### 4.1 Worker 基本架构Worker 需要独立的 .ets 文件存放子线程代码。**主线程代码(Index.ets):**import { worker } from ‘@kit.ArkTS’;

@Entry
@Component
struct WorkerDemo {
@State message: string = ‘等待 Worker 响应…’;
private myWorker?: worker.ThreadWorker;

aboutToAppear() {
this.myWorker = new worker.ThreadWorker(‘entry/ets/workers/MyWorker.ets’);

this.myWorker.onmessage = (event: MessageEvents) => {
  const data = event.data as worker.MessageEvents;
  this.message = `Worker 返回: ${data}`;
};

this.myWorker.onerror = (event: ErrorEvent) => {
  this.message = `Worker 错误: ${event.message}`;
};

}

aboutToDisappear() {
this.myWorker?.terminate();
}

build() {
Column({ space: 20 }) {
Text(‘Worker 通信示例’)
.fontSize(24)
.fontWeight(FontWeight.Bold)

  Text(this.message)
    .fontSize(16)
    .fontColor('#333')

  Row({ space: 16 }) {
    Button('发送消息')
      .fontSize(16)
      .onClick(() => {
        this.myWorker?.postMessage('Hello from main thread!');
      })

    Button('终止 Worker')
      .fontSize(16)
      .backgroundColor('#FF4444')
      .onClick(() => {
        this.myWorker?.terminate();
        this.myWorker = undefined;
        this.message = 'Worker 已终止';
      })
  }
}
.width('100%')
.height('100%')
.justifyContent(FlexAlign.Center)
.padding(20)

}
}
**Worker 子线程代码(workers/MyWorker.ets):**import { worker } from ‘@kit.ArkTS’;

const parentPort = worker.workerPort;

parentPort.onmessage = (event: MessageEvents) => {
const message = event.data;
console.info(Worker 收到消息: ${message});
const result = performHeavyTask(message as string);
parentPort.postMessage(result);
};

function performHeavyTask(input: string): string {
let result = ‘’;
for (let i = 0; i < 1000000; i++) {
result = input.split(‘’).reverse().join(‘’);
}
return 处理完成: ${result} (${new Date().toLocaleTimeString()});
}

4.2 Worker 线程数量限制HarmonyOS API 12 中,一个应用最多可以创建 8 个 Worker 线程。import { worker } from ‘@kit.ArkTS’;

function createWorkerIfPossible(scriptName: string): worker.ThreadWorker | null {
try {
return new worker.ThreadWorker(scriptName);
} catch (error) {
console.error(无法创建 Worker: ${error});
return null;
}
}

五、Sendable 类型:安全的跨线程数据传递### 5.1 什么是 Sendable?Sendable 是 ArkTS 引入的概念,用于标识可以安全地在并发实例间传递的数据类型。Sendable 类型包括:- 基本类型:numberstringbooleannullundefined- ArrayBufferSharedArrayBuffer- Sendable 类(使用 @Sendable 装饰器标记的类)- 由上述类型组成的容器:SendableArraySendableMapSendableSetimport { taskpool } from ‘@kit.ArkTS’;

@Sendable
class UserData {
name: string;
age: number;

constructor(name: string, age: number) {
this.name = name;
this.age = age;
}
}

@Concurrent
function processUser(user: UserData): string {
return 用户 ${user.name}, 年龄: ${user.age};
}

async function demo() {
const user = new UserData(‘张三’, 25);
const task = new taskpool.Task(processUser, user);
const result = await taskpool.execute(task);
console.info(result); // 输出: 用户 张三, 年龄: 25
}

5.2 Sendable 使用注意事项import { taskpool } from ‘@kit.ArkTS’;

@Sendable
class ConfigData {
url: string = ‘’;
timeout: number = 5000;

// ✅ 可以定义方法
getFullUrl(path: string): string {
return ${this.url}/${path};
}

// ❌ 不能使用静态属性
// static defaultTimeout: number = 3000; // 编译错误
}

@Concurrent
function fetchConfigData(config: ConfigData): string {
const fullUrl = config.getFullUrl(‘api/data’);
return 请求地址: ${fullUrl}, 超时: ${config.timeout}ms;
}

六、TaskPool 与 Worker 的选择策略### 6.1 推荐使用 TaskPool 的场景import { taskpool } from ‘@kit.ArkTS’;

@Concurrent
function heavyCalculation(params: string): string {
return 计算结果: ${params};
}

async function batchProcess(items: string[]) {
const tasks = items.map(item => new taskpool.Task(heavyCalculation, item));
const results = await Promise.all(tasks.map(t => taskpool.execute(t)));
return results;
}

6.2 推荐使用 Worker 的场景import { worker } from ‘@kit.ArkTS’;

// Worker 端代码 - workers/WebSocketWorker.ets
const parentPort = worker.workerPort;

let wsConnection: WebSocket | null = null;

parentPort.onmessage = (event: MessageEvents) => {
const command = event.data as string;

if (command === ‘connect’) {
wsConnection = new WebSocket(‘wss://api.example.com/ws’);
wsConnection.onmessage = (wsEvent: MessageEvent) => {
parentPort.postMessage({
type: ‘data’,
payload: wsEvent.data
});
};
} else if (command === ‘disconnect’) {
wsConnection?.close();
}
};

6.3 决策表| 条件 | 推荐方案 ||------|---------|| 独立计算任务,执行完即结束 | TaskPool || 需要频繁与主线程通信 | Worker || 批量处理 N 个相似任务 | TaskPool || 需要持久运行的子线程 | Worker || 任务有优先级要求 | TaskPool || 需要 9 个以上并发线程 | TaskPool |## 七、实际工程案例:大文件分片上传import { taskpool } from ‘@kit.ArkTS’;

import { fileIo } from ‘@kit.CoreFileKit’;
import { buffer } from ‘@kit.ArkTS’;

// 文件分片哈希计算任务
@Concurrent
function calculateChunkHash(chunkData: ArrayBuffer): string {
const hashData = buffer.from(chunkData).toString(‘base64’);
return hashData.substring(0, 32);
}

interface UploadChunkParams {
chunkData: ArrayBuffer;
url: string;
chunkIndex: number;
totalChunks: number;
}

// 文件分片上传任务
@Concurrent
function uploadChunk(params: UploadChunkParams): Promise {
const { chunkData, url, chunkIndex, totalChunks } = params;

return new Promise((resolve, reject) => {
const formData = new FormData();
formData.append(‘chunk’, new Blob([chunkData]));
formData.append(‘index’, chunkIndex.toString());
formData.append(‘total’, totalChunks.toString());

fetch(url, {
  method: 'POST',
  body: formData
})
.then(response => response.text())
.then(result => resolve(result))
.catch(error => reject(error));

});
}

@Entry
@Component
struct FileUploader {
@State progress: number = 0;
@State status: string = ‘准备上传…’;
private readonly CHUNK_SIZE = 1024 * 1024; // 1MB

async uploadFile(filePath: string) {
try {
this.status = ‘读取文件中…’;
const file = fileIo.openSync(filePath, fileIo.OpenMode.READ_ONLY);
const fileSize = fileIo.statSync(filePath).size;
const totalChunks = Math.ceil(fileSize / this.CHUNK_SIZE);
this.status = 开始上传 (${totalChunks} 个分片)...;

  for (let i = 0; i < totalChunks; i++) {
    const chunkBuffer = new ArrayBuffer(
      Math.min(this.CHUNK_SIZE, fileSize - i * this.CHUNK_SIZE)
    );
    fileIo.readSync(file.fd, chunkBuffer, { offset: i * this.CHUNK_SIZE });

    // TaskPool 并行计算哈希
    const hash = await taskpool.execute(
      new taskpool.Task(calculateChunkHash, chunkBuffer)
    ) as string;

    // 上传分片
    await taskpool.execute(new taskpool.Task(uploadChunk, {
      chunkData: chunkBuffer,
      url: 'https://api.example.com/upload',
      chunkIndex: i,
      totalChunks
    }));

    this.progress = Math.round(((i + 1) / totalChunks) * 100);
    this.status = `上传中: ${this.progress}% (${i + 1}/${totalChunks})`;
  }

  fileIo.closeSync(file);
  this.status = '上传完成!';
} catch (error) {
  this.status = `上传失败: ${error}`;
}

}

build() {
Column({ space: 20 }) {
Text(‘大文件分片上传’)
.fontSize(24)
.fontWeight(FontWeight.Bold)

  Progress({ value: this.progress, total: 100, type: ProgressType.Linear })
    .width('80%')

  Text(this.status)
    .fontSize(16)
    .fontColor('#666')

  Button('选择文件并上传')
    .fontSize(16)
    .onClick(async () => {
      await this.uploadFile('/data/storage/el2/base/files/sample.mp4');
    })
}
.width('100%')
.padding(20)

}
}

八、常见问题与排错### Q1: 为什么 @Concurrent 函数不能访问闭包变量?A: 并发任务运行在独立的子线程中,子线程没有主线程的上下文环境。闭包变量可能引用主线程的对象,导致不可预期的行为。// ❌ 错误

let counter = 0;
@Concurrent
function badTask(): number {
return ++counter; // 编译错误!
}

// ✅ 正确
@Concurrent
function goodTask(initialValue: number): number {
return initialValue + 1;
}

Q2: TaskPool 任务执行顺序是否保证?A: 不保证。TaskPool 是多线程并发执行的,若需要有序结果,应在所有任务完成后排序。### Q3: Worker 中是否可以使用 UI 相关 API?A: 不能。Worker 运行在无 UI 上下文的子线程中,任何涉及 UI 操作的 API 都会失败。### Q4: 如何调试 Worker 中的代码?A: 使用 console.info 输出日志,或通过 parentPort.postMessage 将调试信息传回主线程。## 九、总结ArkTS 的并发编程体系为开发者提供了强大而灵活的异步处理能力:| 方案 | 核心 API | 最佳场景 ||------|----------|---------|| TaskPool | @Concurrent + taskpool.execute() | 独立短期计算任务,批量处理 || Worker | new worker.ThreadWorker() | 长期运行的后台线程,频繁通信 |**最佳实践建议:**1. 默认优先使用 TaskPool:更简单、更安全,系统自动管理线程资源2. 大数据传递用 Sendable:避免数据深拷贝带来的性能开销3. Worker 数量控制在 8 个以内:超过限制会抛出异常4. 子线程不要操作 UI:所有 UI 更新必须回到主线程5. 及时释放 Worker:使用 terminate() 释放不再需要的线程> 参考文档:> - 华为开发者联盟 HarmonyOS 5.0.0 API 12 — ArkTS 并发编程指南> - ArkCompiler Sendable 类型规范

Logo

讨论HarmonyOS开发技术,专注于API与组件、DevEco Studio、测试、元服务和应用上架分发等。

更多推荐