![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2010-2011 250bpm s.r.o. 00003 Copyright (c) 2011 VMware, Inc. 00004 Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include <assert.h> 00023 #include <string.h> 00024 #include <stdio.h> 00025 00026 #include "../include/zmq.h" 00027 00028 int main (int argc, char *argv []) 00029 { 00030 fprintf (stderr, "test_reqrep_device running...\n"); 00031 00032 void *ctx = zmq_init (1); 00033 assert (ctx); 00034 00035 // Create a req/rep device. 00036 void *xreq = zmq_socket (ctx, ZMQ_XREQ); 00037 assert (xreq); 00038 int rc = zmq_bind (xreq, "tcp://127.0.0.1:5560"); 00039 assert (rc == 0); 00040 void *xrep = zmq_socket (ctx, ZMQ_XREP); 00041 assert (xrep); 00042 rc = zmq_bind (xrep, "tcp://127.0.0.1:5561"); 00043 assert (rc == 0); 00044 00045 // Create a worker. 00046 void *rep = zmq_socket (ctx, ZMQ_REP); 00047 assert (rep); 00048 rc = zmq_connect (rep, "tcp://127.0.0.1:5560"); 00049 assert (rc == 0); 00050 00051 // Create a client. 00052 void *req = zmq_socket (ctx, ZMQ_REQ); 00053 assert (req); 00054 rc = zmq_connect (req, "tcp://127.0.0.1:5561"); 00055 assert (rc == 0); 00056 00057 // Send a request. 00058 rc = zmq_send (req, "ABC", 3, ZMQ_SNDMORE); 00059 assert (rc == 3); 00060 rc = zmq_send (req, "DEF", 3, 0); 00061 assert (rc == 3); 00062 00063 // Pass the request through the device. 00064 for (int i = 0; i != 4; i++) { 00065 zmq_msg_t msg; 00066 rc = zmq_msg_init (&msg); 00067 assert (rc == 0); 00068 rc = zmq_recvmsg (xrep, &msg, 0); 00069 assert (rc >= 0); 00070 int rcvmore; 00071 size_t sz = sizeof (rcvmore); 00072 rc = zmq_getsockopt (xrep, ZMQ_RCVMORE, &rcvmore, &sz); 00073 assert (rc == 0); 00074 rc = zmq_sendmsg (xreq, &msg, rcvmore ? ZMQ_SNDMORE : 0); 00075 assert (rc >= 0); 00076 } 00077 00078 // Receive the request. 00079 char buff [3]; 00080 rc = zmq_recv (rep, buff, 3, 0); 00081 assert (rc == 3); 00082 assert (memcmp (buff, "ABC", 3) == 0); 00083 int rcvmore; 00084 size_t sz = sizeof (rcvmore); 00085 rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); 00086 assert (rc == 0); 00087 assert (rcvmore); 00088 rc = zmq_recv (rep, buff, 3, 0); 00089 assert (rc == 3); 00090 assert (memcmp (buff, "DEF", 3) == 0); 00091 rc = zmq_getsockopt (rep, ZMQ_RCVMORE, &rcvmore, &sz); 00092 assert (rc == 0); 00093 assert (!rcvmore); 00094 00095 // Send the reply. 00096 rc = zmq_send (rep, "GHI", 3, ZMQ_SNDMORE); 00097 assert (rc == 3); 00098 rc = zmq_send (rep, "JKL", 3, 0); 00099 assert (rc == 3); 00100 00101 // Pass the reply through the device. 00102 for (int i = 0; i != 4; i++) { 00103 zmq_msg_t msg; 00104 rc = zmq_msg_init (&msg); 00105 assert (rc == 0); 00106 rc = zmq_recvmsg (xreq, &msg, 0); 00107 assert (rc >= 0); 00108 int rcvmore; 00109 rc = zmq_getsockopt (xreq, ZMQ_RCVMORE, &rcvmore, &sz); 00110 assert (rc == 0); 00111 rc = zmq_sendmsg (xrep, &msg, rcvmore ? ZMQ_SNDMORE : 0); 00112 assert (rc >= 0); 00113 } 00114 00115 // Receive the reply. 00116 rc = zmq_recv (req, buff, 3, 0); 00117 assert (rc == 3); 00118 assert (memcmp (buff, "GHI", 3) == 0); 00119 rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); 00120 assert (rc == 0); 00121 assert (rcvmore); 00122 rc = zmq_recv (req, buff, 3, 0); 00123 assert (rc == 3); 00124 assert (memcmp (buff, "JKL", 3) == 0); 00125 rc = zmq_getsockopt (req, ZMQ_RCVMORE, &rcvmore, &sz); 00126 assert (rc == 0); 00127 assert (!rcvmore); 00128 00129 // Clean up. 00130 rc = zmq_close (req); 00131 assert (rc == 0); 00132 rc = zmq_close (rep); 00133 assert (rc == 0); 00134 rc = zmq_close (xrep); 00135 assert (rc == 0); 00136 rc = zmq_close (xreq); 00137 assert (rc == 0); 00138 rc = zmq_term (ctx); 00139 assert (rc == 0); 00140 00141 return 0 ; 00142 }