怎么使用nsq消息中间件

本篇内容主要讲解“怎么使用nsq消息中间件”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么使用nsq消息中间件”吧!

目前成都创新互联公司已为上千余家的企业提供了网站建设、域名、网页空间、网站托管运营、企业网站设计、崇明网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。

组成

nsq是一款轻量级的消息中间件,查看nsq官网给出的解释,可知nsq的组成和分工:


nsqdis the daemon that receives, queues, and delivers messages to clients.
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.

从上面可以看出:

  • nsqd是一个守护进程,负责与客户端打交道,负责缓存来自客户端的消息

  • nsqd可以作为一个单实例独自运行,通常在nsqlookupd实例的协同下组成集群(集群场景下,nsqd能用于发现topics和channels)

  • nsqd监听两个TCP端口,分别用于客户端(默认4150)和HTTP API(默认4151),另外可监听用于HTTPS的端口


nsqlookupdis the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.

从上面可以看出:

  • nsqlookupd是一个守护进程,负责管理拓扑信息,可供客户端查询得到nsqd节点(nsqd广播topics和channels信息)

  • nsqlookupd提供两种接口,TCP接口(默认4160)被nsqd用来发送广播,HTTP接口(默认4161)被客户端用于发现nsqd和连接nsqadmin

nsqadminis a Web UI to view aggregated cluster stats in realtime and perform various administrative tasks.

从上面可以看出:

  • nsqadmin是一个后台管控Web进程,可实时浏览集群状态,可发起多种管理任务(nsqadmin依赖nsqlookupd来处理用户操作)

安装

这里为了快速搭建,使用docker compose方式安装(docker-compose.yaml见附件)

拷贝docker-compose.yaml到虚拟机,相关命令如下:

怎么使用nsq消息中间件

分别启动 nsqlookupd/nsqadmin/nsqd,对应三个容器和端口映射

浏览器中可打开 http://192.168.1.91:32770 访问 nsqadmin(虚拟机IP为192.168.1.91)

测试

package main

import (
	"bufio"
	"fmt"
	"github.com/bitly/go-nsq"
	"nsq-demo/src/config"
	"os"
)

var producer *nsq.Producer

func InitProducer(addr string) {
    var err error
	producer, err = nsq.NewProducer(addr, nsq.NewConfig())
	if err != nil {
		panic(err)
	}
	fmt.Println("connect to ", producer.String())
}

func Publish(topic, msg string) error {
	if producer == nil {// check producer
		return fmt.Errorf("producer is nil")
	}
	if msg == "" {// void empty msg
		return nil
	}
	return producer.Publish(topic, []byte(msg))// publish msg
}

func main() {
	InitProducer(config.Nsqd01)
	running := true

	reader := bufio.NewReader(os.Stdin)
	for running {
		data, _, _ := reader.ReadLine()
		command := string(data)
		if command == "stop" {
			running = false
		}

		for err := Publish(config.Topic, command); err != nil; err = Publish(config.Topic, command) {
			config.ExchangeNsqdIPs()
			InitProducer(config.Nsqd01)
		}
	}
	producer.Stop()
}

// producer直连nsqd后,接收来自控制台的输入,然后将消息发送给nsqd

package main

import (
	"fmt"
	"github.com/bitly/go-nsq"
	"nsq-demo/src/config"
	"time"
)

type MyConsumer struct{}

func (*MyConsumer) HandleMessage(msg *nsq.Message) error {// implementation Handler interface
	fmt.Println("receive from ", msg.NSQDAddress, "msg:", string(msg.Body))
	return nil
}

func InitConsumer(topic, channel, addr string) {
	conf := nsq.NewConfig()
	conf.LookupdPollInterval = time.Second
	c, err := nsq.NewConsumer(topic, channel, conf)
	if err != nil {
		panic(err)
	}
	c.SetLogger(nil, 0)// set system log
	c.AddHandler(&MyConsumer{})// set Hander to handle msg

	//if err := c.ConnectToNSQLookupd(addr); err != nil {
	//	panic(err)
	//}

	//if err := c.ConnectToNSQDs(config.GetNsqdIPs()); err != nil {
	//	panic(err)
	//}

	if err := c.ConnectToNSQD(config.Nsqd01); err != nil {
		panic(err)
	}
}

func main() {
	InitConsumer(config.Topic, config.Channel, config.Lookupd)
	select {}
}

// consumer直连nsqd后,通过自定义的Handler来处理消息

附录

version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160" # for the nsqd
      - "4161" # for the nsqadmin
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 # connect to nsqlookupd
    depends_on:
      - nsqlookupd
    ports:
      - "4150" # for clients
      - "4151" # for the HTTP API
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 # connect to nsqlookupd
    depends_on:
      - nsqlookupd  
    ports:
      - "4171"

# docker-compose.yaml of simple nsq

到此,相信大家对“怎么使用nsq消息中间件”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


网页标题:怎么使用nsq消息中间件
文章出自:http://pcwzsj.com/article/pgeidp.html