Cleanup examples code.

pull/47/merge
Robert G. Jakabosky 14 years ago
parent caba791908
commit 0697debefb

@ -18,15 +18,24 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
require("zmq")
local zmq = require"zmq"
local ctx = zmq.init(1)
local N=tonumber(arg[1] or 100)
local ctx = zmq.init()
local s = ctx:socket(zmq.REQ)
s:connect("tcp://localhost:5555")
s:send("SELECT * FROM mytable")
print(s:recv())
for i=1,N do
s:send("SELECT * FROM mytable")
local data, err = s:recv()
if data then
print(data)
else
print("s:recv() error:", err)
end
end
s:close()
ctx:term()

@ -18,17 +18,26 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
require("zmq")
local zmq = require"zmq"
local ctx = zmq.init(1)
local N=tonumber(arg[1] or 100)
local ctx = zmq.init()
local s = ctx:socket(zmq.REQ)
s:connect("tcp://localhost:5555")
s:send("SELECT * FROM mytable ", zmq.SNDMORE)
s:send("WHERE library = 'zmq'")
for i=1,N do
s:send("SELECT * FROM mytable ", zmq.SNDMORE)
s:send("WHERE library = 'zmq'")
print(s:recv())
local data, err = s:recv()
if data then
print(data)
else
print("s:recv() error:", err)
end
end
s:close()
ctx:term()

@ -18,65 +18,65 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
require("zmq")
local zmq = require"zmq"
local ev = require'ev'
local loop = ev.Loop.default
-- define a sub_worker class
local sub_worker_mt = {}
function sub_worker_mt:close(...)
self.s_io_idle:stop(self.loop)
self.s_io_read:stop(self.loop)
return self.socket:close(...)
self.s_io_idle:stop(self.loop)
self.s_io_read:stop(self.loop)
return self.socket:close(...)
end
function sub_worker_mt:bind(...)
return self.socket:bind(...)
return self.socket:bind(...)
end
function sub_worker_mt:connect(...)
return self.socket:connect(...)
return self.socket:connect(...)
end
function sub_worker_mt:sub(topic)
return self.socket:setopt(zmq.SUBSCRIBE, topic)
return self.socket:setopt(zmq.SUBSCRIBE, topic)
end
function sub_worker_mt:unsub(topic)
return self.socket:setopt(zmq.UNSUBSCRIBE, topic)
return self.socket:setopt(zmq.UNSUBSCRIBE, topic)
end
sub_worker_mt.__index = sub_worker_mt
local function sub_worker(loop, ctx, msg_cb)
local s = ctx:socket(zmq.SUB)
local self = { loop = loop, socket = s, msg_cb = msg_cb }
setmetatable(self, sub_worker_mt)
-- create ev callbacks for recving data.
-- need idle watcher since ZeroMQ sockets are edge-triggered instead of level-triggered
local s_io_idle
local s_io_read
s_io_idle = ev.Idle.new(function()
local msg, err = s:recv(zmq.NOBLOCK)
if err == 'timeout' then
-- need to block on read IO
s_io_idle:stop(loop)
s_io_read:start(loop)
return
end
self:msg_cb(msg)
end)
s_io_idle:start(loop)
s_io_read = ev.IO.new(function()
s_io_idle:start(loop)
s_io_read:stop(loop)
end, s:getopt(zmq.FD), ev.READ)
self.s_io_idle = s_io_idle
self.s_io_read = s_io_read
return self
local s = ctx:socket(zmq.SUB)
local self = { loop = loop, socket = s, msg_cb = msg_cb }
setmetatable(self, sub_worker_mt)
-- create ev callbacks for recving data.
-- need idle watcher since ZeroMQ sockets are edge-triggered instead of level-triggered
local s_io_idle
local s_io_read
s_io_idle = ev.Idle.new(function()
local msg, err = s:recv(zmq.NOBLOCK)
if err == 'timeout' then
-- need to block on read IO
s_io_idle:stop(loop)
s_io_read:start(loop)
return
end
self:msg_cb(msg)
end)
s_io_idle:start(loop)
s_io_read = ev.IO.new(function()
s_io_idle:start(loop)
s_io_read:stop(loop)
end, s:getopt(zmq.FD), ev.READ)
self.s_io_idle = s_io_idle
self.s_io_read = s_io_read
return self
end
local ctx = zmq.init(1)
-- message handling function.
local function handle_msg(worker, msg)
local msg_id = tonumber(msg)
if math.mod(msg_id, 10000) == 0 then print(worker.id, msg_id) end
local msg_id = tonumber(msg)
if math.mod(msg_id, 10000) == 0 then print(worker.id, msg_id) end
end
local sub1 = sub_worker(loop, ctx, handle_msg)

@ -18,15 +18,15 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
require("zmq")
local zmq = require"zmq"
local ctx = zmq.init(1)
local ctx = zmq.init()
local s = ctx:socket(zmq.PUB)
s:bind("tcp://lo:5555")
local msg_id = 1
while true do
s:send(tostring(msg_id))
msg_id = msg_id + 1
s:send(tostring(msg_id))
msg_id = msg_id + 1
end

@ -18,14 +18,14 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
require("zmq")
local zmq = require"zmq"
local ctx = zmq.init(1)
local ctx = zmq.init()
local s = ctx:socket(zmq.REP)
s:bind("tcp://lo:5555")
while true do
print(string.format("Received query: '%s'", s:recv()))
s:send("OK")
print(string.format("Received query: '%s'", s:recv()))
s:send("OK")
end

@ -18,18 +18,18 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
require("zmq")
local zmq = require"zmq"
local ctx = zmq.init(1)
local ctx = zmq.init()
local s = ctx:socket(zmq.REP)
s:bind("tcp://lo:5555")
while true do
local query = s:recv()
while s:getopt(zmq.RCVMORE) == 1 do
query = query .. s:recv()
end
print(string.format("Received query: '%s'", query))
s:send("OK")
local query = s:recv()
while s:getopt(zmq.RCVMORE) == 1 do
query = query .. s:recv()
end
print(string.format("Received query: '%s'", query))
s:send("OK")
end

@ -18,14 +18,14 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
require("zmq")
local zmq = require"zmq"
local ctx = zmq.init(1)
local ctx = zmq.init()
local s = ctx:socket(zmq.SUB)
s:setopt(zmq.SUBSCRIBE, "")
s:connect("tcp://localhost:5555")
while true do
local msg = s:recv()
local msg_id = tonumber(msg)
if math.mod(msg_id, 10000) == 0 then print(msg_id) end
local msg = s:recv()
local msg_id = tonumber(msg)
if math.mod(msg_id, 10000) == 0 then print(msg_id) end
end

Loading…
Cancel
Save