Compare commits

..

103 Commits

Author SHA1 Message Date
Kamal Curi 84d7d0812e Changing to https 7 months ago
Kamal Curi b171f13fe9 Update 'rockspecs/lua-zmq-scm-1.rockspec' 7 months ago
Kamal Curi fafe11f9c8 Change URL to HTTPS 7 months ago
Robert G. Jakabosky d7812eabad
Bump version. 2 years ago
Robert G. Jakabosky 9588c05348
Support newer zmq and Lua versions. 2 years ago
Robert G. Jakabosky fba94951bc
Support newer libzmq. 2 years ago
Robert G. Jakabosky b9d5000b9d Fix throughput benchmark. Use PUSH/PULL instead of PUB/SUB. 10 years ago
Robert G. Jakabosky 33326d61e4 Update pre-generated bindings. 10 years ago
Robert G. Jakabosky 79d0c5f9fd Remove BOM from .travis.yml to fix travis-ci build. 10 years ago
Robert G. Jakabosky 4db6fc29aa Revert all changes to .travis.yml file. 10 years ago
Robert G. Jakabosky 0052cbd0dc Test simple travis-ci script. 10 years ago
Robert G. Jakabosky 818a28e202 Testing travis-ci 10 years ago
Robert G. Jakabosky 9bb3bd4bf4 Travis-ci is bady brokenvi .travis.yml vi .travis.yml vi .travis.yml 10 years ago
Robert G. Jakabosky 8d8be64a01 Travis-ci seems to been caching old build script. 10 years ago
Robert G. Jakabosky dbe3acabcf Travis-ci broke the build script. 10 years ago
Robert G. Jakabosky 19c277008e Update travis-ci to use Ubuntu Trusty. 10 years ago
Robert G. Jakabosky 601ea38a91 Trying to fix travis-ci. 10 years ago
Robert G. Jakabosky 226976543c Trying to fix travis-ci. 10 years ago
Robert G. Jakabosky 7f9e341773 make travis script executable. 10 years ago
Robert G. Jakabosky 8063d6dc63 Try fixing broken travis-ci build script. 10 years ago
Robert G. Jakabosky d5ef59d67c Update zmq_msg_t typedef to support 4.x. 10 years ago
Robert G. Jakabosky 0be2975f7a Fix bug with zeromq 4.x & Luajit. 10 years ago
Robert G. Jakabosky b861158ec6 Update pre-generated source. 12 years ago
Robert G. Jakabosky a335ef0625 Add support for zeromq 4.x 12 years ago
Robert G. Jakabosky 327583c0f6 Make sure to close the msg before returning errors. 12 years ago
Robert G. Jakabosky e47f0a9a31 Add throughput stats. 12 years ago
Robert G. Jakabosky 66ef76c795 Fix API docs for poller timeout units. 12 years ago
Robert G. Jakabosky 1d6ed5cc3f re-generate bindings. Adding context option set/get methods. 13 years ago
Robert Gabriel Jakabosky ca95905b7a Merge pull request #47 from TimMensch/context_feature
added set and get functions on ZMQ context
13 years ago
Tim Mensch 417ffd7f46 Add if_defs for VERSION_3_2 13 years ago
Tim Mensch 325b5874d1 added set and get functions on ZMQ context 13 years ago
Robert G. Jakabosky def9da8920 Add unbind & disconnect socket methods. 13 years ago
Robert G. Jakabosky 18101b0f05 Add re-generate docs. 13 years ago
Robert G. Jakabosky 8f7dda3147 Don't need to wrap thread object in a table. 13 years ago
Robert G. Jakabosky 3499ee79ae Fix bug with returning error from non-ffi bindings. 13 years ago
Robert G. Jakabosky b0413d4e75 Add zmq_proxy() function for ZeroMQ 3.2.x
zmq_device() is also available in 3.2.x as a deprecated method.
13 years ago
Robert G. Jakabosky 16505b546f Fix definition of ProtoFields to support latest wireshark. 13 years ago
Robert G. Jakabosky d934a9cbb1 Enable all Travis CI build profiles. 13 years ago
Robert G. Jakabosky 0708142d72 Use libzmq3-dev in Travis CI build. 13 years ago
Robert G. Jakabosky 24e50ebd04 Install libzmq-dev. 13 years ago
Robert G. Jakabosky 4038c4f92f Add missing test script. 13 years ago
Robert G. Jakabosky 070e985098 Add Travis CI. 13 years ago
Robert G. Jakabosky bf0c221ac0 Fix typo. 13 years ago
Robert G. Jakabosky 37960ae7c6 Fix a bad global in poller's FFI code. 13 years ago
Robert G. Jakabosky b9e2ad3ea1 Add asserts to catch errors from zeromq (like bind errors). 13 years ago
Robert G. Jakabosky 8d962b4ac9 Set HWM option to zero to override new default in zeromq 3.x 13 years ago
Robert G. Jakabosky 214e2601cf Update supported zeromq versions. 13 years ago
Robert G. Jakabosky 8f239ff6f5 regenerate bindings with Buffer interface. 13 years ago
Robert G. Jakabosky cbfaecd38c Add support for Buffer interface. 13 years ago
Robert G. Jakabosky f83cacde52 Add more stats. 13 years ago
Robert G. Jakabosky 58f6c5db3c Update Windows download link for LuaJIT 2.0 beta11. 14 years ago
Robert G. Jakabosky 6845eba8d2 Regenerate bindings with Lua 5.2 support. 14 years ago
Robert G. Jakabosky 8eaceefab3 Remove duplicate cmake variables. 14 years ago
Robert G. Jakabosky 9841573893 Add sub-modules to 'zmq' module's table. 14 years ago
Petr Štetiar 4642135e12 Fix crash in {set,get}sockoption with zmq 3.2.1
$ lua examples/server_poll.lua
	Invalid socket option type, this shouldn't happen.
	Aborted (core dumped)

Signed-off-by: Petr Štetiar <ynezz@true.cz>
Signed-off-by: Robert G. Jakabosky <bobby@sharedrealm.com>
14 years ago
Robert G. Jakabosky 32ea196579 Update NativeObject cmake files. 14 years ago
Robert G. Jakabosky 466e78aad1 Add download link for windows build 14 years ago
Robert G. Jakabosky b5e03e5d16 Update cmake build files. 14 years ago
Robert G. Jakabosky ce2d1c7c5a Add locat/remote throughput tests using PUSH/PULL sockets. 14 years ago
Robert G. Jakabosky dc1e7b80d0 Use msg copy method. 14 years ago
Robert G. Jakabosky 46ae68fe9e Add support for ZeroMQ 2.2 14 years ago
Robert G. Jakabosky b043c7631f Try Forwarding multiple messages per IO event notification. 14 years ago
Robert G. Jakabosky 8411d346c9 Update URLs for rockspecs. 14 years ago
Robert G. Jakabosky 2785eda10e Remove unused variables. 14 years ago
Robert G. Jakabosky 211a569a15 Directly call 'on_work' callback from IO read callback. 14 years ago
Robert G. Jakabosky 4d23290e3a Make sure 'on_work' is enabled until 'recv' blocks, so all messages are consumed before blocking. 14 years ago
Robert G. Jakabosky a9630265f1 Add example use of poll_zsock wrapper. 14 years ago
Robert G. Jakabosky 105925e3d0 Add 0mq socket wrapper example to show how to use 0mq socket with other event loops (lib-ev, epoll). 14 years ago
Robert G. Jakabosky c5087a2d91 Set priority on Idle watcher. 14 years ago
Robert G. Jakabosky 78ac082f62 Convert return value of msg:size() to a Lua number from 'size_t' in FFI bindings. 14 years ago
Robert G. Jakabosky df6157eb37 Add lua-ev poller backend. 14 years ago
Robert G. Jakabosky 2adc7dfbc2 Add support for multiple poller backends. 14 years ago
Robert G. Jakabosky a74cf52c11 Correct copyrights. 14 years ago
Robert G. Jakabosky 5e6a7b5363 Rename lua-ev example. 14 years ago
Robert G. Jakabosky 3b93a80233 Add more event loop examples. 14 years ago
Robert G. Jakabosky a598995e33 Get zmq socket events in IO read callback. Allow batch recvs. 14 years ago
Robert G. Jakabosky 0697debefb Cleanup examples code. 14 years ago
Robert G. Jakabosky caba791908 Fix error to string conversion for standard Lua bindings. 14 years ago
Robert G. Jakabosky bbaa86a552 io_threads parameter now defaults to 1. 14 years ago
Robert G. Jakabosky 5f8ae64393 Fix FFI bindings for socket option getter/setter methods. 14 years ago
Robert G. Jakabosky 81f1182507 Fix bug with FFI bindings of sock:setopt(). 14 years ago
Robert G. Jakabosky 421101376e Re-generate bindings to fix bug with weak objects. 14 years ago
Robert G. Jakabosky 6ee18ea74d Add missing XPUB/XSUB socket types for zmq 3.1 support.
Also make sure DEALER/ROUTER and XREQ/XREP are all defined.
14 years ago
Robert G. Jakabosky 3058dce215 Change where ZMQ_NOBLOCK is defined if missing. 14 years ago
Robert G. Jakabosky 0aa2ed8e3f Add missing zmq.DONTWAIT constant for zmq 3.1 support.
For forwards & backward compatibility define both DONTWAIT & NOBLOCK
for all versions.
14 years ago
Robert G. Jakabosky 36374c33fe Add zmq.poller version of local_thr.lua to perf/ folder. 14 years ago
Robert G. Jakabosky 36fa3226ef Fix exporting of zmq.poller C functions for FFI bindings. 14 years ago
Robert G. Jakabosky b2aec4654e Regenerate bindings.
Fix ffi detection code.
Split Lua code into multiple smaller C literals.
14 years ago
Robert G. Jakabosky 715f531f52 Fix rockspec for building on windows. 14 years ago
Robert G. Jakabosky 779a406957 Fix recv() method in FFI bindings. 14 years ago
Robert G. Jakabosky 8d780628a0 Fix compile errors with MSVC compiler. 14 years ago
Robert G. Jakabosky 7de39bd409 Export module functions for FFI support on windows. 14 years ago
Robert G. Jakabosky 7adc63d089 Fix problem with FFI support on windows. 14 years ago
Robert G. Jakabosky 38a3f21ba5 Disable ffi bindings support on windows for now. 14 years ago
Robert G. Jakabosky a58d7ad389 Fix for building on Windows. 14 years ago
Robert G. Jakabosky 04208f1059 Change 'false' to 0 in C code. 14 years ago
Robert G. Jakabosky f50970f7a1 Replace use of 'bool' type with 'int'. 14 years ago
Robert G. Jakabosky a9b9ca278a Register module as a global to support old code that doesn't save the
module to a local variable.
14 years ago
Robert G. Jakabosky e8229c5b55 Remove debug print(). 15 years ago
Robert G. Jakabosky 34e19cb65c Update luvit FFI module path detection logic. 15 years ago
Robert G. Jakabosky 8200e8f54a Fix loading of FFI bindings for luvit. 15 years ago
Robert G. Jakabosky f0fd8edddc Fix some Luvit issues. 15 years ago
Robert G. Jakabosky a264b26c48 Fix recv() method when used with ZeroMQ 3.1.x 15 years ago

@ -0,0 +1,68 @@
language: c
env:
matrix:
- LUA=lua5.1 LIBLUA=liblua5.1-dev LUA_INCDIR=/usr/include/lua5.1 LUA_LIB=lua5.1
- LUA=lua5.2 LIBLUA=liblua5.2-dev LUA_INCDIR=/usr/include/lua5.2 LUA_LIB=lua5.2
- LUA=luajit LIBLUA=libluajit-5.1-dev LUA_INCDIR=/usr/include/luajit-2.0 LUA_LIB=luajit-5.1
branches:
only:
- master
compiler:
- gcc
before_install:
- if [ $LUA = "luajit" ]; then
sudo add-apt-repository ppa:mwild1/ppa -y && sudo apt-get update -y;
fi
install:
- sudo apt-get install libzmq3-dev -y
- sudo apt-get install $LUA -y
- sudo apt-get install $LIBLUA -y
- LUA_LIBDIR=`pkg-config $LUA --variable=libdir`
- INSTALL_LMOD=`pkg-config $LUA --variable=INSTALL_LMOD`
- INSTALL_CMOD=`pkg-config $LUA --variable=INSTALL_CMOD`
## make sure there is a 'lua' command.
- if [ ! -x /usr/bin/lua ]; then
sudo ln -s `which $LUA` /usr/bin/lua;
fi
## install lua-llthreads
- git clone git://github.com/Neopallium/lua-llthreads.git
- cd lua-llthreads ; mkdir build ; cd build
- cmake .. -DLUA_LIBRARIES=$LUA_LIBDIR -DLUA_INCLUDE_DIR=$LUA_INCDIR
-DINSTALL_LMOD=$INSTALL_LMOD -DINSTALL_CMOD=$INSTALL_CMOD
- make
- sudo make install
- cd ../..
script:
#### build using pre-generated bindings.
- mkdir build; cd build
- cmake .. -DLUA_LIBRARIES=$LUA_LIBDIR -DLUA_INCLUDE_DIR=$LUA_INCDIR
-DINSTALL_LMOD=$INSTALL_LMOD -DINSTALL_CMOD=$INSTALL_CMOD
- make
- sudo make install
# Run tests.
- $LUA ../tests/test_inproc.lua
- $LUA ../perf/thread_lat.lua 1 1000
- cd .. ; rm -rf build
#### Re-Generate bindings.
- git clone git://github.com/Neopallium/LuaNativeObjects.git;
- mkdir build; cd build
- cmake .. -DLUA_LIBRARIES=$LUA_LIBDIR -DLUA_INCLUDE_DIR=$LUA_INCDIR
-DLUA_NATIVE_OBJECTS_PATH=$TRAVIS_BUILD_DIR/LuaNativeObjects
-DUSE_PRE_GENERATED_BINDINGS=OFF -DGENERATE_LUADOCS=OFF
-DINSTALL_LMOD=$INSTALL_LMOD -DINSTALL_CMOD=$INSTALL_CMOD
- make
- sudo make install
# Run tests.
- $LUA ../tests/test_inproc.lua
- $LUA ../perf/thread_lat.lua 1 1000
notifications:
email:
on_failure: always
on_success: change

@ -147,6 +147,14 @@ See [zmq_msg_copy(3)](http://api.zeromq.org/zmq_msg_copy.html).
msg1:copy(msg2) -- copy contents from msg2 -> msg1 msg1:copy(msg2) -- copy contents from msg2 -> msg1
## set_size(size)
Re-initialize the message with a new size. The current contents will be lost.
See [zmq_msg_init_size(3)](http://api.zeromq.org/zmq_msg_init_size.html).
msg:set_size(size) -- re-initialize message if size is different from current size.
local buf = msg:data() -- get buffer to fill message with new contents.
## set_data(data) ## set_data(data)
Change the message contents. Change the message contents.
@ -156,7 +164,7 @@ See [zmq_msg_data(3)](http://api.zeromq.org/zmq_msg_data.html).
## data() ## data()
Get the message contents. Get a lightuserdata pointer to the message contents.
See [zmq_msg_data(3)](http://api.zeromq.org/zmq_msg_data.html). See [zmq_msg_data(3)](http://api.zeromq.org/zmq_msg_data.html).
local data = msg:data() -- get the message contents local data = msg:data() -- get the message contents
@ -218,10 +226,14 @@ Remove a socket/fd from the poller.
## poll(timeout) ## poll(timeout)
Wait `timeout` microseconds for events on the registered sockets (timeout = -1, means Wait `timeout` milliseconds [1] for events on the registered sockets (timeout = -1, means
wait indefinitely). If any events happen, then those events are dispatched. wait indefinitely). If any events happen, then those events are dispatched.
poller:poll(1000000) -- wait 1 second for events. poller:poll(1000) -- wait 1 second for events.
[1] For zmq 2.x `timeout` is in microseconds. For versions 3.x or higher `timeout` will be in milliseconds.
poller:poll(1000 * zmq.POLL_MSEC) -- backwards/forwards compatible
## start() ## start()

@ -1,7 +1,7 @@
# #
# Lua bindings for 0MQ # Lua bindings for 0MQ
# #
cmake_minimum_required(VERSION 2.8) cmake_minimum_required(VERSION 3.18)
project(lua-zmq C) project(lua-zmq C)
@ -9,14 +9,10 @@ set(BUILD_SHARED_LIBS TRUE)
set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake) set(CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/cmake)
set(INSTALL_LMOD ${CMAKE_INSTALL_PREFIX}/share/lua/5.1 CACHE PATH set(INSTALL_LMOD ${CMAKE_INSTALL_PREFIX}/share/lua/ CACHE PATH
"Directory to install Lua source modules (configure lua via LUA_PATH)") "Directory to install Lua source modules (configure lua via LUA_PATH)")
set(INSTALL_CMOD ${CMAKE_INSTALL_PREFIX}/lib/lua/5.1 CACHE PATH set(INSTALL_CMOD ${CMAKE_INSTALL_PREFIX}/lib/lua/ CACHE PATH
"Directory to install Lua binary modules (configure lua via LUA_CPATH)") "Directory to install Lua binary modules (configure lua via LUA_CPATH)")
set(LUA_NATIVE_OBJECTS_PATH ../LuaNativeObjects CACHE PATH
"Directory to LuaNativeObjects bindings generator.")
set(USE_PRE_GENERATED_BINDINGS TRUE CACHE BOOL
"Set this to FALSE to re-generate bindings using LuaNativeObjects")
set(ZMQ_PATH "" CACHE PATH set(ZMQ_PATH "" CACHE PATH
"Directory to libzmq. (by default use pkg-config to detect path)") "Directory to libzmq. (by default use pkg-config to detect path)")
@ -24,10 +20,19 @@ set(COMMON_CFLAGS "${CFLAGS}")
set(COMMON_LDFLAGS) set(COMMON_LDFLAGS)
set(COMMON_LIBS) set(COMMON_LIBS)
## Lua 5.1.x ## Lua 5.x
include(FindLua51) include(FindLua)
if(NOT ${LUA51_FOUND}) if(NOT ${LUA_FOUND})
message(FATAL_ERROR "The FindLua51 module could not find lua :-(") message(FATAL_ERROR "The FindLua module could not find lua :-(")
endif()
set(COMMON_LIBS "${COMMON_LIBS};${LUA_LIBRARIES}")
if(WIN32)
set(COMMON_CFLAGS "${COMMON_CFLAGS} -I${LUA_INCLUDE_DIR}")
set(COMMON_LDFLAGS "${COMMON_LDFLAGS} ${LUA_LIBRARY}")
if(NOT MSVC)
set(COMMON_LDFLAGS "${COMMON_LDFLAGS} -Wl,--export-all-symbols")
endif()
endif() endif()
## MAC OSX needs extra linker flags ## MAC OSX needs extra linker flags
if(APPLE) if(APPLE)
@ -44,8 +49,13 @@ if(WIN32)
endif() endif()
if(IS_DIRECTORY ${ZMQ_PATH}) if(IS_DIRECTORY ${ZMQ_PATH})
set(COMMON_CFLAGS "${COMMON_CFLAGS} -I${ZMQ_PATH}/include") set(COMMON_CFLAGS "${COMMON_CFLAGS} -I${ZMQ_PATH}/include")
set(COMMON_LDFLAGS "${COMMON_LDFLAGS} -L${ZMQ_PATH}/lib") if(MSVC)
set(COMMON_LIBS "${COMMON_LIBS};zmq") set(COMMON_LIBS "${COMMON_LIBS};libzmq")
else()
set(COMMON_LDFLAGS "${COMMON_LDFLAGS} -L${ZMQ_PATH}/lib")
set(COMMON_LIBS "${COMMON_LIBS};zmq")
endif()
link_directories(${ZMQ_PATH}/lib)
else() else()
## fallback to using pkg-config ## fallback to using pkg-config
include(FindPkgConfig) include(FindPkgConfig)
@ -63,8 +73,6 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_BINARY_DIR}
${LUA_INCLUDE_DIR}) ${LUA_INCLUDE_DIR})
link_directories(${ZMQ_LIBRARY_DIRS})
## LuaZMQ ## LuaZMQ
set(LUA_ZMQ_SRC set(LUA_ZMQ_SRC
zmq.nobj.lua zmq.nobj.lua

@ -1,8 +1,15 @@
About About
===== =====
[![travis-ci status](https://secure.travis-ci.org/Neopallium/lua-zmq.png?branch=master)](http://travis-ci.org/Neopallium/lua-zmq/builds)
Lua bindings to zeromq2. Check out the [ZeroMQ Guide with Lua examples](http://zguide.zeromq.org/lua:all). Lua bindings to zeromq2. Check out the [ZeroMQ Guide with Lua examples](http://zguide.zeromq.org/lua:all).
Windows
=======
Download a compiled version of [LuaJIT 2.0.0-beta11 + lua-zmq + zeromq2.2.0](https://github.com/downloads/Neopallium/lua-zmq/luajit2.0_beta11_zmq2.2_llthreads.zip) 32bit & 64bit.
API API
=== ===
@ -12,7 +19,7 @@ See [API.md](https://github.com/Neopallium/lua-zmq/blob/master/API.md) and
Requirements Requirements
============ ============
* ZeroMQ version 2.1.x. * ZeroMQ version 2.1, 2.2 or 3.2.
* Might work with some 2.0.x versions (2.0.6 and lower are not supported). * Might work with some 2.0.x versions (2.0.6 and lower are not supported).
For Ubuntu 10.10 users: For Ubuntu 10.10 users:
@ -45,12 +52,12 @@ Latest Git revision
With LuaRocks 2.0.4.1: With LuaRocks 2.0.4.1:
$ sudo luarocks install https://github.com/Neopallium/lua-zmq/raw/master/rockspecs/lua-zmq-scm-1.rockspec $ sudo luarocks install https://raw.github.com/Neopallium/lua-zmq/master/rockspecs/lua-zmq-scm-1.rockspec
For threads support: For threads support:
$ sudo luarocks install https://github.com/Neopallium/lua-llthreads/raw/master/rockspecs/lua-llthreads-scm-0.rockspec $ sudo luarocks install https://raw.github.com/Neopallium/lua-llthreads/master/rockspecs/lua-llthreads-scm-0.rockspec
$ sudo luarocks install https://github.com/Neopallium/lua-zmq/raw/master/rockspecs/lua-zmq-threads-scm-0.rockspec $ sudo luarocks install https://raw.github.com/Neopallium/lua-zmq/master/rockspecs/lua-zmq-threads-scm-0.rockspec
With CMake: With CMake:

@ -0,0 +1,13 @@
To re-generating the bindings
-----------------------------
You will need to install LuaNativeObjects and set the CMake variable `USE_PRE_GENERATED_BINDINGS` to FALSE.
By default CMake will use the pre-generated bindings that are include in the project.
Build Dependencies
------------------
Optional dependency for re-generating Lua bindings from `*.nobj.lua` files:
* [LuaNativeObjects](https://github.com/Neopallium/LuaNativeObjects), this is the bindings generator used to convert the `*.nobj.lua` files into a native Lua module.

@ -1,19 +1,39 @@
# #
# Lua Native Objects # Lua Native Objects
# #
find_program(LUA_NATIVE_OBJECTS_EXECUTABLE native_objects.lua
PATHS ${CMAKE_SOURCE_DIR}/../LuaNativeObjects
DOC "LuaNativeObjects executable path")
set(USE_PRE_GENERATED_BINDINGS TRUE CACHE BOOL
"Set this to FALSE to re-generate bindings using LuaNativeObjects")
set(GENERATE_LUADOCS TRUE CACHE BOOL
"Set this to FALSE to avoid generation of docs using LuaDoc")
macro(GenLuaNativeObjects _src_files_var) macro(GenLuaNativeObjects _src_files_var)
set(_new_src_files) set(_new_src_files)
foreach(_src_file ${${_src_files_var}}) foreach(_src_file ${${_src_files_var}})
if(_src_file MATCHES ".nobj.lua") if(_src_file MATCHES ".nobj.lua")
string(REGEX REPLACE ".nobj.lua" ".nobj.c" _src_file_out ${_src_file}) string(REGEX REPLACE ".nobj.lua" ".nobj.c" _src_file_out ${_src_file})
string(REGEX REPLACE ".nobj.lua" ".nobj.h" _header_file_out ${_src_file}) string(REGEX REPLACE ".nobj.lua" ".nobj.ffi.lua" _ffi_file_out ${_src_file})
add_custom_command(OUTPUT ${_src_file_out} ${_header_file_out} add_custom_command(OUTPUT ${_src_file_out} ${_ffi_file_out}
COMMAND lua ${LUA_NATIVE_OBJECTS_PATH}/native_objects.lua -outpath ${CMAKE_CURRENT_BINARY_DIR} -gen lua ${_src_file} COMMAND ${LUA_NATIVE_OBJECTS_EXECUTABLE} -outpath ${CMAKE_CURRENT_BINARY_DIR} -gen lua ${_src_file}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
DEPENDS ${_src_file} DEPENDS ${_src_file}
) )
set_source_files_properties(${_src_file_out} PROPERTIES GENERATED TRUE) set_source_files_properties(${_src_file_out} PROPERTIES GENERATED TRUE)
set_source_files_properties(${_header_file_out} PROPERTIES GENERATED TRUE) set_source_files_properties(${_ffi_file_out} PROPERTIES GENERATED TRUE)
if (${GENERATE_LUADOCS})
string(REGEX REPLACE ".nobj.lua" "" _doc_base ${_src_file})
string(REGEX REPLACE ".nobj.lua" ".luadoc" _doc_file_out ${_src_file})
add_custom_target(${_doc_file_out} ALL
COMMAND ${LUA_NATIVE_OBJECTS_EXECUTABLE} -outpath docs -gen luadoc ${_src_file}
COMMAND luadoc -nofiles -d docs docs
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
DEPENDS ${_src_file}
)
endif()
set_source_files_properties(${_doc_file_out} PROPERTIES GENERATED TRUE)
set(_new_src_files ${_new_src_files} ${_src_file_out}) set(_new_src_files ${_new_src_files} ${_src_file_out})
else(_src_file MATCHES ".nobj.lua") else(_src_file MATCHES ".nobj.lua")
set(_new_src_files ${_new_src_files} ${_src_file}) set(_new_src_files ${_new_src_files} ${_src_file})

@ -18,15 +18,24 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE. -- THE SOFTWARE.
require("zmq") local zmq = require"zmq"
local ctx = zmq.init(1) local N=tonumber(arg[1] or 100)
local ctx = zmq.init()
local s = ctx:socket(zmq.REQ) local s = ctx:socket(zmq.REQ)
s:connect("tcp://localhost:5555") s:connect("tcp://localhost:5555")
s:send("SELECT * FROM mytable") for i=1,N do
print(s:recv()) s:send("SELECT * FROM mytable")
local data, err = s:recv()
if data then
print(data)
else
print("s:recv() error:", err)
end
end
s:close() s:close()
ctx:term() ctx:term()

@ -18,17 +18,26 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE. -- THE SOFTWARE.
require("zmq") local zmq = require"zmq"
local ctx = zmq.init(1) local N=tonumber(arg[1] or 100)
local ctx = zmq.init()
local s = ctx:socket(zmq.REQ) local s = ctx:socket(zmq.REQ)
s:connect("tcp://localhost:5555") s:connect("tcp://localhost:5555")
s:send("SELECT * FROM mytable ", zmq.SNDMORE) for i=1,N do
s:send("WHERE library = 'zmq'") s:send("SELECT * FROM mytable ", zmq.SNDMORE)
s:send("WHERE library = 'zmq'")
print(s:recv()) local data, err = s:recv()
if data then
print(data)
else
print("s:recv() error:", err)
end
end
s:close() s:close()
ctx:term() ctx:term()

@ -0,0 +1,108 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local poller = require"examples.poller"
local poll = poller.new()
local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local N=tonumber(arg[1] or 100)
local ctx = zmq.init()
local s = ctx:socket(zmq.REQ)
local s_FD = s:getopt(zmq.FD)
s:connect("tcp://localhost:5555")
-- current socket state
local blocked_state
local blocked_event
local on_sock_recv
local on_sock_send
-- IO event callback when socket was blocked
local function on_sock_io()
local events = s:getopt(z_EVENTS)
local unblocked = false
if events == blocked_event then
-- got the event the socket was blocked on.
unblocked = true
elseif events == z_POLLIN_OUT then
-- got both in & out events
unblocked = true
end
if unblocked then
-- got the event we are blocked on resume.
blocked_event = nil
blocked_state()
-- check if blocked event was processed.
if not blocked_event then
poll:remove_read(s_FD)
end
end
end
local function sock_blocked(state, event)
if not blocked_event then
-- need to register socket's fd with event loop
poll:add_read(s_FD, on_sock_io)
end
blocked_state = state
blocked_event = event
end
-- sock state functions
function on_sock_send()
N = N - 1
if N == 0 then
return poll:stop()
end
local sent, err = s:send("SELECT * FROM mytable", z_NOBLOCK)
if not sent then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_send, z_POLLOUT)
end
-- yield back to event loop
poll:add_work(on_sock_recv)
end
function on_sock_recv()
local data, err = s:recv(z_NOBLOCK)
if not data then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_recv, z_POLLIN)
end
print(data)
return on_sock_send()
end
-- start processing of the socket.
poll:add_work(on_sock_send)
-- start event loop
poll:start()
s:close()
ctx:term()

@ -0,0 +1,39 @@
local zmq = require'zmq'
local poller = require"examples.poller"
local poll_zsock = require"examples.poll_zsock"
local poll = poller.new()
poll_zsock.set_poller(poll)
local c = zmq.init(1)
local xreq = poll_zsock(c:socket(zmq.XREQ))
xreq:bind('tcp://127.0.0.1:13333')
local xrep = poll_zsock(c:socket(zmq.XREP))
xrep:bind('tcp://127.0.0.1:13334')
local max_recv = 10
local function forward_io(src,dst)
src.on_data = function()
for i=1,max_recv do
repeat
local data, err = src:recv(zmq.NOBLOCK)
if not data then
if err == 'timeout' then
return
else
error("socket recv error:" .. err)
end
end
local more = src:getopt(zmq.RCVMORE) > 0
dst:send(data,more and zmq.SNDMORE or 0)
until not more
end
end
end
forward_io(xrep,xreq)
forward_io(xreq,xrep)
poll:start()

@ -0,0 +1,177 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local zmq = require"zmq"
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local poll
local meths = {}
local zsock_mt = { __index=meths }
local function zsock_check_events(self)
if not self.check_enabled then
-- enable 'on_work' callback to handle checking for socket events.
self.check_enabled = true
poll:add_work(self.on_work)
end
end
function meths:events()
zsock_check_events(self)
return self.sock:events()
end
function meths:getopt(opt)
if (opt == z_EVENTS) then
zsock_check_events(self)
end
return self.sock:getopt(opt)
end
function meths:setopt(opt,val)
return self.sock:setopt(opt,val)
end
function meths:sub(topic)
return self.sock:sub(topic)
end
function meths:unsub(topic)
return self.sock:unsub(topic)
end
function meths:identity(id)
return self.sock:identity(id)
end
function meths:bind(addr)
return self.sock:bind(addr)
end
function meths:connect(addr)
return self.sock:connect(addr)
end
function meths:close()
return self.sock:close()
end
function meths:send(msg, flags)
zsock_check_events(self)
local sent, err = self.sock:send(msg, flags)
if not sent and err == 'timeout' then
self.send_blocked = true
end
return sent, err
end
function meths:send_msg(msg, flags)
zsock_check_events(self)
local sent, err = self.sock:send_msg(msg, flags)
if not sent and err == 'timeout' then
self.send_blocked = true
end
return sent, err
end
function meths:recv(flags)
zsock_check_events(self)
local msg, err = self.sock:recv(flags)
if not msg and err == 'timeout' then
self.recv_blocked = true
end
return msg, err
end
function meths:recv_msg(msg, flags)
zsock_check_events(self)
local stat, err = self.sock:recv_msg(msg, flags)
if not stat and err == 'timeout' then
self.recv_blocked = true
end
return stat, err
end
local function nil_cb()
end
local function wrap_zsock(sock, on_data, on_drain)
local self = setmetatable({
sock = sock,
on_data = on_data or nil_cb,
on_drain = on_drain or nil_cb,
recv_blocked = false,
send_blocked = false,
check_enabled = false,
}, zsock_mt)
local function on_work()
self.check_enabled = false
local events = sock:events()
local read = false
local write = false
if events == z_POLLIN_OUT then
read = true
write = true
elseif events == z_POLLIN then
read = true
elseif events == z_POLLOUT then
write = true
else
return
end
if read then
self.recv_blocked = false
self:on_data(sock)
-- there might be more messages to read.
if not self.recv_blocked then
zsock_check_events(self)
end
end
if write and self.send_blocked then
self:on_drain(sock)
end
end
self.on_work = on_work
-- listen for read events to enable socket.
poll:add_read(sock:fd(), function()
on_work()
end)
zsock_check_events(self)
return self
end
return setmetatable({
set_poller = function(poller)
local old = poll
poll = poller
return old
end,
wrap_zsock = wrap_zsock,
}, { __call = function(tab, ...) return wrap_zsock(...) end})

@ -0,0 +1,45 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
-- safe require.
local require = require
local function safe_require(...)
return pcall(require, ...)
end
local mod_name = ...
local backends = {
"epoll",
"ev",
}
for i=1,#backends do
local backend = backends[i]
local name = mod_name .. '.' .. backend
local status, mod = safe_require(name)
if status then
--print("Loaded backend:", name)
return mod
end
end
error("Failed to load backend for: " .. mod_name)

@ -0,0 +1,121 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local epoll = require"epoll"
local EPOLLIN = epoll.EPOLLIN
local EPOLLOUT = epoll.EPOLLOUT
local poller_meths = {}
local poller_mt = {__index = poller_meths}
local function poller_new()
local reads = {}
-- create closure for epoll io_event callback.
local function do_io_event(fd, ev)
local cb = reads[fd]
return cb(fd, ev)
end
return setmetatable({
work_cur = {},
work_last = {},
reads = reads,
io_events = 0,
do_io_event = do_io_event,
poller = epoll.new(),
}, poller_mt)
end
function poller_meths:add_work(task)
-- add task to current work queue.
self.work_cur[#self.work_cur + 1] = task
end
function poller_meths:add_read(fd, cb)
-- make sure read event hasn't been registered yet.
if not self.reads[fd] then
self.io_events = self.io_events + 1
self.reads[fd] = cb
return self.poller:add(fd, EPOLLIN, fd)
else
-- update read callback?
self.reads[fd] = cb
end
end
function poller_meths:remove_read(fd)
-- make sure there was a read event registered.
if self.reads[fd] then
self.io_events = self.io_events - 1
self.reads[fd] = nil
return self.poller:del(fd)
end
end
local function poller_do_work(self)
local tasks = #self.work_cur
-- check if there is any work
if tasks > 0 then
-- swap work queues.
local last, cur = self.work_cur, self.work_last
self.work_cur, self.work_last = cur, last
for i=1,tasks do
local task = last[i]
last[i] = nil
task()
end
-- return new work queue length.
return #cur
end
return tasks
end
function poller_meths:start()
local do_io_event = self.do_io_event
local poller = self.poller
self.is_running = true
while self.is_running do
-- run work task
local new_work = poller_do_work(self)
-- wait == 0, if there is work to do, else wait == -1
local wait = (new_work > 0) and 0 or -1
-- poll for fd events, if there are events to poll for.
--print("poller:step()", new_work, self.io_events)
if self.io_events > 0 then
assert(poller:wait_callback(do_io_event, wait))
else
-- no io events to poll, do we still have work?
if #self.work_cur == 0 then
-- nothing to do, exit event loop
self.is_running = false
return
end
end
end
end
function poller_meths:stop()
self.is_running = false
end
-- module only exports a 'new' function.
return {
new = poller_new,
}

@ -0,0 +1,119 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local ev = require'ev'
local ev_READ = ev.READ
local ev_WRITE = ev.WRITE
local loop = ev.Loop.default
assert(ev.Idle,"Need version > 1.3 of lua-ev that supports Idle watchers.")
local poller_meths = {}
local poller_mt = {__index = poller_meths}
local function poller_new()
local self = {
work_cur = {},
work_last = {},
io_events = 0,
reads = {},
idle_enabled = false,
}
self.idle = ev.Idle.new(function()
local tasks = #self.work_cur
-- check if there is any work
if tasks > 0 then
-- swap work queues.
local last, cur = self.work_cur, self.work_last
self.work_cur, self.work_last = cur, last
for i=1,tasks do
local task = last[i]
last[i] = nil
task()
end
-- check if there is more work.
if #cur > 0 then
return -- don't disable idle watcher, when we have work.
end
end
--print("STOP IDLE:", #self.work_cur, #self.work_last)
-- stop idle watcher, no work.
self.idle_enabled = false
self.idle:stop(loop)
end)
-- set priority to max, to make sure the work queue is processed on each loop.
self.idle:priority(ev.MAXPRI)
return setmetatable(self, poller_mt)
end
function poller_meths:add_work(task)
local idx = #self.work_cur + 1
-- add task to current work queue.
self.work_cur[idx] = task
-- make sure the idle watcher is enabled.
if not self.idle_enabled then
self.idle_enabled = true
self.idle:start(loop)
end
end
function poller_meths:add_read(fd, cb)
local io_read = self.reads[fd]
-- make sure read event hasn't been registered yet.
if not io_read then
self.io_events = self.io_events + 1
io_read = ev.IO.new(function()
cb(fd)
end, fd, ev_READ)
self.reads[fd] = io_read
io_read:start(loop)
else
-- update read callback?
io_read:callback(cb)
-- need to re-start watcher?
if not io_read:is_active() then
io_read:start(loop)
end
end
end
function poller_meths:remove_read(fd)
local io_read = self.reads[fd]
-- make sure there was a read event registered.
if io_read then
self.io_events = self.io_events - 1
io_read:stop(loop)
end
end
function poller_meths:start()
return loop:loop()
end
function poller_meths:stop()
return loop:unloop()
end
-- module only exports a 'new' function.
return {
new = poller_new,
}

@ -18,15 +18,15 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE. -- THE SOFTWARE.
require("zmq") local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init()
local s = ctx:socket(zmq.PUB) local s = ctx:socket(zmq.PUB)
s:bind("tcp://lo:5555") s:bind("tcp://lo:5555")
local msg_id = 1 local msg_id = 1
while true do while true do
s:send(tostring(msg_id)) s:send(tostring(msg_id))
msg_id = msg_id + 1 msg_id = msg_id + 1
end end

@ -0,0 +1,95 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local poller = require"examples.poller"
local poll = poller.new()
local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local ctx = zmq.init()
local s = ctx:socket(zmq.PUB)
local s_FD = s:getopt(zmq.FD)
s:bind("tcp://lo:5555")
-- current socket state
local blocked_state
local blocked_event
local on_sock_recv
local on_sock_send
-- IO event callback when socket was blocked
local function on_sock_io()
local events = s:getopt(z_EVENTS)
local unblocked = false
if events == blocked_event then
-- got the event the socket was blocked on.
unblocked = true
elseif events == z_POLLIN_OUT then
-- got both in & out events
unblocked = true
end
if unblocked then
-- got the event we are blocked on resume.
blocked_event = nil
blocked_state()
-- check if blocked event was processed.
if not blocked_event then
poll:remove_read(s_FD)
end
end
end
local function sock_blocked(state, event)
if not blocked_event then
-- need to register socket's fd with event loop
poll:add_read(s_FD, on_sock_io)
end
blocked_state = state
blocked_event = event
end
-- sock state functions
local msg_id = 1
function on_sock_send()
local sent, err = s:send(tostring(msg_id), z_NOBLOCK)
if not sent then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_send, z_POLLOUT)
end
-- message sent, inc. id
msg_id = msg_id + 1
-- yield back to event loop
poll:add_work(on_sock_send)
end
-- start processing of the socket.
poll:add_work(on_sock_send)
-- start event loop
poll:start()
s:close()
ctx:term()

@ -18,14 +18,14 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE. -- THE SOFTWARE.
require("zmq") local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init()
local s = ctx:socket(zmq.REP) local s = ctx:socket(zmq.REP)
s:bind("tcp://lo:5555") s:bind("tcp://lo:5555")
while true do while true do
print(string.format("Received query: '%s'", s:recv())) print(string.format("Received query: '%s'", s:recv()))
s:send("OK") s:send("OK")
end end

@ -18,18 +18,18 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE. -- THE SOFTWARE.
require("zmq") local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init()
local s = ctx:socket(zmq.REP) local s = ctx:socket(zmq.REP)
s:bind("tcp://lo:5555") s:bind("tcp://lo:5555")
while true do while true do
local query = s:recv() local query = s:recv()
while s:getopt(zmq.RCVMORE) == 1 do while s:getopt(zmq.RCVMORE) == 1 do
query = query .. s:recv() query = query .. s:recv()
end end
print(string.format("Received query: '%s'", query)) print(string.format("Received query: '%s'", query))
s:send("OK") s:send("OK")
end end

@ -0,0 +1,102 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local poller = require"examples.poller"
local poll = poller.new()
local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local ctx = zmq.init()
local s = ctx:socket(zmq.REP)
local s_FD = s:getopt(zmq.FD)
s:bind("tcp://lo:5555")
-- current socket state
local blocked_state
local blocked_event
local on_sock_recv
local on_sock_send
-- IO event callback when socket was blocked
local function on_sock_io()
local events = s:getopt(z_EVENTS)
local unblocked = false
if events == blocked_event then
-- got the event the socket was blocked on.
unblocked = true
elseif events == z_POLLIN_OUT then
-- got both in & out events
unblocked = true
end
if unblocked then
-- got the event we are blocked on resume.
blocked_event = nil
blocked_state()
-- check if blocked event was processed.
if not blocked_event then
poll:remove_read(s_FD)
end
end
end
local function sock_blocked(state, event)
if not blocked_event then
-- need to register socket's fd with event loop
poll:add_read(s_FD, on_sock_io)
end
blocked_state = state
blocked_event = event
end
-- sock state functions
function on_sock_recv()
local data, err = s:recv(z_NOBLOCK)
if not data then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_recv, z_POLLIN)
end
print(string.format("Received query: '%s'", data))
return on_sock_send()
end
function on_sock_send()
local sent, err = s:send("OK", z_NOBLOCK)
if not sent then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_send, z_POLLOUT)
end
-- yield back to event loop
poll:add_work(on_sock_recv)
end
-- start processing of the socket.
poll:add_work(on_sock_recv)
-- start event loop
poll:start()
s:close()
ctx:term()

@ -18,14 +18,14 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE. -- THE SOFTWARE.
require("zmq") local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init()
local s = ctx:socket(zmq.SUB) local s = ctx:socket(zmq.SUB)
s:setopt(zmq.SUBSCRIBE, "") s:setopt(zmq.SUBSCRIBE, "")
s:connect("tcp://localhost:5555") s:connect("tcp://localhost:5555")
while true do while true do
local msg = s:recv() local msg = s:recv()
local msg_id = tonumber(msg) local msg_id = tonumber(msg)
if math.mod(msg_id, 10000) == 0 then print(msg_id) end if math.mod(msg_id, 10000) == 0 then print(msg_id) end
end end

@ -1,4 +1,4 @@
-- Copyright (c) 2010 Aleksey Yeschenko <aleksey@yeschenko.com> -- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
-- --
-- Permission is hereby granted, free of charge, to any person obtaining a copy -- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal -- of this software and associated documentation files (the "Software"), to deal
@ -18,65 +18,86 @@
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE. -- THE SOFTWARE.
require("zmq") local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local ev = require'ev' local ev = require'ev'
local loop = ev.Loop.default local loop = ev.Loop.default
-- define a sub_worker class -- define a sub_worker class
local sub_worker_mt = {} local sub_worker_mt = {}
function sub_worker_mt:close(...) function sub_worker_mt:close(...)
self.s_io_idle:stop(self.loop) self.s_io_idle:stop(self.loop)
self.s_io_read:stop(self.loop) self.s_io_read:stop(self.loop)
return self.socket:close(...) return self.socket:close(...)
end end
function sub_worker_mt:bind(...) function sub_worker_mt:bind(...)
return self.socket:bind(...) return self.socket:bind(...)
end end
function sub_worker_mt:connect(...) function sub_worker_mt:connect(...)
return self.socket:connect(...) return self.socket:connect(...)
end end
function sub_worker_mt:sub(topic) function sub_worker_mt:sub(topic)
return self.socket:setopt(zmq.SUBSCRIBE, topic) return self.socket:setopt(zmq.SUBSCRIBE, topic)
end end
function sub_worker_mt:unsub(topic) function sub_worker_mt:unsub(topic)
return self.socket:setopt(zmq.UNSUBSCRIBE, topic) return self.socket:setopt(zmq.UNSUBSCRIBE, topic)
end end
sub_worker_mt.__index = sub_worker_mt sub_worker_mt.__index = sub_worker_mt
local function sub_worker(loop, ctx, msg_cb) local function sub_worker(loop, ctx, msg_cb)
local s = ctx:socket(zmq.SUB) local s = ctx:socket(zmq.SUB)
local self = { loop = loop, socket = s, msg_cb = msg_cb } local self = { loop = loop, socket = s, msg_cb = msg_cb }
setmetatable(self, sub_worker_mt) setmetatable(self, sub_worker_mt)
-- create ev callbacks for recving data. -- create ev callbacks for recving data.
-- need idle watcher since ZeroMQ sockets are edge-triggered instead of level-triggered -- need idle watcher since ZeroMQ sockets are edge-triggered instead of level-triggered
local s_io_idle local s_io_idle
local s_io_read local s_io_read
s_io_idle = ev.Idle.new(function() local max_recvs = 10
local msg, err = s:recv(zmq.NOBLOCK) local function s_recv(recv_cnt)
if err == 'timeout' then local msg, err = s:recv(z_NOBLOCK)
-- need to block on read IO if err == 'timeout' then
s_io_idle:stop(loop) -- need to block on read IO
s_io_read:start(loop) return false
return end
end self:msg_cb(msg)
self:msg_cb(msg) if recv_cnt > 1 then
end) return s_recv(recv_cnt - 1)
s_io_idle:start(loop) end
s_io_read = ev.IO.new(function() return true
s_io_idle:start(loop) end
s_io_read:stop(loop) s_io_idle = ev.Idle.new(function()
end, s:getopt(zmq.FD), ev.READ) if not s_recv(max_recvs) then
self.s_io_idle = s_io_idle -- need to block on read IO
self.s_io_read = s_io_read s_io_idle:stop(loop)
return self s_io_read:start(loop)
end
end)
s_io_idle:start(loop)
s_io_read = ev.IO.new(function()
local events = s:getopt(z_EVENTS)
if events == z_POLLIN or events == z_POLLIN_OUT then
if s_recv(max_recvs) then
-- read IO is not block, enable idle watcher to handle reads.
s_io_idle:start(loop)
s_io_read:stop(loop)
end
end
end, s:getopt(zmq.FD), ev.READ)
self.s_io_idle = s_io_idle
self.s_io_read = s_io_read
return self
end end
local ctx = zmq.init(1) local ctx = zmq.init()
-- message handling function. -- message handling function.
local function handle_msg(worker, msg) local function handle_msg(worker, msg)
local msg_id = tonumber(msg) local msg_id = tonumber(msg)
if math.mod(msg_id, 10000) == 0 then print(worker.id, msg_id) end if math.mod(msg_id, 10000) == 0 then print(worker.id, msg_id) end
end end
local sub1 = sub_worker(loop, ctx, handle_msg) local sub1 = sub_worker(loop, ctx, handle_msg)

@ -0,0 +1,96 @@
-- Copyright (c) 2012 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
local poller = require"examples.poller"
local poll = poller.new()
local zmq = require"zmq"
local z_NOBLOCK = zmq.NOBLOCK
local z_EVENTS = zmq.EVENTS
local z_POLLIN = zmq.POLLIN
local z_POLLOUT = zmq.POLLOUT
local z_POLLIN_OUT = z_POLLIN + z_POLLOUT
local N=tonumber(arg[1] or 100)
local ctx = zmq.init()
local s = ctx:socket(zmq.SUB)
local s_FD = s:getopt(zmq.FD)
s:setopt(zmq.SUBSCRIBE, "")
s:connect("tcp://localhost:5555")
-- current socket state
local blocked_state
local blocked_event
local on_sock_recv
local on_sock_send
-- IO event callback when socket was blocked
local function on_sock_io()
local events = s:getopt(z_EVENTS)
local unblocked = false
if events == blocked_event then
-- got the event the socket was blocked on.
unblocked = true
elseif events == z_POLLIN_OUT then
-- got both in & out events
unblocked = true
end
if unblocked then
-- got the event we are blocked on resume.
blocked_event = nil
blocked_state()
-- check if blocked event was processed.
if not blocked_event then
poll:remove_read(s_FD)
end
end
end
local function sock_blocked(state, event)
if not blocked_event then
-- need to register socket's fd with event loop
poll:add_read(s_FD, on_sock_io)
end
blocked_state = state
blocked_event = event
end
-- sock state functions
function on_sock_recv()
local data, err = s:recv(z_NOBLOCK)
if not data then
assert(err == 'timeout', "Bad error on zmq socket.")
return sock_blocked(on_sock_recv, z_POLLIN)
end
local msg_id = tonumber(data)
if (msg_id % 10000) == 0 then print(data) end
return on_sock_recv()
end
-- start processing of the socket.
poll:add_work(on_sock_recv)
-- start event loop
poll:start()
s:close()
ctx:term()

@ -30,16 +30,31 @@ local roundtrip_count = tonumber(arg[3])
local zmq = require"zmq" local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.REP) local s = assert(ctx:socket(zmq.REP))
s:bind(bind_to) assert(s:bind(bind_to))
local msg = zmq.zmq_msg_t() local msg = zmq.zmq_msg_t()
local timer
for i = 1, roundtrip_count do for i = 1, roundtrip_count do
assert(s:recv_msg(msg)) assert(s:recv_msg(msg))
if not timer then
timer = zmq.stopwatch_start()
end
assert(msg:size() == message_size, "Invalid message size") assert(msg:size() == message_size, "Invalid message size")
assert(s:send_msg(msg)) assert(s:send_msg(msg))
end end
local elapsed = timer:stop()
s:close() s:close()
ctx:term() ctx:term()
local latency = elapsed / roundtrip_count / 2
print(string.format("mean latency: %.3f [us]", latency))
local secs = elapsed / (1000 * 1000)
print(string.format("elapsed = %f", secs))
print(string.format("msg/sec = %f", roundtrip_count / secs))

@ -30,9 +30,9 @@ local message_count = tonumber(arg[3])
local zmq = require"zmq" local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.PULL) local s = assert(ctx:socket(zmq.PULL))
--s:setopt(zmq.SUBSCRIBE, ""); --s:setopt(zmq.SUBSCRIBE, "");
s:bind(bind_to) assert(s:bind(bind_to))
local function recv_msg(s,msg) local function recv_msg(s,msg)
assert(s:recv_msg(msg)) assert(s:recv_msg(msg))

@ -30,8 +30,8 @@ local message_count = tonumber(arg[3])
local zmq = require"zmq" local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.PULL) local s = assert(ctx:socket(zmq.PULL))
s:bind(bind_to) assert(s:bind(bind_to))
print(string.format("message size: %i [B]", message_size)) print(string.format("message size: %i [B]", message_size))
print(string.format("message count: %i", message_count)) print(string.format("message count: %i", message_count))

@ -30,9 +30,8 @@ local message_count = tonumber(arg[3])
local zmq = require"zmq" local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.SUB) local s = assert(ctx:socket(zmq.PULL))
s:setopt(zmq.SUBSCRIBE, ""); assert(s:bind(bind_to))
s:bind(bind_to)
print(string.format("message size: %i [B]", message_size)) print(string.format("message size: %i [B]", message_size))
print(string.format("message count: %i", message_count)) print(string.format("message count: %i", message_count))

@ -0,0 +1,77 @@
-- Copyright (c) 2010 Aleksey Yeschenko <aleksey@yeschenko.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
if not arg[3] then
print("usage: lua local_thr.lua <bind-to> <message-size> <message-count>")
os.exit()
end
local bind_to = arg[1]
local message_size = tonumber(arg[2])
local message_count = tonumber(arg[3])
local zmq = require"zmq"
local z_poller = require"zmq.poller"
local z_NOBLOCK = zmq.NOBLOCK
local poller = z_poller(64)
local ctx = zmq.init(1)
local s = assert(ctx:socket(zmq.SUB))
assert(s:setopt(zmq.SUBSCRIBE, ""))
assert(s:bind(bind_to))
print(string.format("message size: %i [B]", message_size))
print(string.format("message count: %i", message_count))
local msg
msg = zmq.zmq_msg_t()
local cnt = 0
poller:add(s, zmq.POLLIN, function(sock)
while s:recv_msg(msg, z_NOBLOCK) do
--assert(msg:size() == message_size, "Invalid message size")
cnt = cnt + 1
if cnt == message_count then
poller:stop()
end
end
end)
-- wait for first message
assert(s:recv_msg(msg))
cnt = 1
local timer = zmq.stopwatch_start()
poller:start()
local elapsed = timer:stop()
s:close()
ctx:term()
if elapsed == 0 then elapsed = 1 end
local throughput = message_count / (elapsed / 1000000)
local megabits = throughput * message_size * 8 / 1000000
print(string.format("mean throughput: %i [msg/s]", throughput))
print(string.format("mean throughput: %.3f [Mb/s]", megabits))

@ -30,8 +30,8 @@ local roundtrip_count = tonumber(arg[3])
local zmq = require"zmq" local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.REQ) local s = assert(ctx:socket(zmq.REQ))
s:connect(connect_to) assert(s:connect(connect_to))
local data = ("0"):rep(message_size) local data = ("0"):rep(message_size)
local msg = zmq.zmq_msg_t.init_size(message_size) local msg = zmq.zmq_msg_t.init_size(message_size)
@ -54,3 +54,7 @@ local latency = elapsed / roundtrip_count / 2
print(string.format("message size: %i [B]", message_size)) print(string.format("message size: %i [B]", message_size))
print(string.format("roundtrip count: %i", roundtrip_count)) print(string.format("roundtrip count: %i", roundtrip_count))
print(string.format("mean latency: %.3f [us]", latency)) print(string.format("mean latency: %.3f [us]", latency))
local secs = elapsed / (1000 * 1000)
print(string.format("Elapsed: %f secs", secs))
print(string.format("throughput %f msg/secs", roundtrip_count / secs))

@ -31,8 +31,8 @@ local zmq = require"zmq"
local z_SNDMORE = zmq.SNDMORE local z_SNDMORE = zmq.SNDMORE
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.PUSH) local s = assert(ctx:socket(zmq.PUSH))
s:connect(connect_to) assert(s:connect(connect_to))
local data = ("0"):rep(message_size/2) local data = ("0"):rep(message_size/2)
local msg = zmq.zmq_msg_t.init_size(message_size/2) local msg = zmq.zmq_msg_t.init_size(message_size/2)

@ -30,8 +30,8 @@ local message_count = tonumber(arg[3])
local zmq = require"zmq" local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.PUSH) local s = assert(ctx:socket(zmq.PUSH))
s:connect(connect_to) assert(s:connect(connect_to))
local data = ("0"):rep(message_size) local data = ("0"):rep(message_size)
local msg = zmq.zmq_msg_t.init_size(message_size) local msg = zmq.zmq_msg_t.init_size(message_size)

@ -30,8 +30,8 @@ local message_count = tonumber(arg[3])
local zmq = require"zmq" local zmq = require"zmq"
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.PUB) local s = assert(ctx:socket(zmq.PUSH))
s:connect(connect_to) assert(s:connect(connect_to))
zmq.sleep(1) zmq.sleep(1)

@ -52,8 +52,8 @@ local child_code = [[
]] ]]
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.REQ) local s = assert(ctx:socket(zmq.REQ))
s:bind(bind_to) assert(s:bind(bind_to))
local child_thread = zthreads.runstring(ctx, child_code, connect_to, message_size, roundtrip_count) local child_thread = zthreads.runstring(ctx, child_code, connect_to, message_size, roundtrip_count)
child_thread:start() child_thread:start()
@ -83,4 +83,7 @@ ctx:term()
local latency = elapsed / roundtrip_count / 2 local latency = elapsed / roundtrip_count / 2
print(string.format("mean latency: %.3f [us]", latency)) print(string.format("mean latency: %.3f [us]", latency))
local secs = elapsed / (1000 * 1000)
print(string.format("elapsed = %f", secs))
print(string.format("msg/sec = %f", roundtrip_count / secs))

@ -37,17 +37,18 @@ local child_code = [[
local zthreads = require"zmq.threads" local zthreads = require"zmq.threads"
local ctx = zthreads.get_parent_ctx() local ctx = zthreads.get_parent_ctx()
local s = ctx:socket(zmq.PUSH) local s = assert(ctx:socket(zmq.PUSH))
s:setopt(zmq.HWM, message_count/4) assert(s:setopt(zmq.HWM, message_count/4))
s:connect(connect_to) assert(s:connect(connect_to))
local data = ("0"):rep(message_size) local data = ("0"):rep(message_size)
local msg = zmq.zmq_msg_t.init_size(message_size) local msg_data = zmq.zmq_msg_t.init_data(data)
local msg = zmq.zmq_msg_t.init()
local timer = zmq.stopwatch_start() local timer = zmq.stopwatch_start()
for i = 1, message_count do for i = 1, message_count do
msg:set_data(data) msg:copy(msg_data)
assert(s:send_msg(msg)) assert(s:send_msg(msg))
end end
@ -67,8 +68,8 @@ local child_code = [[
]] ]]
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.PULL) local s = assert(ctx:socket(zmq.PULL))
s:bind(bind_to) assert(s:bind(bind_to))
print(string.format("message size: %i [B]", message_size)) print(string.format("message size: %i [B]", message_size))
print(string.format("message count: %i", message_count)) print(string.format("message count: %i", message_count))

@ -24,8 +24,8 @@ end
local message_size = tonumber(arg[1] or 1) local message_size = tonumber(arg[1] or 1)
local message_count = tonumber(arg[2] or 100000) local message_count = tonumber(arg[2] or 100000)
local bind_to = arg[3] or 'inproc://thread_lat_test' local bind_to = arg[3] or 'inproc://thread_thr_test'
local connect_to = arg[4] or 'inproc://thread_lat_test' local connect_to = arg[4] or 'inproc://thread_thr_test'
local zmq = require"zmq" local zmq = require"zmq"
local zthreads = require"zmq.threads" local zthreads = require"zmq.threads"
@ -37,8 +37,8 @@ local child_code = [[
local zthreads = require"zmq.threads" local zthreads = require"zmq.threads"
local ctx = zthreads.get_parent_ctx() local ctx = zthreads.get_parent_ctx()
local s = ctx:socket(zmq.PUB) local s = assert(ctx:socket(zmq.PUSH))
s:connect(connect_to) assert(s:connect(connect_to))
local data = ("0"):rep(message_size) local data = ("0"):rep(message_size)
local msg_data = zmq.zmq_msg_t.init_data(data) local msg_data = zmq.zmq_msg_t.init_data(data)
@ -67,9 +67,8 @@ local child_code = [[
]] ]]
local ctx = zmq.init(1) local ctx = zmq.init(1)
local s = ctx:socket(zmq.SUB) local s = assert(ctx:socket(zmq.PULL))
s:setopt(zmq.SUBSCRIBE, ""); assert(s:bind(bind_to))
s:bind(bind_to)
print(string.format("message size: %i [B]", message_size)) print(string.format("message size: %i [B]", message_size))
print(string.format("message count: %i", message_count)) print(string.format("message count: %i", message_count))

@ -0,0 +1,36 @@
package = "lua-zmq"
version = "1.2-1"
source = {
url = "git://github.com/Neopallium/lua-zmq.git",
branch = "v1.2",
}
description = {
summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.",
homepage = "http://github.com/Neopallium/lua-zmq",
license = "MIT/X11",
}
dependencies = {
"lua >= 5.1, < 5.5",
}
external_dependencies = {
ZEROMQ = {
header = "zmq.h",
library = "zmq",
}
}
build = {
type = "builtin",
modules = {
zmq = {
sources = {"src/pre_generated-zmq.nobj.c"},
incdirs = "$(ZEROMQ_INCDIR)",
libdirs = "$(ZEROMQ_LIBDIR)",
libraries = {"zmq"},
},
},
install = {
lua = {
['zmq.poller'] = "src/poller.lua",
}
}
}

@ -1,7 +1,7 @@
package = "lua-zmq" package = "lua-zmq"
version = "scm-1" version = "scm-1"
source = { source = {
url = "git://github.com/Neopallium/lua-zmq.git", url = "git+https://brejela.club/gitea/brejela/lua-zmq.git",
} }
description = { description = {
summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.", summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.",
@ -9,15 +9,31 @@ description = {
license = "MIT/X11", license = "MIT/X11",
} }
dependencies = { dependencies = {
"lua >= 5.1", "lua >= 5.1, < 5.5",
} }
external_dependencies = { external_dependencies = {
platforms = {
windows = {
ZEROMQ = {
library = "libzmq",
}
},
},
ZEROMQ = { ZEROMQ = {
header = "zmq.h", header = "zmq.h",
library = "zmq", library = "zmq",
} }
} }
build = { build = {
platforms = {
windows = {
modules = {
zmq = {
libraries = {"libzmq"},
}
}
},
},
type = "builtin", type = "builtin",
modules = { modules = {
zmq = { zmq = {

@ -0,0 +1,51 @@
package = "lua-zmq"
version = "scm-2"
source = {
url = "git://github.com/Neopallium/lua-zmq.git",
}
description = {
summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.",
homepage = "http://github.com/Neopallium/lua-zmq",
license = "MIT/X11",
}
dependencies = {
"lua >= 5.1, < 5.5",
}
external_dependencies = {
platforms = {
windows = {
ZEROMQ = {
library = "libzmq",
}
},
},
ZEROMQ = {
header = "zmq.h",
library = "zmq",
}
}
build = {
platforms = {
windows = {
modules = {
zmq = {
libraries = {"libzmq"},
}
}
},
},
type = "builtin",
modules = {
zmq = {
sources = {"src/pre_generated-zmq.nobj.c"},
incdirs = "$(ZEROMQ_INCDIR)",
libdirs = "$(ZEROMQ_LIBDIR)",
libraries = {"zmq"},
},
},
install = {
lua = {
['zmq.poller'] = "src/poller.lua",
},
},
}

@ -0,0 +1,23 @@
package = "lua-zmq-threads"
version = "1.2-1"
source = {
url = "git://github.com/Neopallium/lua-zmq.git",
branch = "v1.2",
}
description = {
summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.",
homepage = "http://github.com/Neopallium/lua-zmq",
license = "MIT/X11"
}
dependencies = {
"lua-zmq >= 1.2-1",
"lua-llthreads >= 1.3-1",
}
build = {
type = "none",
install = {
lua = {
['zmq.threads'] = "src/threads.lua",
}
}
}

@ -1,7 +1,7 @@
package = "lua-zmq-threads" package = "lua-zmq-threads"
version = "scm-0" version = "scm-0"
source = { source = {
url = "git://github.com/Neopallium/lua-zmq.git", url = "git+https://brejela.club/gitea/brejela/lua-zmq.git",
} }
description = { description = {
summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.", summary = "Lua bindings to zeromq2, with LuaJIT2 FFI support.",

@ -36,5 +36,13 @@ typedef struct ZMQ_Ctx ZMQ_Ctx;
method "socket" { method "socket" {
c_method_call "!ZMQ_Socket *" "zmq_socket" { "int", "type"} c_method_call "!ZMQ_Socket *" "zmq_socket" { "int", "type"}
}, },
method "set" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "int" "zmq_ctx_set" { "int", "flag", "int", "value" }
},
method "get" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "int" "zmq_ctx_get" { "int", "flag" }
},
} }

@ -283,12 +283,15 @@ error_code "ZMQ_Error" "int" {
ffi_is_error_check = function(rec) return "(-1 == ${" .. rec.name .. "})" end, ffi_is_error_check = function(rec) return "(-1 == ${" .. rec.name .. "})" end,
default = "0", default = "0",
c_source [[ c_source [[
int num;
if(-1 == err) { if(-1 == err) {
/* get ZErrors table. */ /* get ZErrors table. */
lua_pushlightuserdata(L, zmq_ZErrors_key); lua_pushlightuserdata(L, zmq_ZErrors_key);
lua_rawget(L, LUA_REGISTRYINDEX); lua_rawget(L, LUA_REGISTRYINDEX);
/* convert zmq_errno to string. */ /* convert zmq_errno to string. */
lua_rawgeti(L, -1, zmq_errno()); num = zmq_errno();
lua_pushinteger(L, num);
lua_gettable(L, -2);
/* remove ZErrors table. */ /* remove ZErrors table. */
lua_remove(L, -2); lua_remove(L, -2);
if(!lua_isnil(L, -1)) { if(!lua_isnil(L, -1)) {
@ -297,7 +300,8 @@ error_code "ZMQ_Error" "int" {
} }
/* Unknown error. */ /* Unknown error. */
lua_pop(L, 1); lua_pop(L, 1);
err_str = "UNKNOWN ERROR"; lua_pushfstring(L, "UNKNOWN ERROR(%d)", num);
return;
} }
]], ]],
ffi_source [[ ffi_source [[

@ -21,6 +21,22 @@
object "zmq_msg_t" { object "zmq_msg_t" {
-- store the `zmq_msg_t` structure in Lua userdata object -- store the `zmq_msg_t` structure in Lua userdata object
userdata_type = "embed", userdata_type = "embed",
implements "Buffer" {
implement_method "const_data" {
c_function = "zmq_msg_data"
},
implement_method "get_size" {
c_function = "zmq_msg_size"
},
},
implements "MutableBuffer" {
implement_method "data" {
c_function = "zmq_msg_data"
},
implement_method "get_size" {
c_function = "zmq_msg_size"
},
},
-- --
-- Define zmq_msq_t type & function API for FFI -- Define zmq_msq_t type & function API for FFI
-- --
@ -28,10 +44,7 @@ object "zmq_msg_t" {
struct zmq_msg_t struct zmq_msg_t
{ {
void *content; unsigned char _ [64];
unsigned char flags;
unsigned char vsm_size;
unsigned char vsm_data [30]; /* that '30' is from 'MAX_VSM_SIZE' */
}; };
int zmq_msg_init (zmq_msg_t *msg); int zmq_msg_init (zmq_msg_t *msg);
@ -132,7 +145,7 @@ int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
]], ]],
}, },
method "size" { method "size" {
c_method_call "size_t" "zmq_msg_size" {} c_method_call { "size_t", "size", ffi_wrap = "tonumber"} "zmq_msg_size" {}
}, },
method "__tostring" { method "__tostring" {
var_out{ "const char *", "data", has_length = true }, var_out{ "const char *", "data", has_length = true },

@ -92,5 +92,6 @@ function M.new(pre_alloc)
}, poller_mt) }, poller_mt)
end end
zmq.poller = M
return setmetatable(M, {__call = function(tab, ...) return M.new(...) end}) return setmetatable(M, {__call = function(tab, ...) return M.new(...) end})

@ -272,10 +272,10 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
constructor "new" { constructor "new" {
var_in{ "unsigned int", "length", is_optional = true, default = 10 }, var_in{ "unsigned int", "length", is_optional = true, default = 10 },
c_method_call "void" "poller_init" { "unsigned int", "length" }, c_export_method_call "void" "poller_init" { "unsigned int", "length" },
}, },
destructor "close" { destructor "close" {
c_method_call "void" "poller_cleanup" {}, c_export_method_call "void" "poller_cleanup" {},
}, },
method "add" { method "add" {
var_in{ "<any>", "sock" }, var_in{ "<any>", "sock" },
@ -303,6 +303,7 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
ffi_source[[ ffi_source[[
local fd = 0 local fd = 0
local sock_type = type(${sock}) local sock_type = type(${sock})
local sock
if sock_type == 'cdata' then if sock_type == 'cdata' then
sock = obj_type_ZMQ_Socket_check(${sock}) sock = obj_type_ZMQ_Socket_check(${sock})
elseif sock_type == 'number' then elseif sock_type == 'number' then
@ -310,7 +311,7 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
else else
error("expected number or ZMQ_Socket") error("expected number or ZMQ_Socket")
end end
${idx} = C.poller_get_free_item(${this}) ${idx} = Cmod.poller_get_free_item(${this})
local item = ${this}.items[${idx}] local item = ${this}.items[${idx}]
item.socket = sock item.socket = sock
item.fd = fd item.fd = fd
@ -355,14 +356,15 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
ffi_source[[ ffi_source[[
local fd = 0 local fd = 0
local sock_type = type(${sock}) local sock_type = type(${sock})
local sock
if sock_type == 'cdata' then if sock_type == 'cdata' then
sock = obj_type_ZMQ_Socket_check(${sock}) sock = obj_type_ZMQ_Socket_check(${sock})
-- find sock in items list. -- find sock in items list.
${idx} = C.poller_find_sock_item(${this}, sock) ${idx} = Cmod.poller_find_sock_item(${this}, sock)
elseif sock_type == 'number' then elseif sock_type == 'number' then
fd = ${sock} fd = ${sock}
-- find fd in items list. -- find fd in items list.
${idx} = C.poller_find_fd_item(${this}, fd); ${idx} = Cmod.poller_find_fd_item(${this}, fd);
else else
error("expected number or ZMQ_Socket") error("expected number or ZMQ_Socket")
end end
@ -372,7 +374,7 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
item.fd = fd item.fd = fd
item.events = ${events} item.events = ${events}
else else
C.poller_remove_item(${this}, ${idx}) Cmod.poller_remove_item(${this}, ${idx})
end end
]], ]],
}, },
@ -404,29 +406,28 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
ffi_source[[ ffi_source[[
local fd = 0 local fd = 0
local sock_type = type(${sock}) local sock_type = type(${sock})
local sock
if sock_type == 'cdata' then if sock_type == 'cdata' then
sock = obj_type_ZMQ_Socket_check(${sock}) sock = obj_type_ZMQ_Socket_check(${sock})
-- find sock in items list. -- find sock in items list.
${idx} = C.poller_find_sock_item(${this}, sock) ${idx} = Cmod.poller_find_sock_item(${this}, sock)
elseif sock_type == 'number' then elseif sock_type == 'number' then
fd = ${sock} fd = ${sock}
-- find fd in items list. -- find fd in items list.
${idx} = C.poller_find_fd_item(${this}, fd); ${idx} = Cmod.poller_find_fd_item(${this}, fd);
else else
error("expected number or ZMQ_Socket") error("expected number or ZMQ_Socket")
end end
if ${idx} >= 0 then if ${idx} >= 0 then
C.poller_remove_item(${this}, ${idx}) Cmod.poller_remove_item(${this}, ${idx})
end end
]], ]],
}, },
method "poll" { method "poll" {
var_in{ "long", "timeout" },
var_out{ "int", "count" }, var_out{ "int", "count" },
var_out{ "ZMQ_Error", "err" }, -- poll for events
c_export_method_call { "ZMQ_Error", "err>2" } "poller_poll" { "long", "timeout" },
c_source[[ c_source[[
/* poll for events */
${err} = poller_poll(${this}, ${timeout});
if(${err} > 0) { if(${err} > 0) {
${this}->next = 0; ${this}->next = 0;
${count} = ${err}; ${count} = ${err};
@ -436,8 +437,6 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
} }
]], ]],
ffi_source[[ ffi_source[[
-- poll for events
${err} = C.poller_poll(${this}, ${timeout})
if(${err} > 0) then if(${err} > 0) then
${this}.next = 0 ${this}.next = 0
${count} = ${err} ${count} = ${err}
@ -448,7 +447,7 @@ void poller_remove_item(ZMQ_Poller *poller, int idx);
]], ]],
}, },
method "next_revents_idx" { method "next_revents_idx" {
c_method_call { "int", "idx>1" } "poller_next_revents" { "int", "&revents>2" }, c_export_method_call { "int", "idx>1" } "poller_next_revents" { "int", "&revents>2" },
}, },
method "count" { method "count" {
var_out{ "int", "count" }, var_out{ "int", "count" },

File diff suppressed because it is too large Load Diff

@ -65,6 +65,15 @@ local socket_options = {
[20] = { name="recovery_ivl_msec", otype="INT64", mode="rw", ltype="int64_t" }, [20] = { name="recovery_ivl_msec", otype="INT64", mode="rw", ltype="int64_t" },
[21] = { name="reconnect_ivl_max", otype="INT", mode="rw", ltype="int" }, [21] = { name="reconnect_ivl_max", otype="INT", mode="rw", ltype="int" },
}, },
{ ver_def = 'VERSION_2_2', major = 2, minor = 2,
[22] = { },
[23] = { },
[24] = { },
[25] = { },
[26] = { },
[27] = { name="rcvtimeo", otype="INT", mode="rw", ltype="int" },
[28] = { name="sndtimeo", otype="INT", mode="rw", ltype="int" },
},
{ ver_def = 'VERSION_3_0', major = 3, minor = 0, { ver_def = 'VERSION_3_0', major = 3, minor = 0,
[1] = { name="hwm", otype="INT", mode="rw", [1] = { name="hwm", otype="INT", mode="rw",
custom = [[ custom = [[
@ -119,8 +128,87 @@ ZMQ_Error lzmq_socket_hwm(ZMQ_Socket *sock, int *value) {
[30] = { }, [30] = { },
[31] = { name="ipv4only", otype="INT", mode="rw", ltype="int" }, [31] = { name="ipv4only", otype="INT", mode="rw", ltype="int" },
}, },
{ ver_def = 'VERSION_4_0', major = 4, minor = 0,
[1] = { name="hwm", otype="INT", mode="rw",
custom = [[
ZMQ_Error lzmq_socket_set_hwm(ZMQ_Socket *sock, int value) {
int val;
int rc;
val = (int)value;
rc = zmq_setsockopt(sock, ZMQ_SNDHWM, &value, sizeof(value));
if(-1 == rc) return rc;
val = (int)value;
return zmq_setsockopt(sock, ZMQ_RCVHWM, &value, sizeof(value));
}
ZMQ_Error lzmq_socket_hwm(ZMQ_Socket *sock, int *value) {
size_t val_len;
int rc;
val_len = sizeof(value);
rc = zmq_getsockopt(sock, ZMQ_SNDHWM, value, &val_len);
if(-1 == rc) return rc;
val_len = sizeof(value);
return zmq_getsockopt(sock, ZMQ_RCVHWM, value, &val_len);
} }
local max_options = 50
]] },
[2] = { },
[3] = { },
[4] = { name="affinity", otype="UINT64", mode="rw", ltype="uint64_t" },
[5] = { name="identity", otype="BLOB", mode="rw", ltype="const char *" },
[6] = { name="subscribe", otype="BLOB", mode="w", ltype="const char *" },
[7] = { name="unsubscribe", otype="BLOB", mode="w", ltype="const char *" },
[8] = { name="rate", otype="INT", mode="rw", ltype="int" },
[9] = { name="recovery_ivl", otype="INT", mode="rw", ltype="int" },
[10] = { },
[11] = { name="sndbuf", otype="INT", mode="rw", ltype="int" },
[12] = { name="rcvbuf", otype="INT", mode="rw", ltype="int" },
[13] = { name="rcvmore", otype="INT", mode="r", ltype="int" },
[14] = { name="fd", otype="FD", mode="r", ltype="int" },
[15] = { name="events", otype="INT", mode="r", ltype="int" },
[16] = { name="type", otype="INT", mode="r", ltype="int" },
[17] = { name="linger", otype="INT", mode="rw", ltype="int" },
[18] = { name="reconnect_ivl", otype="INT", mode="rw", ltype="int" },
[19] = { name="backlog", otype="INT", mode="rw", ltype="int" },
[20] = { },
[21] = { name="reconnect_ivl_max", otype="INT", mode="rw", ltype="int" },
[22] = { name="maxmsgsize", otype="INT64", mode="rw", ltype="int64_t" },
[23] = { name="sndhwm", otype="INT", mode="rw", ltype="int" },
[24] = { name="rcvhwm", otype="INT", mode="rw", ltype="int" },
[25] = { name="multicast_hops", otype="INT", mode="rw", ltype="int" },
[26] = { },
[27] = { name="rcvtimeo", otype="INT", mode="rw", ltype="int" },
[28] = { name="sndtimeo", otype="INT", mode="rw", ltype="int" },
[29] = { },
[30] = { },
[31] = { name="ipv4only", otype="INT", mode="rw", ltype="int" },
-- New to version 4.x
[32] = { name="last_endpoint", otype="BLOB", mode="r", ltype="const char *" },
[33] = { name="router_mandatory", otype="INT", mode="w", ltype="int" },
[34] = { name="tcp_keepalive", otype="INT", mode="rw", ltype="int" },
[35] = { name="tcp_keepalive_cnt", otype="INT", mode="rw", ltype="int" },
[36] = { name="tcp_keepalive_idle",otype="INT", mode="rw", ltype="int" },
[37] = { name="tcp_keepalive_intvl",otype="INT", mode="rw", ltype="int" },
[38] = { name="tcp_accept_filter", otype="BLOB", mode="w", ltype="const char *" },
[39] = { name="immediate", otype="INT", mode="rw", ltype="int" },
[40] = { name="xpub_verbose", otype="INT", mode="w", ltype="int" },
[41] = { name="router_raw", otype="INT", mode="w", ltype="int" },
[42] = { name="ipv6", otype="INT", mode="rw", ltype="int" },
[43] = { name="mechanism", otype="INT", mode="r", ltype="int" },
[44] = { name="plain_server", otype="INT", mode="rw", ltype="int" },
[45] = { name="plain_username", otype="BLOB", mode="rw", ltype="const char *" },
[46] = { name="plain_password", otype="BLOB", mode="rw", ltype="const char *" },
[47] = { name="curve_server", otype="INT", mode="rw", ltype="int" },
[48] = { name="curve_publickey", otype="BLOB", mode="rw", ltype="const char *" },
[49] = { name="curve_secretkey", otype="BLOB", mode="rw", ltype="const char *" },
[50] = { name="curve_serverkey", otype="BLOB", mode="rw", ltype="const char *" },
[51] = { name="probe_router", otype="INT", mode="w", ltype="int" },
[52] = { name="req_correlate", otype="INT", mode="w", ltype="int" },
[53] = { name="req_relaxed", otype="INT", mode="w", ltype="int" },
[54] = { name="conflate", otype="INT", mode="rw", ltype="int" },
[55] = { name="zap_domain", otype="BLOB", mode="rw", ltype="const char *" },
},
}
local max_options = 60 -- this number must be larger then the highest option value.
local function foreach_opt(func) local function foreach_opt(func)
for i=1,#socket_options do for i=1,#socket_options do
@ -248,17 +336,17 @@ foreach_opt(function(num, opt, ver)
if opt.c_set then if opt.c_set then
if opt.otype == 'BLOB' then if opt.otype == 'BLOB' then
set = [[ set = [[
ZMQ_Error ${c_set}(ZMQ_Socket *sock, const char *value, size_t str_len) { LUA_NOBJ_API ZMQ_Error ${c_set}(ZMQ_Socket *sock, const char *value, size_t str_len) {
return zmq_setsockopt(sock, ${DEF}, value, str_len); return zmq_setsockopt(sock, ${DEF}, value, str_len);
]] ]]
elseif opt.ctype == opt.ltype then elseif opt.ctype == opt.ltype then
set = [[ set = [[
ZMQ_Error ${c_set}(ZMQ_Socket *sock, ${ltype} value) { LUA_NOBJ_API ZMQ_Error ${c_set}(ZMQ_Socket *sock, ${ltype} value) {
return zmq_setsockopt(sock, ${DEF}, &value, sizeof(value)); return zmq_setsockopt(sock, ${DEF}, &value, sizeof(value));
]] ]]
else else
set = [[ set = [[
ZMQ_Error ${c_set}(ZMQ_Socket *sock, ${ltype} value) { LUA_NOBJ_API ZMQ_Error ${c_set}(ZMQ_Socket *sock, ${ltype} value) {
${ctype} val = (${ctype})value; ${ctype} val = (${ctype})value;
return zmq_setsockopt(sock, ${DEF}, &val, sizeof(val)); return zmq_setsockopt(sock, ${DEF}, &val, sizeof(val));
]] ]]
@ -269,18 +357,18 @@ ZMQ_Error ${c_set}(ZMQ_Socket *sock, ${ltype} value) {
if opt.c_get then if opt.c_get then
if opt.otype == 'BLOB' then if opt.otype == 'BLOB' then
get = [[ get = [[
ZMQ_Error ${c_get}(ZMQ_Socket *sock, char *value, size_t *len) { LUA_NOBJ_API ZMQ_Error ${c_get}(ZMQ_Socket *sock, char *value, size_t *len) {
return zmq_getsockopt(sock, ${DEF}, value, len); return zmq_getsockopt(sock, ${DEF}, value, len);
]] ]]
elseif opt.ctype == opt.ltype then elseif opt.ctype == opt.ltype then
get = [[ get = [[
ZMQ_Error ${c_get}(ZMQ_Socket *sock, ${ltype} *value) { LUA_NOBJ_API ZMQ_Error ${c_get}(ZMQ_Socket *sock, ${ltype} *value) {
size_t val_len = sizeof(${ltype}); size_t val_len = sizeof(${ltype});
return zmq_getsockopt(sock, ${DEF}, value, &val_len); return zmq_getsockopt(sock, ${DEF}, value, &val_len);
]] ]]
else else
get = [[ get = [[
ZMQ_Error ${c_get}(ZMQ_Socket *sock, ${ltype} *value) { LUA_NOBJ_API ZMQ_Error ${c_get}(ZMQ_Socket *sock, ${ltype} *value) {
${ctype} val; ${ctype} val;
size_t val_len = sizeof(val); size_t val_len = sizeof(val);
int rc = zmq_getsockopt(sock, ${DEF}, &val, &val_len); int rc = zmq_getsockopt(sock, ${DEF}, &val, &val_len);
@ -301,10 +389,14 @@ end)
endif(last_ver.ver_def) endif(last_ver.ver_def)
add(opt_types, [[ add(opt_types, [[
#if VERSION_3_0 #if VERSION_4_0
# define MAX_OPTS VERSION_4_0_MAX_OPT
#elif VERSION_3_0
# define MAX_OPTS VERSION_3_0_MAX_OPT # define MAX_OPTS VERSION_3_0_MAX_OPT
#else #else
# if VERSION_2_1 # if VERSION_2_2
# define MAX_OPTS VERSION_2_2_MAX_OPT
# elif VERSION_2_1
# define MAX_OPTS VERSION_2_1_MAX_OPT # define MAX_OPTS VERSION_2_1_MAX_OPT
# else # else
# define MAX_OPTS VERSION_2_0_MAX_OPT # define MAX_OPTS VERSION_2_0_MAX_OPT
@ -355,7 +447,7 @@ local function build_option_methods()
end end
m[#m+1] = method (name) { if_defs = if_defs, m[#m+1] = method (name) { if_defs = if_defs,
var_out(val_out), var_out(val_out),
c_method_call "ZMQ_Error" (meth.c_get) (args), c_export_method_call "ZMQ_Error" (meth.c_get) (args),
} }
end end
-- generate setter method. -- generate setter method.
@ -366,7 +458,7 @@ local function build_option_methods()
args = { ltype, "value", "size_t", "#value" } args = { ltype, "value", "size_t", "#value" }
end end
m[#m+1] = method (name) { if_defs = if_defs, m[#m+1] = method (name) { if_defs = if_defs,
c_method_call "ZMQ_Error" (meth.c_set) (args), c_export_method_call "ZMQ_Error" (meth.c_set) (args),
} }
end end
end end
@ -383,18 +475,38 @@ end
object "ZMQ_Socket" { object "ZMQ_Socket" {
error_on_null = "get_zmq_strerror()", error_on_null = "get_zmq_strerror()",
ffi_source [[ ffi_source "ffi_pre_cdef" [[
-- detect zmq version -- detect zmq version
local VERSION_2_0 = true local VERSION_2_0 = true
local VERSION_2_1 = false local VERSION_2_1 = false
local VERSION_2_2 = false
local VERSION_3_0 = false local VERSION_3_0 = false
local VERSION_4_0 = false
local zver = _M.version() local zver = _M.version()
if zver[1] == 3 then if zver[1] == 4 then
VERSION_2_0 = false
VERSION_4_0 = true
elseif zver[1] == 3 then
VERSION_2_0 = false VERSION_2_0 = false
VERSION_3_0 = true VERSION_3_0 = true
elseif zver[1] == 2 and zver[2] == 2 then
VERSION_2_2 = true
VERSION_2_1 = true
elseif zver[1] == 2 and zver[2] == 1 then elseif zver[1] == 2 and zver[2] == 1 then
VERSION_2_1 = true VERSION_2_1 = true
end end
if VERSION_2_0 then
ffi.cdef[==[
typedef int ZMQ_Error;
typedef struct ZMQ_Socket ZMQ_Socket;
typedef struct zmq_msg_t zmq_msg_t;
ZMQ_Error zmq_sendmsg(ZMQ_Socket *sock, zmq_msg_t *msg, int flags) __asm__("zmq_send");
ZMQ_Error zmq_recvmsg(ZMQ_Socket *sock, zmq_msg_t *msg, int flags) __asm__("zmq_recv");
]==]
end
]], ]],
c_source ([[ c_source ([[
@ -436,9 +548,17 @@ static const int opt_types[] = {
method "bind" { method "bind" {
c_method_call "ZMQ_Error" "zmq_bind" { "const char *", "addr" } c_method_call "ZMQ_Error" "zmq_bind" { "const char *", "addr" }
}, },
method "unbind" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "ZMQ_Error" "zmq_unbind" { "const char *", "addr" }
},
method "connect" { method "connect" {
c_method_call "ZMQ_Error" "zmq_connect" { "const char *", "addr" } c_method_call "ZMQ_Error" "zmq_connect" { "const char *", "addr" }
}, },
method "disconnect" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "ZMQ_Error" "zmq_disconnect" { "const char *", "addr" }
},
ffi_cdef[[ ffi_cdef[[
int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen); int zmq_setsockopt (void *s, int option, const void *optval, size_t optvallen);
int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen); int zmq_getsockopt (void *s, int option, void *optval, size_t *optvallen);
@ -460,7 +580,7 @@ do
setmetatable(option_sets,{__index = function(tab,opt) setmetatable(option_sets,{__index = function(tab,opt)
local opt_name = opt_name[opt] local opt_name = opt_name[opt]
if not opt_name then return nil end if not opt_name then return nil end
local method = methods[opt_name] or methods['set_' .. opt_name] local method = methods['set_' .. opt_name] or methods[opt_name]
rawset(tab, opt, method) rawset(tab, opt, method)
return method return method
end}) end})
@ -477,7 +597,7 @@ end
size_t val_len; size_t val_len;
const void *val; const void *val;
#if VERSION_2_1 #if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
socket_t fd_val; socket_t fd_val;
#endif #endif
int int_val; int int_val;
@ -485,10 +605,10 @@ end
uint64_t uint64_val; uint64_t uint64_val;
int64_t int64_val; int64_t int64_val;
#if VERSION_3_0 #if VERSION_3_0 || VERSION_4_0
/* 3.0 backwards compatibility support for HWM. */ /* 3.0 backwards compatibility support for HWM. */
if(${opt} == ZMQ_HWM) { if(${opt} == ZMQ_HWM) {
int_val = luaL_checklong(L, ${val::idx}); int_val = luaL_checkinteger(L, ${val::idx});
val = &int_val; val = &int_val;
val_len = sizeof(int_val); val_len = sizeof(int_val);
${err} = zmq_setsockopt(${this}, ZMQ_SNDHWM, val, val_len); ${err} = zmq_setsockopt(${this}, ZMQ_SNDHWM, val, val_len);
@ -504,30 +624,30 @@ end
} }
switch(opt_types[${opt}]) { switch(opt_types[${opt}]) {
#if VERSION_2_1 #if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
case OPT_TYPE_FD: case OPT_TYPE_FD:
fd_val = luaL_checklong(L, ${val::idx}); fd_val = luaL_checkinteger(L, ${val::idx});
val = &fd_val; val = &fd_val;
val_len = sizeof(fd_val); val_len = sizeof(fd_val);
break; break;
#endif #endif
case OPT_TYPE_INT: case OPT_TYPE_INT:
int_val = luaL_checklong(L, ${val::idx}); int_val = luaL_checkinteger(L, ${val::idx});
val = &int_val; val = &int_val;
val_len = sizeof(int_val); val_len = sizeof(int_val);
break; break;
case OPT_TYPE_UINT32: case OPT_TYPE_UINT32:
uint32_val = luaL_checklong(L, ${val::idx}); uint32_val = luaL_checkinteger(L, ${val::idx});
val = &uint32_val; val = &uint32_val;
val_len = sizeof(uint32_val); val_len = sizeof(uint32_val);
break; break;
case OPT_TYPE_UINT64: case OPT_TYPE_UINT64:
uint64_val = luaL_checklong(L, ${val::idx}); uint64_val = luaL_checkinteger(L, ${val::idx});
val = &uint64_val; val = &uint64_val;
val_len = sizeof(uint64_val); val_len = sizeof(uint64_val);
break; break;
case OPT_TYPE_INT64: case OPT_TYPE_INT64:
int64_val = luaL_checklong(L, ${val::idx}); int64_val = luaL_checkinteger(L, ${val::idx});
val = &int64_val; val = &int64_val;
val_len = sizeof(int64_val); val_len = sizeof(int64_val);
break; break;
@ -561,7 +681,7 @@ local tmp_val_len = ffi.new('size_t[1]', 4)
c_source[[ c_source[[
size_t val_len; size_t val_len;
#if VERSION_2_1 #if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
socket_t fd_val; socket_t fd_val;
#endif #endif
int int_val; int int_val;
@ -578,7 +698,7 @@ local tmp_val_len = ffi.new('size_t[1]', 4)
} }
switch(opt_types[${opt}]) { switch(opt_types[${opt}]) {
#if VERSION_2_1 #if VERSION_2_1 || VERSION_3_0 || VERSION_4_0
case OPT_TYPE_FD: case OPT_TYPE_FD:
val_len = sizeof(fd_val); val_len = sizeof(fd_val);
${err} = zmq_getsockopt(${this}, ${opt}, &fd_val, &val_len); ${err} = zmq_getsockopt(${this}, ${opt}, &fd_val, &val_len);
@ -649,11 +769,14 @@ local tmp_val_len = ffi.new('size_t[1]', 4)
-- zmq_send -- zmq_send
-- --
method "send_msg" { method "send_msg" {
c_export_method_call "ZMQ_Error" "zmq_sendmsg" { "zmq_msg_t *", "msg", "int", "flags?" }, c_method_call "ZMQ_Error" "zmq_sendmsg" { "zmq_msg_t *", "msg", "int", "flags?" },
}, },
-- create helper function for `zmq_send` -- create helper function for `zmq_send`
c_source[[ c_source[[
ZMQ_Error simple_zmq_send(ZMQ_Socket *sock, const char *data, size_t data_len, int flags) { LUA_NOBJ_API ZMQ_Error simple_zmq_send(ZMQ_Socket *sock, const char *data, size_t data_len, int flags) {
#if VERSION_3_2
return zmq_send(sock, data, data_len, flags);
#else
ZMQ_Error err; ZMQ_Error err;
zmq_msg_t msg; zmq_msg_t msg;
/* initialize message */ /* initialize message */
@ -667,17 +790,18 @@ ZMQ_Error simple_zmq_send(ZMQ_Socket *sock, const char *data, size_t data_len, i
zmq_msg_close(&msg); zmq_msg_close(&msg);
} }
return err; return err;
#endif
} }
]], ]],
method "send" { method "send" {
c_method_call "ZMQ_Error" "simple_zmq_send" c_export_method_call "ZMQ_Error" "simple_zmq_send"
{ "const char *", "data", "size_t", "#data", "int", "flags?"} { "const char *", "data", "size_t", "#data", "int", "flags?"}
}, },
-- --
-- zmq_recv -- zmq_recv
-- --
method "recv_msg" { method "recv_msg" {
c_export_method_call "ZMQ_Error" "zmq_recvmsg" { "zmq_msg_t *", "msg", "int", "flags?" }, c_method_call "ZMQ_Error" "zmq_recvmsg" { "zmq_msg_t *", "msg", "int", "flags?" },
}, },
ffi_source[[ ffi_source[[
local tmp_msg = ffi.new('zmq_msg_t') local tmp_msg = ffi.new('zmq_msg_t')
@ -693,7 +817,7 @@ local tmp_msg = ffi.new('zmq_msg_t')
if(0 == ${err}) { if(0 == ${err}) {
/* receive message */ /* receive message */
${err} = zmq_recvmsg(${this}, &msg, ${flags}); ${err} = zmq_recvmsg(${this}, &msg, ${flags});
if(0 == ${err}) { if(${err} >= 0) {
${data} = zmq_msg_data(&msg); ${data} = zmq_msg_data(&msg);
${data_len} = zmq_msg_size(&msg); ${data_len} = zmq_msg_size(&msg);
} }
@ -711,17 +835,215 @@ local tmp_msg = ffi.new('zmq_msg_t')
end end
-- receive message -- receive message
${err} = zmq_recvmsg(${this}, msg, ${flags}) ${err} = C.zmq_recvmsg(${this}, msg, ${flags})
if 0 == ${err} then if ${err} >= 0 then
local data = ffi.string(C.zmq_msg_data(msg), C.zmq_msg_size(msg)) local data = ffi.string(C.zmq_msg_data(msg), C.zmq_msg_size(msg))
-- close message -- close message
C.zmq_msg_close(msg) C.zmq_msg_close(msg)
return data return data
end end
-- close message
C.zmq_msg_close(msg)
]],
},
--
-- Monitor socket.
--
method "monitor" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
c_method_call "ZMQ_Error" "zmq_socket_monitor" { "const char *", "addr", "int", "events" }
},
c_source[[
typedef struct ZMQ_recv_event {
int event_id;
int value;
const char *addr;
size_t addr_len;
const char *err;
} ZMQ_recv_event;
#if (ZMQ_VERSION_MAJOR == 4) && (ZMQ_VERSION_MINOR >= 1)
typedef struct zmq_event_t {
int16_t event;
int32_t value;
} zmq_event_t;
#endif
int monitor_recv_event(ZMQ_Socket *s, zmq_msg_t *msg, int flags, ZMQ_recv_event *ev)
{
int rc ;
zmq_event_t event;
ev->event_id = 0;
ev->value = 0;
ev->addr = NULL;
ev->err = NULL;
ev->addr_len = 0;
zmq_msg_init(msg);
/* recv binary event. */
rc = zmq_recvmsg(s, msg, flags);
if(rc < 0) {
return rc;
}
#if ZMQ_VERSION_MAJOR == 3
if(zmq_msg_size(msg) != sizeof(event)) {
ev->err = "Invalid monitor event. Wrong event size.";
return -1;
}
memcpy(&event, zmq_msg_data(msg), sizeof(event));
ev->event_id = event.event;
switch(event.event) {
case ZMQ_EVENT_CONNECTED:
ev->value = event.data.connected.fd;
ev->addr = event.data.connected.addr;
break;
case ZMQ_EVENT_CONNECT_DELAYED:
ev->value = event.data.connect_delayed.err;
ev->addr = event.data.connect_delayed.addr;
break;
case ZMQ_EVENT_CONNECT_RETRIED:
ev->value = event.data.connect_retried.interval;
ev->addr = event.data.connect_retried.addr;
break;
case ZMQ_EVENT_LISTENING:
ev->value = event.data.listening.fd;
ev->addr = event.data.listening.addr;
break;
case ZMQ_EVENT_BIND_FAILED:
ev->value = event.data.bind_failed.err;
ev->addr = event.data.bind_failed.addr;
break;
case ZMQ_EVENT_ACCEPTED:
ev->value = event.data.accepted.fd;
ev->addr = event.data.accepted.addr;
break;
case ZMQ_EVENT_ACCEPT_FAILED:
ev->value = event.data.accept_failed.err;
ev->addr = event.data.accept_failed.addr;
break;
case ZMQ_EVENT_CLOSED:
ev->value = event.data.closed.fd;
ev->addr = event.data.closed.addr;
break;
case ZMQ_EVENT_CLOSE_FAILED:
ev->value = event.data.close_failed.err;
ev->addr = event.data.close_failed.addr;
break;
case ZMQ_EVENT_DISCONNECTED:
ev->value = event.data.disconnected.fd;
ev->addr = event.data.disconnected.addr;
break;
}
if(ev->addr) {
ev->addr_len = strlen(ev->addr);
}
if(zmq_msg_more(msg) != 0) {
ev->err = "Invalid monitor event. Has too many parts.";
return -1;
}
#else
if(zmq_msg_size(msg) != (sizeof(event.event) + sizeof(event.value))) {
ev->err = "Invalid monitor event. Wrong event size.";
return -1;
}
/* copy binary data to event struct */
const char* data = (char*)zmq_msg_data(msg);
memcpy(&(event.event), data, sizeof(event.event));
memcpy(&(event.value), data+sizeof(event.event), sizeof(event.value));
ev->event_id = event.event;
ev->value = event.value;
if(zmq_msg_more(msg) == 0) {
ev->err = "Invalid monitor event. Missing address part.";
return -1;
}
ev->value = event.value;
/* recv address part */
rc = zmq_recvmsg(s, msg, flags);
if(rc < 0) {
return rc;
}
if(zmq_msg_more(msg) != 0) {
ev->err = "Invalid monitor event. Has too many parts.";
return -1;
}
/* copy address part */
ev->addr_len = zmq_msg_size(msg) ;
ev->addr = zmq_msg_data(msg);
#endif
return 1;
}
]],
ffi_cdef[[
typedef struct ZMQ_recv_event {
int event_id;
int value;
const char *addr;
size_t addr_len;
const char *err;
} ZMQ_recv_event;
int monitor_recv_event(ZMQ_Socket *s, zmq_msg_t *msg, int flags, ZMQ_recv_event *ev);
]],
ffi_source[[
local tmp_recv_event = ffi.new('ZMQ_recv_event')
]],
method "recv_event" {
if_defs = { "VERSION_3_2", "VERSION_4_0" },
var_in{ "int", "flags?" },
var_out{ "int", "event_id" },
var_out{ "int", "value" },
var_out{ "const char *", "addr", has_length = true },
var_out{ "ZMQ_Error", "err" },
c_source[[
zmq_msg_t msg;
ZMQ_recv_event event;
/* receive monitor event */
${err} = monitor_recv_event(${this}, &msg, ${flags}, &event);
if(${err} >= 0) {
${event_id} = event.event_id;
${value} = event.value;
${addr} = event.addr;
${addr_len} = event.addr_len; //${err};
} else if(event.err != NULL) {
/* error parsing monitor event. */
lua_pushnil(L);
lua_pushstring(L, event.err);
return 2;
}
]], ]],
ffi_source "ffi_post" [[ c_source "post" [[
/* close message */
zmq_msg_close(&msg);
]],
ffi_source[[
local msg = tmp_msg
local event = tmp_recv_event
local addr
-- receive monitor event
${err} = Cmod.monitor_recv_event(${this}, msg, ${flags}, event)
if ${err} >= 0 then
addr = ffi.string(event.addr, event.addr_len)
-- close message
C.zmq_msg_close(msg)
return event.event_id, event.value, addr
end
-- close message -- close message
C.zmq_msg_close(msg) C.zmq_msg_close(msg)
if event.err ~= nil then
-- error parsing monitor event.
return nil, ffi.string(event.err)
end
]], ]],
}, },

@ -19,8 +19,10 @@
-- THE SOFTWARE. -- THE SOFTWARE.
object "ZMQ_StopWatch" { object "ZMQ_StopWatch" {
include "zmq_utils.h",
c_source[[ c_source[[
#if (ZMQ_VERSION_MAJOR <= 4) && (ZMQ_VERSION_MINOR <= 1)
#include "zmq_utils.h"
#endif
typedef struct ZMQ_StopWatch ZMQ_StopWatch; typedef struct ZMQ_StopWatch ZMQ_StopWatch;
]], ]],
constructor "start" { constructor "start" {

@ -25,21 +25,6 @@
local zmq = require"zmq" local zmq = require"zmq"
local llthreads = require"llthreads" local llthreads = require"llthreads"
local setmetatable = setmetatable
local tonumber = tonumber
local assert = assert
local thread_mt = {}
thread_mt.__index = thread_mt
function thread_mt:start(detached)
return self.thread:start(detached)
end
function thread_mt:join()
return self.thread:join()
end
local bootstrap_pre = [[ local bootstrap_pre = [[
local action, action_arg, parent_ctx = ... local action, action_arg, parent_ctx = ...
local func local func
@ -79,10 +64,7 @@ local function new_thread(ctx, action, action_arg, ...)
if ctx then if ctx then
ctx = ctx:lightuserdata() ctx = ctx:lightuserdata()
end end
local thread = llthreads.new(bootstrap_code, action, action_arg, ctx, ...) return llthreads.new(bootstrap_code, action, action_arg, ctx, ...)
return setmetatable({
thread = thread,
}, thread_mt)
end end
local M = {} local M = {}
@ -108,4 +90,5 @@ function M.get_parent_ctx(ctx)
return parent_ctx return parent_ctx
end end
zmq.threads = M
return M return M

@ -0,0 +1,75 @@
-- Copyright (c) 2011 Robert G. Jakabosky <bobby@sharedrealm.com>
--
-- Permission is hereby granted, free of charge, to any person obtaining a copy
-- of this software and associated documentation files (the "Software"), to deal
-- in the Software without restriction, including without limitation the rights
-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-- copies of the Software, and to permit persons to whom the Software is
-- furnished to do so, subject to the following conditions:
--
-- The above copyright notice and this permission notice shall be included in
-- all copies or substantial portions of the Software.
--
-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-- THE SOFTWARE.
if #arg < 1 then
print("usage: lua " .. arg[0] .. " [message-size] [roundtrip-count] [bind-to] [connect-to]")
end
local message_size = tonumber(arg[1] or 1)
local roundtrip_count = tonumber(arg[2] or 100)
local bind_to = arg[3] or 'inproc://thread_lat_test'
local connect_to = arg[4] or 'inproc://thread_lat_test'
local zmq = require"zmq"
local ctx = zmq.init(1)
local server = assert(ctx:socket(zmq.REQ))
assert(server:bind(bind_to))
local client = ctx:socket(zmq.REP)
client:connect(connect_to)
local data = ("0"):rep(message_size)
local msg = zmq.zmq_msg_t.init_size(message_size)
local client_msg = zmq.zmq_msg_t()
print(string.format("message size: %i [B]", message_size))
print(string.format("roundtrip count: %i", roundtrip_count))
local timer = zmq.stopwatch_start()
for i = 1, roundtrip_count do
-- server send
assert(server:send_msg(msg))
-- client recv
assert(client:recv_msg(client_msg))
assert(client_msg:size() == message_size, "Invalid message size")
-- client send
assert(client:send_msg(client_msg))
-- server recv
assert(server:recv_msg(msg))
assert(msg:size() == message_size, "Invalid message size")
end
local elapsed = timer:stop()
server:close()
client:close()
ctx:term()
local latency = elapsed / roundtrip_count / 2
print(string.format("mean latency: %.3f [us]", latency))
local secs = elapsed / (1000 * 1000)
print(string.format("elapsed = %f", secs))
print(string.format("msg/sec = %f", roundtrip_count / secs))

@ -34,13 +34,13 @@ tcp_port_end = -1,
-- setup protocol fields. -- setup protocol fields.
zmq_proto.fields = {} zmq_proto.fields = {}
local fds = zmq_proto.fields local fds = zmq_proto.fields
fds.frame = ProtoField.new("Frame", "zmq.frame", "FT_BYTES", nil, "BASE_NONE") fds.frame = ProtoField.new("Frame", "zmq.frame", "ftypes.BYTES", nil, "base.NONE")
fds.length = ProtoField.new("Frame Length", "zmq.frame.len", "FT_UINT64", nil, "BASE_DEC") fds.length = ProtoField.new("Frame Length", "zmq.frame.len", "ftypes.UINT64", nil, "base.DEC")
fds.length8 = ProtoField.new("Frame 8bit Length", "zmq.frame.len8", "FT_UINT8", nil, "BASE_DEC") fds.length8 = ProtoField.new("Frame 8bit Length", "zmq.frame.len8", "ftypes.UINT8", nil, "base.DEC")
fds.length64 = ProtoField.new("Frame 64bit Length", "zmq.frame.len64", "FT_UINT64", nil, "BASE_DEC") fds.length64 = ProtoField.new("Frame 64bit Length", "zmq.frame.len64", "ftypes.UINT64", nil, "base.DEC")
fds.flags = ProtoField.new("Frame Flags", "zmq.frame.flags", "FT_UINT8", nil, "BASE_HEX", "0xFF") fds.flags = ProtoField.new("Frame Flags", "zmq.frame.flags", "ftypes.UINT8", nil, "base.HEX", "0xFF")
fds.flags_more = ProtoField.new("More", "zmq.frame.flags.more", "FT_UINT8", nil, "BASE_HEX", "0x01") fds.flags_more = ProtoField.new("More", "zmq.frame.flags.more", "ftypes.UINT8", nil, "base.HEX", "0x01")
fds.body = ProtoField.new("Frame body", "zmq.frame.body", "FT_BYTES", nil, "BASE_NONE") fds.body = ProtoField.new("Frame body", "zmq.frame.body", "ftypes.BYTES", nil, "base.NONE")
-- un-register zmq to handle tcp port range -- un-register zmq to handle tcp port range
local function unregister_tcp_port_range(start_port, end_port) local function unregister_tcp_port_range(start_port, end_port)

@ -23,11 +23,18 @@ set_variable_format "%s%d"
c_module "zmq" { c_module "zmq" {
-- module settings. -- module settings.
module_globals = true, -- support old code that doesn't do: local zmq = require"zmq"
use_globals = false, use_globals = false,
hide_meta_info = true, hide_meta_info = true,
luajit_ffi = true, luajit_ffi = true,
-- needed for functions exported from module.
luajit_ffi_load_cmodule = true, luajit_ffi_load_cmodule = true,
ffi_load {
"zmq", -- default lib name.
Windows = "libzmq", -- lib name for on windows.
},
sys_include "string.h", sys_include "string.h",
include "zmq.h", include "zmq.h",
@ -35,37 +42,85 @@ c_source "typedefs" [[
/* detect zmq version */ /* detect zmq version */
#define VERSION_2_0 1 #define VERSION_2_0 1
#define VERSION_2_1 0 #define VERSION_2_1 0
#define VERSION_2_2 0
#define VERSION_3_0 0 #define VERSION_3_0 0
#define VERSION_3_2 0
#define VERSION_4_0 0
#if defined(ZMQ_VERSION_MAJOR) #if defined(ZMQ_VERSION_MAJOR)
# if (ZMQ_VERSION_MAJOR == 2) && (ZMQ_VERSION_MINOR == 2)
# undef VERSION_2_2
# define VERSION_2_2 1
# undef VERSION_2_1
# define VERSION_2_1 1
# endif
# if (ZMQ_VERSION_MAJOR == 2) && (ZMQ_VERSION_MINOR == 1) # if (ZMQ_VERSION_MAJOR == 2) && (ZMQ_VERSION_MINOR == 1)
# undef VERSION_2_1 # undef VERSION_2_1
# define VERSION_2_1 1 # define VERSION_2_1 1
# endif # endif
# if (ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR == 3)
# undef VERSION_2_0
# define VERSION_2_0 0
# undef VERSION_3_2
# define VERSION_3_2 1
# undef VERSION_3_0
# define VERSION_3_0 1
# endif
# if (ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR == 2)
# undef VERSION_2_0
# define VERSION_2_0 0
# undef VERSION_3_2
# define VERSION_3_2 1
# undef VERSION_3_0
# define VERSION_3_0 1
# endif
# if (ZMQ_VERSION_MAJOR == 3) # if (ZMQ_VERSION_MAJOR == 3)
# undef VERSION_2_0 # undef VERSION_2_0
# define VERSION_2_0 0 # define VERSION_2_0 0
# undef VERSION_3_0 # undef VERSION_3_0
# define VERSION_3_0 1 # define VERSION_3_0 1
# endif # endif
# if (ZMQ_VERSION_MAJOR == 4)
# undef VERSION_2_0
# define VERSION_2_0 0
# undef VERSION_3_2
# define VERSION_3_2 0
# undef VERSION_3_0
# define VERSION_3_0 0
# undef VERSION_4_0
# define VERSION_4_0 1
# endif
#endif #endif
/* make sure ZMQ_DONTWAIT & ZMQ_NOBLOCK are both defined. */
#ifndef ZMQ_DONTWAIT #ifndef ZMQ_DONTWAIT
# define ZMQ_DONTWAIT ZMQ_NOBLOCK # define ZMQ_DONTWAIT ZMQ_NOBLOCK
#endif #endif
#if VERSION_2_0 #ifndef ZMQ_NOBLOCK
# define ZMQ_POLL_MSEC 1000 // zmq_poll is usec # define ZMQ_NOBLOCK ZMQ_DONTWAIT
#elif VERSION_3_0
# define ZMQ_POLL_MSEC 1 // zmq_poll is msec
# ifndef ZMQ_HWM
# define ZMQ_HWM 1 // backwards compatibility
# endif
#endif #endif
/* make sure DEALER/ROUTER & XREQ/XREP are all defined. */
#ifndef ZMQ_DEALER #ifndef ZMQ_DEALER
# define ZMQ_DEALER ZMQ_XREQ # define ZMQ_DEALER ZMQ_XREQ
#endif #endif
#ifndef ZMQ_ROUTER #ifndef ZMQ_ROUTER
# define ZMQ_ROUTER ZMQ_XREP # define ZMQ_ROUTER ZMQ_XREP
#endif #endif
#ifndef ZMQ_XREQ
# define ZMQ_XREQ ZMQ_DEALER
#endif
#ifndef ZMQ_XREP
# define ZMQ_XREP ZMQ_ROUTER
#endif
#if VERSION_2_0
# define ZMQ_POLL_MSEC 1000 // zmq_poll is usec
#elif VERSION_3_0 || VERSION_4_0
# define ZMQ_POLL_MSEC 1 // zmq_poll is msec
# ifndef ZMQ_HWM
# define ZMQ_HWM 1 // backwards compatibility
# endif
#endif
]], ]],
-- --
@ -74,6 +129,10 @@ c_source "typedefs" [[
export_definitions { export_definitions {
MAX_VSM_SIZE = "ZMQ_MAX_VSM_SIZE", MAX_VSM_SIZE = "ZMQ_MAX_VSM_SIZE",
-- context settings
MAX_SOCKETS = "ZMQ_MAX_SOCKETS",
IO_THREADS = "ZMQ_IO_THREADS",
-- message types -- message types
DELIMITER = "ZMQ_DELIMITER", DELIMITER = "ZMQ_DELIMITER",
VSM = "ZMQ_VSM", VSM = "ZMQ_VSM",
@ -88,14 +147,17 @@ PUB = "ZMQ_PUB",
SUB = "ZMQ_SUB", SUB = "ZMQ_SUB",
REQ = "ZMQ_REQ", REQ = "ZMQ_REQ",
REP = "ZMQ_REP", REP = "ZMQ_REP",
DEALER = "ZMQ_DEALER",
ROUTER = "ZMQ_ROUTER",
PULL = "ZMQ_PULL", PULL = "ZMQ_PULL",
PUSH = "ZMQ_PUSH", PUSH = "ZMQ_PUSH",
-- deprecated DEALER = "ZMQ_DEALER",
XREQ = "ZMQ_DEALER", ROUTER = "ZMQ_ROUTER",
XREP = "ZMQ_ROUTER", XREQ = "ZMQ_XREQ",
XREP = "ZMQ_XREP",
-- new 3.1 socket types
XPUB = "ZMQ_XPUB",
XSUB = "ZMQ_XSUB",
-- socket options -- socket options
HWM = "ZMQ_HWM", HWM = "ZMQ_HWM",
@ -125,12 +187,42 @@ MULTICAST_HOPS = "ZMQ_MULTICAST_HOPS",
RCVTIMEO = "ZMQ_RCVTIMEO", RCVTIMEO = "ZMQ_RCVTIMEO",
SNDTIMEO = "ZMQ_SNDTIMEO", SNDTIMEO = "ZMQ_SNDTIMEO",
RCVLABEL = "ZMQ_RCVLABEL", RCVLABEL = "ZMQ_RCVLABEL",
LAST_ENDPOINT = "ZMQ_LAST_ENDPOINT",
ROUTER_MANDATORY = "ZMQ_ROUTER_MANDATORY",
TCP_KEEPALIVE = "ZMQ_TCP_KEEPALIVE",
TCP_KEEPALIVE_CNT = "ZMQ_TCP_KEEPALIVE_CNT",
TCP_KEEPALIVE_IDLE= "ZMQ_TCP_KEEPALIVE_IDLE",
TCP_KEEPALIVE_INTVL= "ZMQ_TCP_KEEPALIVE_INTVL",
TCP_ACCEPT_FILTER = "ZMQ_TCP_ACCEPT_FILTER",
IMMEDIATE = "ZMQ_IMMEDIATE",
XPUB_VERBOSE = "ZMQ_XPUB_VERBOSE",
ROUTER_RAW = "ZMQ_ROUTER_RAW",
IPV6 = "ZMQ_IPV6",
MECHANISM = "ZMQ_MECHANISM",
PLAIN_SERVER = "ZMQ_PLAIN_SERVER",
PLAIN_USERNAME = "ZMQ_PLAIN_USERNAME",
PLAIN_PASSWORD = "ZMQ_PLAIN_PASSWORD",
CURVE_SERVER = "ZMQ_CURVE_SERVER",
CURVE_PUBLICKEY = "ZMQ_CURVE_PUBLICKEY",
CURVE_SECRETKEY = "ZMQ_CURVE_SECRETKEY",
CURVE_SERVERKEY = "ZMQ_CURVE_SERVERKEY",
PROBE_ROUTER = "ZMQ_PROBE_ROUTER",
REQ_CORRELATE = "ZMQ_REQ_CORRELATE",
REQ_RELAXED = "ZMQ_REQ_RELAXED",
CONFLATE = "ZMQ_CONFLATE",
ZAP_DOMAIN = "ZMQ_ZAP_DOMAIN",
-- send/recv flags -- send/recv flags
NOBLOCK = "ZMQ_NOBLOCK", NOBLOCK = "ZMQ_NOBLOCK",
DONTWAIT = "ZMQ_DONTWAIT",
SNDMORE = "ZMQ_SNDMORE", SNDMORE = "ZMQ_SNDMORE",
SNDLABEL = "ZMQ_SNDLABEL", SNDLABEL = "ZMQ_SNDLABEL",
-- Security mechanisms
NULL = "ZMQ_NULL",
PLAIN = "ZMQ_PLAIN",
CURVE = "ZMQ_CURVE",
-- poll events -- poll events
POLLIN = "ZMQ_POLLIN", POLLIN = "ZMQ_POLLIN",
POLLOUT = "ZMQ_POLLOUT", POLLOUT = "ZMQ_POLLOUT",
@ -139,13 +231,30 @@ POLLERR = "ZMQ_POLLERR",
-- poll milliseconds. -- poll milliseconds.
POLL_MSEC = "ZMQ_POLL_MSEC", POLL_MSEC = "ZMQ_POLL_MSEC",
-- Socket Monitor events.
EVENT_CONNECTED = "ZMQ_EVENT_CONNECTED",
EVENT_CONNECT_DELAYED = "ZMQ_EVENT_CONNECT_DELAYED",
EVENT_CONNECT_RETRIED = "ZMQ_EVENT_CONNECT_RETRIED",
EVENT_LISTENING = "ZMQ_EVENT_LISTENING",
EVENT_BIND_FAILED = "ZMQ_EVENT_BIND_FAILED",
EVENT_ACCEPTED = "ZMQ_EVENT_ACCEPTED",
EVENT_ACCEPT_FAILED= "ZMQ_EVENT_ACCEPT_FAILED",
EVENT_CLOSED = "ZMQ_EVENT_CLOSED",
EVENT_CLOSE_FAILED= "ZMQ_EVENT_CLOSE_FAILED",
EVENT_DISCONNECTED= "ZMQ_EVENT_DISCONNECTED",
EVENT_MONITOR_STOPPED = "ZMQ_EVENT_MONITOR_STOPPED",
EVENT_ALL = "ZMQ_EVENT_ALL",
-- devices -- devices
STREAMER = "ZMQ_STREAMER", STREAMER = "ZMQ_STREAMER",
FORWARDER = "ZMQ_FORWARDER", FORWARDER = "ZMQ_FORWARDER",
QUEUE = "ZMQ_QUEUE", QUEUE = "ZMQ_QUEUE",
}, },
subfiles { subfiles {
"src/error.nobj.lua", "src/error.nobj.lua",
"src/msg.nobj.lua", "src/msg.nobj.lua",
@ -175,6 +284,7 @@ c_function "version" {
]], ]],
}, },
c_function "init" { c_function "init" {
var_in{ "int", "io_threads?", default = "1" },
c_call "!ZMQ_Ctx *" "zmq_init" { "int", "io_threads" }, c_call "!ZMQ_Ctx *" "zmq_init" { "int", "io_threads" },
}, },
c_function "init_ctx" { c_function "init_ctx" {
@ -198,31 +308,23 @@ c_function "init_ctx" {
end end
]], ]],
}, },
c_function "device" { if_defs = "VERSION_2_0", c_function "device" { if_defs = { "VERSION_2_0", "VERSION_3_2" },
c_call "ZMQ_Error" "zmq_device" c_call "ZMQ_Error" "zmq_device"
{ "int", "device", "ZMQ_Socket *", "insock", "ZMQ_Socket *", "outsock" }, { "int", "device", "ZMQ_Socket *", "insock", "ZMQ_Socket *", "outsock" },
}, },
c_function "proxy" { if_defs = "VERSION_3_2",
c_call "ZMQ_Error" "zmq_proxy"
{ "ZMQ_Socket *", "frontend", "ZMQ_Socket *", "backend", "ZMQ_Socket *", "capture?" },
},
-- --
-- zmq_utils.h -- utils
-- --
include "zmq_utils.h",
c_function "stopwatch_start" { c_function "stopwatch_start" {
c_call "!ZMQ_StopWatch *" "zmq_stopwatch_start" {}, c_call "!ZMQ_StopWatch *" "zmq_stopwatch_start" {},
}, },
c_function "sleep" { c_function "sleep" {
c_call "void" "zmq_sleep" { "int", "seconds_" }, c_call "void" "zmq_sleep" { "int", "seconds_" },
}, },
--
-- This dump function is for getting a copy of the FFI-based bindings code and is
-- only for debugging.
--
c_function "dump_ffi" {
var_out{ "const char *", "ffi_code", has_length = true, },
c_source[[
${ffi_code} = ${module_c_name}_ffi_lua_code;
${ffi_code_len} = sizeof(${module_c_name}_ffi_lua_code) - 1;
]],
},
} }

Loading…
Cancel
Save