diff --git a/API.md b/API.md index 4d36aad..5134fb9 100644 --- a/API.md +++ b/API.md @@ -1,8 +1,8 @@ ## constants -ZMQ_CONSTANT_NAME in the C API corresponds to zmq.CONSTANT_NAME in Lua. +ZMQ_CONSTANT_NAME in the C API turns into zmq.CONSTANT_NAME in Lua. -## zmq.init +## init Initialises ØMQ context. See [zmq_init(3)](http://api.zeromq.org/zmq_init.3.html). @@ -10,67 +10,67 @@ See [zmq_init(3)](http://api.zeromq.org/zmq_init.3.html). zmq.init(app_threads, io_threads) zmq.init(app_threads, io_threads, flags) -## zmq.term +## term Terminates ØMQ context. See [zmq_term(3)](http://api.zeromq.org/zmq_term.3.html). -zmq.term(context) +ctx:term() -## zmq.socket +## socket Creates ØMQ socket. See [zmq_socket(3)](http://api.zeromq.org/zmq_socket.3.html). -zmq.socket(context, type) +ctx:socket(type) -## zmq.close +## close Destroys ØMQ socket. See [zmq_close(3)](http://api.zeromq.org/zmq_close.3.html). -zmq.close(socket) +s:close() -## zmq.setsockopt +## setsockopt Sets a specified option on a ØMQ socket. See [zmq_setsockopt(3)](http://api.zeromq.org/zmq_setsockopt.3.html). -zmq.setsockopt(socket, option, optval) +s:setsockopt(option, optval) -## zmq.bind +## bind Binds the socket to the specified address. See [zmq_bind(3)](http://api.zeromq.org/zmq_bind.3.html). -zmq.bind(socket, addr) +s:bind(addr) -## zmq.connect +## connect Connect the socket to the specified address. See [zmq_connect(3)](http://api.zeromq.org/zmq_connect.3.html). -zmq.connect(socket, addr) +s:connect(addr) -## zmq.send +## send Sends a message. See [zmq_send(3)](http://api.zeromq.org/zmq_send.3.html). -zmq.send(socket, msg) -zmq.send(socket, msg, flags) +s:send(msg) +s:send(msg, flags) -## zmq.flush +## flush Flushes unflushed messages to the socket. See [zmq_flush(3)](http://api.zeromq.org/zmq_flush.3.html). -zmq.flush(socket) +s:flush() -## zmq.recv +## recv Retrieves a message from the socket. See [zmq_recv(3)](http://api.zeromq.org/zmq_recv.3.html). -zmq.recv(socket) -zmq.recv(socket, flags) +s:recv() +s:recv(flags) diff --git a/examples/client.lua b/examples/client.lua index d472552..ecc1c6e 100644 --- a/examples/client.lua +++ b/examples/client.lua @@ -21,9 +21,12 @@ require("zmq") local ctx = zmq.init(1, 1) -local s = zmq.socket(ctx, zmq.REQ) -zmq.connect(s, "tcp://localhost:5555") -zmq.send(s, "SELECT * FROM mytable") -print(zmq.recv(s)) -zmq.close(s) -zmq.term(ctx) +local s = ctx:socket(zmq.REQ) + +s:connect("tcp://localhost:5555") + +s:send("SELECT * FROM mytable") +print(s:recv()) + +s:close() +ctx:term() diff --git a/examples/publiser.lua b/examples/publiser.lua index 6b56ec3..fe93595 100644 --- a/examples/publiser.lua +++ b/examples/publiser.lua @@ -21,10 +21,12 @@ require("zmq") local ctx = zmq.init(1, 1, 0) -local s = zmq.socket(ctx, zmq.PUB) -zmq.bind(s, "tcp://lo:5555") +local s = ctx:socket(zmq.PUB) + +s:bind("tcp://lo:5555") + local msg_id = 1 while true do - zmq.send(s, tostring(msg_id)) + s:send(tostring(msg_id)) msg_id = msg_id + 1 end diff --git a/examples/server.lua b/examples/server.lua index cc2e587..c1a0a49 100644 --- a/examples/server.lua +++ b/examples/server.lua @@ -21,9 +21,11 @@ require("zmq") local ctx = zmq.init(1, 1) -local s = zmq.socket(ctx, zmq.REP) -zmq.bind(s, "tcp://lo:5555") +local s = ctx:socket(zmq.REP) + +s:bind("tcp://lo:5555") + while true do - print(string.format("Received query: '%s'", zmq.recv(s))) - zmq.send(s, "OK") + print(string.format("Received query: '%s'", s:recv())) + s:send("OK") end diff --git a/examples/subscriber.lua b/examples/subscriber.lua index fb7abf6..d587a84 100644 --- a/examples/subscriber.lua +++ b/examples/subscriber.lua @@ -21,11 +21,11 @@ require("zmq") local ctx = zmq.init(1, 1, 0) -local s = zmq.socket(ctx, zmq.SUB) -zmq.setsockopt(s, zmq.SUBSCRIBE, "") -zmq.connect(s, "tcp://localhost:5555") +local s = ctx:socket(zmq.SUB) +s:setsockopt(zmq.SUBSCRIBE, "") +s:connect("tcp://localhost:5555") while true do - local msg = zmq.recv(s) + local msg = s:recv() local msg_id = tonumber(msg) if math.mod(msg_id, 10000) == 0 then print(msg_id) end end diff --git a/perf/local_lat.lua b/perf/local_lat.lua index d001073..9294aaa 100644 --- a/perf/local_lat.lua +++ b/perf/local_lat.lua @@ -20,24 +20,25 @@ require("zmq") -if not arg[2] then - print("usage: lua local_lat.lua ") +if not arg[3] then + print("usage: lua local_lat.lua ") os.exit() end local bind_to = arg[1] -local roundtrip_count = tonumber(arg[2]) +local message_size = tonumber(arg[2]) +local roundtrip_count = tonumber(arg[3]) local ctx = zmq.init(1, 1) -local s = zmq.socket(ctx, zmq.REP) -zmq.bind(s, bind_to) +local s = ctx:socket(zmq.REP) +s:bind(bind_to) local msg for i = 1, roundtrip_count do - msg = zmq.recv(s) - zmq.send(s, msg) + msg = s:recv() + s:send(msg) end -zmq.close(s) -zmq.term(ctx) +s:close() +ctx:term() diff --git a/perf/local_thr.lua b/perf/local_thr.lua index 8b62f51..246236a 100644 --- a/perf/local_thr.lua +++ b/perf/local_thr.lua @@ -30,22 +30,22 @@ local message_size = tonumber(arg[2]) local message_count = tonumber(arg[3]) local ctx = zmq.init(1, 1) -local s = zmq.socket(ctx, zmq.SUB) -zmq.setsockopt(s, zmq.SUBSCRIBE, ""); -zmq.bind(s, bind_to) +local s = ctx:socket(zmq.SUB) +s:setsockopt(zmq.SUBSCRIBE, ""); +s:bind(bind_to) -local msg = zmq.recv(s) +local msg = s:recv() local start_time = os.time() for i = 1, message_count - 1 do - msg = zmq.recv(s) + msg = s:recv() end local end_time = os.time() -zmq.close(s) -zmq.term(ctx) +s:close() +ctx:term() local elapsed = os.difftime(end_time, start_time) if elapsed == 0 then elapsed = 1 end diff --git a/perf/remote_lat.lua b/perf/remote_lat.lua index 7d070e8..1d5803c 100644 --- a/perf/remote_lat.lua +++ b/perf/remote_lat.lua @@ -30,8 +30,8 @@ local message_size = tonumber(arg[2]) local roundtrip_count = tonumber(arg[3]) local ctx = zmq.init(1, 1) -local s = zmq.socket(ctx, zmq.REQ) -zmq.connect(s, connect_to) +local s = ctx:socket(zmq.REQ) +s:connect(connect_to) local msg = "" for i = 1, message_size do msg = msg .. "0" end @@ -39,14 +39,14 @@ for i = 1, message_size do msg = msg .. "0" end local start_time = os.time() for i = 1, roundtrip_count do - zmq.send(s, msg) - msg = zmq.recv(s) + s:send(msg) + msg = s:recv() end local end_time = os.time() -zmq.close(s) -zmq.term(ctx) +s:close() +ctx:term() local elapsed = os.difftime(end_time, start_time) local latency = elapsed * 1000000 / roundtrip_count / 2 diff --git a/perf/remote_thr.lua b/perf/remote_thr.lua index aa4e171..9333944 100644 --- a/perf/remote_thr.lua +++ b/perf/remote_thr.lua @@ -30,17 +30,17 @@ local message_size = tonumber(arg[2]) local message_count = tonumber(arg[3]) local ctx = zmq.init(1, 1) -local s = zmq.socket(ctx, zmq.PUB) -zmq.connect(s, connect_to) +local s = ctx:socket(zmq.PUB) +s:connect(connect_to) local msg = "" for i = 1, message_size do msg = msg .. "0" end for i = 1, message_count do - zmq.send(s, msg) + s:send(msg) end os.execute("sleep " .. 10) -zmq.close(s) -zmq.term(ctx) +s:close() +ctx:term() diff --git a/zmq.c b/zmq.c index 01a0ac7..a3be4b2 100644 --- a/zmq.c +++ b/zmq.c @@ -30,55 +30,67 @@ #include #include +#define MT_ZMQ_CONTEXT "MT_ZMQ_CONTEXT" +#define MT_ZMQ_SOCKET "MT_ZMQ_SOCKET" + +typedef struct { void *ptr; } zmq_ptr; + static int Lzmq_init(lua_State *L) { int app_threads = luaL_checkint(L, 1); int io_threads = luaL_checkint(L, 2); int flags = luaL_optint(L, 3, 0); - void *ctx = zmq_init(app_threads, io_threads, flags); - if (!ctx) { + zmq_ptr *ctx = lua_newuserdata(L, sizeof(zmq_ptr)); + + ctx->ptr = zmq_init(app_threads, io_threads, flags); + + if (!ctx->ptr) { return luaL_error(L, zmq_strerror(errno)); } - lua_pushlightuserdata(L, ctx); + + luaL_getmetatable(L, MT_ZMQ_CONTEXT); + lua_setmetatable(L, -2); + return 1; } static int Lzmq_term(lua_State *L) { - luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); - void *ctx = lua_touserdata(L, 1); - - assert(zmq_term(ctx) == 0); + zmq_ptr *ctx = luaL_checkudata(L, 1, MT_ZMQ_CONTEXT); + assert(zmq_term(ctx->ptr) == 0); return 0; } static int Lzmq_socket(lua_State *L) { - luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); - void *ctx = lua_touserdata(L, 1); + zmq_ptr *ctx = luaL_checkudata(L, 1, MT_ZMQ_CONTEXT); int type = luaL_checkint(L, 2); - void *s = zmq_socket(ctx, type); - if (!s) { + zmq_ptr *s = lua_newuserdata(L, sizeof(zmq_ptr)); + + s->ptr = zmq_socket(ctx->ptr, type); + + if (!s->ptr) { return luaL_error(L, zmq_strerror(errno)); } - lua_pushlightuserdata(L, s); + + luaL_getmetatable(L, MT_ZMQ_SOCKET); + lua_setmetatable(L, -2); + return 1; } static int Lzmq_close(lua_State *L) { - luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); - void *s = lua_touserdata(L, 1); - assert(zmq_close(s) == 0); + zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET); + assert(zmq_close(s->ptr) == 0); return 0; } static int Lzmq_setsockopt(lua_State *L) { - luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); - void *s = lua_touserdata(L, 1); + zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET); int option = luaL_checkint(L, 2); int rc = 0; @@ -90,7 +102,7 @@ static int Lzmq_setsockopt(lua_State *L) case ZMQ_AFFINITY: { int64_t optval = (int64_t) luaL_checklong(L, 3); - rc = zmq_setsockopt(s, option, (void *) &optval, sizeof(int64_t)); + rc = zmq_setsockopt(s->ptr, option, (void *) &optval, sizeof(int64_t)); } break; case ZMQ_IDENTITY: @@ -99,7 +111,7 @@ static int Lzmq_setsockopt(lua_State *L) { size_t optvallen; const char *optval = luaL_checklstring(L, 3, &optvallen); - rc = zmq_setsockopt(s, option, (void *) optval, optvallen); + rc = zmq_setsockopt(s->ptr, option, (void *) optval, optvallen); } break; case ZMQ_RATE: @@ -109,7 +121,7 @@ static int Lzmq_setsockopt(lua_State *L) case ZMQ_RCVBUF: { uint64_t optval = (uint64_t) luaL_checklong(L, 3); - rc = zmq_setsockopt(s, option, (void *) &optval, sizeof(uint64_t)); + rc = zmq_setsockopt(s->ptr, option, (void *) &optval, sizeof(uint64_t)); } break; default: @@ -125,32 +137,31 @@ static int Lzmq_setsockopt(lua_State *L) static int Lzmq_bind(lua_State *L) { - luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); - void *s = lua_touserdata(L, 1); + zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET); const char *addr = luaL_checkstring(L, 2); - if (zmq_bind(s, addr) != 0) { + if (zmq_bind(s->ptr, addr) != 0) { return luaL_error(L, zmq_strerror(errno)); } + return 0; } static int Lzmq_connect(lua_State *L) { - luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); - void *s = lua_touserdata(L, 1); + zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET); const char *addr = luaL_checkstring(L, 2); - if (zmq_connect(s, addr) != 0) { + if (zmq_connect(s->ptr, addr) != 0) { return luaL_error(L, zmq_strerror(errno)); } + return 0; } static int Lzmq_send(lua_State *L) { - luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); - void *s = lua_touserdata(L, 1); + zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET); size_t msg_size; const char *data = luaL_checklstring(L, 2, &msg_size); int flags = luaL_optint(L, 3, 0); @@ -159,7 +170,7 @@ static int Lzmq_send(lua_State *L) assert(zmq_msg_init_size(&msg, msg_size) == 0); memcpy(zmq_msg_data(&msg), data, msg_size); - int rc = zmq_send(s, &msg, flags); + int rc = zmq_send(s->ptr, &msg, flags); assert(zmq_msg_close(&msg) == 0); @@ -173,15 +184,14 @@ static int Lzmq_send(lua_State *L) } lua_pushboolean(L, 1); + return 1; } static int Lzmq_flush(lua_State *L) { - luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); - void *s = lua_touserdata(L, 1); - - if (zmq_flush(s) != 0) { + zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET); + if (zmq_flush(s->ptr) != 0) { return luaL_error(L, zmq_strerror(errno)); } return 0; @@ -189,14 +199,13 @@ static int Lzmq_flush(lua_State *L) static int Lzmq_recv(lua_State *L) { - luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); - void *s = lua_touserdata(L, 1); + zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET); int flags = luaL_optint(L, 2, 0); zmq_msg_t msg; assert(zmq_msg_init(&msg) == 0); - int rc = zmq_recv(s, &msg, flags); + int rc = zmq_recv(s->ptr, &msg, flags); if (rc != 0 && errno == EAGAIN) { assert(zmq_msg_close(&msg) == 0); @@ -210,14 +219,24 @@ static int Lzmq_recv(lua_State *L) } lua_pushlstring(L, zmq_msg_data(&msg), zmq_msg_size(&msg)); + assert(zmq_msg_close(&msg) == 0); + return 1; } static const luaL_reg zmqlib[] = { {"init", Lzmq_init}, + {NULL, NULL} +}; + +static const luaL_reg ctxmethods[] = { {"term", Lzmq_term}, {"socket", Lzmq_socket}, + {NULL, NULL} +}; + +static const luaL_reg sockmethods[] = { {"close", Lzmq_close}, {"setsockopt", Lzmq_setsockopt}, {"bind", Lzmq_bind}, @@ -231,6 +250,21 @@ static const luaL_reg zmqlib[] = { LUALIB_API int luaopen_zmq(lua_State *L) { luaL_register(L, "zmq", zmqlib); + + // context metatable. + luaL_newmetatable(L, MT_ZMQ_CONTEXT); + lua_createtable(L, 0, sizeof(ctxmethods) / sizeof(luaL_reg) - 1); + luaL_register(L, NULL, ctxmethods); + lua_setfield(L, -2, "__index"); + lua_pop(L, 1); + + // socket metatable. + luaL_newmetatable(L, MT_ZMQ_SOCKET); + lua_createtable(L, 0, sizeof(sockmethods) / sizeof(luaL_reg) - 1); + luaL_register(L, NULL, sockmethods); + lua_setfield(L, -2, "__index"); + lua_pop(L, 1); + // flags. lua_pushnumber(L, ZMQ_POLL); lua_setfield(L, -2, "POLL"); @@ -238,6 +272,7 @@ LUALIB_API int luaopen_zmq(lua_State *L) lua_setfield(L, -2, "NOBLOCK"); lua_pushnumber(L, ZMQ_NOFLUSH); lua_setfield(L, -2, "NOFLUSH"); + // zmq.socket types. lua_pushnumber(L, ZMQ_P2P); lua_setfield(L, -2, "P2P"); @@ -257,6 +292,7 @@ LUALIB_API int luaopen_zmq(lua_State *L) lua_setfield(L, -2, "UPSTREAM"); lua_pushnumber(L, ZMQ_DOWNSTREAM); lua_setfield(L, -2, "DOWNSTREAM"); + // zmq.setsockopt options. lua_pushnumber(L, ZMQ_HWM); lua_setfield(L, -2, "HWM"); @@ -282,5 +318,6 @@ LUALIB_API int luaopen_zmq(lua_State *L) lua_setfield(L, -2, "SNDBUF"); lua_pushnumber(L, ZMQ_RCVBUF); lua_setfield(L, -2, "RCVBUF"); + return 1; }