From 7519b05994c56dabe38c97a1fd0474932d4b828c Mon Sep 17 00:00:00 2001 From: "Robert G. Jakabosky" Date: Fri, 25 Mar 2011 01:58:20 -0700 Subject: [PATCH] Add threaded perf benchmarks. Added push/pull perf benchmark. --- perf/local_lat.lua | 9 +--- perf/local_multipart.lua | 9 +--- perf/local_pull.lua | 66 +++++++++++++++++++++++ perf/local_thr.lua | 15 +++--- perf/remote_lat.lua | 9 +--- perf/remote_multipart.lua | 9 +--- perf/remote_push.lua | 47 ++++++++++++++++ perf/remote_thr.lua | 9 +--- perf/thread_lat.lua | 26 ++++----- perf/thread_push_pull.lua | 111 ++++++++++++++++++++++++++++++++++++++ perf/thread_thr.lua | 111 ++++++++++++++++++++++++++++++++++++++ 11 files changed, 364 insertions(+), 57 deletions(-) create mode 100644 perf/local_pull.lua create mode 100644 perf/remote_push.lua create mode 100644 perf/thread_push_pull.lua create mode 100644 perf/thread_thr.lua diff --git a/perf/local_lat.lua b/perf/local_lat.lua index c10da41..d6a4ec1 100644 --- a/perf/local_lat.lua +++ b/perf/local_lat.lua @@ -19,20 +19,15 @@ -- THE SOFTWARE. if not arg[3] then - print("usage: lua local_lat.lua []") + print("usage: lua local_lat.lua ") os.exit() end local bind_to = arg[1] local message_size = tonumber(arg[2]) local roundtrip_count = tonumber(arg[3]) -local mod = arg[4] or "zmq" -if mod == 'disable_ffi' then - disable_ffi = true - mod = 'zmq' -end -local zmq = require(mod) +local zmq = require"zmq" local ctx = zmq.init(1) local s = ctx:socket(zmq.REP) diff --git a/perf/local_multipart.lua b/perf/local_multipart.lua index 8a98e82..97c33f6 100644 --- a/perf/local_multipart.lua +++ b/perf/local_multipart.lua @@ -19,20 +19,15 @@ -- THE SOFTWARE. if not arg[3] then - print("usage: lua local_thr.lua []") + print("usage: lua local_thr.lua ") os.exit() end local bind_to = arg[1] local message_size = tonumber(arg[2]) local message_count = tonumber(arg[3]) -local mod = arg[4] or "zmq" -if mod == 'disable_ffi' then - disable_ffi = true - mod = 'zmq' -end -local zmq = require(mod) +local zmq = require"zmq" local socket = require"socket" local time = socket.gettime diff --git a/perf/local_pull.lua b/perf/local_pull.lua new file mode 100644 index 0000000..189aee1 --- /dev/null +++ b/perf/local_pull.lua @@ -0,0 +1,66 @@ +-- Copyright (c) 2011 Robert G. Jakabosky +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +if not arg[3] then + print("usage: lua local_thr.lua ") + os.exit() +end + +local bind_to = arg[1] +local message_size = tonumber(arg[2]) +local message_count = tonumber(arg[3]) + +local zmq = require"zmq" + +local socket = require"socket" +local time = socket.gettime + +local ctx = zmq.init(1) +local s = ctx:socket(zmq.PULL) +s:bind(bind_to) + +print(string.format("message size: %i [B]", message_size)) +print(string.format("message count: %i", message_count)) + +local msg +msg = zmq.zmq_msg_t() +assert(s:recv_msg(msg)) + +local start_time = time() + +for i = 1, message_count - 1 do + assert(s:recv_msg(msg)) + assert(msg:size() == message_size, "Invalid message size") +end + +local end_time = time() + +s:close() +ctx:term() + +local elapsed = end_time - start_time +if elapsed == 0 then elapsed = 1 end + +local throughput = message_count / elapsed +local megabits = throughput * message_size * 8 / 1000000 + +print(string.format("mean throughput: %i [msg/s]", throughput)) +print(string.format("mean throughput: %.3f [Mb/s]", megabits)) + diff --git a/perf/local_thr.lua b/perf/local_thr.lua index 6f327b4..fc20569 100644 --- a/perf/local_thr.lua +++ b/perf/local_thr.lua @@ -19,20 +19,15 @@ -- THE SOFTWARE. if not arg[3] then - print("usage: lua local_thr.lua []") + print("usage: lua local_thr.lua ") os.exit() end local bind_to = arg[1] local message_size = tonumber(arg[2]) local message_count = tonumber(arg[3]) -local mod = arg[4] or "zmq" -if mod == 'disable_ffi' then - disable_ffi = true - mod = 'zmq' -end -local zmq = require(mod) +local zmq = require"zmq" local socket = require"socket" local time = socket.gettime @@ -42,6 +37,9 @@ local s = ctx:socket(zmq.SUB) s:setopt(zmq.SUBSCRIBE, ""); s:bind(bind_to) +print(string.format("message size: %i [B]", message_size)) +print(string.format("message count: %i", message_count)) + local msg msg = zmq.zmq_msg_t() assert(s:recv_msg(msg)) @@ -64,7 +62,6 @@ if elapsed == 0 then elapsed = 1 end local throughput = message_count / elapsed local megabits = throughput * message_size * 8 / 1000000 -print(string.format("message size: %i [B]", message_size)) -print(string.format("message count: %i", message_count)) print(string.format("mean throughput: %i [msg/s]", throughput)) print(string.format("mean throughput: %.3f [Mb/s]", megabits)) + diff --git a/perf/remote_lat.lua b/perf/remote_lat.lua index 7b78eb7..631d0b4 100644 --- a/perf/remote_lat.lua +++ b/perf/remote_lat.lua @@ -19,20 +19,15 @@ -- THE SOFTWARE. if not arg[3] then - print("usage: lua remote_lat.lua []") + print("usage: lua remote_lat.lua ") os.exit() end local connect_to = arg[1] local message_size = tonumber(arg[2]) local roundtrip_count = tonumber(arg[3]) -local mod = arg[4] or "zmq" -if mod == 'disable_ffi' then - disable_ffi = true - mod = 'zmq' -end -local zmq = require(mod) +local zmq = require"zmq" local socket = require"socket" local time = socket.gettime diff --git a/perf/remote_multipart.lua b/perf/remote_multipart.lua index f7f7168..f071555 100644 --- a/perf/remote_multipart.lua +++ b/perf/remote_multipart.lua @@ -19,20 +19,15 @@ -- THE SOFTWARE. if not arg[3] then - print("usage: lua remote_thr.lua []") + print("usage: lua remote_thr.lua ") os.exit() end local connect_to = arg[1] local message_size = tonumber(arg[2]) local message_count = tonumber(arg[3]) -local mod = arg[4] or "zmq" -if mod == 'disable_ffi' then - disable_ffi = true - mod = 'zmq' -end -local zmq = require(mod) +local zmq = require"zmq" local z_SNDMORE = zmq.SNDMORE local ctx = zmq.init(1) diff --git a/perf/remote_push.lua b/perf/remote_push.lua new file mode 100644 index 0000000..a37a189 --- /dev/null +++ b/perf/remote_push.lua @@ -0,0 +1,47 @@ +-- Copyright (c) 2010 Aleksey Yeschenko +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +if not arg[3] then + print("usage: lua remote_thr.lua ") + os.exit() +end + +local connect_to = arg[1] +local message_size = tonumber(arg[2]) +local message_count = tonumber(arg[3]) + +local zmq = require"zmq" + +local ctx = zmq.init(1) +local s = ctx:socket(zmq.PUSH) +s:connect(connect_to) + +local data = ("0"):rep(message_size) +local msg = zmq.zmq_msg_t.init_size(message_size) + +for i = 1, message_count do + msg:set_data(data) + assert(s:send_msg(msg)) +end + +--os.execute("sleep " .. 10) + +s:close() +ctx:term() diff --git a/perf/remote_thr.lua b/perf/remote_thr.lua index c3763f5..982f36a 100644 --- a/perf/remote_thr.lua +++ b/perf/remote_thr.lua @@ -19,20 +19,15 @@ -- THE SOFTWARE. if not arg[3] then - print("usage: lua remote_thr.lua []") + print("usage: lua remote_thr.lua ") os.exit() end local connect_to = arg[1] local message_size = tonumber(arg[2]) local message_count = tonumber(arg[3]) -local mod = arg[4] or "zmq" -if mod == 'disable_ffi' then - disable_ffi = true - mod = 'zmq' -end -local zmq = require(mod) +local zmq = require"zmq" local ctx = zmq.init(1) local s = ctx:socket(zmq.PUB) diff --git a/perf/thread_lat.lua b/perf/thread_lat.lua index a0ea65a..51de621 100644 --- a/perf/thread_lat.lua +++ b/perf/thread_lat.lua @@ -18,15 +18,14 @@ -- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -- THE SOFTWARE. -if #arg < 4 then - print("usage: lua " .. arg[0] .. " ") - os.exit() +if #arg < 1 then + print("usage: lua " .. arg[0] .. " [message-size] [roundtrip-count] [bind-to] [connect-to]") end -local bind_to = arg[1] -local connect_to = arg[2] -local message_size = tonumber(arg[3]) -local roundtrip_count = tonumber(arg[4]) +local message_size = tonumber(arg[1] or 1) +local roundtrip_count = tonumber(arg[2] or 100000) +local bind_to = arg[3] or 'inproc://thread_lat_test' +local connect_to = arg[4] or 'inproc://thread_lat_test' local zmq = require"zmq" local zthreads = require"zmq.threads" @@ -66,6 +65,9 @@ child_thread:start() local data = ("0"):rep(message_size) local msg = zmq.zmq_msg_t.init_size(message_size) +print(string.format("message size: %i [B]", message_size)) +print(string.format("roundtrip count: %i", roundtrip_count)) + local start_time = time() for i = 1, roundtrip_count do @@ -76,14 +78,12 @@ end local end_time = time() +s:close() +child_thread:join() +ctx:term() + local elapsed = end_time - start_time local latency = elapsed * 1000000 / roundtrip_count / 2 -print(string.format("message size: %i [B]", message_size)) -print(string.format("roundtrip count: %i", roundtrip_count)) print(string.format("mean latency: %.3f [us]", latency)) -s:close() -ctx:term() - - diff --git a/perf/thread_push_pull.lua b/perf/thread_push_pull.lua new file mode 100644 index 0000000..d0ecab4 --- /dev/null +++ b/perf/thread_push_pull.lua @@ -0,0 +1,111 @@ +-- Copyright (c) 2011 Robert G. Jakabosky +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +if #arg < 1 then + print("usage: lua " .. arg[0] .. " [message-size] [message-count] [bind-to] [connect-to]") +end + +local message_size = tonumber(arg[1] or 1) +local message_count = tonumber(arg[2] or 100000) +local bind_to = arg[3] or 'inproc://thread_lat_test' +local connect_to = arg[4] or 'inproc://thread_lat_test' + +local zmq = require"zmq" +local zthreads = require"zmq.threads" + +local socket = require"socket" +local time = socket.gettime + +local child_code = [[ + local connect_to, message_size, message_count = ... + print("child:", ...) + + local zmq = require"zmq" + local zthreads = require"zmq.threads" + local socket = require"socket" + local time = socket.gettime + + local ctx = zthreads.get_parent_ctx() + local s = ctx:socket(zmq.PUSH) + s:setopt(zmq.HWM, message_count/4) + s:connect(connect_to) + + local data = ("0"):rep(message_size) + local msg = zmq.zmq_msg_t.init_size(message_size) + + local start_time = time() + + for i = 1, message_count do + msg:set_data(data) + assert(s:send_msg(msg)) + end + + local end_time = time() + + s:close() + + local elapsed = end_time - start_time + if elapsed == 0 then elapsed = 1 end + + local throughput = message_count / elapsed + local megabits = throughput * message_size * 8 / 1000000 + + print(string.format("Sender mean throughput: %i [msg/s]", throughput)) + print(string.format("Sender mean throughput: %.3f [Mb/s]", megabits)) + + print("sending thread finished.") +]] + +local ctx = zmq.init(1) +local s = ctx:socket(zmq.PULL) +s:bind(bind_to) + +print(string.format("message size: %i [B]", message_size)) +print(string.format("message count: %i", message_count)) + +local child_thread = zthreads.runstring(ctx, child_code, connect_to, message_size, message_count) +child_thread:start() + +local msg +msg = zmq.zmq_msg_t() +assert(s:recv_msg(msg)) + +local start_time = time() + +for i = 1, message_count - 1 do + assert(s:recv_msg(msg)) + assert(msg:size() == message_size, "Invalid message size") +end + +local end_time = time() + +s:close() +child_thread:join() +ctx:term() + +local elapsed = end_time - start_time +if elapsed == 0 then elapsed = 1 end + +local throughput = message_count / elapsed +local megabits = throughput * message_size * 8 / 1000000 + +print(string.format("mean throughput: %i [msg/s]", throughput)) +print(string.format("mean throughput: %.3f [Mb/s]", megabits)) + diff --git a/perf/thread_thr.lua b/perf/thread_thr.lua new file mode 100644 index 0000000..66abd51 --- /dev/null +++ b/perf/thread_thr.lua @@ -0,0 +1,111 @@ +-- Copyright (c) 2011 Robert G. Jakabosky +-- +-- Permission is hereby granted, free of charge, to any person obtaining a copy +-- of this software and associated documentation files (the "Software"), to deal +-- in the Software without restriction, including without limitation the rights +-- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +-- copies of the Software, and to permit persons to whom the Software is +-- furnished to do so, subject to the following conditions: +-- +-- The above copyright notice and this permission notice shall be included in +-- all copies or substantial portions of the Software. +-- +-- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +-- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +-- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +-- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +-- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +-- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +-- THE SOFTWARE. + +if #arg < 1 then + print("usage: lua " .. arg[0] .. " [message-size] [message-count] [bind-to] [connect-to]") +end + +local message_size = tonumber(arg[1] or 1) +local message_count = tonumber(arg[2] or 100000) +local bind_to = arg[3] or 'inproc://thread_lat_test' +local connect_to = arg[4] or 'inproc://thread_lat_test' + +local zmq = require"zmq" +local zthreads = require"zmq.threads" + +local socket = require"socket" +local time = socket.gettime + +local child_code = [[ + local connect_to, message_size, message_count = ... + print("child:", ...) + + local zmq = require"zmq" + local zthreads = require"zmq.threads" + local socket = require"socket" + local time = socket.gettime + + local ctx = zthreads.get_parent_ctx() + local s = ctx:socket(zmq.PUB) + s:connect(connect_to) + + local data = ("0"):rep(message_size) + local msg = zmq.zmq_msg_t.init_size(message_size) + + local start_time = time() + + for i = 1, message_count do + msg:set_data(data) + assert(s:send_msg(msg)) + end + + local end_time = time() + + s:close() + + local elapsed = end_time - start_time + if elapsed == 0 then elapsed = 1 end + + local throughput = message_count / elapsed + local megabits = throughput * message_size * 8 / 1000000 + + print(string.format("Sender mean throughput: %i [msg/s]", throughput)) + print(string.format("Sender mean throughput: %.3f [Mb/s]", megabits)) + + print("sending thread finished.") +]] + +local ctx = zmq.init(1) +local s = ctx:socket(zmq.SUB) +s:setopt(zmq.SUBSCRIBE, ""); +s:bind(bind_to) + +print(string.format("message size: %i [B]", message_size)) +print(string.format("message count: %i", message_count)) + +local child_thread = zthreads.runstring(ctx, child_code, connect_to, message_size, message_count) +child_thread:start() + +local msg +msg = zmq.zmq_msg_t() +assert(s:recv_msg(msg)) + +local start_time = time() + +for i = 1, message_count - 1 do + assert(s:recv_msg(msg)) + assert(msg:size() == message_size, "Invalid message size") +end + +local end_time = time() + +s:close() +child_thread:join() +ctx:term() + +local elapsed = end_time - start_time +if elapsed == 0 then elapsed = 1 end + +local throughput = message_count / elapsed +local megabits = throughput * message_size * 8 / 1000000 + +print(string.format("mean throughput: %i [msg/s]", throughput)) +print(string.format("mean throughput: %.3f [Mb/s]", megabits)) +