![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "../include/zmq.h" 00023 #include "../include/zmq_utils.h" 00024 00025 #include <stdio.h> 00026 #include <stdlib.h> 00027 #include <string.h> 00028 00029 #include "../src/platform.hpp" 00030 00031 #if defined ZMQ_HAVE_WINDOWS 00032 #include <windows.h> 00033 #include <process.h> 00034 #else 00035 #include <pthread.h> 00036 #endif 00037 00038 static size_t message_size; 00039 static int roundtrip_count; 00040 00041 #if defined ZMQ_HAVE_WINDOWS 00042 static unsigned int __stdcall worker (void *ctx_) 00043 #else 00044 static void *worker (void *ctx_) 00045 #endif 00046 { 00047 void *s; 00048 int rc; 00049 int i; 00050 zmq_msg_t msg; 00051 00052 s = zmq_socket (ctx_, ZMQ_REP); 00053 if (!s) { 00054 printf ("error in zmq_socket: %s\n", zmq_strerror (errno)); 00055 exit (1); 00056 } 00057 00058 rc = zmq_connect (s, "inproc://lat_test"); 00059 if (rc != 0) { 00060 printf ("error in zmq_connect: %s\n", zmq_strerror (errno)); 00061 exit (1); 00062 } 00063 00064 rc = zmq_msg_init (&msg); 00065 if (rc != 0) { 00066 printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno)); 00067 exit (1); 00068 } 00069 00070 for (i = 0; i != roundtrip_count; i++) { 00071 rc = zmq_recvmsg (s, &msg, 0); 00072 if (rc < 0) { 00073 printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); 00074 exit (1); 00075 } 00076 rc = zmq_sendmsg (s, &msg, 0); 00077 if (rc < 0) { 00078 printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno)); 00079 exit (1); 00080 } 00081 } 00082 00083 rc = zmq_msg_close (&msg); 00084 if (rc != 0) { 00085 printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); 00086 exit (1); 00087 } 00088 00089 rc = zmq_close (s); 00090 if (rc != 0) { 00091 printf ("error in zmq_close: %s\n", zmq_strerror (errno)); 00092 exit (1); 00093 } 00094 00095 #if defined ZMQ_HAVE_WINDOWS 00096 return 0; 00097 #else 00098 return NULL; 00099 #endif 00100 } 00101 00102 int main (int argc, char *argv []) 00103 { 00104 #if defined ZMQ_HAVE_WINDOWS 00105 HANDLE local_thread; 00106 #else 00107 pthread_t local_thread; 00108 #endif 00109 void *ctx; 00110 void *s; 00111 int rc; 00112 int i; 00113 zmq_msg_t msg; 00114 void *watch; 00115 unsigned long elapsed; 00116 double latency; 00117 00118 if (argc != 3) { 00119 printf ("usage: inproc_lat <message-size> <roundtrip-count>\n"); 00120 return 1; 00121 } 00122 00123 message_size = atoi (argv [1]); 00124 roundtrip_count = atoi (argv [2]); 00125 00126 ctx = zmq_init (1); 00127 if (!ctx) { 00128 printf ("error in zmq_init: %s\n", zmq_strerror (errno)); 00129 return -1; 00130 } 00131 00132 s = zmq_socket (ctx, ZMQ_REQ); 00133 if (!s) { 00134 printf ("error in zmq_socket: %s\n", zmq_strerror (errno)); 00135 return -1; 00136 } 00137 00138 rc = zmq_bind (s, "inproc://lat_test"); 00139 if (rc != 0) { 00140 printf ("error in zmq_bind: %s\n", zmq_strerror (errno)); 00141 return -1; 00142 } 00143 00144 #if defined ZMQ_HAVE_WINDOWS 00145 local_thread = (HANDLE) _beginthreadex (NULL, 0, 00146 worker, ctx, 0 , NULL); 00147 if (local_thread == 0) { 00148 printf ("error in _beginthreadex\n"); 00149 return -1; 00150 } 00151 #else 00152 rc = pthread_create (&local_thread, NULL, worker, ctx); 00153 if (rc != 0) { 00154 printf ("error in pthread_create: %s\n", zmq_strerror (rc)); 00155 return -1; 00156 } 00157 #endif 00158 00159 rc = zmq_msg_init_size (&msg, message_size); 00160 if (rc != 0) { 00161 printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno)); 00162 return -1; 00163 } 00164 memset (zmq_msg_data (&msg), 0, message_size); 00165 00166 printf ("message size: %d [B]\n", (int) message_size); 00167 printf ("roundtrip count: %d\n", (int) roundtrip_count); 00168 00169 watch = zmq_stopwatch_start (); 00170 00171 for (i = 0; i != roundtrip_count; i++) { 00172 rc = zmq_sendmsg (s, &msg, 0); 00173 if (rc < 0) { 00174 printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno)); 00175 return -1; 00176 } 00177 rc = zmq_recvmsg (s, &msg, 0); 00178 if (rc < 0) { 00179 printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); 00180 return -1; 00181 } 00182 if (zmq_msg_size (&msg) != message_size) { 00183 printf ("message of incorrect size received\n"); 00184 return -1; 00185 } 00186 } 00187 00188 elapsed = zmq_stopwatch_stop (watch); 00189 00190 rc = zmq_msg_close (&msg); 00191 if (rc != 0) { 00192 printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); 00193 return -1; 00194 } 00195 00196 latency = (double) elapsed / (roundtrip_count * 2); 00197 00198 #if defined ZMQ_HAVE_WINDOWS 00199 DWORD rc2 = WaitForSingleObject (local_thread, INFINITE); 00200 if (rc2 == WAIT_FAILED) { 00201 printf ("error in WaitForSingleObject\n"); 00202 return -1; 00203 } 00204 BOOL rc3 = CloseHandle (local_thread); 00205 if (rc3 == 0) { 00206 printf ("error in CloseHandle\n"); 00207 return -1; 00208 } 00209 #else 00210 rc = pthread_join (local_thread, NULL); 00211 if (rc != 0) { 00212 printf ("error in pthread_join: %s\n", zmq_strerror (rc)); 00213 return -1; 00214 } 00215 #endif 00216 00217 printf ("average latency: %.3f [us]\n", (double) latency); 00218 00219 rc = zmq_close (s); 00220 if (rc != 0) { 00221 printf ("error in zmq_close: %s\n", zmq_strerror (errno)); 00222 return -1; 00223 } 00224 00225 rc = zmq_term (ctx); 00226 if (rc != 0) { 00227 printf ("error in zmq_term: %s\n", zmq_strerror (errno)); 00228 return -1; 00229 } 00230 00231 return 0; 00232 } 00233