|
|
|
|
@ -40,32 +40,32 @@ typedef struct ZMQ_Poller ZMQ_Poller;
|
|
|
|
|
|
|
|
|
|
#define ITEM_TO_INDEX(items, item) (item - (items))
|
|
|
|
|
|
|
|
|
|
static int poller_resize_items(ZMQ_Poller *this, int len) {
|
|
|
|
|
int old_len = this->len;
|
|
|
|
|
static int poller_resize_items(ZMQ_Poller *poller, int len) {
|
|
|
|
|
int old_len = poller->len;
|
|
|
|
|
|
|
|
|
|
/* make sure new length is atleast as large as items count. */
|
|
|
|
|
len = (this->count <= len) ? len : this->count;
|
|
|
|
|
len = (poller->count <= len) ? len : poller->count;
|
|
|
|
|
|
|
|
|
|
/* if the new length is the same as the old length, then don't try to resize. */
|
|
|
|
|
if(old_len == len) return len;
|
|
|
|
|
|
|
|
|
|
this->items = (zmq_pollitem_t *)realloc(this->items, len * sizeof(zmq_pollitem_t));
|
|
|
|
|
this->len = len;
|
|
|
|
|
poller->items = (zmq_pollitem_t *)realloc(poller->items, len * sizeof(zmq_pollitem_t));
|
|
|
|
|
poller->len = len;
|
|
|
|
|
if(len > old_len) {
|
|
|
|
|
/* clear new space. */
|
|
|
|
|
memset(&(this->items[old_len]), 0, (len - old_len) * sizeof(zmq_pollitem_t));
|
|
|
|
|
memset(&(poller->items[old_len]), 0, (len - old_len) * sizeof(zmq_pollitem_t));
|
|
|
|
|
}
|
|
|
|
|
return len;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int poller_find_sock_item(ZMQ_Poller *this, ZMQ_Socket *sock) {
|
|
|
|
|
static int poller_find_sock_item(ZMQ_Poller *poller, ZMQ_Socket *sock) {
|
|
|
|
|
zmq_pollitem_t *items;
|
|
|
|
|
int count;
|
|
|
|
|
int n;
|
|
|
|
|
|
|
|
|
|
/* find ZMQ_Socket */
|
|
|
|
|
items = this->items;
|
|
|
|
|
count = this->count;
|
|
|
|
|
items = poller->items;
|
|
|
|
|
count = poller->count;
|
|
|
|
|
for(n=0; n < count; n++) {
|
|
|
|
|
if(items[n].socket == sock) return n;
|
|
|
|
|
}
|
|
|
|
|
@ -73,14 +73,14 @@ static int poller_find_sock_item(ZMQ_Poller *this, ZMQ_Socket *sock) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int poller_find_fd_item(ZMQ_Poller *this, socket_t fd) {
|
|
|
|
|
static int poller_find_fd_item(ZMQ_Poller *poller, socket_t fd) {
|
|
|
|
|
zmq_pollitem_t *items;
|
|
|
|
|
int count;
|
|
|
|
|
int n;
|
|
|
|
|
|
|
|
|
|
/* find fd */
|
|
|
|
|
items = this->items;
|
|
|
|
|
count = this->count;
|
|
|
|
|
items = poller->items;
|
|
|
|
|
count = poller->count;
|
|
|
|
|
for(n=0; n < count; n++) {
|
|
|
|
|
if(items[n].fd == fd) return n;
|
|
|
|
|
}
|
|
|
|
|
@ -88,55 +88,55 @@ static int poller_find_fd_item(ZMQ_Poller *this, socket_t fd) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void poller_remove_item(ZMQ_Poller *this, int idx) {
|
|
|
|
|
static void poller_remove_item(ZMQ_Poller *poller, int idx) {
|
|
|
|
|
zmq_pollitem_t *items;
|
|
|
|
|
int free_list;
|
|
|
|
|
int count;
|
|
|
|
|
|
|
|
|
|
count = this->count;
|
|
|
|
|
count = poller->count;
|
|
|
|
|
/* no item to remove. */
|
|
|
|
|
if(idx >= count || count == 0) return;
|
|
|
|
|
|
|
|
|
|
items = this->items;
|
|
|
|
|
free_list = this->free_list;
|
|
|
|
|
items = poller->items;
|
|
|
|
|
free_list = poller->free_list;
|
|
|
|
|
|
|
|
|
|
/* link new free slot to head of free list. */
|
|
|
|
|
if(free_list >= 0 && free_list < count) {
|
|
|
|
|
/* use socket pointer for free list's 'next' field. */
|
|
|
|
|
items[idx].socket = &(items[free_list]);
|
|
|
|
|
} else {
|
|
|
|
|
/* free list is empty mark this slot as the end. */
|
|
|
|
|
/* free list is empty mark poller slot as the end. */
|
|
|
|
|
items[idx].socket = NULL;
|
|
|
|
|
}
|
|
|
|
|
this->free_list = idx;
|
|
|
|
|
/* mark this slot as a free slot. */
|
|
|
|
|
poller->free_list = idx;
|
|
|
|
|
/* mark poller slot as a free slot. */
|
|
|
|
|
items[idx].events = FREE_ITEM_EVENTS_TAG;
|
|
|
|
|
/* clear old revents. */
|
|
|
|
|
items[idx].revents = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int poller_get_free_item(ZMQ_Poller *this) {
|
|
|
|
|
static int poller_get_free_item(ZMQ_Poller *poller) {
|
|
|
|
|
zmq_pollitem_t *curr;
|
|
|
|
|
zmq_pollitem_t *next;
|
|
|
|
|
int count;
|
|
|
|
|
int idx;
|
|
|
|
|
|
|
|
|
|
count = this->count;
|
|
|
|
|
idx = this->free_list;
|
|
|
|
|
count = poller->count;
|
|
|
|
|
idx = poller->free_list;
|
|
|
|
|
/* check for a free slot in the free list. */
|
|
|
|
|
if(idx >= 0 && idx < count) {
|
|
|
|
|
/* remove free slot from free list. */
|
|
|
|
|
curr = &(this->items[idx]);
|
|
|
|
|
curr = &(poller->items[idx]);
|
|
|
|
|
/* valid free slot. */
|
|
|
|
|
assert(curr->events == FREE_ITEM_EVENTS_TAG);
|
|
|
|
|
/* is this the last free slot? */
|
|
|
|
|
/* is poller the last free slot? */
|
|
|
|
|
next = ((zmq_pollitem_t *)curr->socket);
|
|
|
|
|
if(next != NULL) {
|
|
|
|
|
/* set next free slot as head of free list. */
|
|
|
|
|
this->free_list = ITEM_TO_INDEX(this->items, next);
|
|
|
|
|
poller->free_list = ITEM_TO_INDEX(poller->items, next);
|
|
|
|
|
} else {
|
|
|
|
|
/* free list is empty now. */
|
|
|
|
|
this->free_list = -1;
|
|
|
|
|
poller->free_list = -1;
|
|
|
|
|
}
|
|
|
|
|
/* clear slot */
|
|
|
|
|
memset(curr, 0, sizeof(zmq_pollitem_t));
|
|
|
|
|
@ -144,26 +144,26 @@ static int poller_get_free_item(ZMQ_Poller *this) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
idx = count;
|
|
|
|
|
this->count = ++count;
|
|
|
|
|
poller->count = ++count;
|
|
|
|
|
/* make room for new item. */
|
|
|
|
|
if(count >= this->len) {
|
|
|
|
|
poller_resize_items(this, this->len + 10);
|
|
|
|
|
if(count >= poller->len) {
|
|
|
|
|
poller_resize_items(poller, poller->len + 10);
|
|
|
|
|
}
|
|
|
|
|
return idx;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int poller_compact_items(ZMQ_Poller *this) {
|
|
|
|
|
static int poller_compact_items(ZMQ_Poller *poller) {
|
|
|
|
|
zmq_pollitem_t *items;
|
|
|
|
|
int count;
|
|
|
|
|
int old_count;
|
|
|
|
|
int next;
|
|
|
|
|
|
|
|
|
|
count = this->count;
|
|
|
|
|
count = poller->count;
|
|
|
|
|
/* if no free slot, then return. */
|
|
|
|
|
if(this->free_list < 0) return count;
|
|
|
|
|
if(poller->free_list < 0) return count;
|
|
|
|
|
old_count = count;
|
|
|
|
|
|
|
|
|
|
items = this->items;
|
|
|
|
|
items = poller->items;
|
|
|
|
|
next = 0;
|
|
|
|
|
/* find first free slot. */
|
|
|
|
|
while(next < count && items[next].events != FREE_ITEM_EVENTS_TAG) {
|
|
|
|
|
@ -184,19 +184,19 @@ static int poller_compact_items(ZMQ_Poller *this) {
|
|
|
|
|
|
|
|
|
|
/* clear old used-space */
|
|
|
|
|
memset(&(items[count]), 0, ((old_count - count) * sizeof(zmq_pollitem_t)));
|
|
|
|
|
this->count = count;
|
|
|
|
|
this->free_list = -1; /* free list is now empty. */
|
|
|
|
|
poller->count = count;
|
|
|
|
|
poller->free_list = -1; /* free list is now empty. */
|
|
|
|
|
|
|
|
|
|
assert(count <= this->len);
|
|
|
|
|
assert(count <= poller->len);
|
|
|
|
|
return count;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int poller_poll(ZMQ_Poller *this, long timeout) {
|
|
|
|
|
static int poller_poll(ZMQ_Poller *poller, long timeout) {
|
|
|
|
|
int count;
|
|
|
|
|
/* remove free slots from items list. */
|
|
|
|
|
count = poller_compact_items(this);
|
|
|
|
|
count = poller_compact_items(poller);
|
|
|
|
|
/* poll for events. */
|
|
|
|
|
return zmq_poll(this->items, count, timeout);
|
|
|
|
|
return zmq_poll(poller->items, count, timeout);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
]],
|
|
|
|
|
@ -215,11 +215,11 @@ typedef struct zmq_pollitem_t {
|
|
|
|
|
int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout);
|
|
|
|
|
]],
|
|
|
|
|
ffi_cdef(ZMQ_Poller_type),
|
|
|
|
|
ffi_export_function "int" "poller_find_sock_item" "(ZMQ_Poller *this, ZMQ_Socket *sock)",
|
|
|
|
|
ffi_export_function "int" "poller_find_fd_item" "(ZMQ_Poller *this, socket_t fd)",
|
|
|
|
|
ffi_export_function "int" "poller_get_free_item" "(ZMQ_Poller *this)",
|
|
|
|
|
ffi_export_function "int" "poller_poll" "(ZMQ_Poller *this, long timeout)",
|
|
|
|
|
ffi_export_function "void" "poller_remove_item" "(ZMQ_Poller *this, int idx)",
|
|
|
|
|
ffi_export_function "int" "poller_find_sock_item" "(ZMQ_Poller *poller, ZMQ_Socket *sock)",
|
|
|
|
|
ffi_export_function "int" "poller_find_fd_item" "(ZMQ_Poller *poller, socket_t fd)",
|
|
|
|
|
ffi_export_function "int" "poller_get_free_item" "(ZMQ_Poller *poller)",
|
|
|
|
|
ffi_export_function "int" "poller_poll" "(ZMQ_Poller *poller, long timeout)",
|
|
|
|
|
ffi_export_function "void" "poller_remove_item" "(ZMQ_Poller *poller, int idx)",
|
|
|
|
|
|
|
|
|
|
constructor "new" {
|
|
|
|
|
var_in{ "unsigned int", "length", is_optional = true, default = 10 },
|
|
|
|
|
|