diff --git a/examples/poll_zsock.lua b/examples/poll_zsock.lua new file mode 100644 index 0000000..163dd0c --- /dev/null +++ b/examples/poll_zsock.lua @@ -0,0 +1,164 @@ +-- Copyright (c) 2012 Robert G. Jakabosky +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +local zmq = require"zmq" + +local z_EVENTS = zmq.EVENTS + +local z_SNDMORE = zmq.SNDMORE +local z_RCVMORE = zmq.RCVMORE +local z_NOBLOCK = zmq.NOBLOCK + +local z_POLLIN = zmq.POLLIN +local z_POLLOUT = zmq.POLLOUT +local z_POLLIN_OUT = z_POLLIN + z_POLLOUT + +local poll + +local meths = {} +local zsock_mt = { __index=meths } + +local function zsock_check_events(self) + if not self.check_enabled then + self.check_enabled = true + poll:add_work(self.on_work) + end +end + +function meths:events() + zsock_check_events(self) + return self.sock:events() +end + +function meths:getopt(opt) + if (opt == z_EVENTS) then + zsock_check_events(self) + end + return self.sock:getopt(opt) +end + +function meths:setopt(opt,val) + return self.sock:setopt(opt,val) +end + +function meths:sub(topic) + return self.sock:sub(topic) +end + +function meths:unsub(topic) + return self.sock:unsub(topic) +end + +function meths:identity(id) + return self.sock:identity(id) +end + +function meths:bind(addr) + return self.sock:bind(addr) +end + +function meths:connect(addr) + return self.sock:connect(addr) +end + +function meths:close() + return self.sock:close() +end + +function meths:send(msg, flags) + zsock_check_events(self) + local sent, err = self.sock:send(msg, flags) + if not sent and err == 'timeout' then + self.send_blocked = true + end + return sent, err +end + +function meths:send_msg(msg, flags) + zsock_check_events(self) + local sent, err = self.sock:send_msg(msg, flags) + if not sent and err == 'timeout' then + self.send_blocked = true + end + return sent, err +end + +function meths:recv(flags) + zsock_check_events(self) + return self.sock:recv(flags) +end + +function meths:recv_msg(msg, flags) + zsock_check_events(self) + return self.sock:recv_msg(msg, flags) +end + +local function nil_cb() +end + +local function wrap_zsock(sock, on_data, on_drain) + local self = setmetatable({ + sock = sock, + on_data = on_data or nil_cb, + on_drain = on_drain or nil_cb, + send_blocked = false, + check_enabled = false, + }, zsock_mt) + + function self.on_work() + self.check_enabled = false + local events = sock:events() + local read = false + local write = false + if events == z_POLLIN_OUT then + read = true + write = true + elseif events == z_POLLIN then + read = true + elseif events == z_POLLOUT then + write = true + else + return + end + if read then + self:on_data(sock) + end + if write and self.send_blocked then + self:on_drain(sock) + end + end + + -- listen for read events to enable socket. + poll:add_read(sock:fd(), function() + zsock_check_events(self) + end) + + return self +end + +return setmetatable({ +set_poller = function(poller) + local old = poll + poll = poller + return old +end, +wrap_zsock = wrap_zsock, +}, { __call = function(tab, ...) return wrap_zsock(...) end}) +