libzmq master
The Intelligent Transport Layer

dist.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2011 250bpm s.r.o.
00003     Copyright (c) 2011 VMware, Inc.
00004     Copyright (c) 2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include "dist.hpp"
00023 #include "pipe.hpp"
00024 #include "err.hpp"
00025 #include "msg.hpp"
00026 #include "likely.hpp"
00027 
00028 zmq::dist_t::dist_t () :
00029     matching (0),
00030     active (0),
00031     eligible (0),
00032     more (false)
00033 {
00034 }
00035 
00036 zmq::dist_t::~dist_t ()
00037 {
00038     zmq_assert (pipes.empty ());
00039 }
00040 
00041 void zmq::dist_t::attach (pipe_t *pipe_)
00042 {
00043     //  If we are in the middle of sending a message, we'll add new pipe
00044     //  into the list of eligible pipes. Otherwise we add it to the list
00045     //  of active pipes.
00046     if (more) {
00047         pipes.push_back (pipe_);
00048         pipes.swap (eligible, pipes.size () - 1);
00049         eligible++;
00050     }
00051     else {
00052         pipes.push_back (pipe_);
00053         pipes.swap (active, pipes.size () - 1);
00054         active++;
00055         eligible++;
00056     }
00057 }
00058 
00059 void zmq::dist_t::match (pipe_t *pipe_)
00060 {
00061     //  If pipe is already matching do nothing.
00062     if (pipes.index (pipe_) < matching)
00063         return;
00064 
00065     //  If the pipe isn't eligible, ignore it.
00066     if (pipes.index (pipe_) >= eligible)
00067         return;
00068 
00069     //  Mark the pipe as matching.
00070     pipes.swap (pipes.index (pipe_), matching);
00071     matching++;    
00072 }
00073 
00074 void zmq::dist_t::unmatch ()
00075 {
00076     matching = 0;
00077 }
00078 
00079 void zmq::dist_t::terminated (pipe_t *pipe_)
00080 {
00081     //  Remove the pipe from the list; adjust number of matching, active and/or
00082     //  eligible pipes accordingly.
00083     if (pipes.index (pipe_) < matching)
00084         matching--;
00085     if (pipes.index (pipe_) < active)
00086         active--;
00087     if (pipes.index (pipe_) < eligible)
00088         eligible--;
00089     pipes.erase (pipe_);
00090 }
00091 
00092 void zmq::dist_t::activated (pipe_t *pipe_)
00093 {
00094     //  Move the pipe from passive to eligible state.
00095     pipes.swap (pipes.index (pipe_), eligible);
00096     eligible++;
00097 
00098     //  If there's no message being sent at the moment, move it to
00099     //  the active state.
00100     if (!more) {
00101         pipes.swap (eligible - 1, active);
00102         active++;
00103     }
00104 }
00105 
00106 int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
00107 {
00108     matching = active;
00109     return send_to_matching (msg_, flags_);
00110 }
00111 
00112 int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
00113 {
00114     //  Is this end of a multipart message?
00115     bool msg_more = msg_->flags () & msg_t::more ? true : false;
00116 
00117     //  Push the message to matching pipes.
00118     distribute (msg_, flags_);
00119 
00120     //  If mutlipart message is fully sent, activate all the eligible pipes.
00121     if (!msg_more)
00122         active = eligible;
00123 
00124     more = msg_more;
00125 
00126     return 0;
00127 }
00128 
00129 void zmq::dist_t::distribute (msg_t *msg_, int flags_)
00130 {
00131     //  If there are no matching pipes available, simply drop the message.
00132     if (matching == 0) {
00133         int rc = msg_->close ();
00134         errno_assert (rc == 0);
00135         rc = msg_->init ();
00136         zmq_assert (rc == 0);
00137         return;
00138     }
00139 
00140     if (msg_->is_vsm ()) {
00141         for (pipes_t::size_type i = 0; i < matching; ++i)
00142             write (pipes [i], msg_);
00143         int rc = msg_->close();
00144         errno_assert (rc == 0);
00145         rc = msg_->init ();
00146         errno_assert (rc == 0);
00147         return;
00148     }
00149 
00150     //  Add matching-1 references to the message. We already hold one reference,
00151     //  that's why -1.
00152     msg_->add_refs ((int) matching - 1);
00153 
00154     //  Push copy of the message to each matching pipe.
00155     int failed = 0;
00156     for (pipes_t::size_type i = 0; i < matching; ++i)
00157         if (!write (pipes [i], msg_))
00158             ++failed;
00159     if (unlikely (failed))
00160         msg_->rm_refs (failed);
00161 
00162     //  Detach the original message from the data buffer. Note that we don't
00163     //  close the message. That's because we've already used all the references.
00164     int rc = msg_->init ();
00165     errno_assert (rc == 0);
00166 }
00167 
00168 bool zmq::dist_t::has_out ()
00169 {
00170     return true;
00171 }
00172 
00173 bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
00174 {
00175     if (!pipe_->write (msg_)) {
00176         pipes.swap (pipes.index (pipe_), matching - 1);
00177         matching--;
00178         pipes.swap (pipes.index (pipe_), active - 1);
00179         active--;
00180         pipes.swap (active, eligible - 1);
00181         eligible--;
00182         return false;
00183     }
00184     if (!(msg_->flags () & msg_t::more))
00185         pipe_->flush ();
00186     return true;
00187 }
00188 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines