logo头像

Aoho's Blog

高性能消息中间件 NSQ 解析-nsqd 实现细节介绍

本文于1542天之前发表,文中内容可能已经过时。

我们在 前面 介绍了 nsq 的相关概念以及 nsq 的安装与应用。从本篇开始将会结合源码介绍 nsq 的实现细节。

nsq 中单个 nsqd 可以有多个 topic,每个 topic 可以有多个 channel。channel 接收这个 topic 所有消息的副本,从而实现多播分发,而 channel 上的每个消息被均匀的分发给它的订阅者,从而实现负载均衡。

image.png

入口函数

首先看下 nsqd 的入口函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//位于 apps/nsqd/main.go:26
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}

func (p *program) Init(env svc.Environment) error {
if env.IsWindowsService() {
dir := filepath.Dir(os.Args[0])
return os.Chdir(dir)
}
return nil
}

func (p *program) Start() error {
opts := nsqd.NewOptions()

flagSet := nsqdFlagSet(opts)
flagSet.Parse(os.Args[1:])
...
}

通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqd 实例。

配置项初始化

初始化配置项(opts, cfg),加载历史数据(nsqd.LoadMetadata)、持久化最新数据(nsqd.PersistMetadata),然后开启协程,进入 nsqd.Main() 主函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 位于 apps/nsqd/main.go:64
options.Resolve(opts, flagSet, cfg)
nsqd, err := nsqd.New(opts)
if err != nil {
logFatal("failed to instantiate nsqd - %s", err)
}
p.nsqd = nsqd

err = p.nsqd.LoadMetadata()
if err != nil {
logFatal("failed to load metadata - %s", err)
}
err = p.nsqd.PersistMetadata()
if err != nil {
logFatal("failed to persist metadata - %s", err)
}

go func() {
err := p.nsqd.Main()
if err != nil {
p.Stop()
os.Exit(1)
}
}()

接着就是初始化 tcpServer, httpServer, httpsServer,然后循环监控队列信息(n.queueScanLoop)、节点信息管理(n.lookupLoop)、统计信息(n.statsdLoop)输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 位于 nsqd/nsqd.go:262
n.tcpServer.nsqd = n
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, tcpServer, n.logf))
})
httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(ctx, true, true)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
})
}

n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop)
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}

处理请求

分别处理 tcp/http 请求,开启 handler 协程进行并发处理,其中 newHTTPServer 注册路由采用了 Decorate 装饰器模式(后面会进一步解析);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 位于 nsqd/http.go:44
router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(ctx.nsqd.logf)
router.NotFound = http_api.LogNotFoundHandler(ctx.nsqd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqd.logf)
s := &httpServer{
ctx: ctx,
tlsEnabled: tlsEnabled,
tlsRequired: tlsRequired,
router: router,
}

router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

// v1 negotiate
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
router.Handle("POST", "/mpub", http_api.Decorate(s.doMPUB, http_api.V1))
router.Handle("GET", "/stats", http_api.Decorate(s.doStats, log, http_api.V1))

// only v1
router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
router.Handle("POST", "/topic/delete", http_api.Decorate(s.doDeleteTopic, log, http_api.V1))

http-Decorate 路由分发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 位于 internal/protocol/tcp_server.go:22
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
logf(lg.WARN, "temporary Accept() failure - %s", err)
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
return fmt.Errorf("listener.Accept() error - %s", err)
}
break
}
go handler.Handle(clientConn)
}

如上的实现为 tcp-handler 处理的主要代码。

tcp 解析协议

tcp 解析 V2 协议,走内部协议封装的 prot.IOLoop(conn) 进行处理;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 位于 nsqd/tcp.go:34
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{ctx: p.ctx}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}

err = prot.IOLoop(clientConn)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
return
}

消息生成与消费

通过内部协议进行 p.Exec(执行命令)、p.Send(发送结果),保证每个 nsqd 节点都能正确的进行消息生成与消费,一旦上述过程有 error 都会被捕获处理,确保分布式投递的可靠性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 位于 nsqd/protocol_v2.go:79
params := bytes.Split(line, separatorBytes)

p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)

var response []byte
response, err = p.Exec(client, params)
if err != nil {
ctx := ""
if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
ctx = " - " + parentErr.Error()
}
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx)

sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
if sendErr != nil {
p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx)
break
}

// errors of type FatalClientErr should forceably close the connection
if _, ok := err.(*protocol.FatalClientErr); ok {
break
}
continue
}

if response != nil {
err = p.Send(client, frameTypeResponse, response)
if err != nil {
err = fmt.Errorf("failed to send response - %s", err)
break
}
}

nsqd 也会同时开启 tcp 和 http 服务,两个服务都可以提供给生产者和消费者,http 服务还提供给 nsqadmin 获取该 nsqd 本地 topic 和 channel 信息。

小结

本文主要介绍 nsqd,总的来说 nsqd 的实现并不复杂。nsqd 是一个守护进程,负责接收(生产者 producer )、排队(最小堆实现)、投递(消费者 consumer )消息给客户端。nsqd 可以独立运行,但通常是由 nsqlookupd 实例所在集群配置的。

下一篇文章,将会具体分析 nsq 中其他模块实现的细节。

推荐阅读

高性能消息中间件 NSQ 解析-整体介绍

高性能消息中间件 NSQ 解析-应用实践

微服务架构中使用 ELK 进行日志采集以及统一处理

没有 try-catch,该如何处理 Go 错误异常?

订阅最新文章,欢迎关注我的公众号

微信打赏

赞赏是不耍流氓的鼓励

评论系统未开启,无法评论!