![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "../include/zmq.h" 00023 #include "../include/zmq_utils.h" 00024 00025 #include <stdio.h> 00026 #include <stdlib.h> 00027 #include <string.h> 00028 00029 #include "../src/platform.hpp" 00030 00031 #if defined ZMQ_HAVE_WINDOWS 00032 #include <windows.h> 00033 #include <process.h> 00034 #else 00035 #include <pthread.h> 00036 #endif 00037 00038 static int message_count; 00039 static size_t message_size; 00040 00041 #if defined ZMQ_HAVE_WINDOWS 00042 static unsigned int __stdcall worker (void *ctx_) 00043 #else 00044 static void *worker (void *ctx_) 00045 #endif 00046 { 00047 void *s; 00048 int rc; 00049 int i; 00050 zmq_msg_t msg; 00051 00052 s = zmq_socket (ctx_, ZMQ_PUSH); 00053 if (!s) { 00054 printf ("error in zmq_socket: %s\n", zmq_strerror (errno)); 00055 exit (1); 00056 } 00057 00058 rc = zmq_connect (s, "inproc://thr_test"); 00059 if (rc != 0) { 00060 printf ("error in zmq_connect: %s\n", zmq_strerror (errno)); 00061 exit (1); 00062 } 00063 00064 for (i = 0; i != message_count; i++) { 00065 00066 rc = zmq_msg_init_size (&msg, message_size); 00067 if (rc != 0) { 00068 printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno)); 00069 exit (1); 00070 } 00071 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00072 memset (zmq_msg_data (&msg), 0, message_size); 00073 #endif 00074 00075 rc = zmq_sendmsg (s, &msg, 0); 00076 if (rc < 0) { 00077 printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno)); 00078 exit (1); 00079 } 00080 rc = zmq_msg_close (&msg); 00081 if (rc != 0) { 00082 printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); 00083 exit (1); 00084 } 00085 } 00086 00087 rc = zmq_close (s); 00088 if (rc != 0) { 00089 printf ("error in zmq_close: %s\n", zmq_strerror (errno)); 00090 exit (1); 00091 } 00092 00093 #if defined ZMQ_HAVE_WINDOWS 00094 return 0; 00095 #else 00096 return NULL; 00097 #endif 00098 } 00099 00100 int main (int argc, char *argv []) 00101 { 00102 #if defined ZMQ_HAVE_WINDOWS 00103 HANDLE local_thread; 00104 #else 00105 pthread_t local_thread; 00106 #endif 00107 void *ctx; 00108 void *s; 00109 int rc; 00110 int i; 00111 zmq_msg_t msg; 00112 void *watch; 00113 unsigned long elapsed; 00114 unsigned long throughput; 00115 double megabits; 00116 00117 if (argc != 3) { 00118 printf ("usage: thread_thr <message-size> <message-count>\n"); 00119 return 1; 00120 } 00121 00122 message_size = atoi (argv [1]); 00123 message_count = atoi (argv [2]); 00124 00125 ctx = zmq_init (1); 00126 if (!ctx) { 00127 printf ("error in zmq_init: %s\n", zmq_strerror (errno)); 00128 return -1; 00129 } 00130 00131 s = zmq_socket (ctx, ZMQ_PULL); 00132 if (!s) { 00133 printf ("error in zmq_socket: %s\n", zmq_strerror (errno)); 00134 return -1; 00135 } 00136 00137 rc = zmq_bind (s, "inproc://thr_test"); 00138 if (rc != 0) { 00139 printf ("error in zmq_bind: %s\n", zmq_strerror (errno)); 00140 return -1; 00141 } 00142 00143 #if defined ZMQ_HAVE_WINDOWS 00144 local_thread = (HANDLE) _beginthreadex (NULL, 0, 00145 worker, ctx, 0 , NULL); 00146 if (local_thread == 0) { 00147 printf ("error in _beginthreadex\n"); 00148 return -1; 00149 } 00150 #else 00151 rc = pthread_create (&local_thread, NULL, worker, ctx); 00152 if (rc != 0) { 00153 printf ("error in pthread_create: %s\n", zmq_strerror (rc)); 00154 return -1; 00155 } 00156 #endif 00157 00158 rc = zmq_msg_init (&msg); 00159 if (rc != 0) { 00160 printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno)); 00161 return -1; 00162 } 00163 00164 printf ("message size: %d [B]\n", (int) message_size); 00165 printf ("message count: %d\n", (int) message_count); 00166 00167 rc = zmq_recvmsg (s, &msg, 0); 00168 if (rc < 0) { 00169 printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); 00170 return -1; 00171 } 00172 if (zmq_msg_size (&msg) != message_size) { 00173 printf ("message of incorrect size received\n"); 00174 return -1; 00175 } 00176 00177 watch = zmq_stopwatch_start (); 00178 00179 for (i = 0; i != message_count - 1; i++) { 00180 rc = zmq_recvmsg (s, &msg, 0); 00181 if (rc < 0) { 00182 printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); 00183 return -1; 00184 } 00185 if (zmq_msg_size (&msg) != message_size) { 00186 printf ("message of incorrect size received\n"); 00187 return -1; 00188 } 00189 } 00190 00191 elapsed = zmq_stopwatch_stop (watch); 00192 if (elapsed == 0) 00193 elapsed = 1; 00194 00195 rc = zmq_msg_close (&msg); 00196 if (rc != 0) { 00197 printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); 00198 return -1; 00199 } 00200 00201 #if defined ZMQ_HAVE_WINDOWS 00202 DWORD rc2 = WaitForSingleObject (local_thread, INFINITE); 00203 if (rc2 == WAIT_FAILED) { 00204 printf ("error in WaitForSingleObject\n"); 00205 return -1; 00206 } 00207 BOOL rc3 = CloseHandle (local_thread); 00208 if (rc3 == 0) { 00209 printf ("error in CloseHandle\n"); 00210 return -1; 00211 } 00212 #else 00213 rc = pthread_join (local_thread, NULL); 00214 if (rc != 0) { 00215 printf ("error in pthread_join: %s\n", zmq_strerror (rc)); 00216 return -1; 00217 } 00218 #endif 00219 00220 rc = zmq_close (s); 00221 if (rc != 0) { 00222 printf ("error in zmq_close: %s\n", zmq_strerror (errno)); 00223 return -1; 00224 } 00225 00226 rc = zmq_term (ctx); 00227 if (rc != 0) { 00228 printf ("error in zmq_term: %s\n", zmq_strerror (errno)); 00229 return -1; 00230 } 00231 00232 throughput = (unsigned long) 00233 ((double) message_count / (double) elapsed * 1000000); 00234 megabits = (double) (throughput * message_size * 8) / 1000000; 00235 00236 printf ("mean throughput: %d [msg/s]\n", (int) throughput); 00237 printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits); 00238 00239 return 0; 00240 } 00241