正文
计算机硬件发展到今天,不管是专业服务器还是PC,甚至于最普遍的移动设备基本上都是多核CPU,程序的并发执行可以更加充分利用这些计算资源。除此之后,为了协调CPU与外设(如磁盘)的速度差异,我们也需要并发。本文是笔者学习清华大学和UCSD(加州大学圣迭戈分校)的操作系统课程的笔记和总结,以及自己的思考和实践。我们知道,系统调用以及执行流程的切换都是依靠软中断。禁用中断之后,进程(线程)就不会被切换出去,从而保证代码段能执行结束。但坏处也很明显,由于中断被禁用,如果临界区代码一直执行,其他进程就没机会执行了。而且,只能禁止单个CPU的中断。
基于软件同步:即基于代码实现同步互斥,比较有名的是peterson算法,用来解决两个进程对临界区的互斥访问问题。
基于原子操作原语的方法:上述两种方式都比较复杂,我们需要更加高级的武器。支持并发的语言都提供了锁(lock)这个概念,在现实生活中也很好理解,如果只能一个人在屋子里,那么进去之后就锁上,出来的时候再打开锁;没有锁的人只能在外面等着。在编程语言中,大概是这样样子的:
acquire(lock) critical section release(lock) acquire,release实现的也就是entry section和 exit section的功能,上面的代码是面向过程的写法,面向对象一般写成lock.acquire和lock.release,或者使用RALL(Resource Acquisition Is Initialization)来避免release函数未被调用到的问题。 lock的实现需要基于硬件提供的“ 原子操作指令”,这些操作虽然理解起来是几步操作,但硬件保证其原子性。比较常见的原子操作指令包括 test and set、 compare and swap,接下来通过test and set来看看lock的实现。对应的acqurie和release的伪码如下: void acquire(int *lock){ while(TestAndSet(*lock)); } void release(int *lock){ *lock = 0; } 在acquire函数中,如果TestAndSet返回1,那么while循环就一直执行(也就是在这里等待),直到另一个线程调用release。当然,这个实现看起来不太好,主要是等待的线程会不停的检查,浪费CPU,这个问题称之为 忙等待(busy-wait or spin-wait),所以这个lock的实现也叫自旋锁spinlock。解决办法是如果需要等待,那么该线程主动交出执行权,让其他线程有机会执行,这种方式称之为 让权等待(yield-wait or sleep-wait),应用开发人员使用的互斥锁一都是指这种情况。
Procedure P(Var S:Semaphore); Begin S:=S-1; If S<0 then w(S) {执行P操作的线程插入等待队列} End;
V操作表示释放一个资源,V操作定义:S=S+1,若S>0则执行V操作的线程继续执行;若S<0,则从阻塞状态唤醒一个线程,并将其插入就绪队列。伪代码如下:Procedure V(Var S:Semaphore); Begin S:=S+1 If S<=0 then R(s) {从阻塞队列中唤醒一个线程} End;
信号量在现实生活中很容易找到对比的例子,比如银行的窗口数量就是S,在窗口办理业务就是P操作,业务办理结束就是V操作。
根据S初始值的不同,semaphore就有不同的作用。如果S初始值为1,那么这个semaphore就是一个mutex semaphore,效果就是临界区的互斥访问。如果S初始值为0,那么就是用来做条件同步,效果就是必须等待某些条件发生。如果S初始值为N(N一般大于1),那么就是用来限制并发数目,也被称之为counting semaphone。
后文会利用具体的例子(生产者消费者问题)来阐述semaphore上面的三种用法。
mutex global_mutex condition global_cv bool p
可能需要等待的线程(code snippet1):acquire(global_mutex) while(!p): # 如果条件不满足,则需要等待 global_cv.wait(global_mutex) # wait操作将当前线程阻塞在等到队列中,释放对管程的互斥访问
// do a lot of thing release(global_mutex)
另外一个线程acquire(global_mutex) // do sth make "p" is true global_cv.signal() # 唤醒在管程中等待的线程
release(global_mutex)
看了上面的代码,可能会产生两个疑问:第一个是code snippet1中,global-wait在互斥锁内,那怎么实现将当前线程阻塞的呢,必须要释放(release)global_mutex才行啊?可以看到,wait操作其中一个参数是global_mutex, 可以把wait操作理解成下面的伪码:contidion::wait(mutex& mut){ release(mut) yield current thread # 当前线程释放控制权 acquire(mut) }
另外一个疑问,对条件p的判断用了while循环,为什么if不行呢?这个疑问,大多数人(包括写这篇文章之前的我:-D)都是知其然而不知其所以然。while形式称之为hansen模式,if形式称之为hoare模式。这两种模式对应着不同的管程实现,具体来说是调用signal操作时,是继续在当前线程执行,还是切换到被唤醒的线程执行?下图来自清华大学的网络课程《 操作系统》:可以看到,在T2线程调用x.signal之后,hansen模式会继续执行,所以当重新回到wait线程的时候,可能情况已经发生了变化,所以需要重新判断;而Hoare模式会立刻从T2线程切换到T1线程。Hansen看起来变得复杂,引入了不确定性,但是相比hoare模式,少了一次线程的切换,在真实的操作系统中就是这么实现的,所以我们编码的时候都需要用while循环判断条件是否成立。
从上面分析生产者消费者需要解决的同步问题(互斥与条件同步),都能用信号量来解决。对于互斥,信号量S为1就可以;对于消费者等待的情况,信号量初始值为0即可;对于生产者等待的情况,信号量初始值为消息队列长度即可。linux下提供了信号量的实现,头文件在/usr/include/semaphore.h,代码实现如下(producer_consumer_semaphore.cpp):
1 #include <stdio.h> 2 #include <pthread.h> 3 #include <time.h> 4 #include <stdlib.h> 5 #include <unistd.h> 6 #include <semaphore.h> 7 8 #define BUFF_SIZE 3 9 #define PRODUCE_THREAD_SIZE 5 10 int g_buff[BUFF_SIZE]; 11 int g_write_index = 0; 12 int g_read_index = 0; 13 14 sem_t lock; 15 sem_t consume_sem, produce_sem; 16 17 18 void* produce(void *ptr){ 19 int idx = *(int*)ptr; 20 printf("in produce %d %d %d\n",idx, g_write_index, g_read_index); 21 while(1){ 22 sem_wait(&produce_sem); # 限制了生产者并发的数目 23 24 sem_wait(&lock); # 对临界区的访问要加锁 25 g_buff[g_write_index] = idx; 26 g_write_index = (g_write_index + 1) % BUFF_SIZE; 27 sem_post(&lock); 28 29 sem_post(&consume_sem); 30 } 31 return NULL; 32 } 33 34 void* consume(void *ptr){ 35 while(1){ 36 sem_wait(&consume_sem); 37 sem_wait(&lock); 38 int data = g_buff[g_read_index]; 39 g_buff[g_read_index] = -1; 40 g_read_index = (g_read_index + 1) % BUFF_SIZE; 41 printf("consume %d %d %d\n", data, g_read_index, g_write_index); 42 sem_post(&lock); 43 sem_post(&produce_sem); 44 } 45 return NULL; 46 } 47 48 int main(int argc, char * argv[]){ 49 pthread_t con; 50 pthread_t pros[PRODUCE_THREAD_SIZE]; 51 sem_init(&lock, 0, 1); 52 sem_init(&consume_sem,0, 0); 53 sem_init(&produce_sem,0, BUFF_SIZE); 54 55 pthread_create(&con, NULL, consume, NULL); 56 int thread_args[PRODUCE_THREAD_SIZE]; 57 for(int i = 0; i < PRODUCE_THREAD_SIZE; i++){ 58 thread_args[i] = i + 1; 59 pthread_create(&pros[i], NULL, produce, (thread_args + i)); 60 } 61 62 pthread_join(con,0); 63 for(int i = 0; i < PRODUCE_THREAD_SIZE; i++) 64 pthread_join(pros[i],0); 65 66 sem_destroy(&lock); 67 sem_destroy(&consume_sem); 68 sem_destroy(&produce_sem); 69 70 return 0; 71 }代码中,消息队列的大小为3,produce_sem的初始值一定与消息队列的大小相同。总共有5个生产者线程,多余可并发的数量(produce_sem),因此很大概率会有生产者线程阻塞在produce_sem对应的等待队列。 另外两点需要注意:第一点在produce和consume线程中都是需要加锁(互斥锁lock),因为信号量是可以并发的,需要对临界资源(g_buff,g_read_index,g_write_index)互斥访问。另外,在produce线程,需要先判断能否并发,然后再对临界区加锁,二者的顺序不能反过来,否则会产生死锁。
上面的代码用:g++ -lpthread producer_consumer_semaphore.cpp producer_consumer_semaphore, 然后运行 ./producer_consumer_semaphore 即可
上面的代码用:g++ -lpthread producer_consumer_monitor.cpp producer_consumer_monitor, 然后运行 ./producer_consumer_monitor 即可
并发可能带来互斥、饥饿、死锁等问题,操作系统为了解决这些问题提供了很多的支持,从最底层的禁止硬件终端和原子操作指令(test-and-set),到更高级的同步原语:锁(互斥锁、自旋锁)、信号量、管程。管程是一种抽象数据结构,编程中使用互斥锁配合信号量使用。管程有两种不同的模式:hansen vs hoare,区别在于signal操作之后是否立即切换到被唤醒线程,实际的操作系统中,多使用hansen模式。