- 一、Thread类
- 1. Thread.h
- 2. Thread::Thread()和Thread::~Thread()
- 3. Thread::start()
- 4. Thread.cc
- 二、EventLoopThread类
- 1. EventLoopThread.h
- 2. EventLoopThread.cc
- 三、EventLoopThreadPool类
- 1. EventLoopThreadPool.h
- 2. EventLoopThreadPool::start()
- 3. EventLoopThreadPool::getNextLoop()
- 4. EventLoopThreadPool.cc
EventLoopThread:执行事件循环的线程
可以看到EventLoopThread打包了一个事件循环EventLoop和一个线程Thread
我们先来看看Thread类
这个线程类就是封装了pthread_create创建的线程,我们使用C++提供的线程类thread,就不使用Linux原生的线程函数了
#pragma once
#include "noncopyable.h"
#include
#include
#include
#include
#include
#include
class Thread : noncopyable{
public:
using ThreadFunc = std::function<void()>; // 没有参数,如果想传参,可以使用绑定器
explicit Thread(ThreadFunc, const std::string& name = std::string());
~Thread();
void start();
void join();
bool started() const { return started_; }
pid_t tid() const { return tid_; } // muduo返回的线程tid相当于使用top命令查看的线程tid,不是pthread_self打印出来的真实的线程号
const std::string& name() const { return name_; }
static int numCreated() { return numCreated_; }
private:
void setDefaultName();
bool started_; // 当前线程是否启动
bool joined_; // 当前线程等待其他线程运行完,当前线程继续运行
std::shared_ptr<std::thread> thread_; // 自己控制线程的启动
pid_t tid_;
ThreadFunc func_; // 线程函数
std::string name_; // 线程名字,调试时可使用
static std::atomic_int32_t numCreated_; // 创建的线程数
};
2. Thread::Thread()和Thread::~Thread()
Thread::Thread(ThreadFunc func, const std::string& name)
: started_(false)
, joined_(false)
, tid_(0)
, func_(std::move(func)) // 把func底层的资源给func_,效率更高
, name_(name)
{
// 成员变量thread_采用默认构造
setDefaultName(); // 给线程一个默认的名字
}
Thread::~Thread(){
// joined_表示是一个工作线程,状态会受到主线程状态的一影响,而守护线程拥有自动结束自己生命周期的特性,而非守护线程不具备这个特点。
if(started_ && !joined_){
// 析构的时候,线程启动了才会有相应的 *** 作
thread_->detach(); // std::thread类提供的设置分离线程的方法,detach后成为守护线程,守护线程结束后,内核自动回收,不会出现孤儿线程
}
}
3. Thread::start()
Linux的pthread_create一旦调用,线程就直接启动了;同样的,使用C++的thread定义对象,绑定一个线程函数也是直接启动了,我们需要自己控制线程启动的时机,而不是创建线程的时候就直接启动,所以我们使用智能指针+lambda表达式的方式控制线程的启动
void Thread::start(){
// 一个Thread对象记录的就是一个新线程的详细信息
started_ = true;
sem_t sem;
sem_init(&sem, false, 0);
// std::thread的构造函数需要一个函数对象,可以使用lambda表达式,以引用形式接收外部参数
thread_ = std::shared_ptr<std::thread>(new std::thread([&](){
tid_ = CurrentThread::tid();
sem_post(&sem); // 信号量的V *** 作
func_(); // 开启一个新线程,专门执行该线程函数
}));
// 这里必须等待获取上面新创建线程的tid,用来阻塞当前线程直到信号量sem的值大于0,sem_wait相当于P *** 作,等线程创建后获取tid,sem的值才能+1
sem_wait(&sem);
}
这里涉及两个线程,线程A调用start并创建新的线程B,假如调用start的线程A走得快,很快就执行到了函数最后,如果新的执行func_的线程B还没有创建出来,线程A就会阻塞在sem_wait,等待子线程B创建
这样的话,如果一旦有一个线程调用了start,那就可以放心的使用这个线程的tid,因为此时这个线程是肯定存在的
4. Thread.cc#include "Thread.h"
#include "CurrentThread.h"
#include
std::atomic_int32_t Thread::numCreated_(0);
Thread::Thread(ThreadFunc func, const std::string& name)
: started_(false)
, joined_(false)
, tid_(0)
, func_(std::move(func)) // 把func底层的资源给func_,效率更高
, name_(name)
{
// 成员变量thread_采用默认构造
setDefaultName(); // 给线程一个默认的名字
}
Thread::~Thread(){
// joined_表示是一个工作线程,状态会受到主线程状态的一影响,而守护线程拥有自动结束自己生命周期的特性,而非守护线程不具备这个特点。
if(started_ && !joined_){
// 析构的时候,线程启动了才会有相应的 *** 作
thread_->detach(); // std::thread类提供的设置分离线程的方法,detach后成为守护线程,守护线程结束后,内核自动回收,不会出现孤儿线程
}
}
// 线程启动,这里涉及两个线程,线程A调用start并创建新的线程B,假如调用start的线程A走得快,很快就执行到了函数最后,如果新的执行func_()的线程B还没有创建出来,线程A就会阻塞在sem_wait,等待子线程B创建
// 这样的话,如果一旦有一个线程调用了start,那就可以放心的使用这个线程的tid,因为此时这个线程是肯定存在的
void Thread::start(){
// 一个Thread对象记录的就是一个新线程的详细信息
started_ = true;
sem_t sem;
sem_init(&sem, false, 0);
// std::thread的构造函数需要一个函数对象,可以使用lambda表达式,以引用形式接收外部参数
thread_ = std::shared_ptr<std::thread>(new std::thread([&](){
tid_ = CurrentThread::tid();
sem_post(&sem); // 信号量的V *** 作
func_(); // 开启一个新线程,专门执行该线程函数
}));
// 这里必须等待获取上面新创建线程的tid,用来阻塞当前线程直到信号量sem的值大于0,sem_wait相当于P *** 作,等线程创建后获取tid,sem的值才能+1
sem_wait(&sem);
}
void Thread::join(){
joined_ = true;
thread_->join();
}
void Thread::setDefaultName(){
int num = ++numCreated_;
if(name_.empty()){
char buff[32] = {0};
snprintf(buff, sizeof(buff), "Thread%d", num);
name_ = buff;
}
}
二、EventLoopThread类
Thread类只关于一个线程,EventLoopThread类用于绑定一个EventLoop和一个Thread,在一个Thread里创建一个EventLoop,让这个Thread执行一个EventLoop,即:one loop per thread
#include "noncopyable.h"
#include "Thread.h"
#include
#include
#include
#include
class EventLoop;
class EventLoopThread : noncopyable{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;
EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(), const std::string& name = std::string());
~EventLoopThread();
EventLoop* startLoop();
private:
void threadFunc();
EventLoop* loop_;
bool exiting_; // 是否退出事件循环
Thread thread_;
std::mutex mutex_;
std::condition_variable cond_;
ThreadInitCallback callback_; // 启一个新线程绑定EventLoop时调用的,进行一些init相关 *** 作
};
2. EventLoopThread.cc
#include "EventLoopThread.h"
#include "EventLoop.h"
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb, const std::string& name)
: loop_(nullptr)
, exiting_(false)
, thread_(std::bind(&EventLoopThread::threadFunc, this), name)
, mutex_()
, cond_()
, callback_(cb)
{}
EventLoopThread::~EventLoopThread(){
exiting_ = true;
if(loop_ != nullptr){
loop_->quit(); // 退出事件循环
thread_.join(); // 等待子线程结束
}
}
EventLoop* EventLoopThread::startLoop(){
thread_.start(); // 底层创建一个新线程,并执行成员变量thread构造时传入的threadFunc
EventLoop* loop = nullptr;
{
std::unique_lock<std::mutex> lock(mutex_);
while(loop_ == nullptr){
cond_.wait(lock); // 成员变量loop_没有被新线程初始化的时候,一直wait在lock上
}
loop = loop_;
}
return loop;
}
// threadFunc是在单独的新线程里面运行的
void EventLoopThread::threadFunc(){
// 创建一个独立的eventloop,和上面的线程一一对应,one loop per thread
EventLoop loop;
if(callback_){
// 如果我们实现传递了callback_,ThreadInitCallback就是在底层启一个新线程绑定EventLoop时调用的,进行一些init相关 *** 作
callback_(&loop);
}
{
std::unique_lock<std::mutex> lock(mutex_);
loop_ = &loop;
cond_.notify_one(); // 通知
}
loop.loop(); // EventLoop loop => Poller.poll,开启事件循环,监听新用户的连接或者已连接用户的读写事件
// 一般来说,loop是一直执行的,能执行到下面的语句,说明程序要退出了,要关闭事件循环
std::unique_lock<std::mutex> lock(mutex_);
loop_ = nullptr;
}
当我们调用EventLoopThread::startLoop
的时候,底层才创建一个新线程,而这个刚刚创建的新线程需要执行一个线程函数(我们在构造成员变量thread_时传入的EventLoopThread::threadFunc()
),等新创建的线程初始化成员变量EventLoopThread::loop_
完成后,才会通知调用startLoop
的线程访问成员变量loop_
即调用EventLoopThread::startLoop
就会返回一个EventLoop对象,并且成员变量EventLoopThread::loop_
也记录了该EventLoop对象的地址
EventLoopThreadPoll是一个事件线程池,管理EventLoop,EventLoop绑定的就是一个线程
1. EventLoopThreadPool.h#pragma once
#include "noncopyable.h"
#include "Thread.h"
#include
#include
#include
#include
class EventLoop;
class EventLoopThread;
class EventLoopThreadPool : noncopyable{
public:
using ThreadInitCallback = std::function<void(EventLoop*)>;
EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg);
~EventLoopThreadPool();
// 设置底层线程的数量,TcpServer::setThreadNum底层调用的就是EventLoopThreadPool::setThreadNum
void setThreadNum(int numThreads){ numThreads_ = numThreads; }
void start(const ThreadInitCallback& cb = ThreadInitCallback());
// 如果工作在多线程中,baseLoop_默认以轮询的方式分配Channel给subLoop
EventLoop* getNextLoop();
// 返回事件循环池所有的EventLoop
std::vector<EventLoop*> getAllLoops();
bool started() const{ return started_; }
const std::string name() const { return name_; }
private:
EventLoop* baseLoop_; // 我们使用muduo编写程序的时候,就会定义一个EventLoop变量,这个变量作为TcpServer构造函数的参数,用户创建的就叫做baseLoop
std::string name_; // 线程池的名字
bool started_;
int numThreads_;
long unsigned int next_;
std::vector<std::unique_ptr<EventLoopThread>> threads_; // 包含了创建的所有subLoop的线程
std::vector<EventLoop*> loops_; // 包含了所有创建的subLoop的指针,这些EventLoop对象都是栈上的(见EventLoopThread::threadFunc)
};
-
成员变量baseLoop_:我们使用muduo编写程序的时候,就会定义一个EventLoop变量,这个变量作为TcpServer构造函数的参数,用户创建的就叫做baseLoop
-
成员变量numThreads_:表示subReactor的数量。我们使用muduo编写程序的时候,就会定义一个EventLoop变量,这个变量作为TcpServer构造函数的参数,这就是baseLoop。如果我们不使用setThreadNum指定Reactor模型线程数量,那么muduo默认使用单线程模型,这个线程既负责新用户连接,也负责已连接用户的读写事件
-
成员变量loops_:包含了所有创建的subLoop的指针,这些EventLoop对象都是栈上的(见EventLoopThread::threadFunc),不需要我们手动释放。EventLoopThread::threadFunc函数是由线程执行的,EventLoop对象存在于线程栈上
根据指定的numThreads_创建线程,开启事件循环
void EventLoopThreadPool::start(const ThreadInitCallback& cb){
started_ = true;
// 用户没有调用EventLoopThreadPool::setThreadNum()
for(int i = 0; i < numThreads_; ++i){
char buff[name_.size() + 32]; // 用“线程池的名字 + 下标”作为底层线程的名字
snprintf(buff, sizeof(buff), "%s%d", name_.c_str());
EventLoopThread* thread = new EventLoopThread(cb, buff);
threads_.push_back(std::unique_ptr<EventLoopThread>(thread)); // 用unique_ptr管理堆上的EventLoopThread对象,以免我们手动释放
loops_.push_back(thread->startLoop()); // 调用EventLoopThread::startLoop后,会返回一个栈上的EventLoop对象,事件循环不停止,栈上的EventLoop对象不释放
}
// 如果用户没有调用EventLoopThreadPool::setThreadNum(),numThreads_默认为0
if(numThreads_ == 0 && cb){
// 整个服务器只有一个线程,即用户创建的baseLoop_
cb(baseLoop_);
}
}
3. EventLoopThreadPool::getNextLoop()
不使用setThreadNum指定Reactor模型线程数量,那么muduo默认只有一个baseLoop_
EventLoop* EventLoopThreadPool::getNextLoop(){
EventLoop* loop = baseLoop_;
if(!loops_.empty()){
// 通过轮询处理下一个事件的loop
loop = loops_[next_];
++next_;
if(next_ >= loops_.size()){
next_ = 0;
}
}
return loop;
}
4. EventLoopThreadPool.cc
#include "EventLoopThreadPool.h"
#include "EventLoopThread.h"
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg)
: baseLoop_(baseLoop)
, name_(nameArg)
, started_(false)
, numThreads_(0)
, next_(0)
{}
EventLoopThreadPool::~EventLoopThreadPool(){}
void EventLoopThreadPool::start(const ThreadInitCallback& cb){
started_ = true;
// 用户没有调用EventLoopThreadPool::setThreadNum()
for(int i = 0; i < numThreads_; ++i){
char buff[name_.size() + 32]; // 用“线程池的名字 + 下标”作为底层线程的名字
snprintf(buff, sizeof(buff), "%s%d", name_.c_str(), i);
EventLoopThread* thread = new EventLoopThread(cb, buff);
threads_.push_back(std::unique_ptr<EventLoopThread>(thread)); // 用unique_ptr管理堆上的EventLoopThread对象,以免我们手动释放
loops_.push_back(thread->startLoop()); // 调用EventLoopThread::startLoop后,会返回一个栈上的EventLoop对象,事件循环不停止,栈上的EventLoop对象不释放
}
// 如果用户没有调用EventLoopThreadPool::setThreadNum(),numThreads_默认为0
if(numThreads_ == 0 && cb){
// 整个服务器只有一个线程,即用户创建的baseLoop_
cb(baseLoop_);
}
}
// 如果工作在多线程中,baseLoop_默认以轮询的方式分配Channel给subLoop
EventLoop* EventLoopThreadPool::getNextLoop(){
EventLoop* loop = baseLoop_;
if(!loops_.empty()){
loop = loops_[next_];
++next_;
if(next_ >= loops_.size()){
next_ = 0;
}
}
return loop;
}
// 返回事件循环池所有的EventLoop
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops(){
if(loops_.empty()){
return std::vector<EventLoop*>(1, baseLoop_);
}
return loops_;
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)