NSQ 是一个 Go 实现的实时分布式消息平台。

文档:https://nsq.io/

源码:https://github.com/nsqio/nsq

特性

  • support distributed topologies with no SPOF

  • horizontally scalable (no brokers, seamlessly add more nodes to the cluster)

  • low-latency push based message delivery (performance)

  • combination load-balanced and multicast style message routing

  • excel at both streaming (high-throughput) and job oriented (low-throughput) workloads

  • primarily in-memory (beyond a high-water mark messages are transparently kept on disk)

  • runtime discovery service for consumers to find producers (nsqlookupd)

  • transport layer security (TLS)

  • data format agnostic

  • few dependencies (easy to deploy) and a sane, bounded, default configuration

  • simple TCP protocol supporting client libraries in any language

  • HTTP interface for stats, admin actions, and producers (no client library needed to publish)

  • integrates with statsd for realtime instrumentation

  • robust cluster administration interface (nsqadmin)

和 Kafka 对比

缺点:

  • 消息投递方式只支持最少一次

  • 消息不保证有序

  • 节点不保存消息副本

  • 消息只有 PUSH 模式

优点:

  • 配置和部署简单,资源消耗少

  • 轻量,没有依赖,Debug 方便

  • 支持水平扩展

  • 支持 SPOF 的分布式拓扑

NSQ 组件

  • nsqd :守护进程,它接收、排队并将消息传递给客户端。

  • nsqlookupd :管理拓扑信息的守护进程,它有两个接口:nsqd 使用它的 TCP 接口广播,客户端使用它的 HTTP 接口发现和管理。

  • psqadmin :一个用于实时查看聚合集群统计数据并执行各种管理任务的 Web UI。

它还包含一组辅助工具:

容器部署

使用 Docker 部署测试,使用 nsq_to_filetopic=test 的消息持久化到本地,忽略空白文件,并将持久化的数据压缩。

docker-compose.yml
version: '3'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160:4160"
      - "4161:4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150"
      - "4151:4151"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd
    ports:
      - "4171:4171"
  nsq_to_file:
    image: nsqio/nsq
    command: /nsq_to_file --gzip --lookupd-http-address=nsqlookupd:4161 --output-dir=/data --skip-empty-files --topic=test
    volumes:
      - ./nsq-data:/data
    depends_on:
      - nsqlookupd

生产者

producer.go
package main

import (
	"fmt"
	"log"

	"github.com/nsqio/go-nsq"
)

func main() {
	config := nsq.NewConfig()

	producer, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		log.Fatal(err)
	}

	messageBody := []byte("hello")
	topicName := "test"

	i := 10
	for i > 0 {
		i--
		fmt.Println(">", i)
		// 异步发送消息
		_ = producer.PublishAsync(topicName, []byte(fmt.Sprintf("%s %d", messageBody, i)), nil)
	}

	producer.Stop()
}

producer 提供了 6 个方法用户发布消息:

  • Publish:同步发送消息。

  • PublishAsync:异步发送消息。

  • MultiPublish:一次发送多个同步消息。

  • MultiPublishAsync:一次发送多个异步消息。

  • DeferredPublish:同步延迟发送消息。

  • DeferredPublishAsync:异步延迟发送消息。

消费者

consumer.go
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nsqio/go-nsq"
)

type myMessageHandler struct{}

func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
	if len(m.Body) == 0 {
		return nil
	}

	fmt.Printf(">> %s, %s\n", m.ID, m.Body)

	return nil
}

func main() {
	config := nsq.NewConfig()
	config.MaxInFlight = 10 // 限流 客户端接收消息数量

	consumer, err := nsq.NewConsumer("test", "channel", config)
	if err != nil {
		log.Fatal(err)
	}

	consumer.AddConcurrentHandlers(&myMessageHandler{}, 5) // 并发处理消息数量

	err = consumer.ConnectToNSQD("127.0.0.1:4150")
	if err != nil {
		log.Fatal(err)
	}

	time.Sleep(time.Hour)
	consumer.Stop()
}

NSQ 消息只支持 PUSH 模式,当上游突发流量时,consumer 可以通过配置 MaxInFlight 值来达到限流的目的。同时消费端也支持并发接收消息。