本节将讨论Linux并发编程中经典的生产者/消费者(producer-consumer)问题。该问题涉及一个大小限定的有界缓冲区(bounded buffer)和两类线程或进程(生产者和消费者)。
在缓冲区中有可用空间时,一个或一组生产者(线程或进程)将创建的产品(数据条目)放入缓冲区,然后由一个或一组消费者(线程或进程)提取这些产品。产品在生产者和消费者之间通过某种类型的IPC传递。
在生产者/消费者问题中,生产者线程必须在缓冲区中有可用空间后才能向其中放置内容,否则将阻塞(进入休眠状态)直到出现下一个可用的空位置。生产者线程可使用互斥量原子性地检查缓冲区,而不受其他线程干扰。当发现缓冲区已满后,生产者阻塞自己并在缓冲区变为非满时被唤醒,这可由条件变量实现。
此处,QUEUE_SIZE按照产品数重新定义。互斥量gtQueLock用于保护全局队列gtQueue和两个条件变量。条件变量gtPrdCond用于队列由满变为非满时通知(唤醒)生产者线程,而gtCsmCond用于队列由空变为非空时通知消费者线程。
若首先创建并启动生产者线程,再立即或稍候创建消费者线程,则不需要条件变量gtCsmCond(队列中始终有产品)。本节为全面展现线程间的同步,约定消费者线程创建和启动完毕之后,再创建生产者线程。这样,所有消费者线程将会阻塞,在条件变量gtCsmCond的线程列表里条件状态的改变。生产者线程并开始“产出”后,广播通知所有消费者线程。反之,因为消费者线程不会全部阻塞,可单播唤醒某个消费者。
初始化队列和创建线程的主函数如下:
1 int main(void) 2 { 3 InitQue(>Queue); 4 srand(getpid()); //初始化随机函数发生器 5 6 pthread_t aThrd[CONSUMER_NUM+PRODUCER_NUM]; 7 int dwThrdIdx; 8 for(dwThrdIdx = 0; dwThrdIdx < CONSUMER_NUM; dwThrdIdx++) 9 { //创建CONSUMER_NUM个消费者线程,传入(void*)dwThrdIdx作为ConsumerThread的参数 10 pthread_create(&aThrd[dwThrdIdx], NULL, ConsumerThread, (void*)dwThrdIdx); 11 } 12 sleep(2); 13 for(dwThrdIdx = 0; dwThrdIdx < PRODUCER_NUM; dwThrdIdx++) 14 { 15 pthread_create(&aThrd[dwThrdIdx], NULL, ProducerThread, (void*)dwThrdIdx); 16 } 17 while(1); 18 return 0 ; 19 } |
生产者线程启动例程ProducerThread()实现如下:
1 void *ProducerThread(void *pvArg) 2 { 3 pthread_detach(pthread_self()); 4 int dwThrdNo = (int)pvArg; 5 while(1) 6 { 7 pthread_mutex_lock(>QueLock); 8 while(IsQueFull(>Queue)) //队列由满变为非满时,生产者线程被唤醒 9 pthread_cond_wait(>PrdCond, >QueLock); 10 11 EnterQue(>Queue, GetQueTail(>Queue)); //将队列元素下标作为元素值入队 12 if(QueDataNum(>Queue) == 1) //当生产者开始产出后,通知(唤醒)消费者线程 13 pthread_cond_broadcast(>CsmCond); 14 printf("[Producer %2u]Current Product Num: %u\n", dwThrdNo, QueDataNum(>Queue)); 15 16 pthread_mutex_unlock(>QueLock); 17 sleep(rand()%DELAY_TIME + 1); 18 } 19 } 队列变满时,生产者线程进入休眠状态。消费者线程取出产品,将队列由满变为非满时,生产者线程再次被唤醒。 消费者线程启动例程ConsumerThread()实现如下: 1 void *ConsumerThread(void *pvArg) 2 { 3 pthread_detach(pthread_self()); 4 int dwThrdNo = (int)pvArg; 5 while(1) 6 { 7 pthread_mutex_lock(>QueLock); 8 while(IsQueEmpty(>Queue)) //队列由空变为非空时,消费者线程将被唤醒 9 pthread_cond_wait(>CsmCond, >QueLock); 10 11 if(GetQueHead(>Queue) != GetQueHeadData(>Queue)) 12 { 13 printf("[Consumer %2u]Product: %d, Expect: %d\n", dwThrdNo, 14 GetQueHead(>Queue), GetQueHeadData(>Queue)); 15 exit(0); 16 } 17 LeaveQue(>Queue); 18 if(QueDataNum(>Queue) == (PRD_NUM-1)) //当队列由满变为非满时,通知(唤醒)生产者线程 19 pthread_cond_broadcast(>PrdCond); 20 printf("[Consumer %2u]Current Product Num: %u\n", dwThrdNo, QueDataNum(>Queue)); 21 22 pthread_mutex_unlock(>QueLock); 23 sleep(rand()%DELAY_TIME + 1); 24 } 25 } |
当队列元素值不符合期望时,打印出错提示并调用exit()终止进程。
编译链接后,截取部分运行结果如下(因每次执行时线程延时随机值,故执行顺序可能不同):
[wangxiaoyuan_@localhost Thread]$ gcc -Wall -o procon procon.c -pthread
[wangxiaoyuan_@localhost Thread]$ ./procon
[Producer 4]Current Product Num: 1
[Consumer 1]Current Product Num: 0
[Producer 3]Current Product Num: 1
[Consumer 0]Current Product Num: 0
[Producer 2]Current Product Num: 1
[Consumer 2]Current Product Num: 0
[Producer 1]Current Product Num: 1
[Producer 0]Current Product Num: 2
//... ...
[Consumer 0]Current Product Num: 17
[Producer 3]Current Product Num: 18
[Producer 1]Current Product Num: 19
[Consumer 2]Current Product Num: 18
[Producer 4]Current Product Num: 19
[Producer 2]Current Product Num: 20
[Consumer 1]Current Product Num: 19
[Consumer 2]Current Product Num: 18
[Producer 0]Current Product Num: 19
[Producer 4]Current Product Num: 20
//Ctrl + C
2.3 注意事项
本节基于生产者/消费者问题的多线程实现,讨论一些编程注意事项。为简化书写,省略线程相关函数名的前缀"pthread_"。
1. 互斥量实际上保护的是临界区中被多个线程或进程共享的数据(shared data)。此外,互斥量是协作性(cooperative)锁,无法禁止绕过这种机制的访问。例如,若共享数据为一个链表,则操作该链表的所有线程都应该在实际操作前获取该互斥量(但无法防止某个线程不首先获取该互斥量就操作链表)。
对本文实现而言,互斥量gtQueLock保护全局队列gtQueue。但因为各生产者(或消费者)线程启动例程相同,内部对该队列的操作逻辑和顺序相同,故gtQueLock看起来在对队列函数加锁。进而,在生产者与消费者线程之间,均在加锁期间操作队列,因此对队列函数加锁近似等效于对队列数据加锁。但仍需认识到,这种协作对于频繁调用的基础性函数(如库函数)而言并不现实。当某些调用未经加锁直接操作时,对于其他使用互斥量的调用而言,互斥保护就失去意义。本文也可考虑在队列函数集内加锁(互斥量或读写锁),但因为互斥量gtQueLock同时还保护条件变量,队列内再加锁就稍显复杂,而且线程内互斥量同名时容易产生死锁。
因此,设计时最好约定所有线程遵守相同的数据访问规则。
2. 应尽量减少由一个互斥量锁住的代码量,以增强并发性。
例如,生产者线程启动例程ProducerThread()中,对pvArg进行赋值操作时并不需要互斥量保护(不属于临界区)。再者,若各生产者线程之间与生产者/消费者线程之间共享不同的数据,则可使用两个互斥量,但复杂度也随之上升。
多线程软件设计经常要考虑加锁粒度的折中。若使用粗粒度锁定(coarse-grained locking),即长期持有锁定以便尽可能降低获取和释放锁的开销,则可能有很多线程阻塞等待相同的锁,从而降低并发性;若使用细粒度锁定(fine-grained locking),即仅在必要时锁定并在不必要时尽快解锁以便尽可能提高并发性,则较多的锁开销可能会降低系统性能,而且代码变得相当复杂。因此,设计时应划分适当数目的锁来保护数据,以在代码复杂性和性能优化之间找好平衡点。
3. 条件本身是由互斥量保护的。当线程调用cond_wait()函数基于条件变量阻塞之前,应先锁定互斥量以避免线程间的竞争和饥饿情况(否则将导致未定义的行为)。
调用cond_wait()时,线程将锁定的互斥量传递给该函数,对条件进行保护。该函数将调用线程放到等待条件的线程列表上,然后对互斥量解锁。这两个操作是原子性的,即关闭了条件检查和线程进入休眠状态等待条件改变这两个操作之间的时间通道,这样线程就不会错过条件的任何变化。该函数对互斥量解锁后,其它线程可以获得加锁的权利,并访问和修改临界资源(条件状态)。一旦其它某个线程改变了条件使其为真,该线程将通知相应的条件变量唤醒一个或多个正被该条件变量阻塞的线程。cond_wait()返回时,互斥量再次被锁定并被调用线程拥有,即使返回错误时也是如此。
可见,cond_wait()函数解锁和阻塞调用线程的“原子性”针对其他线程访问互斥量和条件变量而言。若线程B在即将阻塞的线程A解锁之后获得互斥量,则线程随后B调用cond_ signal/broadcast()函数通告条件状态变化时,该信号必然在线程A阻塞之后发出(否则将遗漏该信号)。
使用多个互斥量保护基于相同条件变量的cond_wait()调用时,其效果未定义。即当线程基于条件变量阻塞时,该条件变量绑定到唯一的互斥量上,且这种(动态)绑定将在cond_wait()调用返回时结束。
4. 函数cond_signal()唤醒被阻塞在条件变量上的至少一个线程。在多处理器上,该函数在不同处理器上同时单播信号时可能唤醒多个线程。当多个线程阻塞在条件变量上时,哪个或哪些线程被唤醒由线程的调度策略(scheduling policy)所决定。
cond_broadcast()会唤醒阻塞在条件变量上的所有线程。这些线程被唤醒后将再次竞争相应的互斥量。唤醒多个线程的典型场景是读出者与写入者问题。当一个写入者完成访问并释放相应的锁后,它希望唤醒所有正在排队的读出者,因为同时允许多个读出者访问。
若已确定只有一个等待者需要唤醒,且唤醒哪个等待者无关紧要,则可使用单播发送;所有其他情况下都应使用广播发送(更为安全)。
若当前没有任何线程基于条件变量阻塞,则调用cond_signal/broadcast()不起作用。
5. 虚假唤醒(spurious wakeup)指没有线程明确调用cond_signal/broadcast()时,cond_wait()偶尔也会返回;或者条件状态尚不满足时就调用cond_signal/broadcast()。此时,线程虽被唤醒但条件并不成立,若不再次检查条件而往下执行,很可能导致后续的处理出现错误。因此,当从cond_wait()返回时,线程应重新测试条件成立与否。该过程一般用while循环实现,即:
1 pthread_mutex_lock();
2 while(IsConditionFalse)
3 pthread_cond_wait();
4 //Do something
5 pthread_mutex_unlock();
使用while循环不仅能避免虚假唤醒造成的错误,还能避免唤醒线程间竞争导致的“惊群效应”。
例如,ProducerThread()内,调用cond_wait()对互斥量自动解锁后,在允许消费者线程修改条件的同时,也允许其他生产者线程调用该函数依次阻塞。当这些线程被唤醒时(如队列由满变为非满),会再次竞争相应的互斥量。获得互斥量的那个线程进入临界区处理,这可能改变测试条件(如产出一件使得队列再次变满)。该线程释放互斥量后,其他某个处于等待状态的线程获得该互斥量时,虽然cond_wait()成功返回但很可能条件已不成立。因此,调用cond_wait()成功返回时,线程需要重新测试条件是否满足。
调用cond_timedwait()的情况与之类似。由于等待超时与条件改变之间存在无法避免的竞争,当该函数返回超时错误时条件仍可能成立。
6. 若线程未持有与条件相关联的互斥量,则调用cond_signal/broadcast()可能会产生唤醒丢失问题。
当满足以下所有条件时,即会出现唤醒丢失问题:
1) 一个线程调用cond_signal/broadcast()函数;
2) 另一个线程已测试该条件,但尚未调用cond_wait()函数;
3) 没有正处于阻塞等待状态的线程。
只要仅在持有关联互斥量的同时修改所测试的条件,即可调用cond_signal/broadcast(),而无论这些函数是否持有关联的互斥量。
可见,调用cond_signal/broadcast()时若无线程等待条件变量,则该信号将被丢失。因此,发送信号前最好检查下是否有正在等待的线程。例如,可维护一个等待线程计数器,在触发条件变量前检查该计数器。本文线程代码内对队列元素数目的检查与之类似。
对于等待条件的线程而言,若错失信号(如启动过迟),则会一直阻塞到其它线程再次发送信号到该条件变量。
7. ProducerThread()中,cond_broadcast()函数由当前锁住互斥量的生产者线程调用,本函数将发送信号给该互斥量所关联的条件变量。当该条件变量被发送信号后,系统立即调度等待在其上的消费者线程;该线程开始运行后试图获取互斥量,但该互斥量仍由生产者线程所持有。因此被唤醒的消费者线程被迫进入休眠状态,直至生产者线程释放互斥量后再次被唤醒。这种不必要的上下文切换可能会严重影响性能。
为避免这种加锁冲突(以提高效率),可将判断队列元素数目的语句值赋给一个局部变量bHasOnePrd,直到释放互斥量gtQueLock后才判断bHasOnePrd并向与之关联的条件变量gtCsmCond发送信号。
Posix标准规定,调用cond_signal/broadcast()的线程不必是与之关联的互斥量的当前拥有者,即允许在释放互斥量后才给与之关联的条件变量发送信号。若程序不关心线程可预知的调度行为,最好在锁定区域以外调用cond_signal/broadcast()。
当然,本文所示的生产者和消费者线程中,即使不判断队列元素数目而直接发送信号也可以(通过修改队列指针间接地改变条件状态,虽然会发送一些无效信号)。
8. 当只有一个生产者和一个消费者时,通过谨慎操作队列可避免线程间的原子性同步。例如,生产者线程仅更新队尾指针,消费者线程仅更新队首指针。简化的示例如下:
1 #define QUEUE_SIZE 20 2 volatile unsigned int gdwPrdNum = 0, gdwCsmNum = 0; 3 int gQueue[QUEUE_SIZE] = {0}; 4 5 void *Producer(void *pvArg) { 6 while(1) { 7 while(gdwPrdNum - gdwCsmNum == QUEUE_SIZE) 8 ; //Full 9 10 gQueue[gdwPrdNum % QUEUE_SIZE]++; 11 gdwPrdNum++; 12 } 13 pthread_exit(0); 14 } 15 16 void *Consumer(void *pvArg) { 17 while(1) { 18 while(gdwPrdNum - gdwCsmNum == 0) 19 ; //Empty 20 21 gQueue[gdwCsmNum % QUEUE_SIZE]--; 22 gdwCsmNum++; 23 } 24 pthread_exit(0); 25 } |
该例中使用轮询(polling)方式,可在不依赖互斥量和条件变量的情况下高效地共享数据。判空和判满的循环保证两个线程不可能同时操作同一个队列元素。当线程被取消时,可观察到队列中元素值为全零(若无循环则为随机值)。虽然轮询方式通常比较低效,但在多处理器环境中线程休眠和唤醒的频率较小,故绕开数据原子性操作是有利的。
另一则实例则可参考《守护进程接收终端输入的一种变通性方法》一文。
9. 使用Posix信号量可模拟互斥量和条件变量,而且通常更有优势。
当函数sem_wait()和sem_post()用于线程内时,两个调用间的区域就是所要保护的临界区代码;当用于线程间时,则与条件变量等效。
此外,信号量还可用作资源计数器,即初始化信号量的值作为某个资源当前可用的数量,使用时递减释放时递增。这样,原先一些保存队列状态的变量都不再需要。
最后,内核会记录信号的存在,不会将信号丢失;而唤醒条件变量时若没有线程在等待该条件变量,信号将被丢失。