diff --git a/examples/client.lua b/examples/client.lua index 682a549..5e9849c 100644 --- a/examples/client.lua +++ b/examples/client.lua @@ -18,15 +18,24 @@ -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- THE SOFTWARE. -require("zmq") +local zmq = require"zmq" -local ctx = zmq.init(1) +local N=tonumber(arg[1] or 100) + +local ctx = zmq.init() local s = ctx:socket(zmq.REQ) s:connect("tcp://localhost:5555") -s:send("SELECT * FROM mytable") -print(s:recv()) +for i=1,N do + s:send("SELECT * FROM mytable") + local data, err = s:recv() + if data then + print(data) + else + print("s:recv() error:", err) + end +end s:close() ctx:term() diff --git a/examples/client_multipart.lua b/examples/client_multipart.lua index e82ca26..4f10c58 100644 --- a/examples/client_multipart.lua +++ b/examples/client_multipart.lua @@ -18,17 +18,26 @@ -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- THE SOFTWARE. -require("zmq") +local zmq = require"zmq" -local ctx = zmq.init(1) +local N=tonumber(arg[1] or 100) + +local ctx = zmq.init() local s = ctx:socket(zmq.REQ) s:connect("tcp://localhost:5555") -s:send("SELECT * FROM mytable ", zmq.SNDMORE) -s:send("WHERE library = 'zmq'") +for i=1,N do + s:send("SELECT * FROM mytable ", zmq.SNDMORE) + s:send("WHERE library = 'zmq'") -print(s:recv()) + local data, err = s:recv() + if data then + print(data) + else + print("s:recv() error:", err) + end +end s:close() ctx:term() diff --git a/examples/ev_subscriber.lua b/examples/ev_subscriber.lua index 876a0ea..13f51a9 100644 --- a/examples/ev_subscriber.lua +++ b/examples/ev_subscriber.lua @@ -18,65 +18,65 @@ -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- THE SOFTWARE. -require("zmq") +local zmq = require"zmq" local ev = require'ev' local loop = ev.Loop.default -- define a sub_worker class local sub_worker_mt = {} function sub_worker_mt:close(...) - self.s_io_idle:stop(self.loop) - self.s_io_read:stop(self.loop) - return self.socket:close(...) + self.s_io_idle:stop(self.loop) + self.s_io_read:stop(self.loop) + return self.socket:close(...) end function sub_worker_mt:bind(...) - return self.socket:bind(...) + return self.socket:bind(...) end function sub_worker_mt:connect(...) - return self.socket:connect(...) + return self.socket:connect(...) end function sub_worker_mt:sub(topic) - return self.socket:setopt(zmq.SUBSCRIBE, topic) + return self.socket:setopt(zmq.SUBSCRIBE, topic) end function sub_worker_mt:unsub(topic) - return self.socket:setopt(zmq.UNSUBSCRIBE, topic) + return self.socket:setopt(zmq.UNSUBSCRIBE, topic) end sub_worker_mt.__index = sub_worker_mt local function sub_worker(loop, ctx, msg_cb) - local s = ctx:socket(zmq.SUB) - local self = { loop = loop, socket = s, msg_cb = msg_cb } - setmetatable(self, sub_worker_mt) - -- create ev callbacks for recving data. - -- need idle watcher since ZeroMQ sockets are edge-triggered instead of level-triggered - local s_io_idle - local s_io_read - s_io_idle = ev.Idle.new(function() - local msg, err = s:recv(zmq.NOBLOCK) - if err == 'timeout' then - -- need to block on read IO - s_io_idle:stop(loop) - s_io_read:start(loop) - return - end - self:msg_cb(msg) - end) - s_io_idle:start(loop) - s_io_read = ev.IO.new(function() - s_io_idle:start(loop) - s_io_read:stop(loop) - end, s:getopt(zmq.FD), ev.READ) - self.s_io_idle = s_io_idle - self.s_io_read = s_io_read - return self + local s = ctx:socket(zmq.SUB) + local self = { loop = loop, socket = s, msg_cb = msg_cb } + setmetatable(self, sub_worker_mt) + -- create ev callbacks for recving data. + -- need idle watcher since ZeroMQ sockets are edge-triggered instead of level-triggered + local s_io_idle + local s_io_read + s_io_idle = ev.Idle.new(function() + local msg, err = s:recv(zmq.NOBLOCK) + if err == 'timeout' then + -- need to block on read IO + s_io_idle:stop(loop) + s_io_read:start(loop) + return + end + self:msg_cb(msg) + end) + s_io_idle:start(loop) + s_io_read = ev.IO.new(function() + s_io_idle:start(loop) + s_io_read:stop(loop) + end, s:getopt(zmq.FD), ev.READ) + self.s_io_idle = s_io_idle + self.s_io_read = s_io_read + return self end local ctx = zmq.init(1) -- message handling function. local function handle_msg(worker, msg) - local msg_id = tonumber(msg) - if math.mod(msg_id, 10000) == 0 then print(worker.id, msg_id) end + local msg_id = tonumber(msg) + if math.mod(msg_id, 10000) == 0 then print(worker.id, msg_id) end end local sub1 = sub_worker(loop, ctx, handle_msg) diff --git a/examples/publiser.lua b/examples/publisher.lua similarity index 92% rename from examples/publiser.lua rename to examples/publisher.lua index 12e240e..a5f943e 100644 --- a/examples/publiser.lua +++ b/examples/publisher.lua @@ -18,15 +18,15 @@ -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- THE SOFTWARE. -require("zmq") +local zmq = require"zmq" -local ctx = zmq.init(1) +local ctx = zmq.init() local s = ctx:socket(zmq.PUB) s:bind("tcp://lo:5555") local msg_id = 1 while true do - s:send(tostring(msg_id)) - msg_id = msg_id + 1 + s:send(tostring(msg_id)) + msg_id = msg_id + 1 end diff --git a/examples/server.lua b/examples/server.lua index 57bb278..3423c4e 100644 --- a/examples/server.lua +++ b/examples/server.lua @@ -18,14 +18,14 @@ -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- THE SOFTWARE. -require("zmq") +local zmq = require"zmq" -local ctx = zmq.init(1) +local ctx = zmq.init() local s = ctx:socket(zmq.REP) s:bind("tcp://lo:5555") while true do - print(string.format("Received query: '%s'", s:recv())) - s:send("OK") + print(string.format("Received query: '%s'", s:recv())) + s:send("OK") end diff --git a/examples/server_multipart.lua b/examples/server_multipart.lua index 663c3f7..c26e3e0 100644 --- a/examples/server_multipart.lua +++ b/examples/server_multipart.lua @@ -18,18 +18,18 @@ -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- THE SOFTWARE. -require("zmq") +local zmq = require"zmq" -local ctx = zmq.init(1) +local ctx = zmq.init() local s = ctx:socket(zmq.REP) s:bind("tcp://lo:5555") while true do - local query = s:recv() - while s:getopt(zmq.RCVMORE) == 1 do - query = query .. s:recv() - end - print(string.format("Received query: '%s'", query)) - s:send("OK") + local query = s:recv() + while s:getopt(zmq.RCVMORE) == 1 do + query = query .. s:recv() + end + print(string.format("Received query: '%s'", query)) + s:send("OK") end diff --git a/examples/subscriber.lua b/examples/subscriber.lua index 370b7dc..41434c7 100644 --- a/examples/subscriber.lua +++ b/examples/subscriber.lua @@ -18,14 +18,14 @@ -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- THE SOFTWARE. -require("zmq") +local zmq = require"zmq" -local ctx = zmq.init(1) +local ctx = zmq.init() local s = ctx:socket(zmq.SUB) s:setopt(zmq.SUBSCRIBE, "") s:connect("tcp://localhost:5555") while true do - local msg = s:recv() - local msg_id = tonumber(msg) - if math.mod(msg_id, 10000) == 0 then print(msg_id) end + local msg = s:recv() + local msg_id = tonumber(msg) + if math.mod(msg_id, 10000) == 0 then print(msg_id) end end