“都是鸿蒙分布式时代了,还在单机排队算?来,把任务丢给隔壁设备干!”——ArkTS 跨端任务调度,从 0 到能跑!
本文分享了作者从移动开发转型鸿蒙开发的经验,重点介绍了鸿蒙分布式任务调度的实现方案。文章首先阐述了分布式调度的核心概念:通过DeviceManager发现可信设备,利用DistributedScheduler实现跨设备任务分发。随后详细讲解了任务模型的构建、任务分发的实现方式(包括Want构造和跨设备启动)、以及三种结果回传通道。此外还提供了DeviceManager的实用封装方法,用于设备发现和
我是兰瓶Coding,一枚刚踏入鸿蒙领域的转型小白,原是移动开发中级,如下是我学习笔记《零基础学鸿蒙》,若对你所有帮助,还请不吝啬的给个大大的赞~
前言
我吃过不少“单机当牛马”的亏:一台手机,既要抓网络、又要做重算,列表还要丝滑滚,结果就是——UI 抽风、耗电飙升、用户差评。后来我把鸿蒙的分布式家族认真抠了个底儿掉:DeviceManager 负责“认识谁能干活”、DistributedScheduler 顶着“把谁叫过来干”、再加上“跨端协同计算”的回路接上,整套调度就从“单机苦力”升到“多机协作”。这篇,我按你给的路径排兵布阵:任务分发 → 设备发现 → 跨端协同计算,中间穿插可直接改抄的 ArkTS/ArkUI 代码,力求能跑、好拆、好维护。
我会尽量说人话,但专业细节一个都不藏。偶尔吐槽两句,也是我半夜掉过的坑、交过的学费。走起。
一、开场白:先把“调度”说清楚
“分布式任务调度”在端上是什么?一句话:把一份工作拆成“可描述的任务”,并且把它交给“最合适的设备”去执行。合适的标准可以是:CPU/GPU 能力、网络、是否充电、是否闲置、是否同账号可信、是否在同一局域网,等等。
而在鸿蒙语境里,两件事是地基:
- 设备发现与可信关系:靠 DeviceManager 获取本地/可信设备、监听上下线、做认证。它是“我有谁能用”的来源。
- 跨端启动/调用/迁移:分布式任务调度子系统(DistributedScheduler,DMS)的职责是“把远端组件拉起来、让它干活”。从开发者视角,常见就是跨设备启动 Ability / 连接会话 / 迁移等,属于“把执行权交出去”的那只手。
小结:DeviceManager 决定“人选”,DistributedScheduler 决定“调度动作”。配合一条数据通路(比如分布式数据对象、跨端 Ability 连接、或你自建的网络 RPC),就能闭环。
二、任务分发:从“任务模型”到“把人叫来干活”
2.1 先定任务模型(Task Model)
所有调度系统第一步,都是把任务定义“可传输、可落地、可重试”。一个轻量的 ArkTS 版 Task 接口:
// task/model.ts
export type TaskType = 'TEXT_SUMMARY' | 'THUMBNAIL' | 'VEC_SEARCH'
export interface Task {
id: string // 全局唯一ID(雪花、UUID都行)
type: TaskType
payload: any // 任务输入(JSON可序列化)
priority?: number // 0-100
deadlineMs?: number // 软截止
retry?: { times: number; backoffMs: number }
}
export interface TaskResult {
id: string
ok: boolean
output?: any
error?: string
execDeviceId?: string // 哪台设备干的活
durationMs?: number
}
为什么要这么“俗套”:
- 可序列化:跨设备传输必须 JSON 化。
- 可重试:移动网络真会“抽风”。
- 可观测:谁执行、耗时多少、失败原因,都是线上排障的命根子。
2.2 分发器(Dispatcher):把“可执行的 Want”装出来
在鸿蒙上把“远端能力拉起来”,从 ArkTS 开发者视角,核心动作其实是构造一个含远端 deviceId 的 Want,然后 startAbility(或创建跨端连接会话),DMS 在下层完成“调度”。远端可以是 UIAbility 或 ServiceExtensionAbility(做计算更合适)。重点是 Want 中携带 deviceId/bundle/abilityName 等信息:这一步属于“分发动作”的具体落点。DMS 的开源实现描述了它“跨设备组件管理(启动/迁移/绑定)”的职责边界。
// dispatcher/remote.ts
// 简化:把任务封到 Want 里,发给远端ServiceExtension执行
interface RemoteTarget {
deviceId: string
bundleName: string
abilityName: string // ServiceExtensionAbility 名
}
function buildTaskWant(target: RemoteTarget, task: Task): Want {
return {
deviceId: target.deviceId,
bundleName: target.bundleName,
abilityName: target.abilityName,
parameters: { task: JSON.stringify(task) },
} as Want
}
export async function dispatchToRemote(ctx: UIAbilityContext, target: RemoteTarget, task: Task) {
const want = buildTaskWant(target, task)
// 分布式调度:跨端启动远端能力
await ctx.startAbility(want) // 由 DMS 完成远端拉起
// 数据回路你可以选择:1) ability 连接会话收发文本;2) 分布式数据对象;3) 自建RPC
}
说明:上面用了“带 deviceId 的 Want + startAbility”作为分发落地方式,这符合“分布式调度跨设备启动组件”的范式;具体 API 命名/版本以官方文档为准,但概念不变:分发动作=构造远端 Want + 启动/连接。
2.3 会话通道:如何把结果拿回来?
通道有三条常用路:
- ability 连接会话:HarmonyOS 提供了跨设备 UIAbility 会话管理接口(abilityConnectionManager),可以在会话内进行文本消息传递(API 18+,同账号协同拉起)。做请求-响应非常顺手。
- 分布式数据对象(DDO):把结果写入某个共享 DataObject,另一端订阅变更即可,适合多对多协同。
- 自建网络 RPC:走 HTTP/WebSocket/自研协议,伸缩自如,但要自己打理安全与重试。
一个“能力会话”风格的最小骨架(伪代码,聚焦思路):
// channel/session.ts
import abilityConnMgr from '@ohos.distributed-abilityconnectionmanager' // 名称以文档为准
export async function requestResponseOnce(target: RemoteTarget, payload: string): Promise<string> {
// 1) 创建会话(同账号&组网前提)
const sessionId = await abilityConnMgr.createAbilityConnectionSession({ serviceName: 'dms-compute' })
// 2) 连接到远端 Ability
await abilityConnMgr.connectAbility(sessionId, {
deviceId: target.deviceId,
bundleName: target.bundleName,
abilityName: target.abilityName
})
// 3) 发送请求
await abilityConnMgr.send(sessionId, payload)
// 4) 等待响应(可加超时)
const resp = await abilityConnMgr.receive(sessionId, { timeoutMs: 8000 })
// 5) 断开 & 释放
await abilityConnMgr.disconnectAbility(sessionId)
await abilityConnMgr.closeAbilityConnectionSession(sessionId)
return resp
}
要点:官方文档明确该模块用于创建协同会话并跨设备拉起/连接 UIAbility,并进行文本传递(API 18+)。命名可能会有小差异,但“会话→连接→消息→释放”这条链路是固定的。
三、设备发现:把“谁能干活”摸清楚
没有设备清单和可信关系,谈什么调度?DeviceManager 就是我们的人事部:发现、认证、监听、查询可信设备。官方文档(ArkTS API)写得清清楚楚:你可以注册上下线监听、发现周边不可信设备、发起认证、拿可信列表、拿本机信息,这一切都在分布式管理服务(Distributed Service Kit)里。
3.1 一个实用的 DeviceManager 封装
// device/dm.ts
import { distributedDeviceManager as ddm } from '@kit.DistributedServiceKit' // 按当前SDK导入
type Device = { deviceId: string; deviceName: string; deviceType: string }
export class DeviceDirectory {
private dm!: ddm.DeviceManager
private devices: Device[] = []
async init(bundleName: string) {
this.dm = ddm.createDeviceManager(bundleName)
// 上下线监听
this.dm.on('deviceStateChange', (data) => {
// data 里会有设备id/状态;正式代码里做diff并刷新缓存
this.refreshTrusted()
})
await this.refreshTrusted()
}
async refreshTrusted() {
const list = this.dm.getTrustedDeviceListSync() // 同步拿可信列表
this.devices = list.map(it => ({
deviceId: it.deviceId,
deviceName: it.deviceName,
deviceType: String(it.deviceType)
}))
}
// 发现周边设备(不可信→可发起认证流程)
startDiscovery() {
this.dm.startDeviceDiscovery({ subscribeId: 9527 }) // 订阅ID自定义
}
stopDiscovery() { this.dm.stopDeviceDiscovery(9527) }
// 认证(用户侧会有配对/确认)
async authenticate(deviceId: string) {
await this.dm.authenticateDevice({ deviceId, authType: 1 /* 示例 */ })
await this.refreshTrusted()
}
getTrusted(): Device[] { return this.devices }
}
关键事实:DeviceManager 提供发现不可信设备、认证、查询可信列表、监听上下线等核心能力,是“跨端协同”的入场券。
3.2 把“人选”挑出来:调度选择策略
有了清单,还得挑人。最小可用的“选择器”:
// device/pick.ts
type ScoreRule = (d: Device) => number
export function pickBest(devs: Device[], rules: ScoreRule[]): Device | null {
let best: Device | null = null
let bestScore = -1
for (const d of devs) {
const s = rules.reduce((acc, r) => acc + r(d), 0)
if (s > bestScore) { best = d; bestScore = s }
}
return best
}
规则示例:
- 同账户 & 同一局域网(这通常由系统协同前提保证)+50;
- 平板/电视优先(CPU/GPU 更强)+20;
- 设备名包含“office”(自定义偏好)+10;
- 最近 5 分钟无任务 +10;
- 电量 > 60 且在充电 +10。
别忘了:失败兜底要回落到本地执行,或者切到“第二人选”。这才是真正“健壮”的调度。
四、跨端协同计算:把“算力”拎到对的设备上
4.1 远端执行的基本套路
远端我们准备一个 ServiceExtensionAbility(名字就叫 ComputeServiceExtAbility 吧),被拉起后从 Want 里把任务 JSON 拿出来,执行,并通过会话/数据对象/网络把结果回传。
(远端)执行器骨架:
// service/ComputeServiceExtAbility.ets
import { AbilityConstant, ServiceExtensionAbility } from '@ohos.app.ability'
export default class ComputeServiceExtAbility extends ServiceExtensionAbility {
onStart(want: Want) {
// 取任务
const taskJson = want?.parameters?.task as string
if (!taskJson) { console.error('No task payload'); return }
const task: Task = JSON.parse(taskJson)
// 异步执行
this.runTask(task).then(res => {
// 1) 写入分布式数据对象;或
// 2) 通过会话发送回执;或
// 3) 发HTTP回源
console.info('task done', res.id)
}).catch(e => console.error(e))
}
private async runTask(task: Task): Promise<TaskResult> {
const t0 = Date.now()
try {
let output: any
switch (task.type) {
case 'TEXT_SUMMARY':
output = summarize(task.payload.text); break
case 'THUMBNAIL':
output = await genThumb(task.payload.uri); break
case 'VEC_SEARCH':
output = await vecSearch(task.payload.query); break
default:
throw new Error('unknown task type')
}
return { id: task.id, ok: true, output, execDeviceId: getLocalDeviceId(), durationMs: Date.now()-t0 }
} catch (e) {
return { id: task.id, ok: false, error: String(e), execDeviceId: getLocalDeviceId(), durationMs: Date.now()-t0 }
}
}
}
这段落地到哪?让 DMS(DistributedScheduler) 把它“跨端启动”,你这边只需给 Want 填上
deviceId/bundle/abilityName即可。它属于 DMS “远端组件启动/管理”的适用面。
4.2 结果回传:我更推荐“会话通道”或“DataObject”
方案 A:ability 连接会话(文本消息)
官方文档指出可创建跨设备协同会话并进行文本消息传输;适合“请求-响应”。服务端拿到 sessionId 后 send(),客户端 receive(),流程清爽。
方案 B:分布式数据对象(DDO)
建立一个 TaskResult 的共享对象(或集合),服务端写、客户端订阅;多设备协同/掉线重连更友好。
4.3 调度策略:并发度、回退与幂等
- 并发度:一次只派 N 个任务到远端,避免把对端打爆;对端也做小型队列。
- 回退:远端 8 秒不回,就在本地跑(或换下一台)。
- 幂等:
Task.id作为幂等键;对端若收到重复任务,直接返回“已有结果”。 - 观测:记录
execDeviceId/耗时/失败码,下一次“挑人”可以更聪明。
把“三板斧”写进代码(主端发起):
// orchestrator/runner.ts
import { dispatchToRemote } from '../dispatcher/remote'
import { requestResponseOnce } from '../channel/session'
import { DeviceDirectory } from '../device/dm'
import { pickBest } from '../device/pick'
export class Orchestrator {
constructor(private ctx: UIAbilityContext, private dm: DeviceDirectory) {}
async execute(task: Task): Promise<TaskResult> {
const devs = this.dm.getTrusted()
const best = pickBest(devs, [/* 你的评分规则 */])
if (!best) return this.runLocally(task, 'NO_DEVICE')
try {
await dispatchToRemote(this.ctx, {
deviceId: best.deviceId,
bundleName: 'com.example.compute',
abilityName: 'ComputeServiceExtAbility'
}, task)
// 等待回执(会话通道的简单形态)
const resp = await withTimeout(
requestResponseOnce({ deviceId: best.deviceId, bundleName: 'com.example.compute', abilityName: 'ComputeServiceExtAbility' },
JSON.stringify({ type: 'FETCH_RESULT', id: task.id })),
8000, 'remote-timeout'
)
const res: TaskResult = JSON.parse(resp)
return res
} catch (e) {
// 回退本地
return this.runLocally(task, String(e))
}
}
private async runLocally(task: Task, reason: string): Promise<TaskResult> {
const t0 = Date.now()
try {
// 本地执行(同步或TaskPool)
const output = await localExec(task)
return { id: task.id, ok: true, output, execDeviceId: 'LOCAL', durationMs: Date.now()-t0 }
} catch (e) {
return { id: task.id, ok: false, error: `${reason}:${String(e)}`, execDeviceId: 'LOCAL', durationMs: Date.now()-t0 }
}
}
}
五、把“任务分发 → 设备发现 → 协同计算”串成可跑 Demo
5.1 UI:一个“分布式计算面板”
// pages/DistComputePanel.ets
@Entry
@Component
struct DistComputePanel {
@State text: string = ''
@State logs: string[] = []
private dm = new DeviceDirectory()
private oc!: Orchestrator
aboutToAppear() {
this.dm.init(getBundleName()).then(() => this.log('DeviceManager ready'))
this.oc = new Orchestrator(getContext(this), this.dm)
}
private log(s: string) { this.logs = [s, ...this.logs].slice(0, 30) }
build() {
Column({ space: 12 }) {
Row() {
Button('发现设备').onClick(() => { this.dm.startDiscovery(); this.log('Start discovery') })
Button('停止发现').onClick(() => { this.dm.stopDiscovery(); this.log('Stop discovery') })
Blank()
Button('刷新可信').onClick(() => { this.dm.refreshTrusted(); this.log('Refresh trusted') })
}.padding(12)
// 可信设备列表
List() {
ForEach(this.dm.getTrusted(), (d) => ListItem() {
Row() {
Text(`${d.deviceName} (${d.deviceType})`).fontWeight(FontWeight.Medium)
Blank()
Button('认证').onClick(async () => { await this.dm.authenticate(d.deviceId); this.log(`认证:${d.deviceName}`) })
}.padding(8)
}, d => d.deviceId)
}.height(160)
TextArea({ placeholder: '贴点文本来个跨端摘要?' }).height(120).onChange(v => this.text = v)
Row({ space: 12 }) {
Button('分布式摘要').onClick(async () => {
const task: Task = {
id: `${Date.now()}`,
type: 'TEXT_SUMMARY',
payload: { text: this.text },
retry: { times: 1, backoffMs: 400 }
}
const res = await this.oc.execute(task)
if (res.ok) this.log(`✅ ${res.execDeviceId} 用时 ${res.durationMs}ms`)
else this.log(`❌ ${res.error}`)
})
}.padding(12)
Text('Logs').fontWeight(FontWeight.Bold)
List() {
ForEach(this.logs, (s, i) => ListItem() { Text(s).fontSize(12) }, (s, i) => String(i))
}.height(160)
}.padding(12).height('100%')
}
}
体验走向:已经建立可信关系的平板/电视优先“吃活”,手机 UI 丝滑不抖;一旦远端不回,秒回落本机。这就是分布式调度给用户“无感”的那点魔法。
六、工程抛光:权限、安全、可观测
-
权限与前置条件
- 跨设备协同通常要求同一华为账号登录、蓝牙/同一网络环境可发现,并在系统侧建立可信关系。这属于分布式管理/协同的通用前提。
- API 版本差异:ability 连接会话属于较新的公开能力(API 18+),用前确认当前 SDK/API Level(文档中心有“版本标识”与系统能力说明)。
-
安全
- 只传必要数据,避免隐私越权;
- 对端结果要签名/验参(哪怕是同账号设备);
- 远端能力拉起要白名单控制(bundle/abilityName),拒绝“野设备”。
-
可观测
- 对每个任务记录:派发时间、派往设备、回执耗时、失败码;
- 建一个健康面板(成功率、P95 时延、回退率、设备得分),调度策略靠数据说话。
七、跨端协同计算的“老江湖诀窍”
- 先做本地可跑,再加远端:把“执行器”做成同一接口,远端失败时直接落到本地。
- 任务切碎:小任务拼成批;短平快、可并行、好重试。
- 传指纹不传大对象:让远端按 URI 拉取素材;结果只回摘要或产物地址。
- 会话通道文本化:文本(JSON)最稳,复杂二进制放到对象存储/共享数据再引用。
- 设备画像要动态:把成功率、耗时写回“分数”,让“挑人”进化。
八、常见坑位(我都中招过)
-
“我 startAbility 了,远端没起来?”
- 看看
deviceId/bundle/abilityName拼得对不对; - 设备是否同账号且已建立可信;
- 某些能力仅支持特定 API 级别,对照文档版本标识。
- 看看
-
“能发现设备,但拿不到可信列表?”
- 发现是不可信的第一步,还需认证才能进可信池;认证流程通常伴随系统 UI/确认。
-
“会话能连上,但收不到回应?”
- 确认双方使用同一服务标识、消息格式吻合;
- 记得释放会话,否则资源泄漏、后续连不上。
九、把方案“产品化”:三种落地档位
- 轻量版:手机把重算丢给平板;回执失败就本地算。代价小、收益立竿见影。
- 协同版:手机分片下发到平板 + 电视双端并行;结果合并(Map-Reduce 味儿)。吞吐翻倍。
- 平台版:做“设备调度服务”(可含云端)+ 动态权重 + 灰度试验;跨团队/跨产品复用。
十、附录:极简“文本摘要”示例的端端契约
请求(Task.payload):
{ "text": "请对这段文本做 120 字摘要……" }
响应(TaskResult.output):
{
"summary": "……",
"confidence": 0.87,
"device": "HUAWEI PAD 11 2025"
}
端到端对齐“契约”,就不用在会上各执一词地吵“你们返回的是啥”。
结语:把“多设备”当你的“天然线程池”
我现在写端上复杂功能,脑子已自带一个小人儿:**这活儿,一定要手机干吗?**如果不是,那就交给“隔壁壮实的家伙”(平板/电视/笔记本)。
分布式调度不是魔法,它就是工程化地把“谁干什么、在哪儿干、干完怎么算账”说清楚。DeviceManager 负责认识“人”、DistributedScheduler 负责指挥“干”、会话/数据对象负责“说话传账”。当你的项目从“单机苦力”升级为“团队协作”,你会明显感觉到:UI 更稳、功耗更友好、用户更快乐。
下次需求评审,产品说“这段超重计算能不能不卡?”你就笑笑:
“不用卡。分出去干。”
参考与进一步阅读
- 分布式设备管理(@ohos.distributedDeviceManager / Distributed Service Kit):官方 ArkTS API,覆盖设备发现/认证/可信列表/本机信息/上下线监听等核心能力。
- 文档中心 & API 版本说明:查看系统能力(SysCap)、API 等级、是否支持元服务/卡片等信息,做版本适配。
- 分布式任务调度(DMS)开源仓 / 能力说明:底层跨设备组件管理(启动/迁移/绑定)职责与功能概览。
- ability 连接会话(AbilityConnectionManager):创建跨设备会话并传递文本消息(API 18+),便于“请求-响应”式协同。
- 分布式数据对象(DataObject):端端共享数据、订阅变更,适合“多对多协同”。
注:示例中的模块导入名/函数名以你项目所用 HarmonyOS SDK 版本为准;概念与链路保持稳定(设备发现→分发启动→通道回执)。在接入前先对照当前 API 文档页的版本标识与系统能力提示,避免“在不支持的 API Level 上用新功能”的尴尬。
…
(未完待续)
更多推荐



所有评论(0)