diff --git a/src/ctx.nobj.lua b/src/ctx.nobj.lua index 694520f..fa7bf11 100644 --- a/src/ctx.nobj.lua +++ b/src/ctx.nobj.lua @@ -37,11 +37,11 @@ typedef struct ZMQ_Ctx ZMQ_Ctx; c_method_call "!ZMQ_Socket *" "zmq_socket" { "int", "type"} }, method "set" { - if_defs = { "VERSION_3_2" }, + if_defs = { "VERSION_3_2", "VERSION_4_0" }, c_method_call "int" "zmq_ctx_set" { "int", "flag", "int", "value" } }, method "get" { - if_defs = { "VERSION_3_2" }, + if_defs = { "VERSION_3_2", "VERSION_4_0" }, c_method_call "int" "zmq_ctx_get" { "int", "flag" } }, } diff --git a/src/socket.nobj.lua b/src/socket.nobj.lua index bee28f2..61648db 100644 --- a/src/socket.nobj.lua +++ b/src/socket.nobj.lua @@ -128,8 +128,87 @@ ZMQ_Error lzmq_socket_hwm(ZMQ_Socket *sock, int *value) { [30] = { }, [31] = { name="ipv4only", otype="INT", mode="rw", ltype="int" }, }, + { ver_def = 'VERSION_4_0', major = 4, minor = 0, + [1] = { name="hwm", otype="INT", mode="rw", +custom = [[ +ZMQ_Error lzmq_socket_set_hwm(ZMQ_Socket *sock, int value) { + int val; + int rc; + val = (int)value; + rc = zmq_setsockopt(sock, ZMQ_SNDHWM, &value, sizeof(value)); + if(-1 == rc) return rc; + val = (int)value; + return zmq_setsockopt(sock, ZMQ_RCVHWM, &value, sizeof(value)); +} +ZMQ_Error lzmq_socket_hwm(ZMQ_Socket *sock, int *value) { + size_t val_len; + int rc; + val_len = sizeof(value); + rc = zmq_getsockopt(sock, ZMQ_SNDHWM, value, &val_len); + if(-1 == rc) return rc; + val_len = sizeof(value); + return zmq_getsockopt(sock, ZMQ_RCVHWM, value, &val_len); } -local max_options = 50 + +]] }, + [2] = { }, + [3] = { }, + [4] = { name="affinity", otype="UINT64", mode="rw", ltype="uint64_t" }, + [5] = { name="identity", otype="BLOB", mode="rw", ltype="const char *" }, + [6] = { name="subscribe", otype="BLOB", mode="w", ltype="const char *" }, + [7] = { name="unsubscribe", otype="BLOB", mode="w", ltype="const char *" }, + [8] = { name="rate", otype="INT", mode="rw", ltype="int" }, + [9] = { name="recovery_ivl", otype="INT", mode="rw", ltype="int" }, + [10] = { }, + [11] = { name="sndbuf", otype="INT", mode="rw", ltype="int" }, + [12] = { name="rcvbuf", otype="INT", mode="rw", ltype="int" }, + [13] = { name="rcvmore", otype="INT", mode="r", ltype="int" }, + [14] = { name="fd", otype="FD", mode="r", ltype="int" }, + [15] = { name="events", otype="INT", mode="r", ltype="int" }, + [16] = { name="type", otype="INT", mode="r", ltype="int" }, + [17] = { name="linger", otype="INT", mode="rw", ltype="int" }, + [18] = { name="reconnect_ivl", otype="INT", mode="rw", ltype="int" }, + [19] = { name="backlog", otype="INT", mode="rw", ltype="int" }, + [20] = { }, + [21] = { name="reconnect_ivl_max", otype="INT", mode="rw", ltype="int" }, + [22] = { name="maxmsgsize", otype="INT64", mode="rw", ltype="int64_t" }, + [23] = { name="sndhwm", otype="INT", mode="rw", ltype="int" }, + [24] = { name="rcvhwm", otype="INT", mode="rw", ltype="int" }, + [25] = { name="multicast_hops", otype="INT", mode="rw", ltype="int" }, + [26] = { }, + [27] = { name="rcvtimeo", otype="INT", mode="rw", ltype="int" }, + [28] = { name="sndtimeo", otype="INT", mode="rw", ltype="int" }, + [29] = { }, + [30] = { }, + [31] = { name="ipv4only", otype="INT", mode="rw", ltype="int" }, + -- New to version 4.x + [32] = { name="last_endpoint", otype="BLOB", mode="r", ltype="const char *" }, + [33] = { name="router_mandatory", otype="INT", mode="w", ltype="int" }, + [34] = { name="tcp_keepalive", otype="INT", mode="rw", ltype="int" }, + [35] = { name="tcp_keepalive_cnt", otype="INT", mode="rw", ltype="int" }, + [36] = { name="tcp_keepalive_idle",otype="INT", mode="rw", ltype="int" }, + [37] = { name="tcp_keepalive_intvl",otype="INT", mode="rw", ltype="int" }, + [38] = { name="tcp_accept_filter", otype="BLOB", mode="w", ltype="const char *" }, + [39] = { name="immediate", otype="INT", mode="rw", ltype="int" }, + [40] = { name="xpub_verbose", otype="INT", mode="w", ltype="int" }, + [41] = { name="router_raw", otype="INT", mode="w", ltype="int" }, + [42] = { name="ipv6", otype="INT", mode="rw", ltype="int" }, + [43] = { name="mechanism", otype="INT", mode="r", ltype="int" }, + [44] = { name="plain_server", otype="INT", mode="rw", ltype="int" }, + [45] = { name="plain_username", otype="BLOB", mode="rw", ltype="const char *" }, + [46] = { name="plain_password", otype="BLOB", mode="rw", ltype="const char *" }, + [47] = { name="curve_server", otype="INT", mode="rw", ltype="int" }, + [48] = { name="curve_publickey", otype="BLOB", mode="rw", ltype="const char *" }, + [49] = { name="curve_secretkey", otype="BLOB", mode="rw", ltype="const char *" }, + [50] = { name="curve_serverkey", otype="BLOB", mode="rw", ltype="const char *" }, + [51] = { name="probe_router", otype="INT", mode="w", ltype="int" }, + [52] = { name="req_correlate", otype="INT", mode="w", ltype="int" }, + [53] = { name="req_relaxed", otype="INT", mode="w", ltype="int" }, + [54] = { name="conflate", otype="INT", mode="rw", ltype="int" }, + [55] = { name="zap_domain", otype="BLOB", mode="rw", ltype="const char *" }, + }, +} +local max_options = 60 -- this number must be larger then the highest option value. local function foreach_opt(func) for i=1,#socket_options do @@ -310,7 +389,9 @@ end) endif(last_ver.ver_def) add(opt_types, [[ -#if VERSION_3_0 +#if VERSION_4_0 +# define MAX_OPTS VERSION_4_0_MAX_OPT +#elif VERSION_3_0 # define MAX_OPTS VERSION_3_0_MAX_OPT #else # if VERSION_2_2 @@ -401,8 +482,12 @@ local VERSION_2_0 = true local VERSION_2_1 = false local VERSION_2_2 = false local VERSION_3_0 = false +local VERSION_4_0 = false local zver = _M.version() if zver[1] == 3 then + VERSION_2_0 = false + VERSION_4_0 = true +elseif zver[1] == 3 then VERSION_2_0 = false VERSION_3_0 = true elseif zver[1] == 2 and zver[2] == 2 then @@ -464,14 +549,14 @@ static const int opt_types[] = { c_method_call "ZMQ_Error" "zmq_bind" { "const char *", "addr" } }, method "unbind" { - if_defs = { "VERSION_3_2" }, + if_defs = { "VERSION_3_2", "VERSION_4_0" }, c_method_call "ZMQ_Error" "zmq_unbind" { "const char *", "addr" } }, method "connect" { c_method_call "ZMQ_Error" "zmq_connect" { "const char *", "addr" } }, method "disconnect" { - if_defs = { "VERSION_3_2" }, + if_defs = { "VERSION_3_2", "VERSION_4_0" }, c_method_call "ZMQ_Error" "zmq_disconnect" { "const char *", "addr" } }, ffi_cdef[[ @@ -512,7 +597,7 @@ end size_t val_len; const void *val; -#if defined(VERSION_2_1) || defined(VERSION_3_0) +#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0 socket_t fd_val; #endif int int_val; @@ -520,7 +605,7 @@ end uint64_t uint64_val; int64_t int64_val; -#if VERSION_3_0 +#if VERSION_3_0 || VERSION_4_0 /* 3.0 backwards compatibility support for HWM. */ if(${opt} == ZMQ_HWM) { int_val = luaL_checklong(L, ${val::idx}); @@ -539,7 +624,7 @@ end } switch(opt_types[${opt}]) { -#if defined(VERSION_2_1) || defined(VERSION_3_0) +#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0 case OPT_TYPE_FD: fd_val = luaL_checklong(L, ${val::idx}); val = &fd_val; @@ -596,7 +681,7 @@ local tmp_val_len = ffi.new('size_t[1]', 4) c_source[[ size_t val_len; -#if defined(VERSION_2_1) || defined(VERSION_3_0) +#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0 socket_t fd_val; #endif int int_val; @@ -613,7 +698,7 @@ local tmp_val_len = ffi.new('size_t[1]', 4) } switch(opt_types[${opt}]) { -#if defined(VERSION_2_1) || defined(VERSION_3_0) +#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0 case OPT_TYPE_FD: val_len = sizeof(fd_val); ${err} = zmq_getsockopt(${this}, ${opt}, &fd_val, &val_len); @@ -758,6 +843,198 @@ local tmp_msg = ffi.new('zmq_msg_t') ]], }, + -- + -- Monitor socket. + -- + method "monitor" { + if_defs = { "VERSION_3_2", "VERSION_4_0" }, + c_method_call "ZMQ_Error" "zmq_socket_monitor" { "const char *", "addr", "int", "events" } + }, + c_source[[ +typedef struct ZMQ_recv_event { + int event_id; + int value; + const char *addr; + size_t addr_len; + const char *err; +} ZMQ_recv_event; + +int monitor_recv_event(ZMQ_Socket *s, zmq_msg_t *msg, int flags, ZMQ_recv_event *ev) +{ + int rc ; + zmq_event_t event; + + ev->event_id = 0; + ev->value = 0; + ev->addr = NULL; + ev->err = NULL; + ev->addr_len = 0; + zmq_msg_init(msg); + + /* recv binary event. */ + rc = zmq_recvmsg(s, msg, flags); + if(rc < 0) { + return rc; + } +#if ZMQ_VERSION_MAJOR == 3 + if(zmq_msg_size(msg) != sizeof(event)) { + ev->err = "Invalid monitor event. Wrong event size."; + return -1; + } + memcpy(&event, zmq_msg_data(msg), sizeof(event)); + ev->event_id = event.event; + + switch(event.event) { + case ZMQ_EVENT_CONNECTED: + ev->value = event.data.connected.fd; + ev->addr = event.data.connected.addr; + break; + case ZMQ_EVENT_CONNECT_DELAYED: + ev->value = event.data.connect_delayed.err; + ev->addr = event.data.connect_delayed.addr; + break; + case ZMQ_EVENT_CONNECT_RETRIED: + ev->value = event.data.connect_retried.interval; + ev->addr = event.data.connect_retried.addr; + break; + case ZMQ_EVENT_LISTENING: + ev->value = event.data.listening.fd; + ev->addr = event.data.listening.addr; + break; + case ZMQ_EVENT_BIND_FAILED: + ev->value = event.data.bind_failed.err; + ev->addr = event.data.bind_failed.addr; + break; + case ZMQ_EVENT_ACCEPTED: + ev->value = event.data.accepted.fd; + ev->addr = event.data.accepted.addr; + break; + case ZMQ_EVENT_ACCEPT_FAILED: + ev->value = event.data.accept_failed.err; + ev->addr = event.data.accept_failed.addr; + break; + case ZMQ_EVENT_CLOSED: + ev->value = event.data.closed.fd; + ev->addr = event.data.closed.addr; + break; + case ZMQ_EVENT_CLOSE_FAILED: + ev->value = event.data.close_failed.err; + ev->addr = event.data.close_failed.addr; + break; + case ZMQ_EVENT_DISCONNECTED: + ev->value = event.data.disconnected.fd; + ev->addr = event.data.disconnected.addr; + break; + } + if(ev->addr) { + ev->addr_len = strlen(ev->addr); + } + + if(zmq_msg_more(msg) != 0) { + ev->err = "Invalid monitor event. Has too many parts."; + return -1; + } +#else + if(zmq_msg_size(msg) != (sizeof(event.event) + sizeof(event.value))) { + ev->err = "Invalid monitor event. Wrong event size."; + return -1; + } + /* copy binary data to event struct */ + const char* data = (char*)zmq_msg_data(msg); + memcpy(&(event.event), data, sizeof(event.event)); + memcpy(&(event.value), data+sizeof(event.event), sizeof(event.value)); + ev->event_id = event.event; + ev->value = event.value; + + if(zmq_msg_more(msg) == 0) { + ev->err = "Invalid monitor event. Missing address part."; + return -1; + } + ev->value = event.value; + + /* recv address part */ + rc = zmq_recvmsg(s, msg, flags); + if(rc < 0) { + return rc; + } + if(zmq_msg_more(msg) != 0) { + ev->err = "Invalid monitor event. Has too many parts."; + return -1; + } + /* copy address part */ + ev->addr_len = zmq_msg_size(msg) ; + ev->addr = zmq_msg_data(msg); +#endif + + return 1; +} + +]], + ffi_cdef[[ +typedef struct ZMQ_recv_event { + int event_id; + int value; + const char *addr; + size_t addr_len; + const char *err; +} ZMQ_recv_event; + +int monitor_recv_event(ZMQ_Socket *s, zmq_msg_t *msg, int flags, ZMQ_recv_event *ev); +]], + ffi_source[[ +local tmp_recv_event = ffi.new('ZMQ_recv_event') +]], + method "recv_event" { + if_defs = { "VERSION_3_2", "VERSION_4_0" }, + var_in{ "int", "flags?" }, + var_out{ "int", "event_id" }, + var_out{ "int", "value" }, + var_out{ "const char *", "addr", has_length = true }, + var_out{ "ZMQ_Error", "err" }, + c_source[[ + zmq_msg_t msg; + ZMQ_recv_event event; + + /* receive monitor event */ + ${err} = monitor_recv_event(${this}, &msg, ${flags}, &event); + if(${err} >= 0) { + ${event_id} = event.event_id; + ${value} = event.value; + ${addr} = event.addr; + ${addr_len} = event.addr_len; //${err}; + } else if(event.err != NULL) { + /* error parsing monitor event. */ + lua_pushnil(L); + lua_pushstring(L, event.err); + return 2; + } +]], + c_source "post" [[ + /* close message */ + zmq_msg_close(&msg); +]], + ffi_source[[ + local msg = tmp_msg + local event = tmp_recv_event + local addr + + -- receive monitor event + ${err} = Cmod.monitor_recv_event(${this}, msg, ${flags}, event) + if ${err} >= 0 then + addr = ffi.string(event.addr, event.addr_len) + -- close message + C.zmq_msg_close(msg) + return event.event_id, event.value, addr + end + -- close message + C.zmq_msg_close(msg) + if event.err ~= nil then + -- error parsing monitor event. + return nil, ffi.string(event.err) + end +]], + }, + -- build option set/get methods. THIS MUST BE LAST. build_option_methods(), } diff --git a/zmq.nobj.lua b/zmq.nobj.lua index e46193c..b3e17c8 100644 --- a/zmq.nobj.lua +++ b/zmq.nobj.lua @@ -45,6 +45,7 @@ c_source "typedefs" [[ #define VERSION_2_2 0 #define VERSION_3_0 0 #define VERSION_3_2 0 +#define VERSION_4_0 0 #if defined(ZMQ_VERSION_MAJOR) # if (ZMQ_VERSION_MAJOR == 2) && (ZMQ_VERSION_MINOR == 2) # undef VERSION_2_2 @@ -78,6 +79,16 @@ c_source "typedefs" [[ # undef VERSION_3_0 # define VERSION_3_0 1 # endif +# if (ZMQ_VERSION_MAJOR == 4) +# undef VERSION_2_0 +# define VERSION_2_0 0 +# undef VERSION_3_2 +# define VERSION_3_2 0 +# undef VERSION_3_0 +# define VERSION_3_0 0 +# undef VERSION_4_0 +# define VERSION_4_0 1 +# endif #endif /* make sure ZMQ_DONTWAIT & ZMQ_NOBLOCK are both defined. */ @@ -104,7 +115,7 @@ c_source "typedefs" [[ #if VERSION_2_0 # define ZMQ_POLL_MSEC 1000 // zmq_poll is usec -#elif VERSION_3_0 +#elif VERSION_3_0 || VERSION_4_0 # define ZMQ_POLL_MSEC 1 // zmq_poll is msec # ifndef ZMQ_HWM # define ZMQ_HWM 1 // backwards compatibility @@ -119,7 +130,7 @@ export_definitions { MAX_VSM_SIZE = "ZMQ_MAX_VSM_SIZE", -- context settings -MAX_SOCKETS = "ZMQ_MAX_SOCKETS", +MAX_SOCKETS = "ZMQ_MAX_SOCKETS", IO_THREADS = "ZMQ_IO_THREADS", -- message types @@ -176,6 +187,30 @@ MULTICAST_HOPS = "ZMQ_MULTICAST_HOPS", RCVTIMEO = "ZMQ_RCVTIMEO", SNDTIMEO = "ZMQ_SNDTIMEO", RCVLABEL = "ZMQ_RCVLABEL", +LAST_ENDPOINT = "ZMQ_LAST_ENDPOINT", +ROUTER_MANDATORY = "ZMQ_ROUTER_MANDATORY", +TCP_KEEPALIVE = "ZMQ_TCP_KEEPALIVE", +TCP_KEEPALIVE_CNT = "ZMQ_TCP_KEEPALIVE_CNT", +TCP_KEEPALIVE_IDLE= "ZMQ_TCP_KEEPALIVE_IDLE", +TCP_KEEPALIVE_INTVL= "ZMQ_TCP_KEEPALIVE_INTVL", +TCP_ACCEPT_FILTER = "ZMQ_TCP_ACCEPT_FILTER", +IMMEDIATE = "ZMQ_IMMEDIATE", +XPUB_VERBOSE = "ZMQ_XPUB_VERBOSE", +ROUTER_RAW = "ZMQ_ROUTER_RAW", +IPV6 = "ZMQ_IPV6", +MECHANISM = "ZMQ_MECHANISM", +PLAIN_SERVER = "ZMQ_PLAIN_SERVER", +PLAIN_USERNAME = "ZMQ_PLAIN_USERNAME", +PLAIN_PASSWORD = "ZMQ_PLAIN_PASSWORD", +CURVE_SERVER = "ZMQ_CURVE_SERVER", +CURVE_PUBLICKEY = "ZMQ_CURVE_PUBLICKEY", +CURVE_SECRETKEY = "ZMQ_CURVE_SECRETKEY", +CURVE_SERVERKEY = "ZMQ_CURVE_SERVERKEY", +PROBE_ROUTER = "ZMQ_PROBE_ROUTER", +REQ_CORRELATE = "ZMQ_REQ_CORRELATE", +REQ_RELAXED = "ZMQ_REQ_RELAXED", +CONFLATE = "ZMQ_CONFLATE", +ZAP_DOMAIN = "ZMQ_ZAP_DOMAIN", -- send/recv flags NOBLOCK = "ZMQ_NOBLOCK", @@ -183,6 +218,11 @@ DONTWAIT = "ZMQ_DONTWAIT", SNDMORE = "ZMQ_SNDMORE", SNDLABEL = "ZMQ_SNDLABEL", +-- Security mechanisms +NULL = "ZMQ_NULL", +PLAIN = "ZMQ_PLAIN", +CURVE = "ZMQ_CURVE", + -- poll events POLLIN = "ZMQ_POLLIN", POLLOUT = "ZMQ_POLLOUT", @@ -191,13 +231,30 @@ POLLERR = "ZMQ_POLLERR", -- poll milliseconds. POLL_MSEC = "ZMQ_POLL_MSEC", +-- Socket Monitor events. +EVENT_CONNECTED = "ZMQ_EVENT_CONNECTED", +EVENT_CONNECT_DELAYED = "ZMQ_EVENT_CONNECT_DELAYED", +EVENT_CONNECT_RETRIED = "ZMQ_EVENT_CONNECT_RETRIED", + +EVENT_LISTENING = "ZMQ_EVENT_LISTENING", +EVENT_BIND_FAILED = "ZMQ_EVENT_BIND_FAILED", + +EVENT_ACCEPTED = "ZMQ_EVENT_ACCEPTED", +EVENT_ACCEPT_FAILED= "ZMQ_EVENT_ACCEPT_FAILED", + +EVENT_CLOSED = "ZMQ_EVENT_CLOSED", +EVENT_CLOSE_FAILED= "ZMQ_EVENT_CLOSE_FAILED", +EVENT_DISCONNECTED= "ZMQ_EVENT_DISCONNECTED", +EVENT_MONITOR_STOPPED = "ZMQ_EVENT_MONITOR_STOPPED", + +EVENT_ALL = "ZMQ_EVENT_ALL", + -- devices STREAMER = "ZMQ_STREAMER", FORWARDER = "ZMQ_FORWARDER", QUEUE = "ZMQ_QUEUE", }, - subfiles { "src/error.nobj.lua", "src/msg.nobj.lua",