diff --git a/examples/client_epoll.lua b/examples/client_epoll.lua new file mode 100644 index 0000000..5341c29 --- /dev/null +++ b/examples/client_epoll.lua @@ -0,0 +1,108 @@ +-- Copyright (c) 2010 Aleksey Yeschenko +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +local poller = require"examples.poller" +local poll = poller.new() + +local zmq = require"zmq" +local z_NOBLOCK = zmq.NOBLOCK +local z_EVENTS = zmq.EVENTS +local z_POLLIN = zmq.POLLIN +local z_POLLOUT = zmq.POLLOUT +local z_POLLIN_OUT = z_POLLIN + z_POLLOUT + +local N=tonumber(arg[1] or 100) + +local ctx = zmq.init() +local s = ctx:socket(zmq.REQ) +local s_FD = s:getopt(zmq.FD) + +s:connect("tcp://localhost:5555") + +-- current socket state +local blocked_state +local blocked_event +local on_sock_recv +local on_sock_send + +-- IO event callback when socket was blocked +local function on_sock_io() + local events = s:getopt(z_EVENTS) + local unblocked = false + if events == blocked_event then + -- got the event the socket was blocked on. + unblocked = true + elseif events == z_POLLIN_OUT then + -- got both in & out events + unblocked = true + end + if unblocked then + -- got the event we are blocked on resume. + blocked_event = nil + blocked_state() + -- check if blocked event was processed. + if not blocked_event then + poll:remove_read(s_FD) + end + end +end +local function sock_blocked(state, event) + if not blocked_event then + -- need to register socket's fd with event loop + poll:add_read(s_FD, on_sock_io) + end + blocked_state = state + blocked_event = event +end + +-- sock state functions +function on_sock_send() + N = N - 1 + if N == 0 then + return poll:stop() + end + local sent, err = s:send("SELECT * FROM mytable", z_NOBLOCK) + if not sent then + assert(err == 'timeout', "Bad error on zmq socket.") + return sock_blocked(on_sock_send, z_POLLOUT) + end + -- yield back to event loop + poll:add_work(on_sock_recv) +end + +function on_sock_recv() + local data, err = s:recv(z_NOBLOCK) + if not data then + assert(err == 'timeout', "Bad error on zmq socket.") + return sock_blocked(on_sock_recv, z_POLLIN) + end + print(data) + return on_sock_send() +end + +-- start processing of the socket. +poll:add_work(on_sock_send) + +-- start event loop +poll:start() + +s:close() +ctx:term() + diff --git a/examples/poller.lua b/examples/poller.lua new file mode 100644 index 0000000..db5f125 --- /dev/null +++ b/examples/poller.lua @@ -0,0 +1,121 @@ +-- Copyright (c) 2010 Aleksey Yeschenko +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +local epoll = require"epoll" +local EPOLLIN = epoll.EPOLLIN +local EPOLLOUT = epoll.EPOLLOUT + +local poller_meths = {} +local poller_mt = {__index = poller_meths} + +local function poller_new() + local reads = {} + -- create closure for epoll io_event callback. + local function do_io_event(fd, ev) + local cb = reads[fd] + return cb(fd, ev) + end + + return setmetatable({ + work_cur = {}, + work_last = {}, + reads = reads, + io_events = 0, + do_io_event = do_io_event, + poller = epoll.new(), + }, poller_mt) +end + +function poller_meths:add_work(task) + -- add task to current work queue. + self.work_cur[#self.work_cur + 1] = task +end + +function poller_meths:add_read(fd, cb) + -- make sure read event hasn't been registered yet. + if not self.reads[fd] then + self.io_events = self.io_events + 1 + self.reads[fd] = cb + return self.poller:add(fd, EPOLLIN, fd) + else + -- update read callback? + self.reads[fd] = cb + end +end + +function poller_meths:remove_read(fd) + -- make sure there was a read event registered. + if self.reads[fd] then + self.io_events = self.io_events - 1 + self.reads[fd] = nil + return self.poller:del(fd) + end +end + +local function poller_do_work(self) + local tasks = #self.work_cur + -- check if there is any work + if tasks > 0 then + -- swap work queues. + local last, cur = self.work_cur, self.work_last + self.work_cur, self.work_last = cur, last + for i=1,tasks do + local task = last[i] + last[i] = nil + task() + end + -- return new work queue length. + return #cur + end + return tasks +end + +function poller_meths:start() + local do_io_event = self.do_io_event + local poller = self.poller + self.is_running = true + while self.is_running do + -- run work task + local new_work = poller_do_work(self) + -- wait == 0, if there is work to do, else wait == -1 + local wait = (new_work > 0) and 0 or -1 + -- poll for fd events, if there are events to poll for. +--print("poller:step()", new_work, self.io_events) + if self.io_events > 0 then + assert(poller:wait_callback(do_io_event, wait)) + else + -- no io events to poll, do we still have work? + if #self.work_cur == 0 then + -- nothing to do, exit event loop + self.is_running = false + return + end + end + end +end + +function poller_meths:stop() + self.is_running = false +end + +-- module only exports a 'new' function. +return { +new = poller_new, +} diff --git a/examples/publisher_epoll.lua b/examples/publisher_epoll.lua new file mode 100644 index 0000000..6f8e13b --- /dev/null +++ b/examples/publisher_epoll.lua @@ -0,0 +1,95 @@ +-- Copyright (c) 2010 Aleksey Yeschenko +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +local poller = require"examples.poller" +local poll = poller.new() + +local zmq = require"zmq" +local z_NOBLOCK = zmq.NOBLOCK +local z_EVENTS = zmq.EVENTS +local z_POLLIN = zmq.POLLIN +local z_POLLOUT = zmq.POLLOUT +local z_POLLIN_OUT = z_POLLIN + z_POLLOUT + +local ctx = zmq.init() +local s = ctx:socket(zmq.PUB) +local s_FD = s:getopt(zmq.FD) + +s:bind("tcp://lo:5555") + +-- current socket state +local blocked_state +local blocked_event +local on_sock_recv +local on_sock_send + +-- IO event callback when socket was blocked +local function on_sock_io() + local events = s:getopt(z_EVENTS) + local unblocked = false + if events == blocked_event then + -- got the event the socket was blocked on. + unblocked = true + elseif events == z_POLLIN_OUT then + -- got both in & out events + unblocked = true + end + if unblocked then + -- got the event we are blocked on resume. + blocked_event = nil + blocked_state() + -- check if blocked event was processed. + if not blocked_event then + poll:remove_read(s_FD) + end + end +end +local function sock_blocked(state, event) + if not blocked_event then + -- need to register socket's fd with event loop + poll:add_read(s_FD, on_sock_io) + end + blocked_state = state + blocked_event = event +end + +-- sock state functions +local msg_id = 1 +function on_sock_send() + local sent, err = s:send(tostring(msg_id), z_NOBLOCK) + if not sent then + assert(err == 'timeout', "Bad error on zmq socket.") + return sock_blocked(on_sock_send, z_POLLOUT) + end + -- message sent, inc. id + msg_id = msg_id + 1 + -- yield back to event loop + poll:add_work(on_sock_send) +end + +-- start processing of the socket. +poll:add_work(on_sock_send) + +-- start event loop +poll:start() + +s:close() +ctx:term() + diff --git a/examples/server_epoll.lua b/examples/server_epoll.lua new file mode 100644 index 0000000..383c822 --- /dev/null +++ b/examples/server_epoll.lua @@ -0,0 +1,102 @@ +-- Copyright (c) 2010 Aleksey Yeschenko +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +local poller = require"examples.poller" +local poll = poller.new() + +local zmq = require"zmq" +local z_NOBLOCK = zmq.NOBLOCK +local z_EVENTS = zmq.EVENTS +local z_POLLIN = zmq.POLLIN +local z_POLLOUT = zmq.POLLOUT +local z_POLLIN_OUT = z_POLLIN + z_POLLOUT + +local ctx = zmq.init() +local s = ctx:socket(zmq.REP) +local s_FD = s:getopt(zmq.FD) + +s:bind("tcp://lo:5555") + +-- current socket state +local blocked_state +local blocked_event +local on_sock_recv +local on_sock_send + +-- IO event callback when socket was blocked +local function on_sock_io() + local events = s:getopt(z_EVENTS) + local unblocked = false + if events == blocked_event then + -- got the event the socket was blocked on. + unblocked = true + elseif events == z_POLLIN_OUT then + -- got both in & out events + unblocked = true + end + if unblocked then + -- got the event we are blocked on resume. + blocked_event = nil + blocked_state() + -- check if blocked event was processed. + if not blocked_event then + poll:remove_read(s_FD) + end + end +end +local function sock_blocked(state, event) + if not blocked_event then + -- need to register socket's fd with event loop + poll:add_read(s_FD, on_sock_io) + end + blocked_state = state + blocked_event = event +end + +-- sock state functions +function on_sock_recv() + local data, err = s:recv(z_NOBLOCK) + if not data then + assert(err == 'timeout', "Bad error on zmq socket.") + return sock_blocked(on_sock_recv, z_POLLIN) + end + print(string.format("Received query: '%s'", data)) + return on_sock_send() +end + +function on_sock_send() + local sent, err = s:send("OK", z_NOBLOCK) + if not sent then + assert(err == 'timeout', "Bad error on zmq socket.") + return sock_blocked(on_sock_send, z_POLLOUT) + end + -- yield back to event loop + poll:add_work(on_sock_recv) +end + +-- start processing of the socket. +poll:add_work(on_sock_recv) + +-- start event loop +poll:start() + +s:close() +ctx:term() + diff --git a/examples/subscriber_epoll.lua b/examples/subscriber_epoll.lua new file mode 100644 index 0000000..8cf65e6 --- /dev/null +++ b/examples/subscriber_epoll.lua @@ -0,0 +1,96 @@ +-- Copyright (c) 2010 Aleksey Yeschenko +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +local poller = require"examples.poller" +local poll = poller.new() + +local zmq = require"zmq" +local z_NOBLOCK = zmq.NOBLOCK +local z_EVENTS = zmq.EVENTS +local z_POLLIN = zmq.POLLIN +local z_POLLOUT = zmq.POLLOUT +local z_POLLIN_OUT = z_POLLIN + z_POLLOUT + +local N=tonumber(arg[1] or 100) + +local ctx = zmq.init() +local s = ctx:socket(zmq.SUB) +local s_FD = s:getopt(zmq.FD) + +s:setopt(zmq.SUBSCRIBE, "") +s:connect("tcp://localhost:5555") + +-- current socket state +local blocked_state +local blocked_event +local on_sock_recv +local on_sock_send + +-- IO event callback when socket was blocked +local function on_sock_io() + local events = s:getopt(z_EVENTS) + local unblocked = false + if events == blocked_event then + -- got the event the socket was blocked on. + unblocked = true + elseif events == z_POLLIN_OUT then + -- got both in & out events + unblocked = true + end + if unblocked then + -- got the event we are blocked on resume. + blocked_event = nil + blocked_state() + -- check if blocked event was processed. + if not blocked_event then + poll:remove_read(s_FD) + end + end +end +local function sock_blocked(state, event) + if not blocked_event then + -- need to register socket's fd with event loop + poll:add_read(s_FD, on_sock_io) + end + blocked_state = state + blocked_event = event +end + +-- sock state functions +function on_sock_recv() + local data, err = s:recv(z_NOBLOCK) + if not data then + assert(err == 'timeout', "Bad error on zmq socket.") + return sock_blocked(on_sock_recv, z_POLLIN) + end + local msg_id = tonumber(data) + if (msg_id % 10000) == 0 then print(data) end + return on_sock_recv() +end + +-- start processing of the socket. +poll:add_work(on_sock_recv) + +-- start event loop +poll:start() + +s:close() +ctx:term() +