![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2010-2011 250bpm s.r.o. 00003 Copyright (c) 2011 VMware, Inc. 00004 Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include <string.h> 00023 00024 #include "xsub.hpp" 00025 #include "err.hpp" 00026 00027 zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : 00028 socket_base_t (parent_, tid_), 00029 has_message (false), 00030 more (false) 00031 { 00032 options.type = ZMQ_XSUB; 00033 int rc = message.init (); 00034 errno_assert (rc == 0); 00035 } 00036 00037 zmq::xsub_t::~xsub_t () 00038 { 00039 int rc = message.close (); 00040 errno_assert (rc == 0); 00041 } 00042 00043 void zmq::xsub_t::xattach_pipe (pipe_t *pipe_) 00044 { 00045 zmq_assert (pipe_); 00046 fq.attach (pipe_); 00047 dist.attach (pipe_); 00048 00049 // Send all the cached subscriptions to the new upstream peer. 00050 subscriptions.apply (send_subscription, pipe_); 00051 pipe_->flush (); 00052 } 00053 00054 void zmq::xsub_t::xread_activated (pipe_t *pipe_) 00055 { 00056 fq.activated (pipe_); 00057 } 00058 00059 void zmq::xsub_t::xwrite_activated (pipe_t *pipe_) 00060 { 00061 dist.activated (pipe_); 00062 } 00063 00064 void zmq::xsub_t::xterminated (pipe_t *pipe_) 00065 { 00066 fq.terminated (pipe_); 00067 dist.terminated (pipe_); 00068 } 00069 00070 void zmq::xsub_t::xhiccuped (pipe_t *pipe_) 00071 { 00072 // Send all the cached subscriptions to the hiccuped pipe. 00073 subscriptions.apply (send_subscription, pipe_); 00074 pipe_->flush (); 00075 } 00076 00077 int zmq::xsub_t::xsend (msg_t *msg_, int flags_) 00078 { 00079 size_t size = msg_->size (); 00080 unsigned char *data = (unsigned char*) msg_->data (); 00081 00082 // Malformed subscriptions. 00083 if (size < 1 || (*data != 0 && *data != 1)) { 00084 errno = EINVAL; 00085 return -1; 00086 } 00087 00088 // Process the subscription. 00089 if (*data == 1) { 00090 if (subscriptions.add (data + 1, size - 1)) 00091 return dist.send_to_all (msg_, flags_); 00092 else 00093 return 0; 00094 } 00095 else if (*data == 0) { 00096 if (subscriptions.rm (data + 1, size - 1)) 00097 return dist.send_to_all (msg_, flags_); 00098 else 00099 return 0; 00100 } 00101 00102 zmq_assert (false); 00103 return -1; 00104 } 00105 00106 bool zmq::xsub_t::xhas_out () 00107 { 00108 // Subscription can be added/removed anytime. 00109 return true; 00110 } 00111 00112 int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) 00113 { 00114 // If there's already a message prepared by a previous call to zmq_poll, 00115 // return it straight ahead. 00116 if (has_message) { 00117 int rc = msg_->move (message); 00118 errno_assert (rc == 0); 00119 has_message = false; 00120 more = msg_->flags () & msg_t::more ? true : false; 00121 return 0; 00122 } 00123 00124 // TODO: This can result in infinite loop in the case of continuous 00125 // stream of non-matching messages which breaks the non-blocking recv 00126 // semantics. 00127 while (true) { 00128 00129 // Get a message using fair queueing algorithm. 00130 int rc = fq.recv (msg_, flags_); 00131 00132 // If there's no message available, return immediately. 00133 // The same when error occurs. 00134 if (rc != 0) 00135 return -1; 00136 00137 // Check whether the message matches at least one subscription. 00138 // Non-initial parts of the message are passed 00139 if (more || !options.filter || match (msg_)) { 00140 more = msg_->flags () & msg_t::more ? true : false; 00141 return 0; 00142 } 00143 00144 // Message doesn't match. Pop any remaining parts of the message 00145 // from the pipe. 00146 while (msg_->flags () & msg_t::more) { 00147 rc = fq.recv (msg_, ZMQ_DONTWAIT); 00148 zmq_assert (rc == 0); 00149 } 00150 } 00151 } 00152 00153 bool zmq::xsub_t::xhas_in () 00154 { 00155 // There are subsequent parts of the partly-read message available. 00156 if (more) 00157 return true; 00158 00159 // If there's already a message prepared by a previous call to zmq_poll, 00160 // return straight ahead. 00161 if (has_message) 00162 return true; 00163 00164 // TODO: This can result in infinite loop in the case of continuous 00165 // stream of non-matching messages. 00166 while (true) { 00167 00168 // Get a message using fair queueing algorithm. 00169 int rc = fq.recv (&message, ZMQ_DONTWAIT); 00170 00171 // If there's no message available, return immediately. 00172 // The same when error occurs. 00173 if (rc != 0) { 00174 zmq_assert (errno == EAGAIN); 00175 return false; 00176 } 00177 00178 // Check whether the message matches at least one subscription. 00179 if (!options.filter || match (&message)) { 00180 has_message = true; 00181 return true; 00182 } 00183 00184 // Message doesn't match. Pop any remaining parts of the message 00185 // from the pipe. 00186 while (message.flags () & msg_t::more) { 00187 rc = fq.recv (&message, ZMQ_DONTWAIT); 00188 zmq_assert (rc == 0); 00189 } 00190 } 00191 } 00192 00193 bool zmq::xsub_t::match (msg_t *msg_) 00194 { 00195 return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); 00196 } 00197 00198 void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_, 00199 void *arg_) 00200 { 00201 pipe_t *pipe = (pipe_t*) arg_; 00202 00203 // Create the subsctription message. 00204 msg_t msg; 00205 int rc = msg.init_size (size_ + 1); 00206 zmq_assert (rc == 0); 00207 unsigned char *data = (unsigned char*) msg.data (); 00208 data [0] = 1; 00209 memcpy (data + 1, data_, size_); 00210 00211 // Send it to the pipe. 00212 bool sent = pipe->write (&msg); 00213 zmq_assert (sent); 00214 } 00215 00216 zmq::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_, 00217 socket_base_t *socket_, const options_t &options_, 00218 const char *protocol_, const char *address_) : 00219 session_base_t (io_thread_, connect_, socket_, options_, protocol_, 00220 address_) 00221 { 00222 } 00223 00224 zmq::xsub_session_t::~xsub_session_t () 00225 { 00226 } 00227