临界区管理

多线程编程中, 互斥 (mutual exclusion, a.k.a. mutex) 是一个很重要的概念. 而互斥的本质就是对于临界区 (critical section) 的管理.

以前一直以为, 要实现互斥必须需要 CPU 提供相关的原语指令 (比如 Test and Set 指令) 才行. 现在发现原来有很多算法可以实现互斥.

软件方案

互斥这一问题最早是由 Dijkstra 在 1965 年提出的, 要求一个算法使得 N 台计算机在任意时刻, 有且只有一台计算机在执行临界区的代码. N 台电脑之间通过一块共享的存储区域传递消息, 对于共享区域的读或者写是原子操作.

如果一个变量会被多个线程读写的话, 我们称之为 公共变量 (Public). 如果一个变量会被多个线程读但只会被一个线程写的话, 我们称之为 保护变量 (Protected). 如果一个变量只会被一个线程读写的话, 我们称之为 私有变量 (Private).

保护变量被多个线程读取并在一定条件下使用是没有问题的. 我们会发现, 下面提到的互斥算法从本质上来讲都是 通过使用保护变量来对共享变量的读写进行同步.

我们来看一个具体的 Java 的多线程问题: N 个线程对一个变量加 M 次 1, 如何不用任何现成的锁机制使得最后这个变量的值为 N * M.

public class MutualExclusion {
    static final int N = 10;
    static final int M = 10000;

    volatile static int count = 0;

    public static void main(String[] args) {
        Thread[] threads = new Thread[N];
        for (int i = 0; i < N; i++) {
            Thread t = new Worker(i);
            threads[i] = t;
            t.start();
        }
        for (Thread t: threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        if (count != N * M) {
            System.out.println("count (" + count + ") != N * M (" + String.valueOf(N * M) + ")");
        }
    }

    static class Worker extends Thread {
        int id;
        Worker(int  id) {
            this.id = id;
        }

        @Override
        public void run() {
            for (int i = 0; i < M; i++) {
                this.lock();
                // critical section
                count++;
                if (i % 1000 == 0) {
                    System.out.println(this.getName() + ": " + count);
                }
                this.unlock();
            }
        }

        void lock() {
            // TODO
        }

        void unlock() {
            // TODO
        }
    }
}

Dijkstra 提出的算法

这个算法是 Dijkstra 在提出互斥问题的那篇论文里给出的:

static AtomicIntegerArray b = new AtomicIntegerArray(N);
static AtomicIntegerArray c = new AtomicIntegerArray(N);
volatile static int k = 0;

void lock() {
    b.set(this.id, 0);
    outer:
    for (;;) {
        if (k == this.id) {
            c.set(this.id, 0);

            for (int i = 0; i < N; i++) {
                if (i != this.id && c.get(i) == 0) {
                    continue outer;
                }
            }
            break;
        } else {
            c.set(this.id, 1);
            if (b.get(k) == 1) {
                k = this.id;
            }
        }
    }
}

void unlock() {
    b.set(this.id, 1);
    c.set(this.id, 1);
}

为了防止因为 JMM 中的 cache 影响程序的正常运行, 所以这里使用 AtomicIntegerArray 替代 boolean[].

这里的 k 是共享变量, Dijkstra 提出的算法利用 bc 这两个保护变量来进行同步.

这个算法存在一个问题, 就是 k == this.id 的线程可能会一直获得执行权, 从而导致其它线程一直等待. 后来 Knuth 等人针对这个问题对算法进行了改进, 感兴趣的可以找相关文献看看.

Perterson 算法

这个算法是解决两线程的互斥算法, 结构非常简单, 这里就直接把 Peterseon 原文里的代码摘抄过来:

/* trying protocol for P1 */
Q1 := true;
TURN := 1;
wait until not Q2 or TURN = 2;
Critical Section;
/* exit protocol for P1 */
Q1 := false.

/* trying protocol for P2 */
Q2 := true;
TURN := 2;
wait until not Q1 or TURN = 1;
Critical Section;
/* exit protocol for P2 */
Q2 := false.

Filter 算法

Peterson 在同一篇论文里把互斥算法扩展到支持 N 个线程, 这个算法又被称作 Filter 算法.

static AtomicIntegerArray level = new AtomicIntegerArray(N);
static AtomicIntegerArray lastToEnter = new AtomicIntegerArray(N - 1);

void lock() {
    for (int i = 0; i < (N - 1); i++) {
        level.set(this.id, i);
        lastToEnter.set(i, this.id);
        outer:
        while (lastToEnter.get(i) == this.id) {
            for (int k = 0; k < N; k++ ) {
                if (k != this.id && level.get(k) >= i) {
                    continue outer;
                }
            }
            break;
        }
    }
}

void unlock() {
    level.set(this.id, -1);
}

算法中, 每个线程都有个 level, 线程需要一级一级的升级, 直到 level 达到 N (算法中并没有真的达到 N, 但相当于升级到 N) 才获得执行权限.

在每个 level 中应用 Peterson 算法确保会有一个线程维持在该等级.

P.S. 互斥算法不仅仅上面介绍的这几种, 还有很多, 比如 Dekker 算法和面包店算法, 感兴趣的就自己查阅资料吧, 本质都差不多的.

硬件方案

解决互斥问题的关键就是对共享变量的读写进行同步, 如果硬件上能保证对一个共享变量的读写是同步的就可以解决互斥问题.

下面简单介绍几种常见的方案.

关中断

实现互斥最简单的办法就是关中断. 但是这样会导致系统的性能严重下降, 对于多 CPU 的系统关中断也不能解决问题.

Test and Set 指令

硬件提供的 TS 指令可以看做是一个函数(函数的执行不会被打断):

int TS(int* locked) {
    if (*locked == 0) {
        *locked = 1;
        return 1;
    }
    return 0;
}

有了 TS 指令, 可以很容易对临界区进行管理:

while (TS(locked) == 0) {
    // wait
}

// critical section

*locked = 1

对换 (Swap) 指令

和 TS 指令类似, Swap 也可以看作是一个函数:

void swap(int* a, int* b) {
    int tmp = *a;
    *a = *b;
    *b = tmp;
}

进程可以这样使用 Swap 指令:

int pi = 1; // 私有变量
while (pi == 1) {
    swap(locked, &pi);
}

// critical section

*locked = 0;

操作系统教程

经典互斥算法 - 郑思遥