Compare commits

..

No commits in common. 'master' and 'zmq3.0' have entirely different histories.

@ -1,68 +0,0 @@
language: c
env:
matrix:
- LUA=lua5.1 LIBLUA=liblua5.1-dev LUA_INCDIR=/usr/include/lua5.1 LUA_LIB=lua5.1
- LUA=lua5.2 LIBLUA=liblua5.2-dev LUA_INCDIR=/usr/include/lua5.2 LUA_LIB=lua5.2
- LUA=luajit LIBLUA=libluajit-5.1-dev LUA_INCDIR=/usr/include/luajit-2.0 LUA_LIB=luajit-5.1
branches:
only:
- master
compiler:
- gcc
before_install:
- if [ $LUA = "luajit" ]; then
sudo add-apt-repository ppa:mwild1/ppa -y && sudo apt-get update -y;
fi
install:
- sudo apt-get install libzmq3-dev -y
- sudo apt-get install $LUA -y
- sudo apt-get install $LIBLUA -y
- LUA_LIBDIR=`pkg-config $LUA --variable=libdir`
- INSTALL_LMOD=`pkg-config $LUA --variable=INSTALL_LMOD`
- INSTALL_CMOD=`pkg-config $LUA --variable=INSTALL_CMOD`
## make sure there is a 'lua' command.
- if [ ! -x /usr/bin/lua ]; then
sudo ln -s `which $LUA` /usr/bin/lua;
fi
## install lua-llthreads
- git clone git://github.com/Neopallium/lua-llthreads.git
- cd lua-llthreads ; mkdir build ; cd build
- cmake .. -DLUA_LIBRARIES=$LUA_LIBDIR -DLUA_INCLUDE_DIR=$LUA_INCDIR
-DINSTALL_LMOD=$INSTALL_LMOD -DINSTALL_CMOD=$INSTALL_CMOD
- make
- sudo make install
- cd ../..
script:
#### build using pre-generated bindings.
- mkdir build; cd build
- cmake .. -DLUA_LIBRARIES=$LUA_LIBDIR -DLUA_INCLUDE_DIR=$LUA_INCDIR
-DINSTALL_LMOD=$INSTALL_LMOD -DINSTALL_CMOD=$INSTALL_CMOD
- make
- sudo make install
# Run tests.
- $LUA ../tests/test_inproc.lua
- $LUA ../perf/thread_lat.lua 1 1000
- cd .. ; rm -rf build
#### Re-Generate bindings.
- git clone git://github.com/Neopallium/LuaNativeObjects.git;
- mkdir build; cd build
- cmake .. -DLUA_LIBRARIES=$LUA_LIBDIR -DLUA_INCLUDE_DIR=$LUA_INCDIR
-DLUA_NATIVE_OBJECTS_PATH=$TRAVIS_BUILD_DIR/LuaNativeObjects
-DUSE_PRE_GENERATED_BINDINGS=OFF -DGENERATE_LUADOCS=OFF
-DINSTALL_LMOD=$INSTALL_LMOD -DINSTALL_CMOD=$INSTALL_CMOD
- make
- sudo make install
# Run tests.
- $LUA ../tests/test_inproc.lua
- $LUA ../perf/thread_lat.lua 1 1000
notifications:
email:
on_failure: always
on_success: change

@ -147,14 +147,6 @@ See [zmq_msg_copy(3)](http://api.zeromq.org/zmq_msg_copy.html).
msg1:copy(msg2) -- copy contents from msg2 -> msg1
## set_size(size)
Re-initialize the message with a new size. The current contents will be lost.
See [zmq_msg_init_size(3)](http://api.zeromq.org/zmq_msg_init_size.html).
msg:set_size(size) -- re-initialize message if size is different from current size.
local buf = msg:data() -- get buffer to fill message with new contents.
## set_data(data)
Change the message contents.
@ -164,7 +156,7 @@ See [zmq_msg_data(3)](http://api.zeromq.org/zmq_msg_data.html).
## data()
Get a lightuserdata pointer to the message contents.
Get the message contents.
See [zmq_msg_data(3)](http://api.zeromq.org/zmq_msg_data.html).
local data = msg:data() -- get the message contents
@ -226,14 +218,10 @@ Remove a socket/fd from the poller.
## poll(timeout)
Wait `timeout` milliseconds [1] for events on the registered sockets (timeout = -1, means
Wait `timeout` microseconds for events on the registered sockets (timeout = -1, means
wait indefinitely). If any events happen, then those events are dispatched.
poller:poll(1000) -- wait 1 second for events.
[1] For zmq 2.x `timeout` is in microseconds. For versions 3.x or higher `timeout` will be in milliseconds.
poller:poll(1000 * zmq.POLL_MSEC) -- backwards/forwards compatible
poller:poll(1000000) -- wait 1 second for events.
## start()

@ -1,7 +1,7 @@
#
# Lua bindings for 0MQ
#
cmake_minimum_required(VERSION 3.18)
cmake_minimum_required(VERSION 2.8)
project(lua-zmq C)
@ -9,10 +9,14 @@ set(BUILD_SHARED_LIBS TRUE)
set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
set(INSTALL_LMOD ${CMAKE_INSTALL_PREFIX}/share/lua/ CACHE PATH
set(INSTALL_LMOD ${CMAKE_INSTALL_PREFIX}/share/lua/5.1 CACHE PATH
"Directory to install Lua source modules (configure lua via LUA_PATH)")
set(INSTALL_CMOD ${CMAKE_INSTALL_PREFIX}/lib/lua/ CACHE PATH
set(INSTALL_CMOD ${CMAKE_INSTALL_PREFIX}/lib/lua/5.1 CACHE PATH
"Directory to install Lua binary modules (configure lua via LUA_CPATH)")
set(LUA_NATIVE_OBJECTS_PATH ../LuaNativeObjects CACHE PATH
"Directory to LuaNativeObjects bindings generator.")
set(USE_PRE_GENERATED_BINDINGS TRUE CACHE BOOL
"Set this to FALSE to re-generate bindings using LuaNativeObjects")
set(ZMQ_PATH "" CACHE PATH
"Directory to libzmq. (by default use pkg-config to detect path)")
@ -20,19 +24,10 @@ set(COMMON_CFLAGS "${CFLAGS}")
set(COMMON_LDFLAGS)
set(COMMON_LIBS)
## Lua 5.x
include(FindLua)
if(NOT ${LUA_FOUND})
message(FATAL_ERROR "The FindLua module could not find lua :-(")
endif()
set(COMMON_LIBS "${COMMON_LIBS};${LUA_LIBRARIES}")
if(WIN32)
set(COMMON_CFLAGS "${COMMON_CFLAGS} -I${LUA_INCLUDE_DIR}")
set(COMMON_LDFLAGS "${COMMON_LDFLAGS} ${LUA_LIBRARY}")
if(NOT MSVC)
set(COMMON_LDFLAGS "${COMMON_LDFLAGS} -Wl,--export-all-symbols")
endif()
## Lua 5.1.x
include(FindLua51)
if(NOT ${LUA51_FOUND})
message(FATAL_ERROR "The FindLua51 module could not find lua :-(")
endif()
## MAC OSX needs extra linker flags
if(APPLE)
@ -49,13 +44,8 @@ if(WIN32)
endif()
if(IS_DIRECTORY ${ZMQ_PATH})
set(COMMON_CFLAGS "${COMMON_CFLAGS} -I${ZMQ_PATH}/include")
if(MSVC)
set(COMMON_LIBS "${COMMON_LIBS};libzmq")
else()
set(COMMON_LDFLAGS "${COMMON_LDFLAGS} -L${ZMQ_PATH}/lib")
set(COMMON_LIBS "${COMMON_LIBS};zmq")
endif()
link_directories(${ZMQ_PATH}/lib)
else()
## fallback to using pkg-config
include(FindPkgConfig)
@ -73,6 +63,8 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
${LUA_INCLUDE_DIR})
link_directories(${ZMQ_LIBRARY_DIRS})
## LuaZMQ
set(LUA_ZMQ_SRC
zmq.nobj.lua

@ -1,15 +1,8 @@
About
=====
[![travis-ci status](https://secure.travis-ci.org/Neopallium/lua-zmq.png?branch=master)](http://travis-ci.org/Neopallium/lua-zmq/builds)
Lua bindings to zeromq2. Check out the [ZeroMQ Guide with Lua examples](http://zguide.zeromq.org/lua:all).
Windows
=======
Download a compiled version of [LuaJIT 2.0.0-beta11 + lua-zmq + zeromq2.2.0](https://github.com/downloads/Neopallium/lua-zmq/luajit2.0_beta11_zmq2.2_llthreads.zip) 32bit & 64bit.
API
===
@ -19,7 +12,7 @@ See [API.md](https://github.com/Neopallium/lua-zmq/blob/master/API.md) and
Requirements
============
* ZeroMQ version 2.1, 2.2 or 3.2.
* ZeroMQ version 2.1.x.
* Might work with some 2.0.x versions (2.0.6 and lower are not supported).
For Ubuntu 10.10 users:
@ -52,12 +45,12 @@ Latest Git revision
With LuaRocks 2.0.4.1:
$ sudo luarocks install https://raw.github.com/Neopallium/lua-zmq/master/rockspecs/lua-zmq-scm-1.rockspec
$ sudo luarocks install https://github.com/Neopallium/lua-zmq/raw/master/rockspecs/lua-zmq-scm-1.rockspec
For threads support:
$ sudo luarocks install https://raw.github.com/Neopallium/lua-llthreads/master/rockspecs/lua-llthreads-scm-0.rockspec
$ sudo luarocks install https://raw.github.com/Neopallium/lua-zmq/master/rockspecs/lua-zmq-threads-scm-0.rockspec
$ sudo luarocks install https://github.com/Neopallium/lua-llthreads/raw/master/rockspecs/lua-llthreads-scm-0.rockspec
$ sudo luarocks install https://github.com/Neopallium/lua-zmq/raw/master/rockspecs/lua-zmq-threads-scm-0.rockspec
With CMake:

@ -1,13 +0,0 @@
To re-generating the bindings
-----------------------------
You will need to install LuaNativeObjects and set the CMake variable `USE_PRE_GENERATED_BINDINGS` to FALSE.
By default CMake will use the pre-generated bindings that are include in the project.
Build Dependencies
------------------
Optional dependency for re-generating Lua bindings from `*.nobj.lua` files:
* [LuaNativeObjects](https://github.com/Neopallium/LuaNativeObjects), this is the bindings generator used to convert the `*.nobj.lua` files into a native Lua module.

@ -1,39 +1,19 @@
#
# Lua Native Objects
#
find_program(LUA_NATIVE_OBJECTS_EXECUTABLE native_objects.lua
PATHS ${CMAKE_SOURCE_DIR}/../LuaNativeObjects
DOC "LuaNativeObjects executable path")
set(USE_PRE_GENERATED_BINDINGS TRUE CACHE BOOL
"Set this to FALSE to re-generate bindings using LuaNativeObjects")
set(GENERATE_LUADOCS TRUE CACHE BOOL
"Set this to FALSE to avoid generation of docs using LuaDoc")
macro(GenLuaNativeObjects _src_files_var)
set(_new_src_files)
foreach(_src_file ${${_src_files_var}})
if(_src_file MATCHES ".nobj.lua")
string(REGEX REPLACE ".nobj.lua" ".nobj.c" _src_file_out ${_src_file})
string(REGEX REPLACE ".nobj.lua" ".nobj.ffi.lua" _ffi_file_out ${_src_file})
add_custom_command(OUTPUT ${_src_file_out} ${_ffi_file_out}
COMMAND ${LUA_NATIVE_OBJECTS_EXECUTABLE} -outpath ${CMAKE_CURRENT_BINARY_DIR} -gen lua ${_src_file}
string(REGEX REPLACE ".nobj.lua" ".nobj.h" _header_file_out ${_src_file})
add_custom_command(OUTPUT ${_src_file_out} ${_header_file_out}
COMMAND lua ${LUA_NATIVE_OBJECTS_PATH}/native_objects.lua -outpath ${CMAKE_CURRENT_BINARY_DIR} -gen lua ${_src_file}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
DEPENDS ${_src_file}
)
set_source_files_properties(${_src_file_out} PROPERTIES GENERATED TRUE)
set_source_files_properties(${_ffi_file_out} PROPERTIES GENERATED TRUE)
if (${GENERATE_LUADOCS})
string(REGEX REPLACE ".nobj.lua" "" _doc_base ${_src_file})
string(REGEX REPLACE ".nobj.lua" ".luadoc" _doc_file_out ${_src_file})
add_custom_target(${_doc_file_out} ALL
COMMAND ${LUA_NATIVE_OBJECTS_EXECUTABLE} -outpath docs -gen luadoc ${_src_file}
COMMAND luadoc -nofiles -d docs docs
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
DEPENDS ${_src_file}
)
endif()
set_source_files_properties(${_doc_file_out} PROPERTIES GENERATED TRUE)
set_source_files_properties(${_header_file_out} PROPERTIES GENERATED TRUE)
set(_new_src_files ${_new_src_files} ${_src_file_out})
else(_src_file MATCHES ".nobj.lua")
set(_new_src_files ${_new_src_files} ${_src_file})

@ -18,24 +18,15 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local zmq = require"zmq"
require("zmq")
local N=tonumber(arg[1] or 100)
local ctx = zmq.init()
local ctx = zmq.init(1)
local s = ctx:socket(zmq.REQ)
s:connect("tcp://localhost:5555")
for i=1,N do
s:send("SELECT * FROM mytable")
local data, err = s:recv()
if data then
print(data)
else
print("s:recv() error:", err)
end
end
print(s:recv())
s:close()
ctx:term()

@ -18,26 +18,17 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local zmq = require"zmq"
require("zmq")
local N=tonumber(arg[1] or 100)
local ctx = zmq.init()
local ctx = zmq.init(1)
local s = ctx:socket(zmq.REQ)
s:connect("tcp://localhost:5555")
for i=1,N do
s:send("SELECT * FROM mytable ", zmq.SNDMORE)
s:send("WHERE library = 'zmq'")
local data, err = s:recv()
if data then
print(data)
else
print("s:recv() error:", err)
end
end
print(s:recv())
s:close()
ctx:term()

@ -1,108 +0,0 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local poller = require"examples.poller"
local poll = poller.new()
local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local N=tonumber(arg[1] or 100)
local ctx = zmq.init()
local s = ctx:socket(zmq.REQ)
local s_FD = s:getopt(zmq.FD)
s:connect("tcp://localhost:5555")
-- current socket state
local blocked_state
local blocked_event
local on_sock_recv
local on_sock_send
-- IO event callback when socket was blocked
local function on_sock_io()
local events = s:getopt(z_EVENTS)
local unblocked = false
if events == blocked_event then
-- got the event the socket was blocked on.
unblocked = true
elseif events == z_POLLIN_OUT then
-- got both in & out events
unblocked = true
end
if unblocked then
-- got the event we are blocked on resume.
blocked_event = nil
blocked_state()
-- check if blocked event was processed.
if not blocked_event then
poll:remove_read(s_FD)
end
end
end
local function sock_blocked(state, event)
if not blocked_event then
-- need to register socket's fd with event loop
poll:add_read(s_FD, on_sock_io)
end
blocked_state = state
blocked_event = event
end
-- sock state functions
function on_sock_send()
N = N - 1
if N == 0 then
return poll:stop()
end
local sent, err = s:send("SELECT * FROM mytable", z_NOBLOCK)
if not sent then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_send, z_POLLOUT)
end
-- yield back to event loop
poll:add_work(on_sock_recv)
end
function on_sock_recv()
local data, err = s:recv(z_NOBLOCK)
if not data then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_recv, z_POLLIN)
end
print(data)
return on_sock_send()
end
-- start processing of the socket.
poll:add_work(on_sock_send)
-- start event loop
poll:start()
s:close()
ctx:term()

@ -1,4 +1,4 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
-- Copyright (c) 2010 Aleksey Yeschenko <aleksey@yeschenko.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
@ -18,12 +18,7 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
require("zmq")
local ev = require'ev'
local loop = ev.Loop.default
@ -56,43 +51,27 @@ local function sub_worker(loop, ctx, msg_cb)
-- need idle watcher since ZeroMQ sockets are edge-triggered instead of level-triggered
local s_io_idle
local s_io_read
local max_recvs = 10
local function s_recv(recv_cnt)
local msg, err = s:recv(z_NOBLOCK)
if err == 'timeout' then
-- need to block on read IO
return false
end
self:msg_cb(msg)
if recv_cnt > 1 then
return s_recv(recv_cnt - 1)
end
return true
end
s_io_idle = ev.Idle.new(function()
if not s_recv(max_recvs) then
local msg, err = s:recv(zmq.NOBLOCK)
if err == 'timeout' then
-- need to block on read IO
s_io_idle:stop(loop)
s_io_read:start(loop)
return
end
self:msg_cb(msg)
end)
s_io_idle:start(loop)
s_io_read = ev.IO.new(function()
local events = s:getopt(z_EVENTS)
if events == z_POLLIN or events == z_POLLIN_OUT then
if s_recv(max_recvs) then
-- read IO is not block, enable idle watcher to handle reads.
s_io_idle:start(loop)
s_io_read:stop(loop)
end
end
end, s:getopt(zmq.FD), ev.READ)
self.s_io_idle = s_io_idle
self.s_io_read = s_io_read
return self
end
local ctx = zmq.init()
local ctx = zmq.init(1)
-- message handling function.
local function handle_msg(worker, msg)

@ -1,39 +0,0 @@
local zmq = require'zmq'
local poller = require"examples.poller"
local poll_zsock = require"examples.poll_zsock"
local poll = poller.new()
poll_zsock.set_poller(poll)
local c = zmq.init(1)
local xreq = poll_zsock(c:socket(zmq.XREQ))
xreq:bind('tcp://127.0.0.1:13333')
local xrep = poll_zsock(c:socket(zmq.XREP))
xrep:bind('tcp://127.0.0.1:13334')
local max_recv = 10
local function forward_io(src,dst)
src.on_data = function()
for i=1,max_recv do
repeat
local data, err = src:recv(zmq.NOBLOCK)
if not data then
if err == 'timeout' then
return
else
error("socket recv error:" .. err)
end
end
local more = src:getopt(zmq.RCVMORE) > 0
dst:send(data,more and zmq.SNDMORE or 0)
until not more
end
end
end
forward_io(xrep,xreq)
forward_io(xreq,xrep)
poll:start()

@ -1,177 +0,0 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local zmq = require"zmq"
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local poll
local meths = {}
local zsock_mt = { __index=meths }
local function zsock_check_events(self)
if not self.check_enabled then
-- enable 'on_work' callback to handle checking for socket events.
self.check_enabled = true
poll:add_work(self.on_work)
end
end
function meths:events()
zsock_check_events(self)
return self.sock:events()
end
function meths:getopt(opt)
if (opt == z_EVENTS) then
zsock_check_events(self)
end
return self.sock:getopt(opt)
end
function meths:setopt(opt,val)
return self.sock:setopt(opt,val)
end
function meths:sub(topic)
return self.sock:sub(topic)
end
function meths:unsub(topic)
return self.sock:unsub(topic)
end
function meths:identity(id)
return self.sock:identity(id)
end
function meths:bind(addr)
return self.sock:bind(addr)
end
function meths:connect(addr)
return self.sock:connect(addr)
end
function meths:close()
return self.sock:close()
end
function meths:send(msg, flags)
zsock_check_events(self)
local sent, err = self.sock:send(msg, flags)
if not sent and err == 'timeout' then
self.send_blocked = true
end
return sent, err
end
function meths:send_msg(msg, flags)
zsock_check_events(self)
local sent, err = self.sock:send_msg(msg, flags)
if not sent and err == 'timeout' then
self.send_blocked = true
end
return sent, err
end
function meths:recv(flags)
zsock_check_events(self)
local msg, err = self.sock:recv(flags)
if not msg and err == 'timeout' then
self.recv_blocked = true
end
return msg, err
end
function meths:recv_msg(msg, flags)
zsock_check_events(self)
local stat, err = self.sock:recv_msg(msg, flags)
if not stat and err == 'timeout' then
self.recv_blocked = true
end
return stat, err
end
local function nil_cb()
end
local function wrap_zsock(sock, on_data, on_drain)
local self = setmetatable({
sock = sock,
on_data = on_data or nil_cb,
on_drain = on_drain or nil_cb,
recv_blocked = false,
send_blocked = false,
check_enabled = false,
}, zsock_mt)
local function on_work()
self.check_enabled = false
local events = sock:events()
local read = false
local write = false
if events == z_POLLIN_OUT then
read = true
write = true
elseif events == z_POLLIN then
read = true
elseif events == z_POLLOUT then
write = true
else
return
end
if read then
self.recv_blocked = false
self:on_data(sock)
-- there might be more messages to read.
if not self.recv_blocked then
zsock_check_events(self)
end
end
if write and self.send_blocked then
self:on_drain(sock)
end
end
self.on_work = on_work
-- listen for read events to enable socket.
poll:add_read(sock:fd(), function()
on_work()
end)
zsock_check_events(self)
return self
end
return setmetatable({
set_poller = function(poller)
local old = poll
poll = poller
return old
end,
wrap_zsock = wrap_zsock,
}, { __call = function(tab, ...) return wrap_zsock(...) end})

@ -1,45 +0,0 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
-- safe require.
local require = require
local function safe_require(...)
return pcall(require, ...)
end
local mod_name = ...
local backends = {
"epoll",
"ev",
}
for i=1,#backends do
local backend = backends[i]
local name = mod_name .. '.' .. backend
local status, mod = safe_require(name)
if status then
--print("Loaded backend:", name)
return mod
end
end
error("Failed to load backend for: " .. mod_name)

@ -1,121 +0,0 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local epoll = require"epoll"
local EPOLLIN = epoll.EPOLLIN
local EPOLLOUT = epoll.EPOLLOUT
local poller_meths = {}
local poller_mt = {__index = poller_meths}
local function poller_new()
local reads = {}
-- create closure for epoll io_event callback.
local function do_io_event(fd, ev)
local cb = reads[fd]
return cb(fd, ev)
end
return setmetatable({
work_cur = {},
work_last = {},
reads = reads,
io_events = 0,
do_io_event = do_io_event,
poller = epoll.new(),
}, poller_mt)
end
function poller_meths:add_work(task)
-- add task to current work queue.
self.work_cur[#self.work_cur + 1] = task
end
function poller_meths:add_read(fd, cb)
-- make sure read event hasn't been registered yet.
if not self.reads[fd] then
self.io_events = self.io_events + 1
self.reads[fd] = cb
return self.poller:add(fd, EPOLLIN, fd)
else
-- update read callback?
self.reads[fd] = cb
end
end
function poller_meths:remove_read(fd)
-- make sure there was a read event registered.
if self.reads[fd] then
self.io_events = self.io_events - 1
self.reads[fd] = nil
return self.poller:del(fd)
end
end
local function poller_do_work(self)
local tasks = #self.work_cur
-- check if there is any work
if tasks > 0 then
-- swap work queues.
local last, cur = self.work_cur, self.work_last
self.work_cur, self.work_last = cur, last
for i=1,tasks do
local task = last[i]
last[i] = nil
task()
end
-- return new work queue length.
return #cur
end
return tasks
end
function poller_meths:start()
local do_io_event = self.do_io_event
local poller = self.poller
self.is_running = true
while self.is_running do
-- run work task
local new_work = poller_do_work(self)
-- wait == 0, if there is work to do, else wait == -1
local wait = (new_work > 0) and 0 or -1
-- poll for fd events, if there are events to poll for.
--print("poller:step()", new_work, self.io_events)
if self.io_events > 0 then
assert(poller:wait_callback(do_io_event, wait))
else
-- no io events to poll, do we still have work?
if #self.work_cur == 0 then
-- nothing to do, exit event loop
self.is_running = false
return
end
end
end
end
function poller_meths:stop()
self.is_running = false
end
-- module only exports a 'new' function.
return {
new = poller_new,
}

@ -1,119 +0,0 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local ev = require'ev'
local ev_READ = ev.READ
local ev_WRITE = ev.WRITE
local loop = ev.Loop.default
assert(ev.Idle,"Need version > 1.3 of lua-ev that supports Idle watchers.")
local poller_meths = {}
local poller_mt = {__index = poller_meths}
local function poller_new()
local self = {
work_cur = {},
work_last = {},
io_events = 0,
reads = {},
idle_enabled = false,
}
self.idle = ev.Idle.new(function()
local tasks = #self.work_cur
-- check if there is any work
if tasks > 0 then
-- swap work queues.
local last, cur = self.work_cur, self.work_last
self.work_cur, self.work_last = cur, last
for i=1,tasks do
local task = last[i]
last[i] = nil
task()
end
-- check if there is more work.
if #cur > 0 then
return -- don't disable idle watcher, when we have work.
end
end
--print("STOP IDLE:", #self.work_cur, #self.work_last)
-- stop idle watcher, no work.
self.idle_enabled = false
self.idle:stop(loop)
end)
-- set priority to max, to make sure the work queue is processed on each loop.
self.idle:priority(ev.MAXPRI)
return setmetatable(self, poller_mt)
end
function poller_meths:add_work(task)
local idx = #self.work_cur + 1
-- add task to current work queue.
self.work_cur[idx] = task
-- make sure the idle watcher is enabled.
if not self.idle_enabled then
self.idle_enabled = true
self.idle:start(loop)
end
end
function poller_meths:add_read(fd, cb)
local io_read = self.reads[fd]
-- make sure read event hasn't been registered yet.
if not io_read then
self.io_events = self.io_events + 1
io_read = ev.IO.new(function()
cb(fd)
end, fd, ev_READ)
self.reads[fd] = io_read
io_read:start(loop)
else
-- update read callback?
io_read:callback(cb)
-- need to re-start watcher?
if not io_read:is_active() then
io_read:start(loop)
end
end
end
function poller_meths:remove_read(fd)
local io_read = self.reads[fd]
-- make sure there was a read event registered.
if io_read then
self.io_events = self.io_events - 1
io_read:stop(loop)
end
end
function poller_meths:start()
return loop:loop()
end
function poller_meths:stop()
return loop:unloop()
end
-- module only exports a 'new' function.
return {
new = poller_new,
}

@ -18,9 +18,9 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local zmq = require"zmq"
require("zmq")
local ctx = zmq.init()
local ctx = zmq.init(1)
local s = ctx:socket(zmq.PUB)
s:bind("tcp://lo:5555")

@ -1,95 +0,0 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local poller = require"examples.poller"
local poll = poller.new()
local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local ctx = zmq.init()
local s = ctx:socket(zmq.PUB)
local s_FD = s:getopt(zmq.FD)
s:bind("tcp://lo:5555")
-- current socket state
local blocked_state
local blocked_event
local on_sock_recv
local on_sock_send
-- IO event callback when socket was blocked
local function on_sock_io()
local events = s:getopt(z_EVENTS)
local unblocked = false
if events == blocked_event then
-- got the event the socket was blocked on.
unblocked = true
elseif events == z_POLLIN_OUT then
-- got both in & out events
unblocked = true
end
if unblocked then
-- got the event we are blocked on resume.
blocked_event = nil
blocked_state()
-- check if blocked event was processed.
if not blocked_event then
poll:remove_read(s_FD)
end
end
end
local function sock_blocked(state, event)
if not blocked_event then
-- need to register socket's fd with event loop
poll:add_read(s_FD, on_sock_io)
end
blocked_state = state
blocked_event = event
end
-- sock state functions
local msg_id = 1
function on_sock_send()
local sent, err = s:send(tostring(msg_id), z_NOBLOCK)
if not sent then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_send, z_POLLOUT)
end
-- message sent, inc. id
msg_id = msg_id + 1
-- yield back to event loop
poll:add_work(on_sock_send)
end
-- start processing of the socket.
poll:add_work(on_sock_send)
-- start event loop
poll:start()
s:close()
ctx:term()

@ -18,9 +18,9 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local zmq = require"zmq"
require("zmq")
local ctx = zmq.init()
local ctx = zmq.init(1)
local s = ctx:socket(zmq.REP)
s:bind("tcp://lo:5555")

@ -18,9 +18,9 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local zmq = require"zmq"
require("zmq")
local ctx = zmq.init()
local ctx = zmq.init(1)
local s = ctx:socket(zmq.REP)
s:bind("tcp://lo:5555")

@ -1,102 +0,0 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local poller = require"examples.poller"
local poll = poller.new()
local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local ctx = zmq.init()
local s = ctx:socket(zmq.REP)
local s_FD = s:getopt(zmq.FD)
s:bind("tcp://lo:5555")
-- current socket state
local blocked_state
local blocked_event
local on_sock_recv
local on_sock_send
-- IO event callback when socket was blocked
local function on_sock_io()
local events = s:getopt(z_EVENTS)
local unblocked = false
if events == blocked_event then
-- got the event the socket was blocked on.
unblocked = true
elseif events == z_POLLIN_OUT then
-- got both in & out events
unblocked = true
end
if unblocked then
-- got the event we are blocked on resume.
blocked_event = nil
blocked_state()
-- check if blocked event was processed.
if not blocked_event then
poll:remove_read(s_FD)
end
end
end
local function sock_blocked(state, event)
if not blocked_event then
-- need to register socket's fd with event loop
poll:add_read(s_FD, on_sock_io)
end
blocked_state = state
blocked_event = event
end
-- sock state functions
function on_sock_recv()
local data, err = s:recv(z_NOBLOCK)
if not data then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_recv, z_POLLIN)
end
print(string.format("Received query: '%s'", data))
return on_sock_send()
end
function on_sock_send()
local sent, err = s:send("OK", z_NOBLOCK)
if not sent then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_send, z_POLLOUT)
end
-- yield back to event loop
poll:add_work(on_sock_recv)
end
-- start processing of the socket.
poll:add_work(on_sock_recv)
-- start event loop
poll:start()
s:close()
ctx:term()

@ -18,9 +18,9 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local zmq = require"zmq"
require("zmq")
local ctx = zmq.init()
local ctx = zmq.init(1)
local s = ctx:socket(zmq.SUB)
s:setopt(zmq.SUBSCRIBE, "")
s:connect("tcp://localhost:5555")

@ -1,96 +0,0 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local poller = require"examples.poller"
local poll = poller.new()
local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local N=tonumber(arg[1] or 100)
local ctx = zmq.init()
local s = ctx:socket(zmq.SUB)
local s_FD = s:getopt(zmq.FD)
s:setopt(zmq.SUBSCRIBE, "")
s:connect("tcp://localhost:5555")
-- current socket state
local blocked_state
local blocked_event
local on_sock_recv
local on_sock_send
-- IO event callback when socket was blocked
local function on_sock_io()
local events = s:getopt(z_EVENTS)
local unblocked = false
if events == blocked_event then
-- got the event the socket was blocked on.
unblocked = true
elseif events == z_POLLIN_OUT then
-- got both in & out events
unblocked = true
end
if unblocked then
-- got the event we are blocked on resume.
blocked_event = nil
blocked_state()
-- check if blocked event was processed.
if not blocked_event then
poll:remove_read(s_FD)
end
end
end
local function sock_blocked(state, event)
if not blocked_event then
-- need to register socket's fd with event loop
poll:add_read(s_FD, on_sock_io)
end
blocked_state = state
blocked_event = event
end
-- sock state functions
function on_sock_recv()
local data, err = s:recv(z_NOBLOCK)
if not data then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_recv, z_POLLIN)
end
local msg_id = tonumber(data)
if (msg_id % 10000) == 0 then print(data) end
return on_sock_recv()
end
-- start processing of the socket.
poll:add_work(on_sock_recv)
-- start event loop
poll:start()
s:close()
ctx:term()

@ -30,31 +30,16 @@ local roundtrip_count = tonumber(arg[3])
local zmq = require"zmq"
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.REP))
assert(s:bind(bind_to))
local s = ctx:socket(zmq.REP)
s:bind(bind_to)
local msg = zmq.zmq_msg_t()
local timer
for i = 1, roundtrip_count do
assert(s:recv_msg(msg))
if not timer then
timer = zmq.stopwatch_start()
end
assert(msg:size() == message_size, "Invalid message size")
assert(s:send_msg(msg))
end
local elapsed = timer:stop()
s:close()
ctx:term()
local latency = elapsed / roundtrip_count / 2
print(string.format("mean latency: %.3f [us]", latency))
local secs = elapsed / (1000 * 1000)
print(string.format("elapsed = %f", secs))
print(string.format("msg/sec = %f", roundtrip_count / secs))

@ -30,9 +30,9 @@ local message_count = tonumber(arg[3])
local zmq = require"zmq"
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.PULL))
local s = ctx:socket(zmq.PULL)
--s:setopt(zmq.SUBSCRIBE, "");
assert(s:bind(bind_to))
s:bind(bind_to)
local function recv_msg(s,msg)
assert(s:recv_msg(msg))

@ -30,8 +30,8 @@ local message_count = tonumber(arg[3])
local zmq = require"zmq"
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.PULL))
assert(s:bind(bind_to))
local s = ctx:socket(zmq.PULL)
s:bind(bind_to)
print(string.format("message size: %i [B]", message_size))
print(string.format("message count: %i", message_count))

@ -30,8 +30,9 @@ local message_count = tonumber(arg[3])
local zmq = require"zmq"
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.PULL))
assert(s:bind(bind_to))
local s = ctx:socket(zmq.SUB)
s:setopt(zmq.SUBSCRIBE, "");
s:bind(bind_to)
print(string.format("message size: %i [B]", message_size))
print(string.format("message count: %i", message_count))

@ -1,77 +0,0 @@
-- Copyright (c) 2010 Aleksey Yeschenko <aleksey@yeschenko.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
if not arg[3] then
print("usage: lua local_thr.lua <bind-to> <message-size> <message-count>")
os.exit()
end
local bind_to = arg[1]
local message_size = tonumber(arg[2])
local message_count = tonumber(arg[3])
local zmq = require"zmq"
local z_poller = require"zmq.poller"
local z_NOBLOCK = zmq.NOBLOCK
local poller = z_poller(64)
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.SUB))
assert(s:setopt(zmq.SUBSCRIBE, ""))
assert(s:bind(bind_to))
print(string.format("message size: %i [B]", message_size))
print(string.format("message count: %i", message_count))
local msg
msg = zmq.zmq_msg_t()
local cnt = 0
poller:add(s, zmq.POLLIN, function(sock)
while s:recv_msg(msg, z_NOBLOCK) do
--assert(msg:size() == message_size, "Invalid message size")
cnt = cnt + 1
if cnt == message_count then
poller:stop()
end
end
end)
-- wait for first message
assert(s:recv_msg(msg))
cnt = 1
local timer = zmq.stopwatch_start()
poller:start()
local elapsed = timer:stop()
s:close()
ctx:term()
if elapsed == 0 then elapsed = 1 end
local throughput = message_count / (elapsed / 1000000)
local megabits = throughput * message_size * 8 / 1000000
print(string.format("mean throughput: %i [msg/s]", throughput))
print(string.format("mean throughput: %.3f [Mb/s]", megabits))

@ -30,8 +30,8 @@ local roundtrip_count = tonumber(arg[3])
local zmq = require"zmq"
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.REQ))
assert(s:connect(connect_to))
local s = ctx:socket(zmq.REQ)
s:connect(connect_to)
local data = ("0"):rep(message_size)
local msg = zmq.zmq_msg_t.init_size(message_size)
@ -54,7 +54,3 @@ local latency = elapsed / roundtrip_count / 2
print(string.format("message size: %i [B]", message_size))
print(string.format("roundtrip count: %i", roundtrip_count))
print(string.format("mean latency: %.3f [us]", latency))
local secs = elapsed / (1000 * 1000)
print(string.format("Elapsed: %f secs", secs))
print(string.format("throughput %f msg/secs", roundtrip_count / secs))

@ -31,8 +31,8 @@ local zmq = require"zmq"
local z_SNDMORE = zmq.SNDMORE
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.PUSH))
assert(s:connect(connect_to))
local s = ctx:socket(zmq.PUSH)
s:connect(connect_to)
local data = ("0"):rep(message_size/2)
local msg = zmq.zmq_msg_t.init_size(message_size/2)

@ -30,8 +30,8 @@ local message_count = tonumber(arg[3])
local zmq = require"zmq"
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.PUSH))
assert(s:connect(connect_to))
local s = ctx:socket(zmq.PUSH)
s:connect(connect_to)
local data = ("0"):rep(message_size)
local msg = zmq.zmq_msg_t.init_size(message_size)

@ -30,8 +30,8 @@ local message_count = tonumber(arg[3])
local zmq = require"zmq"
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.PUSH))
assert(s:connect(connect_to))
local s = ctx:socket(zmq.PUB)
s:connect(connect_to)
zmq.sleep(1)

@ -52,8 +52,8 @@ local child_code = [[
]]
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.REQ))
assert(s:bind(bind_to))
local s = ctx:socket(zmq.REQ)
s:bind(bind_to)
local child_thread = zthreads.runstring(ctx, child_code, connect_to, message_size, roundtrip_count)
child_thread:start()
@ -83,7 +83,4 @@ ctx:term()
local latency = elapsed / roundtrip_count / 2
print(string.format("mean latency: %.3f [us]", latency))
local secs = elapsed / (1000 * 1000)
print(string.format("elapsed = %f", secs))
print(string.format("msg/sec = %f", roundtrip_count / secs))

@ -37,18 +37,17 @@ local child_code = [[
local zthreads = require"zmq.threads"
local ctx = zthreads.get_parent_ctx()
local s = assert(ctx:socket(zmq.PUSH))
assert(s:setopt(zmq.HWM, message_count/4))
assert(s:connect(connect_to))
local s = ctx:socket(zmq.PUSH)
s:setopt(zmq.HWM, message_count/4)
s:connect(connect_to)
local data = ("0"):rep(message_size)
local msg_data = zmq.zmq_msg_t.init_data(data)
local msg = zmq.zmq_msg_t.init()
local msg = zmq.zmq_msg_t.init_size(message_size)
local timer = zmq.stopwatch_start()
for i = 1, message_count do
msg:copy(msg_data)
msg:set_data(data)
assert(s:send_msg(msg))
end
@ -68,8 +67,8 @@ local child_code = [[
]]
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.PULL))
assert(s:bind(bind_to))
local s = ctx:socket(zmq.PULL)
s:bind(bind_to)
print(string.format("message size: %i [B]", message_size))
print(string.format("message count: %i", message_count))

@ -24,8 +24,8 @@ end
local message_size = tonumber(arg[1] or 1)
local message_count = tonumber(arg[2] or 100000)
local bind_to = arg[3] or 'inproc://thread_thr_test'
local connect_to = arg[4] or 'inproc://thread_thr_test'
local bind_to = arg[3] or 'inproc://thread_lat_test'
local connect_to = arg[4] or 'inproc://thread_lat_test'
local zmq = require"zmq"
local zthreads = require"zmq.threads"
@ -37,8 +37,8 @@ local child_code = [[
local zthreads = require"zmq.threads"
local ctx = zthreads.get_parent_ctx()
local s = assert(ctx:socket(zmq.PUSH))
assert(s:connect(connect_to))
local s = ctx:socket(zmq.PUB)
s:connect(connect_to)
local data = ("0"):rep(message_size)
local msg_data = zmq.zmq_msg_t.init_data(data)
@ -67,8 +67,9 @@ local child_code = [[
]]
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.PULL))
assert(s:bind(bind_to))
local s = ctx:socket(zmq.SUB)
s:setopt(zmq.SUBSCRIBE, "");
s:bind(bind_to)
print(string.format("message size: %i [B]", message_size))
print(string.format("message count: %i", message_count))

@ -1,36 +0,0 @@
package = "lua-zmq"
version = "1.2-1"
source = {
url = "git://github.com/Neopallium/lua-zmq.git",
branch = "v1.2",
}
description = {
summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.",
homepage = "http://github.com/Neopallium/lua-zmq",
license = "MIT/X11",
}
dependencies = {
"lua >= 5.1, < 5.5",
}
external_dependencies = {
ZEROMQ = {
header = "zmq.h",
library = "zmq",
}
}
build = {
type = "builtin",
modules = {
zmq = {
sources = {"src/pre_generated-zmq.nobj.c"},
incdirs = "$(ZEROMQ_INCDIR)",
libdirs = "$(ZEROMQ_LIBDIR)",
libraries = {"zmq"},
},
},
install = {
lua = {
['zmq.poller'] = "src/poller.lua",
}
}
}

@ -1,7 +1,7 @@
package = "lua-zmq"
version = "scm-1"
source = {
url = "git+https://brejela.club/gitea/brejela/lua-zmq.git",
url = "git://github.com/Neopallium/lua-zmq.git",
}
description = {
summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.",
@ -9,31 +9,15 @@ description = {
license = "MIT/X11",
}
dependencies = {
"lua >= 5.1, < 5.5",
"lua >= 5.1",
}
external_dependencies = {
platforms = {
windows = {
ZEROMQ = {
library = "libzmq",
}
},
},
ZEROMQ = {
header = "zmq.h",
library = "zmq",
}
}
build = {
platforms = {
windows = {
modules = {
zmq = {
libraries = {"libzmq"},
}
}
},
},
type = "builtin",
modules = {
zmq = {

@ -1,51 +0,0 @@
package = "lua-zmq"
version = "scm-2"
source = {
url = "git://github.com/Neopallium/lua-zmq.git",
}
description = {
summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.",
homepage = "http://github.com/Neopallium/lua-zmq",
license = "MIT/X11",
}
dependencies = {
"lua >= 5.1, < 5.5",
}
external_dependencies = {
platforms = {
windows = {
ZEROMQ = {
library = "libzmq",
}
},
},
ZEROMQ = {
header = "zmq.h",
library = "zmq",
}
}
build = {
platforms = {
windows = {
modules = {
zmq = {
libraries = {"libzmq"},
}
}
},
},
type = "builtin",
modules = {
zmq = {
sources = {"src/pre_generated-zmq.nobj.c"},
incdirs = "$(ZEROMQ_INCDIR)",
libdirs = "$(ZEROMQ_LIBDIR)",
libraries = {"zmq"},
},
},
install = {
lua = {
['zmq.poller'] = "src/poller.lua",
},
},
}

@ -1,23 +0,0 @@
package = "lua-zmq-threads"
version = "1.2-1"
source = {
url = "git://github.com/Neopallium/lua-zmq.git",
branch = "v1.2",
}
description = {
summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.",
homepage = "http://github.com/Neopallium/lua-zmq",
license = "MIT/X11"
}
dependencies = {
"lua-zmq >= 1.2-1",
"lua-llthreads >= 1.3-1",
}
build = {
type = "none",
install = {
lua = {
['zmq.threads'] = "src/threads.lua",
}
}
}

@ -1,7 +1,7 @@
package = "lua-zmq-threads"
version = "scm-0"
source = {
url = "git+https://brejela.club/gitea/brejela/lua-zmq.git",
url = "git://github.com/Neopallium/lua-zmq.git",
}
description = {
summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.",

@ -36,13 +36,5 @@ typedef struct ZMQ_Ctx ZMQ_Ctx;
method "socket" {
c_method_call "!ZMQ_Socket *" "zmq_socket" { "int", "type"}
},
method "set" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "int" "zmq_ctx_set" { "int", "flag", "int", "value" }
},
method "get" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "int" "zmq_ctx_get" { "int", "flag" }
},
}

@ -283,15 +283,12 @@ error_code "ZMQ_Error" "int" {
ffi_is_error_check = function(rec) return "(-1 == ${" .. rec.name .. "})" end,
default = "0",
c_source [[
int num;
if(-1 == err) {
/* get ZErrors table. */
lua_pushlightuserdata(L, zmq_ZErrors_key);
lua_rawget(L, LUA_REGISTRYINDEX);
/* convert zmq_errno to string. */
num = zmq_errno();
lua_pushinteger(L, num);
lua_gettable(L, -2);
lua_rawgeti(L, -1, zmq_errno());
/* remove ZErrors table. */
lua_remove(L, -2);
if(!lua_isnil(L, -1)) {
@ -300,8 +297,7 @@ error_code "ZMQ_Error" "int" {
}
/* Unknown error. */
lua_pop(L, 1);
lua_pushfstring(L, "UNKNOWN ERROR(%d)", num);
return;
err_str = "UNKNOWN ERROR";
}
]],
ffi_source [[

@ -21,22 +21,6 @@
object "zmq_msg_t" {
-- store the `zmq_msg_t` structure in Lua userdata object
userdata_type = "embed",
implements "Buffer" {
implement_method "const_data" {
c_function = "zmq_msg_data"
},
implement_method "get_size" {
c_function = "zmq_msg_size"
},
},
implements "MutableBuffer" {
implement_method "data" {
c_function = "zmq_msg_data"
},
implement_method "get_size" {
c_function = "zmq_msg_size"
},
},
--
-- Define zmq_msq_t type & function API for FFI
--
@ -44,7 +28,10 @@ object "zmq_msg_t" {
struct zmq_msg_t
{
unsigned char _ [64];
void *content;
unsigned char flags;
unsigned char vsm_size;
unsigned char vsm_data [30]; /* that '30' is from 'MAX_VSM_SIZE' */
};
int zmq_msg_init (zmq_msg_t *msg);
@ -145,7 +132,7 @@ int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
]],
},
method "size" {
c_method_call { "size_t", "size", ffi_wrap = "tonumber"} "zmq_msg_size" {}
c_method_call "size_t" "zmq_msg_size" {}
},
method "__tostring" {
var_out{ "const char *", "data", has_length = true },

@ -92,6 +92,5 @@ function M.new(pre_alloc)
}, poller_mt)
end
zmq.poller = M
return setmetatable(M, {__call = function(tab, ...) return M.new(...) end})

@ -272,10 +272,10 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
constructor "new" {
var_in{ "unsigned int", "length", is_optional = true, default = 10 },
c_export_method_call "void" "poller_init" { "unsigned int", "length" },
c_method_call "void" "poller_init" { "unsigned int", "length" },
},
destructor "close" {
c_export_method_call "void" "poller_cleanup" {},
c_method_call "void" "poller_cleanup" {},
},
method "add" {
var_in{ "<any>", "sock" },
@ -303,7 +303,6 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
ffi_source[[
local fd = 0
local sock_type = type(${sock})
local sock
if sock_type == 'cdata' then
sock = obj_type_ZMQ_Socket_check(${sock})
elseif sock_type == 'number' then
@ -311,7 +310,7 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
else
error("expected number or ZMQ_Socket")
end
${idx} = Cmod.poller_get_free_item(${this})
${idx} = C.poller_get_free_item(${this})
local item = ${this}.items[${idx}]
item.socket = sock
item.fd = fd
@ -356,15 +355,14 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
ffi_source[[
local fd = 0
local sock_type = type(${sock})
local sock
if sock_type == 'cdata' then
sock = obj_type_ZMQ_Socket_check(${sock})
-- find sock in items list.
${idx} = Cmod.poller_find_sock_item(${this}, sock)
${idx} = C.poller_find_sock_item(${this}, sock)
elseif sock_type == 'number' then
fd = ${sock}
-- find fd in items list.
${idx} = Cmod.poller_find_fd_item(${this}, fd);
${idx} = C.poller_find_fd_item(${this}, fd);
else
error("expected number or ZMQ_Socket")
end
@ -374,7 +372,7 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
item.fd = fd
item.events = ${events}
else
Cmod.poller_remove_item(${this}, ${idx})
C.poller_remove_item(${this}, ${idx})
end
]],
},
@ -406,28 +404,29 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
ffi_source[[
local fd = 0
local sock_type = type(${sock})
local sock
if sock_type == 'cdata' then
sock = obj_type_ZMQ_Socket_check(${sock})
-- find sock in items list.
${idx} = Cmod.poller_find_sock_item(${this}, sock)
${idx} = C.poller_find_sock_item(${this}, sock)
elseif sock_type == 'number' then
fd = ${sock}
-- find fd in items list.
${idx} = Cmod.poller_find_fd_item(${this}, fd);
${idx} = C.poller_find_fd_item(${this}, fd);
else
error("expected number or ZMQ_Socket")
end
if ${idx} >= 0 then
Cmod.poller_remove_item(${this}, ${idx})
C.poller_remove_item(${this}, ${idx})
end
]],
},
method "poll" {
var_in{ "long", "timeout" },
var_out{ "int", "count" },
-- poll for events
c_export_method_call { "ZMQ_Error", "err>2" } "poller_poll" { "long", "timeout" },
var_out{ "ZMQ_Error", "err" },
c_source[[
/* poll for events */
${err} = poller_poll(${this}, ${timeout});
if(${err} > 0) {
${this}->next = 0;
${count} = ${err};
@ -437,6 +436,8 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
}
]],
ffi_source[[
-- poll for events
${err} = C.poller_poll(${this}, ${timeout})
if(${err} > 0) then
${this}.next = 0
${count} = ${err}
@ -447,7 +448,7 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
]],
},
method "next_revents_idx" {
c_export_method_call { "int", "idx>1" } "poller_next_revents" { "int", "&revents>2" },
c_method_call { "int", "idx>1" } "poller_next_revents" { "int", "&revents>2" },
},
method "count" {
var_out{ "int", "count" },

File diff suppressed because it is too large Load Diff

@ -65,15 +65,6 @@ local socket_options = {
[20] = { name="recovery_ivl_msec", otype="INT64", mode="rw", ltype="int64_t" },
[21] = { name="reconnect_ivl_max", otype="INT", mode="rw", ltype="int" },
},
{ ver_def = 'VERSION_2_2', major = 2, minor = 2,
[22] = { },
[23] = { },
[24] = { },
[25] = { },
[26] = { },
[27] = { name="rcvtimeo", otype="INT", mode="rw", ltype="int" },
[28] = { name="sndtimeo", otype="INT", mode="rw", ltype="int" },
},
{ ver_def = 'VERSION_3_0', major = 3, minor = 0,
[1] = { name="hwm", otype="INT", mode="rw",
custom = [[
@ -128,87 +119,8 @@ ZMQ_Error lzmq_socket_hwm(ZMQ_Socket *sock, int *value) {
[30] = { },
[31] = { name="ipv4only", otype="INT", mode="rw", ltype="int" },
},
{ ver_def = 'VERSION_4_0', major = 4, minor = 0,
[1] = { name="hwm", otype="INT", mode="rw",
custom = [[
ZMQ_Error lzmq_socket_set_hwm(ZMQ_Socket *sock, int value) {
int val;
int rc;
val = (int)value;
rc = zmq_setsockopt(sock, ZMQ_SNDHWM, &value, sizeof(value));
if(-1 == rc) return rc;
val = (int)value;
return zmq_setsockopt(sock, ZMQ_RCVHWM, &value, sizeof(value));
}
ZMQ_Error lzmq_socket_hwm(ZMQ_Socket *sock, int *value) {
size_t val_len;
int rc;
val_len = sizeof(value);
rc = zmq_getsockopt(sock, ZMQ_SNDHWM, value, &val_len);
if(-1 == rc) return rc;
val_len = sizeof(value);
return zmq_getsockopt(sock, ZMQ_RCVHWM, value, &val_len);
}
]] },
[2] = { },
[3] = { },
[4] = { name="affinity", otype="UINT64", mode="rw", ltype="uint64_t" },
[5] = { name="identity", otype="BLOB", mode="rw", ltype="const char *" },
[6] = { name="subscribe", otype="BLOB", mode="w", ltype="const char *" },
[7] = { name="unsubscribe", otype="BLOB", mode="w", ltype="const char *" },
[8] = { name="rate", otype="INT", mode="rw", ltype="int" },
[9] = { name="recovery_ivl", otype="INT", mode="rw", ltype="int" },
[10] = { },
[11] = { name="sndbuf", otype="INT", mode="rw", ltype="int" },
[12] = { name="rcvbuf", otype="INT", mode="rw", ltype="int" },
[13] = { name="rcvmore", otype="INT", mode="r", ltype="int" },
[14] = { name="fd", otype="FD", mode="r", ltype="int" },
[15] = { name="events", otype="INT", mode="r", ltype="int" },
[16] = { name="type", otype="INT", mode="r", ltype="int" },
[17] = { name="linger", otype="INT", mode="rw", ltype="int" },
[18] = { name="reconnect_ivl", otype="INT", mode="rw", ltype="int" },
[19] = { name="backlog", otype="INT", mode="rw", ltype="int" },
[20] = { },
[21] = { name="reconnect_ivl_max", otype="INT", mode="rw", ltype="int" },
[22] = { name="maxmsgsize", otype="INT64", mode="rw", ltype="int64_t" },
[23] = { name="sndhwm", otype="INT", mode="rw", ltype="int" },
[24] = { name="rcvhwm", otype="INT", mode="rw", ltype="int" },
[25] = { name="multicast_hops", otype="INT", mode="rw", ltype="int" },
[26] = { },
[27] = { name="rcvtimeo", otype="INT", mode="rw", ltype="int" },
[28] = { name="sndtimeo", otype="INT", mode="rw", ltype="int" },
[29] = { },
[30] = { },
[31] = { name="ipv4only", otype="INT", mode="rw", ltype="int" },
-- New to version 4.x
[32] = { name="last_endpoint", otype="BLOB", mode="r", ltype="const char *" },
[33] = { name="router_mandatory", otype="INT", mode="w", ltype="int" },
[34] = { name="tcp_keepalive", otype="INT", mode="rw", ltype="int" },
[35] = { name="tcp_keepalive_cnt", otype="INT", mode="rw", ltype="int" },
[36] = { name="tcp_keepalive_idle",otype="INT", mode="rw", ltype="int" },
[37] = { name="tcp_keepalive_intvl",otype="INT", mode="rw", ltype="int" },
[38] = { name="tcp_accept_filter", otype="BLOB", mode="w", ltype="const char *" },
[39] = { name="immediate", otype="INT", mode="rw", ltype="int" },
[40] = { name="xpub_verbose", otype="INT", mode="w", ltype="int" },
[41] = { name="router_raw", otype="INT", mode="w", ltype="int" },
[42] = { name="ipv6", otype="INT", mode="rw", ltype="int" },
[43] = { name="mechanism", otype="INT", mode="r", ltype="int" },
[44] = { name="plain_server", otype="INT", mode="rw", ltype="int" },
[45] = { name="plain_username", otype="BLOB", mode="rw", ltype="const char *" },
[46] = { name="plain_password", otype="BLOB", mode="rw", ltype="const char *" },
[47] = { name="curve_server", otype="INT", mode="rw", ltype="int" },
[48] = { name="curve_publickey", otype="BLOB", mode="rw", ltype="const char *" },
[49] = { name="curve_secretkey", otype="BLOB", mode="rw", ltype="const char *" },
[50] = { name="curve_serverkey", otype="BLOB", mode="rw", ltype="const char *" },
[51] = { name="probe_router", otype="INT", mode="w", ltype="int" },
[52] = { name="req_correlate", otype="INT", mode="w", ltype="int" },
[53] = { name="req_relaxed", otype="INT", mode="w", ltype="int" },
[54] = { name="conflate", otype="INT", mode="rw", ltype="int" },
[55] = { name="zap_domain", otype="BLOB", mode="rw", ltype="const char *" },
},
}
local max_options = 60 -- this number must be larger then the highest option value.
local max_options = 50
local function foreach_opt(func)
for i=1,#socket_options do
@ -336,17 +248,17 @@ foreach_opt(function(num, opt, ver)
if opt.c_set then
if opt.otype == 'BLOB' then
set = [[
LUA_NOBJ_API ZMQ_Error ${c_set}(ZMQ_Socket *sock, const char *value, size_t str_len) {
ZMQ_Error ${c_set}(ZMQ_Socket *sock, const char *value, size_t str_len) {
return zmq_setsockopt(sock, ${DEF}, value, str_len);
]]
elseif opt.ctype == opt.ltype then
set = [[
LUA_NOBJ_API ZMQ_Error ${c_set}(ZMQ_Socket *sock, ${ltype} value) {
ZMQ_Error ${c_set}(ZMQ_Socket *sock, ${ltype} value) {
return zmq_setsockopt(sock, ${DEF}, &value, sizeof(value));
]]
else
set = [[
LUA_NOBJ_API ZMQ_Error ${c_set}(ZMQ_Socket *sock, ${ltype} value) {
ZMQ_Error ${c_set}(ZMQ_Socket *sock, ${ltype} value) {
${ctype} val = (${ctype})value;
return zmq_setsockopt(sock, ${DEF}, &val, sizeof(val));
]]
@ -357,18 +269,18 @@ LUA_NOBJ_API ZMQ_Error ${c_set}(ZMQ_Socket *sock, ${ltype} value) {
if opt.c_get then
if opt.otype == 'BLOB' then
get = [[
LUA_NOBJ_API ZMQ_Error ${c_get}(ZMQ_Socket *sock, char *value, size_t *len) {
ZMQ_Error ${c_get}(ZMQ_Socket *sock, char *value, size_t *len) {
return zmq_getsockopt(sock, ${DEF}, value, len);
]]
elseif opt.ctype == opt.ltype then
get = [[
LUA_NOBJ_API ZMQ_Error ${c_get}(ZMQ_Socket *sock, ${ltype} *value) {
ZMQ_Error ${c_get}(ZMQ_Socket *sock, ${ltype} *value) {
size_t val_len = sizeof(${ltype});
return zmq_getsockopt(sock, ${DEF}, value, &val_len);
]]
else
get = [[
LUA_NOBJ_API ZMQ_Error ${c_get}(ZMQ_Socket *sock, ${ltype} *value) {
ZMQ_Error ${c_get}(ZMQ_Socket *sock, ${ltype} *value) {
${ctype} val;
size_t val_len = sizeof(val);
int rc = zmq_getsockopt(sock, ${DEF}, &val, &val_len);
@ -389,14 +301,10 @@ end)
endif(last_ver.ver_def)
add(opt_types, [[
#if VERSION_4_0
# define MAX_OPTS VERSION_4_0_MAX_OPT
#elif VERSION_3_0
#if VERSION_3_0
# define MAX_OPTS VERSION_3_0_MAX_OPT
#else
# if VERSION_2_2
# define MAX_OPTS VERSION_2_2_MAX_OPT
# elif VERSION_2_1
# if VERSION_2_1
# define MAX_OPTS VERSION_2_1_MAX_OPT
# else
# define MAX_OPTS VERSION_2_0_MAX_OPT
@ -447,7 +355,7 @@ local function build_option_methods()
end
m[#m+1] = method (name) { if_defs = if_defs,
var_out(val_out),
c_export_method_call "ZMQ_Error" (meth.c_get) (args),
c_method_call "ZMQ_Error" (meth.c_get) (args),
}
end
-- generate setter method.
@ -458,7 +366,7 @@ local function build_option_methods()
args = { ltype, "value", "size_t", "#value" }
end
m[#m+1] = method (name) { if_defs = if_defs,
c_export_method_call "ZMQ_Error" (meth.c_set) (args),
c_method_call "ZMQ_Error" (meth.c_set) (args),
}
end
end
@ -475,38 +383,18 @@ end
object "ZMQ_Socket" {
error_on_null = "get_zmq_strerror()",
ffi_source "ffi_pre_cdef" [[
ffi_source [[
-- detect zmq version
local VERSION_2_0 = true
local VERSION_2_1 = false
local VERSION_2_2 = false
local VERSION_3_0 = false
local VERSION_4_0 = false
local zver = _M.version()
if zver[1] == 4 then
VERSION_2_0 = false
VERSION_4_0 = true
elseif zver[1] == 3 then
if zver[1] == 3 then
VERSION_2_0 = false
VERSION_3_0 = true
elseif zver[1] == 2 and zver[2] == 2 then
VERSION_2_2 = true
VERSION_2_1 = true
elseif zver[1] == 2 and zver[2] == 1 then
VERSION_2_1 = true
end
if VERSION_2_0 then
ffi.cdef[==[
typedef int ZMQ_Error;
typedef struct ZMQ_Socket ZMQ_Socket;
typedef struct zmq_msg_t zmq_msg_t;
ZMQ_Error zmq_sendmsg(ZMQ_Socket *sock, zmq_msg_t *msg, int flags) __asm__("zmq_send");
ZMQ_Error zmq_recvmsg(ZMQ_Socket *sock, zmq_msg_t *msg, int flags) __asm__("zmq_recv");
]==]
end
]],
c_source ([[
@ -548,17 +436,9 @@ static const int opt_types[] = {
method "bind" {
c_method_call "ZMQ_Error" "zmq_bind" { "const char *", "addr" }
},
method "unbind" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "ZMQ_Error" "zmq_unbind" { "const char *", "addr" }
},
method "connect" {
c_method_call "ZMQ_Error" "zmq_connect" { "const char *", "addr" }
},
method "disconnect" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "ZMQ_Error" "zmq_disconnect" { "const char *", "addr" }
},
ffi_cdef[[
int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);
int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);
@ -580,7 +460,7 @@ do
setmetatable(option_sets,{__index = function(tab,opt)
local opt_name = opt_name[opt]
if not opt_name then return nil end
local method = methods['set_' .. opt_name] or methods[opt_name]
local method = methods[opt_name] or methods['set_' .. opt_name]
rawset(tab, opt, method)
return method
end})
@ -597,7 +477,7 @@ end
size_t val_len;
const void *val;
#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
#if VERSION_2_1
socket_t fd_val;
#endif
int int_val;
@ -605,10 +485,10 @@ end
uint64_t uint64_val;
int64_t int64_val;
#if VERSION_3_0 || VERSION_4_0
#if VERSION_3_0
/* 3.0 backwards compatibility support for HWM. */
if(${opt} == ZMQ_HWM) {
int_val = luaL_checkinteger(L, ${val::idx});
int_val = luaL_checklong(L, ${val::idx});
val = &int_val;
val_len = sizeof(int_val);
${err} = zmq_setsockopt(${this}, ZMQ_SNDHWM, val, val_len);
@ -624,30 +504,30 @@ end
}
switch(opt_types[${opt}]) {
#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
#if VERSION_2_1
case OPT_TYPE_FD:
fd_val = luaL_checkinteger(L, ${val::idx});
fd_val = luaL_checklong(L, ${val::idx});
val = &fd_val;
val_len = sizeof(fd_val);
break;
#endif
case OPT_TYPE_INT:
int_val = luaL_checkinteger(L, ${val::idx});
int_val = luaL_checklong(L, ${val::idx});
val = &int_val;
val_len = sizeof(int_val);
break;
case OPT_TYPE_UINT32:
uint32_val = luaL_checkinteger(L, ${val::idx});
uint32_val = luaL_checklong(L, ${val::idx});
val = &uint32_val;
val_len = sizeof(uint32_val);
break;
case OPT_TYPE_UINT64:
uint64_val = luaL_checkinteger(L, ${val::idx});
uint64_val = luaL_checklong(L, ${val::idx});
val = &uint64_val;
val_len = sizeof(uint64_val);
break;
case OPT_TYPE_INT64:
int64_val = luaL_checkinteger(L, ${val::idx});
int64_val = luaL_checklong(L, ${val::idx});
val = &int64_val;
val_len = sizeof(int64_val);
break;
@ -681,7 +561,7 @@ local tmp_val_len = ffi.new('size_t[1]', 4)
c_source[[
size_t val_len;
#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
#if VERSION_2_1
socket_t fd_val;
#endif
int int_val;
@ -698,7 +578,7 @@ local tmp_val_len = ffi.new('size_t[1]', 4)
}
switch(opt_types[${opt}]) {
#if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
#if VERSION_2_1
case OPT_TYPE_FD:
val_len = sizeof(fd_val);
${err} = zmq_getsockopt(${this}, ${opt}, &fd_val, &val_len);
@ -769,14 +649,11 @@ local tmp_val_len = ffi.new('size_t[1]', 4)
-- zmq_send
--
method "send_msg" {
c_method_call "ZMQ_Error" "zmq_sendmsg" { "zmq_msg_t *", "msg", "int", "flags?" },
c_export_method_call "ZMQ_Error" "zmq_sendmsg" { "zmq_msg_t *", "msg", "int", "flags?" },
},
-- create helper function for `zmq_send`
c_source[[
LUA_NOBJ_API ZMQ_Error simple_zmq_send(ZMQ_Socket *sock, const char *data, size_t data_len, int flags) {
#if VERSION_3_2
return zmq_send(sock, data, data_len, flags);
#else
ZMQ_Error simple_zmq_send(ZMQ_Socket *sock, const char *data, size_t data_len, int flags) {
ZMQ_Error err;
zmq_msg_t msg;
/* initialize message */
@ -790,18 +667,17 @@ LUA_NOBJ_API ZMQ_Error simple_zmq_send(ZMQ_Socket *sock, const char *data, size_
zmq_msg_close(&msg);
}
return err;
#endif
}
]],
method "send" {
c_export_method_call "ZMQ_Error" "simple_zmq_send"
c_method_call "ZMQ_Error" "simple_zmq_send"
{ "const char *", "data", "size_t", "#data", "int", "flags?"}
},
--
-- zmq_recv
--
method "recv_msg" {
c_method_call "ZMQ_Error" "zmq_recvmsg" { "zmq_msg_t *", "msg", "int", "flags?" },
c_export_method_call "ZMQ_Error" "zmq_recvmsg" { "zmq_msg_t *", "msg", "int", "flags?" },
},
ffi_source[[
local tmp_msg = ffi.new('zmq_msg_t')
@ -817,7 +693,7 @@ local tmp_msg = ffi.new('zmq_msg_t')
if(0 == ${err}) {
/* receive message */
${err} = zmq_recvmsg(${this}, &msg, ${flags});
if(${err} >= 0) {
if(0 == ${err}) {
${data} = zmq_msg_data(&msg);
${data_len} = zmq_msg_size(&msg);
}
@ -835,215 +711,17 @@ local tmp_msg = ffi.new('zmq_msg_t')
end
-- receive message
${err} = C.zmq_recvmsg(${this}, msg, ${flags})
if ${err} >= 0 then
${err} = zmq_recvmsg(${this}, msg, ${flags})
if 0 == ${err} then
local data = ffi.string(C.zmq_msg_data(msg), C.zmq_msg_size(msg))
-- close message
C.zmq_msg_close(msg)
return data
end
-- close message
C.zmq_msg_close(msg)
]],
},
--
-- Monitor socket.
--
method "monitor" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "ZMQ_Error" "zmq_socket_monitor" { "const char *", "addr", "int", "events" }
},
c_source[[
typedef struct ZMQ_recv_event {
int event_id;
int value;
const char *addr;
size_t addr_len;
const char *err;
} ZMQ_recv_event;
#if (ZMQ_VERSION_MAJOR == 4) && (ZMQ_VERSION_MINOR >= 1)
typedef struct zmq_event_t {
int16_t event;
int32_t value;
} zmq_event_t;
#endif
int monitor_recv_event(ZMQ_Socket *s, zmq_msg_t *msg, int flags, ZMQ_recv_event *ev)
{
int rc ;
zmq_event_t event;
ev->event_id = 0;
ev->value = 0;
ev->addr = NULL;
ev->err = NULL;
ev->addr_len = 0;
zmq_msg_init(msg);
/* recv binary event. */
rc = zmq_recvmsg(s, msg, flags);
if(rc < 0) {
return rc;
}
#if ZMQ_VERSION_MAJOR == 3
if(zmq_msg_size(msg) != sizeof(event)) {
ev->err = "Invalid monitor event. Wrong event size.";
return -1;
}
memcpy(&event, zmq_msg_data(msg), sizeof(event));
ev->event_id = event.event;
switch(event.event) {
case ZMQ_EVENT_CONNECTED:
ev->value = event.data.connected.fd;
ev->addr = event.data.connected.addr;
break;
case ZMQ_EVENT_CONNECT_DELAYED:
ev->value = event.data.connect_delayed.err;
ev->addr = event.data.connect_delayed.addr;
break;
case ZMQ_EVENT_CONNECT_RETRIED:
ev->value = event.data.connect_retried.interval;
ev->addr = event.data.connect_retried.addr;
break;
case ZMQ_EVENT_LISTENING:
ev->value = event.data.listening.fd;
ev->addr = event.data.listening.addr;
break;
case ZMQ_EVENT_BIND_FAILED:
ev->value = event.data.bind_failed.err;
ev->addr = event.data.bind_failed.addr;
break;
case ZMQ_EVENT_ACCEPTED:
ev->value = event.data.accepted.fd;
ev->addr = event.data.accepted.addr;
break;
case ZMQ_EVENT_ACCEPT_FAILED:
ev->value = event.data.accept_failed.err;
ev->addr = event.data.accept_failed.addr;
break;
case ZMQ_EVENT_CLOSED:
ev->value = event.data.closed.fd;
ev->addr = event.data.closed.addr;
break;
case ZMQ_EVENT_CLOSE_FAILED:
ev->value = event.data.close_failed.err;
ev->addr = event.data.close_failed.addr;
break;
case ZMQ_EVENT_DISCONNECTED:
ev->value = event.data.disconnected.fd;
ev->addr = event.data.disconnected.addr;
break;
}
if(ev->addr) {
ev->addr_len = strlen(ev->addr);
}
if(zmq_msg_more(msg) != 0) {
ev->err = "Invalid monitor event. Has too many parts.";
return -1;
}
#else
if(zmq_msg_size(msg) != (sizeof(event.event) + sizeof(event.value))) {
ev->err = "Invalid monitor event. Wrong event size.";
return -1;
}
/* copy binary data to event struct */
const char* data = (char*)zmq_msg_data(msg);
memcpy(&(event.event), data, sizeof(event.event));
memcpy(&(event.value), data+sizeof(event.event), sizeof(event.value));
ev->event_id = event.event;
ev->value = event.value;
if(zmq_msg_more(msg) == 0) {
ev->err = "Invalid monitor event. Missing address part.";
return -1;
}
ev->value = event.value;
/* recv address part */
rc = zmq_recvmsg(s, msg, flags);
if(rc < 0) {
return rc;
}
if(zmq_msg_more(msg) != 0) {
ev->err = "Invalid monitor event. Has too many parts.";
return -1;
}
/* copy address part */
ev->addr_len = zmq_msg_size(msg) ;
ev->addr = zmq_msg_data(msg);
#endif
return 1;
}
]],
ffi_cdef[[
typedef struct ZMQ_recv_event {
int event_id;
int value;
const char *addr;
size_t addr_len;
const char *err;
} ZMQ_recv_event;
int monitor_recv_event(ZMQ_Socket *s, zmq_msg_t *msg, int flags, ZMQ_recv_event *ev);
]],
ffi_source[[
local tmp_recv_event = ffi.new('ZMQ_recv_event')
]],
method "recv_event" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
var_in{ "int", "flags?" },
var_out{ "int", "event_id" },
var_out{ "int", "value" },
var_out{ "const char *", "addr", has_length = true },
var_out{ "ZMQ_Error", "err" },
c_source[[
zmq_msg_t msg;
ZMQ_recv_event event;
/* receive monitor event */
${err} = monitor_recv_event(${this}, &msg, ${flags}, &event);
if(${err} >= 0) {
${event_id} = event.event_id;
${value} = event.value;
${addr} = event.addr;
${addr_len} = event.addr_len; //${err};
} else if(event.err != NULL) {
/* error parsing monitor event. */
lua_pushnil(L);
lua_pushstring(L, event.err);
return 2;
}
]],
c_source "post" [[
/* close message */
zmq_msg_close(&msg);
]],
ffi_source[[
local msg = tmp_msg
local event = tmp_recv_event
local addr
-- receive monitor event
${err} = Cmod.monitor_recv_event(${this}, msg, ${flags}, event)
if ${err} >= 0 then
addr = ffi.string(event.addr, event.addr_len)
ffi_source "ffi_post" [[
-- close message
C.zmq_msg_close(msg)
return event.event_id, event.value, addr
end
-- close message
C.zmq_msg_close(msg)
if event.err ~= nil then
-- error parsing monitor event.
return nil, ffi.string(event.err)
end
]],
},

@ -19,10 +19,8 @@
-- THE SOFTWARE.
object "ZMQ_StopWatch" {
include "zmq_utils.h",
c_source[[
#if (ZMQ_VERSION_MAJOR <= 4) && (ZMQ_VERSION_MINOR <= 1)
#include "zmq_utils.h"
#endif
typedef struct ZMQ_StopWatch ZMQ_StopWatch;
]],
constructor "start" {

@ -25,6 +25,21 @@
local zmq = require"zmq"
local llthreads = require"llthreads"
local setmetatable = setmetatable
local tonumber = tonumber
local assert = assert
local thread_mt = {}
thread_mt.__index = thread_mt
function thread_mt:start(detached)
return self.thread:start(detached)
end
function thread_mt:join()
return self.thread:join()
end
local bootstrap_pre = [[
local action, action_arg, parent_ctx = ...
local func
@ -64,7 +79,10 @@ local function new_thread(ctx, action, action_arg, ...)
if ctx then
ctx = ctx:lightuserdata()
end
return llthreads.new(bootstrap_code, action, action_arg, ctx, ...)
local thread = llthreads.new(bootstrap_code, action, action_arg, ctx, ...)
return setmetatable({
thread = thread,
}, thread_mt)
end
local M = {}
@ -90,5 +108,4 @@ function M.get_parent_ctx(ctx)
return parent_ctx
end
zmq.threads = M
return M

@ -1,75 +0,0 @@
-- Copyright (c) 2011 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
if #arg < 1 then
print("usage: lua " .. arg[0] .. " [message-size] [roundtrip-count] [bind-to] [connect-to]")
end
local message_size = tonumber(arg[1] or 1)
local roundtrip_count = tonumber(arg[2] or 100)
local bind_to = arg[3] or 'inproc://thread_lat_test'
local connect_to = arg[4] or 'inproc://thread_lat_test'
local zmq = require"zmq"
local ctx = zmq.init(1)
local server = assert(ctx:socket(zmq.REQ))
assert(server:bind(bind_to))
local client = ctx:socket(zmq.REP)
client:connect(connect_to)
local data = ("0"):rep(message_size)
local msg = zmq.zmq_msg_t.init_size(message_size)
local client_msg = zmq.zmq_msg_t()
print(string.format("message size: %i [B]", message_size))
print(string.format("roundtrip count: %i", roundtrip_count))
local timer = zmq.stopwatch_start()
for i = 1, roundtrip_count do
-- server send
assert(server:send_msg(msg))
-- client recv
assert(client:recv_msg(client_msg))
assert(client_msg:size() == message_size, "Invalid message size")
-- client send
assert(client:send_msg(client_msg))
-- server recv
assert(server:recv_msg(msg))
assert(msg:size() == message_size, "Invalid message size")
end
local elapsed = timer:stop()
server:close()
client:close()
ctx:term()
local latency = elapsed / roundtrip_count / 2
print(string.format("mean latency: %.3f [us]", latency))
local secs = elapsed / (1000 * 1000)
print(string.format("elapsed = %f", secs))
print(string.format("msg/sec = %f", roundtrip_count / secs))

@ -34,13 +34,13 @@ tcp_port_end = -1,
-- setup protocol fields.
zmq_proto.fields = {}
local fds = zmq_proto.fields
fds.frame = ProtoField.new("Frame", "zmq.frame", "ftypes.BYTES", nil, "base.NONE")
fds.length = ProtoField.new("Frame Length", "zmq.frame.len", "ftypes.UINT64", nil, "base.DEC")
fds.length8 = ProtoField.new("Frame 8bit Length", "zmq.frame.len8", "ftypes.UINT8", nil, "base.DEC")
fds.length64 = ProtoField.new("Frame 64bit Length", "zmq.frame.len64", "ftypes.UINT64", nil, "base.DEC")
fds.flags = ProtoField.new("Frame Flags", "zmq.frame.flags", "ftypes.UINT8", nil, "base.HEX", "0xFF")
fds.flags_more = ProtoField.new("More", "zmq.frame.flags.more", "ftypes.UINT8", nil, "base.HEX", "0x01")
fds.body = ProtoField.new("Frame body", "zmq.frame.body", "ftypes.BYTES", nil, "base.NONE")
fds.frame = ProtoField.new("Frame", "zmq.frame", "FT_BYTES", nil, "BASE_NONE")
fds.length = ProtoField.new("Frame Length", "zmq.frame.len", "FT_UINT64", nil, "BASE_DEC")
fds.length8 = ProtoField.new("Frame 8bit Length", "zmq.frame.len8", "FT_UINT8", nil, "BASE_DEC")
fds.length64 = ProtoField.new("Frame 64bit Length", "zmq.frame.len64", "FT_UINT64", nil, "BASE_DEC")
fds.flags = ProtoField.new("Frame Flags", "zmq.frame.flags", "FT_UINT8", nil, "BASE_HEX", "0xFF")
fds.flags_more = ProtoField.new("More", "zmq.frame.flags.more", "FT_UINT8", nil, "BASE_HEX", "0x01")
fds.body = ProtoField.new("Frame body", "zmq.frame.body", "FT_BYTES", nil, "BASE_NONE")
-- un-register zmq to handle tcp port range
local function unregister_tcp_port_range(start_port, end_port)

@ -23,18 +23,11 @@ set_variable_format "%s%d"
c_module "zmq" {
-- module settings.
module_globals = true, -- support old code that doesn't do: local zmq = require"zmq"
use_globals = false,
hide_meta_info = true,
luajit_ffi = true,
-- needed for functions exported from module.
luajit_ffi_load_cmodule = true,
ffi_load {
"zmq", -- default lib name.
Windows = "libzmq", -- lib name for on windows.
},
sys_include "string.h",
include "zmq.h",
@ -42,85 +35,37 @@ c_source "typedefs" [[
/* detect zmq version */
#define VERSION_2_0 1
#define VERSION_2_1 0
#define VERSION_2_2 0
#define VERSION_3_0 0
#define VERSION_3_2 0
#define VERSION_4_0 0
#if defined(ZMQ_VERSION_MAJOR)
# if (ZMQ_VERSION_MAJOR == 2) && (ZMQ_VERSION_MINOR == 2)
# undef VERSION_2_2
# define VERSION_2_2 1
# undef VERSION_2_1
# define VERSION_2_1 1
# endif
# if (ZMQ_VERSION_MAJOR == 2) && (ZMQ_VERSION_MINOR == 1)
# undef VERSION_2_1
# define VERSION_2_1 1
# endif
# if (ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR == 3)
# undef VERSION_2_0
# define VERSION_2_0 0
# undef VERSION_3_2
# define VERSION_3_2 1
# undef VERSION_3_0
# define VERSION_3_0 1
# endif
# if (ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR == 2)
# undef VERSION_2_0
# define VERSION_2_0 0
# undef VERSION_3_2
# define VERSION_3_2 1
# undef VERSION_3_0
# define VERSION_3_0 1
# endif
# if (ZMQ_VERSION_MAJOR == 3)
# undef VERSION_2_0
# define VERSION_2_0 0
# undef VERSION_3_0
# define VERSION_3_0 1
# endif
# if (ZMQ_VERSION_MAJOR == 4)
# undef VERSION_2_0
# define VERSION_2_0 0
# undef VERSION_3_2
# define VERSION_3_2 0
# undef VERSION_3_0
# define VERSION_3_0 0
# undef VERSION_4_0
# define VERSION_4_0 1
# endif
#endif
/* make sure ZMQ_DONTWAIT & ZMQ_NOBLOCK are both defined. */
#ifndef ZMQ_DONTWAIT
# define ZMQ_DONTWAIT ZMQ_NOBLOCK
#endif
#ifndef ZMQ_NOBLOCK
# define ZMQ_NOBLOCK ZMQ_DONTWAIT
#endif
/* make sure DEALER/ROUTER & XREQ/XREP are all defined. */
#ifndef ZMQ_DEALER
# define ZMQ_DEALER ZMQ_XREQ
#endif
#ifndef ZMQ_ROUTER
# define ZMQ_ROUTER ZMQ_XREP
#endif
#ifndef ZMQ_XREQ
# define ZMQ_XREQ ZMQ_DEALER
#endif
#ifndef ZMQ_XREP
# define ZMQ_XREP ZMQ_ROUTER
#endif
#if VERSION_2_0
# define ZMQ_POLL_MSEC 1000 // zmq_poll is usec
#elif VERSION_3_0 || VERSION_4_0
#elif VERSION_3_0
# define ZMQ_POLL_MSEC 1 // zmq_poll is msec
# ifndef ZMQ_HWM
# define ZMQ_HWM 1 // backwards compatibility
# endif
#endif
#ifndef ZMQ_DEALER
# define ZMQ_DEALER ZMQ_XREQ
#endif
#ifndef ZMQ_ROUTER
# define ZMQ_ROUTER ZMQ_XREP
#endif
]],
--
@ -129,10 +74,6 @@ c_source "typedefs" [[
export_definitions {
MAX_VSM_SIZE = "ZMQ_MAX_VSM_SIZE",
-- context settings
MAX_SOCKETS = "ZMQ_MAX_SOCKETS",
IO_THREADS = "ZMQ_IO_THREADS",
-- message types
DELIMITER = "ZMQ_DELIMITER",
VSM = "ZMQ_VSM",
@ -147,17 +88,14 @@ PUB = "ZMQ_PUB",
SUB = "ZMQ_SUB",
REQ = "ZMQ_REQ",
REP = "ZMQ_REP",
PULL = "ZMQ_PULL",
PUSH = "ZMQ_PUSH",
DEALER = "ZMQ_DEALER",
ROUTER = "ZMQ_ROUTER",
XREQ = "ZMQ_XREQ",
XREP = "ZMQ_XREP",
PULL = "ZMQ_PULL",
PUSH = "ZMQ_PUSH",
-- new 3.1 socket types
XPUB = "ZMQ_XPUB",
XSUB = "ZMQ_XSUB",
-- deprecated
XREQ = "ZMQ_DEALER",
XREP = "ZMQ_ROUTER",
-- socket options
HWM = "ZMQ_HWM",
@ -187,42 +125,12 @@ MULTICAST_HOPS = "ZMQ_MULTICAST_HOPS",
RCVTIMEO = "ZMQ_RCVTIMEO",
SNDTIMEO = "ZMQ_SNDTIMEO",
RCVLABEL = "ZMQ_RCVLABEL",
LAST_ENDPOINT = "ZMQ_LAST_ENDPOINT",
ROUTER_MANDATORY = "ZMQ_ROUTER_MANDATORY",
TCP_KEEPALIVE = "ZMQ_TCP_KEEPALIVE",
TCP_KEEPALIVE_CNT = "ZMQ_TCP_KEEPALIVE_CNT",
TCP_KEEPALIVE_IDLE= "ZMQ_TCP_KEEPALIVE_IDLE",
TCP_KEEPALIVE_INTVL= "ZMQ_TCP_KEEPALIVE_INTVL",
TCP_ACCEPT_FILTER = "ZMQ_TCP_ACCEPT_FILTER",
IMMEDIATE = "ZMQ_IMMEDIATE",
XPUB_VERBOSE = "ZMQ_XPUB_VERBOSE",
ROUTER_RAW = "ZMQ_ROUTER_RAW",
IPV6 = "ZMQ_IPV6",
MECHANISM = "ZMQ_MECHANISM",
PLAIN_SERVER = "ZMQ_PLAIN_SERVER",
PLAIN_USERNAME = "ZMQ_PLAIN_USERNAME",
PLAIN_PASSWORD = "ZMQ_PLAIN_PASSWORD",
CURVE_SERVER = "ZMQ_CURVE_SERVER",
CURVE_PUBLICKEY = "ZMQ_CURVE_PUBLICKEY",
CURVE_SECRETKEY = "ZMQ_CURVE_SECRETKEY",
CURVE_SERVERKEY = "ZMQ_CURVE_SERVERKEY",
PROBE_ROUTER = "ZMQ_PROBE_ROUTER",
REQ_CORRELATE = "ZMQ_REQ_CORRELATE",
REQ_RELAXED = "ZMQ_REQ_RELAXED",
CONFLATE = "ZMQ_CONFLATE",
ZAP_DOMAIN = "ZMQ_ZAP_DOMAIN",
-- send/recv flags
NOBLOCK = "ZMQ_NOBLOCK",
DONTWAIT = "ZMQ_DONTWAIT",
SNDMORE = "ZMQ_SNDMORE",
SNDLABEL = "ZMQ_SNDLABEL",
-- Security mechanisms
NULL = "ZMQ_NULL",
PLAIN = "ZMQ_PLAIN",
CURVE = "ZMQ_CURVE",
-- poll events
POLLIN = "ZMQ_POLLIN",
POLLOUT = "ZMQ_POLLOUT",
@ -231,30 +139,13 @@ POLLERR = "ZMQ_POLLERR",
-- poll milliseconds.
POLL_MSEC = "ZMQ_POLL_MSEC",
-- Socket Monitor events.
EVENT_CONNECTED = "ZMQ_EVENT_CONNECTED",
EVENT_CONNECT_DELAYED = "ZMQ_EVENT_CONNECT_DELAYED",
EVENT_CONNECT_RETRIED = "ZMQ_EVENT_CONNECT_RETRIED",
EVENT_LISTENING = "ZMQ_EVENT_LISTENING",
EVENT_BIND_FAILED = "ZMQ_EVENT_BIND_FAILED",
EVENT_ACCEPTED = "ZMQ_EVENT_ACCEPTED",
EVENT_ACCEPT_FAILED= "ZMQ_EVENT_ACCEPT_FAILED",
EVENT_CLOSED = "ZMQ_EVENT_CLOSED",
EVENT_CLOSE_FAILED= "ZMQ_EVENT_CLOSE_FAILED",
EVENT_DISCONNECTED= "ZMQ_EVENT_DISCONNECTED",
EVENT_MONITOR_STOPPED = "ZMQ_EVENT_MONITOR_STOPPED",
EVENT_ALL = "ZMQ_EVENT_ALL",
-- devices
STREAMER = "ZMQ_STREAMER",
FORWARDER = "ZMQ_FORWARDER",
QUEUE = "ZMQ_QUEUE",
},
subfiles {
"src/error.nobj.lua",
"src/msg.nobj.lua",
@ -284,7 +175,6 @@ c_function "version" {
]],
},
c_function "init" {
var_in{ "int", "io_threads?", default = "1" },
c_call "!ZMQ_Ctx *" "zmq_init" { "int", "io_threads" },
},
c_function "init_ctx" {
@ -308,23 +198,31 @@ c_function "init_ctx" {
end
]],
},
c_function "device" { if_defs = { "VERSION_2_0", "VERSION_3_2" },
c_function "device" { if_defs = "VERSION_2_0",
c_call "ZMQ_Error" "zmq_device"
{ "int", "device", "ZMQ_Socket *", "insock", "ZMQ_Socket *", "outsock" },
},
c_function "proxy" { if_defs = "VERSION_3_2",
c_call "ZMQ_Error" "zmq_proxy"
{ "ZMQ_Socket *", "frontend", "ZMQ_Socket *", "backend", "ZMQ_Socket *", "capture?" },
},
--
-- utils
-- zmq_utils.h
--
include "zmq_utils.h",
c_function "stopwatch_start" {
c_call "!ZMQ_StopWatch *" "zmq_stopwatch_start" {},
},
c_function "sleep" {
c_call "void" "zmq_sleep" { "int", "seconds_" },
},
--
-- This dump function is for getting a copy of the FFI-based bindings code and is
-- only for debugging.
--
c_function "dump_ffi" {
var_out{ "const char *", "ffi_code", has_length = true, },
c_source[[
${ffi_code} = ${module_c_name}_ffi_lua_code;
${ffi_code_len} = sizeof(${module_c_name}_ffi_lua_code) - 1;
]],
},
}

Loading…
Cancel
Save