记录下(基于 epoll + eventfd 的多线程服务器模型):
一个线程专门用来接受accept获取客户端的fd
获取fd之后 从剩余的执行线程中 找到一个连接客户端数量最少的线程
然后将客户端的fd加入到这个线程中并通过EPOLL监听这个fd
线程之间通过eventfd来通信 将客户端的fd传到 对应的线程中
参考了 MediaServer,引入了 EventPollerPool 和 EventPoller 的概念……

记录下:
一个线程专门用来接受accept获取客户端的fd
获取fd之后 从剩余的执行线程中 找到一个连接客户端数量最少的线程
然后将客户端的fd加入到这个线程中并通过EPOLL监听这个fd
线程之间通过eventfd来通信 将客户端的fd传到 对应的线程中
参考了 MediaServer,引入了 EventPollerPool 和 EventPoller 的概念。最少需要两个线程,设置为 1 的话会被改成 2。

cpp代码
#include durian.h#include sys/epoll.hnamespace DURIAN
{EventPoller::EventPoller(int id){m_id id;}EventPoller::~EventPoller(){printf(~EventPoller signal m_id %d m_run_flag %d\n,m_id,m_run_flag);Wait();}bool EventPoller::Init(){m_poll_fd epoll_create1(0);if(m_poll_fd -1){return false;}m_event_fd eventfd(0,0);if(m_event_fd -1){printf(new fd failed\n);close(m_poll_fd);return false ;}return true;}void EventPoller::RunLoop(){static const int MAX_EVENTS 1024;struct epoll_event events[MAX_EVENTS];while(m_run_flag){int ready_count epoll_wait(m_poll_fd,events,MAX_EVENTS,2000);if(ready_count -1){if(errno ! EINTR){//exit(1);}//ready_count 0;}else if(ready_count 0){if(m_run_flag false){//printf(time out and runflag false exit thread\n);//break;}}for(int i 0;iready_count;i){const struct epoll_event ev events[i];int fd events[i].data.fd;if(ev.events (EPOLLIN | EPOLLERR |EPOLLHUP)){auto handler m_accept_handlers[fd];handler(fd);}else if(ev.events (EPOLLOUT | EPOLLERR | EPOLLHUP)){auto it m_buffer_pool.find(fd);if(it! m_buffer_pool.end()){auto buf it-second;if(buf.WriteData(fd) false){Close(fd);}}}}}}int EventPoller::GetEventFD(){return m_event_fd;}int EventPoller::GetClients(){return m_accept_handlers.size();}void EventPoller::Stop(){m_run_flag false;}void EventPoller::Start(){//printf(Enter EventPoller Start m_id %d pollfd %d eventid %d\n,m_id,m_poll_fd,m_event_fd);m_run_flag true;m_thread_id std::thread(EventPoller::RunLoop,this);}void EventPoller::Wait(){if(m_thread_id.joinable()){m_thread_id.join();}}bool EventPoller::Add2Epoll(int fd){if(m_accept_handlers.count(fd) ! 
0){return false;}int flags 1;if(ioctl(fd,FIONBIO,flags) -1){return false;}struct epoll_event ev;ev.events EPOLLIN |EPOLLOUT |EPOLLET;ev.data.fd fd;if(epoll_ctl(m_poll_fd,EPOLL_CTL_ADD,fd,ev)-1){return false;}return true;}void EventPoller::DeliverConn(int conn_fd){//printf(DeliverConn fd %d\n,conn_fd);uint64_t count conn_fd;if(write(m_event_fd,count,sizeof(count)) -1){printf(Deliverconn write failed\n);}}bool EventPoller::AddListener(int fd,ACCEPTER on_accept){if(Add2Epoll(fd) false){return false;}std::coutEventPoller AddListener fd fdstd::endl;m_accept_handlers[fd] [this,on_accept]( int server_fd){for(;;){int new_fd accept(server_fd,nullptr,nullptr);std::coutaccept client fd new_fdstd::endl; if(new_fd -1){if(errno! EAGAIN){Close(server_fd);}return 0;}int enable 1;setsockopt(new_fd,IPPROTO_TCP,TCP_NODELAY,enable,sizeof(enable));on_accept(new_fd);}return 0;};return true;}bool EventPoller::AddEventer(int fd, EVENTER on_event){if(Add2Epoll(fd) false){return false;}m_accept_handlers[fd] [this,on_event](int cfd){for(;;){uint64_t count;if(read(cfd,count,sizeof(count)) -1){if(errno ! EAGAIN){Close(cfd);}return 0;}on_event(count);}return 0;};return true;}bool EventPoller::AddReader(int fd, READER on_read){ if(Add2Epoll(fd) false){return false;}m_accept_handlers[fd] [this,on_read](int cfd){for(;;){char buf[4096] {0};ssize_t ret read(cfd,buf,sizeof(buf));if(ret -1){if(errno ! EAGAIN){Close(cfd);}return -1;}if(ret 0){Close(cfd);printf(客户端关闭了连接 %d\n,cfd);return 0 ;}on_read(cfd,buf,ret);}};return true;}void EventPoller::Close(int fd){m_accept_handlers.erase(fd);m_buffer_pool.erase(fd);close(fd);}bool EventPoller::FlushData(int fd, const char * buf, size_t len){WriteBuffer *wb nullptr;auto it m_buffer_pool.find(fd);if(it m_buffer_pool.end()){while(len 0){ssize_t ret write(fd,buf,len);if(ret -1){if(errno ! 
EAGAIN){Close(fd);return false;}wb m_buffer_pool[fd];break;}buf ret;len-ret;}if(len 0){//Successreturn true;}}else{wb it-second;}wb-Add2Buffer(buf,len);return true;}static size_t g_pool_size 0;
void EventPollerPool::SetPoolSize(size_t size)
{g_pool_size size;
}
EventPollerPool EventPollerPool::Instance()
{static std::shared_ptrEventPollerPool s_instance(new EventPollerPool()); static EventPollerPool s_instance_ref *s_instance; return s_instance_ref;
}EventPollerPool::EventPollerPool()
{auto size g_pool_size;auto cpus std::thread::hardware_concurrency();size size 0 ? size : cpus;std::coutThread size:sizestd::endl;if(size 2)size 2;for (int i 0; i size; i) {std::shared_ptrEventPoller poller std::make_sharedEventPoller(i);m_pollers.emplace_back(poller);}
}std::shared_ptrEventPoller EventPollerPool::GetPoller()
{if(m_pollers.size()1){int min_clients 10000;int target_index 0;for(int i 1;im_pollers.size();i){if(m_pollers[i]- GetClients() min_clients){min_clients m_pollers[i]-GetClients();target_index i;}}//printf(target index %d min_clients %d\n,target_index,min_clients);return m_pollers[target_index];}return m_pollers[0];}
std::shared_ptrEventPoller EventPollerPool::GetFirstPoller()
{return m_pollers[0];
}void EventPollerPool::StartPollers()
{for(int i 1;im_pollers.size();i){m_pollers[i]-Init();int event_fd m_pollers[i]-GetEventFD();m_pollers[i]-AddEventer(event_fd,[,i](uint64_t cfd){READER reader [,i](int fd,const char*data,size_t len){printf(Len[%s] content[%d] m_pollers[i] %p i %d\n,data,len,m_pollers[i],i);m_pollers[i]-FlushData(fd,data,len);return 0;};m_pollers[i]-AddReader(cfd,reader);return 0;});m_pollers[i]-Start(); }
}void EventPollerPool::Stop()
{for(int i 0;im_pollers.size();i){m_pollers[i]-Stop();}}}头文件 #include string.h
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <sys/eventfd.h>
#include <signal.h>
#include <stdio.h>
#include <iostream>
#include <memory>
#include <list>
#include <string>
#include <vector>
#include <functional>
#include <thread>
#include mutex#include unordered_mapnamespace DURIAN
{class WriteBuffer{private:std::liststd::string buf_items;size_t offset 0;public:bool IsEmpty() const{return buf_items.empty();}void Add2Buffer(const char* data,size_t len){if(buf_items.empty() || buf_items.back().size()len 4096){buf_items.emplace_back(data,len);}else{buf_items.back().append(data,len);}}bool WriteData(int fd){while (IsEmpty() false){auto const item buf_items.front();const char *p item.data() offset;size_t len item.size() -offset;while(len 0){ssize_t ret write(fd,p,len);if(ret -1){if(errno EAGAIN){return true;}return false;}offset ret;pret;len- ret;}buf_items.pop_front();}return true;}};using ACCEPTER std::functionint(int);using WRITER std::functionint(int);using EVENTER std::functionint(int);using READER std::functionint(int,const char *data,size_t);//static thread_local std::unordered_mapint fd,READERg_th_handlers;class EventPoller{private:int m_poll_fd -1;int m_id;bool m_run_flag false;std::unordered_mapint,ACCEPTER m_accept_handlers;std::unordered_mapint,WriteBuffer m_buffer_pool;std::mutex m_connction_lock;int m_event_fd;std::thread m_thread_id ;std::vectorintm_connections;void RunLoop();public:EventPoller(int i);~EventPoller();int GetEventFD();int GetClients();std::vectorint GetConnections();bool Init();void Start();void Stop();void Wait(); void DeliverConn(int conn_fd);bool AddListener(int fd,ACCEPTER on_listen);bool AddEventer(int fd,EVENTER on_event);bool AddReader(int fd,READER on_read);void Close(int fd);bool Add2Epoll(int fd);bool FlushData(int fd,const char *buf,size_t len);};class EventPollerPool{public:static EventPollerPool Instance();static void SetPoolSize(size_t size 0);std::shared_ptrEventPollerGetPoller(); std::shared_ptrEventPollerGetFirstPoller(); void StartPollers();void Stop(); private:int m_size;std::vectorstd::shared_ptrEventPoller m_pollers;EventPollerPool(); };} main文件 #include durian.hstatic bool g_run_flag true;
void sig_handler(int signo)
{signal(SIGINT, SIG_IGN);signal(SIGTERM, SIG_IGN);signal(SIGKILL, SIG_IGN);g_run_flag false;printf(Get exit flag\n);if (SIGINT signo || SIGTSTP signo || SIGTERM signo|| SIGKILL signo){g_run_flag false;printf(\033[0;31mprogram exit by kill cmd !\033[0;39m\n);}}bool StartServer()
{int listen_fd socket(AF_INET,SOCK_STREAM,0);if(listen_fd -1){printf(Create socket failed\n);return false;}else{printf(Server listen fd is:%d\n,listen_fd);}int reuseaddr 1;if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,reuseaddr ,sizeof(reuseaddr)) -1){return false;}struct sockaddr_in listen_addr {0};listen_addr.sin_family AF_INET;listen_addr.sin_addr.s_addr INADDR_ANY;listen_addr.sin_port htons(8888);if(bind(listen_fd,(struct sockaddr*)listen_addr,sizeof(listen_addr)) -1){printf(bind failed\n);return false;}if(listen(listen_fd,100) -1){printf(listen failed\n);return false;}DURIAN::EventPollerPool::SetPoolSize(1);DURIAN::EventPollerPool pool DURIAN::EventPollerPool::Instance(); pool.StartPollers();auto poller pool.GetFirstPoller(); if(poller-Init()){if(poller-AddListener(listen_fd,[](int conn_fd){printf(将新的fd加到epoll监听 fd %d\n,conn_fd);//Deliver client fd to other pollerspool.GetPoller()-DeliverConn(conn_fd);return 0;}) false){return false;}poller-Start();}while(g_run_flag){sleep(2);}pool.Stop();}void StopServer()
{DURIAN::EventPollerPool pool DURIAN::EventPollerPool::Instance(); pool.Stop();
}int main(int argc,char *argv[])
{printf( cpp version :%d\n,__cplusplus);int thread_size 1;bool run_flag true;signal(SIGPIPE,SIG_IGN);signal(SIGTERM, sig_handler);signal(SIGKILL, sig_handler);signal(SIGINT,sig_handler); StartServer();return 0;
} 性能测试
ulimit -HSn 102400
ab -n 100000 -c 20000 http://192.168.131.131:8888/index.html