diff --git a/perf/local_lat.lua b/perf/local_lat.lua index 09c0ee1..c10da41 100644 --- a/perf/local_lat.lua +++ b/perf/local_lat.lua @@ -38,12 +38,12 @@ local ctx = zmq.init(1) local s = ctx:socket(zmq.REP) s:bind(bind_to) -local msg +local msg = zmq.zmq_msg_t() for i = 1, roundtrip_count do - msg = s:recv() - assert(#msg == message_size, "Invalid message size") - s:send(msg) + assert(s:recv_msg(msg)) + assert(msg:size() == message_size, "Invalid message size") + assert(s:send_msg(msg)) end s:close() diff --git a/perf/local_thr.lua b/perf/local_thr.lua index 33debba..6f327b4 100644 --- a/perf/local_thr.lua +++ b/perf/local_thr.lua @@ -42,13 +42,15 @@ local s = ctx:socket(zmq.SUB) s:setopt(zmq.SUBSCRIBE, ""); s:bind(bind_to) -local msg = s:recv() +local msg +msg = zmq.zmq_msg_t() +assert(s:recv_msg(msg)) local start_time = time() for i = 1, message_count - 1 do - msg = s:recv() - assert(#msg == message_size, "Invalid message size") + assert(s:recv_msg(msg)) + assert(msg:size() == message_size, "Invalid message size") end local end_time = time() diff --git a/perf/remote_lat.lua b/perf/remote_lat.lua index 048f7be..7b78eb7 100644 --- a/perf/remote_lat.lua +++ b/perf/remote_lat.lua @@ -41,15 +41,15 @@ local ctx = zmq.init(1) local s = ctx:socket(zmq.REQ) s:connect(connect_to) -local msg = "" -for i = 1, message_size do msg = msg .. "0" end +local data = ("0"):rep(message_size) +local msg = zmq.zmq_msg_t.init_size(message_size) local start_time = time() for i = 1, roundtrip_count do - s:send(msg) - msg = s:recv() - assert(#msg == message_size, "Invalid message size") + assert(s:send_msg(msg)) + assert(s:recv_msg(msg)) + assert(msg:size() == message_size, "Invalid message size") end local end_time = time() diff --git a/perf/remote_thr.lua b/perf/remote_thr.lua index b3cb877..c3763f5 100644 --- a/perf/remote_thr.lua +++ b/perf/remote_thr.lua @@ -38,11 +38,12 @@ local ctx = zmq.init(1) local s = ctx:socket(zmq.PUB) s:connect(connect_to) -local msg = "" -for i = 1, message_size do msg = msg .. "0" end +local data = ("0"):rep(message_size) +local msg = zmq.zmq_msg_t.init_size(message_size) for i = 1, message_count do - s:send(msg) + msg:set_data(data) + assert(s:send_msg(msg)) end --os.execute("sleep " .. 10) diff --git a/src/ctx.nobj.lua b/src/ctx.nobj.lua index 3a4d316..947f08f 100644 --- a/src/ctx.nobj.lua +++ b/src/ctx.nobj.lua @@ -20,12 +20,16 @@ object "ZMQ_Ctx" { error_on_null = "get_zmq_strerror()", + ffi_cdef [[ +typedef void * ZMQ_Ctx; +]], + c_source [[ typedef void * ZMQ_Ctx; ]], destructor { c_source[[ - if(${this}_flags & OBJ_UDATA_CTX_SHOULD_FREE) { + if(${this_flags} & OBJ_UDATA_CTX_SHOULD_FREE) { zmq_term(${this}); } ]] diff --git a/src/error.nobj.lua b/src/error.nobj.lua index d56e226..4f143b5 100644 --- a/src/error.nobj.lua +++ b/src/error.nobj.lua @@ -19,13 +19,26 @@ -- THE SOFTWARE. -- Convert ZMQ Error codes into strings. +-- +-- This is an error code wrapper object, it converts C-style 'int' return error code +-- into Lua-style 'nil, "Error message"' return values. +-- error_code "ZMQ_Error" "int" { + ffi_cdef[[ +typedef int ZMQ_Error; +]], is_error_check = function(rec) return "(0 != ${" .. rec.name .. "})" end, + ffi_is_error_check = function(rec) return "(0 ~= ${" .. rec.name .. "})" end, default = "0", c_source [[ if(err != 0) { err_str = get_zmq_strerror(); } +]], + ffi_source [[ + if(0 ~= err) then + err_str = get_zmq_strerror(); + end ]], } diff --git a/src/msg.nobj.lua b/src/msg.nobj.lua new file mode 100644 index 0000000..55566b7 --- /dev/null +++ b/src/msg.nobj.lua @@ -0,0 +1,160 @@ +-- Copyright (c) 2010 by Robert G. Jakabosky +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +object "zmq_msg_t" { + -- store the `zmq_msg_t` structure in Lua userdata object + userdata_type = "embed", +-- +-- Define zmq_msq_t type & function API for FFI +-- + ffi_cdef[[ + +typedef struct zmq_msg_t +{ + void *content; + unsigned char flags; + unsigned char vsm_size; + unsigned char vsm_data [30]; /* that '30' is from 'MAX_VSM_SIZE' */ +} zmq_msg_t; + +typedef void (zmq_free_fn) (void *data, void *hint); + +int zmq_msg_init (zmq_msg_t *msg); +int zmq_msg_init_size (zmq_msg_t *msg, size_t size); +int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint); + +]], + constructor "init" { + var_out{ "ZMQ_Error", "err" }, + c_source[[ + zmq_msg_t tmp; + ${this} = &tmp; + ${err} = zmq_msg_init(${this}); +]], + }, + constructor "init_size" { + var_in{ "size_t", "size" }, + var_out{ "ZMQ_Error", "err" }, + c_source[[ + zmq_msg_t tmp; + ${this} = &tmp; + ${err} = zmq_msg_init_size(${this}, ${size}); +]], + }, + constructor "init_data" { + var_in{ "const char *", "data" }, + var_out{ "ZMQ_Error", "err" }, + c_source[[ + zmq_msg_t tmp; + ${this} = &tmp; + ${err} = zmq_msg_init_size(${this}, ${data_len}); + if(0 == ${err}) { + /* fill message */ + memcpy(zmq_msg_data(${this}), ${data}, ${data_len}); + } +]], + }, + destructor { + c_method_call "ZMQ_Error" "zmq_msg_close" {} + }, + method "close" { + c_method_call "ZMQ_Error" "zmq_msg_close" {} + }, + method "move" { + c_method_call "ZMQ_Error" "zmq_msg_move" { "zmq_msg_t *", "src" } + }, + method "copy" { + c_method_call "ZMQ_Error" "zmq_msg_copy" { "zmq_msg_t *", "src" } + }, + method "set_data" { + var_in{ "const char *", "data" }, + var_out{ "ZMQ_Error", "err" }, + c_source[[ + /* check message data size. */ + if(zmq_msg_size(${this}) != ${data_len}) { + /* need to resize message. */ + zmq_msg_close(${this}); /* close old message, to free old data. */ + ${err} = zmq_msg_init_size(${this}, ${data_len}); /* re-initialize message. */ + if(0 != ${err}) { + luaL_error(L, "set_data() failed: %s", get_zmq_strerror()); + } + } + /* copy data into message */ + memcpy(zmq_msg_data(${this}), ${data}, ${data_len}); +]], + ffi_source[[ + -- check message data size. + if (C.zmq_msg_size(${this}) ~= ${data_len}) then + -- need to resize message. + C.zmq_msg_close(${this}); -- close old message, to free old data. + ${err} = C.zmq_msg_init_size(${this}, ${data_len}); -- re-initialize message. + if (0 ~= ${err}) then + error("set_data() failed: " .. get_zmq_strerror()); + end + end + -- copy data into message + ffi.copy(C.zmq_msg_data(${this}), ${data}, ${data_len}); +]], + }, + method "data" { + c_method_call "void *" "zmq_msg_data" {} + }, + method "set_size" { + var_in{ "size_t", "size" }, + var_out{ "ZMQ_Error", "err" }, + c_source[[ + /* check message data size. */ + if(zmq_msg_size(${this}) != ${size}) { + /* need to resize message. */ + zmq_msg_close(${this}); /* close old message, to free old data. */ + ${err} = zmq_msg_init_size(${this}, ${size}); /* re-initialize message. */ + if(0 != ${err}) { + luaL_error(L, "set_size() failed: %s", get_zmq_strerror()); + } + } +]], + ffi_source[[ + -- check message data size. + if (C.zmq_msg_size(${this}) ~= ${size}) then + -- need to resize message. + C.zmq_msg_close(${this}); -- close old message, to free old data. + ${err} = C.zmq_msg_init_size(${this}, ${size}); -- re-initialize message. + if (0 ~= ${err}) then + error("set_size() failed: " .. get_zmq_strerror()); + end + end +]], + }, + method "size" { + c_method_call "size_t" "zmq_msg_size" {} + }, + method "__tostring" { + var_out{ "const char *", "data", has_length = true }, + c_source[[ + ${data} = zmq_msg_data(${this}); + ${data_len} = zmq_msg_size(${this}); +]], + ffi_source[[ + ${data} = zmq_msg_data(${this}); + ${data_len} = zmq_msg_size(${this}); +]], + }, +} + diff --git a/src/pre_generated-zmq.nobj.c b/src/pre_generated-zmq.nobj.c index 3facbf1..76b9c9d 100644 --- a/src/pre_generated-zmq.nobj.c +++ b/src/pre_generated-zmq.nobj.c @@ -22,14 +22,26 @@ -#include -#include #include #include #include +#ifdef _MSC_VER + +/* define some types that we need. */ +typedef __int32 int32_t; +typedef unsigned __int32 uint32_t; + +#define FUNC_UNUSED + +#else + +#include + #define FUNC_UNUSED __attribute__((unused)) +#endif + #if defined(__GNUC__) && (__GNUC__ >= 4) #define assert_obj_type(type, obj) \ assert(__builtin_types_compatible_p(typeof(obj), type *)) @@ -129,14 +141,21 @@ typedef struct obj_udata { /* use static pointer as key to weak userdata table. */ static char *obj_udata_weak_ref_key = "obj_udata_weak_ref_key"; +#if LUAJIT_FFI +typedef struct ffi_export_symbol { + const char *name; + void *sym; +} ffi_export_symbol; +#endif -#define obj_type_id_ZMQ_Ctx 0 -#define obj_type_ZMQ_Ctx_check(L, _index) \ - obj_udata_luacheck(L, _index, &(obj_type_ZMQ_Ctx)) -#define obj_type_ZMQ_Ctx_delete(L, _index, flags) \ - obj_udata_luadelete(L, _index, &(obj_type_ZMQ_Ctx), flags) -#define obj_type_ZMQ_Ctx_push(L, obj, flags) \ - obj_udata_luapush_weak(L, (void *)obj, &(obj_type_ZMQ_Ctx), flags) + +#define obj_type_id_zmq_msg_t 0 +#define obj_type_zmq_msg_t_check(L, _index) \ + (zmq_msg_t *)obj_simple_udata_luacheck(L, _index, &(obj_type_zmq_msg_t)) +#define obj_type_zmq_msg_t_delete(L, _index, flags) \ + (zmq_msg_t *)obj_simple_udata_luadelete(L, _index, &(obj_type_zmq_msg_t), flags) +#define obj_type_zmq_msg_t_push(L, obj, flags) \ + obj_simple_udata_luapush(L, obj, sizeof(zmq_msg_t), &(obj_type_zmq_msg_t)) #define obj_type_id_ZMQ_Socket 1 #define obj_type_ZMQ_Socket_check(L, _index) \ @@ -146,6 +165,14 @@ static char *obj_udata_weak_ref_key = "obj_udata_weak_ref_key"; #define obj_type_ZMQ_Socket_push(L, obj, flags) \ obj_udata_luapush_weak(L, (void *)obj, &(obj_type_ZMQ_Socket), flags) +#define obj_type_id_ZMQ_Ctx 2 +#define obj_type_ZMQ_Ctx_check(L, _index) \ + obj_udata_luacheck(L, _index, &(obj_type_ZMQ_Ctx)) +#define obj_type_ZMQ_Ctx_delete(L, _index, flags) \ + obj_udata_luadelete(L, _index, &(obj_type_ZMQ_Ctx), flags) +#define obj_type_ZMQ_Ctx_push(L, obj, flags) \ + obj_udata_luapush_weak(L, (void *)obj, &(obj_type_ZMQ_Ctx), flags) + typedef int ZMQ_Error; @@ -153,8 +180,9 @@ typedef int ZMQ_Error; static void error_code__ZMQ_Error__push(lua_State *L, ZMQ_Error err); -static obj_type obj_type_ZMQ_Ctx = { NULL, 0, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Ctx" }; +static obj_type obj_type_zmq_msg_t = { NULL, 0, OBJ_TYPE_SIMPLE, "zmq_msg_t" }; static obj_type obj_type_ZMQ_Socket = { NULL, 1, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Socket" }; +static obj_type obj_type_ZMQ_Ctx = { NULL, 2, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Ctx" }; #ifndef REG_PACKAGE_IS_CONSTRUCTOR @@ -258,13 +286,7 @@ static FUNC_UNUSED void *obj_udata_luacheck(lua_State *L, int _index, obj_type * static FUNC_UNUSED void *obj_udata_luadelete(lua_State *L, int _index, obj_type *type, int *flags) { void *obj; -#if OBJ_DATA_HIDDEN_METATABLE - obj_udata *ud = obj_udata_toobj(L, _index); - (void)type; - obj = ud->obj; -#else obj_udata *ud = obj_udata_luacheck_internal(L, _index, &(obj), type); -#endif *flags = ud->flags; /* null userdata. */ ud->obj = NULL; @@ -276,6 +298,7 @@ static FUNC_UNUSED void *obj_udata_luadelete(lua_State *L, int _index, obj_type } static FUNC_UNUSED void obj_udata_luapush(lua_State *L, void *obj, obj_type *type, int flags) { + obj_udata *ud; /* convert NULL's into Lua nil's. */ if(obj == NULL) { lua_pushnil(L); @@ -286,7 +309,7 @@ static FUNC_UNUSED void obj_udata_luapush(lua_State *L, void *obj, obj_type *typ (type->dcaster)(&obj, &type); } /* create new userdata. */ - obj_udata *ud = (obj_udata *)lua_newuserdata(L, sizeof(obj_udata)); + ud = (obj_udata *)lua_newuserdata(L, sizeof(obj_udata)); ud->obj = obj; ud->flags = flags; /* get obj_type metatable. */ @@ -398,13 +421,11 @@ static FUNC_UNUSED void * obj_simple_udata_luacheck(lua_State *L, int _index, ob static FUNC_UNUSED void * obj_simple_udata_luadelete(lua_State *L, int _index, obj_type *type, int *flags) { void *obj; -#if OBJ_DATA_HIDDEN_METATABLE - obj = obj_simple_udata_toobj(L, _index); - (void)type; -#else obj = obj_simple_udata_luacheck(L, _index, type); -#endif *flags = OBJ_UDATA_FLAG_OWN; + /* clear the metatable to invalidate userdata. */ + lua_pushnil(L); + lua_setmetatable(L, _index); return obj; } @@ -500,7 +521,7 @@ static void obj_type_register_package(lua_State *L, const reg_sub_module *type_r lua_pop(L, 1); /* drop package table */ } -static void obj_type_register(lua_State *L, const reg_sub_module *type_reg) { +static void obj_type_register(lua_State *L, const reg_sub_module *type_reg, int priv_table) { const luaL_reg *reg_list; obj_type *type = type_reg->type; const obj_base *base = type_reg->bases; @@ -543,6 +564,13 @@ static void obj_type_register(lua_State *L, const reg_sub_module *type_reg) { lua_pushvalue(L, -2); /* dup metatable. */ lua_rawset(L, LUA_REGISTRYINDEX); /* REGISTRY[type] = metatable */ +#if LUAJIT_FFI + /* add metatable to 'priv_table' */ + lua_pushstring(L, type->name); + lua_pushvalue(L, -2); /* dup metatable. */ + lua_rawset(L, priv_table); /* priv_table[""] = metatable */ +#endif + luaL_register(L, NULL, type_reg->metas); /* fill metatable */ /* add obj_bases to metatable. */ @@ -573,14 +601,45 @@ static FUNC_UNUSED int lua_checktype_ref(lua_State *L, int _index, int _type) { } #if LUAJIT_FFI -static int nobj_try_loading_ffi(lua_State *L, const char *ffi_init_code) { - int err; +static int nobj_udata_new_ffi(lua_State *L) { + size_t size = luaL_checkinteger(L, 1); + void *ud; + luaL_checktype(L, 2, LUA_TTABLE); + lua_settop(L, 2); + /* create userdata. */ + ud = lua_newuserdata(L, size); + lua_replace(L, 1); + /* set userdata's metatable. */ + lua_setmetatable(L, 1); + return 1; +} - err = luaL_loadstring(L, ffi_init_code); +static int nobj_try_loading_ffi(lua_State *L, const char *ffi_mod_name, + const char *ffi_init_code, const ffi_export_symbol *ffi_exports, int priv_table) +{ + int err; - lua_pushvalue(L, -2); /* dup C module's table. */ - err = lua_pcall(L, 1, 0, 0); + /* export symbols to priv_table. */ + while(ffi_exports->name != NULL) { + lua_pushstring(L, ffi_exports->name); + lua_pushlightuserdata(L, ffi_exports->sym); + lua_settable(L, priv_table); + ffi_exports++; + } + err = luaL_loadbuffer(L, ffi_init_code, strlen(ffi_init_code), ffi_mod_name); + if(0 == err) { + lua_pushvalue(L, -2); /* dup C module's table. */ + lua_pushvalue(L, priv_table); /* move priv_table to top of stack. */ + lua_remove(L, priv_table); + lua_pushcfunction(L, nobj_udata_new_ffi); + err = lua_pcall(L, 3, 0, 0); + } if(err) { + const char *msg = ""; + if(lua_isstring(L, -1)) { + msg = lua_tostring(L, -1); + } + printf("Failed to install FFI-based bindings: %s\n", msg); lua_pop(L, 1); /* pop error message. */ } return err; @@ -588,63 +647,185 @@ static int nobj_try_loading_ffi(lua_State *L, const char *ffi_init_code) { #endif -static const char *zmq_ffi_lua_code = "\ --- Copyright (c) 2010 by Robert G. Jakabosky \n\ ---\n\ --- Permission is hereby granted, free of charge, to any person obtaining a copy\n\ --- of this software and associated documentation files (the \"Software\"), to deal\n\ --- in the Software without restriction, including without limitation the rights\n\ --- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell\n\ --- copies of the Software, and to permit persons to whom the Software is\n\ --- furnished to do so, subject to the following conditions:\n\ ---\n\ --- The above copyright notice and this permission notice shall be included in\n\ --- all copies or substantial portions of the Software.\n\ ---\n\ --- THE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\n\ --- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\n\ --- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\n\ --- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\n\ --- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\n\ --- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\n\ --- THE SOFTWARE.\n\ -\n\ -local zmq = ...\n\ +static const char zmq_ffi_lua_code[] = "\ +local _M, _priv, udata_new = ...\n\ +\n\ +local band = bit.band\n\ +local d_getmetatable = debug.getmetatable\n\ +local d_setmetatable = debug.setmetatable\n\ \n\ -- try loading luajit's ffi\n\ local stat, ffi=pcall(require,\"ffi\")\n\ if not stat then\n\ - print(\"No FFI module: ZMQ using standard Lua api interface.\")\n\ + print(\"No FFI support: Using standard Lua api interface.\")\n\ return\n\ end\n\ -- check if ffi is disabled.\n\ if disable_ffi then\n\ - print(\"FFI disabled: ZMQ using standard Lua api interface.\")\n\ + print(\"FFI disabled: Using standard Lua api interface.\")\n\ return\n\ end\n\ \n\ -local setmetatable = setmetatable\n\ -local getmetatable = getmetatable\n\ -local print = print\n\ -local pairs = pairs\n\ -local error = error\n\ -local type = type\n\ -local assert = assert\n\ -local tostring = tostring\n\ -local tonumber = tonumber\n\ -local newproxy = newproxy\n\ +local OBJ_UDATA_FLAG_OWN = 1\n\ +local OBJ_UDATA_FLAG_LOOKUP = 2\n\ +local OBJ_UDATA_LAST_FLAG = OBJ_UDATA_FLAG_LOOKUP\n\ +\n\ +local OBJ_TYPE_FLAG_WEAK_REF = 1\n\ +local OBJ_TYPE_SIMPLE = 2\n\ +\n\ +ffi.cdef[[\n\ +\n\ +typedef struct obj_type obj_type;\n\ +\n\ +typedef void (*base_caster_t)(void **obj);\n\ +\n\ +typedef void (*dyn_caster_t)(void **obj, obj_type **type);\n\ +\n\ +struct obj_type {\n\ + dyn_caster_t dcaster; /**< caster to support casting to sub-objects. */\n\ + int32_t id; /**< type's id. */\n\ + uint32_t flags; /**< type's flags (weak refs) */\n\ + const char *name; /**< type's object name. */\n\ +};\n\ +\n\ +typedef struct obj_base {\n\ + int32_t id;\n\ + base_caster_t bcaster;\n\ +} obj_base;\n\ +\n\ +typedef struct obj_udata {\n\ + void *obj;\n\ + uint32_t flags; /**< lua_own:1bit */\n\ +} obj_udata;\n\ +\n\ +]]\n\ +\n\ +local obj_type_ptr = ffi.typeof\"obj_type *\"\n\ +local obj_udata_ptr = ffi.typeof\"obj_udata *\"\n\ +local obj_simple_udata_ptr = ffi.typeof\"void *\"\n\ +local obj_udata_size = ffi.sizeof\"obj_udata\"\n\ +\n\ +-- cache mapping of cdata to userdata\n\ +local weak_objects = setmetatable({}, { __mode = \"v\" })\n\ +\n\ +local function obj_udata_luacheck_internal(obj, type_mt)\n\ + local obj_mt = d_getmetatable(obj)\n\ + if obj_mt == type_mt then\n\ + -- convert userdata to cdata.\n\ + return obj_udata_ptr(obj)\n\ + end\n\ + error(\"(expected `\" .. type_mt['.name'] .. \"`, got \" .. type(obj) .. \")\", 3)\n\ +end\n\ +\n\ +local function obj_udata_luacheck(obj, type_mt)\n\ + local ud = obj_udata_luacheck_internal(obj, type_mt)\n\ + return ud.obj\n\ +end\n\ +\n\ +local function obj_udata_luadelete(ud_obj, type_mt)\n\ + local ud = obj_udata_luacheck_internal(ud_obj, type_mt)\n\ + local obj, flags = ud.obj, ud.flags\n\ + -- null userdata.\n\ + ud.obj = nil\n\ + ud.flags = 0\n\ + -- invalid userdata, by setting the metatable to nil.\n\ + d_setmetatable(ud_obj, nil)\n\ + return obj, flags\n\ +end\n\ +\n\ +local function obj_udata_luapush(obj, type_mt, obj_type, flags)\n\ + if obj == nil then return end\n\ +\n\ + -- apply type's dynamic caster.\n\ + if obj_type.dcaster ~= nil then\n\ + local obj_ptr = ffi.new(\"void *[1]\", obj)\n\ + local type_ptr = ffi.new(\"obj_type *[1]\", obj_type)\n\ + obj_type.dcaster(obj_ptr, type_ptr)\n\ + obj = obj_ptr[1]\n\ + type = type_ptr[1]\n\ + end\n\ +\n\ + -- create new userdata\n\ + ud_obj = udata_new(obj_udata_size, type_mt)\n\ + local ud = obj_udata_ptr(ud_obj)\n\ + -- init. object\n\ + ud.obj = obj\n\ + ud.flags = flags\n\ +\n\ + return ud_obj\n\ +end\n\ +\n\ +local function obj_udata_luapush_weak(obj, type_mt, obj_type, flags)\n\ + if obj == nil then return end\n\ +\n\ + -- apply type's dynamic caster.\n\ + if obj_type.dcaster ~= nil then\n\ + local obj_ptr = ffi.new(\"void *[1]\", obj)\n\ + local type_ptr = ffi.new(\"obj_type *[1]\", obj_type)\n\ + obj_type.dcaster(obj_ptr, type_ptr)\n\ + obj = obj_ptr[1]\n\ + type = type_ptr[1]\n\ + end\n\ +\n\ + -- lookup object in weak ref. table.\n\ + local obj_key = tonumber(ffi.cast('uintptr_t', obj))\n\ + local ud_obj = weak_objects[obj_key]\n\ + if ud_obj ~= nil then return ud_obj end\n\ +\n\ + -- create new userdata\n\ + ud_obj = udata_new(obj_udata_size, type_mt)\n\ + local ud = obj_udata_ptr(ud_obj)\n\ + -- init. object\n\ + ud.obj = obj\n\ + ud.flags = flags\n\ +\n\ + -- cache weak reference to object.\n\ + weak_objects[obj_key] = ud_obj\n\ +\n\ + return ud_obj\n\ +end\n\ +\n\ +require\"utils\"\n\ +local function obj_simple_udata_luacheck(ud_obj, type_mt)\n\ + local obj_mt = d_getmetatable(ud_obj)\n\ + if obj_mt == type_mt then\n\ + -- convert userdata to cdata.\n\ + return obj_simple_udata_ptr(ud_obj)\n\ + end\n\ + error(\"(expected `\" .. type_mt['.name'] .. \"`, got \" .. type(ud_obj) .. \")\", 3)\n\ +end\n\ +\n\ +local function obj_simple_udata_luadelete(ud_obj, type_mt)\n\ + local c_obj = obj_simple_udata_luacheck(ud_obj, type_mt)\n\ + -- invalid userdata, by setting the metatable to nil.\n\ + d_setmetatable(ud_obj, nil)\n\ + return c_obj, OBJ_UDATA_FLAG_OWN\n\ +end\n\ +\n\ +local function obj_simple_udata_luapush(c_obj, size, type_mt)\n\ + if c_obj == nil then return end\n\ +\n\ + -- create new userdata\n\ + ud_obj = udata_new(size, type_mt)\n\ + local data = obj_simple_udata_ptr(ud_obj)\n\ + -- init. object\n\ + ffi.copy(data, c_obj, size)\n\ +\n\ + return ud_obj\n\ +end\n\ \n\ ffi.cdef[[\n\ -void zmq_version (int *major, int *minor, int *patch);\n\ -int zmq_errno ();\n\ -const char *zmq_strerror (int errnum);\n\ +typedef const char * (*get_zmq_strerror_func)();\n\ +\n\ +typedef int ZMQ_Error;\n\ +\n\ \n\ typedef struct zmq_msg_t\n\ {\n\ void *content;\n\ unsigned char flags;\n\ unsigned char vsm_size;\n\ - unsigned char vsm_data [30];\n\ + unsigned char vsm_data [30]; /* that '30' is from 'MAX_VSM_SIZE' */\n\ } zmq_msg_t;\n\ \n\ typedef void (zmq_free_fn) (void *data, void *hint);\n\ @@ -652,295 +833,556 @@ typedef void (zmq_free_fn) (void *data, void *hint);\n\ int zmq_msg_init (zmq_msg_t *msg);\n\ int zmq_msg_init_size (zmq_msg_t *msg, size_t size);\n\ int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint);\n\ -int zmq_msg_close (zmq_msg_t *msg);\n\ -int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);\n\ -int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src);\n\ -void *zmq_msg_data (zmq_msg_t *msg);\n\ -size_t zmq_msg_size (zmq_msg_t *msg);\n\ \n\ -void *zmq_init (int io_threads);\n\ -int zmq_term (void *context);\n\ \n\ -void *zmq_socket (void *context, int type);\n\ -int zmq_close (void *s);\n\ +ZMQ_Error zmq_msg_close(zmq_msg_t * this);\n\ +\n\ +ZMQ_Error zmq_msg_close(zmq_msg_t * this);\n\ +\n\ +ZMQ_Error zmq_msg_move(zmq_msg_t * this, zmq_msg_t * src);\n\ +\n\ +ZMQ_Error zmq_msg_copy(zmq_msg_t * this, zmq_msg_t * src);\n\ +\n\ +void * zmq_msg_data(zmq_msg_t * this);\n\ +\n\ +size_t zmq_msg_size(zmq_msg_t * this);\n\ +\n\ +typedef void * ZMQ_Socket;\n\ +\n\ +ZMQ_Error zmq_close(ZMQ_Socket * this);\n\ +\n\ +ZMQ_Error zmq_bind(ZMQ_Socket * this, const char * addr);\n\ +\n\ +ZMQ_Error zmq_connect(ZMQ_Socket * this, const char * addr);\n\ +\n\ int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);\n\ int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);\n\ -int zmq_bind (void *s, const char *addr);\n\ -int zmq_connect (void *s, const char *addr);\n\ -int zmq_send (void *s, zmq_msg_t *msg, int flags);\n\ -int zmq_recv (void *s, zmq_msg_t *msg, int flags);\n\ \n\ -int zmq_device (int device, void * insocket, void* outsocket);\n\ +ZMQ_Error zmq_send(ZMQ_Socket * this, zmq_msg_t * msg, int flags);\n\ +\n\ +typedef ZMQ_Error (*simple_zmq_send_func)(ZMQ_Socket sock, const char *data, size_t data_len, int flags);\n\ +\n\ +ZMQ_Error zmq_recv(ZMQ_Socket * this, zmq_msg_t * msg, int flags);\n\ +\n\ +typedef void * ZMQ_Ctx;\n\ +\n\ +ZMQ_Error zmq_term(ZMQ_Ctx * this);\n\ +\n\ +ZMQ_Socket zmq_socket(ZMQ_Ctx * this, int type);\n\ +\n\ +ZMQ_Ctx zmq_init(int io_threads);\n\ +\n\ +ZMQ_Error zmq_device(int device, ZMQ_Socket insock, ZMQ_Socket outsock);\n\ +\n\ \n\ ]]\n\ \n\ -local C = ffi.load\"zmq\"\n\ +local zmq_msg_t_mt = _priv[\"zmq_msg_t\"]\n\ +local zmq_msg_t_type = obj_type_ptr(zmq_msg_t_mt[\".type\"])\n\ +local zmq_msg_t_meth = zmq_msg_t_mt.__index\n\ +local zmq_msg_t_objects = setmetatable({}, { __mode = \"k\" })\n\ \n\ --- simulate: module(...)\n\ -zmq._M = zmq\n\ -setfenv(1, zmq)\n\ +local function obj_type_zmq_msg_t_check(ud_obj)\n\ + local c_obj = zmq_msg_t_objects[ud_obj]\n\ + if c_obj == nil then\n\ + -- cdata object not in cache\n\ + c_obj = obj_simple_udata_luacheck(ud_obj, zmq_msg_t_mt)\n\ + zmq_msg_t_objects[ud_obj] = c_obj\n\ + end\n\ + return c_obj\n\ +end\n\ \n\ -function version()\n\ - local major = ffi.new('int[1]',0)\n\ - local minor = ffi.new('int[1]',0)\n\ - local patch = ffi.new('int[1]',0)\n\ - C.zmq_version(major, minor, patch)\n\ - return {major[0], minor[0], patch[0]}\n\ +local function obj_type_zmq_msg_t_delete(ud_obj)\n\ + zmq_msg_t_objects[ud_obj] = nil\n\ + return obj_simple_udata_luadelete(ud_obj, zmq_msg_t_mt)\n\ end\n\ \n\ -local function zmq_error()\n\ - local errno = C.zmq_errno()\n\ - local err = ffi.string(C.zmq_strerror(errno))\n\ - if err == \"Resource temporarily unavailable\" then err = \"timeout\" end\n\ - if err == \"Context was terminated\" then err = \"closed\" end\n\ - return nil, err\n\ +local zmq_msg_t_sizeof = ffi.sizeof\"zmq_msg_t\"\n\ +local function obj_type_zmq_msg_t_push(c_obj)\n\ + local ud_obj = obj_simple_udata_luapush(c_obj, zmq_msg_t_sizeof, zmq_msg_t_mt)\n\ + zmq_msg_t_objects[ud_obj] = c_obj\n\ + return ud_obj\n\ end\n\ \n\ ---\n\ --- ZMQ socket\n\ ---\n\ -local sock_mt = {}\n\ -sock_mt.__index = sock_mt\n\ -\n\ -function sock_mt:close()\n\ - -- get the true self\n\ - self=getmetatable(self)\n\ - local sock = self.sock\n\ - -- make sure socket is still valid.\n\ - if not sock then return end\n\ - -- mark this socket as closed.\n\ - self.sock = nil\n\ - -- close zmq socket.\n\ - if C.zmq_close(sock) ~= 0 then\n\ - return zmq_error()\n\ +\n\ +local ZMQ_Socket_mt = _priv[\"ZMQ_Socket\"]\n\ +local ZMQ_Socket_type = obj_type_ptr(ZMQ_Socket_mt[\".type\"])\n\ +local ZMQ_Socket_meth = ZMQ_Socket_mt.__index\n\ +local ZMQ_Socket_objects = setmetatable({}, { __mode = \"k\" })\n\ +\n\ +local function obj_type_ZMQ_Socket_check(ud_obj)\n\ + local c_obj = ZMQ_Socket_objects[ud_obj]\n\ + if c_obj == nil then\n\ + -- cdata object not in cache\n\ + c_obj = obj_udata_luacheck(ud_obj, ZMQ_Socket_mt)\n\ + ZMQ_Socket_objects[ud_obj] = c_obj\n\ end\n\ - return true\n\ + return c_obj\n\ end\n\ \n\ -local function sock__gc(self)\n\ - self:close()\n\ +local function obj_type_ZMQ_Socket_delete(ud_obj)\n\ + ZMQ_Socket_objects[ud_obj] = nil\n\ + return obj_udata_luadelete(ud_obj, ZMQ_Socket_mt)\n\ end\n\ \n\ -local option_types = {\n\ -[zmq.HWM] = 'uint64_t[1]',\n\ -[zmq.SWAP] = 'int64_t[1]',\n\ -[zmq.AFFINITY] = 'uint64_t[1]',\n\ -[zmq.IDENTITY] = 'string',\n\ -[zmq.SUBSCRIBE] = 'string',\n\ -[zmq.UNSUBSCRIBE] = 'string',\n\ -[zmq.RATE] = 'int64_t[1]',\n\ -[zmq.RECOVERY_IVL] = 'int64_t[1]',\n\ -[zmq.MCAST_LOOP] = 'int64_t[1]',\n\ -[zmq.SNDBUF] = 'uint64_t[1]',\n\ -[zmq.RCVBUF] = 'uint64_t[1]',\n\ -[zmq.RCVMORE] = 'int64_t[1]',\n\ -[zmq.FD] = 'int[1]',\n\ -[zmq.EVENTS] = 'uint32_t[1]',\n\ -[zmq.TYPE] = 'int[1]',\n\ -[zmq.LINGER] = 'int[1]',\n\ -[zmq.RECONNECT_IVL] = 'int[1]',\n\ -[zmq.BACKLOG] = 'int[1]',\n\ -}\n\ -local option_len = {}\n\ -local option_tmps = {}\n\ -for k,v in pairs(option_types) do\n\ - if v ~= 'string' then\n\ - option_len[k] = ffi.sizeof(v)\n\ - option_tmps[k] = ffi.new(v, 0)\n\ - end\n\ +local function obj_type_ZMQ_Socket_push(c_obj, flags)\n\ + local ud_obj = obj_udata_luapush_weak(c_obj, ZMQ_Socket_mt, ZMQ_Socket_type, flags)\n\ + ZMQ_Socket_objects[ud_obj] = c_obj\n\ + return ud_obj\n\ end\n\ -function sock_mt:setopt(opt, opt_val)\n\ - local ctype = option_types[opt]\n\ - local val_len = 0\n\ - if ctype == 'string' then\n\ - --val = ffi.cast('void *', tostring(val))\n\ - val = tostring(opt_val)\n\ - val_len = #val\n\ - else\n\ - val = option_tmps[opt]\n\ - val[0] = opt_val\n\ - val_len = option_len[opt]\n\ - end\n\ - local ret = C.zmq_setsockopt(self.sock, opt, val, val_len)\n\ - if ret ~= 0 then\n\ - return zmq_error()\n\ +\n\ +\n\ +local ZMQ_Ctx_mt = _priv[\"ZMQ_Ctx\"]\n\ +local ZMQ_Ctx_type = obj_type_ptr(ZMQ_Ctx_mt[\".type\"])\n\ +local ZMQ_Ctx_meth = ZMQ_Ctx_mt.__index\n\ +local ZMQ_Ctx_objects = setmetatable({}, { __mode = \"k\" })\n\ +\n\ +local function obj_type_ZMQ_Ctx_check(ud_obj)\n\ + local c_obj = ZMQ_Ctx_objects[ud_obj]\n\ + if c_obj == nil then\n\ + -- cdata object not in cache\n\ + c_obj = obj_udata_luacheck(ud_obj, ZMQ_Ctx_mt)\n\ + ZMQ_Ctx_objects[ud_obj] = c_obj\n\ end\n\ - return true\n\ + return c_obj\n\ end\n\ \n\ -local tmp_val_len = ffi.new('size_t[1]', 4)\n\ -function sock_mt:getopt(opt)\n\ - local ctype = option_types[opt]\n\ - local val\n\ - local val_len = tmp_val_len\n\ - if ctype == 'string' then\n\ - val_len[0] = 255\n\ - val = ffi.new('uint8_t[?]', val_len[0])\n\ - ffi.fill(val, val_len[0])\n\ - else\n\ - val = option_tmps[opt]\n\ - val[0] = 0\n\ - val_len[0] = option_len[opt]\n\ - end\n\ - local ret = C.zmq_getsockopt(self.sock, opt, val, val_len)\n\ - if ret ~= 0 then\n\ - return zmq_error()\n\ - end\n\ - if ctype == 'string' then\n\ - val_len = val_len[0]\n\ - return ffi.string(val, val_len)\n\ - else\n\ - val = val[0]\n\ - end\n\ - return tonumber(val)\n\ +local function obj_type_ZMQ_Ctx_delete(ud_obj)\n\ + ZMQ_Ctx_objects[ud_obj] = nil\n\ + return obj_udata_luadelete(ud_obj, ZMQ_Ctx_mt)\n\ end\n\ \n\ -local tmp32 = ffi.new('uint32_t[1]', 0)\n\ -local tmp32_len = ffi.new('size_t[1]', 4)\n\ -function sock_mt:events()\n\ - local val = tmp32\n\ - local val_len = tmp32_len\n\ - val[0] = 0\n\ - val_len[0] = 4\n\ - local ret = C.zmq_getsockopt(self.sock, 15, val, val_len)\n\ - if ret ~= 0 then\n\ - return zmq_error()\n\ - end\n\ - return val[0]\n\ +local function obj_type_ZMQ_Ctx_push(c_obj, flags)\n\ + local ud_obj = obj_udata_luapush_weak(c_obj, ZMQ_Ctx_mt, ZMQ_Ctx_type, flags)\n\ + ZMQ_Ctx_objects[ud_obj] = c_obj\n\ + return ud_obj\n\ end\n\ \n\ -function sock_mt:bind(addr)\n\ - local ret = C.zmq_bind(self.sock, addr)\n\ - if ret ~= 0 then\n\ - return zmq_error()\n\ - end\n\ - return true\n\ +\n\ +local zmq_mt = _M\n\ +local zmq_meth = _M\n\ +local zmq_func = _M\n\ +\n\ +\n\ +local C = ffi.load(\"zmq\",false)\n\ +\n\ +local OBJ_UDATA_CTX_SHOULD_FREE = (OBJ_UDATA_LAST_FLAG * 2)\n\ +\n\ +local get_zmq_strerror = ffi.new(\"get_zmq_strerror_func\", _priv[\"get_zmq_strerror\"])\n\ +\n\ +local C_get_zmq_strerror = get_zmq_strerror\n\ +-- make nicer wrapper for exported error function.\n\ +local function get_zmq_strerror()\n\ + return ffi.string(C_get_zmq_strerror())\n\ end\n\ \n\ -function sock_mt:connect(addr)\n\ - local ret = C.zmq_connect(self.sock, addr)\n\ - if ret ~= 0 then\n\ - return zmq_error()\n\ +local function error_code__ZMQ_Error__push(err)\n\ + local err_str\n\ + if(0 ~= err) then\n\ + err_str = get_zmq_strerror();\n\ end\n\ - return true\n\ +\n\ + return err_str\n\ end\n\ \n\ -local tmp_msg = ffi.new('zmq_msg_t')\n\ -function sock_mt:send(data, flags)\n\ - local msg = tmp_msg\n\ - local msg_len = #data\n\ - -- initialize message\n\ - if C.zmq_msg_init_size(msg, msg_len) < 0 then\n\ - return zmq_error()\n\ +\n\ +-- Start \"zmq_msg_t\" FFI interface\n\ +-- method: delete\n\ +function zmq_msg_t_meth.delete(self)\n\ + local this,this_flags = obj_type_zmq_msg_t_delete(self)\n\ + if(band(this_flags,OBJ_UDATA_FLAG_OWN) == 0) then return end\n\ + local rc_zmq_msg_close\n\ + rc_zmq_msg_close = C.zmq_msg_close(this)\n\ + -- check for error.\n\ + local rc_zmq_msg_close_err\n\ + if (0 ~= rc_zmq_msg_close) then\n\ + rc_zmq_msg_close = false\n\ + rc_zmq_msg_close_err = error_code__ZMQ_Error__push(rc_zmq_msg_close)\n\ + else\n\ + rc_zmq_msg_close = true\n\ + end\n\ + return rc_zmq_msg_close, rc_zmq_msg_close_err\n\ +end\n\ +\n\ +-- method: close\n\ +function zmq_msg_t_meth.close(self)\n\ + local this = obj_type_zmq_msg_t_check(self)\n\ + local rc_zmq_msg_close\n\ + rc_zmq_msg_close = C.zmq_msg_close(this)\n\ + -- check for error.\n\ + local rc_zmq_msg_close_err\n\ + if (0 ~= rc_zmq_msg_close) then\n\ + rc_zmq_msg_close = false\n\ + rc_zmq_msg_close_err = error_code__ZMQ_Error__push(rc_zmq_msg_close)\n\ + else\n\ + rc_zmq_msg_close = true\n\ + end\n\ + return rc_zmq_msg_close, rc_zmq_msg_close_err\n\ +end\n\ +\n\ +-- method: move\n\ +function zmq_msg_t_meth.move(self, src)\n\ + local this = obj_type_zmq_msg_t_check(self)\n\ + src = obj_type_zmq_msg_t_check(src)\n\ + local rc_zmq_msg_move\n\ + rc_zmq_msg_move = C.zmq_msg_move(this, src)\n\ + -- check for error.\n\ + local rc_zmq_msg_move_err\n\ + if (0 ~= rc_zmq_msg_move) then\n\ + rc_zmq_msg_move = false\n\ + rc_zmq_msg_move_err = error_code__ZMQ_Error__push(rc_zmq_msg_move)\n\ + else\n\ + rc_zmq_msg_move = true\n\ + end\n\ + return rc_zmq_msg_move, rc_zmq_msg_move_err\n\ +end\n\ +\n\ +-- method: copy\n\ +function zmq_msg_t_meth.copy(self, src)\n\ + local this = obj_type_zmq_msg_t_check(self)\n\ + src = obj_type_zmq_msg_t_check(src)\n\ + local rc_zmq_msg_copy\n\ + rc_zmq_msg_copy = C.zmq_msg_copy(this, src)\n\ + -- check for error.\n\ + local rc_zmq_msg_copy_err\n\ + if (0 ~= rc_zmq_msg_copy) then\n\ + rc_zmq_msg_copy = false\n\ + rc_zmq_msg_copy_err = error_code__ZMQ_Error__push(rc_zmq_msg_copy)\n\ + else\n\ + rc_zmq_msg_copy = true\n\ + end\n\ + return rc_zmq_msg_copy, rc_zmq_msg_copy_err\n\ +end\n\ +\n\ +-- method: set_data\n\ +function zmq_msg_t_meth.set_data(self, data)\n\ + local this = obj_type_zmq_msg_t_check(self)\n\ + local data_len = #data\n\ + local err\n\ + -- check message data size.\n\ + if (C.zmq_msg_size(this) ~= data_len) then\n\ + -- need to resize message.\n\ + C.zmq_msg_close(this); -- close old message, to free old data.\n\ + err = C.zmq_msg_init_size(this, data_len); -- re-initialize message.\n\ + if (0 ~= err) then\n\ + error(\"set_data() failed: \" .. get_zmq_strerror());\n\ + end\n\ end\n\ - -- copy data into message.\n\ - ffi.copy(C.zmq_msg_data(msg), data, msg_len)\n\ + -- copy data into message\n\ + ffi.copy(C.zmq_msg_data(this), data, data_len);\n\ \n\ - -- send message\n\ - local ret = C.zmq_send(self.sock, msg, flags or 0)\n\ - -- close message before processing return code\n\ - C.zmq_msg_close(msg)\n\ - -- now process send return code\n\ - if ret ~= 0 then\n\ - return zmq_error()\n\ + -- check for error.\n\ + local err_err\n\ + if (0 ~= err) then\n\ + err = false\n\ + err_err = error_code__ZMQ_Error__push(err)\n\ + else\n\ + err = true\n\ + end\n\ + return err, err_err\n\ +end\n\ +\n\ +-- method: data\n\ +function zmq_msg_t_meth.data(self)\n\ + local this = obj_type_zmq_msg_t_check(self)\n\ + local rc_zmq_msg_data\n\ + rc_zmq_msg_data = C.zmq_msg_data(this)\n\ + rc_zmq_msg_data = rc_zmq_msg_data\n\ + return rc_zmq_msg_data\n\ +end\n\ +\n\ +-- method: set_size\n\ +function zmq_msg_t_meth.set_size(self, size)\n\ + local this = obj_type_zmq_msg_t_check(self)\n\ + \n\ + local err\n\ + -- check message data size.\n\ + if (C.zmq_msg_size(this) ~= size) then\n\ + -- need to resize message.\n\ + C.zmq_msg_close(this); -- close old message, to free old data.\n\ + err = C.zmq_msg_init_size(this, size); -- re-initialize message.\n\ + if (0 ~= err) then\n\ + error(\"set_size() failed: \" .. get_zmq_strerror());\n\ + end\n\ end\n\ - return true\n\ +\n\ + -- check for error.\n\ + local err_err\n\ + if (0 ~= err) then\n\ + err = false\n\ + err_err = error_code__ZMQ_Error__push(err)\n\ + else\n\ + err = true\n\ + end\n\ + return err, err_err\n\ +end\n\ +\n\ +-- method: size\n\ +function zmq_msg_t_meth.size(self)\n\ + local this = obj_type_zmq_msg_t_check(self)\n\ + local rc_zmq_msg_size\n\ + rc_zmq_msg_size = C.zmq_msg_size(this)\n\ + rc_zmq_msg_size = rc_zmq_msg_size\n\ + return rc_zmq_msg_size\n\ +end\n\ +\n\ +-- method: __tostring\n\ +function zmq_msg_t_mt.__tostring(self)\n\ + local this = obj_type_zmq_msg_t_check(self)\n\ + local data_len = 0\n\ + local data\n\ + data = zmq_msg_data(this);\n\ + data_len = zmq_msg_size(this);\n\ +\n\ + data = ((nil ~= data) and ffi.string(data,data_len))\n\ + return data\n\ +end\n\ +\n\ +-- End \"zmq_msg_t\" FFI interface\n\ +\n\ +\n\ +-- Start \"ZMQ_Socket\" FFI interface\n\ +-- method: close\n\ +function ZMQ_Socket_meth.close(self)\n\ + local this,this_flags = obj_type_ZMQ_Socket_delete(self)\n\ + if(band(this_flags,OBJ_UDATA_FLAG_OWN) == 0) then return end\n\ + local rc_zmq_close\n\ + rc_zmq_close = C.zmq_close(this)\n\ + -- check for error.\n\ + local rc_zmq_close_err\n\ + if (0 ~= rc_zmq_close) then\n\ + rc_zmq_close = false\n\ + rc_zmq_close_err = error_code__ZMQ_Error__push(rc_zmq_close)\n\ + else\n\ + rc_zmq_close = true\n\ + end\n\ + return rc_zmq_close, rc_zmq_close_err\n\ +end\n\ +\n\ +-- method: bind\n\ +function ZMQ_Socket_meth.bind(self, addr)\n\ + local this = obj_type_ZMQ_Socket_check(self)\n\ + local addr_len = #addr\n\ + local rc_zmq_bind\n\ + rc_zmq_bind = C.zmq_bind(this, addr)\n\ + -- check for error.\n\ + local rc_zmq_bind_err\n\ + if (0 ~= rc_zmq_bind) then\n\ + rc_zmq_bind = false\n\ + rc_zmq_bind_err = error_code__ZMQ_Error__push(rc_zmq_bind)\n\ + else\n\ + rc_zmq_bind = true\n\ + end\n\ + return rc_zmq_bind, rc_zmq_bind_err\n\ +end\n\ +\n\ +-- method: connect\n\ +function ZMQ_Socket_meth.connect(self, addr)\n\ + local this = obj_type_ZMQ_Socket_check(self)\n\ + local addr_len = #addr\n\ + local rc_zmq_connect\n\ + rc_zmq_connect = C.zmq_connect(this, addr)\n\ + -- check for error.\n\ + local rc_zmq_connect_err\n\ + if (0 ~= rc_zmq_connect) then\n\ + rc_zmq_connect = false\n\ + rc_zmq_connect_err = error_code__ZMQ_Error__push(rc_zmq_connect)\n\ + else\n\ + rc_zmq_connect = true\n\ + end\n\ + return rc_zmq_connect, rc_zmq_connect_err\n\ +end\n\ +\n\ +-- temp. values for 'events' function.\n\ +local events_tmp = ffi.new('uint32_t[1]', 0)\n\ +local events_tmp_size = ffi.sizeof('uint32_t')\n\ +local events_tmp_len = ffi.new('size_t[1]', events_tmp_size)\n\ +local ZMQ_EVENTS = _M.EVENTS\n\ +\n\ +-- method: events\n\ +function ZMQ_Socket_meth.events(self)\n\ + local this = obj_type_ZMQ_Socket_check(self)\n\ + local events\n\ + local err\n\ + events_tmp_len[0] = events_tmp_size\n\ + err = C.zmq_getsockopt(this, ZMQ_EVENTS, events_tmp, events_tmp_len);\n\ + events = events_tmp[0]\n\ +\n\ + if not (0 ~= err) then\n\ + events = events\n\ + else\n\ + events = nil\n\ + end\n\ + err = error_code__ZMQ_Error__push(err)\n\ + return events, err\n\ +end\n\ +\n\ +-- method: send_msg\n\ +function ZMQ_Socket_meth.send_msg(self, msg, flags)\n\ + local this = obj_type_ZMQ_Socket_check(self)\n\ + msg = obj_type_zmq_msg_t_check(msg)\n\ + flags = flags or 0\n\ + local rc_zmq_send\n\ + rc_zmq_send = C.zmq_send(this, msg, flags)\n\ + -- check for error.\n\ + local rc_zmq_send_err\n\ + if (0 ~= rc_zmq_send) then\n\ + rc_zmq_send = false\n\ + rc_zmq_send_err = error_code__ZMQ_Error__push(rc_zmq_send)\n\ + else\n\ + rc_zmq_send = true\n\ + end\n\ + return rc_zmq_send, rc_zmq_send_err\n\ end\n\ \n\ -function sock_mt:recv(flags)\n\ +local simple_zmq_send = ffi.new(\"simple_zmq_send_func\", _priv[\"simple_zmq_send\"])\n\ +\n\ +-- method: send\n\ +function ZMQ_Socket_meth.send(self, data, flags)\n\ + local this = obj_type_ZMQ_Socket_check(self)\n\ + local data_len = #data\n\ + flags = flags or 0\n\ + local err\n\ + err = simple_zmq_send(this, data, data_len, flags);\n\ +\n\ + -- check for error.\n\ + local err_err\n\ + if (0 ~= err) then\n\ + err = false\n\ + err_err = error_code__ZMQ_Error__push(err)\n\ + else\n\ + err = true\n\ + end\n\ + return err, err_err\n\ +end\n\ +\n\ +-- method: recv_msg\n\ +function ZMQ_Socket_meth.recv_msg(self, msg, flags)\n\ + local this = obj_type_ZMQ_Socket_check(self)\n\ + msg = obj_type_zmq_msg_t_check(msg)\n\ + flags = flags or 0\n\ + local rc_zmq_recv\n\ + rc_zmq_recv = C.zmq_recv(this, msg, flags)\n\ + -- check for error.\n\ + local rc_zmq_recv_err\n\ + if (0 ~= rc_zmq_recv) then\n\ + rc_zmq_recv = false\n\ + rc_zmq_recv_err = error_code__ZMQ_Error__push(rc_zmq_recv)\n\ + else\n\ + rc_zmq_recv = true\n\ + end\n\ + return rc_zmq_recv, rc_zmq_recv_err\n\ +end\n\ +\n\ +local tmp_msg = ffi.new('zmq_msg_t')\n\ +\n\ +-- method: recv\n\ +function ZMQ_Socket_meth.recv(self, flags)\n\ + local this = obj_type_ZMQ_Socket_check(self)\n\ + flags = flags or 0\n\ + local data_len = 0\n\ + local data\n\ + local err\n\ local msg = tmp_msg\n\ -- initialize blank message.\n\ if C.zmq_msg_init(msg) < 0 then\n\ - return zmq_error()\n\ + return nil, get_zmq_strerror()\n\ end\n\ \n\ -- receive message\n\ - local ret = C.zmq_recv(self.sock, msg, flags or 0)\n\ - if ret ~= 0 then\n\ - local data, err = zmq_error()\n\ + err = C.zmq_recv(this, msg, flags)\n\ + if 0 == err then\n\ + local data = ffi.string(C.zmq_msg_data(msg), C.zmq_msg_size(msg))\n\ + -- close message\n\ C.zmq_msg_close(msg)\n\ - return data, err\n\ + return data\n\ end\n\ - local data = ffi.string(C.zmq_msg_data(msg), C.zmq_msg_size(msg))\n\ +\n\ + if not (0 ~= err) then\n\ + data = ((nil ~= data) and ffi.string(data,data_len))\n\ + else\n\ + data = nil\n\ + end\n\ + err = error_code__ZMQ_Error__push(err)\n\ -- close message\n\ C.zmq_msg_close(msg)\n\ - return data\n\ -end\n\ \n\ ---\n\ --- ZMQ context\n\ ---\n\ -local ctx_mt = {}\n\ -ctx_mt.__index = ctx_mt\n\ -\n\ -function ctx_mt:term()\n\ - -- get the true self\n\ - self=getmetatable(self)\n\ - local ctx = self.ctx\n\ - self.ctx = nil\n\ - -- make sure context is valid.\n\ - if not ctx then return end\n\ - if C.zmq_term(ctx) ~= 0 then\n\ - return zmq_error()\n\ - end\n\ - return true\n\ + return data, err\n\ end\n\ \n\ -function ctx_mt:lightuserdata()\n\ - return self.ctx\n\ -end\n\ +-- End \"ZMQ_Socket\" FFI interface\n\ \n\ -function ctx_mt:socket(sock_type)\n\ - local sock = C.zmq_socket(self.ctx, sock_type)\n\ - if not sock then\n\ - return zmq_error()\n\ - end\n\ - -- use a wrapper newproxy for __gc support\n\ - local self=newproxy(true)\n\ - local meta=getmetatable(self)\n\ - meta.__index = meta\n\ - meta.sock = sock\n\ - meta.__gc = sock__gc\n\ - setmetatable(meta, sock_mt)\n\ - return self\n\ +\n\ +-- Start \"ZMQ_Ctx\" FFI interface\n\ +-- method: term\n\ +function ZMQ_Ctx_meth.term(self)\n\ + local this = obj_type_ZMQ_Ctx_check(self)\n\ + local rc_zmq_term\n\ + rc_zmq_term = C.zmq_term(this)\n\ + -- check for error.\n\ + local rc_zmq_term_err\n\ + if (0 ~= rc_zmq_term) then\n\ + rc_zmq_term = false\n\ + rc_zmq_term_err = error_code__ZMQ_Error__push(rc_zmq_term)\n\ + else\n\ + rc_zmq_term = true\n\ + end\n\ + return rc_zmq_term, rc_zmq_term_err\n\ end\n\ \n\ -local function ctx__gc(self)\n\ - if self.should_free then\n\ - self:term()\n\ - end\n\ +-- method: socket\n\ +function ZMQ_Ctx_meth.socket(self, type)\n\ + local this = obj_type_ZMQ_Ctx_check(self)\n\ + \n\ + local rc_zmq_socket_flags = OBJ_UDATA_FLAG_OWN\n\ + local rc_zmq_socket\n\ + rc_zmq_socket = C.zmq_socket(this, type)\n\ + local rc_zmq_socket_err\n\ + if (nil == rc_zmq_socket) then\n\ + rc_zmq_socket_err = get_zmq_strerror()\n\ + else\n\ + rc_zmq_socket = obj_type_ZMQ_Socket_push(rc_zmq_socket, rc_zmq_socket_flags)\n\ + end\n\ + return rc_zmq_socket, rc_zmq_socket_err\n\ end\n\ \n\ -function init(io_threads)\n\ - local should_free = true\n\ - local ctx\n\ - print(\"ZMQ using FFI interface.\")\n\ - if type(io_threads) == 'number' then\n\ - ctx = C.zmq_init(io_threads)\n\ - if not ctx then\n\ - return zmq_error()\n\ - end\n\ - else\n\ - should_free = false\n\ - -- should be lightuserdata or cdata\n\ - ctx = io_threads\n\ - end\n\ - -- use a wrapper newproxy for __gc support\n\ - local self=newproxy(true)\n\ - local meta=getmetatable(self)\n\ - meta.__index = meta\n\ - meta.ctx = ctx\n\ - meta.should_free = should_free\n\ - meta.__gc = ctx__gc\n\ - setmetatable(meta, ctx_mt)\n\ - return self\n\ +-- End \"ZMQ_Ctx\" FFI interface\n\ +\n\ +-- method: init\n\ +function zmq_meth.init(io_threads)\n\ + \n\ + local rc_zmq_init_flags = OBJ_UDATA_FLAG_OWN\n\ + local rc_zmq_init\n\ + rc_zmq_init = C.zmq_init(io_threads)\n\ + local rc_zmq_init_err\n\ + if (nil == rc_zmq_init) then\n\ + rc_zmq_init_err = get_zmq_strerror()\n\ + else\n\ + rc_zmq_init = obj_type_ZMQ_Ctx_push(rc_zmq_init, rc_zmq_init_flags)\n\ + end\n\ + return rc_zmq_init, rc_zmq_init_err\n\ end\n\ \n\ +-- method: device\n\ +function zmq_meth.device(device, insock, outsock)\n\ + \n\ + insock = obj_type_ZMQ_Socket_check(insock)\n\ + outsock = obj_type_ZMQ_Socket_check(outsock)\n\ + local rc_zmq_device\n\ + rc_zmq_device = C.zmq_device(device, insock, outsock)\n\ + -- check for error.\n\ + local rc_zmq_device_err\n\ + if (0 ~= rc_zmq_device) then\n\ + rc_zmq_device = false\n\ + rc_zmq_device_err = error_code__ZMQ_Error__push(rc_zmq_device)\n\ + else\n\ + rc_zmq_device = true\n\ + end\n\ + return rc_zmq_device, rc_zmq_device_err\n\ +end\n\ \n\ "; -typedef void * ZMQ_Ctx; - /* detect zmq version >= 2.1.0 */ #define VERSION_2_1 0 #if defined(ZMQ_VERSION) @@ -997,8 +1439,30 @@ static const int opt_types[] = { #define MAX_OPTS ZMQ_BACKLOG +static ZMQ_Error simple_zmq_send(ZMQ_Socket sock, const char *data, size_t data_len, int flags) { + ZMQ_Error err; + zmq_msg_t msg; + /* initialize message */ + err = zmq_msg_init_size(&msg, data_len); + if(0 == err) { + /* fill message */ + memcpy(zmq_msg_data(&msg), data, data_len); + /* send message */ + err = zmq_send(sock, &msg, flags); + /* close message */ + zmq_msg_close(&msg); + } + return err; +} + +typedef void * ZMQ_Ctx; + #define OBJ_UDATA_CTX_SHOULD_FREE (OBJ_UDATA_LAST_FLAG << 1) +/* + * This wrapper function is to make the EAGAIN/ETERM error messages more like + * what is returned by LuaSocket. + */ static const char *get_zmq_strerror() { int err = zmq_errno(); switch(err) { @@ -1017,132 +1481,224 @@ static const char *get_zmq_strerror() { -/* method: version */ -static int zmq__version__func(lua_State *L) { - int major, minor, patch; - zmq_version(&(major), &(minor), &(patch)); - - /* return version as a table: { major, minor, patch } */ - lua_createtable(L, 3, 0); - lua_pushinteger(L, major); - lua_rawseti(L, -2, 1); - lua_pushinteger(L, minor); - lua_rawseti(L, -2, 2); - lua_pushinteger(L, patch); - lua_rawseti(L, -2, 3); +static void error_code__ZMQ_Error__push(lua_State *L, ZMQ_Error err) { + const char *err_str = NULL; + if(err != 0) { + err_str = get_zmq_strerror(); + } - return 1; + if(err_str) { + lua_pushstring(L, err_str); + } else { + lua_pushnil(L); + } } /* method: init */ -static int zmq__init__func(lua_State *L) { - int ctx_flags = OBJ_UDATA_FLAG_OWN; - ZMQ_Ctx ctx; - if(lua_isnumber(L, 1)) { - ctx = zmq_init(lua_tointeger(L,1)); - ctx_flags |= OBJ_UDATA_CTX_SHOULD_FREE; - } else if(lua_isuserdata(L, 1)) { - ctx = lua_touserdata(L, 1); - } else { - /* check if value is a LuaJIT 'cdata' */ - int type = lua_type(L, 1); - const char *typename = lua_typename(L, type); - if(strncmp(typename, "cdata", sizeof("cdata")) == 0) { - ctx = (void *)lua_topointer(L, 1); - } else { - return luaL_argerror(L, 1, "(expected number)"); - } - } +static int zmq_msg_t__init__meth(lua_State *L) { + int this_flags = OBJ_UDATA_FLAG_OWN; + zmq_msg_t * this; + ZMQ_Error err = 0; + zmq_msg_t tmp; + this = &tmp; + err = zmq_msg_init(this); - if((NULL == ctx)) { - lua_pushnil(L); - lua_pushstring(L, get_zmq_strerror()); + if(!(0 != err)) { + obj_type_zmq_msg_t_push(L, this, this_flags); } else { - obj_type_ZMQ_Ctx_push(L, ctx, ctx_flags); + lua_pushnil(L); } - return 1; + error_code__ZMQ_Error__push(L, err); + return 2; } -/* method: device */ -static int zmq__device__func(lua_State *L) { - int device = luaL_checkinteger(L,1); - ZMQ_Socket insock = obj_type_ZMQ_Socket_check(L,2); - ZMQ_Socket outsock = obj_type_ZMQ_Socket_check(L,3); - ZMQ_Error rc_zmq_device = 0; - rc_zmq_device = zmq_device(device, insock, outsock); - /* check for error. */ - if((0 != rc_zmq_device)) { - lua_pushboolean(L, 0); - error_code__ZMQ_Error__push(L, rc_zmq_device); +/* method: init_size */ +static int zmq_msg_t__init_size__meth(lua_State *L) { + size_t size = luaL_checkinteger(L,1); + int this_flags = OBJ_UDATA_FLAG_OWN; + zmq_msg_t * this; + ZMQ_Error err = 0; + zmq_msg_t tmp; + this = &tmp; + err = zmq_msg_init_size(this, size); + + if(!(0 != err)) { + obj_type_zmq_msg_t_push(L, this, this_flags); } else { - lua_pushboolean(L, 1); + lua_pushnil(L); } + error_code__ZMQ_Error__push(L, err); return 2; } -static void error_code__ZMQ_Error__push(lua_State *L, ZMQ_Error err) { - const char *err_str = NULL; - if(err != 0) { - err_str = get_zmq_strerror(); +/* method: init_data */ +static int zmq_msg_t__init_data__meth(lua_State *L) { + size_t data_len; + const char * data = luaL_checklstring(L,1,&(data_len)); + int this_flags = OBJ_UDATA_FLAG_OWN; + zmq_msg_t * this; + ZMQ_Error err = 0; + zmq_msg_t tmp; + this = &tmp; + err = zmq_msg_init_size(this, data_len); + if(0 == err) { + /* fill message */ + memcpy(zmq_msg_data(this), data, data_len); } - if(err_str) { - lua_pushstring(L, err_str); - } else { - lua_pushnil(L); - } + if(!(0 != err)) { + obj_type_zmq_msg_t_push(L, this, this_flags); + } else { + lua_pushnil(L); + } + error_code__ZMQ_Error__push(L, err); + return 2; } /* method: delete */ -static int ZMQ_Ctx__delete__meth(lua_State *L) { +static int zmq_msg_t__delete__meth(lua_State *L) { int this_flags = 0; - ZMQ_Ctx * this = obj_type_ZMQ_Ctx_delete(L,1,&(this_flags)); + zmq_msg_t * this = obj_type_zmq_msg_t_delete(L,1,&(this_flags)); if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; } - if(this_flags & OBJ_UDATA_CTX_SHOULD_FREE) { - zmq_term(this); - } + ZMQ_Error rc_zmq_msg_close = 0; + rc_zmq_msg_close = zmq_msg_close(this); + /* check for error. */ + if((0 != rc_zmq_msg_close)) { + lua_pushboolean(L, 0); + error_code__ZMQ_Error__push(L, rc_zmq_msg_close); + } else { + lua_pushboolean(L, 1); + } + return 2; +} - return 0; +/* method: close */ +static int zmq_msg_t__close__meth(lua_State *L) { + zmq_msg_t * this = obj_type_zmq_msg_t_check(L,1); + ZMQ_Error rc_zmq_msg_close = 0; + rc_zmq_msg_close = zmq_msg_close(this); + /* check for error. */ + if((0 != rc_zmq_msg_close)) { + lua_pushboolean(L, 0); + error_code__ZMQ_Error__push(L, rc_zmq_msg_close); + } else { + lua_pushboolean(L, 1); + } + return 2; } -/* method: term */ -static int ZMQ_Ctx__term__meth(lua_State *L) { - ZMQ_Ctx * this = obj_type_ZMQ_Ctx_check(L,1); - ZMQ_Error rc_zmq_term = 0; - rc_zmq_term = zmq_term(this); +/* method: move */ +static int zmq_msg_t__move__meth(lua_State *L) { + zmq_msg_t * this = obj_type_zmq_msg_t_check(L,1); + zmq_msg_t * src = obj_type_zmq_msg_t_check(L,2); + ZMQ_Error rc_zmq_msg_move = 0; + rc_zmq_msg_move = zmq_msg_move(this, src); /* check for error. */ - if((0 != rc_zmq_term)) { + if((0 != rc_zmq_msg_move)) { lua_pushboolean(L, 0); - error_code__ZMQ_Error__push(L, rc_zmq_term); + error_code__ZMQ_Error__push(L, rc_zmq_msg_move); } else { lua_pushboolean(L, 1); } return 2; } -/* method: lightuserdata */ -static int ZMQ_Ctx__lightuserdata__meth(lua_State *L) { - ZMQ_Ctx * this = obj_type_ZMQ_Ctx_check(L,1); - void * ptr = NULL; - ptr = this; +/* method: copy */ +static int zmq_msg_t__copy__meth(lua_State *L) { + zmq_msg_t * this = obj_type_zmq_msg_t_check(L,1); + zmq_msg_t * src = obj_type_zmq_msg_t_check(L,2); + ZMQ_Error rc_zmq_msg_copy = 0; + rc_zmq_msg_copy = zmq_msg_copy(this, src); + /* check for error. */ + if((0 != rc_zmq_msg_copy)) { + lua_pushboolean(L, 0); + error_code__ZMQ_Error__push(L, rc_zmq_msg_copy); + } else { + lua_pushboolean(L, 1); + } + return 2; +} - lua_pushlightuserdata(L, ptr); +/* method: set_data */ +static int zmq_msg_t__set_data__meth(lua_State *L) { + zmq_msg_t * this = obj_type_zmq_msg_t_check(L,1); + size_t data_len; + const char * data = luaL_checklstring(L,2,&(data_len)); + ZMQ_Error err = 0; + /* check message data size. */ + if(zmq_msg_size(this) != data_len) { + /* need to resize message. */ + zmq_msg_close(this); /* close old message, to free old data. */ + err = zmq_msg_init_size(this, data_len); /* re-initialize message. */ + if(0 != err) { + luaL_error(L, "set_data() failed: %s", get_zmq_strerror()); + } + } + /* copy data into message */ + memcpy(zmq_msg_data(this), data, data_len); + + /* check for error. */ + if((0 != err)) { + lua_pushboolean(L, 0); + error_code__ZMQ_Error__push(L, err); + } else { + lua_pushboolean(L, 1); + } + return 2; +} + +/* method: data */ +static int zmq_msg_t__data__meth(lua_State *L) { + zmq_msg_t * this = obj_type_zmq_msg_t_check(L,1); + void * rc_zmq_msg_data = NULL; + rc_zmq_msg_data = zmq_msg_data(this); + lua_pushlightuserdata(L, rc_zmq_msg_data); return 1; } -/* method: socket */ -static int ZMQ_Ctx__socket__meth(lua_State *L) { - ZMQ_Ctx * this = obj_type_ZMQ_Ctx_check(L,1); - int type = luaL_checkinteger(L,2); - int rc_zmq_socket_flags = OBJ_UDATA_FLAG_OWN; - ZMQ_Socket rc_zmq_socket; - rc_zmq_socket = zmq_socket(this, type); - if((NULL == rc_zmq_socket)) { - lua_pushnil(L); - lua_pushstring(L, get_zmq_strerror()); +/* method: set_size */ +static int zmq_msg_t__set_size__meth(lua_State *L) { + zmq_msg_t * this = obj_type_zmq_msg_t_check(L,1); + size_t size = luaL_checkinteger(L,2); + ZMQ_Error err = 0; + /* check message data size. */ + if(zmq_msg_size(this) != size) { + /* need to resize message. */ + zmq_msg_close(this); /* close old message, to free old data. */ + err = zmq_msg_init_size(this, size); /* re-initialize message. */ + if(0 != err) { + luaL_error(L, "set_size() failed: %s", get_zmq_strerror()); + } + } + + /* check for error. */ + if((0 != err)) { + lua_pushboolean(L, 0); + error_code__ZMQ_Error__push(L, err); } else { - obj_type_ZMQ_Socket_push(L, rc_zmq_socket, rc_zmq_socket_flags); + lua_pushboolean(L, 1); } + return 2; +} + +/* method: size */ +static int zmq_msg_t__size__meth(lua_State *L) { + zmq_msg_t * this = obj_type_zmq_msg_t_check(L,1); + size_t rc_zmq_msg_size = 0; + rc_zmq_msg_size = zmq_msg_size(this); + lua_pushinteger(L, rc_zmq_msg_size); + return 1; +} + +/* method: __tostring */ +static int zmq_msg_t____tostring__meth(lua_State *L) { + zmq_msg_t * this = obj_type_zmq_msg_t_check(L,1); + size_t data_len = 0; + const char * data = NULL; + data = zmq_msg_data(this); + data_len = zmq_msg_size(this); + + if(data == NULL) lua_pushnil(L); else lua_pushlstring(L, data,data_len); return 1; } @@ -1360,6 +1916,23 @@ static int ZMQ_Socket__events__meth(lua_State *L) { return 2; } +/* method: send_msg */ +static int ZMQ_Socket__send_msg__meth(lua_State *L) { + ZMQ_Socket * this = obj_type_ZMQ_Socket_check(L,1); + zmq_msg_t * msg = obj_type_zmq_msg_t_check(L,2); + int flags = luaL_optinteger(L,3,0); + ZMQ_Error rc_zmq_send = 0; + rc_zmq_send = zmq_send(this, msg, flags); + /* check for error. */ + if((0 != rc_zmq_send)) { + lua_pushboolean(L, 0); + error_code__ZMQ_Error__push(L, rc_zmq_send); + } else { + lua_pushboolean(L, 1); + } + return 2; +} + /* method: send */ static int ZMQ_Socket__send__meth(lua_State *L) { ZMQ_Socket * this = obj_type_ZMQ_Socket_check(L,1); @@ -1367,17 +1940,7 @@ static int ZMQ_Socket__send__meth(lua_State *L) { const char * data = luaL_checklstring(L,2,&(data_len)); int flags = luaL_optinteger(L,3,0); ZMQ_Error err = 0; - zmq_msg_t msg; - /* initialize message */ - err = zmq_msg_init_size(&msg, data_len); - if(0 == err) { - /* fill message */ - memcpy(zmq_msg_data(&msg), data, data_len); - /* send message */ - err = zmq_send(this, &msg, flags); - /* close message */ - zmq_msg_close(&msg); - } + err = simple_zmq_send(this, data, data_len, flags); /* check for error. */ if((0 != err)) { @@ -1389,6 +1952,23 @@ static int ZMQ_Socket__send__meth(lua_State *L) { return 2; } +/* method: recv_msg */ +static int ZMQ_Socket__recv_msg__meth(lua_State *L) { + ZMQ_Socket * this = obj_type_ZMQ_Socket_check(L,1); + zmq_msg_t * msg = obj_type_zmq_msg_t_check(L,2); + int flags = luaL_optinteger(L,3,0); + ZMQ_Error rc_zmq_recv = 0; + rc_zmq_recv = zmq_recv(this, msg, flags); + /* check for error. */ + if((0 != rc_zmq_recv)) { + lua_pushboolean(L, 0); + error_code__ZMQ_Error__push(L, rc_zmq_recv); + } else { + lua_pushboolean(L, 1); + } + return 2; +} + /* method: recv */ static int ZMQ_Socket__recv__meth(lua_State *L) { ZMQ_Socket * this = obj_type_ZMQ_Socket_check(L,1); @@ -1420,35 +2000,181 @@ static int ZMQ_Socket__recv__meth(lua_State *L) { return 2; } +/* method: delete */ +static int ZMQ_Ctx__delete__meth(lua_State *L) { + int this_flags = 0; + ZMQ_Ctx * this = obj_type_ZMQ_Ctx_delete(L,1,&(this_flags)); + if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; } + if(this_flags & OBJ_UDATA_CTX_SHOULD_FREE) { + zmq_term(this); + } + return 0; +} -static const luaL_reg obj_ZMQ_Ctx_pub_funcs[] = { +/* method: term */ +static int ZMQ_Ctx__term__meth(lua_State *L) { + ZMQ_Ctx * this = obj_type_ZMQ_Ctx_check(L,1); + ZMQ_Error rc_zmq_term = 0; + rc_zmq_term = zmq_term(this); + /* check for error. */ + if((0 != rc_zmq_term)) { + lua_pushboolean(L, 0); + error_code__ZMQ_Error__push(L, rc_zmq_term); + } else { + lua_pushboolean(L, 1); + } + return 2; +} + +/* method: lightuserdata */ +static int ZMQ_Ctx__lightuserdata__meth(lua_State *L) { + ZMQ_Ctx * this = obj_type_ZMQ_Ctx_check(L,1); + void * ptr = NULL; + ptr = this; + + lua_pushlightuserdata(L, ptr); + return 1; +} + +/* method: socket */ +static int ZMQ_Ctx__socket__meth(lua_State *L) { + ZMQ_Ctx * this = obj_type_ZMQ_Ctx_check(L,1); + int type = luaL_checkinteger(L,2); + int rc_zmq_socket_flags = OBJ_UDATA_FLAG_OWN; + ZMQ_Socket rc_zmq_socket; + rc_zmq_socket = zmq_socket(this, type); + if((NULL == rc_zmq_socket)) { + lua_pushnil(L); + lua_pushstring(L, get_zmq_strerror()); + } else { + obj_type_ZMQ_Socket_push(L, rc_zmq_socket, rc_zmq_socket_flags); + } + return 1; +} + +/* method: version */ +static int zmq__version__func(lua_State *L) { + int major, minor, patch; + zmq_version(&(major), &(minor), &(patch)); + + /* return version as a table: { major, minor, patch } */ + lua_createtable(L, 3, 0); + lua_pushinteger(L, major); + lua_rawseti(L, -2, 1); + lua_pushinteger(L, minor); + lua_rawseti(L, -2, 2); + lua_pushinteger(L, patch); + lua_rawseti(L, -2, 3); + + return 1; +} + +/* method: init */ +static int zmq__init__func(lua_State *L) { + int io_threads = luaL_checkinteger(L,1); + int rc_zmq_init_flags = OBJ_UDATA_FLAG_OWN; + ZMQ_Ctx rc_zmq_init; + rc_zmq_init = zmq_init(io_threads); + if((NULL == rc_zmq_init)) { + lua_pushnil(L); + lua_pushstring(L, get_zmq_strerror()); + } else { + obj_type_ZMQ_Ctx_push(L, rc_zmq_init, rc_zmq_init_flags); + } + return 1; +} + +/* method: init_ctx */ +static int zmq__init_ctx__func(lua_State *L) { + int ctx_flags = OBJ_UDATA_FLAG_OWN; + ZMQ_Ctx ctx; + if(lua_isuserdata(L, 1)) { + ctx = lua_touserdata(L, 1); + } else { + /* check if value is a LuaJIT 'cdata' */ + int type = lua_type(L, 1); + const char *typename = lua_typename(L, type); + if(strncmp(typename, "cdata", sizeof("cdata")) == 0) { + ctx = (void *)lua_topointer(L, 1); + } else { + return luaL_argerror(L, 1, "(expected userdata)"); + } + } + + if((NULL == ctx)) { + lua_pushnil(L); + lua_pushstring(L, get_zmq_strerror()); + } else { + obj_type_ZMQ_Ctx_push(L, ctx, ctx_flags); + } + return 1; +} + +/* method: device */ +static int zmq__device__func(lua_State *L) { + int device = luaL_checkinteger(L,1); + ZMQ_Socket insock = obj_type_ZMQ_Socket_check(L,2); + ZMQ_Socket outsock = obj_type_ZMQ_Socket_check(L,3); + ZMQ_Error rc_zmq_device = 0; + rc_zmq_device = zmq_device(device, insock, outsock); + /* check for error. */ + if((0 != rc_zmq_device)) { + lua_pushboolean(L, 0); + error_code__ZMQ_Error__push(L, rc_zmq_device); + } else { + lua_pushboolean(L, 1); + } + return 2; +} + +/* method: dump_ffi */ +static int zmq__dump_ffi__func(lua_State *L) { + size_t ffi_code_len = 0; + const char * ffi_code = NULL; + ffi_code = zmq_ffi_lua_code; + ffi_code_len = sizeof(zmq_ffi_lua_code) - 1; + + if(ffi_code == NULL) lua_pushnil(L); else lua_pushlstring(L, ffi_code,ffi_code_len); + return 1; +} + + + +static const luaL_reg obj_zmq_msg_t_pub_funcs[] = { + {"init", zmq_msg_t__init__meth}, + {"init_size", zmq_msg_t__init_size__meth}, + {"init_data", zmq_msg_t__init_data__meth}, {NULL, NULL} }; -static const luaL_reg obj_ZMQ_Ctx_methods[] = { - {"term", ZMQ_Ctx__term__meth}, - {"lightuserdata", ZMQ_Ctx__lightuserdata__meth}, - {"socket", ZMQ_Ctx__socket__meth}, +static const luaL_reg obj_zmq_msg_t_methods[] = { + {"close", zmq_msg_t__close__meth}, + {"move", zmq_msg_t__move__meth}, + {"copy", zmq_msg_t__copy__meth}, + {"set_data", zmq_msg_t__set_data__meth}, + {"data", zmq_msg_t__data__meth}, + {"set_size", zmq_msg_t__set_size__meth}, + {"size", zmq_msg_t__size__meth}, {NULL, NULL} }; -static const luaL_reg obj_ZMQ_Ctx_metas[] = { - {"__gc", ZMQ_Ctx__delete__meth}, - {"__tostring", obj_udata_default_tostring}, - {"__eq", obj_udata_default_equal}, +static const luaL_reg obj_zmq_msg_t_metas[] = { + {"__gc", zmq_msg_t__delete__meth}, + {"__tostring", zmq_msg_t____tostring__meth}, + {"__eq", obj_simple_udata_default_equal}, {NULL, NULL} }; -static const obj_base obj_ZMQ_Ctx_bases[] = { +static const obj_base obj_zmq_msg_t_bases[] = { {-1, NULL} }; -static const obj_field obj_ZMQ_Ctx_fields[] = { +static const obj_field obj_zmq_msg_t_fields[] = { {NULL, 0, 0, 0} }; -static const obj_const obj_ZMQ_Ctx_constants[] = { +static const obj_const obj_zmq_msg_t_constants[] = { {NULL, NULL, 0.0 , 0} }; @@ -1463,7 +2189,9 @@ static const luaL_reg obj_ZMQ_Socket_methods[] = { {"setopt", ZMQ_Socket__setopt__meth}, {"getopt", ZMQ_Socket__getopt__meth}, {"events", ZMQ_Socket__events__meth}, + {"send_msg", ZMQ_Socket__send_msg__meth}, {"send", ZMQ_Socket__send__meth}, + {"recv_msg", ZMQ_Socket__recv_msg__meth}, {"recv", ZMQ_Socket__recv__meth}, {NULL, NULL} }; @@ -1487,10 +2215,42 @@ static const obj_const obj_ZMQ_Socket_constants[] = { {NULL, NULL, 0.0 , 0} }; +static const luaL_reg obj_ZMQ_Ctx_pub_funcs[] = { + {NULL, NULL} +}; + +static const luaL_reg obj_ZMQ_Ctx_methods[] = { + {"term", ZMQ_Ctx__term__meth}, + {"lightuserdata", ZMQ_Ctx__lightuserdata__meth}, + {"socket", ZMQ_Ctx__socket__meth}, + {NULL, NULL} +}; + +static const luaL_reg obj_ZMQ_Ctx_metas[] = { + {"__gc", ZMQ_Ctx__delete__meth}, + {"__tostring", obj_udata_default_tostring}, + {"__eq", obj_udata_default_equal}, + {NULL, NULL} +}; + +static const obj_base obj_ZMQ_Ctx_bases[] = { + {-1, NULL} +}; + +static const obj_field obj_ZMQ_Ctx_fields[] = { + {NULL, 0, 0, 0} +}; + +static const obj_const obj_ZMQ_Ctx_constants[] = { + {NULL, NULL, 0.0 , 0} +}; + static const luaL_reg zmq_function[] = { {"version", zmq__version__func}, {"init", zmq__init__func}, + {"init_ctx", zmq__init_ctx__func}, {"device", zmq__device__func}, + {"dump_ffi", zmq__dump_ffi__func}, {NULL, NULL} }; @@ -1538,11 +2298,18 @@ static const obj_const zmq_constants[] = { {NULL, NULL, 0.0 , 0} }; +static const ffi_export_symbol zmq_ffi_export[] = { +{ "get_zmq_strerror", get_zmq_strerror }, +{ "simple_zmq_send", simple_zmq_send }, + {NULL, NULL} +}; + static const reg_sub_module reg_sub_modules[] = { - { &(obj_type_ZMQ_Ctx), 0, obj_ZMQ_Ctx_pub_funcs, obj_ZMQ_Ctx_methods, obj_ZMQ_Ctx_metas, obj_ZMQ_Ctx_bases, obj_ZMQ_Ctx_fields, obj_ZMQ_Ctx_constants}, + { &(obj_type_zmq_msg_t), 0, obj_zmq_msg_t_pub_funcs, obj_zmq_msg_t_methods, obj_zmq_msg_t_metas, obj_zmq_msg_t_bases, obj_zmq_msg_t_fields, obj_zmq_msg_t_constants}, { &(obj_type_ZMQ_Socket), 0, obj_ZMQ_Socket_pub_funcs, obj_ZMQ_Socket_methods, obj_ZMQ_Socket_metas, obj_ZMQ_Socket_bases, obj_ZMQ_Socket_fields, obj_ZMQ_Socket_constants}, + { &(obj_type_ZMQ_Ctx), 0, obj_ZMQ_Ctx_pub_funcs, obj_ZMQ_Ctx_methods, obj_ZMQ_Ctx_metas, obj_ZMQ_Ctx_bases, obj_ZMQ_Ctx_fields, obj_ZMQ_Ctx_constants}, {NULL, 0, NULL, NULL, NULL, NULL, NULL, NULL} }; @@ -1580,15 +2347,23 @@ static void create_object_instance_cache(lua_State *L) { int luaopen_zmq(lua_State *L) { const reg_sub_module *reg = reg_sub_modules; const luaL_Reg *submodules = submodule_libs; + int priv_table = -1; + +#if LUAJIT_FFI + /* private table to hold reference to object metatables. */ + lua_newtable(L); + priv_table = lua_gettop(L); +#endif + + /* create object cache. */ + create_object_instance_cache(L); + /* module table. */ luaL_register(L, "zmq", zmq_function); /* register module constants. */ obj_type_register_constants(L, zmq_constants, -1); - /* create object cache. */ - create_object_instance_cache(L); - for(; submodules->func != NULL ; submodules++) { lua_pushcfunction(L, submodules->func); lua_pushstring(L, submodules->name); @@ -1604,11 +2379,12 @@ int luaopen_zmq(lua_State *L) { lua_pushvalue(L, -1); /* dup value. */ lua_setglobal(L, reg->type->name); /* global: = */ #endif - obj_type_register(L, reg); + obj_type_register(L, reg, priv_table); } #if LUAJIT_FFI - nobj_try_loading_ffi(L, zmq_ffi_lua_code); + nobj_try_loading_ffi(L, "zmq", zmq_ffi_lua_code, + zmq_ffi_export, priv_table); #endif return 1; } diff --git a/src/socket.nobj.lua b/src/socket.nobj.lua index 112e91f..900a00f 100644 --- a/src/socket.nobj.lua +++ b/src/socket.nobj.lua @@ -20,6 +20,9 @@ object "ZMQ_Socket" { error_on_null = "get_zmq_strerror()", + ffi_cdef[[ +typedef void * ZMQ_Socket; +]], c_source [[ /* detect zmq version >= 2.1.0 */ #define VERSION_2_1 0 @@ -87,6 +90,10 @@ static const int opt_types[] = { method "connect" { c_method_call "ZMQ_Error" "zmq_connect" { "const char *", "addr" } }, + ffi_cdef[[ +int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen); +int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen); +]], method "setopt" { var_in{ "uint32_t", "opt" }, var_in{ "", "val" }, @@ -221,34 +228,75 @@ static const int opt_types[] = { lua_pushnil(L); ]] }, + ffi_source[[ +-- temp. values for 'events' function. +local events_tmp = ffi.new('uint32_t[1]', 0) +local events_tmp_size = ffi.sizeof('uint32_t') +local events_tmp_len = ffi.new('size_t[1]', events_tmp_size) +local ZMQ_EVENTS = _M.EVENTS +]], method "events" { var_out{ "uint32_t", "events" }, var_out{ "ZMQ_Error", "err" }, c_source[[ size_t val_len = sizeof(${events}); ${err} = zmq_getsockopt(${this}, ZMQ_EVENTS, &(${events}), &val_len); -]] +]], + ffi_source[[ + events_tmp_len[0] = events_tmp_size + ${err} = C.zmq_getsockopt(${this}, ZMQ_EVENTS, events_tmp, events_tmp_len); + ${events} = events_tmp[0] +]], }, - method "send" { - var_in{ "const char *", "data" }, - var_in{ "int", "flags?" }, - var_out{ "ZMQ_Error", "err" }, - c_source[[ + -- + -- zmq_send + -- + method "send_msg" { + c_method_call "ZMQ_Error" "zmq_send" { "zmq_msg_t *", "msg", "int", "flags?" }, + }, + -- create helper function for `zmq_send` + c_source[[ +static ZMQ_Error simple_zmq_send(ZMQ_Socket sock, const char *data, size_t data_len, int flags) { + ZMQ_Error err; zmq_msg_t msg; /* initialize message */ - ${err} = zmq_msg_init_size(&msg, ${data}_len); - if(0 == ${err}) { + err = zmq_msg_init_size(&msg, data_len); + if(0 == err) { /* fill message */ - memcpy(zmq_msg_data(&msg), ${data}, ${data}_len); + memcpy(zmq_msg_data(&msg), data, data_len); /* send message */ - ${err} = zmq_send(${this}, &msg, ${flags}); + err = zmq_send(sock, &msg, flags); /* close message */ zmq_msg_close(&msg); } -]] + return err; +} +]], + -- export helper function. + ffi_export_function "ZMQ_Error" "simple_zmq_send" + "(ZMQ_Socket sock, const char *data, size_t data_len, int flags)", + method "send" { + var_in{ "const char *", "data" }, + var_in{ "int", "flags?" }, + var_out{ "ZMQ_Error", "err" }, + c_source[[ + ${err} = simple_zmq_send(${this}, ${data}, ${data_len}, ${flags}); +]], + ffi_source[[ + ${err} = simple_zmq_send(${this}, ${data}, ${data_len}, ${flags}); +]], }, + -- + -- zmq_recv + -- + method "recv_msg" { + c_method_call "ZMQ_Error" "zmq_recv" { "zmq_msg_t *", "msg", "int", "flags?" }, + }, + ffi_source[[ +local tmp_msg = ffi.new('zmq_msg_t') +]], method "recv" { - var_in{ "int", "flags", is_optional = true }, + var_in{ "int", "flags?" }, var_out{ "const char *", "data", has_length = true }, var_out{ "ZMQ_Error", "err" }, c_source[[ @@ -260,14 +308,34 @@ static const int opt_types[] = { ${err} = zmq_recv(${this}, &msg, ${flags}); if(0 == ${err}) { ${data} = zmq_msg_data(&msg); - ${data}_len = zmq_msg_size(&msg); + ${data_len} = zmq_msg_size(&msg); } } ]], c_source "post" [[ /* close message */ zmq_msg_close(&msg); -]] +]], + ffi_source[[ + local msg = tmp_msg + -- initialize blank message. + if C.zmq_msg_init(msg) < 0 then + return nil, get_zmq_strerror() + end + + -- receive message + ${err} = C.zmq_recv(${this}, msg, ${flags}) + if 0 == ${err} then + local data = ffi.string(C.zmq_msg_data(msg), C.zmq_msg_size(msg)) + -- close message + C.zmq_msg_close(msg) + return data + end +]], + ffi_source "ffi_post" [[ + -- close message + C.zmq_msg_close(msg) +]], }, } diff --git a/src/zmq_ffi.nobj.lua b/src/zmq_ffi.nobj.lua deleted file mode 100644 index bf146db..0000000 --- a/src/zmq_ffi.nobj.lua +++ /dev/null @@ -1,347 +0,0 @@ --- Copyright (c) 2010 by Robert G. Jakabosky --- --- Permission is hereby granted, free of charge, to any person obtaining a copy --- of this software and associated documentation files (the "Software"), to deal --- in the Software without restriction, including without limitation the rights --- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell --- copies of the Software, and to permit persons to whom the Software is --- furnished to do so, subject to the following conditions: --- --- The above copyright notice and this permission notice shall be included in --- all copies or substantial portions of the Software. --- --- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR --- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, --- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE --- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER --- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, --- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN --- THE SOFTWARE. - -local zmq = ... - --- try loading luajit's ffi -local stat, ffi=pcall(require,"ffi") -if not stat then - print("No FFI module: ZMQ using standard Lua api interface.") - return -end --- check if ffi is disabled. -if disable_ffi then - print("FFI disabled: ZMQ using standard Lua api interface.") - return -end - -local setmetatable = setmetatable -local getmetatable = getmetatable -local print = print -local pairs = pairs -local error = error -local type = type -local assert = assert -local tostring = tostring -local tonumber = tonumber -local newproxy = newproxy - -ffi.cdef[[ -void zmq_version (int *major, int *minor, int *patch); -int zmq_errno (); -const char *zmq_strerror (int errnum); - -typedef struct zmq_msg_t -{ - void *content; - unsigned char flags; - unsigned char vsm_size; - unsigned char vsm_data [30]; -} zmq_msg_t; - -typedef void (zmq_free_fn) (void *data, void *hint); - -int zmq_msg_init (zmq_msg_t *msg); -int zmq_msg_init_size (zmq_msg_t *msg, size_t size); -int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint); -int zmq_msg_close (zmq_msg_t *msg); -int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); -int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); -void *zmq_msg_data (zmq_msg_t *msg); -size_t zmq_msg_size (zmq_msg_t *msg); - -void *zmq_init (int io_threads); -int zmq_term (void *context); - -void *zmq_socket (void *context, int type); -int zmq_close (void *s); -int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen); -int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen); -int zmq_bind (void *s, const char *addr); -int zmq_connect (void *s, const char *addr); -int zmq_send (void *s, zmq_msg_t *msg, int flags); -int zmq_recv (void *s, zmq_msg_t *msg, int flags); - -int zmq_device (int device, void * insocket, void* outsocket); - -]] - -local C = ffi.load"zmq" - --- simulate: module(...) -zmq._M = zmq -setfenv(1, zmq) - -function version() - local major = ffi.new('int[1]',0) - local minor = ffi.new('int[1]',0) - local patch = ffi.new('int[1]',0) - C.zmq_version(major, minor, patch) - return {major[0], minor[0], patch[0]} -end - -local function zmq_error() - local errno = C.zmq_errno() - local err = ffi.string(C.zmq_strerror(errno)) - if err == "Resource temporarily unavailable" then err = "timeout" end - if err == "Context was terminated" then err = "closed" end - return nil, err -end - --- --- ZMQ socket --- -local sock_mt = {} -sock_mt.__index = sock_mt - -function sock_mt:close() - -- get the true self - self=getmetatable(self) - local sock = self.sock - -- make sure socket is still valid. - if not sock then return end - -- mark this socket as closed. - self.sock = nil - -- close zmq socket. - if C.zmq_close(sock) ~= 0 then - return zmq_error() - end - return true -end - -local function sock__gc(self) - self:close() -end - -local option_types = { -[zmq.HWM] = 'uint64_t[1]', -[zmq.SWAP] = 'int64_t[1]', -[zmq.AFFINITY] = 'uint64_t[1]', -[zmq.IDENTITY] = 'string', -[zmq.SUBSCRIBE] = 'string', -[zmq.UNSUBSCRIBE] = 'string', -[zmq.RATE] = 'int64_t[1]', -[zmq.RECOVERY_IVL] = 'int64_t[1]', -[zmq.MCAST_LOOP] = 'int64_t[1]', -[zmq.SNDBUF] = 'uint64_t[1]', -[zmq.RCVBUF] = 'uint64_t[1]', -[zmq.RCVMORE] = 'int64_t[1]', -[zmq.FD] = 'int[1]', -[zmq.EVENTS] = 'uint32_t[1]', -[zmq.TYPE] = 'int[1]', -[zmq.LINGER] = 'int[1]', -[zmq.RECONNECT_IVL] = 'int[1]', -[zmq.BACKLOG] = 'int[1]', -} -local option_len = {} -local option_tmps = {} -for k,v in pairs(option_types) do - if v ~= 'string' then - option_len[k] = ffi.sizeof(v) - option_tmps[k] = ffi.new(v, 0) - end -end -function sock_mt:setopt(opt, opt_val) - local ctype = option_types[opt] - local val_len = 0 - if ctype == 'string' then - --val = ffi.cast('void *', tostring(val)) - val = tostring(opt_val) - val_len = #val - else - val = option_tmps[opt] - val[0] = opt_val - val_len = option_len[opt] - end - local ret = C.zmq_setsockopt(self.sock, opt, val, val_len) - if ret ~= 0 then - return zmq_error() - end - return true -end - -local tmp_val_len = ffi.new('size_t[1]', 4) -function sock_mt:getopt(opt) - local ctype = option_types[opt] - local val - local val_len = tmp_val_len - if ctype == 'string' then - val_len[0] = 255 - val = ffi.new('uint8_t[?]', val_len[0]) - ffi.fill(val, val_len[0]) - else - val = option_tmps[opt] - val[0] = 0 - val_len[0] = option_len[opt] - end - local ret = C.zmq_getsockopt(self.sock, opt, val, val_len) - if ret ~= 0 then - return zmq_error() - end - if ctype == 'string' then - val_len = val_len[0] - return ffi.string(val, val_len) - else - val = val[0] - end - return tonumber(val) -end - -local tmp32 = ffi.new('uint32_t[1]', 0) -local tmp32_len = ffi.new('size_t[1]', 4) -function sock_mt:events() - local val = tmp32 - local val_len = tmp32_len - val[0] = 0 - val_len[0] = 4 - local ret = C.zmq_getsockopt(self.sock, 15, val, val_len) - if ret ~= 0 then - return zmq_error() - end - return val[0] -end - -function sock_mt:bind(addr) - local ret = C.zmq_bind(self.sock, addr) - if ret ~= 0 then - return zmq_error() - end - return true -end - -function sock_mt:connect(addr) - local ret = C.zmq_connect(self.sock, addr) - if ret ~= 0 then - return zmq_error() - end - return true -end - -local tmp_msg = ffi.new('zmq_msg_t') -function sock_mt:send(data, flags) - local msg = tmp_msg - local msg_len = #data - -- initialize message - if C.zmq_msg_init_size(msg, msg_len) < 0 then - return zmq_error() - end - -- copy data into message. - ffi.copy(C.zmq_msg_data(msg), data, msg_len) - - -- send message - local ret = C.zmq_send(self.sock, msg, flags or 0) - -- close message before processing return code - C.zmq_msg_close(msg) - -- now process send return code - if ret ~= 0 then - return zmq_error() - end - return true -end - -function sock_mt:recv(flags) - local msg = tmp_msg - -- initialize blank message. - if C.zmq_msg_init(msg) < 0 then - return zmq_error() - end - - -- receive message - local ret = C.zmq_recv(self.sock, msg, flags or 0) - if ret ~= 0 then - local data, err = zmq_error() - C.zmq_msg_close(msg) - return data, err - end - local data = ffi.string(C.zmq_msg_data(msg), C.zmq_msg_size(msg)) - -- close message - C.zmq_msg_close(msg) - return data -end - --- --- ZMQ context --- -local ctx_mt = {} -ctx_mt.__index = ctx_mt - -function ctx_mt:term() - -- get the true self - self=getmetatable(self) - local ctx = self.ctx - self.ctx = nil - -- make sure context is valid. - if not ctx then return end - if C.zmq_term(ctx) ~= 0 then - return zmq_error() - end - return true -end - -function ctx_mt:lightuserdata() - return self.ctx -end - -function ctx_mt:socket(sock_type) - local sock = C.zmq_socket(self.ctx, sock_type) - if not sock then - return zmq_error() - end - -- use a wrapper newproxy for __gc support - local self=newproxy(true) - local meta=getmetatable(self) - meta.__index = meta - meta.sock = sock - meta.__gc = sock__gc - setmetatable(meta, sock_mt) - return self -end - -local function ctx__gc(self) - if self.should_free then - self:term() - end -end - -function init(io_threads) - local should_free = true - local ctx - print("ZMQ using FFI interface.") - if type(io_threads) == 'number' then - ctx = C.zmq_init(io_threads) - if not ctx then - return zmq_error() - end - else - should_free = false - -- should be lightuserdata or cdata - ctx = io_threads - end - -- use a wrapper newproxy for __gc support - local self=newproxy(true) - local meta=getmetatable(self) - meta.__index = meta - meta.ctx = ctx - meta.should_free = should_free - meta.__gc = ctx__gc - setmetatable(meta, ctx_mt) - return self -end - diff --git a/zmq.nobj.lua b/zmq.nobj.lua index 56865b7..575a52f 100644 --- a/zmq.nobj.lua +++ b/zmq.nobj.lua @@ -1,4 +1,5 @@ +-- make generated variable nicer. set_variable_format "%s" c_module "zmq" { @@ -10,9 +11,20 @@ luajit_ffi = true, sys_include "string.h", include "zmq.h", +ffi_load "zmq", + c_source[[ #define OBJ_UDATA_CTX_SHOULD_FREE (OBJ_UDATA_LAST_FLAG << 1) +]], +ffi_source[[ +local OBJ_UDATA_CTX_SHOULD_FREE = (OBJ_UDATA_LAST_FLAG * 2) +]], +c_source[[ +/* + * This wrapper function is to make the EAGAIN/ETERM error messages more like + * what is returned by LuaSocket. + */ static const char *get_zmq_strerror() { int err = zmq_errno(); switch(err) { @@ -30,6 +42,16 @@ static const char *get_zmq_strerror() { ]], +-- export helper function 'get_zmq_strerror' to FFI code. +ffi_export_function "const char *" "get_zmq_strerror" "()", +ffi_source[[ +local C_get_zmq_strerror = get_zmq_strerror +-- make nicer wrapper for exported error function. +local function get_zmq_strerror() + return ffi.string(C_get_zmq_strerror()) +end +]], + -- -- Module constants -- @@ -90,6 +112,14 @@ FORWARDER = 2, QUEUE = 3, }, + +subfiles { +"src/error.nobj.lua", +"src/msg.nobj.lua", +"src/socket.nobj.lua", +"src/ctx.nobj.lua", +}, + -- -- Module static functions -- @@ -107,41 +137,44 @@ c_function "version" { lua_rawseti(L, -2, 2); lua_pushinteger(L, patch); lua_rawseti(L, -2, 3); -]] +]], }, c_function "init" { - var_in{ "", "io_threads" }, + c_call "!ZMQ_Ctx" "zmq_init" { "int", "io_threads" }, +}, +c_function "init_ctx" { + var_in{ "", "ptr" }, var_out{ "ZMQ_Ctx", "!ctx" }, c_source[[ - if(lua_isnumber(L, ${io_threads::idx})) { - ${ctx} = zmq_init(lua_tointeger(L,${io_threads::idx})); - ${ctx}_flags |= OBJ_UDATA_CTX_SHOULD_FREE; - } else if(lua_isuserdata(L, ${io_threads::idx})) { - ${ctx} = lua_touserdata(L, ${io_threads::idx}); + if(lua_isuserdata(L, ${ptr::idx})) { + ${ctx} = lua_touserdata(L, ${ptr::idx}); } else { /* check if value is a LuaJIT 'cdata' */ - int type = lua_type(L, ${io_threads::idx}); + int type = lua_type(L, ${ptr::idx}); const char *typename = lua_typename(L, type); if(strncmp(typename, "cdata", sizeof("cdata")) == 0) { - ${ctx} = (void *)lua_topointer(L, ${io_threads::idx}); + ${ctx} = (void *)lua_topointer(L, ${ptr::idx}); } else { - return luaL_argerror(L, ${io_threads::idx}, "(expected number)"); + return luaL_argerror(L, ${ptr::idx}, "(expected userdata)"); } } -]] +]], }, c_function "device" { c_call "ZMQ_Error" "zmq_device" { "int", "device", "ZMQ_Socket", "insock", "ZMQ_Socket", "outsock" }, }, -ffi_files { -"src/zmq_ffi.nobj.lua", -}, -subfiles { -"src/error.nobj.lua", -"src/ctx.nobj.lua", -"src/socket.nobj.lua", +-- +-- This dump function is for getting a copy of the FFI-based bindings code and is +-- only for debugging. +-- +c_function "dump_ffi" { + var_out{ "const char *", "ffi_code", has_length = true, }, + c_source[[ + ${ffi_code} = ${module_c_name}_ffi_lua_code; + ${ffi_code_len} = sizeof(${module_c_name}_ffi_lua_code) - 1; +]], }, } diff --git a/zmq_ffi.lua b/zmq_ffi.lua deleted file mode 100644 index 329ae4b..0000000 --- a/zmq_ffi.lua +++ /dev/null @@ -1,347 +0,0 @@ ---[[ --- --- This is just an normal LuaJIT2 FFI based bindings for zmq. --- It is only for testing and comparison. --- ---]] - -local setmetatable = setmetatable -local print = print -local pairs = pairs -local error = error -local type = type -local assert = assert -local tostring = tostring -local tonumber = tonumber - -local zmq = { -MAX_VSM_SIZE = 30, - --- message types -DELIMITER = 31, -VSM = 32, - --- message flags -MSG_MORE = 1, -MSG_SHARED = 128, - --- socket types -PAIR = 0, -PUB = 1, -SUB = 2, -REQ = 3, -REP = 4, -XREQ = 5, -XREP = 6, -PULL = 7, -PUSH = 8, - --- socket options -HWM = 1, -SWAP = 3, -AFFINITY = 4, -IDENTITY = 5, -SUBSCRIBE = 6, -UNSUBSCRIBE = 7, -RATE = 8, -RECOVERY_IVL = 9, -MCAST_LOOP = 10, -SNDBUF = 11, -RCVBUF = 12, -RCVMORE = 13, -FD = 14, -EVENTS = 15, -TYPE = 16, -LINGER = 17, -RECONNECT_IVL = 18, -BACKLOG = 19, - --- send/recv flags -NOBLOCK = 1, -SNDMORE = 2, - --- poll events -POLLIN = 1, -POLLOUT = 2, -POLLERR = 4, - --- devices -STREAMER = 1, -FORWARDER = 2, -QUEUE = 3, -} - -local z_SUBSCRIBE = zmq.SUBSCRIBE -local z_UNSUBSCRIBE = zmq.UNSUBSCRIBE -local z_IDENTITY = zmq.IDENTITY -local z_NOBLOCK = zmq.NOBLOCK -local z_RCVMORE = zmq.RCVMORE -local z_SNDMORE = zmq.SNDMORE -local z_EVENTS = zmq.EVENTS -local z_POLLIN = zmq.POLLIN -local z_POLLOUT = zmq.POLLOUT -local z_POLLIN_OUT = z_POLLIN + z_POLLOUT - -local ffi=require"ffi" -ffi.cdef[[ -void zmq_version (int *major, int *minor, int *patch); -int zmq_errno (); -const char *zmq_strerror (int errnum); - -typedef struct zmq_msg_t -{ - void *content; - unsigned char flags; - unsigned char vsm_size; - unsigned char vsm_data [30]; -} zmq_msg_t; - -typedef void (zmq_free_fn) (void *data, void *hint); - -int zmq_msg_init (zmq_msg_t *msg); -int zmq_msg_init_size (zmq_msg_t *msg, size_t size); -int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint); -int zmq_msg_close (zmq_msg_t *msg); -int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); -int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); -void *zmq_msg_data (zmq_msg_t *msg); -size_t zmq_msg_size (zmq_msg_t *msg); - -void *zmq_init (int io_threads); -int zmq_term (void *context); - -void *zmq_socket (void *context, int type); -int zmq_close (void *s); -int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen); -int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen); -int zmq_bind (void *s, const char *addr); -int zmq_connect (void *s, const char *addr); -int zmq_send (void *s, zmq_msg_t *msg, int flags); -int zmq_recv (void *s, zmq_msg_t *msg, int flags); - -int zmq_device (int device, void * insocket, void* outsocket); - -]] - -local c_zmq = ffi.load"zmq" - -module(...) - --- copy constants -for k,v in pairs(zmq) do - -- only copy upper-case string values. - if type(k) == 'string' and k == k:upper() then - _M[k] = v - end -end - -function version() - local major = ffi.new('int[1]',0) - local minor = ffi.new('int[1]',0) - local patch = ffi.new('int[1]',0) - c_zmq.zmq_version(major, minor, patch) - return {major[0], minor[0], patch[0]} -end - -local function zmq_error() - local errno = c_zmq.zmq_errno() - local err = ffi.string(c_zmq.zmq_strerror(errno)) - if err == "Resource temporarily unavailable" then err = "timeout" end - if err == "Context was terminated" then err = "closed" end - return nil, err -end - --- --- ZMQ socket --- -local sock_mt = {} -sock_mt.__index = sock_mt - -local function new_socket(ctx, sock_type) - local sock = c_zmq.zmq_socket(ctx, sock_type) - if not sock then - return zmq_error() - end - return setmetatable({ sock = sock }, sock_mt) -end - -function sock_mt:close() - local ret = c_zmq.zmq_close(self.sock) - self.sock = nil - if ret ~= 0 then - return zmq_error() - end - return true -end - -local option_types = { -[zmq.HWM] = 'uint64_t[1]', -[zmq.SWAP] = 'int64_t[1]', -[zmq.AFFINITY] = 'uint64_t[1]', -[zmq.IDENTITY] = 'string', -[zmq.SUBSCRIBE] = 'string', -[zmq.UNSUBSCRIBE] = 'string', -[zmq.RATE] = 'int64_t[1]', -[zmq.RECOVERY_IVL] = 'int64_t[1]', -[zmq.MCAST_LOOP] = 'int64_t[1]', -[zmq.SNDBUF] = 'uint64_t[1]', -[zmq.RCVBUF] = 'uint64_t[1]', -[zmq.RCVMORE] = 'int64_t[1]', -[zmq.FD] = 'int[1]', -[zmq.EVENTS] = 'uint32_t[1]', -[zmq.TYPE] = 'int[1]', -[zmq.LINGER] = 'int[1]', -[zmq.RECONNECT_IVL] = 'int[1]', -[zmq.BACKLOG] = 'int[1]', -} -local option_len = {} -local option_tmps = {} -for k,v in pairs(option_types) do - if v ~= 'string' then - option_len[k] = ffi.sizeof(v) - option_tmps[k] = ffi.new(v, 0) - end -end -function sock_mt:setopt(opt, opt_val) - local ctype = option_types[opt] - local val_len = 0 - if ctype == 'string' then - --val = ffi.cast('void *', tostring(val)) - val = tostring(opt_val) - val_len = #val - else - val = option_tmps[opt] - val[0] = opt_val - val_len = option_len[opt] - end - local ret = c_zmq.zmq_setsockopt(self.sock, opt, val, val_len) - if ret ~= 0 then - return zmq_error() - end - return true -end - -local tmp_val_len = ffi.new('size_t[1]', 4) -function sock_mt:getopt(opt) - local ctype = option_types[opt] - local val - local val_len = tmp_val_len - if ctype == 'string' then - val_len[0] = 255 - val = ffi.new('uint8_t[?]', val_len[0]) - ffi.fill(val, val_len[0]) - else - val = option_tmps[opt] - val[0] = 0 - val_len[0] = option_len[opt] - end - local ret = c_zmq.zmq_getsockopt(self.sock, opt, val, val_len) - if ret ~= 0 then - return zmq_error() - end - if ctype == 'string' then - val_len = val_len[0] - return ffi.string(val, val_len) - else - val = val[0] - end - return tonumber(val) -end - -local tmp32 = ffi.new('uint32_t[1]', 0) -local tmp32_len = ffi.new('size_t[1]', 4) -function sock_mt:events() - local val = tmp32 - local val_len = tmp32_len - val[0] = 0 - val_len[0] = 4 - local ret = c_zmq.zmq_getsockopt(self.sock, 15, val, val_len) - if ret ~= 0 then - return zmq_error() - end - return val[0] -end - -function sock_mt:bind(addr) - local ret = c_zmq.zmq_bind(self.sock, addr) - if ret ~= 0 then - return zmq_error() - end - return true -end - -function sock_mt:connect(addr) - local ret = c_zmq.zmq_connect(self.sock, addr) - if ret ~= 0 then - return zmq_error() - end - return true -end - -local tmp_msg = ffi.new('zmq_msg_t') -function sock_mt:send(data, flags) - local msg = tmp_msg - local msg_len = #data - -- initialize message - if c_zmq.zmq_msg_init_size(msg, msg_len) < 0 then - return zmq_error() - end - -- copy data into message. - ffi.copy(c_zmq.zmq_msg_data(msg), data, msg_len) - - -- send message - local ret = c_zmq.zmq_send(self.sock, msg, flags or 0) - -- close message before processing return code - c_zmq.zmq_msg_close(msg) - -- now process send return code - if ret ~= 0 then - return zmq_error() - end - return true -end - -function sock_mt:recv(flags) - local msg = tmp_msg - -- initialize blank message. - if c_zmq.zmq_msg_init(msg) < 0 then - return zmq_error() - end - - -- receive message - local ret = c_zmq.zmq_recv(self.sock, msg, flags or 0) - if ret ~= 0 then - local data, err = zmq_error() - c_zmq.zmq_msg_close(msg) - return data, err - end - local data = ffi.string(c_zmq.zmq_msg_data(msg), c_zmq.zmq_msg_size(msg)) - -- close message - c_zmq.zmq_msg_close(msg) - return data -end - --- --- ZMQ context --- -local ctx_mt = {} -ctx_mt.__index = ctx_mt - -function ctx_mt:term() - if c_zmq.zmq_term(self.ctx) ~= 0 then - return zmq_error() - end - return true -end - -function ctx_mt:socket(sock_type) - return new_socket(self.ctx, sock_type) -end - -function init(io_threads) - local ctx = c_zmq.zmq_init(io_threads) - if not ctx then - return zmq_error() - end - return setmetatable({ ctx = ctx }, ctx_mt) -end -