外贸seo网站推广,上海做网站站优云一一十七,营销型网站建设的目标是,保亭网站建设文章目录
一 nsq的整体代码结构
二 回顾nsq的整体架构图
三 nsqd进程的作用
四 nsqd启动流程的源码分析
五 本篇博客总结 在博客 nsq整体架构及各个部件作用详解_YZF_Kevin的博客-CSDN博客 中我们讲了nsq的整体框架#xff0c;各个部件的大致作用。如果没看过的各个部件的大致作用。如果没看过的建议大家去学习下不然理解后续的内容会有难度
这篇博客开始我们来看下每个部件的详细功能从源码入手分析其内部实现原理 一 nsq的整体代码结构
建议大家也下载nsq的代码一边看博客一边看代码印象更深刻。nsq的官方git代码地址GitHub - nsqio/nsq: A realtime distributed messaging platform
nsq代码结构如下图中有注释大家先有个整体印象知道各个模块的代码在哪就行 二 回顾nsq的整体架构图 图中最上面的四个节点就是nsqd进程至少要有1个可以多开。我们画了4个分别是nsq1nsq2nsq3nsq4
注意看nsqd的连接关系每个nsqd节点和所有客户端都有连接(tcphttp)且每个nsqd节点和所有的nsqlookupd节点也有连接(tcp)
三 nsqd进程的作用
1. topic的创建清空暂停重新激活删除持久化(保存到文件从文件加载)同步给nsqlookupd进程
2. channel的创建清空暂停重新激活删除持久化(保存到文件从文件加载)同步给nsqlookupd进程
3. message的监听中转持久化(保存到文件从文件加载)主动推送消息给各个客户端超时重发消息计数
4. 配置修改运行状态协程、内存统计
5. 抽检channel的延迟队列飞行队列消息超时的重新入队
6. 统计和上报工作主要统计topicchannel消息各种队列客户端连接GC等信息通过UDP协议上报给指定地址的进程 可以说nsqd进程是整个nsq平台的核心消息队列架构简单的话只有一个nsqd进程就够了。
我画了一个图来概括一个nsqd进程的工作内容如下 四 nsqd启动流程的源码分析
nsqd的代码主要在两块
1. 代码框架及main函数目录在 nsq/apps/nsqd/*
2. 实现代码目录在 nsq/nsqd/*
值得一提的是nsqdnsqlookupdnsqadmin这三个进程的框架都使用了go-svc包这个包很简单使用者只需实现它的三个函数即可
Init() 配置初始化等操作
Start() 真正启动
Stop() 结束时的关闭操作
好了我们看nsqd的入口也就是main函数代码在nsq/apps/nsqd/main.go代码如下(已加注释)
type program struct {once sync.Oncensqd *nsqd.NSQD // nsqd对象
}// nsqd的启动入口
func main() {prg : program{}// Run内部会调用Init()Start()监听到这两个系统信号时会调用Stop()if err : svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err ! nil {logFatal(%s, err)}
}
main()函数内部只有一个对象program也只有一处调用svc.Run()这个函数内部会调用program.Init()和program.Start()
其中program.Init()函数主要是创建并检测nsqd的配置然后根据配置创建出一个nsqd实例
重点在program.Start()函数代码如下(已加注释)
// nsqd的启动重点在调用的Main()函数
func (p *program) Start() error {// 加载元数据并创建初始化出所有的topic对象所有的channel对象err : p.nsqd.LoadMetadata()if err ! nil {logFatal(failed to load metadata - %s, err)}// 再持久化元数据到文件不要觉得奇怪因为上面的LoadMetadata()函数可能会过滤掉一些无效的topicchannel这里再重写算是刷新了元数据err p.nsqd.PersistMetadata()if err ! nil {logFatal(failed to persist metadata - %s, err)}// 启动一个新协程专门运行nsqd的Main()循环注意这个Main()是永不退出的(除非出错)go func() {err : p.nsqd.Main()if err ! nil {p.Stop()os.Exit(1)}}()return nil
}
对上面的代码解释下program.Start()函数一共干了3件事
1. nsqd.LoadMetadata(), 这个函数根据配置加载旧nsqd元数据。这些元数据包含版本号topicchannel过滤掉不合法的topic和channel合法的topic和channel都创建出对象并且为每个topic建立处理循环
2. nsqd.PersistMetadata(), 把过滤后的topic和channel再保存到文件nsqd.dat算是把旧数据过滤了一遍
3. 新启动一个协程调用nsqd.Main()这个Main()是nsqd的核心启动了nsqd的全部服务。除非遇到错误否则永不退出
接下来看nsqd.Main()的内部实现代码在nsq/nsqd/nsqd.go代码如下(已加注释)
// nsqd主协程内部启动tcp循环http循环https循环, 扫描队列池和nsqlookupd循环永不退出除非严重错误
func (n *NSQD) Main() error {exitCh : make(chan error)var once sync.Once// 退出函数独立协程运行一直监听遇到错误exitFunc : func(err error) {once.Do(func() {if err ! nil {n.logf(LOG_FATAL, %s, err)}exitCh - err})}// TCP服务独立协程n.waitGroup.Wrap(func() {exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))})// HTTP服务独立协程if n.httpListener ! nil {httpServer : newHTTPServer(n, false, n.getOpts().TLSRequired TLSRequired)n.waitGroup.Wrap(func() {exitFunc(http_api.Serve(n.httpListener, httpServer, HTTP, n.logf))})}// HTTPS服务独立协程if n.httpsListener ! nil {httpsServer : newHTTPServer(n, true, true)n.waitGroup.Wrap(func() {exitFunc(http_api.Serve(n.httpsListener, httpsServer, HTTPS, n.logf))})}// 独立协程抽检扫描各个队列n.waitGroup.Wrap(n.queueScanLoop)// 独立协程和nsqlookupd的循环连接和重连心跳维持topicchannel变化通知等n.waitGroup.Wrap(n.lookupLoop)if n.getOpts().StatsdAddress ! {n.waitGroup.Wrap(n.statsdLoop)}err : -exitChreturn err
}
对上面的代码解释下nsqd.Main()主要干了6件事
1. 开一个新协程启动tcp服务并一直监听为客户端提供tcp服务。我们的客户端最常用因为生产消息中转消息处理消息都是这里实现的
2. 开一个新协程启动http服务并一直监听为客户端提供htttp服务
3. 开一个新协程启动https服务并一直监听为客户端提供htttps服务
4. 开一个新协程建立并维持扫描池这些扫描协程会扫描所有channel的延迟队列飞行队列如果消息超时了就重新入队。很有意思的是nsqd作者很大方地承认他抄袭了redis的抽检策略内部实现也确实是类redis操作这个我们后面再讲todo
5. 开一个新协程和nsqlookupd建立循环主要是连接和重连心跳维持实时报告自己的topic和channel变化
6. 如果你配置了信息上报的地址nsqd会再开一个新协程做统计上报操作统计topicchannel消息内存GC等信息通过UDP协议上报过去
五 本篇博客总结
1. 给大家看了nsq平台下代码整体结构建议大家下载源码自己看下加强印象
2. 讲了nsqd进程提供的功能实现
3. 跟踪了nsqd进程启动流程最核心的nsqd.Main()建议大家仔细看后面讲的nsqd内容也都是这几个协程里面干的活 下一篇博客我们开始详解分析nsqd内部各个协程的具体工作
todo