Get zmq socket events in IO read callback. Allow batch recvs.

pull/47/merge
Robert G. Jakabosky 14 years ago
parent 0697debefb
commit a598995e33

@ -19,6 +19,11 @@
-- THE SOFTWARE. -- THE SOFTWARE.
local zmq = require"zmq" local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local ev = require'ev' local ev = require'ev'
local loop = ev.Loop.default local loop = ev.Loop.default
@ -51,27 +56,43 @@ local function sub_worker(loop, ctx, msg_cb)
-- need idle watcher since ZeroMQ sockets are edge-triggered instead of level-triggered -- need idle watcher since ZeroMQ sockets are edge-triggered instead of level-triggered
local s_io_idle local s_io_idle
local s_io_read local s_io_read
s_io_idle = ev.Idle.new(function() local max_recvs = 10
local msg, err = s:recv(zmq.NOBLOCK) local function s_recv(recv_cnt)
local msg, err = s:recv(z_NOBLOCK)
if err == 'timeout' then if err == 'timeout' then
-- need to block on read IO
return false
end
self:msg_cb(msg)
if recv_cnt > 1 then
return s_recv(recv_cnt - 1)
end
return true
end
s_io_idle = ev.Idle.new(function()
if not s_recv(max_recvs) then
-- need to block on read IO -- need to block on read IO
s_io_idle:stop(loop) s_io_idle:stop(loop)
s_io_read:start(loop) s_io_read:start(loop)
return
end end
self:msg_cb(msg)
end) end)
s_io_idle:start(loop) s_io_idle:start(loop)
s_io_read = ev.IO.new(function() s_io_read = ev.IO.new(function()
s_io_idle:start(loop) local events = s:getopt(z_EVENTS)
s_io_read:stop(loop) if events == z_POLLIN or events == z_POLLIN_OUT then
if s_recv(max_recvs) then
-- read IO is not block, enable idle watcher to handle reads.
s_io_idle:start(loop)
s_io_read:stop(loop)
end
end
end, s:getopt(zmq.FD), ev.READ) end, s:getopt(zmq.FD), ev.READ)
self.s_io_idle = s_io_idle self.s_io_idle = s_io_idle
self.s_io_read = s_io_read self.s_io_read = s_io_read
return self return self
end end
local ctx = zmq.init(1) local ctx = zmq.init()
-- message handling function. -- message handling function.
local function handle_msg(worker, msg) local function handle_msg(worker, msg)

Loading…
Cancel
Save