From bd303988613929f6faeeac3cebd29ed2bfa47455 Mon Sep 17 00:00:00 2001 From: "Robert G. Jakabosky" Date: Wed, 23 Mar 2011 05:27:36 -0700 Subject: [PATCH] Added zmq.poller object that wraps zmq_poll() function. --- rockspecs/lua-zmq-scm-1.rockspec | 29 +- src/error.nobj.lua | 8 +- src/poller.lua | 76 ++++ src/poller.nobj.lua | 386 ++++++++++++++++++++ src/pre_generated-zmq.nobj.c | 594 ++++++++++++++++++++++++++++--- zmq.nobj.lua | 1 + 6 files changed, 1037 insertions(+), 57 deletions(-) create mode 100644 src/poller.lua create mode 100644 src/poller.nobj.lua diff --git a/rockspecs/lua-zmq-scm-1.rockspec b/rockspecs/lua-zmq-scm-1.rockspec index 91a2a5c..4e77ea3 100644 --- a/rockspecs/lua-zmq-scm-1.rockspec +++ b/rockspecs/lua-zmq-scm-1.rockspec @@ -1,22 +1,27 @@ package = "lua-zmq" version = "scm-1" source = { - url = "git://github.com/Neopallium/lua-zmq.git" + url = "git://github.com/Neopallium/lua-zmq.git" } description = { - summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.", - homepage = "http://github.com/Neopallium/lua-zmq", - license = "MIT/X11" + summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.", + homepage = "http://github.com/Neopallium/lua-zmq", + license = "MIT/X11" } dependencies = { - "lua >= 5.1" + "lua >= 5.1" } build = { - type = "builtin", - modules = { - zmq = { - sources = {"src/pre_generated-zmq.nobj.c"}, - libraries = {"zmq"} - } - } + type = "builtin", + modules = { + zmq = { + sources = {"src/pre_generated-zmq.nobj.c"}, + libraries = {"zmq"} + } + } + install = { + lua = { + ['zmq.poller'] = "src/poller.lua", + } + } } diff --git a/src/error.nobj.lua b/src/error.nobj.lua index 4f143b5..6c8da6a 100644 --- a/src/error.nobj.lua +++ b/src/error.nobj.lua @@ -27,16 +27,16 @@ error_code "ZMQ_Error" "int" { ffi_cdef[[ typedef int ZMQ_Error; ]], - is_error_check = function(rec) return "(0 != ${" .. rec.name .. "})" end, - ffi_is_error_check = function(rec) return "(0 ~= ${" .. rec.name .. "})" end, + is_error_check = function(rec) return "(-1 == ${" .. rec.name .. "})" end, + ffi_is_error_check = function(rec) return "(-1 == ${" .. rec.name .. "})" end, default = "0", c_source [[ - if(err != 0) { + if(-1 == err) { err_str = get_zmq_strerror(); } ]], ffi_source [[ - if(0 ~= err) then + if(-1 == err) then err_str = get_zmq_strerror(); end ]], diff --git a/src/poller.lua b/src/poller.lua new file mode 100644 index 0000000..4e83b85 --- /dev/null +++ b/src/poller.lua @@ -0,0 +1,76 @@ +-- +-- zmq.poller wraps the low-level zmq.zmq_poller object. +-- +-- This wrapper simplifies the event polling loop. +-- + +local zmq = require"zmq" + +local setmetatable = setmetatable +local tonumber = tonumber +local assert = assert + +local poller_mt = {} +poller_mt.__index = poller_mt + +function poller_mt:add(sock, events, cb) + self.poller:add(sock, events) + self.callbacks[sock] = cb +end + +function poller_mt:modify(sock, events, cb) + if events ~= 0 and cb then + self.callbacks[sock] = cb + self.poller:modify(sock, events) + else + self:remove(sock) + end +end + +function poller_mt:remove(sock) + self.poller:remove(sock) + self.callbacks[sock] = nil +end + +function poller_mt:poll(timeout) + local poller = self.poller + local status, err = poller:poll(-1) + if not status then + return false, err + end + local callbacks = self.callbacks + while true do + local sock, revents = poller:next_revents() + if not sock then + break + end + local cb = callbacks[sock] + if cb then + cb(sock, revents) + end + end + return true +end + +function poller_mt:start() + self.is_running = true + while self.is_running do + self:poll(-1) + end +end + +function poller_mt:stop() + self.is_running = false +end + +module(...) + +function new(pre_alloc) + return setmetatable({ + poller = zmq.zmq_poller(pre_alloc), + callbacks = setmetatable({}, {__mode="k"}), + }, poller_mt) +end + +setmetatable(_M, {__call = function(tab, ...) return new(...) end}) + diff --git a/src/poller.nobj.lua b/src/poller.nobj.lua new file mode 100644 index 0000000..0555a0f --- /dev/null +++ b/src/poller.nobj.lua @@ -0,0 +1,386 @@ +-- Copyright (c) 2010 by Robert G. Jakabosky +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +local zmq_poller_type = [[ +typedef struct zmq_poller { + zmq_pollitem_t *items; + int next; + int count; + int free_list; + int len; +} zmq_poller; +]] + +object "zmq_poller" { + -- store the `zmq_poller` structure in Lua userdata object + userdata_type = "embed", + c_source(zmq_poller_type), + c_source[[ +#define FREE_ITEM_EVENTS_TAG 0xFFFF + +#define ITEM_TO_INDEX(items, item) (item - (items)) + +static int poller_resize_items(zmq_poller *this, int len) { + int old_len = this->len; + + /* make sure new length is atleast as large as items count. */ + len = (this->count <= len) ? len : this->count; + + /* if the new length is the same as the old length, then don't try to resize. */ + if(old_len == len) return len; + + this->items = (zmq_pollitem_t *)realloc(this->items, len * sizeof(zmq_pollitem_t)); + this->len = len; + if(len > old_len) { + /* clear new space. */ + memset(&(this->items[old_len]), 0, (old_len - len) * sizeof(zmq_pollitem_t)); + } + return len; +} + +static int poller_find_sock_item(zmq_poller *this, ZMQ_Socket *sock) { + zmq_pollitem_t *items; + int count; + int n; + + /* find ZMQ_Socket */ + items = this->items; + count = this->count; + for(n=0; n < count; n++) { + if(items[n].socket == sock) return n; + } + /* not found. */ + return -1; +} + +static int poller_find_fd_item(zmq_poller *this, socket_t fd) { + zmq_pollitem_t *items; + int count; + int n; + + /* find fd */ + items = this->items; + count = this->count; + for(n=0; n < count; n++) { + if(items[n].fd == fd) return n; + } + /* not found. */ + return -1; +} + +static void poller_remove_item(zmq_poller *this, int idx) { + zmq_pollitem_t *items; + int free_list; + int count; + + count = this->count; + /* no item to remove. */ + if(idx >= count || count == 0) return; + + items = this->items; + free_list = this->free_list; + + /* link new free slot to head of free list. */ + if(free_list >= 0 && free_list < count) { + /* use socket pointer for free list's 'next' field. */ + items[idx].socket = &(items[free_list]); + } else { + /* free list is empty mark this slot as the end. */ + items[idx].socket = NULL; + } + this->free_list = idx; + /* mark this slot as a free slot. */ + items[idx].events = FREE_ITEM_EVENTS_TAG; +} + +static int poller_get_free_item(zmq_poller *this) { + zmq_pollitem_t *curr; + zmq_pollitem_t *next; + int count; + int idx; + + count = this->count; + idx = this->free_list; + /* check for a free slot in the free list. */ + if(idx >= 0 && idx < count) { + /* remove free slot from free list. */ + curr = &(this->items[idx]); + /* valid free slot. */ + assert(curr->events == FREE_ITEM_EVENTS_TAG); + /* is this the last free slot? */ + next = ((zmq_pollitem_t *)curr->socket); + if(next != NULL) { + /* set next free slot as head of free list. */ + this->free_list = ITEM_TO_INDEX(this->items, next); + } else { + /* free list is empty now. */ + this->free_list = -1; + } + /* clear slot */ + memset(curr, 0, sizeof(zmq_pollitem_t)); + return idx; + } + + idx = count; + this->count = ++count; + /* make room for new item. */ + if(count >= this->len) { + poller_resize_items(this, this->len + 10); + } + return idx; +} + +static int poller_compact_items(zmq_poller *this) { + zmq_pollitem_t *items; + int count; + int old_count; + int next; + + count = this->count; + /* if no free slot, then return. */ + if(this->free_list < 0) return count; + old_count = count; + + items = this->items; + next = 0; + /* find first free slot. */ + while(items[next].events != FREE_ITEM_EVENTS_TAG) { + assert(next <= old_count); + ++next; + } + + /* move non-free slots into free slot. */ + count = next; + ++next; + while(next <= old_count) { + if(items[next].events != FREE_ITEM_EVENTS_TAG) { + /* found non-free slot, move it to the current free slot. */ + items[count] = items[next]; + ++count; + } + ++next; + } + + /* clear old used-space */ + memset(&(items[count]), 0, ((old_count - count) * sizeof(zmq_pollitem_t))); + this->count = count; + this->free_list = -1; /* free list is now empty. */ + + return count; +} + +static int poller_poll(zmq_poller *this, long timeout) { + int count; + /* remove free slots from items list. */ + count = poller_compact_items(this); + /* poll for events. */ + return zmq_poll(this->items, count, timeout); +} + +]], + ffi_export_function "int" "poller_find_sock_item" "(zmq_poller *this, ZMQ_Socket *sock)", + ffi_export_function "int" "poller_find_fd_item" "(zmq_poller *this, socket_t fd)", + ffi_export_function "int" "poller_get_free_item" "(zmq_poller *this)", + ffi_export_function "int" "poller_poll" "(zmq_poller *this, long timeout)", + ffi_export_function "void" "poller_remove_item" "(zmq_poller *this, int idx)", +-- +-- Define zmq_poller type & function API for FFI +-- + ffi_cdef[[ +typedef struct zmq_pollitem_t { + ZMQ_Socket socket; + socket_t fd; + short events; + short revents; +} zmq_pollitem_t; + +int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout); +]], + ffi_cdef(zmq_poller_type), + + constructor "new" { + var_in{ "unsigned int", "length", is_optional = true, default = 10 }, + c_source[[ + zmq_poller poller; + ${this} = &poller; + ${this}->items = (zmq_pollitem_t *)calloc(${length}, sizeof(zmq_pollitem_t)); + ${this}->next = -1; + ${this}->count = 0; + ${this}->len = ${length}; + ${this}->free_list = -1; +]], + }, + destructor "close" { + c_source[[ + free(${this}->items); + ${this}->items = NULL; + ${this}->next = -1; + ${this}->count = 0; + ${this}->len = 0; + ${this}->free_list = -1; +]], + }, + method "add" { + var_in{ "", "sock" }, + var_in{ "short", "events" }, + var_out{ "int", "idx" }, + c_source "pre" [[ + zmq_pollitem_t *item; + ZMQ_Socket *sock = NULL; + socket_t fd = 0; +]], + c_source[[ + if(lua_isuserdata(L, ${sock::idx})) { + sock = obj_type_ZMQ_Socket_check(L, ${sock::idx}); + } else if(lua_isnumber(L, ${sock::idx})) { + fd = lua_tonumber(L, ${sock::idx}); + } else { + return luaL_typerror(L, ${sock::idx}, "number or ZMQ_Socket"); + } + ${idx} = poller_get_free_item(${this}); + item = &(${this}->items[${idx}]); + item->socket = sock; + item->fd = fd; + item->events = ${events}; +]], + }, + method "modify" { + var_in{ "", "sock" }, + var_in{ "short", "events" }, + var_out{ "int", "idx" }, + c_source "pre" [[ + zmq_pollitem_t *item; + ZMQ_Socket *sock = NULL; + socket_t fd = 0; +]], + c_source[[ + if(lua_isuserdata(L, ${sock::idx})) { + sock = obj_type_ZMQ_Socket_check(L, ${sock::idx}); + /* find sock in items list. */ + ${idx} = poller_find_sock_item(${this}, sock); + } else if(lua_isnumber(L, ${sock::idx})) { + fd = lua_tonumber(L, ${sock::idx}); + /* find fd in items list. */ + ${idx} = poller_find_fd_item(${this}, fd); + } else { + return luaL_typerror(L, ${sock::idx}, "number or ZMQ_Socket"); + } + if(${events} != 0) { + /* add/modify. */ + if(${idx} < 0) { + ${idx} = poller_get_free_item(${this}); + } + item = &(${this}->items[${idx}]); + item->socket = sock; + item->fd = fd; + item->events = ${events}; + } else if(${idx} >= 0) { + /* no events remove socket/fd. */ + poller_remove_item(${this}, ${idx}); + } +]], + }, + method "remove" { + var_in{ "", "sock" }, + c_source "pre" [[ + ZMQ_Socket *sock; + socket_t fd; + int idx; +]], + c_source[[ + /* ZMQ_Socket or fd */ + if(lua_isuserdata(L, ${sock::idx})) { + sock = obj_type_ZMQ_Socket_check(L, ${sock::idx}); + /* find sock in items list. */ + idx = poller_find_sock_item(${this}, sock); + } else if(lua_isnumber(L, ${sock::idx})) { + fd = lua_tonumber(L, ${sock::idx}); + /* find fd in items list. */ + idx = poller_find_fd_item(${this}, fd); + } else { + return luaL_typerror(L, ${sock::idx}, "number or ZMQ_Socket"); + } + /* if sock/fd was found. */ + if(idx >= 0) { + poller_remove_item(${this}, idx); + } +]], + }, + method "poll" { + var_in{ "long", "timeout" }, + var_out{ "ZMQ_Error", "err" }, + c_source[[ + /* poll for events */ + ${err} = poller_poll(${this}, ${timeout}); + if(${err} > 0) { + ${this}->next = 0; + } else { + ${this}->next = -1; + } +]], + }, + method "next_revents" { + var_out{ "", "sock" }, + var_out{ "short", "revents" }, + c_source "pre" [[ + zmq_pollitem_t *items; + int count; + int idx; +]], + c_source[[ + ${revents} = -1; + idx = ${this}->next; + if(idx >= 0) { + count = ${this}->count; + items = ${this}->items; + /* find next item with pending events. */ + while(idx < count && items[idx].revents == 0) ++idx; + /* did we find a pending event? */ + if(idx < count) { + /* push the event's sock/fd. */ + if(items[idx].socket != NULL) { + obj_type_ZMQ_Socket_push(L, items[idx].socket, 0); + } else { + lua_pushnumber(L, items[idx].fd); + } + ${revents} = items[idx].revents; + /* is this the last event. */ + ++idx; + ${this}->next = (idx < count) ? idx : -1; + } + } + if(${revents} < 0) { + /* no more pending events. */ + lua_pushnil(L); + ${this}->next = -1; + } +]], + }, + method "count" { + var_out{ "int", "count" }, + c_source[[ + ${count} = ${this}->count; +]], + ffi_source[[ + ${count} = ${this}.count; +]], + }, +} + diff --git a/src/pre_generated-zmq.nobj.c b/src/pre_generated-zmq.nobj.c index 9260e98..a63735b 100644 --- a/src/pre_generated-zmq.nobj.c +++ b/src/pre_generated-zmq.nobj.c @@ -169,7 +169,15 @@ typedef struct ffi_export_symbol { #define obj_type_ZMQ_Socket_push(L, obj, flags) \ obj_udata_luapush_weak(L, (void *)obj, &(obj_type_ZMQ_Socket), flags) -#define obj_type_id_ZMQ_Ctx 2 +#define obj_type_id_zmq_poller 2 +#define obj_type_zmq_poller_check(L, _index) \ + (zmq_poller *)obj_simple_udata_luacheck(L, _index, &(obj_type_zmq_poller)) +#define obj_type_zmq_poller_delete(L, _index, flags) \ + (zmq_poller *)obj_simple_udata_luadelete(L, _index, &(obj_type_zmq_poller), flags) +#define obj_type_zmq_poller_push(L, obj, flags) \ + obj_simple_udata_luapush(L, obj, sizeof(zmq_poller), &(obj_type_zmq_poller)) + +#define obj_type_id_ZMQ_Ctx 3 #define obj_type_ZMQ_Ctx_check(L, _index) \ obj_udata_luacheck(L, _index, &(obj_type_ZMQ_Ctx)) #define obj_type_ZMQ_Ctx_delete(L, _index, flags) \ @@ -186,7 +194,8 @@ static void error_code__ZMQ_Error__push(lua_State *L, ZMQ_Error err); static obj_type obj_type_zmq_msg_t = { NULL, 0, OBJ_TYPE_SIMPLE, "zmq_msg_t" }; static obj_type obj_type_ZMQ_Socket = { NULL, 1, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Socket" }; -static obj_type obj_type_ZMQ_Ctx = { NULL, 2, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Ctx" }; +static obj_type obj_type_zmq_poller = { NULL, 2, OBJ_TYPE_SIMPLE, "zmq_poller" }; +static obj_type obj_type_ZMQ_Ctx = { NULL, 3, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Ctx" }; #ifndef REG_PACKAGE_IS_CONSTRUCTOR @@ -866,6 +875,33 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "\n" "ZMQ_Error zmq_recv(ZMQ_Socket * this, zmq_msg_t * msg, int flags);\n" "\n" +"typedef int (*poller_find_sock_item_func)(zmq_poller *this, ZMQ_Socket *sock);\n" +"\n" +"typedef int (*poller_find_fd_item_func)(zmq_poller *this, socket_t fd);\n" +"\n" +"typedef int (*poller_get_free_item_func)(zmq_poller *this);\n" +"\n" +"typedef int (*poller_poll_func)(zmq_poller *this, long timeout);\n" +"\n" +"typedef void (*poller_remove_item_func)(zmq_poller *this, int idx);\n" +"\n" +"typedef struct zmq_pollitem_t {\n" +" ZMQ_Socket socket;\n" +" socket_t fd;\n" +" short events;\n" +" short revents;\n" +"} zmq_pollitem_t;\n" +"\n" +"int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout);\n" +"\n" +"typedef struct zmq_poller {\n" +" zmq_pollitem_t *items;\n" +" int next;\n" +" int count;\n" +" int free_list;\n" +" int len;\n" +"} zmq_poller;\n" +"\n" "typedef void * ZMQ_Ctx;\n" "\n" "ZMQ_Error zmq_term(ZMQ_Ctx * this);\n" @@ -938,6 +974,36 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "end\n" "\n" "\n" +"local zmq_poller_pub = _M[\"zmq_poller\"]\n" +"local zmq_poller_mt = _priv[\"zmq_poller\"]\n" +"local zmq_poller_type = obj_type_ptr(zmq_poller_mt[\".type\"])\n" +"local zmq_poller_meth = zmq_poller_mt.__index\n" +"local zmq_poller_objects = setmetatable({}, { __mode = \"k\" })\n" +"\n" +"local function obj_type_zmq_poller_check(ud_obj)\n" +" local c_obj = zmq_poller_objects[ud_obj]\n" +" if c_obj == nil then\n" +" -- cdata object not in cache\n" +" c_obj = obj_simple_udata_luacheck(ud_obj, zmq_poller_mt)\n" +" c_obj = ffi.cast(\"zmq_poller *\", c_obj) -- cast from 'void *'\n" +" zmq_poller_objects[ud_obj] = c_obj\n" +" end\n" +" return c_obj\n" +"end\n" +"\n" +"local function obj_type_zmq_poller_delete(ud_obj)\n" +" zmq_poller_objects[ud_obj] = nil\n" +" return obj_simple_udata_luadelete(ud_obj, zmq_poller_mt)\n" +"end\n" +"\n" +"local zmq_poller_sizeof = ffi.sizeof\"zmq_poller\"\n" +"local function obj_type_zmq_poller_push(c_obj)\n" +" local ud_obj, cdata = obj_simple_udata_luapush(c_obj, zmq_poller_sizeof, zmq_poller_mt)\n" +" zmq_poller_objects[ud_obj] = cdata\n" +" return ud_obj\n" +"end\n" +"\n" +"\n" "local ZMQ_Ctx_pub = _M[\"ZMQ_Ctx\"]\n" "local ZMQ_Ctx_mt = _priv[\"ZMQ_Ctx\"]\n" "local ZMQ_Ctx_type = obj_type_ptr(ZMQ_Ctx_mt[\".type\"])\n" @@ -990,7 +1056,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "\n" "local function error_code__ZMQ_Error__push(err)\n" " local err_str\n" -" if(0 ~= err) then\n" +" if(-1 == err) then\n" " err_str = get_zmq_strerror();\n" " end\n" "\n" @@ -1007,7 +1073,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_msg_close = C.zmq_msg_close(this)\n" " -- check for error.\n" " local rc_zmq_msg_close_err\n" -" if (0 ~= rc_zmq_msg_close) then\n" +" if (-1 == rc_zmq_msg_close) then\n" " rc_zmq_msg_close = false\n" " rc_zmq_msg_close_err = error_code__ZMQ_Error__push(rc_zmq_msg_close)\n" " else\n" @@ -1023,7 +1089,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_msg_close = C.zmq_msg_close(this)\n" " -- check for error.\n" " local rc_zmq_msg_close_err\n" -" if (0 ~= rc_zmq_msg_close) then\n" +" if (-1 == rc_zmq_msg_close) then\n" " rc_zmq_msg_close = false\n" " rc_zmq_msg_close_err = error_code__ZMQ_Error__push(rc_zmq_msg_close)\n" " else\n" @@ -1040,7 +1106,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_msg_move = C.zmq_msg_move(this, src)\n" " -- check for error.\n" " local rc_zmq_msg_move_err\n" -" if (0 ~= rc_zmq_msg_move) then\n" +" if (-1 == rc_zmq_msg_move) then\n" " rc_zmq_msg_move = false\n" " rc_zmq_msg_move_err = error_code__ZMQ_Error__push(rc_zmq_msg_move)\n" " else\n" @@ -1057,7 +1123,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_msg_copy = C.zmq_msg_copy(this, src)\n" " -- check for error.\n" " local rc_zmq_msg_copy_err\n" -" if (0 ~= rc_zmq_msg_copy) then\n" +" if (-1 == rc_zmq_msg_copy) then\n" " rc_zmq_msg_copy = false\n" " rc_zmq_msg_copy_err = error_code__ZMQ_Error__push(rc_zmq_msg_copy)\n" " else\n" @@ -1085,7 +1151,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "\n" " -- check for error.\n" " local err_err\n" -" if (0 ~= err) then\n" +" if (-1 == err) then\n" " err = false\n" " err_err = error_code__ZMQ_Error__push(err)\n" " else\n" @@ -1120,7 +1186,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "\n" " -- check for error.\n" " local err_err\n" -" if (0 ~= err) then\n" +" if (-1 == err) then\n" " err = false\n" " err_err = error_code__ZMQ_Error__push(err)\n" " else\n" @@ -1162,7 +1228,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_close = C.zmq_close(this)\n" " -- check for error.\n" " local rc_zmq_close_err\n" -" if (0 ~= rc_zmq_close) then\n" +" if (-1 == rc_zmq_close) then\n" " rc_zmq_close = false\n" " rc_zmq_close_err = error_code__ZMQ_Error__push(rc_zmq_close)\n" " else\n" @@ -1179,7 +1245,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_bind = C.zmq_bind(this, addr)\n" " -- check for error.\n" " local rc_zmq_bind_err\n" -" if (0 ~= rc_zmq_bind) then\n" +" if (-1 == rc_zmq_bind) then\n" " rc_zmq_bind = false\n" " rc_zmq_bind_err = error_code__ZMQ_Error__push(rc_zmq_bind)\n" " else\n" @@ -1196,7 +1262,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_connect = C.zmq_connect(this, addr)\n" " -- check for error.\n" " local rc_zmq_connect_err\n" -" if (0 ~= rc_zmq_connect) then\n" +" if (-1 == rc_zmq_connect) then\n" " rc_zmq_connect = false\n" " rc_zmq_connect_err = error_code__ZMQ_Error__push(rc_zmq_connect)\n" " else\n" @@ -1255,7 +1321,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "\n" " -- check for error.\n" " local err_err\n" -" if (0 ~= err) then\n" +" if (-1 == err) then\n" " err = false\n" " err_err = error_code__ZMQ_Error__push(err)\n" " else\n" @@ -1312,7 +1378,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " err = C.zmq_getsockopt(this, ZMQ_EVENTS, events_tmp, events_tmp_len);\n" " events = events_tmp[0]\n" "\n" -" if not (0 ~= err) then\n" +" if not (-1 == err) then\n" " events = events\n" " else\n" " events = nil\n" @@ -1330,7 +1396,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_send = C.zmq_send(this, msg, flags)\n" " -- check for error.\n" " local rc_zmq_send_err\n" -" if (0 ~= rc_zmq_send) then\n" +" if (-1 == rc_zmq_send) then\n" " rc_zmq_send = false\n" " rc_zmq_send_err = error_code__ZMQ_Error__push(rc_zmq_send)\n" " else\n" @@ -1351,7 +1417,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "\n" " -- check for error.\n" " local err_err\n" -" if (0 ~= err) then\n" +" if (-1 == err) then\n" " err = false\n" " err_err = error_code__ZMQ_Error__push(err)\n" " else\n" @@ -1369,7 +1435,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_recv = C.zmq_recv(this, msg, flags)\n" " -- check for error.\n" " local rc_zmq_recv_err\n" -" if (0 ~= rc_zmq_recv) then\n" +" if (-1 == rc_zmq_recv) then\n" " rc_zmq_recv = false\n" " rc_zmq_recv_err = error_code__ZMQ_Error__push(rc_zmq_recv)\n" " else\n" @@ -1402,7 +1468,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " return data\n" " end\n" "\n" -" if not (0 ~= err) then\n" +" if not (-1 == err) then\n" " data = ((nil ~= data) and ffi.string(data,data_len))\n" " else\n" " data = nil\n" @@ -1417,6 +1483,30 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "-- End \"ZMQ_Socket\" FFI interface\n" "\n" "\n" +"-- Start \"zmq_poller\" FFI interface\n" +"local poller_find_sock_item = ffi.new(\"poller_find_sock_item_func\", _priv[\"poller_find_sock_item\"])\n" +"\n" +"local poller_find_fd_item = ffi.new(\"poller_find_fd_item_func\", _priv[\"poller_find_fd_item\"])\n" +"\n" +"local poller_get_free_item = ffi.new(\"poller_get_free_item_func\", _priv[\"poller_get_free_item\"])\n" +"\n" +"local poller_poll = ffi.new(\"poller_poll_func\", _priv[\"poller_poll\"])\n" +"\n" +"local poller_remove_item = ffi.new(\"poller_remove_item_func\", _priv[\"poller_remove_item\"])\n" +"\n" +"-- method: count\n" +"function zmq_poller_meth.count(self)\n" +" local this = obj_type_zmq_poller_check(self)\n" +" local count\n" +" count = this.count;\n" +"\n" +" count = count\n" +" return count\n" +"end\n" +"\n" +"-- End \"zmq_poller\" FFI interface\n" +"\n" +"\n" "-- Start \"ZMQ_Ctx\" FFI interface\n" "-- method: term\n" "function ZMQ_Ctx_meth.term(self)\n" @@ -1425,7 +1515,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_term = C.zmq_term(this)\n" " -- check for error.\n" " local rc_zmq_term_err\n" -" if (0 ~= rc_zmq_term) then\n" +" if (-1 == rc_zmq_term) then\n" " rc_zmq_term = false\n" " rc_zmq_term_err = error_code__ZMQ_Error__push(rc_zmq_term)\n" " else\n" @@ -1498,7 +1588,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " rc_zmq_device = C.zmq_device(device, insock, outsock)\n" " -- check for error.\n" " local rc_zmq_device_err\n" -" if (0 ~= rc_zmq_device) then\n" +" if (-1 == rc_zmq_device) then\n" " rc_zmq_device = false\n" " rc_zmq_device_err = error_code__ZMQ_Error__push(rc_zmq_device)\n" " else\n" @@ -1586,6 +1676,176 @@ static ZMQ_Error simple_zmq_send(ZMQ_Socket sock, const char *data, size_t data_ return err; } +typedef struct zmq_poller { + zmq_pollitem_t *items; + int next; + int count; + int free_list; + int len; +} zmq_poller; + +#define FREE_ITEM_EVENTS_TAG 0xFFFF + +#define ITEM_TO_INDEX(items, item) (item - (items)) + +static int poller_resize_items(zmq_poller *this, int len) { + int old_len = this->len; + + /* make sure new length is atleast as large as items count. */ + len = (this->count <= len) ? len : this->count; + + /* if the new length is the same as the old length, then don't try to resize. */ + if(old_len == len) return len; + + this->items = (zmq_pollitem_t *)realloc(this->items, len * sizeof(zmq_pollitem_t)); + this->len = len; + if(len > old_len) { + /* clear new space. */ + memset(&(this->items[old_len]), 0, (old_len - len) * sizeof(zmq_pollitem_t)); + } + return len; +} + +static int poller_find_sock_item(zmq_poller *this, ZMQ_Socket *sock) { + zmq_pollitem_t *items; + int count; + int n; + + /* find ZMQ_Socket */ + items = this->items; + count = this->count; + for(n=0; n < count; n++) { + if(items[n].socket == sock) return n; + } + /* not found. */ + return -1; +} + +static int poller_find_fd_item(zmq_poller *this, socket_t fd) { + zmq_pollitem_t *items; + int count; + int n; + + /* find fd */ + items = this->items; + count = this->count; + for(n=0; n < count; n++) { + if(items[n].fd == fd) return n; + } + /* not found. */ + return -1; +} + +static void poller_remove_item(zmq_poller *this, int idx) { + zmq_pollitem_t *items; + int free_list; + int count; + + count = this->count; + /* no item to remove. */ + if(idx >= count || count == 0) return; + + items = this->items; + free_list = this->free_list; + + /* link new free slot to head of free list. */ + if(free_list >= 0 && free_list < count) { + /* use socket pointer for free list's 'next' field. */ + items[idx].socket = &(items[free_list]); + } else { + /* free list is empty mark this slot as the end. */ + items[idx].socket = NULL; + } + this->free_list = idx; + /* mark this slot as a free slot. */ + items[idx].events = FREE_ITEM_EVENTS_TAG; +} + +static int poller_get_free_item(zmq_poller *this) { + zmq_pollitem_t *curr; + zmq_pollitem_t *next; + int count; + int idx; + + count = this->count; + idx = this->free_list; + /* check for a free slot in the free list. */ + if(idx >= 0 && idx < count) { + /* remove free slot from free list. */ + curr = &(this->items[idx]); + /* valid free slot. */ + assert(curr->events == FREE_ITEM_EVENTS_TAG); + /* is this the last free slot? */ + next = ((zmq_pollitem_t *)curr->socket); + if(next != NULL) { + /* set next free slot as head of free list. */ + this->free_list = ITEM_TO_INDEX(this->items, next); + } else { + /* free list is empty now. */ + this->free_list = -1; + } + /* clear slot */ + memset(curr, 0, sizeof(zmq_pollitem_t)); + return idx; + } + + idx = count; + this->count = ++count; + /* make room for new item. */ + if(count >= this->len) { + poller_resize_items(this, this->len + 10); + } + return idx; +} + +static int poller_compact_items(zmq_poller *this) { + zmq_pollitem_t *items; + int count; + int old_count; + int next; + + count = this->count; + /* if no free slot, then return. */ + if(this->free_list < 0) return count; + old_count = count; + + items = this->items; + next = 0; + /* find first free slot. */ + while(items[next].events != FREE_ITEM_EVENTS_TAG) { + assert(next <= old_count); + ++next; + } + + /* move non-free slots into free slot. */ + count = next; + ++next; + while(next <= old_count) { + if(items[next].events != FREE_ITEM_EVENTS_TAG) { + /* found non-free slot, move it to the current free slot. */ + items[count] = items[next]; + ++count; + } + ++next; + } + + /* clear old used-space */ + memset(&(items[count]), 0, ((old_count - count) * sizeof(zmq_pollitem_t))); + this->count = count; + this->free_list = -1; /* free list is now empty. */ + + return count; +} + +static int poller_poll(zmq_poller *this, long timeout) { + int count; + /* remove free slots from items list. */ + count = poller_compact_items(this); + /* poll for events. */ + return zmq_poll(this->items, count, timeout); +} + + typedef void * ZMQ_Ctx; #define OBJ_UDATA_CTX_SHOULD_FREE (OBJ_UDATA_LAST_FLAG << 1) @@ -1614,7 +1874,7 @@ static const char *get_zmq_strerror() { static void error_code__ZMQ_Error__push(lua_State *L, ZMQ_Error err) { const char *err_str = NULL; - if(err != 0) { + if(-1 == err) { err_str = get_zmq_strerror(); } @@ -1634,7 +1894,7 @@ static int zmq_msg_t__init__meth(lua_State *L) { this = &tmp; err = zmq_msg_init(this); - if(!(0 != err)) { + if(!(-1 == err)) { obj_type_zmq_msg_t_push(L, this, this_flags); } else { lua_pushnil(L); @@ -1653,7 +1913,7 @@ static int zmq_msg_t__init_size__meth(lua_State *L) { this = &tmp; err = zmq_msg_init_size(this, size); - if(!(0 != err)) { + if(!(-1 == err)) { obj_type_zmq_msg_t_push(L, this, this_flags); } else { lua_pushnil(L); @@ -1677,7 +1937,7 @@ static int zmq_msg_t__init_data__meth(lua_State *L) { memcpy(zmq_msg_data(this), data, data_len); } - if(!(0 != err)) { + if(!(-1 == err)) { obj_type_zmq_msg_t_push(L, this, this_flags); } else { lua_pushnil(L); @@ -1694,11 +1954,12 @@ static int zmq_msg_t__delete__meth(lua_State *L) { if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; } rc_zmq_msg_close = zmq_msg_close(this); /* check for error. */ - if((0 != rc_zmq_msg_close)) { + if((-1 == rc_zmq_msg_close)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_msg_close); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -1709,11 +1970,12 @@ static int zmq_msg_t__close__meth(lua_State *L) { ZMQ_Error rc_zmq_msg_close = 0; rc_zmq_msg_close = zmq_msg_close(this); /* check for error. */ - if((0 != rc_zmq_msg_close)) { + if((-1 == rc_zmq_msg_close)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_msg_close); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -1725,11 +1987,12 @@ static int zmq_msg_t__move__meth(lua_State *L) { ZMQ_Error rc_zmq_msg_move = 0; rc_zmq_msg_move = zmq_msg_move(this, src); /* check for error. */ - if((0 != rc_zmq_msg_move)) { + if((-1 == rc_zmq_msg_move)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_msg_move); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -1741,11 +2004,12 @@ static int zmq_msg_t__copy__meth(lua_State *L) { ZMQ_Error rc_zmq_msg_copy = 0; rc_zmq_msg_copy = zmq_msg_copy(this, src); /* check for error. */ - if((0 != rc_zmq_msg_copy)) { + if((-1 == rc_zmq_msg_copy)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_msg_copy); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -1769,11 +2033,12 @@ static int zmq_msg_t__set_data__meth(lua_State *L) { memcpy(zmq_msg_data(this), data, data_len); /* check for error. */ - if((0 != err)) { + if((-1 == err)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, err); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -1803,11 +2068,12 @@ static int zmq_msg_t__set_size__meth(lua_State *L) { } /* check for error. */ - if((0 != err)) { + if((-1 == err)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, err); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -1841,11 +2107,12 @@ static int ZMQ_Socket__close__meth(lua_State *L) { if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; } rc_zmq_close = zmq_close(this); /* check for error. */ - if((0 != rc_zmq_close)) { + if((-1 == rc_zmq_close)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_close); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -1858,11 +2125,12 @@ static int ZMQ_Socket__bind__meth(lua_State *L) { ZMQ_Error rc_zmq_bind = 0; rc_zmq_bind = zmq_bind(this, addr); /* check for error. */ - if((0 != rc_zmq_bind)) { + if((-1 == rc_zmq_bind)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_bind); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -1875,11 +2143,12 @@ static int ZMQ_Socket__connect__meth(lua_State *L) { ZMQ_Error rc_zmq_connect = 0; rc_zmq_connect = zmq_connect(this, addr); /* check for error. */ - if((0 != rc_zmq_connect)) { + if((-1 == rc_zmq_connect)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_connect); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -1943,11 +2212,12 @@ static int ZMQ_Socket__setopt__meth(lua_State *L) { err = zmq_setsockopt(this, opt, val, val_len); /* check for error. */ - if((0 != err)) { + if((-1 == err)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, err); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -2050,7 +2320,7 @@ static int ZMQ_Socket__events__meth(lua_State *L) { luaL_error(L, "'events' method only supported in 0MQ version >= 2.1"); #endif - if(!(0 != err)) { + if(!(-1 == err)) { lua_pushinteger(L, events); } else { lua_pushnil(L); @@ -2067,11 +2337,12 @@ static int ZMQ_Socket__send_msg__meth(lua_State *L) { ZMQ_Error rc_zmq_send = 0; rc_zmq_send = zmq_send(this, msg, flags); /* check for error. */ - if((0 != rc_zmq_send)) { + if((-1 == rc_zmq_send)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_send); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -2086,11 +2357,12 @@ static int ZMQ_Socket__send__meth(lua_State *L) { err = simple_zmq_send(this, data, data_len, flags); /* check for error. */ - if((0 != err)) { + if((-1 == err)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, err); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -2103,11 +2375,12 @@ static int ZMQ_Socket__recv_msg__meth(lua_State *L) { ZMQ_Error rc_zmq_recv = 0; rc_zmq_recv = zmq_recv(this, msg, flags); /* check for error. */ - if((0 != rc_zmq_recv)) { + if((-1 == rc_zmq_recv)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_recv); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -2131,7 +2404,7 @@ static int ZMQ_Socket__recv__meth(lua_State *L) { } } - if(!(0 != err)) { + if(!(-1 == err)) { if(data == NULL) lua_pushnil(L); else lua_pushlstring(L, data,data_len); } else { lua_pushnil(L); @@ -2143,6 +2416,202 @@ static int ZMQ_Socket__recv__meth(lua_State *L) { return 2; } +/* method: new */ +static int zmq_poller__new__meth(lua_State *L) { + unsigned int length = luaL_optinteger(L,1,10); + int this_flags = OBJ_UDATA_FLAG_OWN; + zmq_poller * this; + zmq_poller poller; + this = &poller; + this->items = (zmq_pollitem_t *)calloc(length, sizeof(zmq_pollitem_t)); + this->next = -1; + this->count = 0; + this->len = length; + this->free_list = -1; + + obj_type_zmq_poller_push(L, this, this_flags); + return 1; +} + +/* method: close */ +static int zmq_poller__close__meth(lua_State *L) { + int this_flags = 0; + zmq_poller * this = obj_type_zmq_poller_delete(L,1,&(this_flags)); + if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; } + free(this->items); + this->items = NULL; + this->next = -1; + this->count = 0; + this->len = 0; + this->free_list = -1; + + return 0; +} + +/* method: add */ +static int zmq_poller__add__meth(lua_State *L) { + zmq_poller * this = obj_type_zmq_poller_check(L,1); + short events = luaL_checkinteger(L,3); + int idx = 0; + zmq_pollitem_t *item; + ZMQ_Socket *sock = NULL; + socket_t fd = 0; + + if(lua_isuserdata(L, 2)) { + sock = obj_type_ZMQ_Socket_check(L, 2); + } else if(lua_isnumber(L, 2)) { + fd = lua_tonumber(L, 2); + } else { + return luaL_typerror(L, 2, "number or ZMQ_Socket"); + } + idx = poller_get_free_item(this); + item = &(this->items[idx]); + item->socket = sock; + item->fd = fd; + item->events = events; + + lua_pushinteger(L, idx); + return 1; +} + +/* method: modify */ +static int zmq_poller__modify__meth(lua_State *L) { + zmq_poller * this = obj_type_zmq_poller_check(L,1); + short events = luaL_checkinteger(L,3); + int idx = 0; + zmq_pollitem_t *item; + ZMQ_Socket *sock = NULL; + socket_t fd = 0; + + if(lua_isuserdata(L, 2)) { + sock = obj_type_ZMQ_Socket_check(L, 2); + /* find sock in items list. */ + idx = poller_find_sock_item(this, sock); + } else if(lua_isnumber(L, 2)) { + fd = lua_tonumber(L, 2); + /* find fd in items list. */ + idx = poller_find_fd_item(this, fd); + } else { + return luaL_typerror(L, 2, "number or ZMQ_Socket"); + } + if(events != 0) { + /* add/modify. */ + if(idx < 0) { + idx = poller_get_free_item(this); + } + item = &(this->items[idx]); + item->socket = sock; + item->fd = fd; + item->events = events; + } else if(idx >= 0) { + /* no events remove socket/fd. */ + poller_remove_item(this, idx); + } + + lua_pushinteger(L, idx); + return 1; +} + +/* method: remove */ +static int zmq_poller__remove__meth(lua_State *L) { + zmq_poller * this = obj_type_zmq_poller_check(L,1); + ZMQ_Socket *sock; + socket_t fd; + int idx; + + /* ZMQ_Socket or fd */ + if(lua_isuserdata(L, 2)) { + sock = obj_type_ZMQ_Socket_check(L, 2); + /* find sock in items list. */ + idx = poller_find_sock_item(this, sock); + } else if(lua_isnumber(L, 2)) { + fd = lua_tonumber(L, 2); + /* find fd in items list. */ + idx = poller_find_fd_item(this, fd); + } else { + return luaL_typerror(L, 2, "number or ZMQ_Socket"); + } + /* if sock/fd was found. */ + if(idx >= 0) { + poller_remove_item(this, idx); + } + + return 0; +} + +/* method: poll */ +static int zmq_poller__poll__meth(lua_State *L) { + zmq_poller * this = obj_type_zmq_poller_check(L,1); + long timeout = luaL_checkinteger(L,2); + ZMQ_Error err = 0; + /* poll for events */ + err = poller_poll(this, timeout); + if(err > 0) { + this->next = 0; + } else { + this->next = -1; + } + + /* check for error. */ + if((-1 == err)) { + lua_pushboolean(L, 0); + error_code__ZMQ_Error__push(L, err); + } else { + lua_pushboolean(L, 1); + lua_pushnil(L); + } + return 2; +} + +/* method: next_revents */ +static int zmq_poller__next_revents__meth(lua_State *L) { + zmq_poller * this = obj_type_zmq_poller_check(L,1); + short revents = 0; + zmq_pollitem_t *items; + int count; + int idx; + + revents = -1; + idx = this->next; + if(idx >= 0) { + count = this->count; + items = this->items; + /* find next item with pending events. */ + while(idx < count && items[idx].revents == 0) ++idx; + /* did we find a pending event? */ + if(idx < count) { + /* push the event's sock/fd. */ + if(items[idx].socket != NULL) { + obj_type_ZMQ_Socket_push(L, items[idx].socket, 0); + } else { + lua_pushnumber(L, items[idx].fd); + } + revents = items[idx].revents; + /* is this the last event. */ + ++idx; + this->next = (idx < count) ? idx : -1; + } + } + if(revents < 0) { + /* no more pending events. */ + lua_pushnil(L); + this->next = -1; + } + + lua_pushinteger(L, revents); + return 2; +} + +/* method: count */ +static int zmq_poller__count__meth(lua_State *L) { + zmq_poller * this = obj_type_zmq_poller_check(L,1); + int count = 0; + count = this->count; + + lua_pushinteger(L, count); + return 1; +} + /* method: delete */ static int ZMQ_Ctx__delete__meth(lua_State *L) { int this_flags = 0; @@ -2161,11 +2630,12 @@ static int ZMQ_Ctx__term__meth(lua_State *L) { ZMQ_Error rc_zmq_term = 0; rc_zmq_term = zmq_term(this); /* check for error. */ - if((0 != rc_zmq_term)) { + if((-1 == rc_zmq_term)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_term); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -2255,11 +2725,12 @@ static int zmq__device__func(lua_State *L) { ZMQ_Error rc_zmq_device = 0; rc_zmq_device = zmq_device(device, insock, outsock); /* check for error. */ - if((0 != rc_zmq_device)) { + if((-1 == rc_zmq_device)) { lua_pushboolean(L, 0); error_code__ZMQ_Error__push(L, rc_zmq_device); } else { lua_pushboolean(L, 1); + lua_pushnil(L); } return 2; } @@ -2351,6 +2822,41 @@ static const obj_const obj_ZMQ_Socket_constants[] = { {NULL, NULL, 0.0 , 0} }; +static const luaL_reg obj_zmq_poller_pub_funcs[] = { + {"new", zmq_poller__new__meth}, + {NULL, NULL} +}; + +static const luaL_reg obj_zmq_poller_methods[] = { + {"close", zmq_poller__close__meth}, + {"add", zmq_poller__add__meth}, + {"modify", zmq_poller__modify__meth}, + {"remove", zmq_poller__remove__meth}, + {"poll", zmq_poller__poll__meth}, + {"next_revents", zmq_poller__next_revents__meth}, + {"count", zmq_poller__count__meth}, + {NULL, NULL} +}; + +static const luaL_reg obj_zmq_poller_metas[] = { + {"__gc", zmq_poller__close__meth}, + {"__tostring", obj_simple_udata_default_tostring}, + {"__eq", obj_simple_udata_default_equal}, + {NULL, NULL} +}; + +static const obj_base obj_zmq_poller_bases[] = { + {-1, NULL} +}; + +static const obj_field obj_zmq_poller_fields[] = { + {NULL, 0, 0, 0} +}; + +static const obj_const obj_zmq_poller_constants[] = { + {NULL, NULL, 0.0 , 0} +}; + static const luaL_reg obj_ZMQ_Ctx_pub_funcs[] = { {NULL, NULL} }; @@ -2437,6 +2943,11 @@ static const obj_const zmq_constants[] = { static const ffi_export_symbol zmq_ffi_export[] = { { "get_zmq_strerror", get_zmq_strerror }, { "simple_zmq_send", simple_zmq_send }, +{ "poller_find_sock_item", poller_find_sock_item }, +{ "poller_find_fd_item", poller_find_fd_item }, +{ "poller_get_free_item", poller_get_free_item }, +{ "poller_poll", poller_poll }, +{ "poller_remove_item", poller_remove_item }, {NULL, NULL} }; @@ -2445,6 +2956,7 @@ static const ffi_export_symbol zmq_ffi_export[] = { static const reg_sub_module reg_sub_modules[] = { { &(obj_type_zmq_msg_t), 0, obj_zmq_msg_t_pub_funcs, obj_zmq_msg_t_methods, obj_zmq_msg_t_metas, obj_zmq_msg_t_bases, obj_zmq_msg_t_fields, obj_zmq_msg_t_constants}, { &(obj_type_ZMQ_Socket), 0, obj_ZMQ_Socket_pub_funcs, obj_ZMQ_Socket_methods, obj_ZMQ_Socket_metas, obj_ZMQ_Socket_bases, obj_ZMQ_Socket_fields, obj_ZMQ_Socket_constants}, + { &(obj_type_zmq_poller), 0, obj_zmq_poller_pub_funcs, obj_zmq_poller_methods, obj_zmq_poller_metas, obj_zmq_poller_bases, obj_zmq_poller_fields, obj_zmq_poller_constants}, { &(obj_type_ZMQ_Ctx), 0, obj_ZMQ_Ctx_pub_funcs, obj_ZMQ_Ctx_methods, obj_ZMQ_Ctx_metas, obj_ZMQ_Ctx_bases, obj_ZMQ_Ctx_fields, obj_ZMQ_Ctx_constants}, {NULL, 0, NULL, NULL, NULL, NULL, NULL, NULL} }; diff --git a/zmq.nobj.lua b/zmq.nobj.lua index d1af84a..c02d9bf 100644 --- a/zmq.nobj.lua +++ b/zmq.nobj.lua @@ -120,6 +120,7 @@ subfiles { "src/error.nobj.lua", "src/msg.nobj.lua", "src/socket.nobj.lua", +"src/poller.nobj.lua", "src/ctx.nobj.lua", },