libzmq master
The Intelligent Transport Layer
00001 /* 00002 Copyright (c) 2010-2011 250bpm s.r.o. 00003 Copyright (c) 2011 VMware, Inc. 00004 Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include <string.h> 00023 00024 #include "xpub.hpp" 00025 #include "pipe.hpp" 00026 #include "err.hpp" 00027 #include "msg.hpp" 00028 00029 zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : 00030 socket_base_t (parent_, tid_), 00031 more (false) 00032 { 00033 options.type = ZMQ_XPUB; 00034 } 00035 00036 zmq::xpub_t::~xpub_t () 00037 { 00038 } 00039 00040 void zmq::xpub_t::xattach_pipe (pipe_t *pipe_) 00041 { 00042 zmq_assert (pipe_); 00043 dist.attach (pipe_); 00044 00045 // The pipe is active when attached. Let's read the subscriptions from 00046 // it, if any. 00047 xread_activated (pipe_); 00048 } 00049 00050 void zmq::xpub_t::xread_activated (pipe_t *pipe_) 00051 { 00052 // There are some subscriptions waiting. Let's process them. 00053 msg_t sub; 00054 sub.init (); 00055 while (true) { 00056 00057 // Grab next subscription. 00058 if (!pipe_->read (&sub)) { 00059 sub.close (); 00060 return; 00061 } 00062 00063 // Apply the subscription to the trie. 
00064 unsigned char *data = (unsigned char*) sub.data (); 00065 size_t size = sub.size (); 00066 zmq_assert (size > 0 && (*data == 0 || *data == 1)); 00067 bool unique; 00068 if (*data == 0) 00069 unique = subscriptions.rm (data + 1, size - 1, pipe_); 00070 else 00071 unique = subscriptions.add (data + 1, size - 1, pipe_); 00072 00073 // If the subscription is not a duplicate store it so that it can be 00074 // passed to used on next recv call. 00075 if (unique && options.type != ZMQ_PUB) 00076 pending.push_back (blob_t ((unsigned char*) sub.data (), 00077 sub.size ())); 00078 } 00079 } 00080 00081 void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) 00082 { 00083 dist.activated (pipe_); 00084 } 00085 00086 void zmq::xpub_t::xterminated (pipe_t *pipe_) 00087 { 00088 // Remove the pipe from the trie. If there are topics that nobody 00089 // is interested in anymore, send corresponding unsubscriptions 00090 // upstream. 00091 subscriptions.rm (pipe_, send_unsubscription, this); 00092 00093 dist.terminated (pipe_); 00094 } 00095 00096 void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) 00097 { 00098 xpub_t *self = (xpub_t*) arg_; 00099 self->dist.match (pipe_); 00100 } 00101 00102 int zmq::xpub_t::xsend (msg_t *msg_, int flags_) 00103 { 00104 bool msg_more = msg_->flags () & msg_t::more ? true : false; 00105 00106 // For the first part of multi-part message, find the matching pipes. 00107 if (!more) 00108 subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), 00109 mark_as_matching, this); 00110 00111 // Send the message to all the pipes that were marked as matching 00112 // in the previous step. 00113 int rc = dist.send_to_matching (msg_, flags_); 00114 if (rc != 0) 00115 return rc; 00116 00117 // If we are at the end of multi-part message we can mark all the pipes 00118 // as non-matching. 
00119 if (!msg_more) 00120 dist.unmatch (); 00121 00122 more = msg_more; 00123 00124 return 0; 00125 } 00126 00127 bool zmq::xpub_t::xhas_out () 00128 { 00129 return dist.has_out (); 00130 } 00131 00132 int zmq::xpub_t::xrecv (msg_t *msg_, int flags_) 00133 { 00134 // If there is at least one 00135 if (pending.empty ()) { 00136 errno = EAGAIN; 00137 return -1; 00138 } 00139 00140 int rc = msg_->close (); 00141 errno_assert (rc == 0); 00142 rc = msg_->init_size (pending.front ().size ()); 00143 errno_assert (rc == 0); 00144 memcpy (msg_->data (), pending.front ().data (), 00145 pending.front ().size ()); 00146 pending.pop_front (); 00147 return 0; 00148 } 00149 00150 bool zmq::xpub_t::xhas_in () 00151 { 00152 return !pending.empty (); 00153 } 00154 00155 void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, 00156 void *arg_) 00157 { 00158 xpub_t *self = (xpub_t*) arg_; 00159 00160 if (self->options.type != ZMQ_PUB) { 00161 00162 // Place the unsubscription to the queue of pending (un)sunscriptions 00163 // to be retrived by the user later on. 00164 xpub_t *self = (xpub_t*) arg_; 00165 blob_t unsub (size_ + 1, 0); 00166 unsub [0] = 0; 00167 memcpy (&unsub [1], data_, size_); 00168 self->pending.push_back (unsub); 00169 } 00170 } 00171 00172 zmq::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_, 00173 socket_base_t *socket_, const options_t &options_, 00174 const char *protocol_, const char *address_) : 00175 session_base_t (io_thread_, connect_, socket_, options_, protocol_, 00176 address_) 00177 { 00178 } 00179 00180 zmq::xpub_session_t::~xpub_session_t () 00181 { 00182 } 00183