ZMQ Allowed Patterns

  • PUB and SUB
  • REQ and REP
  • REQ and ROUTER
  • DEALER and REP
  • DEALER and ROUTER
  • DEALER and DEALER
  • ROUTER and ROUTER
  • PUSH and PULL
  • PAIR and PAIR

ZMQ模式

REQ-REP

请求-应答:由请求端发起请求,并等待回应端回应请求。从请求端来看,一定是一对对收发配对的;反之,在回应端一定是发收对。请求端和回应端都可以是1:N的模型。通常把1认为是server,N认为是Client。

注意:

  1. 必须先提问,后回答
  2. 对于一个提问,只能回答一次
  3. 在没有收到回答前不能再次提问

举个栗子

Request:

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://127.0.0.1:10000')

for _ in range(1, 11):
    socket.send(b'hello, i am msg-%s' % _)
    message = socket.recv()
    print("Received reply %d [%s]" % (_, message))

Reply:

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('tcp://127.0.0.1:10000')

while True:
    message = socket.recv()
    print("Received request: %s " % message)
    time.sleep(1)
    socket.send("yay, it's ok.")

Extended Request-Reply

关于 DEALER 和 ROUTE:

DEALER 类似一个异步的 REQ Socket。

ROUTE 类似一个一部的 REP Socket。

你依然可以使用REQ,也可以使用DEALER,它们有两个不同:

  • The REQ socket always sends an empty delimiter frame before any data frames; the DEALER does not.
  • The REQ socket will send only one message before it receives a reply; the DEALER is fully asynchronous.

看一个具体的例子:

Broker:

import zmq

context = zmq.Context()

frontend = context.socket(zmq.ROUTER)
frontend.bind('tcp://127.0.0.1:5559')

backend = context.socket(zmq.DEALER)
backend.bind('tcp://127.0.0.1:5560')

# init poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        backend.send_multipart(message)

    if socks.get(backend) == zmq.POLLIN:
        message = backend.recv_multipart()
        frontend.send_multipart(message)

Client:

import zmq

#  Prepare our context and sockets
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://127.0.0.1:5559")

#  Do 10 requests, waiting each time for a response
for request in range(1,11):
    socket.send(b"Hello")
    message = socket.recv()
    print("Received reply %s [%s]" % (request, message))

Server:

import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect("tcp://127.0.0.1:5560")

while True:
    message = socket.recv()
    print("Received request: %s" % message)
    socket.send(b"World")

客户端和服务端相对透明,唯一的静态节点代理在中间。

PUB-SUB

发布-订阅:类似广播。这个模型里,发布端是单向只发送数据的,且不关心是否把全部的信息都发送给订阅者。如果发布端开始发布信息的时候,订阅端尚未连接上,这些信息直接丢弃。不过一旦订阅连接上来,中间会保证没有信息丢失。同样,订阅端则只负责接收,而不能反馈。如果发布端和订阅端需要交互(比如要确认订阅者是否已经连接上),则使用额外的socket采用请求回应模型满足这个需求。

举个栗子:

Publisher

import zmq
import random
import time

port = "10001"

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:%s" % port)

while True:
    topic = random.randrange(9999, 10005)
    messagedata = random.randrange(1, 215) - 80
    print("%d %d" % (topic, messagedata))
    socket.send("%d %d" % (topic, messagedata))
    time.sleep(1)

Subscriber:

Publisher像广播一样一直在发送,不管Subscriber是否接收得到,当Subscriber连接上Publisher之后,从连上那一刻开始接收KEY为10001的内容,满5次自动断开。Publisher依然会向端口广播消息,不论Subscriber是否连接。

import zmq

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server...")
socket.connect("tcp://127.0.0.1:10001")

# Subscribe to zipcode, default is NYC, 10001, not port
topicfilter = "10001"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

# Process 5 updates
total_value = 0
for update_nbr in range(5):
    string = socket.recv()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print('topic: %s  messagedata: %s' %(topic, messagedata))

print(
    "Average value: topic '%s' messagedata %dF" %
    (topicfilter, total_value / update_nbr)
)

PUSH-PULL

管道模式。这个模型里,管道是单向的,从PUSH端单向的向PULL端单向的推送数据流。

举个栗子:

Ventilator:

import zmq
import random

context = zmq.Context()

ventilator = context.socket(zmq.PUSH)
ventilator.bind("tcp://127.0.0.1:5557")

# random.seed()

while True:
    workload = random.randint(1, 100)
    ventilator.send(str(workload))

Worker:

import zmq

context = zmq.Context()

ventilator = context.socket(zmq.PULL)
ventilator.connect('tcp://127.0.0.1:5557')

sink = context.socket(zmq.PUSH)
sink.connect('tcp://127.0.0.1:5558')

while True:
    # do something
    recv = ventilator.recv()
    sink.send(recv)

Sink:

import zmq

context = zmq.Context()

sink = context.socket(zmq.PULL)
sink.bind("tcp://127.0.0.1:5558")

while True:
    print sink.recv()

PAIR

在一个进程中,用于两个线程间的通信。

Conventional sockets allow:

  • only strict one-to-one (two peers)
  • many-to-one (many clients, one server)
  • one-to-many (multicast) relationships

Exclusive pair pattern

Paired sockets are very similar to regular sockets.

  • The communication is bidirectional.
  • There is no specific state stored within the socket
  • There can only be one connected peer.
  • The server listens on a certain port and a client connects to it.

举个栗子:

服务端和客户端可以发送任意的消息数量。

Server:

import zmq
import random
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)

while True:
    socket.send("Server message to client3")
    msg = socket.recv()
    print msg
    time.sleep(1)

Client:

import zmq
import random
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

while True:
    msg = socket.recv()
    print msg
    socket.send("client message to server1")
    socket.send("client message to server2")
    time.sleep(1)

路由

更复杂的网络模型。ZMQ可以很好的支持路由功能(实现路由功能的组件叫做Device),把1:N扩展为N:M(只需要加入若干路由节点)。从这个模型看,更底层的端点地址是对上层隐藏的。每个请求都隐含回应地址,而应用则不关心它。

Device 组件有 3 个基本模式:

  • Queue
  • Forwarder
  • Streamer

还有新的Proxy()。

Queue Device

Queue 是位于客户端和服务器之间的中介,将客户端的请求转发给服务端,再将收到的服务端的回应转发给客户端。

Queue:

import zmq

def main():
    try:
        context = zmq.Context(1)
        frontend = context.socket(zmq.XREP)
        frontend.bind('tcp://*:5559')

        backend = context.socket(zmq.XREQ)
        backend.bind('tcp://*:5560')

        zmq.device(zmq.QUEUE, frontend, backend)
    except Exception, e:
        print e
        print "bringing down zmq device"
    finally:
        pass
        frontend.close()
        backend.close()
        context.term()

if __name__ == '__main__':
    main()

Client:

import zmq
import random

port = '5559'
context = zmq.Context()

print 'Connecting to server...'
socket = context.socket(zmq.REQ)
socket.connect('tcp://localhost:%s' % port)

client_id = random.randrange(1, 10005)

for request in range(1, 11):
    print 'Sending request ', request
    socket.send('Hello from %s ' % client_id)

    message = socket.recv()
    print 'Received reply %d [%s]' % (request, message)

Server:

import zmq
import time
import random

port = '5560'

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect('tcp://localhost:%s' % port)

server_id = random.randrange(1, 10005)

while True:
    message = socket.recv()
    print 'Received request: ', message
    time.sleep(1)
    socket.send('World from server %s' % server_id)

如果想查看负载均衡效果,可以运行多个客户端和服务端。客户端的请求会经由中介分配到不同的服务器。

Forwarder

如同 QUEUE 的 request-reply 模式。FORWARDER 像 pub-sub 的代理服务器。它允许 publishers 和 subscribers 移动部件,FORWARDER 是中心部件负责连接所有的publishers和subscribers。

Forwarder:

import zmq

def main():
    try:
        context = zmq.Context(1)

        # sub
        frontend = context.socket(zmq.SUB)
        frontend.bind("tcp://127.0.0.1:5559")

        frontend.setsockopt(zmq.SUBSCRIBE, "")

        # pub
        backend = context.socket(zmq.PUB)
        backend.bind("tcp://127.0.0.1:5560")

        # forwarder
        zmq.device(zmq.FORWARDER, frontend, backend)
    except Exception, e:
        print e
        print "bringing down zmq device"
    finally:
        pass
        frontend.close()
        backend.close()
        context.term()

if __name__ == "__main__":
    main()

Client:

import zmq

port = "5560"

context = zmq.Context()
socket = context.socket(zmq.SUB)

print "Collecting updates from server..."
socket.connect("tcp://localhost:%s" % port)

topicfilter = "9"
socket.setsockopt(zmq.SUBSCRIBE, topicfilter)

for update_nbr in range(10):
    string = socket.recv()
    topic, messagedata = string.split()
    print topic, messagedata

Server:

import zmq
import random
import time

port = "5559"

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect("tcp://localhost:%s" % port)

publisher_id = random.randrange(0, 9999)

while True:
    topic = random.randrange(1,10)
    messagedata = "server#%s" % publisher_id
    print "%s %s" % (topic, messagedata)
    socket.send("%d %s" % (topic, messagedata))
    time.sleep(1)

Streamer

Streamer 用于并行管道消息通信。作为一个代理,从PUSH端收集任务并发放给PULL端。

Streamer:

import zmq

def main():
    try:
        context = zmq.Context()

        # pull
        frontend = context.socket(zmq.PULL)
        frontend.bind('tcp://127.0.0.1:5559')

        # push
        backend = context.socket(zmq.PUSH)
        backend.bind('tcp://127.0.0.1:5560')

        # streamer
        zmq.device(zmq.STREAMER, frontend, backend)
    except Exception, e:
        print e
        print 'streamer_device'
    finally:
        pass
        frontend.close()
        backend.close()
        context.term()

if __name__ == '__main__':
    main()

PUSH:

import time
import zmq

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.connect("tcp://127.0.0.1:5559")
    # push
    for num in xrange(20000):
        work_message = {'num': num}
        zmq_socket.send_json(work_message)
        time.sleep(1)

producer()

PULL:

import zmq
import random

def consumer():
    context = zmq.Context()
    # pull
    consumer_receiver = context.socket(zmq.PULL)
    consumer_receiver.connect("tcp://127.0.0.1:5560")

    consumer_id = random.randrange(1, 10005)
    print "I am consumer #%s" % (consumer_id)

    while True:
        work = consumer_receiver.recv_json()
        data = work['num']
        result = {'consumer': consumer_id, 'num': data}
        print result

consumer()

Proxy()

用于替代Device。

Show me The Code:

import zmq

def main():
    context = zmq.Context()

    # Socket facing clients
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://127.0.0.1:5559")

    # Socket facing services
    backend = context.socket(zmq.DEALER)
    backend.bind("tcp://127.0.0.1:5560")

    zmq.proxy(frontend, backend)

    # We never get here
    frontend.close()
    backend.close()
    context.term()

if __name__ == "__main__":
    main()

参考链接:
http://zguide.zeromq.org/py:all
http://pyzmq.readthedocs.io/en/latest/api/
https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/
https://pyzmq.readthedocs.io/en/latest/api/index.html