[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),第1张

[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),在这里插入图片描述,第2张


[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),在这里插入图片描述,第3张

一.网络层与传输层协议

  • 网络层与传输层内置于 *** 作系统的内核中,网络层一般使用

    ip

    协议,传输层常用协议为

    Tcp

    协议和

    Udp

    协议,

    Tcp

    协议和

    Udp

    协议拥有各自的特点和应用场景:

    [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),在这里插入图片描述,第4张

sockaddr结构体继承体系(Linux体系)

  • sockaddr_in

    结构体用于存储网络通信主机进程的

    ip

    和端口号等信息

    [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),在这里插入图片描述,第5张

贯穿计算机系统的网络通信架构图示:

[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),在这里插入图片描述,第6张

二.实现并部署多线程并发Tcp服务器框架

小项目的完整文件的gittee链接

  • Tcp

    服务器架构:

    [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),在这里插入图片描述,第7张

线程池模块

#pragma once #include <iostream> #include <pthread.h> #include "log.hpp" #include <semaphore.h> #include <vector> #include <cstdio>template<class T> class RingQueue{ private: pthread_mutex_t Clock_; pthread_mutex_t Plock_; sem_t Psem_; sem_t Csem_; std::vector<T> Queue_; int Pptr_; int Cptr_; int capacity_; public: RingQueue(int capacity = 10) : Queue_(capacity),Pptr_(0),Cptr_(0),capacity_(capacity){ sem_init(&Psem_,0,capacity); sem_init(&Csem_,0,0); pthread_mutex_init(&Clock_,nullptr); pthread_mutex_init(&Plock_,nullptr); } ~RingQueue(){ sem_destroy(&Psem_); sem_destroy(&Csem_); pthread_mutex_destroy(&Clock_); pthread_mutex_destroy(&Plock_); } T Pop(){ sem_wait(&Csem_); pthread_mutex_lock(&Clock_); T tem = Queue_[Cptr_]; Cptr_++; Cptr_ %= capacity_; pthread_mutex_unlock(&Clock_); sem_post(&Psem_); return tem; } void Push(T t){ sem_wait(&Psem_); pthread_mutex_lock(&Plock_); Queue_[Pptr_] = t; Pptr_++; Pptr_%= capacity_; pthread_mutex_unlock(&Plock_); sem_post(&Csem_); } };

#pragma once #include "sem_cp.cpp" #include <pthread.h> #include <iostream> #include <string> #include <mutex> #include "CalTask.cpp"template<class Task> class Thread_Pool{ struct Thread_Data{ int Thread_num; pthread_t tid; }; private: RingQueue<Task> Queue_; //线程安全的环形队列 std::vector<Thread_Data> thread_arr; //管理线程的容器 static std::mutex lock_; //单例锁 static Thread_Pool<Task> * ptr_; //单例指针 private: Thread_Pool(int capacity_Of_queue = 20) : Queue_(capacity_Of_queue){} Thread_Pool(const Thread_Pool<Task>& Tp) = delete; Thread_Pool<Task>& operator=(const Thread_Pool<Task> & Tp) = delete; public: ~Thread_Pool(){} //获取线程池单例-->注意C++的类模板静态成员函数需要在类体外进行定义 static Thread_Pool<Task> * Getinstance(); //创建多线程 void Create_thread(int thread_num = 10){ Thread_Data T_data; for(int i = 0 ; i < thread_num ; ++i){ //注意线程池对象的this指针传递给线程 pthread_create(&T_data.tid,nullptr,Routine,this); T_data.Thread_num = i + 1; thread_arr.push_back(T_data); } } //线程等待 void Thread_join(){ for(int i = 0 ;i < thread_arr.size() ; ++i){ pthread_join(thread_arr[i].tid,nullptr); } } //向线程池中加入任务 void Push(Task T){ Queue_.Push(T); } void Push(Task && T){ Queue_.Push(std::forward<Task>(T)); } private: //线程函数-->该函数没有在类外调用,所以无须在类体外定义 static void* Routine(void * args){ Thread_Pool<Task> * Pool = static_cast<Thread_Pool<Task> *>(args); while(true){ std::cout << "Thread prepare to work\n" << std::endl; Task Thread_Task = Pool->Queue_.Pop(); //要求Task类重载()-->用于执行具体任务 Thread_Task(); } return nullptr; } }; //初始化静态指针 template<class Task> Thread_Pool<Task> * Thread_Pool<Task>::ptr_ = nullptr; template<class Task> std::mutex Thread_Pool<Task>::lock_;//注意C++的类模板静态成员函数需要在类体外进行定义 template<class Task> Thread_Pool<Task> * Thread_Pool<Task>::Getinstance(){ if(ptr_ == nullptr){ lock_.lock(); if(ptr_ == nullptr){ ptr_ = new Thread_Pool<Task>; } lock_.unlock(); } return ptr_; }

序列化反序列化工具模块

  • 序列反序列化是保证通信过程中数据完整性的关键步骤,保证数据语义完整,结构完整

[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),在这里插入图片描述,第8张

#pragma once #include <iostream> #include <string>// 自定义序列化反序列化协议 const std::string blank_space_sep = " "; const std::string protocol_sep = "\n"; //封装报文 std::string Encode(std::string &content){ //报文正文字节数 std::string package = std::to_string(content.size()); package += protocol_sep; package += content; //用分隔符封装正文 package += protocol_sep; return package; }//解析报文package-->"正文长度"\n"正文"\n bool Decode(std::string &package, std::string& content){ size_t pos = package.find(protocol_sep); if(pos == std::string::npos) return false; //解析报文正文长度 size_t Len = std::atoi(package.substr(0,pos).c_str()); //确定报文是否完整 size_t total_Len = pos + Len + 2; if(package.size() != total_Len) return false; //获取正文内容 content = package.substr(pos+1,Len); package.erase(0,total_Len); return true; } //用户层协议请求结构体 class Request{ public: int x; int y; char op; public: Request(int data1 , int data2 , char op) : x(data1),y(data2),op(op){} Request(){} public: //请求结构体 序列化 成报文正文字符串 "x op y" bool Serialize(std::string& out){ std::string content = std::to_string(x); content += blank_space_sep; content += op; content += blank_space_sep; content += std::to_string(y); out = content; return true; // 等价的jason代码 // Json::Value root; // root["x"] = x; // root["y"] = y; // root["op"] = op; // // Json::FastWriter w; // Json::StyledWriter w; // out = w.write(root); // return true; } //报文正文字符串 反序列化 成请求结构体 // "x op y" bool Deserialize(const std::string &in) { size_t left = in.find(blank_space_sep); if(left == std::string::npos)return false; x = std::stoi(in.substr(0,left).c_str()); std::size_t right = in.rfind(blank_space_sep); if (right == std::string::npos)return false; y = std::atoi(in.substr(right + 1).c_str()); if(left + 2 != right) return false; op = in[left+1]; return true; // 等价的jason代码 // Json::Value root; // Json::Reader r; // r.parse(in, root); // x = root["x"].asInt(); // y = root["y"].asInt(); // op = root["op"].asInt(); // return true; } void DebugPrint() { std::cout << "新请求构建完成: " << x << op << y << "=?" << std::endl; } };//用户层协议请求回应结构体 class Response{ public: int result; int code; public: Response(int res , int c) : result(res),code(c){} Response(){} public: //请求回应结构体 序列化 成报文正文字符串 "result code" bool Serialize(std::string& out){ std::string s = std::to_string(result); s += blank_space_sep; s += std::to_string(code); out = s; return true; // 等价的jason代码 // Json::Value root; // root["result"] = result; // root["code"] = code; // // Json::FastWriter w; // Json::StyledWriter w; // out = w.write(root); // return true; } //"result code" //报文正文字符串 反序列化 成请求回应结构体 bool Deserialize(const std::string &in) { std::size_t pos = in.find(blank_space_sep); if (pos == std::string::npos)return false; if(pos == 0 || pos == in.size() - 1) return false; result = std::stoi(in.substr(0, pos).c_str()); code = std::stoi(in.substr(pos+1).c_str()); return true; // 等价的jason代码 // Json::Value root; // Json::Reader r; // r.parse(in, root); // result = root["result"].asInt(); // code = root["code"].asInt(); // return true; } void DebugPrint() { std::cout << "结果响应完成, result: " << result << ", code: "<< code << std::endl; } };

通信信道建立模块

#pragma once #include <iostream> #include <string> #include <sys/types.h> #include <sys/socket.h> #include "log.hpp" #include <memory.h> #include <arpa/inet.h> #include <netinet/in.h> namespace MySocket{ //Tcp通讯构建器 class TcpServer{ enum{ UsageError = 1, SocketError, BindError, ListenError, }; private: int socketfd_ = -1; std :: string ip_; uint16_t port_; int backlog_ = 10; public: TcpServer(const std::string& ip = "172.19.29.44", uint16_t port = 8081) : ip_(ip) , port_(port){} ~TcpServer(){if(socketfd_ > 0) close(socketfd_);} public: //确定通信协议,建立文件描述符 void BuildSocket(){ socketfd_ = socket(AF_INET,SOCK_STREAM,0); if(socketfd_ < 0){ lg(Fatal,"socket error,%s\n",strerror(errno)); exit(SocketError); } } //文件描述符与服务器ip : 端口号绑定 void SocketBind(){ struct sockaddr_in addr; memset(&addr,0,sizeof(addr)); addr.sin_port = htons(port_); addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(ip_.c_str()); if(bind(socketfd_,(const sockaddr*)&addr,sizeof(addr)) < 0){ lg(Fatal,"socket bind error,%s\n",strerror(errno)); exit(BindError); } lg(Info,"socket bind success\n"); } //启动服务监听,等待客户端的连接 void Socklisten(){ if(socketfd_ <= 0){ lg(Fatal,"socket error,%s\n",strerror(errno)); exit(SocketError); } if(listen(socketfd_,backlog_) < 0){ lg(Fatal, "listen error, %s: %d", strerror(errno), errno); exit(ListenError); } } //服务器接收客户端的连接-->并创建用于通信的文件描述符-->一个客户端连接对应一个文件描述符 int SockAccept(std::string& cilent_ip, uint16_t& cilent_port){ struct sockaddr_in client_addr; // 输出型参数,用于获取用户的ip : 端口号 memset(&client_addr,0,sizeof(client_addr)); socklen_t Len = sizeof(client_addr); int newfd = accept(socketfd_,(struct sockaddr*)&client_addr,&Len); if(newfd < 0){ lg(Warning, "accept error, %s: %d", strerror(errno), errno); return -1; } //提取客户端信息-->输出参数 char ipstr[64]; cilent_ip = inet_ntop(AF_INET,&client_addr.sin_addr,ipstr,sizeof(ipstr)); cilent_ip = ipstr; cilent_port = ntohs(client_addr.sin_port); return newfd; } public: int Get_Server_fd(){ return socketfd_; } void Close_fd(){ if(socketfd_ > 0){ close(socketfd_); socketfd_ = -1; } } }; };

服务器主体模块

[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),在这里插入图片描述,第9张

#pragma once #include "ThreadPool.cpp" #include "TcpServer.cpp" #include "CalTask.cpp" #include "log.hpp" #include <signal.h>//构建计算器服务器 class CalServer{ const int size = 2048; private: Thread_Pool<CalTask> * Pool_ptr_; MySocket::TcpServer Socket_; int Socket_fd_ = -1; public: CalServer(const std::string& de_ip = "172.19.29.44",uint16_t de_port = 8081) : Socket_(de_ip,de_port) { Pool_ptr_ = Thread_Pool<CalTask>::Getinstance(); if(Pool_ptr_ == nullptr){ lg(Fatal,"Pool_ptr_ is nullptr\n"); return; } Pool_ptr_->Create_thread(); } ~CalServer(){} public: //建立Tcp连接条件 bool Init(){ Socket_.BuildSocket(); Socket_fd_ = Socket_.Get_Server_fd(); if(Socket_fd_ < 0){ lg(Fatal,"BuildSocket failed\n"); return true; } Socket_.SocketBind(); Socket_.Socklisten(); lg(Info, "init server .... done"); return true; } //启动服务器 void Start(){ signal(SIGCHLD, SIG_IGN); signal(SIGPIPE, SIG_IGN); char ReadBuffer[size]; while(true){ //接受用户请求 std::string client_ip; uint16_t client_port; int client_fd = Socket_.SockAccept(client_ip,client_port); if(client_fd < 0){ lg(Warning,"SockAccept error\n"); continue; } lg(Info, "accept a new link, sockfd: %d, clientip: %s, clientport: %d", client_fd, client_ip.c_str(), client_port); int n = read(client_fd,ReadBuffer,sizeof(ReadBuffer)); ReadBuffer[n] = 0; std::string TaskStr(ReadBuffer); printf("receives mess from client : %s",ReadBuffer); if(n < 0){ lg(Warning,"read error\n"); break; } CalTask task(client_fd,client_ip,client_port,TaskStr); Pool_ptr_->Push(task); } } };

任务回调模块(根据具体应用场景可重构)

#pragma once #include <string> #include "ThreadPool.cpp" #include "Protocol.cpp" enum{ Div_Zero = 1, Mod_Zero, Other_Oper };class CalTask{ private: int socketfd_; //网络通信文件描述符 std :: string ip_; //客户端ip uint16_t port_; //客户端端口号 std::string package_; //客户请求字符串 public: CalTask(int socketfd,const std::string& ip , uint16_t & port,std::string & str) : socketfd_(socketfd),ip_(ip),port_(port),package_(str){} CalTask(){}//类一定要有默认构造函数 ~CalTask(){} public: //执行计算任务并将结果发送给用户 void operator() (){ std::cout << "Task Running ... \n" << std::endl; std::string content; //将用户发送的报文进行解包获取正文 bool r = Decode(package_, content); if (!r)return; //将报文正文进行反序列化 Request req; r = req.Deserialize(content); if (!r)return ; req.DebugPrint(); content = ""; //构建计算结果 Response resp = CalculatorHelper(req); resp.DebugPrint(); //计算结果序列化成字符串 resp.Serialize(content); //字符串正文封装成报文发送给用户 std::string ResStr = Encode(content); write(socketfd_,ResStr.c_str(),ResStr.size()); if(socketfd_ > 0)close(socketfd_); }private: Response CalculatorHelper(const Request &req){ //构建请求回应结构体 Response resp(0, 0); switch (req.op){ case '+': resp.result = req.x + req.y; break; case '-': resp.result = req.x - req.y; break; case '*': resp.result = req.x * req.y; break; case '/':{ if (req.y == 0) resp.code = Div_Zero; else resp.result = req.x / req.y; } break; case '%':{ if (req.y == 0) resp.code = Mod_Zero; else resp.result = req.x % req.y; } break; default: resp.code = Other_Oper; break; } return resp; } };

Tips:DebugC++代码过程中遇到的问题记录

  • 使用

    C++

    类模板时,若在类模板中定义了静态成员函数,且该静态成员函数在类外被调用,则该静态成员函数必须定义在类外,不然链接器无法找到函数体.
  • 注意类模板静态成员的声明格式需要加关键字

    temlpate<>

  • 声明类模板静态成员无需特化模版类型参数
  • 跨主机并发通信测试:
    [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),在这里插入图片描述,第10张
    [计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构),在这里插入图片描述,第11张

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/yw/13518718.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2024-02-22
下一篇 2024-02-27

发表评论

登录后才能评论

评论列表(0条)

保存