From a598995e331c76ab0d0e03bbeddedffd66e05ef3 Mon Sep 17 00:00:00 2001 From: "Robert G. Jakabosky" Date: Thu, 22 Mar 2012 01:52:54 -0700 Subject: [PATCH] Get zmq socket events in IO read callback. Allow batch recvs. --- examples/ev_subscriber.lua | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/examples/ev_subscriber.lua b/examples/ev_subscriber.lua index 13f51a9..b2676c9 100644 --- a/examples/ev_subscriber.lua +++ b/examples/ev_subscriber.lua @@ -19,6 +19,11 @@ -- THE SOFTWARE. local zmq = require"zmq" +local z_NOBLOCK = zmq.NOBLOCK +local z_EVENTS = zmq.EVENTS +local z_POLLIN = zmq.POLLIN +local z_POLLOUT = zmq.POLLOUT +local z_POLLIN_OUT = z_POLLIN + z_POLLOUT local ev = require'ev' local loop = ev.Loop.default @@ -51,27 +56,43 @@ local function sub_worker(loop, ctx, msg_cb) -- need idle watcher since ZeroMQ sockets are edge-triggered instead of level-triggered local s_io_idle local s_io_read - s_io_idle = ev.Idle.new(function() - local msg, err = s:recv(zmq.NOBLOCK) + local max_recvs = 10 + local function s_recv(recv_cnt) + local msg, err = s:recv(z_NOBLOCK) if err == 'timeout' then + -- need to block on read IO + return false + end + self:msg_cb(msg) + if recv_cnt > 1 then + return s_recv(recv_cnt - 1) + end + return true + end + s_io_idle = ev.Idle.new(function() + if not s_recv(max_recvs) then -- need to block on read IO s_io_idle:stop(loop) s_io_read:start(loop) - return end - self:msg_cb(msg) end) s_io_idle:start(loop) s_io_read = ev.IO.new(function() - s_io_idle:start(loop) - s_io_read:stop(loop) + local events = s:getopt(z_EVENTS) + if events == z_POLLIN or events == z_POLLIN_OUT then + if s_recv(max_recvs) then + -- read IO is not block, enable idle watcher to handle reads. + s_io_idle:start(loop) + s_io_read:stop(loop) + end + end end, s:getopt(zmq.FD), ev.READ) self.s_io_idle = s_io_idle self.s_io_read = s_io_read return self end -local ctx = zmq.init(1) +local ctx = zmq.init() -- message handling function. local function handle_msg(worker, msg)