libzmq master
The Intelligent Transport Layer

inproc_lat.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include "../include/zmq.h"
00023 #include "../include/zmq_utils.h"
00024 
00025 #include <stdio.h>
00026 #include <stdlib.h>
00027 #include <string.h>
00028 
00029 #include "../src/platform.hpp"
00030 
00031 #if defined ZMQ_HAVE_WINDOWS
00032 #include <windows.h>
00033 #include <process.h>
00034 #else
00035 #include <pthread.h>
00036 #endif
00037 
00038 static size_t message_size;
00039 static int roundtrip_count;
00040 
00041 #if defined ZMQ_HAVE_WINDOWS
00042 static unsigned int __stdcall worker (void *ctx_)
00043 #else
00044 static void *worker (void *ctx_)
00045 #endif
00046 {
00047     void *s;
00048     int rc;
00049     int i;
00050     zmq_msg_t msg;
00051 
00052     s = zmq_socket (ctx_, ZMQ_REP);
00053     if (!s) {
00054         printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
00055         exit (1);
00056     }
00057 
00058     rc = zmq_connect (s, "inproc://lat_test");
00059     if (rc != 0) {
00060         printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
00061         exit (1);
00062     }
00063 
00064     rc = zmq_msg_init (&msg);
00065     if (rc != 0) {
00066         printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
00067         exit (1);
00068     }
00069 
00070     for (i = 0; i != roundtrip_count; i++) {
00071         rc = zmq_recvmsg (s, &msg, 0);
00072         if (rc < 0) {
00073             printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
00074             exit (1);
00075         }
00076         rc = zmq_sendmsg (s, &msg, 0);
00077         if (rc < 0) {
00078             printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
00079             exit (1);
00080         }
00081     }
00082 
00083     rc = zmq_msg_close (&msg);
00084     if (rc != 0) {
00085         printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
00086         exit (1);
00087     }
00088 
00089     rc = zmq_close (s);
00090     if (rc != 0) {
00091         printf ("error in zmq_close: %s\n", zmq_strerror (errno));
00092         exit (1);
00093     }
00094 
00095 #if defined ZMQ_HAVE_WINDOWS
00096     return 0;
00097 #else
00098     return NULL;
00099 #endif
00100 }
00101 
00102 int main (int argc, char *argv [])
00103 {
00104 #if defined ZMQ_HAVE_WINDOWS
00105     HANDLE local_thread;
00106 #else
00107     pthread_t local_thread;
00108 #endif
00109     void *ctx;
00110     void *s;
00111     int rc;
00112     int i;
00113     zmq_msg_t msg;
00114     void *watch;
00115     unsigned long elapsed;
00116     double latency;
00117 
00118     if (argc != 3) {
00119         printf ("usage: inproc_lat <message-size> <roundtrip-count>\n");
00120         return 1;
00121     }
00122 
00123     message_size = atoi (argv [1]);
00124     roundtrip_count = atoi (argv [2]);
00125 
00126     ctx = zmq_init (1);
00127     if (!ctx) {
00128         printf ("error in zmq_init: %s\n", zmq_strerror (errno));
00129         return -1;
00130     }
00131 
00132     s = zmq_socket (ctx, ZMQ_REQ);
00133     if (!s) {
00134         printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
00135         return -1;
00136     }
00137 
00138     rc = zmq_bind (s, "inproc://lat_test");
00139     if (rc != 0) {
00140         printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
00141         return -1;
00142     }
00143 
00144 #if defined ZMQ_HAVE_WINDOWS
00145     local_thread = (HANDLE) _beginthreadex (NULL, 0,
00146         worker, ctx, 0 , NULL);
00147     if (local_thread == 0) {
00148         printf ("error in _beginthreadex\n");
00149         return -1;
00150     }
00151 #else
00152     rc = pthread_create (&local_thread, NULL, worker, ctx);
00153     if (rc != 0) {
00154         printf ("error in pthread_create: %s\n", zmq_strerror (rc));
00155         return -1;
00156     }
00157 #endif
00158 
00159     rc = zmq_msg_init_size (&msg, message_size);
00160     if (rc != 0) {
00161         printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
00162         return -1;
00163     }
00164     memset (zmq_msg_data (&msg), 0, message_size);
00165 
00166     printf ("message size: %d [B]\n", (int) message_size);
00167     printf ("roundtrip count: %d\n", (int) roundtrip_count);
00168 
00169     watch = zmq_stopwatch_start ();
00170 
00171     for (i = 0; i != roundtrip_count; i++) {
00172         rc = zmq_sendmsg (s, &msg, 0);
00173         if (rc < 0) {
00174             printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
00175             return -1;
00176         }
00177         rc = zmq_recvmsg (s, &msg, 0);
00178         if (rc < 0) {
00179             printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
00180             return -1;
00181         }
00182         if (zmq_msg_size (&msg) != message_size) {
00183             printf ("message of incorrect size received\n");
00184             return -1;
00185         }
00186     }
00187 
00188     elapsed = zmq_stopwatch_stop (watch);
00189 
00190     rc = zmq_msg_close (&msg);
00191     if (rc != 0) {
00192         printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
00193         return -1;
00194     }
00195 
00196     latency = (double) elapsed / (roundtrip_count * 2);
00197 
00198 #if defined ZMQ_HAVE_WINDOWS
00199     DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
00200     if (rc2 == WAIT_FAILED) {
00201         printf ("error in WaitForSingleObject\n");
00202         return -1;
00203     }
00204     BOOL rc3 = CloseHandle (local_thread);
00205     if (rc3 == 0) {
00206         printf ("error in CloseHandle\n");
00207         return -1;
00208     }
00209 #else
00210     rc = pthread_join (local_thread, NULL);
00211     if (rc != 0) {
00212         printf ("error in pthread_join: %s\n", zmq_strerror (rc));
00213         return -1;
00214     }
00215 #endif
00216 
00217     printf ("average latency: %.3f [us]\n", (double) latency);
00218 
00219     rc = zmq_close (s);
00220     if (rc != 0) {
00221         printf ("error in zmq_close: %s\n", zmq_strerror (errno));
00222         return -1;
00223     }
00224 
00225     rc = zmq_term (ctx);
00226     if (rc != 0) {
00227         printf ("error in zmq_term: %s\n", zmq_strerror (errno));
00228         return -1;
00229     }
00230 
00231     return 0;
00232 }
00233 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines