Add support for zeromq 4.x

master
Robert G. Jakabosky 12 years ago
parent 327583c0f6
commit a335ef0625

@ -37,11 +37,11 @@ typedef struct ZMQ_Ctx ZMQ_Ctx;
c_method_call "!ZMQ_Socket *" "zmq_socket" { "int", "type"}
},
method "set" {
if_defs = { "VERSION_3_2" },
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "int" "zmq_ctx_set" { "int", "flag", "int", "value" }
},
method "get" {
if_defs = { "VERSION_3_2" },
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "int" "zmq_ctx_get" { "int", "flag" }
},
}

@ -128,8 +128,87 @@ ZMQ_Error lzmq_socket_hwm(ZMQ_Socket *sock, int *value) {
[30] = { },
[31] = { name="ipv4only", otype="INT", mode="rw", ltype="int" },
},
{ ver_def = 'VERSION_4_0', major = 4, minor = 0,
[1] = { name="hwm", otype="INT", mode="rw",
custom = [[
ZMQ_Error lzmq_socket_set_hwm(ZMQ_Socket *sock, int value) {
int val;
int rc;
val = (int)value;
rc = zmq_setsockopt(sock, ZMQ_SNDHWM, &value, sizeof(value));
if(-1 == rc) return rc;
val = (int)value;
return zmq_setsockopt(sock, ZMQ_RCVHWM, &value, sizeof(value));
}
ZMQ_Error lzmq_socket_hwm(ZMQ_Socket *sock, int *value) {
size_t val_len;
int rc;
val_len = sizeof(value);
rc = zmq_getsockopt(sock, ZMQ_SNDHWM, value, &val_len);
if(-1 == rc) return rc;
val_len = sizeof(value);
return zmq_getsockopt(sock, ZMQ_RCVHWM, value, &val_len);
}
local max_options = 50
]] },
[2] = { },
[3] = { },
[4] = { name="affinity", otype="UINT64", mode="rw", ltype="uint64_t" },
[5] = { name="identity", otype="BLOB", mode="rw", ltype="const char *" },
[6] = { name="subscribe", otype="BLOB", mode="w", ltype="const char *" },
[7] = { name="unsubscribe", otype="BLOB", mode="w", ltype="const char *" },
[8] = { name="rate", otype="INT", mode="rw", ltype="int" },
[9] = { name="recovery_ivl", otype="INT", mode="rw", ltype="int" },
[10] = { },
[11] = { name="sndbuf", otype="INT", mode="rw", ltype="int" },
[12] = { name="rcvbuf", otype="INT", mode="rw", ltype="int" },
[13] = { name="rcvmore", otype="INT", mode="r", ltype="int" },
[14] = { name="fd", otype="FD", mode="r", ltype="int" },
[15] = { name="events", otype="INT", mode="r", ltype="int" },
[16] = { name="type", otype="INT", mode="r", ltype="int" },
[17] = { name="linger", otype="INT", mode="rw", ltype="int" },
[18] = { name="reconnect_ivl", otype="INT", mode="rw", ltype="int" },
[19] = { name="backlog", otype="INT", mode="rw", ltype="int" },
[20] = { },
[21] = { name="reconnect_ivl_max", otype="INT", mode="rw", ltype="int" },
[22] = { name="maxmsgsize", otype="INT64", mode="rw", ltype="int64_t" },
[23] = { name="sndhwm", otype="INT", mode="rw", ltype="int" },
[24] = { name="rcvhwm", otype="INT", mode="rw", ltype="int" },
[25] = { name="multicast_hops", otype="INT", mode="rw", ltype="int" },
[26] = { },
[27] = { name="rcvtimeo", otype="INT", mode="rw", ltype="int" },
[28] = { name="sndtimeo", otype="INT", mode="rw", ltype="int" },
[29] = { },
[30] = { },
[31] = { name="ipv4only", otype="INT", mode="rw", ltype="int" },
-- New to version 4.x
[32] = { name="last_endpoint", otype="BLOB", mode="r", ltype="const char *" },
[33] = { name="router_mandatory", otype="INT", mode="w", ltype="int" },
[34] = { name="tcp_keepalive", otype="INT", mode="rw", ltype="int" },
[35] = { name="tcp_keepalive_cnt", otype="INT", mode="rw", ltype="int" },
[36] = { name="tcp_keepalive_idle",otype="INT", mode="rw", ltype="int" },
[37] = { name="tcp_keepalive_intvl",otype="INT", mode="rw", ltype="int" },
[38] = { name="tcp_accept_filter", otype="BLOB", mode="w", ltype="const char *" },
[39] = { name="immediate", otype="INT", mode="rw", ltype="int" },
[40] = { name="xpub_verbose", otype="INT", mode="w", ltype="int" },
[41] = { name="router_raw", otype="INT", mode="w", ltype="int" },
[42] = { name="ipv6", otype="INT", mode="rw", ltype="int" },
[43] = { name="mechanism", otype="INT", mode="r", ltype="int" },
[44] = { name="plain_server", otype="INT", mode="rw", ltype="int" },
[45] = { name="plain_username", otype="BLOB", mode="rw", ltype="const char *" },
[46] = { name="plain_password", otype="BLOB", mode="rw", ltype="const char *" },
[47] = { name="curve_server", otype="INT", mode="rw", ltype="int" },
[48] = { name="curve_publickey", otype="BLOB", mode="rw", ltype="const char *" },
[49] = { name="curve_secretkey", otype="BLOB", mode="rw", ltype="const char *" },
[50] = { name="curve_serverkey", otype="BLOB", mode="rw", ltype="const char *" },
[51] = { name="probe_router", otype="INT", mode="w", ltype="int" },
[52] = { name="req_correlate", otype="INT", mode="w", ltype="int" },
[53] = { name="req_relaxed", otype="INT", mode="w", ltype="int" },
[54] = { name="conflate", otype="INT", mode="rw", ltype="int" },
[55] = { name="zap_domain", otype="BLOB", mode="rw", ltype="const char *" },
},
}
local max_options = 60 -- this number must be larger then the highest option value.
local function foreach_opt(func)
for i=1,#socket_options do
@ -310,7 +389,9 @@ end)
endif(last_ver.ver_def)
add(opt_types, [[
#if VERSION_3_0
#if VERSION_4_0
# define MAX_OPTS VERSION_4_0_MAX_OPT
#elif VERSION_3_0
# define MAX_OPTS VERSION_3_0_MAX_OPT
#else
# if VERSION_2_2
@ -401,8 +482,12 @@ local VERSION_2_0 = true
local VERSION_2_1 = false
local VERSION_2_2 = false
local VERSION_3_0 = false
local VERSION_4_0 = false
local zver = _M.version()
if zver[1] == 3 then
VERSION_2_0 = false
VERSION_4_0 = true
elseif zver[1] == 3 then
VERSION_2_0 = false
VERSION_3_0 = true
elseif zver[1] == 2 and zver[2] == 2 then
@ -464,14 +549,14 @@ static const int opt_types[] = {
c_method_call "ZMQ_Error" "zmq_bind" { "const char *", "addr" }
},
method "unbind" {
if_defs = { "VERSION_3_2" },
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "ZMQ_Error" "zmq_unbind" { "const char *", "addr" }
},
method "connect" {
c_method_call "ZMQ_Error" "zmq_connect" { "const char *", "addr" }
},
method "disconnect" {
if_defs = { "VERSION_3_2" },
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "ZMQ_Error" "zmq_disconnect" { "const char *", "addr" }
},
ffi_cdef[[
@ -512,7 +597,7 @@ end
size_t val_len;
const void *val;
#if defined(VERSION_2_1) || defined(VERSION_3_0)
#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
socket_t fd_val;
#endif
int int_val;
@ -520,7 +605,7 @@ end
uint64_t uint64_val;
int64_t int64_val;
#if VERSION_3_0
#if VERSION_3_0 || VERSION_4_0
/* 3.0 backwards compatibility support for HWM. */
if(${opt} == ZMQ_HWM) {
int_val = luaL_checklong(L, ${val::idx});
@ -539,7 +624,7 @@ end
}
switch(opt_types[${opt}]) {
#if defined(VERSION_2_1) || defined(VERSION_3_0)
#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
case OPT_TYPE_FD:
fd_val = luaL_checklong(L, ${val::idx});
val = &fd_val;
@ -596,7 +681,7 @@ local tmp_val_len = ffi.new('size_t[1]', 4)
c_source[[
size_t val_len;
#if defined(VERSION_2_1) || defined(VERSION_3_0)
#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
socket_t fd_val;
#endif
int int_val;
@ -613,7 +698,7 @@ local tmp_val_len = ffi.new('size_t[1]', 4)
}
switch(opt_types[${opt}]) {
#if defined(VERSION_2_1) || defined(VERSION_3_0)
#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
case OPT_TYPE_FD:
val_len = sizeof(fd_val);
${err} = zmq_getsockopt(${this}, ${opt}, &fd_val, &val_len);
@ -758,6 +843,198 @@ local tmp_msg = ffi.new('zmq_msg_t')
]],
},
--
-- Monitor socket.
--
method "monitor" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "ZMQ_Error" "zmq_socket_monitor" { "const char *", "addr", "int", "events" }
},
c_source[[
typedef struct ZMQ_recv_event {
int event_id;
int value;
const char *addr;
size_t addr_len;
const char *err;
} ZMQ_recv_event;
int monitor_recv_event(ZMQ_Socket *s, zmq_msg_t *msg, int flags, ZMQ_recv_event *ev)
{
int rc ;
zmq_event_t event;
ev->event_id = 0;
ev->value = 0;
ev->addr = NULL;
ev->err = NULL;
ev->addr_len = 0;
zmq_msg_init(msg);
/* recv binary event. */
rc = zmq_recvmsg(s, msg, flags);
if(rc < 0) {
return rc;
}
#if ZMQ_VERSION_MAJOR == 3
if(zmq_msg_size(msg) != sizeof(event)) {
ev->err = "Invalid monitor event. Wrong event size.";
return -1;
}
memcpy(&event, zmq_msg_data(msg), sizeof(event));
ev->event_id = event.event;
switch(event.event) {
case ZMQ_EVENT_CONNECTED:
ev->value = event.data.connected.fd;
ev->addr = event.data.connected.addr;
break;
case ZMQ_EVENT_CONNECT_DELAYED:
ev->value = event.data.connect_delayed.err;
ev->addr = event.data.connect_delayed.addr;
break;
case ZMQ_EVENT_CONNECT_RETRIED:
ev->value = event.data.connect_retried.interval;
ev->addr = event.data.connect_retried.addr;
break;
case ZMQ_EVENT_LISTENING:
ev->value = event.data.listening.fd;
ev->addr = event.data.listening.addr;
break;
case ZMQ_EVENT_BIND_FAILED:
ev->value = event.data.bind_failed.err;
ev->addr = event.data.bind_failed.addr;
break;
case ZMQ_EVENT_ACCEPTED:
ev->value = event.data.accepted.fd;
ev->addr = event.data.accepted.addr;
break;
case ZMQ_EVENT_ACCEPT_FAILED:
ev->value = event.data.accept_failed.err;
ev->addr = event.data.accept_failed.addr;
break;
case ZMQ_EVENT_CLOSED:
ev->value = event.data.closed.fd;
ev->addr = event.data.closed.addr;
break;
case ZMQ_EVENT_CLOSE_FAILED:
ev->value = event.data.close_failed.err;
ev->addr = event.data.close_failed.addr;
break;
case ZMQ_EVENT_DISCONNECTED:
ev->value = event.data.disconnected.fd;
ev->addr = event.data.disconnected.addr;
break;
}
if(ev->addr) {
ev->addr_len = strlen(ev->addr);
}
if(zmq_msg_more(msg) != 0) {
ev->err = "Invalid monitor event. Has too many parts.";
return -1;
}
#else
if(zmq_msg_size(msg) != (sizeof(event.event) + sizeof(event.value))) {
ev->err = "Invalid monitor event. Wrong event size.";
return -1;
}
/* copy binary data to event struct */
const char* data = (char*)zmq_msg_data(msg);
memcpy(&(event.event), data, sizeof(event.event));
memcpy(&(event.value), data+sizeof(event.event), sizeof(event.value));
ev->event_id = event.event;
ev->value = event.value;
if(zmq_msg_more(msg) == 0) {
ev->err = "Invalid monitor event. Missing address part.";
return -1;
}
ev->value = event.value;
/* recv address part */
rc = zmq_recvmsg(s, msg, flags);
if(rc < 0) {
return rc;
}
if(zmq_msg_more(msg) != 0) {
ev->err = "Invalid monitor event. Has too many parts.";
return -1;
}
/* copy address part */
ev->addr_len = zmq_msg_size(msg) ;
ev->addr = zmq_msg_data(msg);
#endif
return 1;
}
]],
ffi_cdef[[
typedef struct ZMQ_recv_event {
int event_id;
int value;
const char *addr;
size_t addr_len;
const char *err;
} ZMQ_recv_event;
int monitor_recv_event(ZMQ_Socket *s, zmq_msg_t *msg, int flags, ZMQ_recv_event *ev);
]],
ffi_source[[
local tmp_recv_event = ffi.new('ZMQ_recv_event')
]],
method "recv_event" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
var_in{ "int", "flags?" },
var_out{ "int", "event_id" },
var_out{ "int", "value" },
var_out{ "const char *", "addr", has_length = true },
var_out{ "ZMQ_Error", "err" },
c_source[[
zmq_msg_t msg;
ZMQ_recv_event event;
/* receive monitor event */
${err} = monitor_recv_event(${this}, &msg, ${flags}, &event);
if(${err} >= 0) {
${event_id} = event.event_id;
${value} = event.value;
${addr} = event.addr;
${addr_len} = event.addr_len; //${err};
} else if(event.err != NULL) {
/* error parsing monitor event. */
lua_pushnil(L);
lua_pushstring(L, event.err);
return 2;
}
]],
c_source "post" [[
/* close message */
zmq_msg_close(&msg);
]],
ffi_source[[
local msg = tmp_msg
local event = tmp_recv_event
local addr
-- receive monitor event
${err} = Cmod.monitor_recv_event(${this}, msg, ${flags}, event)
if ${err} >= 0 then
addr = ffi.string(event.addr, event.addr_len)
-- close message
C.zmq_msg_close(msg)
return event.event_id, event.value, addr
end
-- close message
C.zmq_msg_close(msg)
if event.err ~= nil then
-- error parsing monitor event.
return nil, ffi.string(event.err)
end
]],
},
-- build option set/get methods. THIS MUST BE LAST.
build_option_methods(),
}

@ -45,6 +45,7 @@ c_source "typedefs" [[
#define VERSION_2_2 0
#define VERSION_3_0 0
#define VERSION_3_2 0
#define VERSION_4_0 0
#if defined(ZMQ_VERSION_MAJOR)
# if (ZMQ_VERSION_MAJOR == 2) && (ZMQ_VERSION_MINOR == 2)
# undef VERSION_2_2
@ -78,6 +79,16 @@ c_source "typedefs" [[
# undef VERSION_3_0
# define VERSION_3_0 1
# endif
# if (ZMQ_VERSION_MAJOR == 4)
# undef VERSION_2_0
# define VERSION_2_0 0
# undef VERSION_3_2
# define VERSION_3_2 0
# undef VERSION_3_0
# define VERSION_3_0 0
# undef VERSION_4_0
# define VERSION_4_0 1
# endif
#endif
/* make sure ZMQ_DONTWAIT & ZMQ_NOBLOCK are both defined. */
@ -104,7 +115,7 @@ c_source "typedefs" [[
#if VERSION_2_0
# define ZMQ_POLL_MSEC 1000 // zmq_poll is usec
#elif VERSION_3_0
#elif VERSION_3_0 || VERSION_4_0
# define ZMQ_POLL_MSEC 1 // zmq_poll is msec
# ifndef ZMQ_HWM
# define ZMQ_HWM 1 // backwards compatibility
@ -119,7 +130,7 @@ export_definitions {
MAX_VSM_SIZE = "ZMQ_MAX_VSM_SIZE",
-- context settings
MAX_SOCKETS = "ZMQ_MAX_SOCKETS",
MAX_SOCKETS = "ZMQ_MAX_SOCKETS",
IO_THREADS = "ZMQ_IO_THREADS",
-- message types
@ -176,6 +187,30 @@ MULTICAST_HOPS = "ZMQ_MULTICAST_HOPS",
RCVTIMEO = "ZMQ_RCVTIMEO",
SNDTIMEO = "ZMQ_SNDTIMEO",
RCVLABEL = "ZMQ_RCVLABEL",
LAST_ENDPOINT = "ZMQ_LAST_ENDPOINT",
ROUTER_MANDATORY = "ZMQ_ROUTER_MANDATORY",
TCP_KEEPALIVE = "ZMQ_TCP_KEEPALIVE",
TCP_KEEPALIVE_CNT = "ZMQ_TCP_KEEPALIVE_CNT",
TCP_KEEPALIVE_IDLE= "ZMQ_TCP_KEEPALIVE_IDLE",
TCP_KEEPALIVE_INTVL= "ZMQ_TCP_KEEPALIVE_INTVL",
TCP_ACCEPT_FILTER = "ZMQ_TCP_ACCEPT_FILTER",
IMMEDIATE = "ZMQ_IMMEDIATE",
XPUB_VERBOSE = "ZMQ_XPUB_VERBOSE",
ROUTER_RAW = "ZMQ_ROUTER_RAW",
IPV6 = "ZMQ_IPV6",
MECHANISM = "ZMQ_MECHANISM",
PLAIN_SERVER = "ZMQ_PLAIN_SERVER",
PLAIN_USERNAME = "ZMQ_PLAIN_USERNAME",
PLAIN_PASSWORD = "ZMQ_PLAIN_PASSWORD",
CURVE_SERVER = "ZMQ_CURVE_SERVER",
CURVE_PUBLICKEY = "ZMQ_CURVE_PUBLICKEY",
CURVE_SECRETKEY = "ZMQ_CURVE_SECRETKEY",
CURVE_SERVERKEY = "ZMQ_CURVE_SERVERKEY",
PROBE_ROUTER = "ZMQ_PROBE_ROUTER",
REQ_CORRELATE = "ZMQ_REQ_CORRELATE",
REQ_RELAXED = "ZMQ_REQ_RELAXED",
CONFLATE = "ZMQ_CONFLATE",
ZAP_DOMAIN = "ZMQ_ZAP_DOMAIN",
-- send/recv flags
NOBLOCK = "ZMQ_NOBLOCK",
@ -183,6 +218,11 @@ DONTWAIT = "ZMQ_DONTWAIT",
SNDMORE = "ZMQ_SNDMORE",
SNDLABEL = "ZMQ_SNDLABEL",
-- Security mechanisms
NULL = "ZMQ_NULL",
PLAIN = "ZMQ_PLAIN",
CURVE = "ZMQ_CURVE",
-- poll events
POLLIN = "ZMQ_POLLIN",
POLLOUT = "ZMQ_POLLOUT",
@ -191,13 +231,30 @@ POLLERR = "ZMQ_POLLERR",
-- poll milliseconds.
POLL_MSEC = "ZMQ_POLL_MSEC",
-- Socket Monitor events.
EVENT_CONNECTED = "ZMQ_EVENT_CONNECTED",
EVENT_CONNECT_DELAYED = "ZMQ_EVENT_CONNECT_DELAYED",
EVENT_CONNECT_RETRIED = "ZMQ_EVENT_CONNECT_RETRIED",
EVENT_LISTENING = "ZMQ_EVENT_LISTENING",
EVENT_BIND_FAILED = "ZMQ_EVENT_BIND_FAILED",
EVENT_ACCEPTED = "ZMQ_EVENT_ACCEPTED",
EVENT_ACCEPT_FAILED= "ZMQ_EVENT_ACCEPT_FAILED",
EVENT_CLOSED = "ZMQ_EVENT_CLOSED",
EVENT_CLOSE_FAILED= "ZMQ_EVENT_CLOSE_FAILED",
EVENT_DISCONNECTED= "ZMQ_EVENT_DISCONNECTED",
EVENT_MONITOR_STOPPED = "ZMQ_EVENT_MONITOR_STOPPED",
EVENT_ALL = "ZMQ_EVENT_ALL",
-- devices
STREAMER = "ZMQ_STREAMER",
FORWARDER = "ZMQ_FORWARDER",
QUEUE = "ZMQ_QUEUE",
},
subfiles {
"src/error.nobj.lua",
"src/msg.nobj.lua",

Loading…
Cancel
Save