ThinkNotes

Simple is not easy | 化繁为简,知易行难

0%

多线程笔记:多线程的同步机制

本文讲解并发环境中的几个线程同步示例
线程同步,即多个线程如何协调,谁先谁后
本文基于Linux/POSIX API
本系列源码:cursorhu/SimpleMultiThread

生产者消费者模式

生产者/消费者模式是并发环境常见的模式,简单地讲,通过中介缓冲,支持多组任务并发执行,避免任务间发生通信阻塞。
参考:生产者/消费者模式的理解及实现

常用的实现方式

信号量实现

关于LInux信号量:Linux信号量

示例:

#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <list>
#include <semaphore.h>
#include <iostream>

class Task
{
public:
	Task(int taskID)
	{
		this->taskID = taskID;
	}
	
	void doTask()
	{
		std::cout << "handle a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl; 
	}
	
private:
	int taskID;
};

pthread_mutex_t  mymutex;
std::list<Task*> tasks;
sem_t            mysemaphore;

void* consumer_thread(void* param)
{	
	Task* pTask = NULL;
	while (true)
	{
		struct timespec ts;
		ts.tv_sec = 3;
		ts.tv_nsec = 0;
		
		if (sem_timewait(&mysemaphore, &ts) != 0)
		{
			if (errno == ETIMEOUT)
			{
				std::cout << "ETIMEOUT" << std::endl;
			}
			continue;
		}
		
		if (tasks.empty())
			continue;
		
		pthread_mutex_lock(&mymutex);	
		pTask = tasks.front();
		tasks.pop_front();
		pthread_mutex_unlock(&mymutex);
		
		pTask->doTask();
		delete pTask;
	}
	
	return NULL;
}

void* producer_thread(void* param)
{
	int taskID = 0;
	Task* pTask = NULL;
	
	while (true)
	{
		pTask = new Task(taskID);
			
		pthread_mutex_lock(&mymutex);
		tasks.push_back(pTask);
		std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl; 
		
		pthread_mutex_unlock(&mymutex);
		
		//释放信号量,通知消费者线程
		sem_post(&mysemaphore);
		
		taskID ++;

		//休眠1秒
		sleep(1);
	}
	
	return NULL;
}

int main()
{
	pthread_mutex_init(&mymutex, NULL);
	//初始信号量资源计数为0
	sem_init(&mysemaphore, 0, 0);

	//创建5个消费者线程
	pthread_t consumerThreadID[5];
	for (int i = 0; i < 5; ++i)
	{
		pthread_create(&consumerThreadID[i], NULL, consumer_thread, NULL);
	}
	
	//创建一个生产者线程
	pthread_t producerThreadID;
	pthread_create(&producerThreadID, NULL, producer_thread, NULL);

	pthread_join(producerThreadID, NULL);
	
	for (int i = 0; i < 5; ++i)
	{
		pthread_join(consumerThreadID[i], NULL);
	}
	
	sem_destroy(&mysemaphore);
	pthread_mutex_destroy(&mymutex);

	return 0;
}

说明几点:

  • 信号量和锁一样,全局的
  • sem_post和sem_wait是P(), V()操作的具体实现,即计数+1,-1

条件变量实现

关于条件变量(cv):深入解析条件变量(condition variables)
条件变量同锁一起使用使得线程可以以一种无竞争的方式等待任意条件的发生。所谓无竞争就是,条件改变这个信号会发送到所有等待这个信号的线程。

示例:

#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <list>
#include <semaphore.h>
#include <iostream>

class Task
{
public:
	Task(int taskID)
	{
		this->taskID = taskID;
	}
	
	void doTask()
	{
		std::cout << "handle a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl; 
	}
	
private:
	int taskID;
};

pthread_mutex_t  mymutex;
std::list<Task*> tasks;
pthread_cond_t   mycv;

void* consumer_thread(void* param)
{	
	Task* pTask = NULL;
	while (true)
	{
		pthread_mutex_lock(&mymutex);
		while (tasks.empty())
		{				
			//如果获得了互斥锁,但是条件不合适的话,pthread_cond_wait会释放锁,不往下执行。
			//当发生变化后,条件合适,pthread_cond_wait将直接获得锁。
			pthread_cond_wait(&mycv, &mymutex);
		}
		
		pTask = tasks.front();
		tasks.pop_front();

		pthread_mutex_unlock(&mymutex);
		
		if (pTask == NULL)
			continue;

		pTask->doTask();
		delete pTask;
		pTask = NULL;		
	}
	
	return NULL;
}

void* producer_thread(void* param)
{
	int taskID = 0;
	Task* pTask = NULL;
	
	while (true)
	{
		pTask = new Task(taskID);
			
		pthread_mutex_lock(&mymutex);
		tasks.push_back(pTask);
		std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl; 
		
		pthread_mutex_unlock(&mymutex);
		
		//释放条件信号,通知消费者线程
		pthread_cond_signal(&mycv);
		
		taskID ++;

		//休眠1秒
		sleep(1);
	}
	
	return NULL;
}

int main()
{
	pthread_mutex_init(&mymutex, NULL);
	pthread_cond_init(&mycv, NULL);

	//创建5个消费者线程
	pthread_t consumerThreadID[5];
	for (int i = 0; i < 5; ++i)
	{
		pthread_create(&consumerThreadID[i], NULL, consumer_thread, NULL);
	}
	
	//创建一个生产者线程
	pthread_t producerThreadID;
	pthread_create(&producerThreadID, NULL, producer_thread, NULL);

	pthread_join(producerThreadID, NULL);
	
	for (int i = 0; i < 5; ++i)
	{
		pthread_join(consumerThreadID[i], NULL);
	}
	
	pthread_cond_destroy(&mycv);
	pthread_mutex_destroy(&mymutex);

	return 0;
}

读写锁实现

关于读写锁,参考:Linux:使用读写锁使线程同步

示例:

#include <pthread.h>
#include <unistd.h>
#include <iostream>

int resourceID = 0;
pthread_rwlock_t myrwlock;

void* read_thread(void* param)
{	
	while (true)
	{
		//请求读锁
		pthread_rwlock_rdlock(&myrwlock);

		std::cout << "read thread ID: " << pthread_self() << ", resourceID: " << resourceID << std::endl;
				
		//使用睡眠模拟读线程读的过程消耗了很久的时间
		sleep(1);
				
		pthread_rwlock_unlock(&myrwlock);
	}
	
	return NULL;
}

void* write_thread(void* param)
{
	while (true)
	{
		//请求写锁
		pthread_rwlock_wrlock(&myrwlock);

		++resourceID;
		std::cout << "write thread ID: " << pthread_self() << ", resourceID: " << resourceID << std::endl;
				
		//使用睡眠模拟读线程读的过程消耗了很久的时间
		sleep(1);
				
		pthread_rwlock_unlock(&myrwlock);
	}
	
	return NULL;
}

int main()
{
	pthread_rwlock_init(&myrwlock, NULL);

	//创建5个请求读锁线程
	pthread_t readThreadID[5];
	for (int i = 0; i < 5; ++i)
	{
		pthread_create(&readThreadID[i], NULL, read_thread, NULL);
	}
	
	//创建一个请求写锁线程
	pthread_t writeThreadID;
	pthread_create(&writeThreadID, NULL, write_thread, NULL);

	pthread_join(writeThreadID, NULL);
	
	for (int i = 0; i < 5; ++i)
	{
		pthread_join(readThreadID[i], NULL);
	}
	
	pthread_rwlock_destroy(&myrwlock);

	return 0;
}