鸿蒙next开发:多级Worker间高性能消息通信
本文详细介绍了如何在鸿蒙HarmonyOSNext系统中实现多级Worker间的高性能消息通信。
·
往期鸿蒙全套实战文章必看:(附带鸿蒙全栈学习资料)
多级Worker间高性能消息通信
多级Worker(即通过父Worker创建子Worker的机制形成层级线程关系)间通信是一种常见的需求,由于Worker线程生命周期由用户自行管理,因此需要注意多级Worker生命周期的正确管理,建议开发者确保销毁父Worker前先销毁所有子Worker。
本文介绍如何在多级Worker间实现高性能消息通信,高性能消息通信的关键在于Sendable对象,结合Worker的postMessageWithSharedSendable接口,可以实现线程间高性能的对象传递。以数据克隆场景为例,假设有三个Worker,一个父Worker和两个子Worker,父Worker负责创建子Worker,并向子Worker发送数据克隆任务,子Worker负责接收任务并执行数据克隆操作,完成后将克隆结果返回给父Worker。
-
准备一个Sendable类CopyEntry,用于封装克隆任务数据。
// CopyEntry.ets @Sendable export class CopyEntry { // 克隆类型 type: string; // 文件路径 filePath: string; constructor(type: string, filePath: string) { this.type = type; this.filePath = filePath; } } -
准备两个Worker文件,父Worker文件为ParentWorker.ets,子Worker文件为ChildWorker.ets。父Worker负责分发克隆任务并判断任务全部完成后关闭子Worker与父Worker;子Worker负责接收任务并执行数据克隆操作,并在任务完成后通知父Worker。
// ParentWorker.ets import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker, collections, ArkTSUtils } from '@kit.ArkTS' import { CopyEntry } from './CopyEntry' const workerPort: ThreadWorkerGlobalScope = worker.workerPort; // 计算worker1的任务数量 let count1 = 0; // 计算worker2的任务数量 let count2 = 0; // 计算总任务数量 let sum = 0; // 异步锁 const asyncLock = new ArkTSUtils.locks.AsyncLock(); // 创建子Worker const copyWorker1 = new worker.ThreadWorker('entry/ets/pages/ChildWorker'); const copyWorker2 = new worker.ThreadWorker('entry/ets/pages/ChildWorker'); workerPort.onmessage = (e : MessageEvents) => { let array = e.data as collections.Array<CopyEntry>; sum = array.length; for (let i = 0; i < array.length; i++) { let entry = array[i]; if (entry.type === 'copy1') { count1++; // 如果是copy1类型,则将数据传递给 copyWorker1 copyWorker1.postMessageWithSharedSendable(entry); } else if (entry.type === 'copy2') { count2++; // 如果是copy2类型,则将数据传递给 copyWorker2 copyWorker2.postMessageWithSharedSendable(entry); } } } copyWorker1.onmessage = async (e : MessageEvents) => { console.info('copyWorker1 onmessage:' + e.data); await asyncLock.lockAsync(() => { count1--; if (count1 == 0) { // 如果copyWorker1的任务全部完成,则关闭copyWorker1 console.info('copyWorker1 close'); copyWorker1.terminate(); } sum--; if (sum == 0) { // 如果所有任务全部完成,则关闭父Worker workerPort.close(); } }) } copyWorker2.onmessage = async (e : MessageEvents) => { console.info('copyWorker2 onmessage:' + e.data); await asyncLock.lockAsync(() => { count2--; sum--; if (count2 == 0) { // 如果copyWorker2的任务全部完成,则关闭copyWorker2 console.info('copyWorker2 close') copyWorker2.terminate(); } if (sum == 0) { // 如果所有任务全部完成,则关闭父Worker workerPort.close(); } }) } workerPort.onmessageerror = (e : MessageEvents) => { console.info('onmessageerror:' + e.data); } workerPort.onerror = (e : ErrorEvent) => { console.info('onerror:' + e.message); }// ChildWorker.ets import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker} from '@kit.ArkTS' import { CopyEntry } from './CopyEntry' const workerPort: ThreadWorkerGlobalScope = worker.workerPort; workerPort.onmessage = (e : MessageEvents) => { let data = e.data as CopyEntry; // 中间copy操作省略 console.info(data.filePath); workerPort.postMessageWithSharedSendable("done"); } workerPort.onmessageerror = (e : MessageEvents) => { console.info('onmessageerror:' + e.data); } workerPort.onerror = (e : ErrorEvent) => { console.info('onerror:' + e.message); } -
在UI主进程页面,创建父Worker并准备克隆任务所需的数据,准备完成后将数据发送给父Worker。
// Index.ets import { worker, collections } from '@kit.ArkTS'; import { BusinessError } from '@kit.BasicServicesKit'; import { CopyEntry } from './CopyEntry' function promiseCase() { let p: Promise<void> = new Promise<void>((resolve: Function, reject: Function) => { setTimeout(() => { resolve(1); }, 100) }).then(undefined, (error: BusinessError) => { }) return p; } async function postMessageTest() { let ss = new worker.ThreadWorker("entry/ets/pages/ParentWorker"); let isTerminate = false; ss.onexit = () => { isTerminate = true; } let array = new collections.Array<CopyEntry>(); // 准备数据 for (let i = 0; i < 4; i++) { if (i % 2 == 0) { array.push(new CopyEntry("copy1", "file://copy1.txt")); } else { array.push(new CopyEntry("copy2", "file://copy2.txt")); } } // 给Worker线程发送消息 ss.postMessageWithSharedSendable(array); while (!isTerminate) { await promiseCase(); } console.info("Worker线程已退出"); } @Entry @Component struct Index { @State message: string = 'Hello World'; build() { Row() { Column() { Text(this.message) .fontSize(50) .fontWeight(FontWeight.Bold) .onClick(() => { postMessageTest(); }) } .width('100%') } .height('100%') } }
更多推荐


所有评论(0)