![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2010-2011 250bpm s.r.o. 00003 Copyright (c) 2011 iMatix Corporation 00004 Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include <assert.h> 00023 #include <stdio.h> 00024 00025 #include "../include/zmq.h" 00026 #include "../include/zmq_utils.h" 00027 00028 int main (int argc, char *argv []) 00029 { 00030 fprintf (stderr, "test_sub_forward running...\n"); 00031 00032 void *ctx = zmq_init (1); 00033 assert (ctx); 00034 00035 // First, create an intermediate device. 00036 void *xpub = zmq_socket (ctx, ZMQ_XPUB); 00037 assert (xpub); 00038 int rc = zmq_bind (xpub, "tcp://127.0.0.1:5560"); 00039 assert (rc == 0); 00040 void *xsub = zmq_socket (ctx, ZMQ_XSUB); 00041 assert (xsub); 00042 rc = zmq_bind (xsub, "tcp://127.0.0.1:5561"); 00043 assert (rc == 0); 00044 00045 // Create a publisher. 00046 void *pub = zmq_socket (ctx, ZMQ_PUB); 00047 assert (pub); 00048 rc = zmq_connect (pub, "tcp://127.0.0.1:5561"); 00049 assert (rc == 0); 00050 00051 // Create a subscriber. 00052 void *sub = zmq_socket (ctx, ZMQ_SUB); 00053 assert (sub); 00054 rc = zmq_connect (sub, "tcp://127.0.0.1:5560"); 00055 assert (rc == 0); 00056 00057 // Subscribe for all messages. 00058 rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0); 00059 assert (rc == 0); 00060 00061 // Pass the subscription upstream through the device. 00062 char buff [32]; 00063 rc = zmq_recv (xpub, buff, sizeof (buff), 0); 00064 assert (rc >= 0); 00065 rc = zmq_send (xsub, buff, rc, 0); 00066 assert (rc >= 0); 00067 00068 // Wait a bit till the subscription gets to the publisher. 00069 zmq_sleep (1); 00070 00071 // Send an empty message. 00072 rc = zmq_send (pub, NULL, 0, 0); 00073 assert (rc == 0); 00074 00075 // Pass the message downstream through the device. 00076 rc = zmq_recv (xsub, buff, sizeof (buff), 0); 00077 assert (rc >= 0); 00078 rc = zmq_send (xpub, buff, rc, 0); 00079 assert (rc >= 0); 00080 00081 // Receive the message in the subscriber. 00082 rc = zmq_recv (sub, buff, sizeof (buff), 0); 00083 assert (rc == 0); 00084 00085 // Clean up. 00086 rc = zmq_close (xpub); 00087 assert (rc == 0); 00088 rc = zmq_close (xsub); 00089 assert (rc == 0); 00090 rc = zmq_close (pub); 00091 assert (rc == 0); 00092 rc = zmq_close (sub); 00093 assert (rc == 0); 00094 rc = zmq_term (ctx); 00095 assert (rc == 0); 00096 00097 return 0 ; 00098 }