![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2011 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include "xrep.hpp" 00024 #include "pipe.hpp" 00025 #include "wire.hpp" 00026 #include "random.hpp" 00027 #include "likely.hpp" 00028 #include "err.hpp" 00029 00030 zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : 00031 socket_base_t (parent_, tid_), 00032 prefetched (false), 00033 more_in (false), 00034 current_out (NULL), 00035 more_out (false), 00036 next_peer_id (generate_random ()) 00037 { 00038 options.type = ZMQ_XREP; 00039 00040 // TODO: Uncomment the following line when XREP will become true XREP 00041 // rather than generic router socket. 00042 // If peer disconnect there's noone to send reply to anyway. We can drop 00043 // all the outstanding requests from that peer. 00044 // options.delay_on_disconnect = false; 00045 00046 options.send_identity = true; 00047 options.recv_identity = true; 00048 00049 prefetched_msg.init (); 00050 } 00051 00052 zmq::xrep_t::~xrep_t () 00053 { 00054 zmq_assert (outpipes.empty ()); 00055 prefetched_msg.close (); 00056 } 00057 00058 void zmq::xrep_t::xattach_pipe (pipe_t *pipe_) 00059 { 00060 zmq_assert (pipe_); 00061 00062 // Generate a new unique peer identity. 00063 unsigned char buf [5]; 00064 buf [0] = 0; 00065 put_uint32 (buf + 1, next_peer_id); 00066 blob_t identity (buf, 5); 00067 ++next_peer_id; 00068 00069 // Add the pipe to the map out outbound pipes. 00070 outpipe_t outpipe = {pipe_, true}; 00071 bool ok = outpipes.insert (outpipes_t::value_type ( 00072 identity, outpipe)).second; 00073 zmq_assert (ok); 00074 00075 // Add the pipe to the list of inbound pipes. 00076 pipe_->set_identity (identity); 00077 fq.attach (pipe_); 00078 } 00079 00080 void zmq::xrep_t::xterminated (pipe_t *pipe_) 00081 { 00082 fq.terminated (pipe_); 00083 00084 for (outpipes_t::iterator it = outpipes.begin (); 00085 it != outpipes.end (); ++it) { 00086 if (it->second.pipe == pipe_) { 00087 outpipes.erase (it); 00088 if (pipe_ == current_out) 00089 current_out = NULL; 00090 return; 00091 } 00092 } 00093 zmq_assert (false); 00094 } 00095 00096 void zmq::xrep_t::xread_activated (pipe_t *pipe_) 00097 { 00098 fq.activated (pipe_); 00099 } 00100 00101 void zmq::xrep_t::xwrite_activated (pipe_t *pipe_) 00102 { 00103 for (outpipes_t::iterator it = outpipes.begin (); 00104 it != outpipes.end (); ++it) { 00105 if (it->second.pipe == pipe_) { 00106 zmq_assert (!it->second.active); 00107 it->second.active = true; 00108 return; 00109 } 00110 } 00111 zmq_assert (false); 00112 } 00113 00114 int zmq::xrep_t::xsend (msg_t *msg_, int flags_) 00115 { 00116 // If this is the first part of the message it's the ID of the 00117 // peer to send the message to. 00118 if (!more_out) { 00119 zmq_assert (!current_out); 00120 00121 // If we have malformed message (prefix with no subsequent message) 00122 // then just silently ignore it. 00123 // TODO: The connections should be killed instead. 00124 if (msg_->flags () & msg_t::more) { 00125 00126 more_out = true; 00127 00128 // Find the pipe associated with the identity stored in the prefix. 00129 // If there's no such pipe just silently ignore the message. 00130 blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); 00131 outpipes_t::iterator it = outpipes.find (identity); 00132 00133 if (it != outpipes.end ()) { 00134 current_out = it->second.pipe; 00135 msg_t empty; 00136 int rc = empty.init (); 00137 errno_assert (rc == 0); 00138 if (!current_out->check_write (&empty)) { 00139 it->second.active = false; 00140 more_out = false; 00141 current_out = NULL; 00142 } 00143 rc = empty.close (); 00144 errno_assert (rc == 0); 00145 } 00146 00147 } 00148 00149 int rc = msg_->close (); 00150 errno_assert (rc == 0); 00151 rc = msg_->init (); 00152 errno_assert (rc == 0); 00153 return 0; 00154 } 00155 00156 // Check whether this is the last part of the message. 00157 more_out = msg_->flags () & msg_t::more ? true : false; 00158 00159 // Push the message into the pipe. If there's no out pipe, just drop it. 00160 if (current_out) { 00161 bool ok = current_out->write (msg_); 00162 if (unlikely (!ok)) 00163 current_out = NULL; 00164 else if (!more_out) { 00165 current_out->flush (); 00166 current_out = NULL; 00167 } 00168 } 00169 else { 00170 int rc = msg_->close (); 00171 errno_assert (rc == 0); 00172 } 00173 00174 // Detach the message from the data buffer. 00175 int rc = msg_->init (); 00176 errno_assert (rc == 0); 00177 00178 return 0; 00179 } 00180 00181 int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) 00182 { 00183 // If there is a prefetched message, return it. 00184 if (prefetched) { 00185 int rc = msg_->move (prefetched_msg); 00186 errno_assert (rc == 0); 00187 more_in = msg_->flags () & msg_t::more ? true : false; 00188 prefetched = false; 00189 return 0; 00190 } 00191 00192 // Get next message part. 00193 pipe_t *pipe; 00194 int rc = fq.recvpipe (msg_, flags_, &pipe); 00195 if (rc != 0) 00196 return -1; 00197 00198 // If identity is received, change the key assigned to the pipe. 00199 if (unlikely (msg_->flags () & msg_t::identity)) { 00200 zmq_assert (!more_in); 00201 00202 // Empty identity means we can preserve the auto-generated identity. 00203 if (msg_->size () != 0) { 00204 00205 // Actual change of the identity. 00206 outpipes_t::iterator it = outpipes.begin (); 00207 while (it != outpipes.end ()) { 00208 if (it->second.pipe == pipe) { 00209 blob_t identity ((unsigned char*) msg_->data (), 00210 msg_->size ()); 00211 pipe->set_identity (identity); 00212 outpipes.erase (it); 00213 outpipe_t outpipe = {pipe, true}; 00214 outpipes.insert (outpipes_t::value_type (identity, 00215 outpipe)); 00216 break; 00217 } 00218 ++it; 00219 } 00220 zmq_assert (it != outpipes.end ()); 00221 } 00222 00223 // After processing the identity, try to get the next message. 00224 rc = fq.recvpipe (msg_, flags_, &pipe); 00225 if (rc != 0) 00226 return -1; 00227 } 00228 00229 // If we are in the middle of reading a message, just return the next part. 00230 if (more_in) { 00231 more_in = msg_->flags () & msg_t::more ? true : false; 00232 return 0; 00233 } 00234 00235 // We are at the beginning of a new message. Move the message part we 00236 // have to the prefetched and return the ID of the peer instead. 00237 rc = prefetched_msg.move (*msg_); 00238 errno_assert (rc == 0); 00239 prefetched = true; 00240 rc = msg_->close (); 00241 errno_assert (rc == 0); 00242 00243 blob_t identity = pipe->get_identity (); 00244 rc = msg_->init_size (identity.size ()); 00245 errno_assert (rc == 0); 00246 memcpy (msg_->data (), identity.data (), identity.size ()); 00247 msg_->set_flags (msg_t::more); 00248 return 0; 00249 } 00250 00251 int zmq::xrep_t::rollback (void) 00252 { 00253 if (current_out) { 00254 current_out->rollback (); 00255 current_out = NULL; 00256 more_out = false; 00257 } 00258 return 0; 00259 } 00260 00261 bool zmq::xrep_t::xhas_in () 00262 { 00263 if (prefetched) 00264 return true; 00265 return fq.has_in (); 00266 } 00267 00268 bool zmq::xrep_t::xhas_out () 00269 { 00270 // In theory, XREP socket is always ready for writing. Whether actual 00271 // attempt to write succeeds depends on whitch pipe the message is going 00272 // to be routed to. 00273 return true; 00274 } 00275 00276 zmq::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_, 00277 socket_base_t *socket_, const options_t &options_, 00278 const char *protocol_, const char *address_) : 00279 session_base_t (io_thread_, connect_, socket_, options_, protocol_, 00280 address_) 00281 { 00282 } 00283 00284 zmq::xrep_session_t::~xrep_session_t () 00285 { 00286 } 00287