Object sugar

lua-zmq
Aleksey Yeschenko 16 years ago
parent 5a06c29a95
commit 133cf040c6

@ -1,8 +1,8 @@
## constants ## constants
ZMQ_CONSTANT_NAME in the C API corresponds to zmq.CONSTANT_NAME in Lua. ZMQ_CONSTANT_NAME in the C API turns into zmq.CONSTANT_NAME in Lua.
## zmq.init ## init
Initialises ØMQ context. Initialises ØMQ context.
See [zmq_init(3)](http://api.zeromq.org/zmq_init.3.html). See [zmq_init(3)](http://api.zeromq.org/zmq_init.3.html).
@ -10,67 +10,67 @@ See [zmq_init(3)](http://api.zeromq.org/zmq_init.3.html).
zmq.init(app_threads, io_threads) zmq.init(app_threads, io_threads)
zmq.init(app_threads, io_threads, flags) zmq.init(app_threads, io_threads, flags)
## zmq.term ## term
Terminates ØMQ context. Terminates ØMQ context.
See [zmq_term(3)](http://api.zeromq.org/zmq_term.3.html). See [zmq_term(3)](http://api.zeromq.org/zmq_term.3.html).
zmq.term(context) ctx:term()
## zmq.socket ## socket
Creates ØMQ socket. Creates ØMQ socket.
See [zmq_socket(3)](http://api.zeromq.org/zmq_socket.3.html). See [zmq_socket(3)](http://api.zeromq.org/zmq_socket.3.html).
zmq.socket(context, type) ctx:socket(type)
## zmq.close ## close
Destroys ØMQ socket. Destroys ØMQ socket.
See [zmq_close(3)](http://api.zeromq.org/zmq_close.3.html). See [zmq_close(3)](http://api.zeromq.org/zmq_close.3.html).
zmq.close(socket) s:close()
## zmq.setsockopt ## setsockopt
Sets a specified option on a ØMQ socket. Sets a specified option on a ØMQ socket.
See [zmq_setsockopt(3)](http://api.zeromq.org/zmq_setsockopt.3.html). See [zmq_setsockopt(3)](http://api.zeromq.org/zmq_setsockopt.3.html).
zmq.setsockopt(socket, option, optval) s:setsockopt(option, optval)
## zmq.bind ## bind
Binds the socket to the specified address. Binds the socket to the specified address.
See [zmq_bind(3)](http://api.zeromq.org/zmq_bind.3.html). See [zmq_bind(3)](http://api.zeromq.org/zmq_bind.3.html).
zmq.bind(socket, addr) s:bind(addr)
## zmq.connect ## connect
Connect the socket to the specified address. Connect the socket to the specified address.
See [zmq_connect(3)](http://api.zeromq.org/zmq_connect.3.html). See [zmq_connect(3)](http://api.zeromq.org/zmq_connect.3.html).
zmq.connect(socket, addr) s:connect(addr)
## zmq.send ## send
Sends a message. Sends a message.
See [zmq_send(3)](http://api.zeromq.org/zmq_send.3.html). See [zmq_send(3)](http://api.zeromq.org/zmq_send.3.html).
zmq.send(socket, msg) s:send(msg)
zmq.send(socket, msg, flags) s:send(msg, flags)
## zmq.flush ## flush
Flushes unflushed messages to the socket. Flushes unflushed messages to the socket.
See [zmq_flush(3)](http://api.zeromq.org/zmq_flush.3.html). See [zmq_flush(3)](http://api.zeromq.org/zmq_flush.3.html).
zmq.flush(socket) s:flush()
## zmq.recv ## recv
Retrieves a message from the socket. Retrieves a message from the socket.
See [zmq_recv(3)](http://api.zeromq.org/zmq_recv.3.html). See [zmq_recv(3)](http://api.zeromq.org/zmq_recv.3.html).
zmq.recv(socket) s:recv()
zmq.recv(socket, flags) s:recv(flags)

@ -21,9 +21,12 @@
require("zmq") require("zmq")
local ctx = zmq.init(1, 1) local ctx = zmq.init(1, 1)
local s = zmq.socket(ctx, zmq.REQ) local s = ctx:socket(zmq.REQ)
zmq.connect(s, "tcp://localhost:5555")
zmq.send(s, "SELECT * FROM mytable") s:connect("tcp://localhost:5555")
print(zmq.recv(s))
zmq.close(s) s:send("SELECT * FROM mytable")
zmq.term(ctx) print(s:recv())
s:close()
ctx:term()

@ -21,10 +21,12 @@
require("zmq") require("zmq")
local ctx = zmq.init(1, 1, 0) local ctx = zmq.init(1, 1, 0)
local s = zmq.socket(ctx, zmq.PUB) local s = ctx:socket(zmq.PUB)
zmq.bind(s, "tcp://lo:5555")
s:bind("tcp://lo:5555")
local msg_id = 1 local msg_id = 1
while true do while true do
zmq.send(s, tostring(msg_id)) s:send(tostring(msg_id))
msg_id = msg_id + 1 msg_id = msg_id + 1
end end

@ -21,9 +21,11 @@
require("zmq") require("zmq")
local ctx = zmq.init(1, 1) local ctx = zmq.init(1, 1)
local s = zmq.socket(ctx, zmq.REP) local s = ctx:socket(zmq.REP)
zmq.bind(s, "tcp://lo:5555")
s:bind("tcp://lo:5555")
while true do while true do
print(string.format("Received query: '%s'", zmq.recv(s))) print(string.format("Received query: '%s'", s:recv()))
zmq.send(s, "OK") s:send("OK")
end end

@ -21,11 +21,11 @@
require("zmq") require("zmq")
local ctx = zmq.init(1, 1, 0) local ctx = zmq.init(1, 1, 0)
local s = zmq.socket(ctx, zmq.SUB) local s = ctx:socket(zmq.SUB)
zmq.setsockopt(s, zmq.SUBSCRIBE, "") s:setsockopt(zmq.SUBSCRIBE, "")
zmq.connect(s, "tcp://localhost:5555") s:connect("tcp://localhost:5555")
while true do while true do
local msg = zmq.recv(s) local msg = s:recv()
local msg_id = tonumber(msg) local msg_id = tonumber(msg)
if math.mod(msg_id, 10000) == 0 then print(msg_id) end if math.mod(msg_id, 10000) == 0 then print(msg_id) end
end end

@ -20,24 +20,25 @@
require("zmq") require("zmq")
if not arg[2] then if not arg[3] then
print("usage: lua local_lat.lua <bind-to> <roundtrip-count>") print("usage: lua local_lat.lua <bind-to> <message-size> <roundtrip-count>")
os.exit() os.exit()
end end
local bind_to = arg[1] local bind_to = arg[1]
local roundtrip_count = tonumber(arg[2]) local message_size = tonumber(arg[2])
local roundtrip_count = tonumber(arg[3])
local ctx = zmq.init(1, 1) local ctx = zmq.init(1, 1)
local s = zmq.socket(ctx, zmq.REP) local s = ctx:socket(zmq.REP)
zmq.bind(s, bind_to) s:bind(bind_to)
local msg local msg
for i = 1, roundtrip_count do for i = 1, roundtrip_count do
msg = zmq.recv(s) msg = s:recv()
zmq.send(s, msg) s:send(msg)
end end
zmq.close(s) s:close()
zmq.term(ctx) ctx:term()

@ -30,22 +30,22 @@ local message_size = tonumber(arg[2])
local message_count = tonumber(arg[3]) local message_count = tonumber(arg[3])
local ctx = zmq.init(1, 1) local ctx = zmq.init(1, 1)
local s = zmq.socket(ctx, zmq.SUB) local s = ctx:socket(zmq.SUB)
zmq.setsockopt(s, zmq.SUBSCRIBE, ""); s:setsockopt(zmq.SUBSCRIBE, "");
zmq.bind(s, bind_to) s:bind(bind_to)
local msg = zmq.recv(s) local msg = s:recv()
local start_time = os.time() local start_time = os.time()
for i = 1, message_count - 1 do for i = 1, message_count - 1 do
msg = zmq.recv(s) msg = s:recv()
end end
local end_time = os.time() local end_time = os.time()
zmq.close(s) s:close()
zmq.term(ctx) ctx:term()
local elapsed = os.difftime(end_time, start_time) local elapsed = os.difftime(end_time, start_time)
if elapsed == 0 then elapsed = 1 end if elapsed == 0 then elapsed = 1 end

@ -30,8 +30,8 @@ local message_size = tonumber(arg[2])
local roundtrip_count = tonumber(arg[3]) local roundtrip_count = tonumber(arg[3])
local ctx = zmq.init(1, 1) local ctx = zmq.init(1, 1)
local s = zmq.socket(ctx, zmq.REQ) local s = ctx:socket(zmq.REQ)
zmq.connect(s, connect_to) s:connect(connect_to)
local msg = "" local msg = ""
for i = 1, message_size do msg = msg .. "0" end for i = 1, message_size do msg = msg .. "0" end
@ -39,14 +39,14 @@ for i = 1, message_size do msg = msg .. "0" end
local start_time = os.time() local start_time = os.time()
for i = 1, roundtrip_count do for i = 1, roundtrip_count do
zmq.send(s, msg) s:send(msg)
msg = zmq.recv(s) msg = s:recv()
end end
local end_time = os.time() local end_time = os.time()
zmq.close(s) s:close()
zmq.term(ctx) ctx:term()
local elapsed = os.difftime(end_time, start_time) local elapsed = os.difftime(end_time, start_time)
local latency = elapsed * 1000000 / roundtrip_count / 2 local latency = elapsed * 1000000 / roundtrip_count / 2

@ -30,17 +30,17 @@ local message_size = tonumber(arg[2])
local message_count = tonumber(arg[3]) local message_count = tonumber(arg[3])
local ctx = zmq.init(1, 1) local ctx = zmq.init(1, 1)
local s = zmq.socket(ctx, zmq.PUB) local s = ctx:socket(zmq.PUB)
zmq.connect(s, connect_to) s:connect(connect_to)
local msg = "" local msg = ""
for i = 1, message_size do msg = msg .. "0" end for i = 1, message_size do msg = msg .. "0" end
for i = 1, message_count do for i = 1, message_count do
zmq.send(s, msg) s:send(msg)
end end
os.execute("sleep " .. 10) os.execute("sleep " .. 10)
zmq.close(s) s:close()
zmq.term(ctx) ctx:term()

109
zmq.c

@ -30,55 +30,67 @@
#include <string.h> #include <string.h>
#include <stdint.h> #include <stdint.h>
#define MT_ZMQ_CONTEXT "MT_ZMQ_CONTEXT"
#define MT_ZMQ_SOCKET "MT_ZMQ_SOCKET"
typedef struct { void *ptr; } zmq_ptr;
static int Lzmq_init(lua_State *L) static int Lzmq_init(lua_State *L)
{ {
int app_threads = luaL_checkint(L, 1); int app_threads = luaL_checkint(L, 1);
int io_threads = luaL_checkint(L, 2); int io_threads = luaL_checkint(L, 2);
int flags = luaL_optint(L, 3, 0); int flags = luaL_optint(L, 3, 0);
void *ctx = zmq_init(app_threads, io_threads, flags); zmq_ptr *ctx = lua_newuserdata(L, sizeof(zmq_ptr));
if (!ctx) {
ctx->ptr = zmq_init(app_threads, io_threads, flags);
if (!ctx->ptr) {
return luaL_error(L, zmq_strerror(errno)); return luaL_error(L, zmq_strerror(errno));
} }
lua_pushlightuserdata(L, ctx);
luaL_getmetatable(L, MT_ZMQ_CONTEXT);
lua_setmetatable(L, -2);
return 1; return 1;
} }
static int Lzmq_term(lua_State *L) static int Lzmq_term(lua_State *L)
{ {
luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); zmq_ptr *ctx = luaL_checkudata(L, 1, MT_ZMQ_CONTEXT);
void *ctx = lua_touserdata(L, 1); assert(zmq_term(ctx->ptr) == 0);
assert(zmq_term(ctx) == 0);
return 0; return 0;
} }
static int Lzmq_socket(lua_State *L) static int Lzmq_socket(lua_State *L)
{ {
luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); zmq_ptr *ctx = luaL_checkudata(L, 1, MT_ZMQ_CONTEXT);
void *ctx = lua_touserdata(L, 1);
int type = luaL_checkint(L, 2); int type = luaL_checkint(L, 2);
void *s = zmq_socket(ctx, type); zmq_ptr *s = lua_newuserdata(L, sizeof(zmq_ptr));
if (!s) {
s->ptr = zmq_socket(ctx->ptr, type);
if (!s->ptr) {
return luaL_error(L, zmq_strerror(errno)); return luaL_error(L, zmq_strerror(errno));
} }
lua_pushlightuserdata(L, s);
luaL_getmetatable(L, MT_ZMQ_SOCKET);
lua_setmetatable(L, -2);
return 1; return 1;
} }
static int Lzmq_close(lua_State *L) static int Lzmq_close(lua_State *L)
{ {
luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET);
void *s = lua_touserdata(L, 1); assert(zmq_close(s->ptr) == 0);
assert(zmq_close(s) == 0);
return 0; return 0;
} }
static int Lzmq_setsockopt(lua_State *L) static int Lzmq_setsockopt(lua_State *L)
{ {
luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET);
void *s = lua_touserdata(L, 1);
int option = luaL_checkint(L, 2); int option = luaL_checkint(L, 2);
int rc = 0; int rc = 0;
@ -90,7 +102,7 @@ static int Lzmq_setsockopt(lua_State *L)
case ZMQ_AFFINITY: case ZMQ_AFFINITY:
{ {
int64_t optval = (int64_t) luaL_checklong(L, 3); int64_t optval = (int64_t) luaL_checklong(L, 3);
rc = zmq_setsockopt(s, option, (void *) &optval, sizeof(int64_t)); rc = zmq_setsockopt(s->ptr, option, (void *) &optval, sizeof(int64_t));
} }
break; break;
case ZMQ_IDENTITY: case ZMQ_IDENTITY:
@ -99,7 +111,7 @@ static int Lzmq_setsockopt(lua_State *L)
{ {
size_t optvallen; size_t optvallen;
const char *optval = luaL_checklstring(L, 3, &optvallen); const char *optval = luaL_checklstring(L, 3, &optvallen);
rc = zmq_setsockopt(s, option, (void *) optval, optvallen); rc = zmq_setsockopt(s->ptr, option, (void *) optval, optvallen);
} }
break; break;
case ZMQ_RATE: case ZMQ_RATE:
@ -109,7 +121,7 @@ static int Lzmq_setsockopt(lua_State *L)
case ZMQ_RCVBUF: case ZMQ_RCVBUF:
{ {
uint64_t optval = (uint64_t) luaL_checklong(L, 3); uint64_t optval = (uint64_t) luaL_checklong(L, 3);
rc = zmq_setsockopt(s, option, (void *) &optval, sizeof(uint64_t)); rc = zmq_setsockopt(s->ptr, option, (void *) &optval, sizeof(uint64_t));
} }
break; break;
default: default:
@ -125,32 +137,31 @@ static int Lzmq_setsockopt(lua_State *L)
static int Lzmq_bind(lua_State *L) static int Lzmq_bind(lua_State *L)
{ {
luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET);
void *s = lua_touserdata(L, 1);
const char *addr = luaL_checkstring(L, 2); const char *addr = luaL_checkstring(L, 2);
if (zmq_bind(s, addr) != 0) { if (zmq_bind(s->ptr, addr) != 0) {
return luaL_error(L, zmq_strerror(errno)); return luaL_error(L, zmq_strerror(errno));
} }
return 0; return 0;
} }
static int Lzmq_connect(lua_State *L) static int Lzmq_connect(lua_State *L)
{ {
luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET);
void *s = lua_touserdata(L, 1);
const char *addr = luaL_checkstring(L, 2); const char *addr = luaL_checkstring(L, 2);
if (zmq_connect(s, addr) != 0) { if (zmq_connect(s->ptr, addr) != 0) {
return luaL_error(L, zmq_strerror(errno)); return luaL_error(L, zmq_strerror(errno));
} }
return 0; return 0;
} }
static int Lzmq_send(lua_State *L) static int Lzmq_send(lua_State *L)
{ {
luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET);
void *s = lua_touserdata(L, 1);
size_t msg_size; size_t msg_size;
const char *data = luaL_checklstring(L, 2, &msg_size); const char *data = luaL_checklstring(L, 2, &msg_size);
int flags = luaL_optint(L, 3, 0); int flags = luaL_optint(L, 3, 0);
@ -159,7 +170,7 @@ static int Lzmq_send(lua_State *L)
assert(zmq_msg_init_size(&msg, msg_size) == 0); assert(zmq_msg_init_size(&msg, msg_size) == 0);
memcpy(zmq_msg_data(&msg), data, msg_size); memcpy(zmq_msg_data(&msg), data, msg_size);
int rc = zmq_send(s, &msg, flags); int rc = zmq_send(s->ptr, &msg, flags);
assert(zmq_msg_close(&msg) == 0); assert(zmq_msg_close(&msg) == 0);
@ -173,15 +184,14 @@ static int Lzmq_send(lua_State *L)
} }
lua_pushboolean(L, 1); lua_pushboolean(L, 1);
return 1; return 1;
} }
static int Lzmq_flush(lua_State *L) static int Lzmq_flush(lua_State *L)
{ {
luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET);
void *s = lua_touserdata(L, 1); if (zmq_flush(s->ptr) != 0) {
if (zmq_flush(s) != 0) {
return luaL_error(L, zmq_strerror(errno)); return luaL_error(L, zmq_strerror(errno));
} }
return 0; return 0;
@ -189,14 +199,13 @@ static int Lzmq_flush(lua_State *L)
static int Lzmq_recv(lua_State *L) static int Lzmq_recv(lua_State *L)
{ {
luaL_checktype(L, 1, LUA_TLIGHTUSERDATA); zmq_ptr *s = luaL_checkudata(L, 1, MT_ZMQ_SOCKET);
void *s = lua_touserdata(L, 1);
int flags = luaL_optint(L, 2, 0); int flags = luaL_optint(L, 2, 0);
zmq_msg_t msg; zmq_msg_t msg;
assert(zmq_msg_init(&msg) == 0); assert(zmq_msg_init(&msg) == 0);
int rc = zmq_recv(s, &msg, flags); int rc = zmq_recv(s->ptr, &msg, flags);
if (rc != 0 && errno == EAGAIN) { if (rc != 0 && errno == EAGAIN) {
assert(zmq_msg_close(&msg) == 0); assert(zmq_msg_close(&msg) == 0);
@ -210,14 +219,24 @@ static int Lzmq_recv(lua_State *L)
} }
lua_pushlstring(L, zmq_msg_data(&msg), zmq_msg_size(&msg)); lua_pushlstring(L, zmq_msg_data(&msg), zmq_msg_size(&msg));
assert(zmq_msg_close(&msg) == 0); assert(zmq_msg_close(&msg) == 0);
return 1; return 1;
} }
static const luaL_reg zmqlib[] = { static const luaL_reg zmqlib[] = {
{"init", Lzmq_init}, {"init", Lzmq_init},
{NULL, NULL}
};
static const luaL_reg ctxmethods[] = {
{"term", Lzmq_term}, {"term", Lzmq_term},
{"socket", Lzmq_socket}, {"socket", Lzmq_socket},
{NULL, NULL}
};
static const luaL_reg sockmethods[] = {
{"close", Lzmq_close}, {"close", Lzmq_close},
{"setsockopt", Lzmq_setsockopt}, {"setsockopt", Lzmq_setsockopt},
{"bind", Lzmq_bind}, {"bind", Lzmq_bind},
@ -231,6 +250,21 @@ static const luaL_reg zmqlib[] = {
LUALIB_API int luaopen_zmq(lua_State *L) LUALIB_API int luaopen_zmq(lua_State *L)
{ {
luaL_register(L, "zmq", zmqlib); luaL_register(L, "zmq", zmqlib);
// context metatable.
luaL_newmetatable(L, MT_ZMQ_CONTEXT);
lua_createtable(L, 0, sizeof(ctxmethods) / sizeof(luaL_reg) - 1);
luaL_register(L, NULL, ctxmethods);
lua_setfield(L, -2, "__index");
lua_pop(L, 1);
// socket metatable.
luaL_newmetatable(L, MT_ZMQ_SOCKET);
lua_createtable(L, 0, sizeof(sockmethods) / sizeof(luaL_reg) - 1);
luaL_register(L, NULL, sockmethods);
lua_setfield(L, -2, "__index");
lua_pop(L, 1);
// flags. // flags.
lua_pushnumber(L, ZMQ_POLL); lua_pushnumber(L, ZMQ_POLL);
lua_setfield(L, -2, "POLL"); lua_setfield(L, -2, "POLL");
@ -238,6 +272,7 @@ LUALIB_API int luaopen_zmq(lua_State *L)
lua_setfield(L, -2, "NOBLOCK"); lua_setfield(L, -2, "NOBLOCK");
lua_pushnumber(L, ZMQ_NOFLUSH); lua_pushnumber(L, ZMQ_NOFLUSH);
lua_setfield(L, -2, "NOFLUSH"); lua_setfield(L, -2, "NOFLUSH");
// zmq.socket types. // zmq.socket types.
lua_pushnumber(L, ZMQ_P2P); lua_pushnumber(L, ZMQ_P2P);
lua_setfield(L, -2, "P2P"); lua_setfield(L, -2, "P2P");
@ -257,6 +292,7 @@ LUALIB_API int luaopen_zmq(lua_State *L)
lua_setfield(L, -2, "UPSTREAM"); lua_setfield(L, -2, "UPSTREAM");
lua_pushnumber(L, ZMQ_DOWNSTREAM); lua_pushnumber(L, ZMQ_DOWNSTREAM);
lua_setfield(L, -2, "DOWNSTREAM"); lua_setfield(L, -2, "DOWNSTREAM");
// zmq.setsockopt options. // zmq.setsockopt options.
lua_pushnumber(L, ZMQ_HWM); lua_pushnumber(L, ZMQ_HWM);
lua_setfield(L, -2, "HWM"); lua_setfield(L, -2, "HWM");
@ -282,5 +318,6 @@ LUALIB_API int luaopen_zmq(lua_State *L)
lua_setfield(L, -2, "SNDBUF"); lua_setfield(L, -2, "SNDBUF");
lua_pushnumber(L, ZMQ_RCVBUF); lua_pushnumber(L, ZMQ_RCVBUF);
lua_setfield(L, -2, "RCVBUF"); lua_setfield(L, -2, "RCVBUF");
return 1; return 1;
} }

Loading…
Cancel
Save