![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2010-2011 250bpm s.r.o. 00003 Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file 00004 00005 This file is part of 0MQ. 00006 00007 0MQ is free software; you can redistribute it and/or modify it under 00008 the terms of the GNU Lesser General Public License as published by 00009 the Free Software Foundation; either version 3 of the License, or 00010 (at your option) any later version. 00011 00012 0MQ is distributed in the hope that it will be useful, 00013 but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 GNU Lesser General Public License for more details. 00016 00017 You should have received a copy of the GNU Lesser General Public License 00018 along with this program. If not, see <http://www.gnu.org/licenses/>. 00019 */ 00020 00021 #include <assert.h> 00022 #include <string.h> 00023 #include <pthread.h> 00024 #include <stdio.h> 00025 00026 #include "../include/zmq.h" 00027 #include "../include/zmq_utils.h" 00028 00029 extern "C" 00030 { 00031 void *worker(void *ctx) 00032 { 00033 // Worker thread connects after delay of 1 second. Then it waits 00034 // for 1 more second, so that async connect has time to succeed. 00035 zmq_sleep (1); 00036 void *sc = zmq_socket (ctx, ZMQ_PUSH); 00037 assert (sc); 00038 int rc = zmq_connect (sc, "inproc://timeout_test"); 00039 assert (rc == 0); 00040 zmq_sleep (1); 00041 rc = zmq_close (sc); 00042 assert (rc == 0); 00043 return NULL; 00044 } 00045 } 00046 00047 int main (int argc, char *argv []) 00048 { 00049 fprintf (stderr, "test_timeo running...\n"); 00050 00051 void *ctx = zmq_init (1); 00052 assert (ctx); 00053 00054 // Create a disconnected socket. 00055 void *sb = zmq_socket (ctx, ZMQ_PULL); 00056 assert (sb); 00057 int rc = zmq_bind (sb, "inproc://timeout_test"); 00058 assert (rc == 0); 00059 00060 // Check whether non-blocking recv returns immediately. 00061 char buf [] = "12345678ABCDEFGH12345678abcdefgh"; 00062 rc = zmq_recv (sb, buf, 32, ZMQ_DONTWAIT); 00063 assert (rc == -1); 00064 assert (zmq_errno() == EAGAIN); 00065 00066 // Check whether recv timeout is honoured. 00067 int timeout = 500; 00068 size_t timeout_size = sizeof timeout; 00069 rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size); 00070 assert (rc == 0); 00071 void *watch = zmq_stopwatch_start (); 00072 rc = zmq_recv (sb, buf, 32, 0); 00073 assert (rc == -1); 00074 assert (zmq_errno () == EAGAIN); 00075 unsigned long elapsed = zmq_stopwatch_stop (watch); 00076 assert (elapsed > 440000 && elapsed < 550000); 00077 00078 // Check whether connection during the wait doesn't distort the timeout. 00079 timeout = 2000; 00080 rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size); 00081 assert (rc == 0); 00082 pthread_t thread; 00083 rc = pthread_create (&thread, NULL, worker, ctx); 00084 assert (rc == 0); 00085 watch = zmq_stopwatch_start (); 00086 rc = zmq_recv (sb, buf, 32, 0); 00087 assert (rc == -1); 00088 assert (zmq_errno () == EAGAIN); 00089 elapsed = zmq_stopwatch_stop (watch); 00090 assert (elapsed > 1900000 && elapsed < 2100000); 00091 rc = pthread_join (thread, NULL); 00092 assert (rc == 0); 00093 00094 // Check that timeouts don't break normal message transfer. 00095 void *sc = zmq_socket (ctx, ZMQ_PUSH); 00096 assert (sc); 00097 rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size); 00098 assert (rc == 0); 00099 rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, timeout_size); 00100 assert (rc == 0); 00101 rc = zmq_connect (sc, "inproc://timeout_test"); 00102 assert (rc == 0); 00103 rc = zmq_send (sc, buf, 32, 0); 00104 assert (rc == 32); 00105 rc = zmq_recv (sb, buf, 32, 0); 00106 assert (rc == 32); 00107 00108 // Clean-up. 00109 rc = zmq_close (sc); 00110 assert (rc == 0); 00111 rc = zmq_close (sb); 00112 assert (rc == 0); 00113 rc = zmq_term (ctx); 00114 assert (rc == 0); 00115 00116 return 0 ; 00117 } 00118