贵港公司做网站,网站建设费 大创,电子元器件商城,化妆品瓶子怎么做网站TinyWebServer 文章目录 TinyWebServer1 主体框架2 Buffer2.1 向Buffer写入数据2.2 从Buffer读取数据2.3 动态扩容2.4 从socket中读取数据2.5 具体实现 3 日志系统3.1 生产者-消费者模型3.2 数据一致3.3 代码 4 定时器4.1 调整堆中元素操作4.2 堆的操作4.2.1 增4.2.2 删4.2.3 改…TinyWebServer 文章目录 TinyWebServer1 主体框架2 Buffer2.1 向Buffer写入数据2.2 从Buffer读取数据2.3 动态扩容2.4 从socket中读取数据2.5 具体实现 3 日志系统3.1 生产者-消费者模型3.2 数据一致3.3 代码 4 定时器4.1 调整堆中元素操作4.2 堆的操作4.2.1 增4.2.2 删4.2.3 改4.2.4 查 4.3 代码 5 线程池5.1 代码 6 数据库连接池6.1 RAII6.2 代码 7 HTTP层处理7.1 HTTP解析7.1.1 代码 7.2 HTTP响应7.2.1 代码 7.3 HTTP处理7.3.1 代码 8 Server层处理8.1 代码 9 压力测试9.1 ET模式9.2 LT模式9.3 测试环境 10 运行说明10.1 数据库初始化10.2 导入mysql.h10.3 编译运行 11 致谢 1 主体框架
客户端如果想和服务器通信首先要和服务器建立TCP连接然后发送HTTP请求。服务端接收并处理HTTP请求然后发送HTTP响应。 服务器采用单Reactor多线程模式主线程使用IO多路复用接口监听事件收到事件后将其分发。连接事件直接处理而读写事件由线程池负责处理。 Reactor模式介绍https://xiaolincoding.com/os/8_network_system/reactor.html#单-reactor-多线程-多进程 项目主要包含以下模块
Server层-基于EPOLL的I/O多路复用和Reactor网络模式HTTP处理层-解析HTTP请求并处理生成返回的HTTP响应日志系统线程池数据库连接池定时器缓冲区-暂存缓冲数据读写socket TinyWebServer
├── build
│ ├── bin
│ │ └── server
│ └── Makefile
├── CMakeLists.txt
├── config.ini
├── log
├── Makefile
├── resources
├── webbench-1.5
│ ├── Makefile
│ ├── socket.c
│ ├── webbench
│ ├── webbench.c
│ └── webbench.o
└── webserver├── buffer│ ├── Buffer.cpp│ └── Buffer.h├── http│ ├── HttpResponse.cpp│ ├── HttpResponse.h│ ├── HttpWork.cpp│ ├── HttpWork.h│ ├── ParseHttpRequest.cpp│ └── ParseHttpRequest.h├── lib│ └── inih-r58│ ├── cpp│ │ ├── INIReader.cpp│ │ └── INIReader.h│ ├── ini.c│ └── ini.h├── log│ ├── Log.cpp│ ├── Log.h│ ├── LogLevel.h│ └── LogQueue.h├── main.cpp├── pool│ ├── sqlconnpool.cpp│ ├── sqlconnpool.h│ ├── sqlconnRAII.h│ ├── ThreadPool.cpp│ └── ThreadPool.h├── server│ ├── Epoll.cpp│ ├── Epoll.h│ ├── Server.cpp│ └── Server.h├── timer│ ├── Timer.cpp│ └── Timer.h└── utils└── Utils.h2 Buffer
为了高效便捷的实现数据的存取我们自定义了一个缓冲区数据结构其主要功能是实现数据的读取、写入和空间自增。下图是缓冲区的结构图我们使用vectorchar作为最基本的数据结构实现了一个队列结构的缓冲区并定义了三个指针分别是头指针、读指针和写指针这里的指针都使用下标代替并非真正的指针。 2.1 向Buffer写入数据
写指针被初始化为0指向vector的首元素。写入数据时需要提供待写入字符串的首地址和长度。待写入字符串被拷贝到写指针指向的位置此时可能会出现空间不够的情况。 待写入数据长度小于等于预留区域和空闲区域长度之和 将数据区域搬运到头指针指向的位置并更新读指针和写指针 待写入数据长度大于预留区域和空闲区域长度之和 利用vector的动态扩容机制增大缓冲区长度。
2.2 从Buffer读取数据
读指针被初始化为0从读指针开始读取字符串直到写指针为止。
2.3 动态扩容
当预留区域和空闲区域加在一起也不够写下新的数据时需要对缓冲区进行扩容。vector.resize()函数将被调用从而实现了扩充容量。
2.4 从socket中读取数据
由于从socket读取的数据长度未知直接向buffer中写入的数据的长度可能会超过vector的最大容量而引发错误而增大buffer的初始容量又会浪费资源。因此可以借用一个大容量的栈区作为缓冲。buffer和栈区同时接收数据之后再将栈区中的数据写入buffer中这样便可巧妙地解决问题。
2.5 具体实现
#ifndef TINYWEBSERVER_BUFFER_H
#define TINYWEBSERVER_BUFFER_H
#include vector
#include atomic // atomic
#include sys/uio.h // iovec readv
#include cstring // errno
#include iostream
#include cassert // assert
#include unistd.h // writeclass Buffer {
private:std::vectorchar buffer_;std::atomicsize_t readIdx_;std::atomicsize_t writeIdx_;int STACK_LEN;public:explicit Buffer(int init_size1024, int stack_len4096);~Buffer()default;size_t getContentLen();size_t getBufferLen();size_t getLeftLen();size_t getRealLeftLen();char* getReadPtr();const char *getConstReadPtr();char* getWritePtr();ssize_t readFd(int fd, int* Errno);ssize_t writeFd(int fd, int* Errno);void append(const char* str, size_t len);void append(const std::string str);void addWriteIdx(size_t len);void addReadIdx(size_t len);void addReadIdxUntil(const char* ed);// memset缓冲区void resetBuffer();std::string getStringAndReset();
private:char* getBeginPtr();bool confirmSpace(size_t len);
};#endif //TINYWEBSERVER_BUFFER_H
#include Buffer.hBuffer::Buffer(int init_size, int stack_len):buffer_(init_size), readIdx_(0), writeIdx_(0), STACK_LEN(stack_len){}// buffer中数据长度
size_t Buffer:: getContentLen() {return writeIdx_ - readIdx_;
}
// buffer的实际长度
size_t Buffer::getBufferLen() {return buffer_.size();
}
// 返回buffer的剩余空间
size_t Buffer::getLeftLen() {return getBufferLen() - writeIdx_;
}
// 返回buffer包含预留空间真正剩下的空间
size_t Buffer::getRealLeftLen() {return getBufferLen() - (writeIdx_ - readIdx_);
}
// 返回读指针
char *Buffer::getReadPtr() {return buffer_[readIdx_];
}
//返回写指针
char *Buffer::getWritePtr() {return buffer_[writeIdx_];
}// 返回buffer首元素的指针
char* Buffer::getBeginPtr() {return buffer_[0];
}// 移动写指针
void Buffer::addWriteIdx(size_t len) {writeIdx_ len;
}
// 移动读指针
void Buffer::addReadIdx(size_t len) {assert(len getContentLen());readIdx_ len;
}// 增加读指针移到ed位置
void Buffer::addReadIdxUntil(const char *ed) {assert(getReadPtr() ed ed getWritePtr());addReadIdx(ed - getReadPtr());
}ssize_t Buffer::readFd(int fd, int* Errno) {char stack_buf[STACK_LEN];iovec iv[2];size_t leftLen getLeftLen();iv[0].iov_base getWritePtr();iv[0].iov_len leftLen;iv[1].iov_base stack_buf;iv[1].iov_len STACK_LEN;ssize_t len readv(fd, iv, 2);if (len 0) {// 记录报错信息*Errno errno;return len;}// 刚好将buffer填满else if (static_castsize_t(len) leftLen) {addWriteIdx(len);}// 读入的数据超过bufferelse {writeIdx_ getBufferLen();// 将栈区的数据复制到buffer中append(stack_buf, len - static_castssize_t(leftLen));}return len;
}ssize_t Buffer::writeFd(int fd, int* Errno) {ssize_t len write(fd, getReadPtr(), getContentLen());if (len 0) {*Errno errno;return len;}addReadIdx(len);return len;
}// 添加char[]到buffer中
void Buffer::append(const char* str, size_t len) {assert(str);confirmSpace(len);std::copy(str, strlen, getWritePtr());addWriteIdx(len);
}
// 添加string到buffer中
void Buffer::append(const std::string str) {append(str.c_str(), str.length());
}// 将buffer中内容转换成string并清空buffer
std::string Buffer::getStringAndReset() {std::string str(getReadPtr(), getWritePtr());resetBuffer();return str;
}
// 重置buffer
void Buffer::resetBuffer() {writeIdx_ 0;readIdx_ 0;memset(buffer_[0], 0, buffer_.size());
}// 分配空间扩容
bool Buffer::confirmSpace(size_t len) {// 剩余空间能够满足写入lenif (getLeftLen() len) {return false;}// 不够Len但是能够借用预留空间满足要求else if (getRealLeftLen() len) {auto contentLen getContentLen();std::copy(getBeginPtr() readIdx_, getBeginPtr() writeIdx_, getBeginPtr());readIdx_ 0;writeIdx_ contentLen;assert(contentLen getContentLen());}// 即使挪动也不够空间需要对vector扩容else {buffer_.resize(writeIdx_ len 1);}assert(getLeftLen() len);return true;
}const char *Buffer::getConstReadPtr() {return buffer_[readIdx_];
}3 日志系统
日志系统是实现整个webserver项目的首要前提利用日志可以方便地调试代码记录输出。
为了使输出的日志清晰明了日志信息被划分成了不同的等级
DEBUGINFOWARNERRORFATAL
服务器初始化时提供了一个日志等级的参数只有大于等于该等级的日志条目才会输出。
该日志系统主要使用异步方式写入日志信息由一个队列负责维护要输出的日志信息。其他线程要打印日志时调用函数将内容插入到队列中日志输出线程负责从队列中取出日志信息并将其写入日志文件中。
3.1 生产者-消费者模型
在上述工作流程中其他线程和输出线程构成一个生产者-消费者模型。其他线程在队列不满的情况下插入日志信息并通知日志输出线程取出日志信息否则挂起等待而日志输出线程在队列不空的情况从队列中取出日志信息并通知其他线程插入日志信息否则挂起等待。
为了同步其他线程和日志输出线程可以使用条件变量。
3.2 数据一致
由于该日志系统会被其他不同的线程调用需要保证同一时间只有一个线程访问日志队列。可能会出现以下竞态情况
日志队列中插入日志和取出日志时对日志队列的访问其他线程要插入日志时会在buffer内构造日志信息 3.3 代码
#ifndef TINYWEBSERVER_LOG_H
#define TINYWEBSERVER_LOG_H#include string
#include thread
#include mutex
#include cstdarg
#include sys/time.h
#include ../buffer/Buffer.h
#include LogQueue.h
#include ../utils/Utils.h
#include LogLevel.h// 日志输出位置
enum LogTarget {LOG_TARGET_NONE 0,LOG_TARGET_CONSOLE 1,LOG_TARGET_FILE 2
};class Log {
private:const char* saveDir_; // 日志存储路径char* filename_; // 初始化提供的文件名const char* suffix_; // 日志文件名后缀std::unique_ptrLogQueuestd::string log_Queue_; // 日志队列std::unique_ptrstd::thread workThread_; // 处理写日志的线程FILE *fp_; // 日志文件描述符LogTarget target_; // 日志文件输出位置LogLevel::value logLevel_; // 日志级别std::mutex mtx_;Buffer buf_; // 缓冲区bool isRun_;unsigned long long logCnt;bool isAsync_;static const int MAX_LINES 50000;static const size_t MAX_FILENAME_LEN 50; // 最大文件名限制
public:void init(LogTarget target, const char* save_dir, const char* suffix, LogLevel::value logLevel,size_t maxQueueSize 1024); // 初始化日志系统static void asyncWriteLogThread(); // 工作线程将日志异步写入文件的函数bool initLogFile(); // 初始化日志文件// 外部调用接口输出不同类型的日志信息void addLog(LogLevel::value type, const char *format, ...);bool isRun() const { return isRun_; }// 外部获取实例的接口static Log* getInstance();LogLevel::value getLevel() { return logLevel_; };void flush();void AsyncWrite_();LogTarget getTarget() {return target_;}private:Log();~Log();void setEntryTime(); // 设置日志条目时间头void setEntryType(LogLevel::value t); // 设置日志条目类型void setEntryMsg(const std::string msg);void appendEntry(const std::string entry);
};#define LOG_BASE(level, format, ...) \do {\Log* l Log::getInstance();\if (l-isRun() l-getLevel() level l-getTarget() ! LOG_TARGET_NONE) {\l-addLog(level, format, ##__VA_ARGS__); \l-flush();\}\} while(0);#define LOG_DEBUG(format, ...) do {LOG_BASE(LogLevel::value::DEBUG, format, ##__VA_ARGS__)} while(0);
#define LOG_INFO(format, ...) do {LOG_BASE(LogLevel::value::INFO, format, ##__VA_ARGS__)} while(0);
#define LOG_WARN(format, ...) do {LOG_BASE(LogLevel::value::WARN, format, ##__VA_ARGS__)} while(0);
#define LOG_ERROR(format, ...) do {LOG_BASE(LogLevel::value::ERROR, format, ##__VA_ARGS__)} while(0);
#define LOG_FATAL(format, ...) do {LOG_BASE(LogLevel::value::FATAL, format, ##__VA_ARGS__)} while(0);
#endif //TINYWEBSERVER_LOG_H
#include Log.hLog::Log() {saveDir_ nullptr;filename_ nullptr;suffix_ nullptr;log_Queue_ nullptr;workThread_ nullptr;isRun_ true;fp_ nullptr;target_ LOG_TARGET_CONSOLE;logCnt 0;
}Log::~Log() {printf(close logging...\n);if (log_Queue_-size()) {sleep(2);}isRun_ false;fflush(fp_);if (fp_ ! nullptr) {fclose(fp_);fp_ nullptr;}delete[] filename_;
}Log* Log::getInstance() {static Log log_;return log_;
}void
Log::init(LogTarget target, const char *save_dir, const char *suffix, LogLevel::value logLevel,size_t maxQueueSize) {saveDir_ save_dir;suffix_ suffix;logLevel_ logLevel;filename_ new char(MAX_FILENAME_LEN);target_ target;if (maxQueueSize 0) {isAsync_ true;if (!log_Queue_) {std::unique_ptrLogQueuestd::string q(new LogQueuestd::string(maxQueueSize));log_Queue_ std::move(q);std::unique_ptrstd::thread t(new std::thread(asyncWriteLogThread));workThread_ std::move(t);}} else {isAsync_ false;}if (!initLogFile()) {printf(start loging failed...\n);return ;}
}void Log::asyncWriteLogThread() {Log::getInstance()-AsyncWrite_();
}bool Log::initLogFile() {if (target_ LOG_TARGET_CONSOLE) {fp_ stdout;} else if (target_ LOG_TARGET_FILE){char time_str[25];util::Date::getDateTimeByFormat(time_str, 25, %Y_%m_%d_%H_%M_%S);snprintf(filename_, MAX_FILENAME_LEN, %s%s, time_str, suffix_);fp_ util::File::createFile(saveDir_, filename_);if (fp_ nullptr) {return false;}} else {fp_ nullptr;return true;}printf(start logging...\n);return true;
}void Log::setEntryTime() {char time_str[25];util::Date::getDateTime(time_str, 25);size_t str_len strlen(time_str);time_str[str_len] ;buf_.append(time_str, str_len 1); // 追加空格
}void Log::setEntryType(LogLevel::value t) {buf_.append(LogLevel::toString(t) std::string( ));
}void Log::setEntryMsg(const std::string msg) {buf_.append(msg std::string(\n));
}void Log::appendEntry(const std::string entry) {log_Queue_-push(entry);
}void Log::flush() {if (isAsync_) {log_Queue_-flush();}fflush(fp_);
}void Log::addLog(LogLevel::value type, const char *format, ...) {struct timeval now {0, 0};gettimeofday(now, nullptr);time_t tSec now.tv_sec;struct tm *sysTime localtime(tSec);struct tm t *sysTime;va_list vaList;// 向buf_中添加数据如果多线程访问需要确保只有一个线程访问buf_std::unique_lockstd::mutex locker(mtx_);int n snprintf(buf_.getWritePtr(), 128, %d-%02d-%02d %02d:%02d:%02d.%06ld ,t.tm_year 1900, t.tm_mon 1, t.tm_mday,t.tm_hour, t.tm_min, t.tm_sec, now.tv_usec);buf_.addWriteIdx(n);setEntryType(type);va_start(vaList, format);int m vsnprintf(buf_.getWritePtr(), buf_.getLeftLen(), format, vaList);va_end(vaList);buf_.addWriteIdx(m);buf_.append(\n\0, 2);if (isAsync_ log_Queue_ !log_Queue_-full()) {log_Queue_-push(buf_.getStringAndReset());} else {fputs(buf_.getReadPtr(), fp_); // 这一部分不确定作用}
}void Log::AsyncWrite_() {std::string str;while (log_Queue_-pop(str)) {std::lock_guardstd::mutex locker(mtx_);fputs(str.c_str(), fp_);}
}
#ifndef TINYWEBSERVER_LOGQUEUE_H
#define TINYWEBSERVER_LOGQUEUE_H#include queue
#include cassert
#include mutex
#include condition_variable
template typename T
class LogQueue {
private:std::dequeT log;size_t capacity;bool deleted;std::mutex mtx_;std::condition_variable consumer_cv;std::condition_variable producer_cv;
public:explicit LogQueue(size_t c);~LogQueue();void push(const T data);bool pop(T item);size_t size();bool empty();void onDelete();bool full();void flush();
};templatetypename T
void LogQueueT::flush() {consumer_cv.notify_one();
}templatetypename T
bool LogQueueT::full() {std::lock_guardstd::mutex locker(mtx_);return log.size() capacity;
}templatetypename T
LogQueueT::LogQueue(size_t c): capacity(c), deleted(false) {assert(c 0);
}templatetypename T
LogQueueT::~LogQueue() {onDelete();
}templatetypename T
void LogQueueT::onDelete() {{std::lock_guardstd::mutex locker(mtx_);deleted true;log.clear();}consumer_cv.notify_one();producer_cv.notify_one();
}templatetypename T
bool LogQueueT::empty() {std::lock_guardstd::mutex locker(mtx_);return log.empty();
}templatetypename T
size_t LogQueueT::size() {std::lock_guardstd::mutex locker(mtx_);return log.size();
}
// 消费者读取日志
templatetypename T
bool LogQueueT::pop(T item) {std::unique_lockstd::mutex locker(mtx_);while(log.empty()) {consumer_cv.wait(locker);if (deleted) {return false;}}item log.front();log.pop_front();producer_cv.notify_one();return true;
}// 生产者插入日志
templatetypename T
void LogQueueT::push(const T data) {std::unique_lockstd::mutex locker(mtx_);// 直至log.size() capacity 缓冲区未满while(log.size() capacity) {producer_cv.wait(locker);}log.push_back(data);consumer_cv.notify_one();
}#endif //TINYWEBSERVER_LOGQUEUE_H
#ifndef TINYWEBSERVER_LOGLEVEL_H
#define TINYWEBSERVER_LOGLEVEL_H
class LogLevel
{
public:enum class value{UNKNOWN 0,DEBUG,INFO,WARN,ERROR,FATAL};static const char *toString(value level){switch (level){case LogLevel::value::DEBUG: return [DEBUG]:;case LogLevel::value::INFO: return [INFO] :;case LogLevel::value::WARN: return [WARN] :;case LogLevel::value::ERROR: return [ERROR]:;case LogLevel::value::FATAL: return [FATAL]:;case LogLevel::value::OFF: return [OFF] :;default: return UNKNOW;}}
};
#endif //TINYWEBSERVER_LOGLEVEL_H
4 定时器
客户端和服务器建立TCP连接后客户端可能会不再发送数据此时需要断开连接释放资源。在服务器初始化时指定超时时间当客户端和服务器之间未发生通信的时间超过超时时间后便关闭二者的连接。
定时器的数据结构基于小跟堆实现堆顶是距离过期最近的连接。当客户端连接服务器后向定时器内插入超时关闭连接事件。每当服务器收到来自客户端的请求时更新定时器内的超时时间。当某个连接超时时将其从堆顶取出执行关闭连接回调函数。
4.1 调整堆中元素操作
定时器的堆基于vector动态扩容数组实现。以下定义了两个调整堆中元素的操作
向上调整将某个结点不断地与其父结点比较交换直到不能交换为止向下调整将某个结点不断地与其子结点比较交换直到不能交换为止
4.2 堆的操作
4.2.1 增
向堆中插入元素可以现将新插入的元素放入vector最后位置并执行向上调整操作。
4.2.2 删
将堆顶元素与vector最后一个元素交换位置并将记录元素个数的变量递减然后执行向上调整操作。
4.2.3 改
利用元素与堆中下标的映射数组找到在堆中的位置然后分别执行向上调整和向下调整操作。
4.2.4 查
取出堆顶元素。
4.3 代码
#ifndef TINYWEBSERVER_TIMER_H
#define TINYWEBSERVER_TIMER_H
#include vector
#include functional
#include cassert
#include chrono
#include unordered_map
#include algorithm
#include iostream
#include atomic
#include ../log/Log.h
typedef std::functionvoid() TimerCallback;
typedef std::chrono::high_resolution_clock Clock;
typedef std::chrono::milliseconds MS;
typedef Clock::time_point TimeStamp;// 定时器结点
struct TimerNode {int id_; // 定时器idTimeStamp expires_; // 定时器过期时间点TimerCallback cb_; // 回调函数bool operator(const TimerNode tn) const {return expires_ tn.expires_;}bool operator(const TimerNode tn) const {return expires_ tn.expires_;}
};// 堆定时器存储定时事件
class Timer {
private:// 定时器堆采用vector数组方式存储std::vectorTimerNode heap_;std::unordered_mapint, size_t ref_; // 从id到heap中的下标映射方便直接操作某个nodestd::atomicsize_t si_{};private:void up(size_t u); // 将某个结点向上调整的操作void down(size_t u); // 将某个结点向下调整的操作void pop(); // 删除堆顶的结点TimerNode top(); // 获得堆顶的结点void del(size_t i); // 删除下标为i的结点 并执行回调函数void swap_(size_t t1, size_t t2); // 交换两个下标的位置public:explicit Timer(size_t cnt);~Timer();int getNextTick(); // 返回最近的定时器事件超时的间隔时间void reset(int id, int timeout); // 重新设置某个结点的过期时间void reset(int id, int timeout, TimerCallback cb); // 重设某个结点的过期时间和回调函数void execCb(int id); // 执行id的回调函数void tick(); // 处理堆中过期的定时器void push(int id, int timeout, const TimerCallback cb);bool empty();
};#endif //TINYWEBSERVER_TIMER_H
#include Timer.h#include utilityvoid Timer::up(size_t u) {while(u ! 1 heap_[u] heap_[u/2]) {swap_(u, u / 2);u / 2;}
}
// 从1开始 1 2 3 4 ... 1是根结点
void Timer::down(size_t u) {size_t t u;if (u * 2 si_ heap_[u*2] heap_[t]) t u * 2;if (u * 2 1 si_ heap_[u * 2 1] heap_[t]) t u * 2 1;if (t ! u) {swap_(u, t);down(t);}
}void Timer::swap_(size_t t1, size_t t2) {assert(t1 1 t1 heap_.size());assert(t2 1 t2 heap_.size());std::swap(heap_[t1], heap_[t2]);ref_[heap_[t1].id_] t1;ref_[heap_[t2].id_] t2;
}
// 删除顶部结点
void Timer::pop() {assert(si_ 0);del(1);
}TimerNode Timer::top() {return heap_[1];
}// 删除给定结点i将其和最后一个结点交换之后执行up和down操作
void Timer::del(size_t i) {assert(i 0 i si_);swap_(i, si_);-- si_;ref_.erase(heap_.back().id_);heap_.pop_back();up(i);down(i);
}
// 执行id的回调函数
void Timer::execCb(int id) {if (si_ 0 || ref_.count(id) 0) {return ;}auto idx ref_[id];auto node heap_[idx];node.cb_();del(idx);
}void Timer::push(int id, int timeout, const TimerCallback cb) {assert(id 0);if (ref_.count(id)) {auto idx ref_[id];auto node heap_[idx];node.expires_ Clock::now() MS(timeout);node.cb_ cb;up(idx);down(idx);} else {LOG_INFO(增加计时时间 id: %d timeout: %d, id, timeout);heap_.push_back({id, MS(timeout) Clock::now(), cb}); si_;ref_[id] si_;up(si_);}
}Timer::Timer(size_t cnt) {
// Log::INFO(%s, Timer start...);heap_.reserve(cnt 1);heap_.emplace_back();si_ 0;
}void Timer::reset(int id, int timeout, TimerCallback cb) {assert(id 0);auto idx ref_[id];auto node heap_[idx];node.expires_ Clock::now() MS(timeout);node.cb_ cb;down(idx);up(idx);
}void Timer::reset(int id, int timeout) {assert(id 0);auto idx ref_[id];auto node heap_[idx];node.expires_ Clock::now() MS(timeout);down(idx);up(idx);
}void Timer::tick() {while(si_) {auto node top();if (std::chrono::duration_castMS(node.expires_ - Clock::now()).count() 0)break;LOG_INFO(timer %d is expired, node.id_);node.cb_();pop();}
}bool Timer::empty() {return si_ 0;
}int Timer::getNextTick() {tick();size_t res -1;if (si_) {res std::chrono::duration_castMS(top().expires_ - Clock::now()).count();if (res 0) {res 0;}}return res;
}Timer::~Timer() {heap_.clear();ref_.clear();
}
5 线程池
由于线程的创建和销毁需要开销频繁创建和销毁线程会影响服务器的性能。因此服务器维护一个预先创建好的线程池。每当任务队列中有任务时某个线程将其从中取出并执行。执行完成后继续等待任务。 在这里使用一个条件变量当任务队列为空时阻塞线程当有任务插入队列时通知线程执行任务。
5.1 代码
#ifndef TINYWEBSERVER_THREADPOOL_H
#define TINYWEBSERVER_THREADPOOL_H
#include queue
#include functional
#include mutex
#include cassert
#include thread
#include condition_variable
#include unistd.h
#include ../log/Log.h
class ThreadPool {
private:struct Pool {std::mutex mtx_;bool isRun true;std::condition_variable cv;std::queuestd::functionvoid()taskQueue_;};std::shared_ptrPool pool_;//private:
// static void work(); // 工作函数从任务队列中取出任务并执行
public:explicit ThreadPool(int max_thread_cnt);ThreadPool() default;// 定义移动构造函数ThreadPool(ThreadPool) default;~ThreadPool();bool addTask(std::functionvoid() f); // 外部调用接口void resetTaskQueue();
};#endif //TINYWEBSERVER_THREADPOOL_H
#include ThreadPool.hThreadPool::ThreadPool(int max_thread_cnt): pool_(std::make_sharedPool()) {assert(max_thread_cnt 0);for (int i 0; i max_thread_cnt; i) {printf(init thread %d\n, i);std::thread([pool pool_, i]{std::unique_lockstd::mutex locker(pool-mtx_);while(true) {if (!pool-taskQueue_.empty()) {
// LOG_INFO(thread pool: thread %d process task, i);// 有任务开始干活// 这个地方使用move提高效率auto task std::move(pool-taskQueue_.front());pool-taskQueue_.pop();locker.unlock();task(); // 处理任务locker.lock();} else if (!pool-isRun) {break;} else {pool-cv.wait(locker);}}}).detach();}
}ThreadPool::~ThreadPool() {if (static_castbool(pool_)){{std::lock_guardstd::mutex locker(pool_-mtx_);pool_-isRun false;}pool_-cv.notify_all();}
}void ThreadPool::resetTaskQueue() {std::queuestd::functionvoid() q;swap(q, pool_-taskQueue_);
}bool ThreadPool::addTask(std::functionvoid() f) {{std::lock_guardstd::mutex locker(pool_-mtx_);pool_-taskQueue_.emplace(std::forwardstd::functionvoid()(f));}
// LOG_INFO(thead pool: %s, add task);pool_-cv.notify_one();return true;
}
6 数据库连接池
和线程池类似数据库的连接池由一个队列来维护与数据库的多个连接。初始化时创建n个数据库连接并将其插入到队列中。在需要访问数据库时从队列中取出一个连接进行数据库读写操作。在读写完数据后重新将连接插入到队列中。
6.1 RAII
RAIIResource Acquisition Is Initialization资源获取即初始化是一种C编程惯用法用于管理资源如内存、文件句柄、网络连接等确保它们在对象的生命周期内得到正确的管理和释放。
资源绑定到对象的生命周期 资源在对象创建时被获取并在对象销毁时被释放。构造函数负责获取资源析构函数负责释放资源。自动管理通过栈上对象的自动创建和销毁避免手动管理资源的复杂性和潜在错误如资源泄漏。
数据库连接的管理采用RAII机制可以简化资源管理。
6.2 代码
#ifndef SQLCONNPOOL_H
#define SQLCONNPOOL_H#include mysql/mysql.h
#include string
#include queue
#include mutex
#include semaphore.h
#include thread
#include cassert
#include ../log/Log.hclass SqlConnPool {
public:static SqlConnPool *Instance();MYSQL *GetConn();void FreeConn(MYSQL * conn);int GetFreeConnCount();void Init(const char* host, int port,const char* user,const char* pwd, const char* dbName, int connSize);void ClosePool();private:SqlConnPool();~SqlConnPool();int MAX_CONN_;int useCount_;int freeCount_;std::queueMYSQL * connQue_;std::mutex mtx_;sem_t semId_;
};#endif // SQLCONNPOOL_H#include sqlconnpool.h
using namespace std;SqlConnPool::SqlConnPool() {useCount_ 0;freeCount_ 0;
}SqlConnPool* SqlConnPool::Instance() {static SqlConnPool connPool;return connPool;
}void SqlConnPool::Init(const char* host, int port,const char* user,const char* pwd, const char* dbName,int connSize 10) {assert(connSize 0);for (int i 0; i connSize; i) {MYSQL *sql nullptr;sql mysql_init(sql);if (!sql) {LOG_ERROR(MySql init error!);assert(sql);}sql mysql_real_connect(sql, host,user, pwd,dbName, port, nullptr, 0);if (!sql) {LOG_ERROR(MySql Connect error!);}connQue_.push(sql);}MAX_CONN_ connSize;sem_init(semId_, 0, MAX_CONN_);
}MYSQL* SqlConnPool::GetConn() {MYSQL *sql nullptr;if(connQue_.empty()){LOG_WARN(SqlConnPool busy!);return nullptr;}sem_wait(semId_);{lock_guardmutex locker(mtx_);sql connQue_.front();connQue_.pop();}return sql;
}void SqlConnPool::FreeConn(MYSQL* sql) {assert(sql);lock_guardmutex locker(mtx_);connQue_.push(sql);sem_post(semId_);
}void SqlConnPool::ClosePool() {lock_guardmutex locker(mtx_);while(!connQue_.empty()) {auto item connQue_.front();connQue_.pop();mysql_close(item);}mysql_library_end();
}int SqlConnPool::GetFreeConnCount() {lock_guardmutex locker(mtx_);return connQue_.size();
}SqlConnPool::~SqlConnPool() {ClosePool();
}
#ifndef SQLCONNRAII_H
#define SQLCONNRAII_H
#include sqlconnpool.h/* 资源在对象构造初始化 资源在对象析构时释放*/
class SqlConnRAII {
public:SqlConnRAII(MYSQL** sql, SqlConnPool *connpool) {assert(connpool);*sql connpool-GetConn();sql_ *sql;connpool_ connpool;}~SqlConnRAII() {if(sql_) { connpool_-FreeConn(sql_); }}private:MYSQL *sql_;SqlConnPool* connpool_;
};#endif //SQLCONNRAII_H7 HTTP层处理
7.1 HTTP解析
HTTP请求的格式如下图所示 HTTP请求报文遵循着规定的格式我们只需要按要求即可准确解析。
报文分为三个部分
请求行包含请求方法GETPOST…请求url和HTTP版本。中间由空格分隔最后有个\r\n。请求头包含若干个请求体由key: value组成末尾有\r\n。最后一个请求头最后有两个\r\n。请求体可有可无
GET /index.html HTTP/1.1
Host: www.example.com
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36
Accept: text/html,application/xhtmlxml,application/xml;q0.9,image/avif,image/webp,image/apng,*/*;q0.8
Accept-Language: en-US,en;q0.5
Accept-Encoding: gzip, deflate, br
Connection: keep-alive
由于HTTP报文每一行最后都有一个\r\n我们可以从缓冲区中搜索该字符串将每一行截取出来再进行解析。整个过程由于分三个阶段进行因此可使用状态机解决。为此我们规定4个状态分别是解析请求行、解析请求头、解析请求体和结束。在不同的状态执行不同的操作来解析不同的内容。
7.1.1 代码 #ifndef TINYWEBSERVER_PARSEHTTPREQUEST_H
#define TINYWEBSERVER_PARSEHTTPREQUEST_H#include unordered_map
#include regex
#include unordered_set
#include mysql/mysql.h
#include ../buffer/Buffer.h
#include ../utils/Utils.h
#include ../log/Log.h
#include ../pool/sqlconnpool.h
#include ../pool/sqlconnRAII.hclass ParseHttpRequest {
public:// 当前的处理状态enum Status {PARSE_LINE,PARSE_HEADERS,PARSE_BODY,FINISH};
private:std::string version_; // HTTP版本std::unordered_mapstd::string, std::string headers_; // 头部字段Status state_ PARSE_LINE;std::string url_;std::string method_;std::string body_;static const std::unordered_setstd::string DEFAULT_HTML;static const std::unordered_mapstd::string, intDEFAULT_HTML_TAG;std::unordered_mapstd::string, std::string post_;
public:ParseHttpRequest();~ParseHttpRequest();void init();bool parse(Buffer buf);void parse_url();bool parseRequestLine(const std::string request_line);void parseRequestHeader(const std::string header_line);void parseRequestBody(const std::string body);std::string method();std::string version();std::string path();bool keepAlive();void parsePost();void parseFromUrlencoded();static bool userVerify(const std::string name, const std::string pwd, bool isLogin);static int convertHex(char ch);
};#endif //TINYWEBSERVER_PARSEHTTPREQUEST_H
#include ParseHttpRequest.h
using namespace std;const unordered_setstring ParseHttpRequest::DEFAULT_HTML{/index, /register, /login,/welcome, /video, /picture, };const unordered_mapstring, int ParseHttpRequest::DEFAULT_HTML_TAG {{/register.html, 0}, {/login.html, 1}, };void ParseHttpRequest::init() {method_ url_ version_ body_ ;state_ PARSE_LINE;headers_.clear();post_.clear();
}bool ParseHttpRequest::keepAlive() {if(headers_.count(Connection) 1) {return headers_.find(Connection)-second keep-alive version_ 1.1;}return false;
}bool ParseHttpRequest::parse(Buffer buff) {const char CRLF[] \r\n;if(buff.getContentLen() 0) {return false;}while(buff.getContentLen() state_ ! FINISH) {const char* lineEnd search(buff.getReadPtr(), buff.getWritePtr(), CRLF, CRLF 2);std::string line(buff.getConstReadPtr(), lineEnd);switch(state_){case PARSE_LINE:if(!parseRequestLine(line)) {return false;}parse_url();break;case PARSE_HEADERS:parseRequestHeader(line);if(buff.getContentLen() 2) {state_ FINISH;}break;case PARSE_BODY:parseRequestBody(line);break;default:break;}if(lineEnd buff.getWritePtr()) { break; }buff.addReadIdxUntil(lineEnd 2);}LOG_DEBUG([%s], [%s], [%s], method_.c_str(), url_.c_str(), version_.c_str());return true;
}void ParseHttpRequest::parse_url() {if(url_ /) {url_ /index.html;}else {for(auto item: DEFAULT_HTML) {if(item url_) {url_ .html;break;}}}
}bool ParseHttpRequest::parseRequestLine(const string line) {regex patten(^([^ ]*) ([^ ]*) HTTP/([^ ]*)$);smatch subMatch;if(regex_match(line, subMatch, patten)) {method_ subMatch[1];url_ subMatch[2];version_ subMatch[3];state_ PARSE_HEADERS;return true;}LOG_ERROR(RequestLine Error);return false;
}void ParseHttpRequest::parseRequestHeader(const string line) {regex patten(^([^:]*): ?(.*)$);smatch subMatch;if(regex_match(line, subMatch, patten)) {headers_[subMatch[1]] subMatch[2];}else {state_ PARSE_BODY;}
}void ParseHttpRequest::parseRequestBody(const string line) {body_ line;parsePost();state_ FINISH;LOG_DEBUG(Body:%s, len:%d, line.c_str(), line.size());
}int ParseHttpRequest::convertHex(char ch) {if(ch A ch F) return ch -A 10;if(ch a ch f) return ch -a 10;return ch;
}void ParseHttpRequest::parsePost() {if(method_ POST headers_[Content-Type] application/x-www-form-urlencoded) {parseFromUrlencoded();if(DEFAULT_HTML_TAG.count(url_)) {int tag DEFAULT_HTML_TAG.find(url_)-second;LOG_DEBUG(Tag:%d, tag);if(tag 0 || tag 1) {bool isLogin (tag 1);if(userVerify(post_[username], post_[password], isLogin)) {url_ /welcome.html;}else {url_ /error.html;}}}}
}void ParseHttpRequest::parseFromUrlencoded() {if(body_.size() 0) { return; }string key, value;int num 0;int n body_.size();int i 0, j 0;for(; i n; i) {char ch body_[i];switch (ch) {case :key body_.substr(j, i - j);j i 1;break;case :body_[i] ;break;case %:num convertHex(body_[i 1]) * 16 convertHex(body_[i 2]);body_[i 2] num % 10 0;body_[i 1] num / 10 0;i 2;break;case :value body_.substr(j, i - j);j i 1;post_[key] value;LOG_DEBUG(%s %s, key.c_str(), value.c_str());break;default:break;}}assert(j i);if(post_.count(key) 0 j i) {value body_.substr(j, i - j);post_[key] value;}
}bool ParseHttpRequest::userVerify(const string name, const string pwd, bool isLogin) {if(name.empty() || pwd.empty()) { return false; }LOG_INFO(Verify name:%s pwd:%s, name.c_str(), pwd.c_str());MYSQL* sql;SqlConnRAII give_me_a_name(sql, SqlConnPool::Instance());assert(sql);bool flag false;char order[256] { 0 };MYSQL_RES *res nullptr;if(!isLogin) { flag true; }/* 查询用户及密码 */snprintf(order, 256, SELECT username, password FROM user WHERE username%s LIMIT 1, name.c_str());LOG_DEBUG(%s, order);if(mysql_query(sql, order)) {mysql_free_result(res);return false;}res mysql_store_result(sql);mysql_num_fields(res);mysql_fetch_fields(res);while(MYSQL_ROW row mysql_fetch_row(res)) {LOG_DEBUG(MYSQL ROW: %s %s, row[0], row[1]);string password(row[1]);/* 注册行为 且 用户名未被使用*/if(isLogin) {if(pwd password) { flag true; }else {flag false;LOG_DEBUG(pwd error!);}}else {flag false;LOG_DEBUG(user used!);}}mysql_free_result(res);/* 注册行为 且 用户名未被使用*/if(!isLogin flag) {LOG_DEBUG(regirster!);bzero(order, 256);snprintf(order, 256,INSERT INTO user(username, password) VALUES(%s,%s), name.c_str(), pwd.c_str());LOG_DEBUG( %s, order)if(mysql_query(sql, order)) {LOG_DEBUG( Insert error!);flag false;}flag true;}SqlConnPool::Instance()-FreeConn(sql);LOG_DEBUG( UserVerify success!!);return flag;
}std::string ParseHttpRequest::path(){return url_;
}
std::string ParseHttpRequest::method() {return method_;
}std::string ParseHttpRequest::version() {return version_;
}ParseHttpRequest::ParseHttpRequest() {init();
}ParseHttpRequest::~ParseHttpRequest() {}
7.2 HTTP响应
以下是HTTP响应报文的一个例子主要包含响应行、响应头和响应体。
HTTP/1.1 200 OK
Date: Fri, 19 Jul 2024 10:00:00 GMT
Server: Apache/2.4.41 (Ubuntu)
Last-Modified: Mon, 28 Jun 2024 14:30:00 GMT
Content-Type: text/html; charsetUTF-8
Content-Length: 305
Connection: close!DOCTYPE html
html
headtitleExample Page/title
/head
bodyh1Welcome to Example Page/h1pThis is a sample HTML page./p
/body
/html
根据对HTTP请求的处理结果生成相应的HTTP响应结果。
响应行中包含HTTP版本、响应状态码和摘要。
响应头中包含连接状态、返回的文件类型和长度。
响应体中包含返回的资源文件。
7.2.1 代码
#ifndef TINYWEBSERVER_HTTPRESPONSE_H
#define TINYWEBSERVER_HTTPRESPONSE_H
#include unordered_map
#include sys/stat.h
#include fcntl.h
#include sys/mman.h
#include ../utils/Utils.h
#include ../buffer/Buffer.h
#include ../log/Log.h// 构造HTTP响应报文
class HttpResponse {
private:static const std::unordered_mapstd::string, std::string SUFFIX_TYPE;static const std::unordered_mapint, std::string CODE_PATH;static std::unordered_mapint, std::string CODE;std::string srcDir_;std::string path_;bool keepAlive_{};int code_{};char* mmFile_{};struct stat mmFileStat_{};
public:HttpResponse() default;~HttpResponse() default;static void addCors(Buffer buf);static void ErrorContent(Buffer buff, std::string message);void unmapFile();void makeResponse(Buffer buf);void init(const std::string srcDir, const std::string path, bool isKeepAlive, int code);void ErrorHtml_();void AddStateLine_(Buffer buf);void AddHeader_(Buffer buf);void AddContent_(Buffer buff);std::string GetFileType_();size_t fileLen() const;char* file();
};#endif //TINYWEBSERVER_HTTPRESPONSE_H
#include HttpResponse.hconst std::unordered_mapstd::string, std::string HttpResponse::SUFFIX_TYPE {{ .html, text/html },{ .xml, text/xml },{ .xhtml, application/xhtmlxml },{ .txt, text/plain },{ .rtf, application/rtf },{ .pdf, application/pdf },{ .word, application/nsword },{ .png, image/png },{ .gif, image/gif },{ .jpg, image/jpeg },{ .jpeg, image/jpeg },{ .au, audio/basic },{ .mpeg, video/mpeg },{ .mpg, video/mpeg },{ .avi, video/x-msvideo },{ .gz, application/x-gzip },{ .tar, application/x-tar },{ .css, text/css },{ .js, text/javascript },
};
std::unordered_mapint, std::string HttpResponse::CODE {{ 200, OK },{ 400, Bad Request },{ 403, Forbidden },{ 404, Not Found },
};
const std::unordered_mapint, std::string HttpResponse::CODE_PATH {{ 400, /400.html },{ 403, /403.html },{ 404, /404.html },
};void HttpResponse::init(const std::string srcDir, const std::string path, bool isKeepAlive, int code) {if (mmFile_) {unmapFile();}keepAlive_ isKeepAlive;srcDir_ srcDir;mmFile_ nullptr;mmFileStat_ { 0 };code_ code;path_ path;
}//void HttpResponse::addHeaders(bool keepAlive, Buffer buf, int type) {
// buf.append(Connection: );
// if (keepAlive) {
// buf.append(keep-alive\r\n);
// buf.append(keep-alive: max6, timeout120\r\n);
// } else {
// buf.append(close\r\n);
// }
// if (type) {
// buf.append(Content-Type:application/json\r\n);
// } else {
// buf.append(Content-Type:text/html\r\n);
// }
// buf.append(Access-Control-Allow-Origin:*\r\n);
//}void HttpResponse::addCors(Buffer buf) {buf.append(Access-Control-Allow-Methods:POST, OPTIONS, GET, PUT, DELETE\r\n);buf.append(Access-Control-Allow-Headers:Content-Type, Connection, Content-Length, Keep-Alive, \r\n);buf.append(Access-Control-Max-Age:3600\r\n);buf.append(Cache-Control:no-cache, no-store, must-revalidate\r\n);
}//void HttpResponse::addBody(const std::string data, Buffer buf) {
// buf.append(Content-Length: std::to_string(data.length()) \r\n\r\n);
// buf.append(data \r\n);
//}void HttpResponse::AddStateLine_(Buffer buf) {std::string status;if(CODE.count(code_) 1) {status CODE.find(code_)-second;}else {code_ 400;status CODE.find(400)-second;}buf.append(HTTP/1.1 std::to_string(code_) status \r\n);
}void HttpResponse::AddHeader_(Buffer buf) {buf.append(Connection: );if(keepAlive_) {buf.append(keep-alive\r\n);buf.append(keep-alive: max6, timeout120\r\n);} else{buf.append(close\r\n);}buf.append(Content-type: GetFileType_() \r\n);
}void HttpResponse::AddContent_(Buffer buf) {int srcFd open((srcDir_ path_).data(), O_RDONLY);if(srcFd 0) {ErrorContent(buf, File NotFound!);return;}/* 将文件映射到内存提高文件的访问速度MAP_PRIVATE 建立一个写入时拷贝的私有映射*/
// LOG_DEBUG(file path %s, size: %d, (srcDir_ path_).data(), mmFileStat_.st_size);int* mmRet (int*)mmap(nullptr, mmFileStat_.st_size, PROT_READ, MAP_PRIVATE, srcFd, 0);if(*mmRet -1) {LOG_ERROR(map file failed);ErrorContent(buf, File NotFound!);return;}mmFile_ (char*)mmRet;close(srcFd);buf.append(Content-length: std::to_string(mmFileStat_.st_size) \r\n\r\n);
}void HttpResponse::makeResponse(Buffer buf) {/* 判断请求的资源文件 */if(stat((srcDir_ path_).data(), mmFileStat_) 0 || S_ISDIR(mmFileStat_.st_mode)) {code_ 404;}else if(!(mmFileStat_.st_mode S_IROTH)) {code_ 403;}else if(code_ -1) {code_ 200;}ErrorHtml_();AddStateLine_(buf);AddHeader_(buf);AddContent_(buf);
}void HttpResponse::ErrorContent(Buffer buff, std::string message)
{std::string body;body htmltitleError/title;body body bgcolor\ffffff\;body p message /p;body hremTinyWebServer/em/body/html;buff.append(Content-length: std::to_string(body.size()) \r\n\r\n);buff.append(body);
}void HttpResponse::unmapFile() {if(mmFile_) {munmap(mmFile_, mmFileStat_.st_size);mmFile_ nullptr;}
}
void HttpResponse::ErrorHtml_() {if(CODE_PATH.count(code_) 1) {path_ CODE_PATH.find(code_)-second;stat((srcDir_ path_).data(), mmFileStat_);}
}
std::string HttpResponse::GetFileType_() {/* 判断文件类型 */std::string::size_type idx path_.find_last_of(.);if(idx std::string::npos) {return text/plain;}std::string suffix path_.substr(idx);if(SUFFIX_TYPE.count(suffix) 1) {return SUFFIX_TYPE.find(suffix)-second;}return text/plain;
}char *HttpResponse::file() {return mmFile_;
}size_t HttpResponse::fileLen() const {return mmFileStat_.st_size;
}
7.3 HTTP处理
HTTP处理模块是整个服务器的核心模块负责管理客户端的连接、读写数据、HTTP处理逻辑。 管理连接 当有客户端连接时初始化相关数据存储fd和客户端地址。 当由于某种原因需要断开连接时该模块关闭fd重置相关数据 读写数据 根据不同的模式读取数据调用buffer中的readFd函数。将缓冲区的数据写入socket中。此时需要注意一次可能不能将全部数据写出需要循环写出并更新指针。 负责整个HTTP的处理流程 首先调用ParseHttpRequest解析读缓冲区的数据然后再调用HttpResponse生成响应报文并放入写缓冲区中最后将写缓冲和请求文件地址赋值给iovec结点等待写出。
7.3.1 代码
#ifndef TINYWEBSERVER_HTTPWORK_H
#define TINYWEBSERVER_HTTPWORK_H#include sys/socket.h
#include netinet/in.h
#include HttpResponse.h
#include mutex
#include ParseHttpRequest.h
#include ../buffer/Buffer.h// 每个工作线程操纵的类接口负责读写数据处理Http请求每个用户持有一个类
class HttpWork {
private:Buffer writeBuf_;Buffer readBuf_;int fd_{};bool isRun_;struct sockaddr_in addr_{};iovec iv[2]{};int io_cnt 2;std::mutex mtx_;public:ParseHttpRequest request_;HttpResponse response_;static std::string srcDir_;static bool et_;static std::atomicint userCount;
public:HttpWork();~HttpWork();void init(int fd, const sockaddr_in addr);ssize_t writeFd(int *Errno);ssize_t readFd(int *Errno);bool processHttp();size_t getWriteLen();void closeConn();int getFd();bool isKeepAlive();void resetBuffer();bool getIsRun();
};#endif //TINYWEBSERVER_HTTPWORK_H
#include HttpWork.h
bool HttpWork::et_;
std::string HttpWork::srcDir_;
std::atomicint HttpWork::userCount;void HttpWork::init(int fd, const sockaddr_in addr) {assert(fd 0);std::lock_guardstd::mutex locker(mtx_);isRun_ true;fd_ fd;addr_ addr;writeBuf_.resetBuffer();readBuf_.resetBuffer();request_.init();userCount ;
}HttpWork::HttpWork() {isRun_ false;addr_ {0};
}ssize_t HttpWork::readFd(int *Errno) {assert(fd_ 0);ssize_t len 0;do {auto t_len readBuf_.readFd(fd_, Errno);// 返回0代表此次读取数据为0if (t_len 0) {break;}len t_len;} while(et_);// len是此次总计读取的数据return len;
}ssize_t HttpWork::writeFd(int *Errno) {assert(fd_ 0);ssize_t len 0;do {len writev(fd_, iv, io_cnt);if (len 0) {// 写错误*Errno errno;break;}// 处理第一个缓冲区if (iv[0].iov_len 0) {// 此时第一个iovec没有写完// 我们将更新iovec的base和buffer中的指针位置auto iv_len1 writeBuf_.getContentLen(); // 获取待写入数据的长度if (iv_len1 static_castsize_t(len)) { // buf中全部写完// iv1已经全部写完后续不再处理iv[0].iov_base nullptr;iv[0].iov_len 0;writeBuf_.resetBuffer();len static_castssize_t(static_castsize_t(len) - iv_len1); // 获取第二个iv结点写入的数据} else {// iv1写了一部分writeBuf_.addReadIdx(len); // 更新// 指针iv[0].iov_base writeBuf_.getReadPtr();iv[0].iov_len writeBuf_.getContentLen();len 0;}}// 处理第二个缓冲区if (iv[0].iov_len 0){iv[1].iov_base (uint8_t*)iv[1].iov_base len;iv[1].iov_len - len;}if (0 getWriteLen()) {iv[1].iov_base nullptr;iv[1].iov_len 0;break; // 写成功}} while(et_);return len;
}HttpWork::~HttpWork() {writeBuf_.resetBuffer();readBuf_.resetBuffer();fd_ -1;close(fd_);
}size_t HttpWork::getWriteLen() {return iv[0].iov_len iv[1].iov_len;
}void HttpWork::closeConn() {std::lock_guardstd::mutex locker(mtx_);if (isRun_) {close(fd_);fd_ -1;isRun_ false;userCount --;LOG_DEBUG(client %d is closed, fd_);}
}bool HttpWork::getIsRun() {std::lock_guardstd::mutex locker(mtx_);return isRun_;
}int HttpWork::getFd() {std::lock_guardstd::mutex locker(mtx_);return fd_;
}bool HttpWork::isKeepAlive() {return request_.keepAlive();
}void HttpWork::resetBuffer() {readBuf_.resetBuffer();writeBuf_.resetBuffer();
}bool HttpWork::processHttp() {// 读缓冲中没有数据接下来继续等待读if (readBuf_.getContentLen() 0) {return false;}
// LOG_DEBUG(readBuf: %s, std::string(readBuf_.getConstReadPtr(), readBuf_.getContentLen()).c_str());request_.init(); // 清空上一次的数据// 请求成功解析if (request_.parse(readBuf_)) {// 解析成功正式进入业务逻辑处理流程response_.init(srcDir_, request_.path(), request_.keepAlive(), 200);} else {response_.init(srcDir_, request_.path(), false, 400);}LOG_INFO(%s %s, request_.method().c_str(), request_.path().c_str());response_.makeResponse(writeBuf_);// 输出报文11iv[0].iov_base writeBuf_.getReadPtr();iv[0].iov_len writeBuf_.getContentLen();io_cnt 1;if (response_.file() response_.fileLen() 0) {iv[1].iov_base response_.file();iv[1].iov_len response_.fileLen();io_cnt 2;}
// LOG_DEBUG(wait for write data: %d, getWriteLen());// 返回true表示等待写return true;
}8 Server层处理
上层的基础API已经实现Server层主要负责监听事件等待客户端连接、接收请求和发送响应。
在这里使用EPOLL来监听各种事件之后分类处理。但是主线程并不是真正的处理而是将读写事件插入到任务队列中由线程池负责处理。
此外当某个fd有事件发生时要延长定时器的超时时间。
服务器所需的参数使用配置文件的形式传入程序中。
8.1 代码
#ifndef TINYWEBSERVER_SERVER_H
#define TINYWEBSERVER_SERVER_H
#include ../http/HttpWork.h
#include ../http/HttpResponse.h
#include ../http/ParseHttpRequest.h
#include ../log/Log.h
#include ../pool/ThreadPool.h
#include ../timer/Timer.h
#include Epoll.h
#include unordered_map
#include sys/epoll.h
#include arpa/inet.h
#include functionalclass Server {
private:const char *ip_;int port_;int trigMod_;int timeoutMs_;int MAXFD_;std::unique_ptrThreadPool threadPool_;std::unique_ptrTimer timer_;std::unique_ptrEpoll epoll_;std::unordered_mapint, HttpWork users_; // 负责处理HTTP请求uint32_t httpConnEvents_{};uint32_t listenEvents_{};int listenFd_{};bool isRun_;std::string log_dir_;std::string srcDir_;public:// 提供服务器运行参数Server(const char* ip, int port, int trigMod, int timeout, LogTarget target, LogLevel::value logLevel,int max_thread_cnt, int max_timer_cnt, int max_fd, int max_epoll_events, int sqlPort, const char * sqlUser,const char * sqlPwd, const char * dbName, int connPoolNum);~Server();void initTrigMode();bool startListen();static int setNonBlocking(int fd);void dealListen();void addClient(int fd, sockaddr_in addr);void dealWrite(HttpWork client);void dealRead(HttpWork client);static void sendError(int fd, const char *msg);void closeConn(HttpWork client);void extendTime(int fd);void readCb(HttpWork client);void writeCb(HttpWork client);void run();
};#endif //TINYWEBSERVER_SERVER_
#include Server.hServer::Server(const char *ip, int port, int trigMod, int timeout, LogTarget target, LogLevel::value logLevel, int max_thread_cnt,int max_timer_cnt, int max_fd, int max_epoll_events, int sqlPort, const char *sqlUser,const char * sqlPwd, const char * dbName, int connPoolNum):ip_(ip), port_(port),trigMod_(trigMod), timeoutMs_(timeout), MAXFD_(max_fd),threadPool_(new ThreadPool(max_thread_cnt)), timer_(new Timer(max_timer_cnt)),epoll_(new Epoll(max_epoll_events)) {isRun_ false;srcDir_ getcwd(nullptr, 256);auto l Log::getInstance();
// 初始化日志系统l-init(target, (srcDir_ /log).c_str(), .log, logLevel);HttpWork::srcDir_ srcDir_ /resources;SqlConnPool::Instance()-Init(localhost, sqlPort, sqlUser, sqlPwd, dbName, connPoolNum);
// 初始化监听事件initTrigMode();
// 启动listenFdif (startListen()) {isRun_ true;}
}void Server::initTrigMode() {listenEvents_ EPOLLRDHUP; // 检测socket关闭httpConnEvents_ EPOLLONESHOT | EPOLLRDHUP; // EPOLLONESHOT由一个线程处理switch (trigMod_) {case 0:break;case 1:httpConnEvents_ | EPOLLET;break;case 2:listenEvents_ | EPOLLET;break;case 3:listenEvents_ | EPOLLET;httpConnEvents_ | EPOLLET;break;default:listenEvents_ | EPOLLET;httpConnEvents_ | EPOLLET;}HttpWork::et_ (httpConnEvents_ EPOLLET);
}bool Server::startListen() {struct sockaddr_in address {0};address.sin_port htons(port_);address.sin_family AF_INET;inet_pton(AF_INET, ip_, address.sin_addr);listenFd_ socket(PF_INET, SOCK_STREAM, 0);if (listenFd_ 0) {LOG_FATAL(create socket failed);return false;}setNonBlocking(listenFd_);int res;int optVal 1;res setsockopt(listenFd_, SOL_SOCKET, SO_REUSEADDR, optVal, sizeof(int));if(res -1) {LOG_FATAL(set socket setsockopt error !);close(listenFd_);return false;}res bind(listenFd_, (struct sockaddr*)address, sizeof address);if (res -1) {LOG_FATAL(bind socket failed);return false;}res listen(listenFd_, 8);if (res 0) {LOG_FATAL(%s %d, listen failed, res);return false;}epoll_-addFd(listenFd_, EPOLLIN|listenEvents_);LOG_INFO(listening on %s:%d, ip_, port_);return true;
}int Server::setNonBlocking(int fd) {int old fcntl(fd, F_GETFL);int newOp old | O_NONBLOCK;fcntl(fd, F_SETFL, newOp);return old;
}void Server::run() {if (!isRun_) {LOG_ERROR(Server start failed);return;}int timeout -1;LOG_INFO(Server start running);while(isRun_) {if (timeoutMs_ 0) {// 清理过期时间timeout timer_-getNextTick();}// 等待直到下一个定时事件超时如果timeout为-1代表队列中已经没有定时任务阻塞等待int cnt epoll_-wait(timeout);for (int i 0; i cnt; i) {// 以此处理每个事件int fd epoll_-getEventFd(i);uint32_t events epoll_-getEvents(i);if (fd listenFd_) {// 处理服务器连接请求dealListen();} else if (events (EPOLLRDHUP EPOLLERR EPOLLHUP)) {LOG_WARN((main): close event: fd(%d), fd);closeConn(users_[fd]); // 关闭连接} else if (events EPOLLIN) {dealRead(users_[fd]);} else if (events EPOLLOUT) {dealWrite(users_[fd]);} else {LOG_ERROR((main): unexpected event);}}}
}void Server::dealListen() {sockaddr_in address{0};socklen_t addr_len sizeof address;do {int fd accept(listenFd_, (struct sockaddr*)address, addr_len);if (fd 0) {return;} else if (HttpWork::userCount MAXFD_) {sendError(fd, Server busy);LOG_ERROR(server is full);return;}addClient(fd, address);} while(listenEvents_ EPOLLET);
}
void Server::addClient(int fd, sockaddr_in addr) {// 初始化连接users_[fd].init(fd, addr);HttpWork client users_[fd];
// Log::DEBUG((main): user %d isRun: %s, fd, std::to_string(client.getIsRun()).c_str());setNonBlocking(fd);// 假如监听列表epoll_-addFd(fd, EPOLLIN|httpConnEvents_);// 超时后断开连接if (timeoutMs_ 0) {// 添加定时事件utimer_-push(fd, timeoutMs_, [this, client] { closeConn(client); }); // 这里报错了原因是closeConn的client参数应为指针}LOG_INFO((main): user[%d] in, ip: %s, port: %d, fd, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
}void Server::dealWrite(HttpWork client) {assert(client.getIsRun());extendTime(client.getFd());threadPool_-addTask([this, client] { writeCb(client); });
}void Server::dealRead(HttpWork client) {
// LOG_INFO((main): dealRead client: %d, client.getFd());assert(client.getIsRun());extendTime(client.getFd());threadPool_-addTask([this, client] { readCb(client); });
}void Server::sendError(int fd, const char *msg) {assert(fd 0);auto len write(fd, msg, sizeof msg);if (len 0) {LOG_WARN((main): send error to client %d error, fd);}close(fd);
}void Server::extendTime(int fd) {assert(fd 0);timer_-reset(fd, timeoutMs_);
}void Server::readCb(HttpWork client) {assert(client.getIsRun());int Errno 0;auto len client.readFd(Errno);if (len 0 !(Errno EAGAIN || Errno 0)) {// 出现了其他错误关闭连接LOG_ERROR((thread):read error: %d, client %d is closing, Errno, client.getFd());closeConn(client);return;}if (client.processHttp()) {// 成功处理了http读请求response已生成等待写出epoll_-modFd(client.getFd(), EPOLLOUT | httpConnEvents_);} else {// http请求未处理读缓冲为空重新等待请求epoll_-delFd(client.getFd());LOG_ERROR((thread): readBuf is none, client: %d, client.getFd());closeConn(client);}
}void Server::writeCb(HttpWork client) {assert(client.getIsRun()); // 连接未关闭int Errno 0;auto len client.writeFd(Errno);
// LOG_DEBUG(Error: %d, Errno);if (client.getWriteLen() 0) {LOG_INFO((thread): write successfully from user %d, client.getFd());// 传输成功if (client.isKeepAlive()) {epoll_-modFd(client.getFd(), EPOLLIN | httpConnEvents_);client.resetBuffer();return;}} else if (len 0 Errno EAGAIN) {// 写缓冲满了继续传输
// LOG_WARN(EAGAIN, continue write, client %d, client.getFd());epoll_-modFd(client.getFd(), EPOLLOUT | httpConnEvents_);return;}LOG_INFO((thread): client %d is closing, client.getFd());closeConn(client);
}void Server::closeConn(HttpWork client) {if (!client.getIsRun())return;LOG_INFO((main): client %d is closing, client.getFd());epoll_-delFd(client.getFd());client.closeConn();
}Server::~Server() {close(listenFd_);isRun_ false;
}9 压力测试
9.1 ET模式
./webbench-1.5/webbench -c 5000 -t 10 http://127.0.0.1:20001/./webbench-1.5/webbench -c 8000 -t 10 http://127.0.0.1:20001/./webbench-1.5/webbench -c 10000 -t 10 http://127.0.0.1:20001/ 9.2 LT模式
./webbench-1.5/webbench -c 10000 -t 10 http://127.0.0.1:20001/9.3 测试环境
Ubuntu: 20.04cpu: i5-1035G1内存: 16G
10 运行说明
10.1 数据库初始化
CREATE DATABASE webserver;
USE webserver;CREATE TABLE user (username VARCHAR(50) NOT NULL,password VARCHAR(50) NOT NULL,PRIMARY KEY (username)
);INSERT INTO user (username, password) VALUES (root, 123456);10.2 导入mysql.h
安装mysql驱动
sudo apt-get install libmysqlclient-dev10.3 编译运行
进入项目根目录
make
./build/bin/server11 致谢 https://github.com/markparticle/WebServer Linux高性能服务器编程游双著. 完整项目链接https://github.com/Joker0x00/TinyWebServer 文章转载自: http://www.morning.xxwhz.cn.gov.cn.xxwhz.cn http://www.morning.jrqbr.cn.gov.cn.jrqbr.cn http://www.morning.kxqfz.cn.gov.cn.kxqfz.cn http://www.morning.zntf.cn.gov.cn.zntf.cn http://www.morning.bcngs.cn.gov.cn.bcngs.cn http://www.morning.hwnqg.cn.gov.cn.hwnqg.cn http://www.morning.jsmyw.cn.gov.cn.jsmyw.cn http://www.morning.dycbp.cn.gov.cn.dycbp.cn http://www.morning.wwznd.cn.gov.cn.wwznd.cn http://www.morning.ggnkt.cn.gov.cn.ggnkt.cn http://www.morning.fylsz.cn.gov.cn.fylsz.cn http://www.morning.qyfqx.cn.gov.cn.qyfqx.cn http://www.morning.rfhmb.cn.gov.cn.rfhmb.cn http://www.morning.xcyzy.cn.gov.cn.xcyzy.cn http://www.morning.fqlxg.cn.gov.cn.fqlxg.cn http://www.morning.fssjw.cn.gov.cn.fssjw.cn http://www.morning.jkfyt.cn.gov.cn.jkfyt.cn http://www.morning.wqkfm.cn.gov.cn.wqkfm.cn http://www.morning.mgnrc.cn.gov.cn.mgnrc.cn http://www.morning.rftk.cn.gov.cn.rftk.cn http://www.morning.qbzfp.cn.gov.cn.qbzfp.cn http://www.morning.bbgn.cn.gov.cn.bbgn.cn http://www.morning.cxsdl.cn.gov.cn.cxsdl.cn http://www.morning.lmyq.cn.gov.cn.lmyq.cn http://www.morning.btwlp.cn.gov.cn.btwlp.cn http://www.morning.flxqm.cn.gov.cn.flxqm.cn http://www.morning.ydnxm.cn.gov.cn.ydnxm.cn http://www.morning.jhrtq.cn.gov.cn.jhrtq.cn http://www.morning.hyhzt.cn.gov.cn.hyhzt.cn http://www.morning.wbysj.cn.gov.cn.wbysj.cn http://www.morning.ymwny.cn.gov.cn.ymwny.cn http://www.morning.yxplz.cn.gov.cn.yxplz.cn http://www.morning.mtbth.cn.gov.cn.mtbth.cn http://www.morning.qmqgx.cn.gov.cn.qmqgx.cn http://www.morning.zmwd.cn.gov.cn.zmwd.cn http://www.morning.nswcw.cn.gov.cn.nswcw.cn http://www.morning.trpq.cn.gov.cn.trpq.cn http://www.morning.wljzr.cn.gov.cn.wljzr.cn http://www.morning.srrzb.cn.gov.cn.srrzb.cn http://www.morning.weitao0415.cn.gov.cn.weitao0415.cn http://www.morning.flchj.cn.gov.cn.flchj.cn http://www.morning.lwyqd.cn.gov.cn.lwyqd.cn http://www.morning.gprzp.cn.gov.cn.gprzp.cn http://www.morning.ysbhj.cn.gov.cn.ysbhj.cn http://www.morning.kcypc.cn.gov.cn.kcypc.cn http://www.morning.kgfsz.cn.gov.cn.kgfsz.cn http://www.morning.rccpl.cn.gov.cn.rccpl.cn http://www.morning.glkhx.cn.gov.cn.glkhx.cn http://www.morning.rcntx.cn.gov.cn.rcntx.cn http://www.morning.rlwcs.cn.gov.cn.rlwcs.cn http://www.morning.bftqc.cn.gov.cn.bftqc.cn http://www.morning.ryznd.cn.gov.cn.ryznd.cn http://www.morning.wmlby.cn.gov.cn.wmlby.cn http://www.morning.yckwt.cn.gov.cn.yckwt.cn http://www.morning.slnz.cn.gov.cn.slnz.cn http://www.morning.mlcwl.cn.gov.cn.mlcwl.cn http://www.morning.zmbzl.cn.gov.cn.zmbzl.cn http://www.morning.bdypl.cn.gov.cn.bdypl.cn http://www.morning.xrwbc.cn.gov.cn.xrwbc.cn http://www.morning.zbnts.cn.gov.cn.zbnts.cn http://www.morning.jlschmy.com.gov.cn.jlschmy.com http://www.morning.fssmx.com.gov.cn.fssmx.com http://www.morning.rymd.cn.gov.cn.rymd.cn http://www.morning.ycnqk.cn.gov.cn.ycnqk.cn http://www.morning.phxns.cn.gov.cn.phxns.cn http://www.morning.prkdl.cn.gov.cn.prkdl.cn http://www.morning.zlnmm.cn.gov.cn.zlnmm.cn http://www.morning.rlhh.cn.gov.cn.rlhh.cn http://www.morning.xscpq.cn.gov.cn.xscpq.cn http://www.morning.wnnlr.cn.gov.cn.wnnlr.cn http://www.morning.ybgyz.cn.gov.cn.ybgyz.cn http://www.morning.ccyns.cn.gov.cn.ccyns.cn http://www.morning.yjdql.cn.gov.cn.yjdql.cn http://www.morning.zlqyj.cn.gov.cn.zlqyj.cn http://www.morning.tntbs.cn.gov.cn.tntbs.cn http://www.morning.bmjfp.cn.gov.cn.bmjfp.cn http://www.morning.bntgy.cn.gov.cn.bntgy.cn http://www.morning.nqlcj.cn.gov.cn.nqlcj.cn http://www.morning.lrplh.cn.gov.cn.lrplh.cn http://www.morning.bhjyh.cn.gov.cn.bhjyh.cn