Fixed issue with GC of 0MQ context object. poller:poll() method now returns number of events handled.

pull/10/head
Robert G. Jakabosky 15 years ago
parent 0833549572
commit b66901fe3c

@ -27,14 +27,7 @@ typedef void * ZMQ_Ctx;
c_source [[ c_source [[
typedef void * ZMQ_Ctx; typedef void * ZMQ_Ctx;
]], ]],
destructor { destructor "term" {
c_source[[
if(${this_flags} & OBJ_UDATA_CTX_SHOULD_FREE) {
zmq_term(${this});
}
]]
},
method "term" {
c_method_call "ZMQ_Error" "zmq_term" {} c_method_call "ZMQ_Error" "zmq_term" {}
}, },
method "lightuserdata" { method "lightuserdata" {

@ -56,9 +56,10 @@ function poller_mt:poll(timeout)
local poller = self.poller local poller = self.poller
local status, err = poller:poll(timeout) local status, err = poller:poll(timeout)
if not status then if not status then
return false, err return nil, err
end end
local callbacks = self.callbacks local callbacks = self.callbacks
local count = 0
while true do while true do
local sock, revents = poller:next_revents() local sock, revents = poller:next_revents()
if not sock then if not sock then
@ -69,15 +70,20 @@ function poller_mt:poll(timeout)
error("Missing callback for sock:" .. tostring(sock)) error("Missing callback for sock:" .. tostring(sock))
end end
cb(sock, revents) cb(sock, revents)
count = count + 1
end end
return true return count
end end
function poller_mt:start() function poller_mt:start()
self.is_running = true self.is_running = true
while self.is_running do while self.is_running do
self:poll(-1) local status, err = self:poll(-1)
if not status then
return false, err
end
end end
return true
end end
function poller_mt:stop() function poller_mt:stop()

@ -1045,8 +1045,6 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n"
"}\n" "}\n"
"local C = ffi.load(os_lib_table[ffi.os] or \"zmq\")\n" "local C = ffi.load(os_lib_table[ffi.os] or \"zmq\")\n"
"\n" "\n"
"local OBJ_UDATA_CTX_SHOULD_FREE = (OBJ_UDATA_LAST_FLAG * 2)\n"
"\n"
"local get_zmq_strerror = ffi.new(\"get_zmq_strerror_func\", _priv[\"get_zmq_strerror\"])\n" "local get_zmq_strerror = ffi.new(\"get_zmq_strerror_func\", _priv[\"get_zmq_strerror\"])\n"
"\n" "\n"
"local C_get_zmq_strerror = get_zmq_strerror\n" "local C_get_zmq_strerror = get_zmq_strerror\n"
@ -1574,7 +1572,8 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n"
"-- Start \"ZMQ_Ctx\" FFI interface\n" "-- Start \"ZMQ_Ctx\" FFI interface\n"
"-- method: term\n" "-- method: term\n"
"function ZMQ_Ctx_meth.term(self)\n" "function ZMQ_Ctx_meth.term(self)\n"
" local this = obj_type_ZMQ_Ctx_check(self)\n" " local this,this_flags = obj_type_ZMQ_Ctx_delete(self)\n"
" if(band(this_flags,OBJ_UDATA_FLAG_OWN) == 0) then return end\n"
" local rc_zmq_term\n" " local rc_zmq_term\n"
" rc_zmq_term = C.zmq_term(this)\n" " rc_zmq_term = C.zmq_term(this)\n"
" -- check for error.\n" " -- check for error.\n"
@ -1623,7 +1622,6 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n"
"\n" "\n"
"-- method: init_ctx\n" "-- method: init_ctx\n"
"function zmq_pub.init_ctx(ptr)\n" "function zmq_pub.init_ctx(ptr)\n"
" local ctx_flags = OBJ_UDATA_FLAG_OWN\n"
" local ctx\n" " local ctx\n"
" local p_type = type(ptr)\n" " local p_type = type(ptr)\n"
" if p_type == 'userdata' then\n" " if p_type == 'userdata' then\n"
@ -1638,7 +1636,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n"
" if (nil == ctx) then\n" " if (nil == ctx) then\n"
" ctx_err = get_zmq_strerror()\n" " ctx_err = get_zmq_strerror()\n"
" else\n" " else\n"
" ctx = obj_type_ZMQ_Ctx_push(ctx, ctx_flags)\n" " ctx = obj_type_ZMQ_Ctx_push(ctx, 0)\n"
" end\n" " end\n"
" return ctx, ctx_err\n" " return ctx, ctx_err\n"
"end\n" "end\n"
@ -1914,8 +1912,6 @@ static int poller_poll(ZMQ_Poller *this, long timeout) {
typedef void * ZMQ_Ctx; typedef void * ZMQ_Ctx;
#define OBJ_UDATA_CTX_SHOULD_FREE (OBJ_UDATA_LAST_FLAG << 1)
/* /*
* This wrapper function is to make the EAGAIN/ETERM error messages more like * This wrapper function is to make the EAGAIN/ETERM error messages more like
* what is returned by LuaSocket. * what is returned by LuaSocket.
@ -2678,22 +2674,12 @@ static int ZMQ_Poller__count__meth(lua_State *L) {
return 1; return 1;
} }
/* method: delete */
static int ZMQ_Ctx__delete__meth(lua_State *L) {
int this_flags = 0;
ZMQ_Ctx * this = obj_type_ZMQ_Ctx_delete(L,1,&(this_flags));
if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; }
if(this_flags & OBJ_UDATA_CTX_SHOULD_FREE) {
zmq_term(this);
}
return 0;
}
/* method: term */ /* method: term */
static int ZMQ_Ctx__term__meth(lua_State *L) { static int ZMQ_Ctx__term__meth(lua_State *L) {
ZMQ_Ctx * this = obj_type_ZMQ_Ctx_check(L,1); int this_flags = 0;
ZMQ_Ctx * this = obj_type_ZMQ_Ctx_delete(L,1,&(this_flags));
ZMQ_Error rc_zmq_term = 0; ZMQ_Error rc_zmq_term = 0;
if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; }
rc_zmq_term = zmq_term(this); rc_zmq_term = zmq_term(this);
/* check for error. */ /* check for error. */
if((-1 == rc_zmq_term)) { if((-1 == rc_zmq_term)) {
@ -2766,7 +2752,6 @@ static int zmq__init__func(lua_State *L) {
/* method: init_ctx */ /* method: init_ctx */
static int zmq__init_ctx__func(lua_State *L) { static int zmq__init_ctx__func(lua_State *L) {
int ctx_flags = OBJ_UDATA_FLAG_OWN;
ZMQ_Ctx ctx; ZMQ_Ctx ctx;
if(lua_isuserdata(L, 1)) { if(lua_isuserdata(L, 1)) {
ctx = lua_touserdata(L, 1); ctx = lua_touserdata(L, 1);
@ -2778,7 +2763,7 @@ static int zmq__init_ctx__func(lua_State *L) {
lua_pushnil(L); lua_pushnil(L);
lua_pushstring(L, get_zmq_strerror()); lua_pushstring(L, get_zmq_strerror());
} else { } else {
obj_type_ZMQ_Ctx_push(L, ctx, ctx_flags); obj_type_ZMQ_Ctx_push(L, ctx, 0);
} }
return 1; return 1;
} }
@ -2935,7 +2920,7 @@ static const luaL_reg obj_ZMQ_Ctx_methods[] = {
}; };
static const luaL_reg obj_ZMQ_Ctx_metas[] = { static const luaL_reg obj_ZMQ_Ctx_metas[] = {
{"__gc", ZMQ_Ctx__delete__meth}, {"__gc", ZMQ_Ctx__term__meth},
{"__tostring", obj_udata_default_tostring}, {"__tostring", obj_udata_default_tostring},
{"__eq", obj_udata_default_equal}, {"__eq", obj_udata_default_equal},
{NULL, NULL} {NULL, NULL}

@ -35,13 +35,6 @@ ffi_load {
Windows = "libzmq", -- lib name for on windows. Windows = "libzmq", -- lib name for on windows.
}, },
c_source[[
#define OBJ_UDATA_CTX_SHOULD_FREE (OBJ_UDATA_LAST_FLAG << 1)
]],
ffi_source[[
local OBJ_UDATA_CTX_SHOULD_FREE = (OBJ_UDATA_LAST_FLAG * 2)
]],
c_source[[ c_source[[
/* /*
* This wrapper function is to make the EAGAIN/ETERM error messages more like * This wrapper function is to make the EAGAIN/ETERM error messages more like
@ -167,7 +160,7 @@ c_function "init" {
}, },
c_function "init_ctx" { c_function "init_ctx" {
var_in{ "<any>", "ptr" }, var_in{ "<any>", "ptr" },
var_out{ "ZMQ_Ctx", "!ctx" }, var_out{ "ZMQ_Ctx", "ctx" },
c_source[[ c_source[[
if(lua_isuserdata(L, ${ptr::idx})) { if(lua_isuserdata(L, ${ptr::idx})) {
${ctx} = lua_touserdata(L, ${ptr::idx}); ${ctx} = lua_touserdata(L, ${ptr::idx});

Loading…
Cancel
Save