muduo事件循环线程相关类Thread、EventLoopThread、EventLoopThreadPool

muduo事件循环线程相关类Thread、EventLoopThread、EventLoopThreadPool,第1张

文章目录
    • 一、Thread类
      • 1. Thread.h
      • 2. Thread::Thread()和Thread::~Thread()
      • 3. Thread::start()
      • 4. Thread.cc
    • 二、EventLoopThread类
      • 1. EventLoopThread.h
      • 2. EventLoopThread.cc
    • 三、EventLoopThreadPool类
      • 1. EventLoopThreadPool.h
      • 2. EventLoopThreadPool::start()
      • 3. EventLoopThreadPool::getNextLoop()
      • 4. EventLoopThreadPool.cc

EventLoopThread:执行事件循环的线程

可以看到EventLoopThread打包了一个事件循环EventLoop和一个线程Thread

一、Thread类

我们先来看看Thread类

这个线程类就是封装了pthread_create创建的线程,我们使用C++提供的线程类thread,就不使用Linux原生的线程函数了

1. Thread.h
#pragma once

#include "noncopyable.h"

#include 
#include 
#include 
#include 
#include 
#include 

class Thread : noncopyable{
    public:
        using ThreadFunc = std::function<void()>;  // 没有参数,如果想传参,可以使用绑定器
        explicit Thread(ThreadFunc, const std::string& name = std::string());
        ~Thread();

        void start();
        void join();

        bool started() const { return started_; }
        pid_t tid() const { return tid_; }         // muduo返回的线程tid相当于使用top命令查看的线程tid,不是pthread_self打印出来的真实的线程号
        const std::string& name() const { return name_; }
        static int numCreated() { return numCreated_; }

    private:
        void setDefaultName();

        bool started_;           // 当前线程是否启动 
        bool joined_;            // 当前线程等待其他线程运行完,当前线程继续运行
        std::shared_ptr<std::thread> thread_;  // 自己控制线程的启动
        pid_t tid_;
        ThreadFunc func_;                 // 线程函数
        std::string name_;                // 线程名字,调试时可使用
        static std::atomic_int32_t numCreated_;   // 创建的线程数
};
2. Thread::Thread()和Thread::~Thread()
Thread::Thread(ThreadFunc func, const std::string& name)
    : started_(false)
    , joined_(false)
    , tid_(0)
    , func_(std::move(func))         // 把func底层的资源给func_,效率更高
    , name_(name)
{
    // 成员变量thread_采用默认构造
    setDefaultName();                // 给线程一个默认的名字
}

Thread::~Thread(){
    // joined_表示是一个工作线程,状态会受到主线程状态的一影响,而守护线程拥有自动结束自己生命周期的特性,而非守护线程不具备这个特点。
    if(started_ && !joined_){
        // 析构的时候,线程启动了才会有相应的 *** 作
        thread_->detach();           // std::thread类提供的设置分离线程的方法,detach后成为守护线程,守护线程结束后,内核自动回收,不会出现孤儿线程
    }
}
3. Thread::start()

Linux的pthread_create一旦调用,线程就直接启动了;同样的,使用C++的thread定义对象,绑定一个线程函数也是直接启动了,我们需要自己控制线程启动的时机,而不是创建线程的时候就直接启动,所以我们使用智能指针+lambda表达式的方式控制线程的启动

void Thread::start(){
    // 一个Thread对象记录的就是一个新线程的详细信息
    started_ = true;
    sem_t sem;
    sem_init(&sem, false, 0);

    // std::thread的构造函数需要一个函数对象,可以使用lambda表达式,以引用形式接收外部参数
    thread_ = std::shared_ptr<std::thread>(new std::thread([&](){
        tid_ = CurrentThread::tid();
        sem_post(&sem);    // 信号量的V *** 作
        func_();  // 开启一个新线程,专门执行该线程函数
    }));

    // 这里必须等待获取上面新创建线程的tid,用来阻塞当前线程直到信号量sem的值大于0,sem_wait相当于P *** 作,等线程创建后获取tid,sem的值才能+1
    sem_wait(&sem);
}

这里涉及两个线程,线程A调用start并创建新的线程B,假如调用start的线程A走得快,很快就执行到了函数最后,如果新的执行func_的线程B还没有创建出来,线程A就会阻塞在sem_wait,等待子线程B创建

这样的话,如果一旦有一个线程调用了start,那就可以放心的使用这个线程的tid,因为此时这个线程是肯定存在的

4. Thread.cc
#include "Thread.h"
#include "CurrentThread.h"

#include 

std::atomic_int32_t Thread::numCreated_(0);

Thread::Thread(ThreadFunc func, const std::string& name)
    : started_(false)
    , joined_(false)
    , tid_(0)
    , func_(std::move(func))         // 把func底层的资源给func_,效率更高
    , name_(name)
{
    // 成员变量thread_采用默认构造
    setDefaultName();                // 给线程一个默认的名字
}

Thread::~Thread(){
    // joined_表示是一个工作线程,状态会受到主线程状态的一影响,而守护线程拥有自动结束自己生命周期的特性,而非守护线程不具备这个特点。
    if(started_ && !joined_){
        // 析构的时候,线程启动了才会有相应的 *** 作
        thread_->detach();           // std::thread类提供的设置分离线程的方法,detach后成为守护线程,守护线程结束后,内核自动回收,不会出现孤儿线程
    }
}

// 线程启动,这里涉及两个线程,线程A调用start并创建新的线程B,假如调用start的线程A走得快,很快就执行到了函数最后,如果新的执行func_()的线程B还没有创建出来,线程A就会阻塞在sem_wait,等待子线程B创建
// 这样的话,如果一旦有一个线程调用了start,那就可以放心的使用这个线程的tid,因为此时这个线程是肯定存在的
void Thread::start(){
    // 一个Thread对象记录的就是一个新线程的详细信息
    started_ = true;
    sem_t sem;
    sem_init(&sem, false, 0);

    // std::thread的构造函数需要一个函数对象,可以使用lambda表达式,以引用形式接收外部参数
    thread_ = std::shared_ptr<std::thread>(new std::thread([&](){
        tid_ = CurrentThread::tid();
        sem_post(&sem);    // 信号量的V *** 作
        func_();  // 开启一个新线程,专门执行该线程函数
    }));

    // 这里必须等待获取上面新创建线程的tid,用来阻塞当前线程直到信号量sem的值大于0,sem_wait相当于P *** 作,等线程创建后获取tid,sem的值才能+1
    sem_wait(&sem);
}

void Thread::join(){
    joined_ = true;
    thread_->join();
}

void Thread::setDefaultName(){
    int num = ++numCreated_;
    if(name_.empty()){
        char buff[32] = {0};
        snprintf(buff, sizeof(buff), "Thread%d", num);
        name_ = buff;
    }
}
二、EventLoopThread类

Thread类只关于一个线程,EventLoopThread类用于绑定一个EventLoop和一个Thread,在一个Thread里创建一个EventLoop,让这个Thread执行一个EventLoop,即:one loop per thread

1. EventLoopThread.h
#include "noncopyable.h"
#include "Thread.h"

#include 
#include 
#include 
#include 

class EventLoop;

class EventLoopThread : noncopyable{
    public:
        using ThreadInitCallback = std::function<void(EventLoop*)>;
        EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(), const std::string& name = std::string());
        ~EventLoopThread();
        
        EventLoop* startLoop();

    private:
        void threadFunc();

        EventLoop* loop_;
        bool exiting_;                 // 是否退出事件循环
        Thread thread_;
        std::mutex mutex_;
        std::condition_variable cond_;
        ThreadInitCallback callback_;  // 启一个新线程绑定EventLoop时调用的,进行一些init相关 *** 作
};
2. EventLoopThread.cc
#include "EventLoopThread.h"

#include "EventLoop.h"

EventLoopThread::EventLoopThread(const ThreadInitCallback& cb, const std::string& name)
    : loop_(nullptr)
    , exiting_(false)
    , thread_(std::bind(&EventLoopThread::threadFunc, this), name)
    , mutex_()
    , cond_()
    , callback_(cb)
{}

EventLoopThread::~EventLoopThread(){
    exiting_ = true;
    if(loop_ != nullptr){
        loop_->quit();    // 退出事件循环
        thread_.join();   // 等待子线程结束
    }
}

EventLoop* EventLoopThread::startLoop(){
    thread_.start();  // 底层创建一个新线程,并执行成员变量thread构造时传入的threadFunc

    EventLoop* loop = nullptr;
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while(loop_ == nullptr){
            cond_.wait(lock);   // 成员变量loop_没有被新线程初始化的时候,一直wait在lock上
        }
        loop = loop_;
    }
    return loop;
}

// threadFunc是在单独的新线程里面运行的
void EventLoopThread::threadFunc(){
    // 创建一个独立的eventloop,和上面的线程一一对应,one loop per thread
    EventLoop loop;
    if(callback_){
        // 如果我们实现传递了callback_,ThreadInitCallback就是在底层启一个新线程绑定EventLoop时调用的,进行一些init相关 *** 作
        callback_(&loop);
    }

    {
        std::unique_lock<std::mutex> lock(mutex_);
        loop_ = &loop;
        cond_.notify_one();  // 通知
    }

    loop.loop();     // EventLoop loop  => Poller.poll,开启事件循环,监听新用户的连接或者已连接用户的读写事件

    // 一般来说,loop是一直执行的,能执行到下面的语句,说明程序要退出了,要关闭事件循环
    std::unique_lock<std::mutex> lock(mutex_);
    loop_ = nullptr;
}

当我们调用EventLoopThread::startLoop的时候,底层才创建一个新线程,而这个刚刚创建的新线程需要执行一个线程函数(我们在构造成员变量thread_时传入的EventLoopThread::threadFunc()),等新创建的线程初始化成员变量EventLoopThread::loop_完成后,才会通知调用startLoop的线程访问成员变量loop_

即调用EventLoopThread::startLoop就会返回一个EventLoop对象,并且成员变量EventLoopThread::loop_也记录了该EventLoop对象的地址

三、EventLoopThreadPool类

EventLoopThreadPoll是一个事件线程池,管理EventLoop,EventLoop绑定的就是一个线程

1. EventLoopThreadPool.h
#pragma once

#include "noncopyable.h"
#include "Thread.h"

#include 
#include 
#include 
#include 

class EventLoop;
class EventLoopThread;

class EventLoopThreadPool : noncopyable{
    public:
        using ThreadInitCallback = std::function<void(EventLoop*)>;

        EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg);
        ~EventLoopThreadPool();

        // 设置底层线程的数量,TcpServer::setThreadNum底层调用的就是EventLoopThreadPool::setThreadNum
        void setThreadNum(int numThreads){ numThreads_ = numThreads; }
        void start(const ThreadInitCallback& cb = ThreadInitCallback());

        // 如果工作在多线程中,baseLoop_默认以轮询的方式分配Channel给subLoop
        EventLoop* getNextLoop();
        // 返回事件循环池所有的EventLoop
        std::vector<EventLoop*> getAllLoops();

        bool started() const{ return started_; }
        const std::string name() const { return name_; }

    private:
        EventLoop* baseLoop_; // 我们使用muduo编写程序的时候,就会定义一个EventLoop变量,这个变量作为TcpServer构造函数的参数,用户创建的就叫做baseLoop
        std::string name_;    // 线程池的名字
        bool started_;
        int numThreads_;
        long unsigned int next_;
        std::vector<std::unique_ptr<EventLoopThread>> threads_;  // 包含了创建的所有subLoop的线程
        std::vector<EventLoop*> loops_;                          // 包含了所有创建的subLoop的指针,这些EventLoop对象都是栈上的(见EventLoopThread::threadFunc)
};
  • 成员变量baseLoop_:我们使用muduo编写程序的时候,就会定义一个EventLoop变量,这个变量作为TcpServer构造函数的参数,用户创建的就叫做baseLoop

  • 成员变量numThreads_:表示subReactor的数量。我们使用muduo编写程序的时候,就会定义一个EventLoop变量,这个变量作为TcpServer构造函数的参数,这就是baseLoop。如果我们不使用setThreadNum指定Reactor模型线程数量,那么muduo默认使用单线程模型,这个线程既负责新用户连接,也负责已连接用户的读写事件

  • 成员变量loops_:包含了所有创建的subLoop的指针,这些EventLoop对象都是栈上的(见EventLoopThread::threadFunc),不需要我们手动释放。EventLoopThread::threadFunc函数是由线程执行的,EventLoop对象存在于线程栈上

2. EventLoopThreadPool::start()

根据指定的numThreads_创建线程,开启事件循环

void EventLoopThreadPool::start(const ThreadInitCallback& cb){
    started_ = true;
    // 用户没有调用EventLoopThreadPool::setThreadNum()
    for(int i = 0; i < numThreads_; ++i){
        char buff[name_.size() + 32];  // 用“线程池的名字 + 下标”作为底层线程的名字
        snprintf(buff, sizeof(buff), "%s%d", name_.c_str());
        EventLoopThread* thread = new EventLoopThread(cb, buff);
        threads_.push_back(std::unique_ptr<EventLoopThread>(thread));  // 用unique_ptr管理堆上的EventLoopThread对象,以免我们手动释放
        loops_.push_back(thread->startLoop());                         // 调用EventLoopThread::startLoop后,会返回一个栈上的EventLoop对象,事件循环不停止,栈上的EventLoop对象不释放
    }

    // 如果用户没有调用EventLoopThreadPool::setThreadNum(),numThreads_默认为0
    if(numThreads_ == 0 && cb){
        // 整个服务器只有一个线程,即用户创建的baseLoop_
        cb(baseLoop_);
    }
}
3. EventLoopThreadPool::getNextLoop()

不使用setThreadNum指定Reactor模型线程数量,那么muduo默认只有一个baseLoop_

EventLoop* EventLoopThreadPool::getNextLoop(){
    EventLoop* loop = baseLoop_;
    if(!loops_.empty()){
    	// 通过轮询处理下一个事件的loop
        loop = loops_[next_];
        ++next_;
        if(next_ >= loops_.size()){
            next_ = 0;
        }
    }
    return loop;
}
4. EventLoopThreadPool.cc
#include "EventLoopThreadPool.h"
#include "EventLoopThread.h"

EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const std::string& nameArg)
    : baseLoop_(baseLoop)
    , name_(nameArg)
    , started_(false)
    , numThreads_(0)
    , next_(0)
{}

EventLoopThreadPool::~EventLoopThreadPool(){}

void EventLoopThreadPool::start(const ThreadInitCallback& cb){
    started_ = true;
    // 用户没有调用EventLoopThreadPool::setThreadNum()
    for(int i = 0; i < numThreads_; ++i){
        char buff[name_.size() + 32];  // 用“线程池的名字 + 下标”作为底层线程的名字
        snprintf(buff, sizeof(buff), "%s%d", name_.c_str(), i);
        EventLoopThread* thread = new EventLoopThread(cb, buff);
        threads_.push_back(std::unique_ptr<EventLoopThread>(thread));  // 用unique_ptr管理堆上的EventLoopThread对象,以免我们手动释放
        loops_.push_back(thread->startLoop());                         // 调用EventLoopThread::startLoop后,会返回一个栈上的EventLoop对象,事件循环不停止,栈上的EventLoop对象不释放
    }

    // 如果用户没有调用EventLoopThreadPool::setThreadNum(),numThreads_默认为0
    if(numThreads_ == 0 && cb){
        // 整个服务器只有一个线程,即用户创建的baseLoop_
        cb(baseLoop_);
    }
}

// 如果工作在多线程中,baseLoop_默认以轮询的方式分配Channel给subLoop
EventLoop* EventLoopThreadPool::getNextLoop(){
    EventLoop* loop = baseLoop_;
    if(!loops_.empty()){
        loop = loops_[next_];
        ++next_;
        if(next_ >= loops_.size()){
            next_ = 0;
        }
    }
    return loop;
}

// 返回事件循环池所有的EventLoop
std::vector<EventLoop*> EventLoopThreadPool::getAllLoops(){
    if(loops_.empty()){
        return std::vector<EventLoop*>(1, baseLoop_);
    }
    return loops_;
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/langs/797585.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存