libzmq master
The Intelligent Transport Layer

inproc_thr.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include "../include/zmq.h"
00023 #include "../include/zmq_utils.h"
00024 
00025 #include <stdio.h>
00026 #include <stdlib.h>
00027 #include <string.h>
00028 
00029 #include "../src/platform.hpp"
00030 
00031 #if defined ZMQ_HAVE_WINDOWS
00032 #include <windows.h>
00033 #include <process.h>
00034 #else
00035 #include <pthread.h>
00036 #endif
00037 
00038 static int message_count;    /* number of messages to transfer; set in main from argv [2], read by worker and main */
00039 static size_t message_size;  /* payload size in bytes of each message; set in main from argv [1] */
00040 
00041 #if defined ZMQ_HAVE_WINDOWS
00042 static unsigned int __stdcall worker (void *ctx_)
00043 #else
00044 static void *worker (void *ctx_)
00045 #endif
00046 {
00047     void *s;
00048     int rc;
00049     int i;
00050     zmq_msg_t msg;
00051 
00052     s = zmq_socket (ctx_, ZMQ_PUSH);
00053     if (!s) {
00054         printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
00055         exit (1);
00056     }
00057 
00058     rc = zmq_connect (s, "inproc://thr_test");
00059     if (rc != 0) {
00060         printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
00061         exit (1);
00062     }
00063 
00064     for (i = 0; i != message_count; i++) {
00065 
00066         rc = zmq_msg_init_size (&msg, message_size);
00067         if (rc != 0) {
00068             printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
00069             exit (1);
00070         }
00071 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00072         memset (zmq_msg_data (&msg), 0, message_size);
00073 #endif
00074 
00075         rc = zmq_sendmsg (s, &msg, 0);
00076         if (rc < 0) {
00077             printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
00078             exit (1);
00079         }
00080         rc = zmq_msg_close (&msg);
00081         if (rc != 0) {
00082             printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
00083             exit (1);
00084         }
00085     }
00086 
00087     rc = zmq_close (s);
00088     if (rc != 0) {
00089         printf ("error in zmq_close: %s\n", zmq_strerror (errno));
00090         exit (1);
00091     }
00092 
00093 #if defined ZMQ_HAVE_WINDOWS
00094     return 0;
00095 #else
00096     return NULL;
00097 #endif
00098 }
00099 
00100 int main (int argc, char *argv [])
00101 {
00102 #if defined ZMQ_HAVE_WINDOWS
00103     HANDLE local_thread;
00104 #else
00105     pthread_t local_thread;
00106 #endif
00107     void *ctx;
00108     void *s;
00109     int rc;
00110     int i;
00111     zmq_msg_t msg;
00112     void *watch;
00113     unsigned long elapsed;
00114     unsigned long throughput;
00115     double megabits;
00116 
00117     if (argc != 3) {
00118         printf ("usage: thread_thr <message-size> <message-count>\n");
00119         return 1;
00120     }
00121 
00122     message_size = atoi (argv [1]);
00123     message_count = atoi (argv [2]);
00124 
00125     ctx = zmq_init (1);
00126     if (!ctx) {
00127         printf ("error in zmq_init: %s\n", zmq_strerror (errno));
00128         return -1;
00129     }
00130 
00131     s = zmq_socket (ctx, ZMQ_PULL);
00132     if (!s) {
00133         printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
00134         return -1;
00135     }
00136 
00137     rc = zmq_bind (s, "inproc://thr_test");
00138     if (rc != 0) {
00139         printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
00140         return -1;
00141     }
00142 
00143 #if defined ZMQ_HAVE_WINDOWS
00144     local_thread = (HANDLE) _beginthreadex (NULL, 0,
00145         worker, ctx, 0 , NULL);
00146     if (local_thread == 0) {
00147         printf ("error in _beginthreadex\n");
00148         return -1;
00149     }
00150 #else
00151     rc = pthread_create (&local_thread, NULL, worker, ctx);
00152     if (rc != 0) {
00153         printf ("error in pthread_create: %s\n", zmq_strerror (rc));
00154         return -1;
00155     }
00156 #endif
00157 
00158     rc = zmq_msg_init (&msg);
00159     if (rc != 0) {
00160         printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
00161         return -1;
00162     }
00163 
00164     printf ("message size: %d [B]\n", (int) message_size);
00165     printf ("message count: %d\n", (int) message_count);
00166 
00167     rc = zmq_recvmsg (s, &msg, 0);
00168     if (rc < 0) {
00169         printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
00170         return -1;
00171     }
00172     if (zmq_msg_size (&msg) != message_size) {
00173         printf ("message of incorrect size received\n");
00174         return -1;
00175     }
00176 
00177     watch = zmq_stopwatch_start ();
00178 
00179     for (i = 0; i != message_count - 1; i++) {
00180         rc = zmq_recvmsg (s, &msg, 0);
00181         if (rc < 0) {
00182             printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
00183             return -1;
00184         }
00185         if (zmq_msg_size (&msg) != message_size) {
00186             printf ("message of incorrect size received\n");
00187             return -1;
00188         }
00189     }
00190 
00191     elapsed = zmq_stopwatch_stop (watch);
00192     if (elapsed == 0)
00193         elapsed = 1;
00194 
00195     rc = zmq_msg_close (&msg);
00196     if (rc != 0) {
00197         printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
00198         return -1;
00199     }
00200 
00201 #if defined ZMQ_HAVE_WINDOWS
00202     DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
00203     if (rc2 == WAIT_FAILED) {
00204         printf ("error in WaitForSingleObject\n");
00205         return -1;
00206     }
00207     BOOL rc3 = CloseHandle (local_thread);
00208     if (rc3 == 0) {
00209         printf ("error in CloseHandle\n");
00210         return -1;
00211     }
00212 #else
00213     rc = pthread_join (local_thread, NULL);
00214     if (rc != 0) {
00215         printf ("error in pthread_join: %s\n", zmq_strerror (rc));
00216         return -1;
00217     }
00218 #endif
00219 
00220     rc = zmq_close (s);
00221     if (rc != 0) {
00222         printf ("error in zmq_close: %s\n", zmq_strerror (errno));
00223         return -1;
00224     }
00225 
00226     rc = zmq_term (ctx);
00227     if (rc != 0) {
00228         printf ("error in zmq_term: %s\n", zmq_strerror (errno));
00229         return -1;
00230     }
00231 
00232     throughput = (unsigned long)
00233         ((double) message_count / (double) elapsed * 1000000);
00234     megabits = (double) (throughput * message_size * 8) / 1000000;
00235 
00236     printf ("mean throughput: %d [msg/s]\n", (int) throughput);
00237     printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
00238 
00239     return 0;
00240 }
00241 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines