libzmq master
The Intelligent Transport Layer

xrep.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2011 iMatix Corporation
00004     Copyright (c) 2011 VMware, Inc.
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #include "xrep.hpp"
00024 #include "pipe.hpp"
00025 #include "wire.hpp"
00026 #include "random.hpp"
00027 #include "likely.hpp"
00028 #include "err.hpp"
00029 
00030 zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
00031     socket_base_t (parent_, tid_),
00032     prefetched (false),
00033     more_in (false),
00034     current_out (NULL),
00035     more_out (false),
00036     next_peer_id (generate_random ())
00037 {
00038     options.type = ZMQ_XREP;
00039 
00040     //  TODO: Uncomment the following line when XREP will become true XREP
00041     //  rather than generic router socket.
00042     //  If peer disconnect there's noone to send reply to anyway. We can drop
00043     //  all the outstanding requests from that peer.
00044     //  options.delay_on_disconnect = false;
00045 
00046     options.send_identity = true;
00047     options.recv_identity = true;
00048 
00049     prefetched_msg.init ();
00050 }
00051 
00052 zmq::xrep_t::~xrep_t ()
00053 {
00054     zmq_assert (outpipes.empty ());
00055     prefetched_msg.close ();
00056 }
00057 
00058 void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
00059 {
00060     zmq_assert (pipe_);
00061 
00062     //  Generate a new unique peer identity.
00063     unsigned char buf [5];
00064     buf [0] = 0;
00065     put_uint32 (buf + 1, next_peer_id);
00066     blob_t identity (buf, 5);
00067     ++next_peer_id;
00068 
00069     //  Add the pipe to the map out outbound pipes.
00070     outpipe_t outpipe = {pipe_, true};
00071     bool ok = outpipes.insert (outpipes_t::value_type (
00072         identity, outpipe)).second;
00073     zmq_assert (ok);
00074 
00075     //  Add the pipe to the list of inbound pipes.
00076     pipe_->set_identity (identity);
00077     fq.attach (pipe_);    
00078 }
00079 
00080 void zmq::xrep_t::xterminated (pipe_t *pipe_)
00081 {
00082     fq.terminated (pipe_);
00083 
00084     for (outpipes_t::iterator it = outpipes.begin ();
00085           it != outpipes.end (); ++it) {
00086         if (it->second.pipe == pipe_) {
00087             outpipes.erase (it);
00088             if (pipe_ == current_out)
00089                 current_out = NULL;
00090             return;
00091         }
00092     }
00093     zmq_assert (false);
00094 }
00095 
00096 void zmq::xrep_t::xread_activated (pipe_t *pipe_)
00097 {
00098     fq.activated (pipe_);
00099 }
00100 
00101 void zmq::xrep_t::xwrite_activated (pipe_t *pipe_)
00102 {
00103     for (outpipes_t::iterator it = outpipes.begin ();
00104           it != outpipes.end (); ++it) {
00105         if (it->second.pipe == pipe_) {
00106             zmq_assert (!it->second.active);
00107             it->second.active = true;
00108             return;
00109         }
00110     }
00111     zmq_assert (false);
00112 }
00113 
00114 int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
00115 {
00116     //  If this is the first part of the message it's the ID of the
00117     //  peer to send the message to.
00118     if (!more_out) {
00119         zmq_assert (!current_out);
00120 
00121         //  If we have malformed message (prefix with no subsequent message)
00122         //  then just silently ignore it.
00123         //  TODO: The connections should be killed instead.
00124         if (msg_->flags () & msg_t::more) {
00125 
00126             more_out = true;
00127 
00128             //  Find the pipe associated with the identity stored in the prefix.
00129             //  If there's no such pipe just silently ignore the message.
00130             blob_t identity ((unsigned char*) msg_->data (), msg_->size ());
00131             outpipes_t::iterator it = outpipes.find (identity);
00132 
00133             if (it != outpipes.end ()) {
00134                 current_out = it->second.pipe;
00135                 msg_t empty;
00136                 int rc = empty.init ();
00137                 errno_assert (rc == 0);
00138                 if (!current_out->check_write (&empty)) {
00139                     it->second.active = false;
00140                     more_out = false;
00141                     current_out = NULL;
00142                 }
00143                 rc = empty.close ();
00144                 errno_assert (rc == 0);
00145             }
00146 
00147         }
00148 
00149         int rc = msg_->close ();
00150         errno_assert (rc == 0);
00151         rc = msg_->init ();
00152         errno_assert (rc == 0);
00153         return 0;
00154     }
00155 
00156     //  Check whether this is the last part of the message.
00157     more_out = msg_->flags () & msg_t::more ? true : false;
00158 
00159     //  Push the message into the pipe. If there's no out pipe, just drop it.
00160     if (current_out) {
00161         bool ok = current_out->write (msg_);
00162         if (unlikely (!ok))
00163             current_out = NULL;
00164         else if (!more_out) {
00165             current_out->flush ();
00166             current_out = NULL;
00167         }
00168     }
00169     else {
00170         int rc = msg_->close ();
00171         errno_assert (rc == 0);
00172     }
00173 
00174     //  Detach the message from the data buffer.
00175     int rc = msg_->init ();
00176     errno_assert (rc == 0);
00177 
00178     return 0;
00179 }
00180 
00181 int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
00182 {
00183     //  If there is a prefetched message, return it.
00184     if (prefetched) {
00185         int rc = msg_->move (prefetched_msg);
00186         errno_assert (rc == 0);
00187         more_in = msg_->flags () & msg_t::more ? true : false;
00188         prefetched = false;
00189         return 0;
00190     }
00191 
00192     //  Get next message part.
00193     pipe_t *pipe;
00194     int rc = fq.recvpipe (msg_, flags_, &pipe);
00195     if (rc != 0)
00196         return -1;
00197 
00198     //  If identity is received, change the key assigned to the pipe.
00199     if (unlikely (msg_->flags () & msg_t::identity)) {
00200         zmq_assert (!more_in);
00201 
00202         //  Empty identity means we can preserve the auto-generated identity.
00203         if (msg_->size () != 0) {
00204 
00205             //  Actual change of the identity.
00206             outpipes_t::iterator it = outpipes.begin ();
00207             while (it != outpipes.end ()) {
00208                 if (it->second.pipe == pipe) {
00209                     blob_t identity ((unsigned char*) msg_->data (),
00210                         msg_->size ());
00211                     pipe->set_identity (identity);
00212                     outpipes.erase (it);
00213                     outpipe_t outpipe = {pipe, true};
00214                     outpipes.insert (outpipes_t::value_type (identity,
00215                         outpipe));
00216                     break;
00217                 }
00218                 ++it;
00219             }
00220             zmq_assert (it != outpipes.end ());
00221         }
00222 
00223         //  After processing the identity, try to get the next message.
00224         rc = fq.recvpipe (msg_, flags_, &pipe);
00225         if (rc != 0)
00226             return -1;
00227     }
00228 
00229     //  If we are in the middle of reading a message, just return the next part.
00230     if (more_in) {
00231         more_in = msg_->flags () & msg_t::more ? true : false;
00232         return 0;
00233     }
00234  
00235     //  We are at the beginning of a new message. Move the message part we
00236     //  have to the prefetched and return the ID of the peer instead.
00237     rc = prefetched_msg.move (*msg_);
00238     errno_assert (rc == 0);
00239     prefetched = true;
00240     rc = msg_->close ();
00241     errno_assert (rc == 0);
00242 
00243     blob_t identity = pipe->get_identity ();
00244     rc = msg_->init_size (identity.size ());
00245     errno_assert (rc == 0);
00246     memcpy (msg_->data (), identity.data (), identity.size ());
00247     msg_->set_flags (msg_t::more);
00248     return 0;
00249 }
00250 
00251 int zmq::xrep_t::rollback (void)
00252 {
00253     if (current_out) {
00254         current_out->rollback ();
00255         current_out = NULL;
00256         more_out = false;
00257     }
00258     return 0;
00259 }
00260 
00261 bool zmq::xrep_t::xhas_in ()
00262 {
00263     if (prefetched)
00264         return true;
00265     return fq.has_in ();
00266 }
00267 
00268 bool zmq::xrep_t::xhas_out ()
00269 {
00270     //  In theory, XREP socket is always ready for writing. Whether actual
00271     //  attempt to write succeeds depends on whitch pipe the message is going
00272     //  to be routed to.
00273     return true;
00274 }
00275 
00276 zmq::xrep_session_t::xrep_session_t (io_thread_t *io_thread_, bool connect_,
00277       socket_base_t *socket_, const options_t &options_,
00278       const char *protocol_, const char *address_) :
00279     session_base_t (io_thread_, connect_, socket_, options_, protocol_,
00280         address_)
00281 {
00282 }
00283 
00284 zmq::xrep_session_t::~xrep_session_t ()
00285 {
00286 }
00287 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines