协作进程是可以与在系统内执行的其他进程互相影响的进程,他们可以直接共享逻辑地址空间(代码和数据),或者只通过文件或消息来共享数据。多个进程并发访问和操作同一数据且执行结果与访问发生的特定顺序有关,称为竞争条件。为了避免竞争条件,需要确保一段时间内只有一个进程能操作变量。为了实现这种保证,要求进行一定形式的进程同步。
1 临界区问题
每个进程有一个代码段称为临界区
(criticalsection),在该区中进程可能改变共同变量、更新一个表、写一个文件等,当一个进程进入临界区,没有其他进程可被允许在临界区内执行。临界区问题是设计一个以便进程协作的协议,每个进程必须请求允许进入其临界区,实现这一请求的代码段称为进入区
,临界区之后可有退出区
,其他代码段称为剩余区
,如下图所示。
解决临界区问题需要满足如下三个要求:
互斥
(mutual exclusion):如果某进程在其临界区内,那么其他进程都不能在其临界区内执行。前进
(progress):如果没有进程在其临界区内执行且有进程需要进入临界区,那么只有那些不再剩余区内执行的进程可参加选择。有限等待
(bounded waiting):从一个进程做出进入临界区的请求,直到该请求允许为止,其他进程允许进入其临界区的次数有上限。
有两种方法用于处理操作系统内的临界区问题:抢占内核
(preemptive kernel)与非抢占内核
(nonpreemptive kernel)。抢占内核允许处于内核模式的进程被抢占,非抢占内核不允许处于内核模式的进程被抢占。
2 临界区的实现方法
临界区的实现方法有三种:禁用中断、软件方法和更高级的抽象方法。禁用硬件中断后,无法进行上下文切换,因此没有并发。由于进程无法被停止,可能导致其他进程处于饥饿状态。一种经典的基于软件的临界区实现是Peterson算法,在该算法中进程的结构如下图所示。Peterson算法需要在两个进程之间共享两个数据项:turn和flag[],turn表示哪个进程可以进入临界区,数据flag表示哪个进程想要进入临界区。
操作系统也提供更高级的编程抽象来简化进程同步,例如:锁、信号量,并通过硬件原语来构建。现代操作系统都提供一些特殊的原子操作指令,如TestAndSet(TS)指令和Swap指令。使用这两种原子操作指令实现互斥的做法如下面两幅图所示。
boolean TestAndSet(boolean *target){
boolean rv = *target;
*targe = TRUE;
return rv;
}
do{
while(TestAndSet(&lock))
do nothing
critical section
lock = FALSE;
remainder section
}while(TRUE);
void Swap(boolean *a, boolean *b){
boolean temp = *a;
*a = *b;
*b = temp;
}
do{
key = TRUE;
while(key == TRUE)
Swap(&lock, &key);
cirtical section
lock = FALSE;
remainder section
}while(TRUE);
上面的算法实现了互斥,但是没有解决有限等待要求,下面是一种使用指令TestAndSet实现的算法,该算法满足所有临界区的三个要求。算法共用数据结构如下:
boolean waiting[n];
boolean lock;
这些数据结构均初始化为false,只有当 waiting[i] == false 或者 lock == false 时,进程才进入临界区。
do{
waiting[i] = TRUE
key = TRUE
while(waiting[i] && key)
key = TestAndSet(&lock);
waiting[i] = FALSE;
critical section
j = (i + 1) % n;
while((j != i) && !waiting[j])
j = (j + 1) % n;
if(j == i)
lock = FALSE;
else
waiting[j] = FALSE;
remainder section
}while(TRUE);
3 信号量
基于硬件的临界区问题的解决方案使用比较复杂,为了解决这个困难,可以使用称为信号量
(semaphore)的同步工具。信号量S是个整数变量,除了初始化外,它只能通过两个标准原子操作:wait()
和signal()
来访问,原来称为P()操作和V()操作,源自荷兰语测试和增加。在wait()和signal()操作中,对信号量整型值的修改必须不可分地执行。
wait(S){
while(s <= 0)
do nothing
S--;
}
signal(S){
S++;
}
通常操作系统区分计数信号量与二进制信号量(互斥锁),计数信号量的值域不受限制,而二进制信号量的值只能为0或1。计数信号量可以用来控制访问具有若干个实例的某种资源,初始化为可用资源的哦数量。当每个进程需要使用资源时,需要对该信号量执行wait()操作;当 进程释放资源时,需要对该信号量执行signal()操作。当信号量的计数为0时,需要使用资源的进程将被阻塞,直到其计数大于0。
这里定义的信号量的主要缺点是都要气忙等待
(busy waiting),当一个进程位于其临界区时,任何其他试图进入其临界区的进程都必须在其进入代码中持续地循环。这种类型的信号量也称为自旋锁
(spinlock),因为进程在其等待锁时仍然在运行。自旋锁的优点在于等待锁时不需要进行上下文切换,如果锁的占用时间短,那么自旋锁很有用。
为了克服忙等待,可以修改信号量操作wait()和signal()的定义,当一个进程执行wait()操作时,如果信号量不为正,则阻塞自己。然后在其他进程执行signal()操作后,通过wakeup()操作重新执行。该种信号量定义成如下结构:
typedef struct{
int value;
struct process *list;
}semaphore;
每个进程都有一个整型值和一个进程链表,当一个进程必须等待信号量时,将其加入到进程链表。使用该种信号量的wait()和signal()操作如下:
wait(semaphore *S){
s->value--;
if(s->value < 0){
add this process to S->list;
block(); //挂起进程
}
}
signal(semaphore *S){
S->value++;
if(S->value <= 0){
remove a process P from S->list;
wakeup(); //唤醒进程
}
}
信号量的关键之处是它们原子地执行,必须确保没有两个进程能同时对同一信号量执行操作wait()和signal()。具有等待队列的信号量可能导致这样的问题:两个或多个进程无限地等待一个事件,而该事件只能由这些等待进程之一来产生,这些进程就称为死锁
(deadlocked)。与死锁相关的另一个问题是是饥饿
(starvation),即进程在信号量内无限期等待。
下面是使用信号量解决生产者-消费者问题的例子,问题如下图所示,一个或多个生产者在生成数据后放在一个缓冲区里,单个消费者从缓冲区取出数据处理,任何时刻只能有一个生产者或消费者可访问缓冲区。为了实现互斥访问,任何时刻只能有一个线程操作缓冲区。为了实现条件同步,在缓冲区空时消费者必须等待生产者,在缓冲区满时生产者必须等待消费者。
为了实现每个约束,使用了三个信号量,其中二进制信号量mutex表示互斥关系,资源信号量fullBuffer表示有数据才能读,资源信号量emptyBuffer表示有空闲才能写。代码如下:
mutext = new Semaphore(1);
fullBuffer = new Semaphore(0);
emptyBuffer = new Semaphore(n);
// 生产者代码
Deposit(c){
emptyBuffer->P(); //测试是否有空闲
mutex->P(); //是否在线程访问缓冲区
add c to the buffer;
mutex->V();
fullBuffer->V(); //更新数据
}
// 消费者代码
Remove(c){
fullBuffer->P(); //测试是否有数据
mutex->P();
add c to the buffer;
mutex->V();
emptyBuffer->V(); //释放空闲
}
使用信号量解决生产者-消费者问题是存在缺点的,首先代码编写比较困难,需要数量信号量机制。否则容易出错,比如忘记信号量已经被占用,或者P()/V()操作的顺序颠倒,进而导致死锁问题。
4 管程
使用信号量解决同步问题是,进程需要在进入临界区之前执行wait()操作,之后执行signal()操作。如果这一顺序不被遵守,那么两个进程会同时出现在临界区。当信号量不正确地用来解决临界问题时,会很容易地产生各种类型的错误。为了处理这些错误,提出了管程
(monitor)。
管程类型的定义包括一组变量的声明和对这些变量操作的子程序和函数的实现,管程不能直接为各个进程锁使用,在管程内定义的子程序只能访问位于管程内局部声明的变量。管程结构确保一次只能有一个进程在管程内运行,不需要显式地编写同步代码。但是,管程还是需要一些额外的同步机制,这些可由条件变量
(condition)来提供,每个条件变量表示一种等待原因,对应一个等待队列,这是管程内的等待机制。管程与信号量的关键不同在于:管程中的线程可以临时放弃对管程的互斥访问,等待事件出现时再恢复。管程的结构如下图所示:
使用管程解决生产者-消费者问题的代码如下,优于信号量方案的地方在于,检查条件变量notFull和notEmpty的代码在管程互斥操作内部,所以线程可以放弃对管程的占用,直到事件出现。
Lock lock; // 能否进入管程的互斥变量
int count = 0; // 资源数量
Condition notFull, notEmpty; // 条件变量,分别表示是否有空间和是否有数据
// 生产者
Deposit(c){
lock->Acquire(); // 进入管程
while(count == n) // 是否有空闲
notFull.Wait(&lock);
add c to the buffer;
count++;
notEmpty.Signal();
lock->Release(); // 释放管程
}
// 消费者
Remove(c){
lock->Acquire();
while(count == 0)
notEmpty.wait(&lock);
remove c from the buffer;
count--;
norFull.Signal();
lock->Release();
}
5 经典同步问题
第一个经典同步问题是哲学家就餐问题,问题描述如下:5个哲学家围绕着一张圆桌而坐,桌子上放着五把叉子,每两个哲学家之间一把。哲学家的动作有思考和吃饭两种,吃饭时需要同时拿到左右两把叉子,思考是将叉子放回原处。问题是如何保证哲学家们的动作有序进行,比如不会有人拿不到叉子。
使用信号量的解决方案如下:使用五个信号量fork[5]对应五把叉子的状态,代码如下。
semaphore fork[5];
void philosopher(int i){
while(TRUE){
think(); // 哲学家思考
if(i%2 == 0){ // 将哲学家按照编号的奇偶性分类处理,以避免死锁问题。
P(fork[i]); // 偶数哲学家先拿左边叉子
P(fork[(i+1) % 5]);
}
else{
P(fork[(i+1) % 5]); // 奇数哲学家先拿右边叉子
P(fork[i]);
}
eat(); // 哲学家吃饭
V(fork[i]);
V(fork[(i+1) % 5]);
}
}
第二个经典同步问题是读者-写者问题,该问题可以看作是生产者-消费者问题的扩展,同时有两类(读者和写者)多个使用者使用共享数据,其中读者只能读取数据,写者可以读取和修改数据。读者-写者问题需要满足如下约束:1)同一时刻,允许有多个读者同时读。2)读者和写者不能同时使用数据。3)同一时刻,只能有一个写者可以使用数据。
首先,我们使用信号量来解决读者-写者问题,需要用到三个信号量来表示三个约束:WriteMutex,Rcount和CountMutex,代码如下。这段代码实现了读者优先的策略。
semaphore WriteMutex(1); //控制读写操作互斥
semaphore CountMutex(1); //控制对Rcount的操作互斥
int Rcount = 0; //正在进行读操作的读者数目
// 写者代码
void witre(d){
P(WriteMutex);
witre(d);
V(WriteMutex);
}
//读者代码
void read(d){
P(CountMutex);
if(Rcount == 0) //如果是第一个读者,那么需要申请WriteMutex控制权
P(WriteMutex);
++Rcount; // 否则直接给Rcount+1
V(CountMutex);
read;
P(CountMutex);
--Rcount;
if(Rcount == 0) //如果是最后一个读者,那么释放对WirteMutex控制权
V(WirteMutex);
V(CountMutex);
}
接下来,是使用管程解决读者-写者问题的方案,该方案实现了写者优先的策略。
Class Database{ // 管程采用了面向对象编程的思想,将一个管程的状态变量和一些方法都封装到Database类中。
private AR = 0; //active readers
private AW = 0; //active writers
private WR = 0; //waiting readers
private WW = 0; //waiting writers
private Lock lock; //管程的互斥访问
private Condition okRead; // 可进行读操作的条件变量
private Condition okWrite; // 可进行写操作的条件变量
...
}
下面是读者的操作。
Public Database::Read(){
// wait until no writers;
StartRead();
read database;
// check out - wake up waiting writers
DoneRead();
}
Public Database::StartRead(){
lock.Acquire();
while(AW+WW > 0){ 只要有写者执行或等待,读者就不能操作
WR++;
okRead.wait(&lock);
WR--;
}
AR++;
lock.Release();
}
Public Database::DoneRead(){
lock.Acquire();
AR--;
if (AR == 0 && WW > 0)
okWrite.signal();
lock.Release();
}
写者的操作
Public Database::Wirte(){
// wait until no readers/writers
StartWrite();
write database;
// check out - wake up waiting readers/writers
DoneWrite();
}
Public Database::StartWrite(){
lock.Acquire();
while( AW+AR) > 0){ // 如果有正在执行的写者或读者则等待
WW++;
okWrite.wait(&lock);
WW--;
}
AW++;
lock.Release();
}
Public Database::DoneWrite(){
lock.Acquire();
AW--;
if(WW > 0){ // 先判断是否有写者等待,写者优先
okWrite.signal();
}
else if(WR > 0){ //再判断是否有读者等待
okRead.signal();
}
lock.Release();
}