操作系统

OS 并发与经典同步

哲学家就餐、银行家、生产者-消费者、读者-写者、Peterson、死锁检测——六大经典并发问题的算法实现

问题核心挑战解决方案
哲学家就餐死锁 & 饥饿(共享筷子)资源排序 / 仲裁者 / 监视器
银行家算法死锁预防(安全状态检测)安全算法 + 资源请求算法
生产者-消费者缓冲区同步信号量(mutex + empty + full)
读者-写者允许并发读,排他写读者优先 / 写者优先 / 公平
Peterson 算法无硬件原语的互斥共享 flag[] + turn 变量
死锁检测资源分配图中的环RAG 环检测 + 回收策略

哲学家就餐问题

5 个哲学家,5 根筷子,圆桌摆放。死锁根因:所有人同时拿左侧筷子 → 形成循环等待。

资源排序方案(破坏循环等待)

  1. 为每根筷子分配唯一编号 0..n-1
  2. 哲学家 i 的筷子对为 (i, (i+1)%n)
  3. 总是先拿编号小的,再拿编号大的
  4. 最后一个哲学家(n-1)改为先拿 0 号——循环被打破
// Dining Philosophers — resource ordering (breaks circular wait)
void philosopher(int i, int n, mutex* mtx) {
    int first  = min(i, (i + 1) % n); // always grab lower-numbered first
    int second = max(i, (i + 1) % n);
    while (true) {
        think();
        mtx[first].lock();
        mtx[second].lock();
        eat();
        mtx[second].unlock();
        mtx[first].unlock();
    }
}
// Arbiter variant: semaphore arbiter(n-1);
// Acquire arbiter before picking up any fork — at most n-1 can try simultaneously

仲裁者方案用信号量初始化为 n−1,最多允许 n−1 个哲学家同时尝试,至少一人能成功拿到两根筷子。

练习 — 哲学家就餐(资源排序)代码练习
测试用例(3 个)
用例 1
输入:5
期望:0 1 1 2 2 3 3 4 0 4
用例 2
输入:3
期望:0 1 1 2 0 2
用例 3
输入:2
期望:0 1 0 1

银行家算法

在分配资源前通过安全状态检测验证系统不会进入死锁,是经典的死锁预防算法。

矩阵定义

  • available[j]:资源 j 的空闲数量
  • allocation[i][j]:进程 i 已持有资源 j 的数量
  • need[i][j] = max[i][j] − allocation[i][j]:进程 i 还需要资源 j 的数量

安全算法(O(n² × m))

  1. work = available 的副本;finish[i] = false
  2. 找进程 i 使得 !finish[i] 且 need[i] ≤ work(逐分量比较)
  3. 若找到:work += allocation[i],finish[i] = true,重复步骤 2
  4. 若所有 finish[i] == true → 安全状态;否则 → 不安全
// Banker's Safety Algorithm — O(n^2 * m)
bool safeState(int n, int m,
               vector<int>& avail,
               vector<vector<int>>& alloc,
               vector<vector<int>>& need,
               vector<int>& seq) {
    vector<int> work = avail;
    vector<bool> finish(n, false);
    seq.clear();
    while ((int)seq.size() < n) {
        bool found = false;
        for (int i = 0; i < n; i++) {
            if (finish[i]) continue;
            bool ok = true;
            for (int j = 0; j < m; j++)
                if (need[i][j] > work[j]) { ok = false; break; }
            if (ok) {
                for (int j = 0; j < m; j++) work[j] += alloc[i][j];
                finish[i] = true;
                seq.push_back(i);
                found = true;
            }
        }
        if (!found) return false;
    }
    return true;
}

资源请求算法request > need → 报错;request > available → 阻塞;试探性分配 → 运行安全算法 → 不安全则回滚。

练习 — 银行家安全状态检测代码练习
测试用例(3 个)
用例 1
输入:5 3 3 3 2 0 1 0 2 0 0 3 0 2 2 1 1 0 0 2 7 4 3 1 2 2 6 0 0 0 1 1 4 3 1
期望:P1 P3 P4 P0 P2
用例 2
输入:5 3 1 0 0 0 1 0 2 0 0 3 0 2 2 1 1 0 0 2 7 4 3 1 2 2 6 0 0 0 1 1 4 3 1
期望:UNSAFE
用例 3
输入:3 2 2 2 1 0 0 1 1 1 1 2 2 1 0 0
期望:P2 P0 P1

生产者-消费者

有界缓冲区同步:生产者等待空位,消费者等待物品,两者互斥访问缓冲区。

三信号量方案

  • mutex = 1:互斥访问缓冲区
  • empty = N:空位计数(初始 = 缓冲区大小 N)
  • full = 0:物品计数(初始 = 0)
// Producer-Consumer with semaphores — three-semaphore pattern
// sem_t mutex(1), empty(N), full(0)
void producer(sem_t& mutex, sem_t& empty, sem_t& full) {
    sem_wait(&empty);    // wait for an empty slot
    sem_wait(&mutex);
    buffer.push(produce());
    sem_post(&mutex);
    sem_post(&full);     // signal: one more item available
}
void consumer(sem_t& mutex, sem_t& empty, sem_t& full) {
    sem_wait(&full);     // wait for an item
    sem_wait(&mutex);
    consume(buffer.pop());
    sem_post(&mutex);
    sem_post(&empty);    // signal: one more slot available
}
// DEADLOCK TRAP: sem_wait(&mutex) before sem_wait(&empty) in producer
// → full buffer: producer holds mutex and waits for empty,
//   consumer waits for mutex → deadlock

死锁陷阱若生产者先 wait(mutex) 再 wait(empty):缓冲区满时持有 mutex 的生产者等待 empty,消费者等待 mutex → 死锁。正确顺序:先 wait(empty/full),再 wait(mutex)。

练习 — 有界缓冲区模拟代码练习
测试用例(3 个)
用例 1
输入:3 P P P C P C C
期望:P: 1 P: 2 P: 3 C: 2 P: 3 C: 2 C: 1
用例 2
输入:2 P P P C
期望:P: 1 P: 2 P: FULL C: 1
用例 3
输入:3 C P P
期望:C: EMPTY P: 1 P: 2

读者-写者问题

允许多个读者并发访问(共享锁),写者需要排他访问(独占锁)。读者优先方案中写者可能饥饿。

读者优先变量

  • readers:当前活跃读者数量
  • mutex = 1:保护 readers 计数
  • rw_mutex = 1:写者互斥锁(第一个读者加锁,最后一个读者解锁)
// Reader-Writer — reader priority (writers may starve)
// sem_t mutex(1), rw_mutex(1); int readers = 0
void reader(sem_t& mutex, sem_t& rw_mutex, int& readers) {
    sem_wait(&mutex);
    if (++readers == 1) sem_wait(&rw_mutex); // 1st reader blocks writers
    sem_post(&mutex);
    /* read critical section */
    sem_wait(&mutex);
    if (--readers == 0) sem_post(&rw_mutex); // last reader unblocks writers
    sem_post(&mutex);
}
void writer(sem_t& rw_mutex) {
    sem_wait(&rw_mutex);
    /* write critical section */
    sem_post(&rw_mutex);
}
// Writer starvation: new readers keep joining → writer waits indefinitely
// Fair fix: add sem_t order(1); both reader and writer acquire order first

写者饥饿若读者源源不断,writers 永远等待。改进:加第三信号量 order,强制按 FIFO 到达顺序(公平方案)。

练习 — 读者优先调度代码练习
测试用例(3 个)
用例 1
输入:R R W R W
期望:R1 R2 R4 W3 W5
用例 2
输入:W R R W
期望:R2 R3 W1 W4
用例 3
输入:W W W
期望:W1 W2 W3

Peterson 算法

纯软件互斥,无需硬件原子指令,适用于两个进程。在现代乱序 CPU 上需要内存屏障。

互斥性证明(反证)

  1. 假设 P0 和 P1 同时在临界区
  2. P0 进入 CS → while 退出条件:flag[1]==false 或 turn==0
  3. P1 进入 CS → while 退出条件:flag[0]==false 或 turn==1
  4. 两者都在 CS → flag[0]=flag[1]=true,且 turn==0 且 turn==1,矛盾
// Peterson's Algorithm — mutual exclusion for 2 processes
bool flag[2] = {false, false};
int turn;

void entry(int i) {          // i ∈ {0, 1}
    flag[i] = true;          // declare intent to enter CS
    turn = 1 - i;            // yield: let the other go first
    while (flag[1-i] && turn == 1-i) {} // busy-wait
}
void exit_(int i) {
    flag[i] = false;         // withdraw intent
}
// Proof: if both in CS → flag[0]=flag[1]=true AND turn==0 AND turn==1 → impossible
// Satisfies: mutual exclusion, progress, bounded waiting (≤ 1 turn wait)

现代 CPU 注意现代 CPU 的指令重排可能破坏 Peterson 的假设。实际应用中需要 std::atomic<bool> 或显式内存屏障 atomic_thread_fence。

练习 — Peterson 互斥模拟代码练习
测试用例(3 个)
用例 1
输入:1
期望:P0 P1
用例 2
输入:2
期望:P0 P1 P0 P1
用例 3
输入:3
期望:P0 P1 P0 P1 P0 P1

死锁检测

类似银行家安全算法,但 finish[i] 初始化不同:若进程 i 未持有任何资源则 finish[i] = true(认为它不会占用资源阻塞他人)。

Coffman 四个必要条件(1971)

  • 互斥:资源不可共享
  • 持有并等待:进程持有资源同时等待其他资源
  • 不可抢占:资源不能被强制取走
  • 循环等待:P1→P2→…→P1 形成环路

检测算法

  1. work = available;finish[i] = (allocation[i] 全零)
  2. 找 i 使得 !finish[i] 且 request[i] ≤ work
  3. 若找到:work += allocation[i],finish[i] = true,重复
  4. finish[i] == false 的进程即处于死锁状态
// Deadlock Detection (resource allocation graph reduction)
// finish[i] starts true only if process i holds NO resources
vector<int> detect(int n, int m,
                   vector<int>& avail,
                   vector<vector<int>>& alloc,
                   vector<vector<int>>& req) {
    vector<int> work = avail;
    vector<bool> finish(n);
    for (int i = 0; i < n; i++) {
        finish[i] = true;
        for (int j = 0; j < m; j++)
            if (alloc[i][j]) { finish[i] = false; break; }
    }
    bool changed = true;
    while (changed) {
        changed = false;
        for (int i = 0; i < n; i++) {
            if (finish[i]) continue;
            bool ok = true;
            for (int j = 0; j < m; j++)
                if (req[i][j] > work[j]) { ok = false; break; }
            if (ok) {
                for (int j = 0; j < m; j++) work[j] += alloc[i][j];
                finish[i] = true; changed = true;
            }
        }
    }
    vector<int> dl;
    for (int i = 0; i < n; i++) if (!finish[i]) dl.push_back(i);
    return dl; // deadlocked process indices
}

恢复策略进程终止(全部终止或逐一终止直到死锁解除)、资源抢占(回滚代价最小的牺牲者)。

练习 — 死锁进程检测代码练习
测试用例(3 个)
用例 1
输入:3 2 0 0 1 0 0 1 0 0 0 1 1 0 0 0
期望:P0 P1
用例 2
输入:2 1 0 1 1 1 1
期望:P0 P1
用例 3
输入:2 1 1 1 1 0 1
期望:none