Add send_msg()/recv_msg() functions and zmq_msg_t object. Update FFI-based bindings to use latest LuaNativeObjects FFI support.

pull/10/head
Robert G. Jakabosky 15 years ago
parent ed462fe9b8
commit d0a71ccbae

@ -38,12 +38,12 @@ local ctx = zmq.init(1)
local s = ctx:socket(zmq.REP) local s = ctx:socket(zmq.REP)
s:bind(bind_to) s:bind(bind_to)
local msg local msg = zmq.zmq_msg_t()
for i = 1, roundtrip_count do for i = 1, roundtrip_count do
msg = s:recv() assert(s:recv_msg(msg))
assert(#msg == message_size, "Invalid message size") assert(msg:size() == message_size, "Invalid message size")
s:send(msg) assert(s:send_msg(msg))
end end
s:close() s:close()

@ -42,13 +42,15 @@ local s = ctx:socket(zmq.SUB)
s:setopt(zmq.SUBSCRIBE, ""); s:setopt(zmq.SUBSCRIBE, "");
s:bind(bind_to) s:bind(bind_to)
local msg = s:recv() local msg
msg = zmq.zmq_msg_t()
assert(s:recv_msg(msg))
local start_time = time() local start_time = time()
for i = 1, message_count - 1 do for i = 1, message_count - 1 do
msg = s:recv() assert(s:recv_msg(msg))
assert(#msg == message_size, "Invalid message size") assert(msg:size() == message_size, "Invalid message size")
end end
local end_time = time() local end_time = time()

@ -41,15 +41,15 @@ local ctx = zmq.init(1)
local s = ctx:socket(zmq.REQ) local s = ctx:socket(zmq.REQ)
s:connect(connect_to) s:connect(connect_to)
local msg = "" local data = ("0"):rep(message_size)
for i = 1, message_size do msg = msg .. "0" end local msg = zmq.zmq_msg_t.init_size(message_size)
local start_time = time() local start_time = time()
for i = 1, roundtrip_count do for i = 1, roundtrip_count do
s:send(msg) assert(s:send_msg(msg))
msg = s:recv() assert(s:recv_msg(msg))
assert(#msg == message_size, "Invalid message size") assert(msg:size() == message_size, "Invalid message size")
end end
local end_time = time() local end_time = time()

@ -38,11 +38,12 @@ local ctx = zmq.init(1)
local s = ctx:socket(zmq.PUB) local s = ctx:socket(zmq.PUB)
s:connect(connect_to) s:connect(connect_to)
local msg = "" local data = ("0"):rep(message_size)
for i = 1, message_size do msg = msg .. "0" end local msg = zmq.zmq_msg_t.init_size(message_size)
for i = 1, message_count do for i = 1, message_count do
s:send(msg) msg:set_data(data)
assert(s:send_msg(msg))
end end
--os.execute("sleep " .. 10) --os.execute("sleep " .. 10)

@ -20,12 +20,16 @@
object "ZMQ_Ctx" { object "ZMQ_Ctx" {
error_on_null = "get_zmq_strerror()", error_on_null = "get_zmq_strerror()",
ffi_cdef [[
typedef void * ZMQ_Ctx;
]],
c_source [[ c_source [[
typedef void * ZMQ_Ctx; typedef void * ZMQ_Ctx;
]], ]],
destructor { destructor {
c_source[[ c_source[[
if(${this}_flags & OBJ_UDATA_CTX_SHOULD_FREE) { if(${this_flags} & OBJ_UDATA_CTX_SHOULD_FREE) {
zmq_term(${this}); zmq_term(${this});
} }
]] ]]

@ -19,13 +19,26 @@
-- THE SOFTWARE. -- THE SOFTWARE.
-- Convert ZMQ Error codes into strings. -- Convert ZMQ Error codes into strings.
--
-- This is an error code wrapper object, it converts C-style 'int' return error code
-- into Lua-style 'nil, "Error message"' return values.
--
error_code "ZMQ_Error" "int" { error_code "ZMQ_Error" "int" {
ffi_cdef[[
typedef int ZMQ_Error;
]],
is_error_check = function(rec) return "(0 != ${" .. rec.name .. "})" end, is_error_check = function(rec) return "(0 != ${" .. rec.name .. "})" end,
ffi_is_error_check = function(rec) return "(0 ~= ${" .. rec.name .. "})" end,
default = "0", default = "0",
c_source [[ c_source [[
if(err != 0) { if(err != 0) {
err_str = get_zmq_strerror(); err_str = get_zmq_strerror();
} }
]],
ffi_source [[
if(0 ~= err) then
err_str = get_zmq_strerror();
end
]], ]],
} }

@ -0,0 +1,160 @@
-- Copyright (c) 2010 by Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
object "zmq_msg_t" {
-- store the `zmq_msg_t` structure in Lua userdata object
userdata_type = "embed",
--
-- Define zmq_msq_t type & function API for FFI
--
ffi_cdef[[
typedef struct zmq_msg_t
{
void *content;
unsigned char flags;
unsigned char vsm_size;
unsigned char vsm_data [30]; /* that '30' is from 'MAX_VSM_SIZE' */
} zmq_msg_t;
typedef void (zmq_free_fn) (void *data, void *hint);
int zmq_msg_init (zmq_msg_t *msg);
int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);
]],
constructor "init" {
var_out{ "ZMQ_Error", "err" },
c_source[[
zmq_msg_t tmp;
${this} = &tmp;
${err} = zmq_msg_init(${this});
]],
},
constructor "init_size" {
var_in{ "size_t", "size" },
var_out{ "ZMQ_Error", "err" },
c_source[[
zmq_msg_t tmp;
${this} = &tmp;
${err} = zmq_msg_init_size(${this}, ${size});
]],
},
constructor "init_data" {
var_in{ "const char *", "data" },
var_out{ "ZMQ_Error", "err" },
c_source[[
zmq_msg_t tmp;
${this} = &tmp;
${err} = zmq_msg_init_size(${this}, ${data_len});
if(0 == ${err}) {
/* fill message */
memcpy(zmq_msg_data(${this}), ${data}, ${data_len});
}
]],
},
destructor {
c_method_call "ZMQ_Error" "zmq_msg_close" {}
},
method "close" {
c_method_call "ZMQ_Error" "zmq_msg_close" {}
},
method "move" {
c_method_call "ZMQ_Error" "zmq_msg_move" { "zmq_msg_t *", "src" }
},
method "copy" {
c_method_call "ZMQ_Error" "zmq_msg_copy" { "zmq_msg_t *", "src" }
},
method "set_data" {
var_in{ "const char *", "data" },
var_out{ "ZMQ_Error", "err" },
c_source[[
/* check message data size. */
if(zmq_msg_size(${this}) != ${data_len}) {
/* need to resize message. */
zmq_msg_close(${this}); /* close old message, to free old data. */
${err} = zmq_msg_init_size(${this}, ${data_len}); /* re-initialize message. */
if(0 != ${err}) {
luaL_error(L, "set_data() failed: %s", get_zmq_strerror());
}
}
/* copy data into message */
memcpy(zmq_msg_data(${this}), ${data}, ${data_len});
]],
ffi_source[[
-- check message data size.
if (C.zmq_msg_size(${this}) ~= ${data_len}) then
-- need to resize message.
C.zmq_msg_close(${this}); -- close old message, to free old data.
${err} = C.zmq_msg_init_size(${this}, ${data_len}); -- re-initialize message.
if (0 ~= ${err}) then
error("set_data() failed: " .. get_zmq_strerror());
end
end
-- copy data into message
ffi.copy(C.zmq_msg_data(${this}), ${data}, ${data_len});
]],
},
method "data" {
c_method_call "void *" "zmq_msg_data" {}
},
method "set_size" {
var_in{ "size_t", "size" },
var_out{ "ZMQ_Error", "err" },
c_source[[
/* check message data size. */
if(zmq_msg_size(${this}) != ${size}) {
/* need to resize message. */
zmq_msg_close(${this}); /* close old message, to free old data. */
${err} = zmq_msg_init_size(${this}, ${size}); /* re-initialize message. */
if(0 != ${err}) {
luaL_error(L, "set_size() failed: %s", get_zmq_strerror());
}
}
]],
ffi_source[[
-- check message data size.
if (C.zmq_msg_size(${this}) ~= ${size}) then
-- need to resize message.
C.zmq_msg_close(${this}); -- close old message, to free old data.
${err} = C.zmq_msg_init_size(${this}, ${size}); -- re-initialize message.
if (0 ~= ${err}) then
error("set_size() failed: " .. get_zmq_strerror());
end
end
]],
},
method "size" {
c_method_call "size_t" "zmq_msg_size" {}
},
method "__tostring" {
var_out{ "const char *", "data", has_length = true },
c_source[[
${data} = zmq_msg_data(${this});
${data_len} = zmq_msg_size(${this});
]],
ffi_source[[
${data} = zmq_msg_data(${this});
${data_len} = zmq_msg_size(${this});
]],
},
}

File diff suppressed because it is too large Load Diff

@ -20,6 +20,9 @@
object "ZMQ_Socket" { object "ZMQ_Socket" {
error_on_null = "get_zmq_strerror()", error_on_null = "get_zmq_strerror()",
ffi_cdef[[
typedef void * ZMQ_Socket;
]],
c_source [[ c_source [[
/* detect zmq version >= 2.1.0 */ /* detect zmq version >= 2.1.0 */
#define VERSION_2_1 0 #define VERSION_2_1 0
@ -87,6 +90,10 @@ static const int opt_types[] = {
method "connect" { method "connect" {
c_method_call "ZMQ_Error" "zmq_connect" { "const char *", "addr" } c_method_call "ZMQ_Error" "zmq_connect" { "const char *", "addr" }
}, },
ffi_cdef[[
int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);
int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);
]],
method "setopt" { method "setopt" {
var_in{ "uint32_t", "opt" }, var_in{ "uint32_t", "opt" },
var_in{ "<any>", "val" }, var_in{ "<any>", "val" },
@ -221,34 +228,75 @@ static const int opt_types[] = {
lua_pushnil(L); lua_pushnil(L);
]] ]]
}, },
ffi_source[[
-- temp. values for 'events' function.
local events_tmp = ffi.new('uint32_t[1]', 0)
local events_tmp_size = ffi.sizeof('uint32_t')
local events_tmp_len = ffi.new('size_t[1]', events_tmp_size)
local ZMQ_EVENTS = _M.EVENTS
]],
method "events" { method "events" {
var_out{ "uint32_t", "events" }, var_out{ "uint32_t", "events" },
var_out{ "ZMQ_Error", "err" }, var_out{ "ZMQ_Error", "err" },
c_source[[ c_source[[
size_t val_len = sizeof(${events}); size_t val_len = sizeof(${events});
${err} = zmq_getsockopt(${this}, ZMQ_EVENTS, &(${events}), &val_len); ${err} = zmq_getsockopt(${this}, ZMQ_EVENTS, &(${events}), &val_len);
]] ]],
ffi_source[[
events_tmp_len[0] = events_tmp_size
${err} = C.zmq_getsockopt(${this}, ZMQ_EVENTS, events_tmp, events_tmp_len);
${events} = events_tmp[0]
]],
}, },
method "send" { --
var_in{ "const char *", "data" }, -- zmq_send
var_in{ "int", "flags?" }, --
var_out{ "ZMQ_Error", "err" }, method "send_msg" {
c_source[[ c_method_call "ZMQ_Error" "zmq_send" { "zmq_msg_t *", "msg", "int", "flags?" },
},
-- create helper function for `zmq_send`
c_source[[
static ZMQ_Error simple_zmq_send(ZMQ_Socket sock, const char *data, size_t data_len, int flags) {
ZMQ_Error err;
zmq_msg_t msg; zmq_msg_t msg;
/* initialize message */ /* initialize message */
${err} = zmq_msg_init_size(&msg, ${data}_len); err = zmq_msg_init_size(&msg, data_len);
if(0 == ${err}) { if(0 == err) {
/* fill message */ /* fill message */
memcpy(zmq_msg_data(&msg), ${data}, ${data}_len); memcpy(zmq_msg_data(&msg), data, data_len);
/* send message */ /* send message */
${err} = zmq_send(${this}, &msg, ${flags}); err = zmq_send(sock, &msg, flags);
/* close message */ /* close message */
zmq_msg_close(&msg); zmq_msg_close(&msg);
} }
]] return err;
}
]],
-- export helper function.
ffi_export_function "ZMQ_Error" "simple_zmq_send"
"(ZMQ_Socket sock, const char *data, size_t data_len, int flags)",
method "send" {
var_in{ "const char *", "data" },
var_in{ "int", "flags?" },
var_out{ "ZMQ_Error", "err" },
c_source[[
${err} = simple_zmq_send(${this}, ${data}, ${data_len}, ${flags});
]],
ffi_source[[
${err} = simple_zmq_send(${this}, ${data}, ${data_len}, ${flags});
]],
}, },
--
-- zmq_recv
--
method "recv_msg" {
c_method_call "ZMQ_Error" "zmq_recv" { "zmq_msg_t *", "msg", "int", "flags?" },
},
ffi_source[[
local tmp_msg = ffi.new('zmq_msg_t')
]],
method "recv" { method "recv" {
var_in{ "int", "flags", is_optional = true }, var_in{ "int", "flags?" },
var_out{ "const char *", "data", has_length = true }, var_out{ "const char *", "data", has_length = true },
var_out{ "ZMQ_Error", "err" }, var_out{ "ZMQ_Error", "err" },
c_source[[ c_source[[
@ -260,14 +308,34 @@ static const int opt_types[] = {
${err} = zmq_recv(${this}, &msg, ${flags}); ${err} = zmq_recv(${this}, &msg, ${flags});
if(0 == ${err}) { if(0 == ${err}) {
${data} = zmq_msg_data(&msg); ${data} = zmq_msg_data(&msg);
${data}_len = zmq_msg_size(&msg); ${data_len} = zmq_msg_size(&msg);
} }
} }
]], ]],
c_source "post" [[ c_source "post" [[
/* close message */ /* close message */
zmq_msg_close(&msg); zmq_msg_close(&msg);
]] ]],
ffi_source[[
local msg = tmp_msg
-- initialize blank message.
if C.zmq_msg_init(msg) < 0 then
return nil, get_zmq_strerror()
end
-- receive message
${err} = C.zmq_recv(${this}, msg, ${flags})
if 0 == ${err} then
local data = ffi.string(C.zmq_msg_data(msg), C.zmq_msg_size(msg))
-- close message
C.zmq_msg_close(msg)
return data
end
]],
ffi_source "ffi_post" [[
-- close message
C.zmq_msg_close(msg)
]],
}, },
} }

@ -1,347 +0,0 @@
-- Copyright (c) 2010 by Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local zmq = ...
-- try loading luajit's ffi
local stat, ffi=pcall(require,"ffi")
if not stat then
print("No FFI module: ZMQ using standard Lua api interface.")
return
end
-- check if ffi is disabled.
if disable_ffi then
print("FFI disabled: ZMQ using standard Lua api interface.")
return
end
local setmetatable = setmetatable
local getmetatable = getmetatable
local print = print
local pairs = pairs
local error = error
local type = type
local assert = assert
local tostring = tostring
local tonumber = tonumber
local newproxy = newproxy
ffi.cdef[[
void zmq_version (int *major, int *minor, int *patch);
int zmq_errno ();
const char *zmq_strerror (int errnum);
typedef struct zmq_msg_t
{
void *content;
unsigned char flags;
unsigned char vsm_size;
unsigned char vsm_data [30];
} zmq_msg_t;
typedef void (zmq_free_fn) (void *data, void *hint);
int zmq_msg_init (zmq_msg_t *msg);
int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);
int zmq_msg_close (zmq_msg_t *msg);
int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
void *zmq_msg_data (zmq_msg_t *msg);
size_t zmq_msg_size (zmq_msg_t *msg);
void *zmq_init (int io_threads);
int zmq_term (void *context);
void *zmq_socket (void *context, int type);
int zmq_close (void *s);
int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);
int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);
int zmq_bind (void *s, const char *addr);
int zmq_connect (void *s, const char *addr);
int zmq_send (void *s, zmq_msg_t *msg, int flags);
int zmq_recv (void *s, zmq_msg_t *msg, int flags);
int zmq_device (int device, void * insocket, void* outsocket);
]]
local C = ffi.load"zmq"
-- simulate: module(...)
zmq._M = zmq
setfenv(1, zmq)
function version()
local major = ffi.new('int[1]',0)
local minor = ffi.new('int[1]',0)
local patch = ffi.new('int[1]',0)
C.zmq_version(major, minor, patch)
return {major[0], minor[0], patch[0]}
end
local function zmq_error()
local errno = C.zmq_errno()
local err = ffi.string(C.zmq_strerror(errno))
if err == "Resource temporarily unavailable" then err = "timeout" end
if err == "Context was terminated" then err = "closed" end
return nil, err
end
--
-- ZMQ socket
--
local sock_mt = {}
sock_mt.__index = sock_mt
function sock_mt:close()
-- get the true self
self=getmetatable(self)
local sock = self.sock
-- make sure socket is still valid.
if not sock then return end
-- mark this socket as closed.
self.sock = nil
-- close zmq socket.
if C.zmq_close(sock) ~= 0 then
return zmq_error()
end
return true
end
local function sock__gc(self)
self:close()
end
local option_types = {
[zmq.HWM] = 'uint64_t[1]',
[zmq.SWAP] = 'int64_t[1]',
[zmq.AFFINITY] = 'uint64_t[1]',
[zmq.IDENTITY] = 'string',
[zmq.SUBSCRIBE] = 'string',
[zmq.UNSUBSCRIBE] = 'string',
[zmq.RATE] = 'int64_t[1]',
[zmq.RECOVERY_IVL] = 'int64_t[1]',
[zmq.MCAST_LOOP] = 'int64_t[1]',
[zmq.SNDBUF] = 'uint64_t[1]',
[zmq.RCVBUF] = 'uint64_t[1]',
[zmq.RCVMORE] = 'int64_t[1]',
[zmq.FD] = 'int[1]',
[zmq.EVENTS] = 'uint32_t[1]',
[zmq.TYPE] = 'int[1]',
[zmq.LINGER] = 'int[1]',
[zmq.RECONNECT_IVL] = 'int[1]',
[zmq.BACKLOG] = 'int[1]',
}
local option_len = {}
local option_tmps = {}
for k,v in pairs(option_types) do
if v ~= 'string' then
option_len[k] = ffi.sizeof(v)
option_tmps[k] = ffi.new(v, 0)
end
end
function sock_mt:setopt(opt, opt_val)
local ctype = option_types[opt]
local val_len = 0
if ctype == 'string' then
--val = ffi.cast('void *', tostring(val))
val = tostring(opt_val)
val_len = #val
else
val = option_tmps[opt]
val[0] = opt_val
val_len = option_len[opt]
end
local ret = C.zmq_setsockopt(self.sock, opt, val, val_len)
if ret ~= 0 then
return zmq_error()
end
return true
end
local tmp_val_len = ffi.new('size_t[1]', 4)
function sock_mt:getopt(opt)
local ctype = option_types[opt]
local val
local val_len = tmp_val_len
if ctype == 'string' then
val_len[0] = 255
val = ffi.new('uint8_t[?]', val_len[0])
ffi.fill(val, val_len[0])
else
val = option_tmps[opt]
val[0] = 0
val_len[0] = option_len[opt]
end
local ret = C.zmq_getsockopt(self.sock, opt, val, val_len)
if ret ~= 0 then
return zmq_error()
end
if ctype == 'string' then
val_len = val_len[0]
return ffi.string(val, val_len)
else
val = val[0]
end
return tonumber(val)
end
local tmp32 = ffi.new('uint32_t[1]', 0)
local tmp32_len = ffi.new('size_t[1]', 4)
function sock_mt:events()
local val = tmp32
local val_len = tmp32_len
val[0] = 0
val_len[0] = 4
local ret = C.zmq_getsockopt(self.sock, 15, val, val_len)
if ret ~= 0 then
return zmq_error()
end
return val[0]
end
function sock_mt:bind(addr)
local ret = C.zmq_bind(self.sock, addr)
if ret ~= 0 then
return zmq_error()
end
return true
end
function sock_mt:connect(addr)
local ret = C.zmq_connect(self.sock, addr)
if ret ~= 0 then
return zmq_error()
end
return true
end
local tmp_msg = ffi.new('zmq_msg_t')
function sock_mt:send(data, flags)
local msg = tmp_msg
local msg_len = #data
-- initialize message
if C.zmq_msg_init_size(msg, msg_len) < 0 then
return zmq_error()
end
-- copy data into message.
ffi.copy(C.zmq_msg_data(msg), data, msg_len)
-- send message
local ret = C.zmq_send(self.sock, msg, flags or 0)
-- close message before processing return code
C.zmq_msg_close(msg)
-- now process send return code
if ret ~= 0 then
return zmq_error()
end
return true
end
function sock_mt:recv(flags)
local msg = tmp_msg
-- initialize blank message.
if C.zmq_msg_init(msg) < 0 then
return zmq_error()
end
-- receive message
local ret = C.zmq_recv(self.sock, msg, flags or 0)
if ret ~= 0 then
local data, err = zmq_error()
C.zmq_msg_close(msg)
return data, err
end
local data = ffi.string(C.zmq_msg_data(msg), C.zmq_msg_size(msg))
-- close message
C.zmq_msg_close(msg)
return data
end
--
-- ZMQ context
--
local ctx_mt = {}
ctx_mt.__index = ctx_mt
function ctx_mt:term()
-- get the true self
self=getmetatable(self)
local ctx = self.ctx
self.ctx = nil
-- make sure context is valid.
if not ctx then return end
if C.zmq_term(ctx) ~= 0 then
return zmq_error()
end
return true
end
function ctx_mt:lightuserdata()
return self.ctx
end
function ctx_mt:socket(sock_type)
local sock = C.zmq_socket(self.ctx, sock_type)
if not sock then
return zmq_error()
end
-- use a wrapper newproxy for __gc support
local self=newproxy(true)
local meta=getmetatable(self)
meta.__index = meta
meta.sock = sock
meta.__gc = sock__gc
setmetatable(meta, sock_mt)
return self
end
local function ctx__gc(self)
if self.should_free then
self:term()
end
end
function init(io_threads)
local should_free = true
local ctx
print("ZMQ using FFI interface.")
if type(io_threads) == 'number' then
ctx = C.zmq_init(io_threads)
if not ctx then
return zmq_error()
end
else
should_free = false
-- should be lightuserdata or cdata<void *>
ctx = io_threads
end
-- use a wrapper newproxy for __gc support
local self=newproxy(true)
local meta=getmetatable(self)
meta.__index = meta
meta.ctx = ctx
meta.should_free = should_free
meta.__gc = ctx__gc
setmetatable(meta, ctx_mt)
return self
end

@ -1,4 +1,5 @@
-- make generated variable nicer.
set_variable_format "%s" set_variable_format "%s"
c_module "zmq" { c_module "zmq" {
@ -10,9 +11,20 @@ luajit_ffi = true,
sys_include "string.h", sys_include "string.h",
include "zmq.h", include "zmq.h",
ffi_load "zmq",
c_source[[ c_source[[
#define OBJ_UDATA_CTX_SHOULD_FREE (OBJ_UDATA_LAST_FLAG << 1) #define OBJ_UDATA_CTX_SHOULD_FREE (OBJ_UDATA_LAST_FLAG << 1)
]],
ffi_source[[
local OBJ_UDATA_CTX_SHOULD_FREE = (OBJ_UDATA_LAST_FLAG * 2)
]],
c_source[[
/*
* This wrapper function is to make the EAGAIN/ETERM error messages more like
* what is returned by LuaSocket.
*/
static const char *get_zmq_strerror() { static const char *get_zmq_strerror() {
int err = zmq_errno(); int err = zmq_errno();
switch(err) { switch(err) {
@ -30,6 +42,16 @@ static const char *get_zmq_strerror() {
]], ]],
-- export helper function 'get_zmq_strerror' to FFI code.
ffi_export_function "const char *" "get_zmq_strerror" "()",
ffi_source[[
local C_get_zmq_strerror = get_zmq_strerror
-- make nicer wrapper for exported error function.
local function get_zmq_strerror()
return ffi.string(C_get_zmq_strerror())
end
]],
-- --
-- Module constants -- Module constants
-- --
@ -90,6 +112,14 @@ FORWARDER = 2,
QUEUE = 3, QUEUE = 3,
}, },
subfiles {
"src/error.nobj.lua",
"src/msg.nobj.lua",
"src/socket.nobj.lua",
"src/ctx.nobj.lua",
},
-- --
-- Module static functions -- Module static functions
-- --
@ -107,41 +137,44 @@ c_function "version" {
lua_rawseti(L, -2, 2); lua_rawseti(L, -2, 2);
lua_pushinteger(L, patch); lua_pushinteger(L, patch);
lua_rawseti(L, -2, 3); lua_rawseti(L, -2, 3);
]] ]],
}, },
c_function "init" { c_function "init" {
var_in{ "<any>", "io_threads" }, c_call "!ZMQ_Ctx" "zmq_init" { "int", "io_threads" },
},
c_function "init_ctx" {
var_in{ "<any>", "ptr" },
var_out{ "ZMQ_Ctx", "!ctx" }, var_out{ "ZMQ_Ctx", "!ctx" },
c_source[[ c_source[[
if(lua_isnumber(L, ${io_threads::idx})) { if(lua_isuserdata(L, ${ptr::idx})) {
${ctx} = zmq_init(lua_tointeger(L,${io_threads::idx})); ${ctx} = lua_touserdata(L, ${ptr::idx});
${ctx}_flags |= OBJ_UDATA_CTX_SHOULD_FREE;
} else if(lua_isuserdata(L, ${io_threads::idx})) {
${ctx} = lua_touserdata(L, ${io_threads::idx});
} else { } else {
/* check if value is a LuaJIT 'cdata' */ /* check if value is a LuaJIT 'cdata' */
int type = lua_type(L, ${io_threads::idx}); int type = lua_type(L, ${ptr::idx});
const char *typename = lua_typename(L, type); const char *typename = lua_typename(L, type);
if(strncmp(typename, "cdata", sizeof("cdata")) == 0) { if(strncmp(typename, "cdata", sizeof("cdata")) == 0) {
${ctx} = (void *)lua_topointer(L, ${io_threads::idx}); ${ctx} = (void *)lua_topointer(L, ${ptr::idx});
} else { } else {
return luaL_argerror(L, ${io_threads::idx}, "(expected number)"); return luaL_argerror(L, ${ptr::idx}, "(expected userdata)");
} }
} }
]] ]],
}, },
c_function "device" { c_function "device" {
c_call "ZMQ_Error" "zmq_device" c_call "ZMQ_Error" "zmq_device"
{ "int", "device", "ZMQ_Socket", "insock", "ZMQ_Socket", "outsock" }, { "int", "device", "ZMQ_Socket", "insock", "ZMQ_Socket", "outsock" },
}, },
ffi_files { --
"src/zmq_ffi.nobj.lua", -- This dump function is for getting a copy of the FFI-based bindings code and is
}, -- only for debugging.
subfiles { --
"src/error.nobj.lua", c_function "dump_ffi" {
"src/ctx.nobj.lua", var_out{ "const char *", "ffi_code", has_length = true, },
"src/socket.nobj.lua", c_source[[
${ffi_code} = ${module_c_name}_ffi_lua_code;
${ffi_code_len} = sizeof(${module_c_name}_ffi_lua_code) - 1;
]],
}, },
} }

@ -1,347 +0,0 @@
--[[
--
-- This is just an normal LuaJIT2 FFI based bindings for zmq.
-- It is only for testing and comparison.
--
--]]
local setmetatable = setmetatable
local print = print
local pairs = pairs
local error = error
local type = type
local assert = assert
local tostring = tostring
local tonumber = tonumber
local zmq = {
MAX_VSM_SIZE = 30,
-- message types
DELIMITER = 31,
VSM = 32,
-- message flags
MSG_MORE = 1,
MSG_SHARED = 128,
-- socket types
PAIR = 0,
PUB = 1,
SUB = 2,
REQ = 3,
REP = 4,
XREQ = 5,
XREP = 6,
PULL = 7,
PUSH = 8,
-- socket options
HWM = 1,
SWAP = 3,
AFFINITY = 4,
IDENTITY = 5,
SUBSCRIBE = 6,
UNSUBSCRIBE = 7,
RATE = 8,
RECOVERY_IVL = 9,
MCAST_LOOP = 10,
SNDBUF = 11,
RCVBUF = 12,
RCVMORE = 13,
FD = 14,
EVENTS = 15,
TYPE = 16,
LINGER = 17,
RECONNECT_IVL = 18,
BACKLOG = 19,
-- send/recv flags
NOBLOCK = 1,
SNDMORE = 2,
-- poll events
POLLIN = 1,
POLLOUT = 2,
POLLERR = 4,
-- devices
STREAMER = 1,
FORWARDER = 2,
QUEUE = 3,
}
local z_SUBSCRIBE = zmq.SUBSCRIBE
local z_UNSUBSCRIBE = zmq.UNSUBSCRIBE
local z_IDENTITY = zmq.IDENTITY
local z_NOBLOCK = zmq.NOBLOCK
local z_RCVMORE = zmq.RCVMORE
local z_SNDMORE = zmq.SNDMORE
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local ffi=require"ffi"
ffi.cdef[[
void zmq_version (int *major, int *minor, int *patch);
int zmq_errno ();
const char *zmq_strerror (int errnum);
typedef struct zmq_msg_t
{
void *content;
unsigned char flags;
unsigned char vsm_size;
unsigned char vsm_data [30];
} zmq_msg_t;
typedef void (zmq_free_fn) (void *data, void *hint);
int zmq_msg_init (zmq_msg_t *msg);
int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);
int zmq_msg_close (zmq_msg_t *msg);
int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);
void *zmq_msg_data (zmq_msg_t *msg);
size_t zmq_msg_size (zmq_msg_t *msg);
void *zmq_init (int io_threads);
int zmq_term (void *context);
void *zmq_socket (void *context, int type);
int zmq_close (void *s);
int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);
int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);
int zmq_bind (void *s, const char *addr);
int zmq_connect (void *s, const char *addr);
int zmq_send (void *s, zmq_msg_t *msg, int flags);
int zmq_recv (void *s, zmq_msg_t *msg, int flags);
int zmq_device (int device, void * insocket, void* outsocket);
]]
local c_zmq = ffi.load"zmq"
module(...)
-- copy constants
for k,v in pairs(zmq) do
-- only copy upper-case string values.
if type(k) == 'string' and k == k:upper() then
_M[k] = v
end
end
function version()
local major = ffi.new('int[1]',0)
local minor = ffi.new('int[1]',0)
local patch = ffi.new('int[1]',0)
c_zmq.zmq_version(major, minor, patch)
return {major[0], minor[0], patch[0]}
end
local function zmq_error()
local errno = c_zmq.zmq_errno()
local err = ffi.string(c_zmq.zmq_strerror(errno))
if err == "Resource temporarily unavailable" then err = "timeout" end
if err == "Context was terminated" then err = "closed" end
return nil, err
end
--
-- ZMQ socket
--
local sock_mt = {}
sock_mt.__index = sock_mt
local function new_socket(ctx, sock_type)
local sock = c_zmq.zmq_socket(ctx, sock_type)
if not sock then
return zmq_error()
end
return setmetatable({ sock = sock }, sock_mt)
end
function sock_mt:close()
local ret = c_zmq.zmq_close(self.sock)
self.sock = nil
if ret ~= 0 then
return zmq_error()
end
return true
end
local option_types = {
[zmq.HWM] = 'uint64_t[1]',
[zmq.SWAP] = 'int64_t[1]',
[zmq.AFFINITY] = 'uint64_t[1]',
[zmq.IDENTITY] = 'string',
[zmq.SUBSCRIBE] = 'string',
[zmq.UNSUBSCRIBE] = 'string',
[zmq.RATE] = 'int64_t[1]',
[zmq.RECOVERY_IVL] = 'int64_t[1]',
[zmq.MCAST_LOOP] = 'int64_t[1]',
[zmq.SNDBUF] = 'uint64_t[1]',
[zmq.RCVBUF] = 'uint64_t[1]',
[zmq.RCVMORE] = 'int64_t[1]',
[zmq.FD] = 'int[1]',
[zmq.EVENTS] = 'uint32_t[1]',
[zmq.TYPE] = 'int[1]',
[zmq.LINGER] = 'int[1]',
[zmq.RECONNECT_IVL] = 'int[1]',
[zmq.BACKLOG] = 'int[1]',
}
local option_len = {}
local option_tmps = {}
for k,v in pairs(option_types) do
if v ~= 'string' then
option_len[k] = ffi.sizeof(v)
option_tmps[k] = ffi.new(v, 0)
end
end
function sock_mt:setopt(opt, opt_val)
local ctype = option_types[opt]
local val_len = 0
if ctype == 'string' then
--val = ffi.cast('void *', tostring(val))
val = tostring(opt_val)
val_len = #val
else
val = option_tmps[opt]
val[0] = opt_val
val_len = option_len[opt]
end
local ret = c_zmq.zmq_setsockopt(self.sock, opt, val, val_len)
if ret ~= 0 then
return zmq_error()
end
return true
end
local tmp_val_len = ffi.new('size_t[1]', 4)
function sock_mt:getopt(opt)
local ctype = option_types[opt]
local val
local val_len = tmp_val_len
if ctype == 'string' then
val_len[0] = 255
val = ffi.new('uint8_t[?]', val_len[0])
ffi.fill(val, val_len[0])
else
val = option_tmps[opt]
val[0] = 0
val_len[0] = option_len[opt]
end
local ret = c_zmq.zmq_getsockopt(self.sock, opt, val, val_len)
if ret ~= 0 then
return zmq_error()
end
if ctype == 'string' then
val_len = val_len[0]
return ffi.string(val, val_len)
else
val = val[0]
end
return tonumber(val)
end
local tmp32 = ffi.new('uint32_t[1]', 0)
local tmp32_len = ffi.new('size_t[1]', 4)
function sock_mt:events()
local val = tmp32
local val_len = tmp32_len
val[0] = 0
val_len[0] = 4
local ret = c_zmq.zmq_getsockopt(self.sock, 15, val, val_len)
if ret ~= 0 then
return zmq_error()
end
return val[0]
end
function sock_mt:bind(addr)
local ret = c_zmq.zmq_bind(self.sock, addr)
if ret ~= 0 then
return zmq_error()
end
return true
end
function sock_mt:connect(addr)
local ret = c_zmq.zmq_connect(self.sock, addr)
if ret ~= 0 then
return zmq_error()
end
return true
end
local tmp_msg = ffi.new('zmq_msg_t')
function sock_mt:send(data, flags)
local msg = tmp_msg
local msg_len = #data
-- initialize message
if c_zmq.zmq_msg_init_size(msg, msg_len) < 0 then
return zmq_error()
end
-- copy data into message.
ffi.copy(c_zmq.zmq_msg_data(msg), data, msg_len)
-- send message
local ret = c_zmq.zmq_send(self.sock, msg, flags or 0)
-- close message before processing return code
c_zmq.zmq_msg_close(msg)
-- now process send return code
if ret ~= 0 then
return zmq_error()
end
return true
end
function sock_mt:recv(flags)
local msg = tmp_msg
-- initialize blank message.
if c_zmq.zmq_msg_init(msg) < 0 then
return zmq_error()
end
-- receive message
local ret = c_zmq.zmq_recv(self.sock, msg, flags or 0)
if ret ~= 0 then
local data, err = zmq_error()
c_zmq.zmq_msg_close(msg)
return data, err
end
local data = ffi.string(c_zmq.zmq_msg_data(msg), c_zmq.zmq_msg_size(msg))
-- close message
c_zmq.zmq_msg_close(msg)
return data
end
--
-- ZMQ context
--
local ctx_mt = {}
ctx_mt.__index = ctx_mt
function ctx_mt:term()
if c_zmq.zmq_term(self.ctx) ~= 0 then
return zmq_error()
end
return true
end
function ctx_mt:socket(sock_type)
return new_socket(self.ctx, sock_type)
end
function init(io_threads)
local ctx = c_zmq.zmq_init(io_threads)
if not ctx then
return zmq_error()
end
return setmetatable({ ctx = ctx }, ctx_mt)
end
Loading…
Cancel
Save