OS 并发与经典同步
哲学家就餐、银行家、生产者-消费者、读者-写者、Peterson、死锁检测——六大经典并发问题的算法实现
| 问题 | 核心挑战 | 解决方案 |
|---|---|---|
| 哲学家就餐 | 死锁 & 饥饿(共享筷子) | 资源排序 / 仲裁者 / 监视器 |
| 银行家算法 | 死锁预防(安全状态检测) | 安全算法 + 资源请求算法 |
| 生产者-消费者 | 缓冲区同步 | 信号量(mutex + empty + full) |
| 读者-写者 | 允许并发读,排他写 | 读者优先 / 写者优先 / 公平 |
| Peterson 算法 | 无硬件原语的互斥 | 共享 flag[] + turn 变量 |
| 死锁检测 | 资源分配图中的环 | RAG 环检测 + 回收策略 |
哲学家就餐问题
5 个哲学家,5 根筷子,圆桌摆放。死锁根因:所有人同时拿左侧筷子 → 形成循环等待。
资源排序方案(破坏循环等待)
- 为每根筷子分配唯一编号 0..n-1
- 哲学家 i 的筷子对为 (i, (i+1)%n)
- 总是先拿编号小的,再拿编号大的
- 最后一个哲学家(n-1)改为先拿 0 号——循环被打破
// Dining Philosophers — resource ordering (breaks circular wait)
void philosopher(int i, int n, mutex* mtx) {
int first = min(i, (i + 1) % n); // always grab lower-numbered first
int second = max(i, (i + 1) % n);
while (true) {
think();
mtx[first].lock();
mtx[second].lock();
eat();
mtx[second].unlock();
mtx[first].unlock();
}
}
// Arbiter variant: semaphore arbiter(n-1);
// Acquire arbiter before picking up any fork — at most n-1 can try simultaneously仲裁者方案:用信号量初始化为 n−1,最多允许 n−1 个哲学家同时尝试,至少一人能成功拿到两根筷子。
测试用例(3 个)
50 1
1 2
2 3
3 4
0 4
30 1
1 2
0 2
20 1
0 1
银行家算法
在分配资源前通过安全状态检测验证系统不会进入死锁,是经典的死锁预防算法。
矩阵定义
- available[j]:资源 j 的空闲数量
- allocation[i][j]:进程 i 已持有资源 j 的数量
- need[i][j] = max[i][j] − allocation[i][j]:进程 i 还需要资源 j 的数量
安全算法(O(n² × m))
- work = available 的副本;finish[i] = false
- 找进程 i 使得 !finish[i] 且 need[i] ≤ work(逐分量比较)
- 若找到:work += allocation[i],finish[i] = true,重复步骤 2
- 若所有 finish[i] == true → 安全状态;否则 → 不安全
// Banker's Safety Algorithm — O(n^2 * m)
bool safeState(int n, int m,
vector<int>& avail,
vector<vector<int>>& alloc,
vector<vector<int>>& need,
vector<int>& seq) {
vector<int> work = avail;
vector<bool> finish(n, false);
seq.clear();
while ((int)seq.size() < n) {
bool found = false;
for (int i = 0; i < n; i++) {
if (finish[i]) continue;
bool ok = true;
for (int j = 0; j < m; j++)
if (need[i][j] > work[j]) { ok = false; break; }
if (ok) {
for (int j = 0; j < m; j++) work[j] += alloc[i][j];
finish[i] = true;
seq.push_back(i);
found = true;
}
}
if (!found) return false;
}
return true;
}资源请求算法:request > need → 报错;request > available → 阻塞;试探性分配 → 运行安全算法 → 不安全则回滚。
测试用例(3 个)
5 3
3 3 2
0 1 0
2 0 0
3 0 2
2 1 1
0 0 2
7 4 3
1 2 2
6 0 0
0 1 1
4 3 1P1 P3 P4 P0 P2
5 3
1 0 0
0 1 0
2 0 0
3 0 2
2 1 1
0 0 2
7 4 3
1 2 2
6 0 0
0 1 1
4 3 1UNSAFE
3 2
2 2
1 0
0 1
1 1
1 2
2 1
0 0P2 P0 P1
生产者-消费者
有界缓冲区同步:生产者等待空位,消费者等待物品,两者互斥访问缓冲区。
三信号量方案
- mutex = 1:互斥访问缓冲区
- empty = N:空位计数(初始 = 缓冲区大小 N)
- full = 0:物品计数(初始 = 0)
// Producer-Consumer with semaphores — three-semaphore pattern
// sem_t mutex(1), empty(N), full(0)
void producer(sem_t& mutex, sem_t& empty, sem_t& full) {
sem_wait(&empty); // wait for an empty slot
sem_wait(&mutex);
buffer.push(produce());
sem_post(&mutex);
sem_post(&full); // signal: one more item available
}
void consumer(sem_t& mutex, sem_t& empty, sem_t& full) {
sem_wait(&full); // wait for an item
sem_wait(&mutex);
consume(buffer.pop());
sem_post(&mutex);
sem_post(&empty); // signal: one more slot available
}
// DEADLOCK TRAP: sem_wait(&mutex) before sem_wait(&empty) in producer
// → full buffer: producer holds mutex and waits for empty,
// consumer waits for mutex → deadlock死锁陷阱:若生产者先 wait(mutex) 再 wait(empty):缓冲区满时持有 mutex 的生产者等待 empty,消费者等待 mutex → 死锁。正确顺序:先 wait(empty/full),再 wait(mutex)。
测试用例(3 个)
3
P P P C P C CP: 1
P: 2
P: 3
C: 2
P: 3
C: 2
C: 1
2
P P P CP: 1
P: 2
P: FULL
C: 1
3
C P PC: EMPTY
P: 1
P: 2
读者-写者问题
允许多个读者并发访问(共享锁),写者需要排他访问(独占锁)。读者优先方案中写者可能饥饿。
读者优先变量
- readers:当前活跃读者数量
- mutex = 1:保护 readers 计数
- rw_mutex = 1:写者互斥锁(第一个读者加锁,最后一个读者解锁)
// Reader-Writer — reader priority (writers may starve)
// sem_t mutex(1), rw_mutex(1); int readers = 0
void reader(sem_t& mutex, sem_t& rw_mutex, int& readers) {
sem_wait(&mutex);
if (++readers == 1) sem_wait(&rw_mutex); // 1st reader blocks writers
sem_post(&mutex);
/* read critical section */
sem_wait(&mutex);
if (--readers == 0) sem_post(&rw_mutex); // last reader unblocks writers
sem_post(&mutex);
}
void writer(sem_t& rw_mutex) {
sem_wait(&rw_mutex);
/* write critical section */
sem_post(&rw_mutex);
}
// Writer starvation: new readers keep joining → writer waits indefinitely
// Fair fix: add sem_t order(1); both reader and writer acquire order first写者饥饿:若读者源源不断,writers 永远等待。改进:加第三信号量 order,强制按 FIFO 到达顺序(公平方案)。
测试用例(3 个)
R R W R WR1 R2 R4
W3
W5
W R R WR2 R3
W1
W4
W W WW1
W2
W3
Peterson 算法
纯软件互斥,无需硬件原子指令,适用于两个进程。在现代乱序 CPU 上需要内存屏障。
互斥性证明(反证)
- 假设 P0 和 P1 同时在临界区
- P0 进入 CS → while 退出条件:flag[1]==false 或 turn==0
- P1 进入 CS → while 退出条件:flag[0]==false 或 turn==1
- 两者都在 CS → flag[0]=flag[1]=true,且 turn==0 且 turn==1,矛盾
// Peterson's Algorithm — mutual exclusion for 2 processes
bool flag[2] = {false, false};
int turn;
void entry(int i) { // i ∈ {0, 1}
flag[i] = true; // declare intent to enter CS
turn = 1 - i; // yield: let the other go first
while (flag[1-i] && turn == 1-i) {} // busy-wait
}
void exit_(int i) {
flag[i] = false; // withdraw intent
}
// Proof: if both in CS → flag[0]=flag[1]=true AND turn==0 AND turn==1 → impossible
// Satisfies: mutual exclusion, progress, bounded waiting (≤ 1 turn wait)现代 CPU 注意:现代 CPU 的指令重排可能破坏 Peterson 的假设。实际应用中需要 std::atomic<bool> 或显式内存屏障 atomic_thread_fence。
测试用例(3 个)
1P0
P1
2P0
P1
P0
P1
3P0
P1
P0
P1
P0
P1
死锁检测
类似银行家安全算法,但 finish[i] 初始化不同:若进程 i 未持有任何资源则 finish[i] = true(认为它不会占用资源阻塞他人)。
Coffman 四个必要条件(1971)
- 互斥:资源不可共享
- 持有并等待:进程持有资源同时等待其他资源
- 不可抢占:资源不能被强制取走
- 循环等待:P1→P2→…→P1 形成环路
检测算法
- work = available;finish[i] = (allocation[i] 全零)
- 找 i 使得 !finish[i] 且 request[i] ≤ work
- 若找到:work += allocation[i],finish[i] = true,重复
- finish[i] == false 的进程即处于死锁状态
// Deadlock Detection (resource allocation graph reduction)
// finish[i] starts true only if process i holds NO resources
vector<int> detect(int n, int m,
vector<int>& avail,
vector<vector<int>>& alloc,
vector<vector<int>>& req) {
vector<int> work = avail;
vector<bool> finish(n);
for (int i = 0; i < n; i++) {
finish[i] = true;
for (int j = 0; j < m; j++)
if (alloc[i][j]) { finish[i] = false; break; }
}
bool changed = true;
while (changed) {
changed = false;
for (int i = 0; i < n; i++) {
if (finish[i]) continue;
bool ok = true;
for (int j = 0; j < m; j++)
if (req[i][j] > work[j]) { ok = false; break; }
if (ok) {
for (int j = 0; j < m; j++) work[j] += alloc[i][j];
finish[i] = true; changed = true;
}
}
}
vector<int> dl;
for (int i = 0; i < n; i++) if (!finish[i]) dl.push_back(i);
return dl; // deadlocked process indices
}恢复策略:进程终止(全部终止或逐一终止直到死锁解除)、资源抢占(回滚代价最小的牺牲者)。
测试用例(3 个)
3 2
0 0
1 0
0 1
0 0
0 1
1 0
0 0P0 P1
2 1
0
1
1
1
1P0 P1
2 1
1
1
1
0
1none