一、并发同步机制概述

1.1 数据竞争与同步需求

在并发编程中,当多个线程同时访问共享数据且至少有一个线程执行写操作时,就会出现数据竞争(Data Race)。仓颉语言通过类型系统在编译期检测潜在的数据竞争,并提供多种同步机制确保线程安全:

  • 原子操作:最基础的同步原语
  • 互斥锁(Mutex):强制串行化访问
  • 条件变量(Condition):线程间通信机制
  • 内存屏障:控制指令执行顺序
  • 同步代码块:结构化同步方式

1.2 仓颉同步设计哲学

仓颉语言的同步机制遵循三个核心原则:

  1. 安全性优先:默认阻止数据竞争
  2. 显式同步:开发者必须明确标注同步点
  3. 零成本抽象:高级同步原语在无竞争时无额外开销

1.3 同步机制性能影响

不同同步机制的性能特征对比:

机制类型 无竞争开销 高竞争适应性 适用场景
原子操作 1-5ns 简单计数器
互斥锁 20-50ns 临界区保护
条件变量 50-100ns 事件通知
同步块 15-30ns 结构化同步

二、原子操作机制

2.1 原子类型体系

仓颉提供完备的原子类型系统:

 
public class AtomicInt32 {
    // 原子加载当前值
    public func load(order: MemoryOrder = SeqCst): Int32
    
    // 原子存储新值
    public func store(val: Int32, order: MemoryOrder = SeqCst)
    
    // 比较交换(CAS)
    public func compareAndSwap(expected: Int32, new: Int32, order: MemoryOrder = SeqCst): Bool
    
    // 原子加法
    public func fetchAdd(val: Int32, order: MemoryOrder = SeqCst): Int32
    
    // 原子减法
    public func fetchSub(val: Int32, order: MemoryOrder = SeqCst): Int32
}

支持的类型包括:AtomicBoolAtomicInt8/16/32/64AtomicUInt8/16/32/64AtomicPtr<T>

 

2.2 内存顺序模型

仓颉采用C++风格的内存顺序控制:

 
enum MemoryOrder {
    Relaxed  // 无顺序保证
    Consume  // 数据依赖顺序
    Acquire  // 获取语义
    Release  // 释放语义
    AcqRel   // 获取-释放
    SeqCst   // 顺序一致性(默认)
}

典型使用模式:

 
let flag = AtomicBool(false)
let data = AtomicPtr<Data>(null)

// 线程A(生产者)
data.store(createData(), Release)
flag.store(true, Release)

// 线程B(消费者)
while (!flag.load(Acquire)) {}
let d = data.load(Acquire)

2.3 原子操作实战案例

无锁队列实现示例

 
class LockFreeQueue<T> {
    private let head = AtomicPtr<Node<T>>()
    private let tail = AtomicPtr<Node<T>>()
    
    public func enqueue(item: T): Unit {
        let node = Node(item)
        var t = tail.load(Relaxed)
        while (true) {
            let next = t.next.load(Relaxed)
            if (next == null) {
                if (t.next.compareAndSwap(null, node, Release)) {
                    tail.compareAndSwap(t, node, Release)
                    return
                }
            } else {
                tail.compareAndSwap(t, next, Release)
            }
            t = tail.load(Relaxed)
        }
    }
}

三、互斥锁机制

3.1 Mutex类型体系

仓颉提供多层次的互斥锁抽象:

 
interface Lock {
    func lock(): Unit
    func unlock(): Unit
    func tryLock(): Bool
}

class Mutex <: Lock { ... }          // 基本互斥锁
class ReentrantMutex <: Lock { ... } // 可重入锁
class SpinLock <: Lock { ... }       // 自旋锁

3.2 锁的最佳实践

基础用法

 
let mtx = Mutex()
var counter = 0

spawn {
    mtx.lock()
    defer { mtx.unlock() }
    counter += 1
}

结构化同步块

 
synchronized(mtx) {
    // 临界区代码
    counter += 1
}

3.3 高级锁特性

锁性能对比(单位:ns):

锁类型 无竞争获取 轻度竞争 重度竞争
Mutex 25 150 5000
SpinLock 5 200 10000
Reentrant 30 180 5500

死锁预防模式

 
func transfer(from: Account, to: Account, amount: Int64): Unit {
    val lock1 = if (from.id < to.id) from.lock else to.lock
    val lock2 = if (from.id < to.id) to.lock else from.lock
    
    lock1.lock()
    try {
        lock2.lock()
        try {
            from.withdraw(amount)
            to.deposit(amount)
        } finally {
            lock2.unlock()
        }
    } finally {
        lock1.unlock()
    }
}

四、条件变量机制

4.1 Condition接口规范

 
interface Condition {
    func wait(mtx: Lock): Unit
    func wait(mtx: Lock, timeout: Duration): Bool
    func notify(): Unit
    func notifyAll(): Unit
}

4.2 生产者-消费者模式

有界缓冲区实现

 
class BoundedBuffer<T> {
    private let mtx = Mutex()
    private let notFull = mtx.condition()
    private let notEmpty = mtx.condition()
    private let buffer = Array<T?>(capacity)
    private var count = 0
    
    public func put(item: T): Unit {
        synchronized(mtx) {
            while (count == buffer.size) {
                notFull.wait()
            }
            buffer[count++] = item
            notEmpty.notify()
        }
    }
    
    public func take(): T {
        synchronized(mtx) {
            while (count == 0) {
                notEmpty.wait()
            }
            let item = buffer[--count]
            notFull.notify()
            return item!
        }
    }
}

4.3 条件变量性能优化

虚假唤醒处理

 
while (!condition) {
    cond.wait(mtx)
}

批量通知策略

 
// 低效方式
for (_ in 0..<10) {
    cond.notify()
}

// 高效方式
cond.notifyAll()

定时等待模式

 
val deadline = System.currentTimeMillis() + 5000
while (!ready) {
    val remaining = deadline - System.currentTimeMillis()
    if (remaining <= 0) throw TimeoutException()
    cond.wait(mtx, remaining * Duration.millisecond)
}

五、线程睡眠控制

5.1 sleep函数详解

函数原型

 
public func sleep(dur: Duration): Unit

精度与行为

  • 纳秒级精度(实际依赖OS调度器)
  • 总是让出CPU使用权
  • 可被提前唤醒(通过线程中断)
  • 不保证精确时长(受系统负载影响)

5.2 睡眠时长规范

Duration构造方式

 
// 明确单位
sleep(100 * Duration.millisecond)
sleep(2 * Duration.second)
sleep(500 * Duration.microsecond)

// 组合时长
val timeout = 1 * Duration.minute + 30 * Duration.second

特殊值处理

 
sleep(Duration.Zero)  // 主动让出CPU但不保证睡眠
sleep(Duration.Max)    // 最大可能睡眠时长(约292年)

5.3 睡眠控制模式

定时任务调度

 
class Timer {
    private let interval: Duration
    private var running = AtomicBool(false)
    
    public func start(task: () -> Unit): Unit {
        running.store(true)
        spawn {
            while (running.load()) {
                val start = System.currentTimeMillis()
                task()
                val elapsed = System.currentTimeMillis() - start
                if (elapsed < interval.toMillis()) {
                    sleep(interval - elapsed * Duration.millisecond)
                }
            }
        }
    }
}

超时控制模式

 
func withTimeout<T>(timeout: Duration, block: () -> T): Option<T> {
    let result = AtomicRef<Option<T>>(None)
    let done = AtomicBool(false)
    
    val worker = spawn {
        result.set(Some(block()))
        done.store(true)
    }
    
    val watchdog = spawn {
        sleep(timeout)
        if (!done.load()) {
            worker.cancel()
        }
    }
    
    worker.get()
    watchdog.cancel()
    return result.get()
}

六、高级同步模式

6.1 屏障同步(Barrier)

 
class Barrier {
    private let mtx = Mutex()
    private let cond = mtx.condition()
    private let threshold: Int
    private var count = 0
    
    public init(threshold: Int) {
        this.threshold = threshold
    }
    
    public func await(): Unit {
        synchronized(mtx) {
            if (++count == threshold) {
                cond.notifyAll()
            } else {
                while (count < threshold) {
                    cond.wait()
                }
            }
        }
    }
}

6.2 读写锁模式

 
class ReadWriteLock {
    private let mtx = Mutex()
    private let cond = mtx.condition()
    private var readers = 0
    private var writer = false
    
    public func readLock(): Unit {
        synchronized(mtx) {
            while (writer) {
                cond.wait()
            }
            readers++
        }
    }
    
    public func writeLock(): Unit {
        synchronized(mtx) {
            while (writer || readers > 0) {
                cond.wait()
            }
            writer = true
        }
    }
}

6.3 信号量控制

 
class Semaphore {
    private let mtx = Mutex()
    private let cond = mtx.condition()
    private var permits: Int
    
    public init(permits: Int) {
        this.permits = permits
    }
    
    public func acquire(): Unit {
        synchronized(mtx) {
            while (permits == 0) {
                cond.wait()
            }
            permits--
        }
    }
    
    public func release(): Unit {
        synchronized(mtx) {
            permits++
            cond.notify()
        }
    }
}

七、性能调优指南

7.1 同步开销分析工具

性能剖析方法

 
import std.profiler.*

func analyzeContention(): Unit {
    let tracker = LockContentionTracker()
    
    // 注册要监控的锁
    tracker.track(myLock)
    
    // 运行工作负载
    runWorkload()
    
    // 输出争用报告
    println(tracker.report())
}

7.2 锁粒度优化

细粒度锁示例

 
class ConcurrentDict<K, V> {
    private val stripes = Array<Mutex>(16)
    private val data = Array<HashMap<K, V>>(16)
    
    private func stripeFor(key: K): Int {
        return key.hashCode().abs() % stripes.size
    }
    
    public func put(key: K, value: V): Unit {
        val s = stripeFor(key)
        synchronized(stripes[s]) {
            data[s][key] = value
        }
    }
}

7.3 无锁数据结构

无锁栈实现

 
class LockFreeStack<T> {
    private val head = AtomicPtr<Node<T>>(null)
    
    public func push(item: T): Unit {
        val node = Node(item)
        while (true) {
            val h = head.load()
            node.next = h
            if (head.compareAndSwap(h, node)) {
                return
            }
        }
    }
    
    public func pop(): Option<T> {
        while (true) {
            val h = head.load()
            if (h == null) return None
            if (head.compareAndSwap(h, h.next)) {
                return Some(h.item)
            }
        }
    }
}

仓颉语言的同步机制提供了从低层原子操作到高层同步原语的完整体系,结合精确的线程睡眠控制,使开发者能够构建高效且安全的并发程序。理解这些机制的内在原理和适用场景,是编写高性能并发代码的关键。在实际开发中,应当根据具体场景选择最合适的同步策略,并通过性能剖析不断优化同步开销。

Logo

讨论HarmonyOS开发技术,专注于API与组件、DevEco Studio、测试、元服务和应用上架分发等。

更多推荐