Added support for running Lua code in child threads.

pull/10/head
Robert G. Jakabosky 15 years ago
parent 6126e4a0d6
commit 5de9a8d46f

@ -12,6 +12,7 @@
#include <string.h>
#include "zmq.h"
#include <pthread.h>
#define REG_PACKAGE_IS_CONSTRUCTOR 0
@ -169,15 +170,23 @@ typedef struct ffi_export_symbol {
#define obj_type_ZMQ_Socket_push(L, obj, flags) \
obj_udata_luapush_weak(L, (void *)obj, &(obj_type_ZMQ_Socket), flags)
#define obj_type_id_zmq_poller 2
#define obj_type_zmq_poller_check(L, _index) \
(zmq_poller *)obj_simple_udata_luacheck(L, _index, &(obj_type_zmq_poller))
#define obj_type_zmq_poller_delete(L, _index, flags) \
(zmq_poller *)obj_simple_udata_luadelete(L, _index, &(obj_type_zmq_poller), flags)
#define obj_type_zmq_poller_push(L, obj, flags) \
obj_simple_udata_luapush(L, obj, sizeof(zmq_poller), &(obj_type_zmq_poller))
#define obj_type_id_ZMQ_Ctx 3
#define obj_type_id_ZMQ_Poller 2
#define obj_type_ZMQ_Poller_check(L, _index) \
(ZMQ_Poller *)obj_simple_udata_luacheck(L, _index, &(obj_type_ZMQ_Poller))
#define obj_type_ZMQ_Poller_delete(L, _index, flags) \
(ZMQ_Poller *)obj_simple_udata_luadelete(L, _index, &(obj_type_ZMQ_Poller), flags)
#define obj_type_ZMQ_Poller_push(L, obj, flags) \
obj_simple_udata_luapush(L, obj, sizeof(ZMQ_Poller), &(obj_type_ZMQ_Poller))
#define obj_type_id_ZMQ_Thread 3
#define obj_type_ZMQ_Thread_check(L, _index) \
obj_udata_luacheck(L, _index, &(obj_type_ZMQ_Thread))
#define obj_type_ZMQ_Thread_delete(L, _index, flags) \
obj_udata_luadelete(L, _index, &(obj_type_ZMQ_Thread), flags)
#define obj_type_ZMQ_Thread_push(L, obj, flags) \
obj_udata_luapush_weak(L, (void *)obj, &(obj_type_ZMQ_Thread), flags)
#define obj_type_id_ZMQ_Ctx 4
#define obj_type_ZMQ_Ctx_check(L, _index) \
obj_udata_luacheck(L, _index, &(obj_type_ZMQ_Ctx))
#define obj_type_ZMQ_Ctx_delete(L, _index, flags) \
@ -194,8 +203,9 @@ static void error_code__ZMQ_Error__push(lua_State *L, ZMQ_Error err);
static obj_type obj_type_zmq_msg_t = { NULL, 0, OBJ_TYPE_SIMPLE, "zmq_msg_t" };
static obj_type obj_type_ZMQ_Socket = { NULL, 1, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Socket" };
static obj_type obj_type_zmq_poller = { NULL, 2, OBJ_TYPE_SIMPLE, "zmq_poller" };
static obj_type obj_type_ZMQ_Ctx = { NULL, 3, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Ctx" };
static obj_type obj_type_ZMQ_Poller = { NULL, 2, OBJ_TYPE_SIMPLE, "ZMQ_Poller" };
static obj_type obj_type_ZMQ_Thread = { NULL, 3, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Thread" };
static obj_type obj_type_ZMQ_Ctx = { NULL, 4, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Ctx" };
#ifndef REG_PACKAGE_IS_CONSTRUCTOR
@ -875,15 +885,15 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n"
"\n"
"ZMQ_Error zmq_recv(ZMQ_Socket * this, zmq_msg_t * msg, int flags);\n"
"\n"
"typedef int (*poller_find_sock_item_func)(zmq_poller *this, ZMQ_Socket *sock);\n"
"typedef int (*poller_find_sock_item_func)(ZMQ_Poller *this, ZMQ_Socket *sock);\n"
"\n"
"typedef int (*poller_find_fd_item_func)(zmq_poller *this, socket_t fd);\n"
"typedef int (*poller_find_fd_item_func)(ZMQ_Poller *this, socket_t fd);\n"
"\n"
"typedef int (*poller_get_free_item_func)(zmq_poller *this);\n"
"typedef int (*poller_get_free_item_func)(ZMQ_Poller *this);\n"
"\n"
"typedef int (*poller_poll_func)(zmq_poller *this, long timeout);\n"
"typedef int (*poller_poll_func)(ZMQ_Poller *this, long timeout);\n"
"\n"
"typedef void (*poller_remove_item_func)(zmq_poller *this, int idx);\n"
"typedef void (*poller_remove_item_func)(ZMQ_Poller *this, int idx);\n"
"\n"
"typedef struct zmq_pollitem_t {\n"
" ZMQ_Socket socket;\n"
@ -894,13 +904,13 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n"
"\n"
"int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout);\n"
"\n"
"typedef struct zmq_poller {\n"
"typedef struct ZMQ_Poller {\n"
" zmq_pollitem_t *items;\n"
" int next;\n"
" int count;\n"
" int free_list;\n"
" int len;\n"
"} zmq_poller;\n"
"} ZMQ_Poller;\n"
"\n"
"typedef void * ZMQ_Ctx;\n"
"\n"
@ -974,32 +984,61 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n"
"end\n"
"\n"
"\n"
"local zmq_poller_pub = _M[\"zmq_poller\"]\n"
"local zmq_poller_mt = _priv[\"zmq_poller\"]\n"
"local zmq_poller_type = obj_type_ptr(zmq_poller_mt[\".type\"])\n"
"local zmq_poller_meth = zmq_poller_mt.__index\n"
"local zmq_poller_objects = setmetatable({}, { __mode = \"k\" })\n"
"local ZMQ_Poller_pub = _M[\"ZMQ_Poller\"]\n"
"local ZMQ_Poller_mt = _priv[\"ZMQ_Poller\"]\n"
"local ZMQ_Poller_type = obj_type_ptr(ZMQ_Poller_mt[\".type\"])\n"
"local ZMQ_Poller_meth = ZMQ_Poller_mt.__index\n"
"local ZMQ_Poller_objects = setmetatable({}, { __mode = \"k\" })\n"
"\n"
"local function obj_type_ZMQ_Poller_check(ud_obj)\n"
" local c_obj = ZMQ_Poller_objects[ud_obj]\n"
" if c_obj == nil then\n"
" -- cdata object not in cache\n"
" c_obj = obj_simple_udata_luacheck(ud_obj, ZMQ_Poller_mt)\n"
" c_obj = ffi.cast(\"ZMQ_Poller *\", c_obj) -- cast from 'void *'\n"
" ZMQ_Poller_objects[ud_obj] = c_obj\n"
" end\n"
" return c_obj\n"
"end\n"
"\n"
"local function obj_type_ZMQ_Poller_delete(ud_obj)\n"
" ZMQ_Poller_objects[ud_obj] = nil\n"
" return obj_simple_udata_luadelete(ud_obj, ZMQ_Poller_mt)\n"
"end\n"
"\n"
"local ZMQ_Poller_sizeof = ffi.sizeof\"ZMQ_Poller\"\n"
"local function obj_type_ZMQ_Poller_push(c_obj)\n"
" local ud_obj, cdata = obj_simple_udata_luapush(c_obj, ZMQ_Poller_sizeof, ZMQ_Poller_mt)\n"
" ZMQ_Poller_objects[ud_obj] = cdata\n"
" return ud_obj\n"
"end\n"
"\n"
"\n"
"local ZMQ_Thread_pub = _M[\"ZMQ_Thread\"]\n"
"local ZMQ_Thread_mt = _priv[\"ZMQ_Thread\"]\n"
"local ZMQ_Thread_type = obj_type_ptr(ZMQ_Thread_mt[\".type\"])\n"
"local ZMQ_Thread_meth = ZMQ_Thread_mt.__index\n"
"local ZMQ_Thread_objects = setmetatable({}, { __mode = \"k\" })\n"
"\n"
"local function obj_type_zmq_poller_check(ud_obj)\n"
" local c_obj = zmq_poller_objects[ud_obj]\n"
"local function obj_type_ZMQ_Thread_check(ud_obj)\n"
" local c_obj = ZMQ_Thread_objects[ud_obj]\n"
" if c_obj == nil then\n"
" -- cdata object not in cache\n"
" c_obj = obj_simple_udata_luacheck(ud_obj, zmq_poller_mt)\n"
" c_obj = ffi.cast(\"zmq_poller *\", c_obj) -- cast from 'void *'\n"
" zmq_poller_objects[ud_obj] = c_obj\n"
" c_obj = obj_udata_luacheck(ud_obj, ZMQ_Thread_mt)\n"
" c_obj = ffi.cast(\"ZMQ_Thread *\", c_obj) -- cast from 'void *'\n"
" ZMQ_Thread_objects[ud_obj] = c_obj\n"
" end\n"
" return c_obj\n"
"end\n"
"\n"
"local function obj_type_zmq_poller_delete(ud_obj)\n"
" zmq_poller_objects[ud_obj] = nil\n"
" return obj_simple_udata_luadelete(ud_obj, zmq_poller_mt)\n"
"local function obj_type_ZMQ_Thread_delete(ud_obj)\n"
" ZMQ_Thread_objects[ud_obj] = nil\n"
" return obj_udata_luadelete(ud_obj, ZMQ_Thread_mt)\n"
"end\n"
"\n"
"local zmq_poller_sizeof = ffi.sizeof\"zmq_poller\"\n"
"local function obj_type_zmq_poller_push(c_obj)\n"
" local ud_obj, cdata = obj_simple_udata_luapush(c_obj, zmq_poller_sizeof, zmq_poller_mt)\n"
" zmq_poller_objects[ud_obj] = cdata\n"
"local function obj_type_ZMQ_Thread_push(c_obj, flags)\n"
" local ud_obj = obj_udata_luapush_weak(c_obj, ZMQ_Thread_mt, ZMQ_Thread_type, flags)\n"
" ZMQ_Thread_objects[ud_obj] = c_obj\n"
" return ud_obj\n"
"end\n"
"\n"
@ -1483,7 +1522,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n"
"-- End \"ZMQ_Socket\" FFI interface\n"
"\n"
"\n"
"-- Start \"zmq_poller\" FFI interface\n"
"-- Start \"ZMQ_Poller\" FFI interface\n"
"local poller_find_sock_item = ffi.new(\"poller_find_sock_item_func\", _priv[\"poller_find_sock_item\"])\n"
"\n"
"local poller_find_fd_item = ffi.new(\"poller_find_fd_item_func\", _priv[\"poller_find_fd_item\"])\n"
@ -1495,8 +1534,8 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n"
"local poller_remove_item = ffi.new(\"poller_remove_item_func\", _priv[\"poller_remove_item\"])\n"
"\n"
"-- method: count\n"
"function zmq_poller_meth.count(self)\n"
" local this = obj_type_zmq_poller_check(self)\n"
"function ZMQ_Poller_meth.count(self)\n"
" local this = obj_type_ZMQ_Poller_check(self)\n"
" local count\n"
" count = this.count;\n"
"\n"
@ -1504,7 +1543,11 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n"
" return count\n"
"end\n"
"\n"
"-- End \"zmq_poller\" FFI interface\n"
"-- End \"ZMQ_Poller\" FFI interface\n"
"\n"
"\n"
"-- Start \"ZMQ_Thread\" FFI interface\n"
"-- End \"ZMQ_Thread\" FFI interface\n"
"\n"
"\n"
"-- Start \"ZMQ_Ctx\" FFI interface\n"
@ -1676,19 +1719,19 @@ static ZMQ_Error simple_zmq_send(ZMQ_Socket sock, const char *data, size_t data_
return err;
}
typedef struct zmq_poller {
typedef struct ZMQ_Poller {
zmq_pollitem_t *items;
int next;
int count;
int free_list;
int len;
} zmq_poller;
} ZMQ_Poller;
#define FREE_ITEM_EVENTS_TAG 0xFFFF
#define ITEM_TO_INDEX(items, item) (item - (items))
static int poller_resize_items(zmq_poller *this, int len) {
static int poller_resize_items(ZMQ_Poller *this, int len) {
int old_len = this->len;
/* make sure new length is atleast as large as items count. */
@ -1706,7 +1749,7 @@ static int poller_resize_items(zmq_poller *this, int len) {
return len;
}
static int poller_find_sock_item(zmq_poller *this, ZMQ_Socket *sock) {
static int poller_find_sock_item(ZMQ_Poller *this, ZMQ_Socket *sock) {
zmq_pollitem_t *items;
int count;
int n;
@ -1721,7 +1764,7 @@ static int poller_find_sock_item(zmq_poller *this, ZMQ_Socket *sock) {
return -1;
}
static int poller_find_fd_item(zmq_poller *this, socket_t fd) {
static int poller_find_fd_item(ZMQ_Poller *this, socket_t fd) {
zmq_pollitem_t *items;
int count;
int n;
@ -1736,7 +1779,7 @@ static int poller_find_fd_item(zmq_poller *this, socket_t fd) {
return -1;
}
static void poller_remove_item(zmq_poller *this, int idx) {
static void poller_remove_item(ZMQ_Poller *this, int idx) {
zmq_pollitem_t *items;
int free_list;
int count;
@ -1761,7 +1804,7 @@ static void poller_remove_item(zmq_poller *this, int idx) {
items[idx].events = FREE_ITEM_EVENTS_TAG;
}
static int poller_get_free_item(zmq_poller *this) {
static int poller_get_free_item(ZMQ_Poller *this) {
zmq_pollitem_t *curr;
zmq_pollitem_t *next;
int count;
@ -1798,7 +1841,7 @@ static int poller_get_free_item(zmq_poller *this) {
return idx;
}
static int poller_compact_items(zmq_poller *this) {
static int poller_compact_items(ZMQ_Poller *this) {
zmq_pollitem_t *items;
int count;
int old_count;
@ -1837,7 +1880,7 @@ static int poller_compact_items(zmq_poller *this) {
return count;
}
static int poller_poll(zmq_poller *this, long timeout) {
static int poller_poll(ZMQ_Poller *this, long timeout) {
int count;
/* remove free slots from items list. */
count = poller_compact_items(this);
@ -1846,6 +1889,93 @@ static int poller_poll(zmq_poller *this, long timeout) {
}
typedef enum {
TSTATE_NONE = 0,
TSTATE_STARTED = 1<<1,
TSTATE_DETACHED = 1<<2,
TSTATE_JOINED = 1<<3,
} ZMQ_TState;
typedef struct ZMQ_Thread {
lua_State *L;
pthread_t thread;
ZMQ_TState state;
int nargs;
int status;
} ZMQ_Thread;
#include <stdio.h>
/* traceback() function from Lua 5.1.x source. */
static int traceback (lua_State *L) {
if (!lua_isstring(L, 1)) /* 'message' not a string? */
return 1; /* keep it intact */
lua_getfield(L, LUA_GLOBALSINDEX, "debug");
if (!lua_istable(L, -1)) {
lua_pop(L, 1);
return 1;
}
lua_getfield(L, -1, "traceback");
if (!lua_isfunction(L, -1)) {
lua_pop(L, 2);
return 1;
}
lua_pushvalue(L, 1); /* pass error message */
lua_pushinteger(L, 2); /* skip this function and traceback */
lua_call(L, 2, 1); /* call debug.traceback */
return 1;
}
static ZMQ_Thread *thread_new() {
ZMQ_Thread *this;
this = (ZMQ_Thread *)calloc(1, sizeof(ZMQ_Thread));
this->state = TSTATE_NONE;
/* create new lua_State for the thread. */
this->L = luaL_newstate();
/* open standard libraries. */
luaL_openlibs(this->L);
/* push traceback function as first value on stack. */
lua_pushcfunction(this->L, traceback);
return this;
}
static void thread_destroy(ZMQ_Thread *this) {
lua_close(this->L);
free(this);
}
static void *run_child_thread(void *arg) {
ZMQ_Thread *this = (ZMQ_Thread *)arg;
lua_State *L = this->L;
this->status = lua_pcall(L, this->nargs, 0, 1);
if(this->status != 0) {
const char *err_msg = lua_tostring(L, -1);
fprintf(stderr, "%s\n", err_msg);
fflush(stderr);
}
return NULL;
}
static int thread_start(ZMQ_Thread *this, int start_detached) {
int rc;
this->state = TSTATE_STARTED | ((start_detached) ? TSTATE_DETACHED : 0);
rc = pthread_create(&(this->thread), NULL, run_child_thread, this);
if(rc != 0) return rc;
if(start_detached) {
rc = pthread_detach(this->thread);
}
return rc;
}
typedef void * ZMQ_Ctx;
#define OBJ_UDATA_CTX_SHOULD_FREE (OBJ_UDATA_LAST_FLAG << 1)
@ -2417,11 +2547,11 @@ static int ZMQ_Socket__recv__meth(lua_State *L) {
}
/* method: new */
static int zmq_poller__new__meth(lua_State *L) {
static int ZMQ_Poller__new__meth(lua_State *L) {
unsigned int length = luaL_optinteger(L,1,10);
int this_flags = OBJ_UDATA_FLAG_OWN;
zmq_poller * this;
zmq_poller poller;
ZMQ_Poller * this;
ZMQ_Poller poller;
this = &poller;
this->items = (zmq_pollitem_t *)calloc(length, sizeof(zmq_pollitem_t));
this->next = -1;
@ -2429,14 +2559,14 @@ static int zmq_poller__new__meth(lua_State *L) {
this->len = length;
this->free_list = -1;
obj_type_zmq_poller_push(L, this, this_flags);
obj_type_ZMQ_Poller_push(L, this, this_flags);
return 1;
}
/* method: close */
static int zmq_poller__close__meth(lua_State *L) {
static int ZMQ_Poller__close__meth(lua_State *L) {
int this_flags = 0;
zmq_poller * this = obj_type_zmq_poller_delete(L,1,&(this_flags));
ZMQ_Poller * this = obj_type_ZMQ_Poller_delete(L,1,&(this_flags));
if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; }
free(this->items);
this->items = NULL;
@ -2449,8 +2579,8 @@ static int zmq_poller__close__meth(lua_State *L) {
}
/* method: add */
static int zmq_poller__add__meth(lua_State *L) {
zmq_poller * this = obj_type_zmq_poller_check(L,1);
static int ZMQ_Poller__add__meth(lua_State *L) {
ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1);
short events = luaL_checkinteger(L,3);
int idx = 0;
zmq_pollitem_t *item;
@ -2475,8 +2605,8 @@ static int zmq_poller__add__meth(lua_State *L) {
}
/* method: modify */
static int zmq_poller__modify__meth(lua_State *L) {
zmq_poller * this = obj_type_zmq_poller_check(L,1);
static int ZMQ_Poller__modify__meth(lua_State *L) {
ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1);
short events = luaL_checkinteger(L,3);
int idx = 0;
zmq_pollitem_t *item;
@ -2513,8 +2643,8 @@ static int zmq_poller__modify__meth(lua_State *L) {
}
/* method: remove */
static int zmq_poller__remove__meth(lua_State *L) {
zmq_poller * this = obj_type_zmq_poller_check(L,1);
static int ZMQ_Poller__remove__meth(lua_State *L) {
ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1);
ZMQ_Socket *sock;
socket_t fd;
int idx;
@ -2540,8 +2670,8 @@ static int zmq_poller__remove__meth(lua_State *L) {
}
/* method: poll */
static int zmq_poller__poll__meth(lua_State *L) {
zmq_poller * this = obj_type_zmq_poller_check(L,1);
static int ZMQ_Poller__poll__meth(lua_State *L) {
ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1);
long timeout = luaL_checkinteger(L,2);
ZMQ_Error err = 0;
/* poll for events */
@ -2564,8 +2694,8 @@ static int zmq_poller__poll__meth(lua_State *L) {
}
/* method: next_revents */
static int zmq_poller__next_revents__meth(lua_State *L) {
zmq_poller * this = obj_type_zmq_poller_check(L,1);
static int ZMQ_Poller__next_revents__meth(lua_State *L) {
ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1);
short revents = 0;
zmq_pollitem_t *items;
int count;
@ -2603,8 +2733,8 @@ static int zmq_poller__next_revents__meth(lua_State *L) {
}
/* method: count */
static int zmq_poller__count__meth(lua_State *L) {
zmq_poller * this = obj_type_zmq_poller_check(L,1);
static int ZMQ_Poller__count__meth(lua_State *L) {
ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1);
int count = 0;
count = this->count;
@ -2612,6 +2742,114 @@ static int zmq_poller__count__meth(lua_State *L) {
return 1;
}
/* method: new */
static int ZMQ_Thread__new__meth(lua_State *L) {
size_t bootstrap_code_len;
const char * bootstrap_code = luaL_checklstring(L,1,&(bootstrap_code_len));
int this_flags = OBJ_UDATA_FLAG_OWN;
ZMQ_Thread * this;
const char *str;
size_t str_len;
int nargs = 0;
int rc;
int top;
int n;
this = thread_new();
/* load bootstrap Lua code into child state. */
rc = luaL_loadbuffer(this->L, bootstrap_code, bootstrap_code_len, bootstrap_code);
if(rc != 0) {
/* copy error message to parent state. */
str = lua_tolstring(this->L, -1, &(str_len));
if(str != NULL) {
lua_pushlstring(L, str, str_len);
} else {
/* non-string error message. */
lua_pushfstring(L, "luaL_loadbuffer() failed to luad bootstrap code: rc=%d", rc);
}
thread_destroy(this);
return lua_error(L);
}
/* copy extra args from main state to child state. */
top = lua_gettop(L);
/* skip the bootstrap code. */
for(n = 1 + 1; n <= top; n++) {
/* only support string/number/boolean/nil/lightuserdata. */
switch(lua_type(L, n)) {
case LUA_TNIL:
lua_pushnil(this->L);
break;
case LUA_TNUMBER:
lua_pushnumber(this->L, lua_tonumber(L, n));
break;
case LUA_TBOOLEAN:
lua_pushboolean(this->L, lua_toboolean(L, n));
break;
case LUA_TSTRING:
str = lua_tolstring(L, n, &(str_len));
lua_pushlstring(this->L, str, str_len);
break;
case LUA_TLIGHTUSERDATA:
lua_pushlightuserdata(this->L, lua_touserdata(L, n));
break;
case LUA_TTABLE:
case LUA_TFUNCTION:
case LUA_TUSERDATA:
case LUA_TTHREAD:
default:
return luaL_argerror(L, n,
"Only nil/number/boolean/string/lightuserdata values are supported");
}
++nargs;
}
this->nargs = nargs;
obj_type_ZMQ_Thread_push(L, this, this_flags);
return 1;
}
/* method: delete */
static int ZMQ_Thread__delete__meth(lua_State *L) {
int this_flags = 0;
ZMQ_Thread * this = obj_type_ZMQ_Thread_delete(L,1,&(this_flags));
if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; }
/* We still own the thread object iff the thread was not started or we have joined the thread. */
if(this->state == TSTATE_NONE || this->state == TSTATE_JOINED) {
thread_destroy(this);
}
return 0;
}
/* method: start */
static int ZMQ_Thread__start__meth(lua_State *L) {
ZMQ_Thread * this = obj_type_ZMQ_Thread_check(L,1);
bool start_detached = lua_toboolean(L,2);
int rc = 0;
if(this->state != TSTATE_NONE) {
return luaL_error(L, "Thread already started.");
}
rc = thread_start(this, start_detached);
lua_pushinteger(L, rc);
return 1;
}
/* method: join */
static int ZMQ_Thread__join__meth(lua_State *L) {
ZMQ_Thread * this = obj_type_ZMQ_Thread_check(L,1);
int rc = 0;
if((this->state & TSTATE_STARTED) == 0) {
return luaL_error(L, "Can't join a thread that hasn't be started.");
}
rc = pthread_join(this->thread, NULL);
/* now we own the thread object again. */
this->state = TSTATE_JOINED;
lua_pushinteger(L, rc);
return 1;
}
/* method: delete */
static int ZMQ_Ctx__delete__meth(lua_State *L) {
int this_flags = 0;
@ -2822,38 +3060,68 @@ static const obj_const obj_ZMQ_Socket_constants[] = {
{NULL, NULL, 0.0 , 0}
};
static const luaL_reg obj_zmq_poller_pub_funcs[] = {
{"new", zmq_poller__new__meth},
static const luaL_reg obj_ZMQ_Poller_pub_funcs[] = {
{"new", ZMQ_Poller__new__meth},
{NULL, NULL}
};
static const luaL_reg obj_zmq_poller_methods[] = {
{"close", zmq_poller__close__meth},
{"add", zmq_poller__add__meth},
{"modify", zmq_poller__modify__meth},
{"remove", zmq_poller__remove__meth},
{"poll", zmq_poller__poll__meth},
{"next_revents", zmq_poller__next_revents__meth},
{"count", zmq_poller__count__meth},
static const luaL_reg obj_ZMQ_Poller_methods[] = {
{"close", ZMQ_Poller__close__meth},
{"add", ZMQ_Poller__add__meth},
{"modify", ZMQ_Poller__modify__meth},
{"remove", ZMQ_Poller__remove__meth},
{"poll", ZMQ_Poller__poll__meth},
{"next_revents", ZMQ_Poller__next_revents__meth},
{"count", ZMQ_Poller__count__meth},
{NULL, NULL}
};
static const luaL_reg obj_zmq_poller_metas[] = {
{"__gc", zmq_poller__close__meth},
static const luaL_reg obj_ZMQ_Poller_metas[] = {
{"__gc", ZMQ_Poller__close__meth},
{"__tostring", obj_simple_udata_default_tostring},
{"__eq", obj_simple_udata_default_equal},
{NULL, NULL}
};
static const obj_base obj_zmq_poller_bases[] = {
static const obj_base obj_ZMQ_Poller_bases[] = {
{-1, NULL}
};
static const obj_field obj_ZMQ_Poller_fields[] = {
{NULL, 0, 0, 0}
};
static const obj_const obj_ZMQ_Poller_constants[] = {
{NULL, NULL, 0.0 , 0}
};
static const luaL_reg obj_ZMQ_Thread_pub_funcs[] = {
{"new", ZMQ_Thread__new__meth},
{NULL, NULL}
};
static const luaL_reg obj_ZMQ_Thread_methods[] = {
{"start", ZMQ_Thread__start__meth},
{"join", ZMQ_Thread__join__meth},
{NULL, NULL}
};
static const luaL_reg obj_ZMQ_Thread_metas[] = {
{"__gc", ZMQ_Thread__delete__meth},
{"__tostring", obj_udata_default_tostring},
{"__eq", obj_udata_default_equal},
{NULL, NULL}
};
static const obj_base obj_ZMQ_Thread_bases[] = {
{-1, NULL}
};
static const obj_field obj_zmq_poller_fields[] = {
static const obj_field obj_ZMQ_Thread_fields[] = {
{NULL, 0, 0, 0}
};
static const obj_const obj_zmq_poller_constants[] = {
static const obj_const obj_ZMQ_Thread_constants[] = {
{NULL, NULL, 0.0 , 0}
};
@ -2956,7 +3224,8 @@ static const ffi_export_symbol zmq_ffi_export[] = {
static const reg_sub_module reg_sub_modules[] = {
{ &(obj_type_zmq_msg_t), 0, obj_zmq_msg_t_pub_funcs, obj_zmq_msg_t_methods, obj_zmq_msg_t_metas, obj_zmq_msg_t_bases, obj_zmq_msg_t_fields, obj_zmq_msg_t_constants},
{ &(obj_type_ZMQ_Socket), 0, obj_ZMQ_Socket_pub_funcs, obj_ZMQ_Socket_methods, obj_ZMQ_Socket_metas, obj_ZMQ_Socket_bases, obj_ZMQ_Socket_fields, obj_ZMQ_Socket_constants},
{ &(obj_type_zmq_poller), 0, obj_zmq_poller_pub_funcs, obj_zmq_poller_methods, obj_zmq_poller_metas, obj_zmq_poller_bases, obj_zmq_poller_fields, obj_zmq_poller_constants},
{ &(obj_type_ZMQ_Poller), 0, obj_ZMQ_Poller_pub_funcs, obj_ZMQ_Poller_methods, obj_ZMQ_Poller_metas, obj_ZMQ_Poller_bases, obj_ZMQ_Poller_fields, obj_ZMQ_Poller_constants},
{ &(obj_type_ZMQ_Thread), 0, obj_ZMQ_Thread_pub_funcs, obj_ZMQ_Thread_methods, obj_ZMQ_Thread_metas, obj_ZMQ_Thread_bases, obj_ZMQ_Thread_fields, obj_ZMQ_Thread_constants},
{ &(obj_type_ZMQ_Ctx), 0, obj_ZMQ_Ctx_pub_funcs, obj_ZMQ_Ctx_methods, obj_ZMQ_Ctx_metas, obj_ZMQ_Ctx_bases, obj_ZMQ_Ctx_fields, obj_ZMQ_Ctx_constants},
{NULL, 0, NULL, NULL, NULL, NULL, NULL, NULL}
};

@ -0,0 +1,96 @@
-- Copyright (c) 2011 by Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
--
-- zmq.thread wraps the low-level zmq.ZMQ_Thread object.
--
local zmq = require"zmq"
local setmetatable = setmetatable
local tonumber = tonumber
local assert = assert
local thread_mt = {}
thread_mt.__index = thread_mt
function thread_mt:start(detached)
return self.thread:start(detached)
end
function thread_mt:join()
return self.thread:join()
end
local bootstrap_code = [[
local action, action_arg, parent_ctx = ...
local func
-- copy parent ZeroMQ context to this child thread.
local zmq = require"zmq"
local zmq_thread = require"zmq.thread"
zmq_thread.set_parent_ctx(zmq.init_ctx(parent_ctx))
-- create global 'arg'
arg = { select(4, ...) }
-- load Lua code.
if action == 'runfile' then
func = assert(loadfile(action_arg))
-- script name
arg[0] = action_arg
elseif action == 'runstring' then
func = assert(loadstring(action_arg))
-- fake script name
arg[0] = '=(loadstring)'
end
-- run loaded code.
return func(select(4, ...))
]]
local function new_thread(ctx, action, action_arg, ...)
-- convert ZMQ_Ctx to lightuserdata.
ctx = ctx:lightuserdata()
local thread = zmq.ZMQ_Thread(bootstrap_code, action, action_arg, ctx, ...)
return setmetatable({
thread = thread,
}, thread_mt)
end
module(...)
function runfile(ctx, file, ...)
return new_thread(ctx, 'runfile', file, ...)
end
function runstring(ctx, code, ...)
return new_thread(ctx, 'runstring', code, ...)
end
local parent_ctx = nil
function set_parent_ctx(ctx)
parent_ctx = ctx
end
function get_parent_ctx(ctx)
return parent_ctx
end

@ -0,0 +1,205 @@
-- Copyright (c) 2011 by Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local ZMQ_Thread_type = [[
typedef enum {
TSTATE_NONE = 0,
TSTATE_STARTED = 1<<1,
TSTATE_DETACHED = 1<<2,
TSTATE_JOINED = 1<<3,
} ZMQ_TState;
typedef struct ZMQ_Thread {
lua_State *L;
pthread_t thread;
ZMQ_TState state;
int nargs;
int status;
} ZMQ_Thread;
]]
object "ZMQ_Thread" {
sys_include "pthread.h",
c_source(ZMQ_Thread_type),
c_source[[
#include <stdio.h>
/* traceback() function from Lua 5.1.x source. */
static int traceback (lua_State *L) {
if (!lua_isstring(L, 1)) /* 'message' not a string? */
return 1; /* keep it intact */
lua_getfield(L, LUA_GLOBALSINDEX, "debug");
if (!lua_istable(L, -1)) {
lua_pop(L, 1);
return 1;
}
lua_getfield(L, -1, "traceback");
if (!lua_isfunction(L, -1)) {
lua_pop(L, 2);
return 1;
}
lua_pushvalue(L, 1); /* pass error message */
lua_pushinteger(L, 2); /* skip this function and traceback */
lua_call(L, 2, 1); /* call debug.traceback */
return 1;
}
static ZMQ_Thread *thread_new() {
ZMQ_Thread *this;
this = (ZMQ_Thread *)calloc(1, sizeof(ZMQ_Thread));
this->state = TSTATE_NONE;
/* create new lua_State for the thread. */
this->L = luaL_newstate();
/* open standard libraries. */
luaL_openlibs(this->L);
/* push traceback function as first value on stack. */
lua_pushcfunction(this->L, traceback);
return this;
}
static void thread_destroy(ZMQ_Thread *this) {
lua_close(this->L);
free(this);
}
static void *run_child_thread(void *arg) {
ZMQ_Thread *this = (ZMQ_Thread *)arg;
lua_State *L = this->L;
this->status = lua_pcall(L, this->nargs, 0, 1);
if(this->status != 0) {
const char *err_msg = lua_tostring(L, -1);
fprintf(stderr, "%s\n", err_msg);
fflush(stderr);
}
return NULL;
}
static int thread_start(ZMQ_Thread *this, int start_detached) {
int rc;
this->state = TSTATE_STARTED | ((start_detached) ? TSTATE_DETACHED : 0);
rc = pthread_create(&(this->thread), NULL, run_child_thread, this);
if(rc != 0) return rc;
if(start_detached) {
rc = pthread_detach(this->thread);
}
return rc;
}
]],
constructor {
var_in{ "const char *", "bootstrap_code" },
--[[ varargs(...) ]]
c_source "pre" [[
const char *str;
size_t str_len;
int nargs = 0;
int rc;
int top;
int n;
]],
c_source[[
${this} = thread_new();
/* load bootstrap Lua code into child state. */
rc = luaL_loadbuffer(${this}->L, ${bootstrap_code}, ${bootstrap_code_len}, ${bootstrap_code});
if(rc != 0) {
/* copy error message to parent state. */
str = lua_tolstring(${this}->L, -1, &(str_len));
if(str != NULL) {
lua_pushlstring(L, str, str_len);
} else {
/* non-string error message. */
lua_pushfstring(L, "luaL_loadbuffer() failed to luad bootstrap code: rc=%d", rc);
}
thread_destroy(${this});
return lua_error(L);
}
/* copy extra args from main state to child state. */
top = lua_gettop(L);
/* skip the bootstrap code. */
for(n = ${bootstrap_code::idx} + 1; n <= top; n++) {
/* only support string/number/boolean/nil/lightuserdata. */
switch(lua_type(L, n)) {
case LUA_TNIL:
lua_pushnil(${this}->L);
break;
case LUA_TNUMBER:
lua_pushnumber(${this}->L, lua_tonumber(L, n));
break;
case LUA_TBOOLEAN:
lua_pushboolean(${this}->L, lua_toboolean(L, n));
break;
case LUA_TSTRING:
str = lua_tolstring(L, n, &(str_len));
lua_pushlstring(${this}->L, str, str_len);
break;
case LUA_TLIGHTUSERDATA:
lua_pushlightuserdata(${this}->L, lua_touserdata(L, n));
break;
case LUA_TTABLE:
case LUA_TFUNCTION:
case LUA_TUSERDATA:
case LUA_TTHREAD:
default:
return luaL_argerror(L, n,
"Only nil/number/boolean/string/lightuserdata values are supported");
}
++nargs;
}
${this}->nargs = nargs;
]]
},
destructor {
c_source[[
/* We still own the thread object iff the thread was not started or we have joined the thread. */
if(${this}->state == TSTATE_NONE || ${this}->state == TSTATE_JOINED) {
thread_destroy(${this});
}
]]
},
method "start" {
var_in{ "bool", "start_detached", is_optional = true, default = 0 },
var_out{ "int", "rc" },
c_source[[
if(${this}->state != TSTATE_NONE) {
return luaL_error(L, "Thread already started.");
}
${rc} = thread_start(${this}, ${start_detached});
]]
},
method "join" {
var_out{ "int", "rc" },
c_source[[
if((${this}->state & TSTATE_STARTED) == 0) {
return luaL_error(L, "Can't join a thread that hasn't be started.");
}
${rc} = pthread_join(${this}->thread, NULL);
/* now we own the thread object again. */
${this}->state = TSTATE_JOINED;
]]
},
}

@ -140,6 +140,7 @@ subfiles {
"src/msg.nobj.lua",
"src/socket.nobj.lua",
"src/poller.nobj.lua",
"src/thread.nobj.lua",
"src/ctx.nobj.lua",
},

Loading…
Cancel
Save