Update FFI bindings to be more pure (i.e. less messing around with userdata tricks.)

zmq3.0
Robert G. Jakabosky 15 years ago
parent 74764609bf
commit 23db47708b

@ -34,43 +34,36 @@ local poller_mt = {}
poller_mt.__index = poller_mt
function poller_mt:add(sock, events, cb)
self.poller:add(sock, events)
self.callbacks[sock] = cb
local id = self.poller:add(sock, events)
self.callbacks[id] = function(revents) return cb(sock, revents) end
end
function poller_mt:modify(sock, events, cb)
local id
if events ~= 0 and cb then
self.callbacks[sock] = cb
self.poller:modify(sock, events)
id = self.poller:modify(sock, events)
self.callbacks[id] = function(revents) return cb(sock, revents) end
else
self:remove(sock)
id = self:remove(sock)
self.callbacks[id] = nil
end
end
function poller_mt:remove(sock)
self.poller:remove(sock)
self.callbacks[sock] = nil
local id = self.poller:remove(sock)
self.callbacks[id] = nil
end
function poller_mt:poll(timeout)
local poller = self.poller
local status, err = poller:poll(timeout)
if not status then
local count, err = poller:poll(timeout)
if not count then
return nil, err
end
local callbacks = self.callbacks
local count = 0
while true do
local sock, revents = poller:next_revents()
if not sock then
break
end
local cb = callbacks[sock]
if not cb then
error("Missing callback for sock:" .. tostring(sock))
end
cb(sock, revents)
count = count + 1
for i=1,count do
local id, revents = poller:next_revents_idx()
callbacks[id](revents)
end
return count
end
@ -78,7 +71,7 @@ end
function poller_mt:start()
self.is_running = true
while self.is_running do
local status, err = self:poll(-1)
status, err = self:poll(-1)
if not status then
return false, err
end
@ -95,7 +88,7 @@ module(...)
function new(pre_alloc)
return setmetatable({
poller = zmq.ZMQ_Poller(pre_alloc),
callbacks = setmetatable({}, {__mode="k"}),
callbacks = {},
}, poller_mt)
end

@ -58,7 +58,24 @@ static int poller_resize_items(ZMQ_Poller *poller, int len) {
return len;
}
static int poller_find_sock_item(ZMQ_Poller *poller, ZMQ_Socket *sock) {
void poller_init(ZMQ_Poller *poller, int length) {
poller->items = (zmq_pollitem_t *)calloc(length, sizeof(zmq_pollitem_t));
poller->next = -1;
poller->count = 0;
poller->len = length;
poller->free_list = -1;
}
void poller_cleanup(ZMQ_Poller *poller) {
free(poller->items);
poller->items = NULL;
poller->next = -1;
poller->count = 0;
poller->len = 0;
poller->free_list = -1;
}
int poller_find_sock_item(ZMQ_Poller *poller, ZMQ_Socket *sock) {
zmq_pollitem_t *items;
int count;
int n;
@ -73,7 +90,7 @@ static int poller_find_sock_item(ZMQ_Poller *poller, ZMQ_Socket *sock) {
return -1;
}
static int poller_find_fd_item(ZMQ_Poller *poller, socket_t fd) {
int poller_find_fd_item(ZMQ_Poller *poller, socket_t fd) {
zmq_pollitem_t *items;
int count;
int n;
@ -88,7 +105,7 @@ static int poller_find_fd_item(ZMQ_Poller *poller, socket_t fd) {
return -1;
}
static void poller_remove_item(ZMQ_Poller *poller, int idx) {
void poller_remove_item(ZMQ_Poller *poller, int idx) {
zmq_pollitem_t *items;
int free_list;
int count;
@ -115,7 +132,7 @@ static void poller_remove_item(ZMQ_Poller *poller, int idx) {
items[idx].revents = 0;
}
static int poller_get_free_item(ZMQ_Poller *poller) {
int poller_get_free_item(ZMQ_Poller *poller) {
zmq_pollitem_t *curr;
zmq_pollitem_t *next;
int count;
@ -191,14 +208,46 @@ static int poller_compact_items(ZMQ_Poller *poller) {
return count;
}
static int poller_poll(ZMQ_Poller *poller, long timeout) {
int poller_poll(ZMQ_Poller *poller, long timeout) {
int count;
/* remove free slots from items list. */
count = poller_compact_items(poller);
if(poller->free_list >= 0) {
count = poller_compact_items(poller);
} else {
count = poller->count;
}
/* poll for events. */
return zmq_poll(poller->items, count, timeout);
}
int poller_next_revents(ZMQ_Poller *poller, int *revents) {
zmq_pollitem_t *items;
int count;
int idx;
int next;
idx = poller->next;
/* do we need to poll for more events? */
if(idx < 0) {
return idx;
}
items = poller->items;
count = poller->count;
/* find next item with pending events. */
for(;idx < count; ++idx) {
/* did we find a pending event? */
if(items[idx].revents != 0) {
*revents = items[idx].revents;
poller->next = idx+1;
return idx;
}
}
/* processed all pending events. */
poller->next = -1;
*revents = 0;
return -1;
}
]],
--
-- Define ZMQ_Poller type & function API for FFI
@ -212,36 +261,28 @@ typedef struct zmq_pollitem_t {
short revents;
} zmq_pollitem_t;
int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout);
int poller_find_sock_item(ZMQ_Poller *poller, ZMQ_Socket *sock);
int poller_find_fd_item(ZMQ_Poller *poller, socket_t fd);
int poller_get_free_item(ZMQ_Poller *poller);
int poller_poll(ZMQ_Poller *poller, long timeout);
void poller_remove_item(ZMQ_Poller *poller, int idx);
]],
ffi_cdef(ZMQ_Poller_type),
ffi_export_function "int" "poller_find_sock_item" "(ZMQ_Poller *poller, ZMQ_Socket *sock)",
ffi_export_function "int" "poller_find_fd_item" "(ZMQ_Poller *poller, socket_t fd)",
ffi_export_function "int" "poller_get_free_item" "(ZMQ_Poller *poller)",
ffi_export_function "int" "poller_poll" "(ZMQ_Poller *poller, long timeout)",
ffi_export_function "void" "poller_remove_item" "(ZMQ_Poller *poller, int idx)",
constructor "new" {
var_in{ "unsigned int", "length", is_optional = true, default = 10 },
c_source[[
c_source "pre_src" [[
ZMQ_Poller poller;
${this} = &poller;
${this}->items = (zmq_pollitem_t *)calloc(${length}, sizeof(zmq_pollitem_t));
${this}->next = -1;
${this}->count = 0;
${this}->len = ${length};
${this}->free_list = -1;
]],
ffi_source [[
${this} = ffi.new("ZMQ_Poller")
]],
c_method_call "void" "poller_init" { "unsigned int", "length" },
},
destructor "close" {
c_source[[
free(${this}->items);
${this}->items = NULL;
${this}->next = -1;
${this}->count = 0;
${this}->len = 0;
${this}->free_list = -1;
]],
c_method_call "void" "poller_cleanup" {},
},
method "add" {
var_in{ "<any>", "sock" },
@ -265,6 +306,22 @@ int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout);
item->socket = sock;
item->fd = fd;
item->events = ${events};
]],
ffi_source[[
local fd = 0
local sock_type = type(${sock})
if sock_type == 'cdata' then
sock = obj_type_ZMQ_Socket_check(${sock})
elseif sock_type == 'number' then
fd = ${sock}
else
error("expected number or ZMQ_Socket")
end
${idx} = C.poller_get_free_item(${this})
local item = ${this}.items[${idx}]
item.socket = sock
item.fd = fd
item.events = ${events}
]],
},
method "modify" {
@ -301,124 +358,104 @@ int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout);
/* no events remove socket/fd. */
poller_remove_item(${this}, ${idx});
}
]],
ffi_source[[
local fd = 0
local sock_type = type(${sock})
if sock_type == 'cdata' then
sock = obj_type_ZMQ_Socket_check(${sock})
-- find sock in items list.
${idx} = C.poller_find_sock_item(${this}, sock)
elseif sock_type == 'number' then
fd = ${sock}
-- find fd in items list.
${idx} = C.poller_find_fd_item(${this}, fd);
else
error("expected number or ZMQ_Socket")
end
if ${events} ~= 0 then
local item = ${this}.items[${idx}]
item.socket = sock
item.fd = fd
item.events = ${events}
else
C.poller_remove_item(${this}, ${idx})
end
]],
},
method "remove" {
var_in{ "<any>", "sock" },
var_out{ "int", "idx" },
c_source "pre" [[
ZMQ_Socket *sock;
socket_t fd;
int idx;
]],
c_source[[
/* ZMQ_Socket or fd */
if(lua_isuserdata(L, ${sock::idx})) {
sock = obj_type_ZMQ_Socket_check(L, ${sock::idx});
/* find sock in items list. */
idx = poller_find_sock_item(${this}, sock);
${idx} = poller_find_sock_item(${this}, sock);
} else if(lua_isnumber(L, ${sock::idx})) {
fd = lua_tonumber(L, ${sock::idx});
/* find fd in items list. */
idx = poller_find_fd_item(${this}, fd);
${idx} = poller_find_fd_item(${this}, fd);
} else {
return luaL_typerror(L, ${sock::idx}, "number or ZMQ_Socket");
}
/* if sock/fd was found. */
if(idx >= 0) {
poller_remove_item(${this}, idx);
if(${idx} >= 0) {
poller_remove_item(${this}, ${idx});
}
]],
ffi_source[[
local fd = 0
local sock_type = type(${sock})
if sock_type == 'cdata' then
sock = obj_type_ZMQ_Socket_check(${sock})
-- find sock in items list.
${idx} = C.poller_find_sock_item(${this}, sock)
elseif sock_type == 'number' then
fd = ${sock}
-- find fd in items list.
${idx} = C.poller_find_fd_item(${this}, fd);
else
error("expected number or ZMQ_Socket")
end
if ${idx} >= 0 then
C.poller_remove_item(${this}, ${idx})
end
]],
},
method "poll" {
var_in{ "long", "timeout" },
var_out{ "int", "count" },
var_out{ "ZMQ_Error", "err" },
c_source[[
/* poll for events */
${err} = poller_poll(${this}, ${timeout});
if(${err} > 0) {
${this}->next = 0;
${count} = ${err};
} else {
${this}->next = -1;
${count} = 0;
}
]],
ffi_source[[
-- poll for events
${err} = poller_poll(${this}, ${timeout})
${err} = C.poller_poll(${this}, ${timeout})
if(${err} > 0) then
${this}.next = 0
${count} = ${err}
else
${this}.next = -1
${count} = 0
end
]],
},
method "next_revents" {
var_out{ "<any>", "sock" },
var_out{ "short", "revents" },
c_source "pre" [[
zmq_pollitem_t *items;
int count;
int idx;
]],
c_source[[
${revents} = -1;
idx = ${this}->next;
if(idx >= 0) {
count = ${this}->count;
items = ${this}->items;
/* find next item with pending events. */
while(idx < count && items[idx].revents == 0) ++idx;
/* did we find a pending event? */
if(idx < count) {
/* push the event's sock/fd. */
if(items[idx].socket != NULL) {
obj_type_ZMQ_Socket_push(L, items[idx].socket, 0);
} else {
lua_pushnumber(L, items[idx].fd);
}
${revents} = items[idx].revents;
/* is this the last event. */
++idx;
${this}->next = (idx < count) ? idx : -1;
}
}
if(${revents} < 0) {
/* no more pending events. */
lua_pushnil(L);
${this}->next = -1;
}
]],
ffi_source[[
local sock
local idx = ${this}.next
if (idx < 0) then return nil, -1 end
local count = ${this}.count
-- find next item with pending events.
while (idx < count and ${this}.items[idx].revents == 0) do
idx = idx + 1
if (idx >= count) then
idx = -1
break
end
end
-- did we find a pending event?
if(idx >= 0) then
-- push the event's sock/fd.
if(${this}.items[idx].socket ~= nil) then
sock = obj_type_ZMQ_Socket_push(${this}.items[idx].socket, 0)
else
sock = tonumber(${this}.items[idx].fd)
end
${revents} = ${this}.items[idx].revents
-- is this the last event.
idx = idx + 1
if (idx >= count) then
idx = -1
end
${this}.next = idx
return sock, ${revents}
end
${this}.next = idx
]],
method "next_revents_idx" {
c_method_call { "int", "idx>1" } "poller_poll_next_revents" { "int", "&revents>2" },
},
method "count" {
var_out{ "int", "count" },

File diff suppressed because it is too large Load Diff

@ -344,7 +344,7 @@ local ZMQ_EVENTS = _M.EVENTS
},
-- create helper function for `zmq_send`
c_source[[
static ZMQ_Error simple_zmq_send(ZMQ_Socket *sock, const char *data, size_t data_len, int flags) {
ZMQ_Error simple_zmq_send(ZMQ_Socket *sock, const char *data, size_t data_len, int flags) {
ZMQ_Error err;
zmq_msg_t msg;
/* initialize message */
@ -360,19 +360,9 @@ static ZMQ_Error simple_zmq_send(ZMQ_Socket *sock, const char *data, size_t data
return err;
}
]],
-- export helper function.
ffi_export_function "ZMQ_Error" "simple_zmq_send"
"(ZMQ_Socket *sock, const char *data, size_t data_len, int flags)",
method "send" {
var_in{ "const char *", "data" },
var_in{ "int", "flags?" },
var_out{ "ZMQ_Error", "err" },
c_source[[
${err} = simple_zmq_send(${this}, ${data}, ${data_len}, ${flags});
]],
ffi_source[[
${err} = simple_zmq_send(${this}, ${data}, ${data_len}, ${flags});
]],
c_method_call "ZMQ_Error" "simple_zmq_send"
{ "const char *", "data", "size_t", "#data", "int", "flags?"}
},
--
-- zmq_recv

@ -26,11 +26,12 @@ c_module "zmq" {
use_globals = false,
hide_meta_info = true,
luajit_ffi = true,
luajit_ffi_load_cmodule = "global",
sys_include "string.h",
include "zmq.h",
ffi_load {
ffi_load { global = true,
"zmq", -- default lib name.
Windows = "libzmq", -- lib name for on windows.
},

Loading…
Cancel
Save