ZeroMQ(也称为 ØMQ,0MQ 或 zmq)是一个可嵌入的网络通讯库(对 Socket 进行了封装)。 它提供了携带跨越多种传输协议(如:进程内,进程间,TCP 和多播)的原子消息的 sockets 。 有了ZeroMQ,我们可以通过发布-订阅、任务分发、和请求-回复等模式来建立 N-N 的 socket 连接。 ZeroMQ 的异步 I / O 模型为我们提供可扩展的基于异步消息处理任务的多核应用程序。 它有一系列语言API(几乎囊括所有编程语言),并能够在大多数操作系统上运行。
传统的 TCP Socket 的连接是1对1的,可以认为“1个 socket = 1个连接”,每一个线程独立维护一个 socket 。但是 ZMQ 摒弃了这种1对1的模式,ZMQ的 Socket 可以很轻松地实现1对N和N对N的连接模式,一个 ZMQ 的 socket 可以自动地维护一组连接,用户无法操作这些连接,用户只能操作套接字而不是连接本身。所以说在 ZMQ 的世界里,连接是私有的。
ZMQ 提供了三种基本的通信模型,分别是 Request-Reply 、Publish-Subscribe 和 Parallel Pipeline ,接下来举例说明三种模型并给出相应的代码实现。
以 “Hello World” 为例。客户端发起请求,并等待服务端回应请求。客户端发送一个简单的 “Hello”,服务端则回应一个 “World”。可以有 N 个客户端,一个服务端,因此是 1-N 连接。
服务端代码如下:
hwserver.java import org.zeromq.ZMQ; public class hwserver { public static void main(String[] args) throws InterruptedException { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket responder = context.socket(ZMQ.REP); responder.bind("tcp://*:5555"); while (!Thread.currentThread().isInterrupted()) { byte[] request = responder.recv(0); System.out.println("Received" + new String(request)); Thread.sleep(1000); String reply = "World"; responder.send(reply.getBytes(),0); } responder.close(); context.term(); } } hwserver.py import time import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: message = socket.recv() print("Received request: %s" % message) # Do some 'work' time.sleep(1) socket.send(b"World")客户端代码如下:
hwclient.java import org.zeromq.ZMQ; public class hwclient { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket requester = context.socket(ZMQ.REQ); requester.connect("tcp://localhost:5555"); for (int requestNbr = 0; requestNbr != 10; requestNbr++) { String request = "Hello"; System.out.println("Sending Hello" + requestNbr); requester.send(request.getBytes(),0); byte[] reply = requester.recv(0); System.out.println("Reveived " + new String(reply) + " " + requestNbr); } requester.close(); context.term(); } } hwclient.py import zmq context = zmq.Context() print("Connecting to hello world server...") socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") for request in range(10): print("Sending request %s ..." % request) socket.send(b"Hello") message = socket.recv() print("Received reply %s [ %s ]" % (request,message))从以上的过程,我们可以了解到使用 ZMQ 写基本的程序的方法,需要注意的是:
服务端和客户端无论谁先启动,效果是相同的,这点不同于 Socket。在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。服务端收到信息后,会发送一个 “World” 给客户端。值得注意的是一定是客户端连接上来以后,发消息给服务端,服务端接受消息然后响应客户端,这样一问一答。ZMQ 的通信单元是消息,它只知道消息的长度,并不关心消息格式。因此,你可以使用任何你觉得好用的数据格式,如 Xml、Protocol Buffers、Thrift、json 等等。下面以一个天气预报的例子来介绍该模式。
服务端不断地更新各个城市的天气,客户端可以订阅自己感兴趣(通过一个过滤器)的城市的天气信息。
服务端代码如下:
wuserver.java import org.zeromq.ZMQ; import java.util.Random; public class wuserver { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket publisher = context.socket(ZMQ.PUB); publisher.bind("tcp://*:5556"); publisher.bind("icp://weather"); Random srandom = new Random(System.currentTimeMillis()); while (!Thread.currentThread().isInterrupted()) { int zipcode, temperature, relhumidity; zipcode = 10000 + srandom.nextInt(10000); temperature = srandom.nextInt(215) - 80 + 1; relhumidity = srandom.nextInt(50) + 10 + 1; String update = String.format("d %d %d", zipcode, temperature, relhumidity); } publisher.close(); context.term(); } } wuserver.py from random import randrange import zmq context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5556") while True: zipcode = randrange(1, 100000) temperature = randrange(-80, 135) relhumidity = randrange(10, 60) socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))客户端代码如下:
wuclient.java import org.zeromq.ZMQ; import java.util.StringTokenizer; public class wuclient { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket suscriber = context.socket(ZMQ.SUB); suscriber.connect("tcp://localhost:5556"); String filter = (args.length > 0) ? args[0] : "10001"; suscriber.suscribe(filter.getBytes()); //过滤条件 int update_nbr; long total_temp = 0; for (update_nbr = 0; update_nbr < 100; update_nbr++) { String string = suscriber.recvStr(0).trim(); StringTokenizer sscanf = new StringTokenizer(string, " "); int zipcode = Integer.valueOf(sscanf.nextToken()); int temperature = Integer.valueOf(sscanf.nextToken()); int relhumidity = Integer.valueOf(sscanf.nextToken()); total_temp += temperature; } System.out.println("Average temperature for zipcode '" + filter + "' was " + (int) (total_temp / update_nbr)); suscriber.close(); context.term(); } } wuclient.py import sys import zmq context = zmq.Context() socket = context.socket(zmq.SUB) print("Collecting updates from weather server...") socket.connect("tcp://localhost:5556") zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001" if isinstance(zip_filter, bytes): zip_filter = zip_filter.decode('ascii') socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter) total_temp = 0 for update_nbr in range(5): string = socket.recv_string() zipcode, temperature, relhumidity = string.split() total_temp += int(temperature) print("Average temperature for zipcode '%s' was %dF" % (zip_filter, total_temp / (update_nbr + 1)))服务器端生成随机数 zipcode、temperature、relhumidity 分别代表城市代码、温度值和湿度值,然后不断地广播信息。而客户端通过设置过滤参数,接受特定城市代码的信息,最终将收集到的温度求平均值。
需要注意的是:
与 “Hello World” 例子不同的是,Socket 的类型变成 ZMQ.PUB 和 ZMQ.SUB 。客户端需要设置一个过滤条件,接收自己感兴趣的消息。发布者一直不断地发布新消息,如果中途有订阅者退出,其他均不受影响。当订阅者再连接上来的时候,收到的就是后来发送的消息了。这样比较晚加入的或者是中途离开的订阅者必然会丢失掉一部分信息。这是该模式的一个问题,即所谓的 "Slow joiner" 。Parallel Pipeline 处理模式如下:
ventilator 分发任务到各个 worker每个 worker 执行分配到的任务最后由 sink 收集从 worker 发来的结果taskvent.java import org.zeromq.ZMQ; import java.io.IOException; import java.util.Random; import java.util.StringTokenizer; public class taskvent { public static void main(String[] args) throws IOException { ZMQ.Context context = new ZMQ.context(1); ZMQ.Socket sender = context.socket(ZMQ.PUSH); sender.bind("tcp://*:5557"); ZMQ.Socket sink = context.socket(ZMQ.PUSH); sink.connect("tcp://localhost:5558"); System.out.println("Please enter when the workers are ready: "); System.in.read(); System.out.println("Sending task to workes\n"); sink.send("0",0); Random srandom = new Random(System.currentTimeMillis()); int task_nbr; int total_msec = 0; for (task_nbr = 0; task_nbr < 100; task_nbr++) { int workload; workload = srandom.nextInt(100) + 1; total_msec += workload; System.out.print(workload + "."); String string = String.format("%d", workload); sender.send(string, 0); } System.out.println("Total expected cost: " + total_msec + " msec"); sink.close(); sender.close(); context.term(); } } taskvent.py import zmq import time import random try: raw_input except NameError: raw_input = input context = zmq.Context() sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") sink = context.socket(zmq.PUSH) sink.connect("tcp://localhost:5558") print("Please enter when workers are ready: ") _ = raw_input() print("Sending tasks to workers...") sink.send(b'0') random.seed() total_msec = 0 for task_nbr in range(100): workload = random.randint(1, 100) total_msec += workload sender.send_string(u'%i' % workload) print("Total expected cost: %s msec" % total_msec) time.sleep(1) taskwork.java import org.zeromq.ZMQ; public class taskwork { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket receiver = context.socket(ZMQ.PULL); receiver.connect("tcp://localhost:5557"); ZMQ.Socket sender = context.socket(ZMQ.PUSH); sender.connect("tcp://localhost:5558"); while (!Thread.currentThread().isInterrupted()) { String string = receiver.recv(0).trim(); Long mesc = Long.parseLong(string); System.out.flush(); System.out.print(string + "."); Sleep(mesc); sender.send("".getBytes(), 0); } sender.close(); receiver.close(); context.term(); } } taskwork.py import zmq import time import sys context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558") while True: s = receiver.recv() sys.stdout.write('.') sys.stdout.flush() time.sleep(int(s) * 0.001) sender.send(b'') tasksink.java import org.zeromq.ZMQ; public class tasksink { public static void main(String[] args) { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket receiver = context.socket(ZMQ.PULL); receiver.bind("tcp://*:5558"); String string = new String(receiver.recv(0)); long tstart = System.currentTimeMillis(); int task_nbr; int total_mesc = 0; for (task_nbr = 0; task_nbr < 100; task_nbr++) { string = new String(receiver.recv(0).trim()); if ((task_nbr / 10) * 10 == task_nbr) { System.out.print(":"); } else { System.out.print("."); } } long tend = System.currentTimeMillis(); System.out.println("\nTotal elapsed time: " + (tend - tstart) + "msec"); receiver.close(); context.term(); } } tasksink.py import time import zmq import sys context = zmq.Context() receiver = context.socket(zmq.PULL) receiver.bind("tcp://*:5558") s = receiver.recv() tstart = time.time() for task_nbr in range(1, 100): s = receiver.recv() if task_nbr % 10 == 0: sys.stdout.write(':') else: sys.stdout.write('.') sys.stdout.flush() tend = time.time() print("Total elapsed time: %d msec" % ((tend - tstart) * 1000))
以下两点需要注意:
ventilator 使用 ZMQ.PUSH 来分发任务;worker 用 ZMQ.PULL 来接收任务,用 ZMQ.PUSH 来发送结果;sink 用 ZMQ.PULL 来接收 worker 发来的结果。ventilator 既是服务端,也是客户端(此时服务端是 sink);worker 在两个过程中都是客户端;sink 也一直都是服务端。