From 5de9a8d46fa3569de553bdad05cfa41e5725bc52 Mon Sep 17 00:00:00 2001 From: "Robert G. Jakabosky" Date: Thu, 24 Mar 2011 00:06:43 -0700 Subject: [PATCH] Added support for running Lua code in child threads. --- src/pre_generated-zmq.nobj.c | 433 ++++++++++++++++++++++++++++------- src/thread.lua | 96 ++++++++ src/thread.nobj.lua | 205 +++++++++++++++++ zmq.nobj.lua | 1 + 4 files changed, 653 insertions(+), 82 deletions(-) create mode 100644 src/thread.lua create mode 100644 src/thread.nobj.lua diff --git a/src/pre_generated-zmq.nobj.c b/src/pre_generated-zmq.nobj.c index a63735b..74eb214 100644 --- a/src/pre_generated-zmq.nobj.c +++ b/src/pre_generated-zmq.nobj.c @@ -12,6 +12,7 @@ #include #include "zmq.h" +#include #define REG_PACKAGE_IS_CONSTRUCTOR 0 @@ -169,15 +170,23 @@ typedef struct ffi_export_symbol { #define obj_type_ZMQ_Socket_push(L, obj, flags) \ obj_udata_luapush_weak(L, (void *)obj, &(obj_type_ZMQ_Socket), flags) -#define obj_type_id_zmq_poller 2 -#define obj_type_zmq_poller_check(L, _index) \ - (zmq_poller *)obj_simple_udata_luacheck(L, _index, &(obj_type_zmq_poller)) -#define obj_type_zmq_poller_delete(L, _index, flags) \ - (zmq_poller *)obj_simple_udata_luadelete(L, _index, &(obj_type_zmq_poller), flags) -#define obj_type_zmq_poller_push(L, obj, flags) \ - obj_simple_udata_luapush(L, obj, sizeof(zmq_poller), &(obj_type_zmq_poller)) - -#define obj_type_id_ZMQ_Ctx 3 +#define obj_type_id_ZMQ_Poller 2 +#define obj_type_ZMQ_Poller_check(L, _index) \ + (ZMQ_Poller *)obj_simple_udata_luacheck(L, _index, &(obj_type_ZMQ_Poller)) +#define obj_type_ZMQ_Poller_delete(L, _index, flags) \ + (ZMQ_Poller *)obj_simple_udata_luadelete(L, _index, &(obj_type_ZMQ_Poller), flags) +#define obj_type_ZMQ_Poller_push(L, obj, flags) \ + obj_simple_udata_luapush(L, obj, sizeof(ZMQ_Poller), &(obj_type_ZMQ_Poller)) + +#define obj_type_id_ZMQ_Thread 3 +#define obj_type_ZMQ_Thread_check(L, _index) \ + obj_udata_luacheck(L, _index, &(obj_type_ZMQ_Thread)) +#define obj_type_ZMQ_Thread_delete(L, _index, flags) \ + obj_udata_luadelete(L, _index, &(obj_type_ZMQ_Thread), flags) +#define obj_type_ZMQ_Thread_push(L, obj, flags) \ + obj_udata_luapush_weak(L, (void *)obj, &(obj_type_ZMQ_Thread), flags) + +#define obj_type_id_ZMQ_Ctx 4 #define obj_type_ZMQ_Ctx_check(L, _index) \ obj_udata_luacheck(L, _index, &(obj_type_ZMQ_Ctx)) #define obj_type_ZMQ_Ctx_delete(L, _index, flags) \ @@ -194,8 +203,9 @@ static void error_code__ZMQ_Error__push(lua_State *L, ZMQ_Error err); static obj_type obj_type_zmq_msg_t = { NULL, 0, OBJ_TYPE_SIMPLE, "zmq_msg_t" }; static obj_type obj_type_ZMQ_Socket = { NULL, 1, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Socket" }; -static obj_type obj_type_zmq_poller = { NULL, 2, OBJ_TYPE_SIMPLE, "zmq_poller" }; -static obj_type obj_type_ZMQ_Ctx = { NULL, 3, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Ctx" }; +static obj_type obj_type_ZMQ_Poller = { NULL, 2, OBJ_TYPE_SIMPLE, "ZMQ_Poller" }; +static obj_type obj_type_ZMQ_Thread = { NULL, 3, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Thread" }; +static obj_type obj_type_ZMQ_Ctx = { NULL, 4, OBJ_TYPE_FLAG_WEAK_REF, "ZMQ_Ctx" }; #ifndef REG_PACKAGE_IS_CONSTRUCTOR @@ -875,15 +885,15 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "\n" "ZMQ_Error zmq_recv(ZMQ_Socket * this, zmq_msg_t * msg, int flags);\n" "\n" -"typedef int (*poller_find_sock_item_func)(zmq_poller *this, ZMQ_Socket *sock);\n" +"typedef int (*poller_find_sock_item_func)(ZMQ_Poller *this, ZMQ_Socket *sock);\n" "\n" -"typedef int (*poller_find_fd_item_func)(zmq_poller *this, socket_t fd);\n" +"typedef int (*poller_find_fd_item_func)(ZMQ_Poller *this, socket_t fd);\n" "\n" -"typedef int (*poller_get_free_item_func)(zmq_poller *this);\n" +"typedef int (*poller_get_free_item_func)(ZMQ_Poller *this);\n" "\n" -"typedef int (*poller_poll_func)(zmq_poller *this, long timeout);\n" +"typedef int (*poller_poll_func)(ZMQ_Poller *this, long timeout);\n" "\n" -"typedef void (*poller_remove_item_func)(zmq_poller *this, int idx);\n" +"typedef void (*poller_remove_item_func)(ZMQ_Poller *this, int idx);\n" "\n" "typedef struct zmq_pollitem_t {\n" " ZMQ_Socket socket;\n" @@ -894,13 +904,13 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "\n" "int zmq_poll(zmq_pollitem_t *items, int nitems, long timeout);\n" "\n" -"typedef struct zmq_poller {\n" +"typedef struct ZMQ_Poller {\n" " zmq_pollitem_t *items;\n" " int next;\n" " int count;\n" " int free_list;\n" " int len;\n" -"} zmq_poller;\n" +"} ZMQ_Poller;\n" "\n" "typedef void * ZMQ_Ctx;\n" "\n" @@ -974,32 +984,61 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "end\n" "\n" "\n" -"local zmq_poller_pub = _M[\"zmq_poller\"]\n" -"local zmq_poller_mt = _priv[\"zmq_poller\"]\n" -"local zmq_poller_type = obj_type_ptr(zmq_poller_mt[\".type\"])\n" -"local zmq_poller_meth = zmq_poller_mt.__index\n" -"local zmq_poller_objects = setmetatable({}, { __mode = \"k\" })\n" +"local ZMQ_Poller_pub = _M[\"ZMQ_Poller\"]\n" +"local ZMQ_Poller_mt = _priv[\"ZMQ_Poller\"]\n" +"local ZMQ_Poller_type = obj_type_ptr(ZMQ_Poller_mt[\".type\"])\n" +"local ZMQ_Poller_meth = ZMQ_Poller_mt.__index\n" +"local ZMQ_Poller_objects = setmetatable({}, { __mode = \"k\" })\n" +"\n" +"local function obj_type_ZMQ_Poller_check(ud_obj)\n" +" local c_obj = ZMQ_Poller_objects[ud_obj]\n" +" if c_obj == nil then\n" +" -- cdata object not in cache\n" +" c_obj = obj_simple_udata_luacheck(ud_obj, ZMQ_Poller_mt)\n" +" c_obj = ffi.cast(\"ZMQ_Poller *\", c_obj) -- cast from 'void *'\n" +" ZMQ_Poller_objects[ud_obj] = c_obj\n" +" end\n" +" return c_obj\n" +"end\n" +"\n" +"local function obj_type_ZMQ_Poller_delete(ud_obj)\n" +" ZMQ_Poller_objects[ud_obj] = nil\n" +" return obj_simple_udata_luadelete(ud_obj, ZMQ_Poller_mt)\n" +"end\n" +"\n" +"local ZMQ_Poller_sizeof = ffi.sizeof\"ZMQ_Poller\"\n" +"local function obj_type_ZMQ_Poller_push(c_obj)\n" +" local ud_obj, cdata = obj_simple_udata_luapush(c_obj, ZMQ_Poller_sizeof, ZMQ_Poller_mt)\n" +" ZMQ_Poller_objects[ud_obj] = cdata\n" +" return ud_obj\n" +"end\n" +"\n" +"\n" +"local ZMQ_Thread_pub = _M[\"ZMQ_Thread\"]\n" +"local ZMQ_Thread_mt = _priv[\"ZMQ_Thread\"]\n" +"local ZMQ_Thread_type = obj_type_ptr(ZMQ_Thread_mt[\".type\"])\n" +"local ZMQ_Thread_meth = ZMQ_Thread_mt.__index\n" +"local ZMQ_Thread_objects = setmetatable({}, { __mode = \"k\" })\n" "\n" -"local function obj_type_zmq_poller_check(ud_obj)\n" -" local c_obj = zmq_poller_objects[ud_obj]\n" +"local function obj_type_ZMQ_Thread_check(ud_obj)\n" +" local c_obj = ZMQ_Thread_objects[ud_obj]\n" " if c_obj == nil then\n" " -- cdata object not in cache\n" -" c_obj = obj_simple_udata_luacheck(ud_obj, zmq_poller_mt)\n" -" c_obj = ffi.cast(\"zmq_poller *\", c_obj) -- cast from 'void *'\n" -" zmq_poller_objects[ud_obj] = c_obj\n" +" c_obj = obj_udata_luacheck(ud_obj, ZMQ_Thread_mt)\n" +" c_obj = ffi.cast(\"ZMQ_Thread *\", c_obj) -- cast from 'void *'\n" +" ZMQ_Thread_objects[ud_obj] = c_obj\n" " end\n" " return c_obj\n" "end\n" "\n" -"local function obj_type_zmq_poller_delete(ud_obj)\n" -" zmq_poller_objects[ud_obj] = nil\n" -" return obj_simple_udata_luadelete(ud_obj, zmq_poller_mt)\n" +"local function obj_type_ZMQ_Thread_delete(ud_obj)\n" +" ZMQ_Thread_objects[ud_obj] = nil\n" +" return obj_udata_luadelete(ud_obj, ZMQ_Thread_mt)\n" "end\n" "\n" -"local zmq_poller_sizeof = ffi.sizeof\"zmq_poller\"\n" -"local function obj_type_zmq_poller_push(c_obj)\n" -" local ud_obj, cdata = obj_simple_udata_luapush(c_obj, zmq_poller_sizeof, zmq_poller_mt)\n" -" zmq_poller_objects[ud_obj] = cdata\n" +"local function obj_type_ZMQ_Thread_push(c_obj, flags)\n" +" local ud_obj = obj_udata_luapush_weak(c_obj, ZMQ_Thread_mt, ZMQ_Thread_type, flags)\n" +" ZMQ_Thread_objects[ud_obj] = c_obj\n" " return ud_obj\n" "end\n" "\n" @@ -1483,7 +1522,7 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "-- End \"ZMQ_Socket\" FFI interface\n" "\n" "\n" -"-- Start \"zmq_poller\" FFI interface\n" +"-- Start \"ZMQ_Poller\" FFI interface\n" "local poller_find_sock_item = ffi.new(\"poller_find_sock_item_func\", _priv[\"poller_find_sock_item\"])\n" "\n" "local poller_find_fd_item = ffi.new(\"poller_find_fd_item_func\", _priv[\"poller_find_fd_item\"])\n" @@ -1495,8 +1534,8 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" "local poller_remove_item = ffi.new(\"poller_remove_item_func\", _priv[\"poller_remove_item\"])\n" "\n" "-- method: count\n" -"function zmq_poller_meth.count(self)\n" -" local this = obj_type_zmq_poller_check(self)\n" +"function ZMQ_Poller_meth.count(self)\n" +" local this = obj_type_ZMQ_Poller_check(self)\n" " local count\n" " count = this.count;\n" "\n" @@ -1504,7 +1543,11 @@ static const char zmq_ffi_lua_code[] = "-- try loading luajit's ffi\n" " return count\n" "end\n" "\n" -"-- End \"zmq_poller\" FFI interface\n" +"-- End \"ZMQ_Poller\" FFI interface\n" +"\n" +"\n" +"-- Start \"ZMQ_Thread\" FFI interface\n" +"-- End \"ZMQ_Thread\" FFI interface\n" "\n" "\n" "-- Start \"ZMQ_Ctx\" FFI interface\n" @@ -1676,19 +1719,19 @@ static ZMQ_Error simple_zmq_send(ZMQ_Socket sock, const char *data, size_t data_ return err; } -typedef struct zmq_poller { +typedef struct ZMQ_Poller { zmq_pollitem_t *items; int next; int count; int free_list; int len; -} zmq_poller; +} ZMQ_Poller; #define FREE_ITEM_EVENTS_TAG 0xFFFF #define ITEM_TO_INDEX(items, item) (item - (items)) -static int poller_resize_items(zmq_poller *this, int len) { +static int poller_resize_items(ZMQ_Poller *this, int len) { int old_len = this->len; /* make sure new length is atleast as large as items count. */ @@ -1706,7 +1749,7 @@ static int poller_resize_items(zmq_poller *this, int len) { return len; } -static int poller_find_sock_item(zmq_poller *this, ZMQ_Socket *sock) { +static int poller_find_sock_item(ZMQ_Poller *this, ZMQ_Socket *sock) { zmq_pollitem_t *items; int count; int n; @@ -1721,7 +1764,7 @@ static int poller_find_sock_item(zmq_poller *this, ZMQ_Socket *sock) { return -1; } -static int poller_find_fd_item(zmq_poller *this, socket_t fd) { +static int poller_find_fd_item(ZMQ_Poller *this, socket_t fd) { zmq_pollitem_t *items; int count; int n; @@ -1736,7 +1779,7 @@ static int poller_find_fd_item(zmq_poller *this, socket_t fd) { return -1; } -static void poller_remove_item(zmq_poller *this, int idx) { +static void poller_remove_item(ZMQ_Poller *this, int idx) { zmq_pollitem_t *items; int free_list; int count; @@ -1761,7 +1804,7 @@ static void poller_remove_item(zmq_poller *this, int idx) { items[idx].events = FREE_ITEM_EVENTS_TAG; } -static int poller_get_free_item(zmq_poller *this) { +static int poller_get_free_item(ZMQ_Poller *this) { zmq_pollitem_t *curr; zmq_pollitem_t *next; int count; @@ -1798,7 +1841,7 @@ static int poller_get_free_item(zmq_poller *this) { return idx; } -static int poller_compact_items(zmq_poller *this) { +static int poller_compact_items(ZMQ_Poller *this) { zmq_pollitem_t *items; int count; int old_count; @@ -1837,7 +1880,7 @@ static int poller_compact_items(zmq_poller *this) { return count; } -static int poller_poll(zmq_poller *this, long timeout) { +static int poller_poll(ZMQ_Poller *this, long timeout) { int count; /* remove free slots from items list. */ count = poller_compact_items(this); @@ -1846,6 +1889,93 @@ static int poller_poll(zmq_poller *this, long timeout) { } +typedef enum { + TSTATE_NONE = 0, + TSTATE_STARTED = 1<<1, + TSTATE_DETACHED = 1<<2, + TSTATE_JOINED = 1<<3, +} ZMQ_TState; + +typedef struct ZMQ_Thread { + lua_State *L; + pthread_t thread; + ZMQ_TState state; + int nargs; + int status; +} ZMQ_Thread; + + + +#include + +/* traceback() function from Lua 5.1.x source. */ +static int traceback (lua_State *L) { + if (!lua_isstring(L, 1)) /* 'message' not a string? */ + return 1; /* keep it intact */ + lua_getfield(L, LUA_GLOBALSINDEX, "debug"); + if (!lua_istable(L, -1)) { + lua_pop(L, 1); + return 1; + } + lua_getfield(L, -1, "traceback"); + if (!lua_isfunction(L, -1)) { + lua_pop(L, 2); + return 1; + } + lua_pushvalue(L, 1); /* pass error message */ + lua_pushinteger(L, 2); /* skip this function and traceback */ + lua_call(L, 2, 1); /* call debug.traceback */ + return 1; +} + +static ZMQ_Thread *thread_new() { + ZMQ_Thread *this; + + this = (ZMQ_Thread *)calloc(1, sizeof(ZMQ_Thread)); + this->state = TSTATE_NONE; + /* create new lua_State for the thread. */ + this->L = luaL_newstate(); + /* open standard libraries. */ + luaL_openlibs(this->L); + /* push traceback function as first value on stack. */ + lua_pushcfunction(this->L, traceback); + + return this; +} + +static void thread_destroy(ZMQ_Thread *this) { + lua_close(this->L); + free(this); +} + +static void *run_child_thread(void *arg) { + ZMQ_Thread *this = (ZMQ_Thread *)arg; + lua_State *L = this->L; + + this->status = lua_pcall(L, this->nargs, 0, 1); + + if(this->status != 0) { + const char *err_msg = lua_tostring(L, -1); + fprintf(stderr, "%s\n", err_msg); + fflush(stderr); + } + + return NULL; +} + +static int thread_start(ZMQ_Thread *this, int start_detached) { + int rc; + + this->state = TSTATE_STARTED | ((start_detached) ? TSTATE_DETACHED : 0); + rc = pthread_create(&(this->thread), NULL, run_child_thread, this); + if(rc != 0) return rc; + if(start_detached) { + rc = pthread_detach(this->thread); + } + return rc; +} + + typedef void * ZMQ_Ctx; #define OBJ_UDATA_CTX_SHOULD_FREE (OBJ_UDATA_LAST_FLAG << 1) @@ -2417,11 +2547,11 @@ static int ZMQ_Socket__recv__meth(lua_State *L) { } /* method: new */ -static int zmq_poller__new__meth(lua_State *L) { +static int ZMQ_Poller__new__meth(lua_State *L) { unsigned int length = luaL_optinteger(L,1,10); int this_flags = OBJ_UDATA_FLAG_OWN; - zmq_poller * this; - zmq_poller poller; + ZMQ_Poller * this; + ZMQ_Poller poller; this = &poller; this->items = (zmq_pollitem_t *)calloc(length, sizeof(zmq_pollitem_t)); this->next = -1; @@ -2429,14 +2559,14 @@ static int zmq_poller__new__meth(lua_State *L) { this->len = length; this->free_list = -1; - obj_type_zmq_poller_push(L, this, this_flags); + obj_type_ZMQ_Poller_push(L, this, this_flags); return 1; } /* method: close */ -static int zmq_poller__close__meth(lua_State *L) { +static int ZMQ_Poller__close__meth(lua_State *L) { int this_flags = 0; - zmq_poller * this = obj_type_zmq_poller_delete(L,1,&(this_flags)); + ZMQ_Poller * this = obj_type_ZMQ_Poller_delete(L,1,&(this_flags)); if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; } free(this->items); this->items = NULL; @@ -2449,8 +2579,8 @@ static int zmq_poller__close__meth(lua_State *L) { } /* method: add */ -static int zmq_poller__add__meth(lua_State *L) { - zmq_poller * this = obj_type_zmq_poller_check(L,1); +static int ZMQ_Poller__add__meth(lua_State *L) { + ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1); short events = luaL_checkinteger(L,3); int idx = 0; zmq_pollitem_t *item; @@ -2475,8 +2605,8 @@ static int zmq_poller__add__meth(lua_State *L) { } /* method: modify */ -static int zmq_poller__modify__meth(lua_State *L) { - zmq_poller * this = obj_type_zmq_poller_check(L,1); +static int ZMQ_Poller__modify__meth(lua_State *L) { + ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1); short events = luaL_checkinteger(L,3); int idx = 0; zmq_pollitem_t *item; @@ -2513,8 +2643,8 @@ static int zmq_poller__modify__meth(lua_State *L) { } /* method: remove */ -static int zmq_poller__remove__meth(lua_State *L) { - zmq_poller * this = obj_type_zmq_poller_check(L,1); +static int ZMQ_Poller__remove__meth(lua_State *L) { + ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1); ZMQ_Socket *sock; socket_t fd; int idx; @@ -2540,8 +2670,8 @@ static int zmq_poller__remove__meth(lua_State *L) { } /* method: poll */ -static int zmq_poller__poll__meth(lua_State *L) { - zmq_poller * this = obj_type_zmq_poller_check(L,1); +static int ZMQ_Poller__poll__meth(lua_State *L) { + ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1); long timeout = luaL_checkinteger(L,2); ZMQ_Error err = 0; /* poll for events */ @@ -2564,8 +2694,8 @@ static int zmq_poller__poll__meth(lua_State *L) { } /* method: next_revents */ -static int zmq_poller__next_revents__meth(lua_State *L) { - zmq_poller * this = obj_type_zmq_poller_check(L,1); +static int ZMQ_Poller__next_revents__meth(lua_State *L) { + ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1); short revents = 0; zmq_pollitem_t *items; int count; @@ -2603,8 +2733,8 @@ static int zmq_poller__next_revents__meth(lua_State *L) { } /* method: count */ -static int zmq_poller__count__meth(lua_State *L) { - zmq_poller * this = obj_type_zmq_poller_check(L,1); +static int ZMQ_Poller__count__meth(lua_State *L) { + ZMQ_Poller * this = obj_type_ZMQ_Poller_check(L,1); int count = 0; count = this->count; @@ -2612,6 +2742,114 @@ static int zmq_poller__count__meth(lua_State *L) { return 1; } +/* method: new */ +static int ZMQ_Thread__new__meth(lua_State *L) { + size_t bootstrap_code_len; + const char * bootstrap_code = luaL_checklstring(L,1,&(bootstrap_code_len)); + int this_flags = OBJ_UDATA_FLAG_OWN; + ZMQ_Thread * this; + const char *str; + size_t str_len; + int nargs = 0; + int rc; + int top; + int n; + + this = thread_new(); + /* load bootstrap Lua code into child state. */ + rc = luaL_loadbuffer(this->L, bootstrap_code, bootstrap_code_len, bootstrap_code); + if(rc != 0) { + /* copy error message to parent state. */ + str = lua_tolstring(this->L, -1, &(str_len)); + if(str != NULL) { + lua_pushlstring(L, str, str_len); + } else { + /* non-string error message. */ + lua_pushfstring(L, "luaL_loadbuffer() failed to luad bootstrap code: rc=%d", rc); + } + thread_destroy(this); + return lua_error(L); + } + /* copy extra args from main state to child state. */ + top = lua_gettop(L); + /* skip the bootstrap code. */ + for(n = 1 + 1; n <= top; n++) { + /* only support string/number/boolean/nil/lightuserdata. */ + switch(lua_type(L, n)) { + case LUA_TNIL: + lua_pushnil(this->L); + break; + case LUA_TNUMBER: + lua_pushnumber(this->L, lua_tonumber(L, n)); + break; + case LUA_TBOOLEAN: + lua_pushboolean(this->L, lua_toboolean(L, n)); + break; + case LUA_TSTRING: + str = lua_tolstring(L, n, &(str_len)); + lua_pushlstring(this->L, str, str_len); + break; + case LUA_TLIGHTUSERDATA: + lua_pushlightuserdata(this->L, lua_touserdata(L, n)); + break; + case LUA_TTABLE: + case LUA_TFUNCTION: + case LUA_TUSERDATA: + case LUA_TTHREAD: + default: + return luaL_argerror(L, n, + "Only nil/number/boolean/string/lightuserdata values are supported"); + } + ++nargs; + } + this->nargs = nargs; + + obj_type_ZMQ_Thread_push(L, this, this_flags); + return 1; +} + +/* method: delete */ +static int ZMQ_Thread__delete__meth(lua_State *L) { + int this_flags = 0; + ZMQ_Thread * this = obj_type_ZMQ_Thread_delete(L,1,&(this_flags)); + if(!(this_flags & OBJ_UDATA_FLAG_OWN)) { return 0; } + /* We still own the thread object iff the thread was not started or we have joined the thread. */ + if(this->state == TSTATE_NONE || this->state == TSTATE_JOINED) { + thread_destroy(this); + } + + return 0; +} + +/* method: start */ +static int ZMQ_Thread__start__meth(lua_State *L) { + ZMQ_Thread * this = obj_type_ZMQ_Thread_check(L,1); + bool start_detached = lua_toboolean(L,2); + int rc = 0; + if(this->state != TSTATE_NONE) { + return luaL_error(L, "Thread already started."); + } + rc = thread_start(this, start_detached); + + lua_pushinteger(L, rc); + return 1; +} + +/* method: join */ +static int ZMQ_Thread__join__meth(lua_State *L) { + ZMQ_Thread * this = obj_type_ZMQ_Thread_check(L,1); + int rc = 0; + if((this->state & TSTATE_STARTED) == 0) { + return luaL_error(L, "Can't join a thread that hasn't be started."); + } + rc = pthread_join(this->thread, NULL); + /* now we own the thread object again. */ + this->state = TSTATE_JOINED; + + lua_pushinteger(L, rc); + return 1; +} + /* method: delete */ static int ZMQ_Ctx__delete__meth(lua_State *L) { int this_flags = 0; @@ -2822,38 +3060,68 @@ static const obj_const obj_ZMQ_Socket_constants[] = { {NULL, NULL, 0.0 , 0} }; -static const luaL_reg obj_zmq_poller_pub_funcs[] = { - {"new", zmq_poller__new__meth}, +static const luaL_reg obj_ZMQ_Poller_pub_funcs[] = { + {"new", ZMQ_Poller__new__meth}, {NULL, NULL} }; -static const luaL_reg obj_zmq_poller_methods[] = { - {"close", zmq_poller__close__meth}, - {"add", zmq_poller__add__meth}, - {"modify", zmq_poller__modify__meth}, - {"remove", zmq_poller__remove__meth}, - {"poll", zmq_poller__poll__meth}, - {"next_revents", zmq_poller__next_revents__meth}, - {"count", zmq_poller__count__meth}, +static const luaL_reg obj_ZMQ_Poller_methods[] = { + {"close", ZMQ_Poller__close__meth}, + {"add", ZMQ_Poller__add__meth}, + {"modify", ZMQ_Poller__modify__meth}, + {"remove", ZMQ_Poller__remove__meth}, + {"poll", ZMQ_Poller__poll__meth}, + {"next_revents", ZMQ_Poller__next_revents__meth}, + {"count", ZMQ_Poller__count__meth}, {NULL, NULL} }; -static const luaL_reg obj_zmq_poller_metas[] = { - {"__gc", zmq_poller__close__meth}, +static const luaL_reg obj_ZMQ_Poller_metas[] = { + {"__gc", ZMQ_Poller__close__meth}, {"__tostring", obj_simple_udata_default_tostring}, {"__eq", obj_simple_udata_default_equal}, {NULL, NULL} }; -static const obj_base obj_zmq_poller_bases[] = { +static const obj_base obj_ZMQ_Poller_bases[] = { + {-1, NULL} +}; + +static const obj_field obj_ZMQ_Poller_fields[] = { + {NULL, 0, 0, 0} +}; + +static const obj_const obj_ZMQ_Poller_constants[] = { + {NULL, NULL, 0.0 , 0} +}; + +static const luaL_reg obj_ZMQ_Thread_pub_funcs[] = { + {"new", ZMQ_Thread__new__meth}, + {NULL, NULL} +}; + +static const luaL_reg obj_ZMQ_Thread_methods[] = { + {"start", ZMQ_Thread__start__meth}, + {"join", ZMQ_Thread__join__meth}, + {NULL, NULL} +}; + +static const luaL_reg obj_ZMQ_Thread_metas[] = { + {"__gc", ZMQ_Thread__delete__meth}, + {"__tostring", obj_udata_default_tostring}, + {"__eq", obj_udata_default_equal}, + {NULL, NULL} +}; + +static const obj_base obj_ZMQ_Thread_bases[] = { {-1, NULL} }; -static const obj_field obj_zmq_poller_fields[] = { +static const obj_field obj_ZMQ_Thread_fields[] = { {NULL, 0, 0, 0} }; -static const obj_const obj_zmq_poller_constants[] = { +static const obj_const obj_ZMQ_Thread_constants[] = { {NULL, NULL, 0.0 , 0} }; @@ -2956,7 +3224,8 @@ static const ffi_export_symbol zmq_ffi_export[] = { static const reg_sub_module reg_sub_modules[] = { { &(obj_type_zmq_msg_t), 0, obj_zmq_msg_t_pub_funcs, obj_zmq_msg_t_methods, obj_zmq_msg_t_metas, obj_zmq_msg_t_bases, obj_zmq_msg_t_fields, obj_zmq_msg_t_constants}, { &(obj_type_ZMQ_Socket), 0, obj_ZMQ_Socket_pub_funcs, obj_ZMQ_Socket_methods, obj_ZMQ_Socket_metas, obj_ZMQ_Socket_bases, obj_ZMQ_Socket_fields, obj_ZMQ_Socket_constants}, - { &(obj_type_zmq_poller), 0, obj_zmq_poller_pub_funcs, obj_zmq_poller_methods, obj_zmq_poller_metas, obj_zmq_poller_bases, obj_zmq_poller_fields, obj_zmq_poller_constants}, + { &(obj_type_ZMQ_Poller), 0, obj_ZMQ_Poller_pub_funcs, obj_ZMQ_Poller_methods, obj_ZMQ_Poller_metas, obj_ZMQ_Poller_bases, obj_ZMQ_Poller_fields, obj_ZMQ_Poller_constants}, + { &(obj_type_ZMQ_Thread), 0, obj_ZMQ_Thread_pub_funcs, obj_ZMQ_Thread_methods, obj_ZMQ_Thread_metas, obj_ZMQ_Thread_bases, obj_ZMQ_Thread_fields, obj_ZMQ_Thread_constants}, { &(obj_type_ZMQ_Ctx), 0, obj_ZMQ_Ctx_pub_funcs, obj_ZMQ_Ctx_methods, obj_ZMQ_Ctx_metas, obj_ZMQ_Ctx_bases, obj_ZMQ_Ctx_fields, obj_ZMQ_Ctx_constants}, {NULL, 0, NULL, NULL, NULL, NULL, NULL, NULL} }; diff --git a/src/thread.lua b/src/thread.lua new file mode 100644 index 0000000..30cae80 --- /dev/null +++ b/src/thread.lua @@ -0,0 +1,96 @@ +-- Copyright (c) 2011 by Robert G. Jakabosky +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +-- +-- zmq.thread wraps the low-level zmq.ZMQ_Thread object. +-- + +local zmq = require"zmq" + +local setmetatable = setmetatable +local tonumber = tonumber +local assert = assert + +local thread_mt = {} +thread_mt.__index = thread_mt + +function thread_mt:start(detached) + return self.thread:start(detached) +end + +function thread_mt:join() + return self.thread:join() +end + +local bootstrap_code = [[ +local action, action_arg, parent_ctx = ... +local func + +-- copy parent ZeroMQ context to this child thread. +local zmq = require"zmq" +local zmq_thread = require"zmq.thread" +zmq_thread.set_parent_ctx(zmq.init_ctx(parent_ctx)) + +-- create global 'arg' +arg = { select(4, ...) } + +-- load Lua code. +if action == 'runfile' then + func = assert(loadfile(action_arg)) + -- script name + arg[0] = action_arg +elseif action == 'runstring' then + func = assert(loadstring(action_arg)) + -- fake script name + arg[0] = '=(loadstring)' +end + +-- run loaded code. +return func(select(4, ...)) +]] + +local function new_thread(ctx, action, action_arg, ...) + -- convert ZMQ_Ctx to lightuserdata. + ctx = ctx:lightuserdata() + local thread = zmq.ZMQ_Thread(bootstrap_code, action, action_arg, ctx, ...) + return setmetatable({ + thread = thread, + }, thread_mt) +end + +module(...) + +function runfile(ctx, file, ...) + return new_thread(ctx, 'runfile', file, ...) +end + +function runstring(ctx, code, ...) + return new_thread(ctx, 'runstring', code, ...) +end + +local parent_ctx = nil +function set_parent_ctx(ctx) + parent_ctx = ctx +end + +function get_parent_ctx(ctx) + return parent_ctx +end + diff --git a/src/thread.nobj.lua b/src/thread.nobj.lua new file mode 100644 index 0000000..ac39369 --- /dev/null +++ b/src/thread.nobj.lua @@ -0,0 +1,205 @@ +-- Copyright (c) 2011 by Robert G. Jakabosky +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +local ZMQ_Thread_type = [[ +typedef enum { + TSTATE_NONE = 0, + TSTATE_STARTED = 1<<1, + TSTATE_DETACHED = 1<<2, + TSTATE_JOINED = 1<<3, +} ZMQ_TState; + +typedef struct ZMQ_Thread { + lua_State *L; + pthread_t thread; + ZMQ_TState state; + int nargs; + int status; +} ZMQ_Thread; + +]] +object "ZMQ_Thread" { + sys_include "pthread.h", + c_source(ZMQ_Thread_type), + c_source[[ + +#include + +/* traceback() function from Lua 5.1.x source. */ +static int traceback (lua_State *L) { + if (!lua_isstring(L, 1)) /* 'message' not a string? */ + return 1; /* keep it intact */ + lua_getfield(L, LUA_GLOBALSINDEX, "debug"); + if (!lua_istable(L, -1)) { + lua_pop(L, 1); + return 1; + } + lua_getfield(L, -1, "traceback"); + if (!lua_isfunction(L, -1)) { + lua_pop(L, 2); + return 1; + } + lua_pushvalue(L, 1); /* pass error message */ + lua_pushinteger(L, 2); /* skip this function and traceback */ + lua_call(L, 2, 1); /* call debug.traceback */ + return 1; +} + +static ZMQ_Thread *thread_new() { + ZMQ_Thread *this; + + this = (ZMQ_Thread *)calloc(1, sizeof(ZMQ_Thread)); + this->state = TSTATE_NONE; + /* create new lua_State for the thread. */ + this->L = luaL_newstate(); + /* open standard libraries. */ + luaL_openlibs(this->L); + /* push traceback function as first value on stack. */ + lua_pushcfunction(this->L, traceback); + + return this; +} + +static void thread_destroy(ZMQ_Thread *this) { + lua_close(this->L); + free(this); +} + +static void *run_child_thread(void *arg) { + ZMQ_Thread *this = (ZMQ_Thread *)arg; + lua_State *L = this->L; + + this->status = lua_pcall(L, this->nargs, 0, 1); + + if(this->status != 0) { + const char *err_msg = lua_tostring(L, -1); + fprintf(stderr, "%s\n", err_msg); + fflush(stderr); + } + + return NULL; +} + +static int thread_start(ZMQ_Thread *this, int start_detached) { + int rc; + + this->state = TSTATE_STARTED | ((start_detached) ? TSTATE_DETACHED : 0); + rc = pthread_create(&(this->thread), NULL, run_child_thread, this); + if(rc != 0) return rc; + if(start_detached) { + rc = pthread_detach(this->thread); + } + return rc; +} + +]], + constructor { + var_in{ "const char *", "bootstrap_code" }, + --[[ varargs(...) ]] + c_source "pre" [[ + const char *str; + size_t str_len; + int nargs = 0; + int rc; + int top; + int n; +]], + c_source[[ + ${this} = thread_new(); + /* load bootstrap Lua code into child state. */ + rc = luaL_loadbuffer(${this}->L, ${bootstrap_code}, ${bootstrap_code_len}, ${bootstrap_code}); + if(rc != 0) { + /* copy error message to parent state. */ + str = lua_tolstring(${this}->L, -1, &(str_len)); + if(str != NULL) { + lua_pushlstring(L, str, str_len); + } else { + /* non-string error message. */ + lua_pushfstring(L, "luaL_loadbuffer() failed to luad bootstrap code: rc=%d", rc); + } + thread_destroy(${this}); + return lua_error(L); + } + /* copy extra args from main state to child state. */ + top = lua_gettop(L); + /* skip the bootstrap code. */ + for(n = ${bootstrap_code::idx} + 1; n <= top; n++) { + /* only support string/number/boolean/nil/lightuserdata. */ + switch(lua_type(L, n)) { + case LUA_TNIL: + lua_pushnil(${this}->L); + break; + case LUA_TNUMBER: + lua_pushnumber(${this}->L, lua_tonumber(L, n)); + break; + case LUA_TBOOLEAN: + lua_pushboolean(${this}->L, lua_toboolean(L, n)); + break; + case LUA_TSTRING: + str = lua_tolstring(L, n, &(str_len)); + lua_pushlstring(${this}->L, str, str_len); + break; + case LUA_TLIGHTUSERDATA: + lua_pushlightuserdata(${this}->L, lua_touserdata(L, n)); + break; + case LUA_TTABLE: + case LUA_TFUNCTION: + case LUA_TUSERDATA: + case LUA_TTHREAD: + default: + return luaL_argerror(L, n, + "Only nil/number/boolean/string/lightuserdata values are supported"); + } + ++nargs; + } + ${this}->nargs = nargs; +]] + }, + destructor { + c_source[[ + /* We still own the thread object iff the thread was not started or we have joined the thread. */ + if(${this}->state == TSTATE_NONE || ${this}->state == TSTATE_JOINED) { + thread_destroy(${this}); + } +]] + }, + method "start" { + var_in{ "bool", "start_detached", is_optional = true, default = 0 }, + var_out{ "int", "rc" }, + c_source[[ + if(${this}->state != TSTATE_NONE) { + return luaL_error(L, "Thread already started."); + } + ${rc} = thread_start(${this}, ${start_detached}); +]] + }, + method "join" { + var_out{ "int", "rc" }, + c_source[[ + if((${this}->state & TSTATE_STARTED) == 0) { + return luaL_error(L, "Can't join a thread that hasn't be started."); + } + ${rc} = pthread_join(${this}->thread, NULL); + /* now we own the thread object again. */ + ${this}->state = TSTATE_JOINED; +]] + }, +} + diff --git a/zmq.nobj.lua b/zmq.nobj.lua index cd0cdba..790ef10 100644 --- a/zmq.nobj.lua +++ b/zmq.nobj.lua @@ -140,6 +140,7 @@ subfiles { "src/msg.nobj.lua", "src/socket.nobj.lua", "src/poller.nobj.lua", +"src/thread.nobj.lua", "src/ctx.nobj.lua", },