libzmq master
The Intelligent Transport Layer

lb.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2010-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2011 VMware, Inc.
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #include "lb.hpp"
00024 #include "pipe.hpp"
00025 #include "err.hpp"
00026 #include "msg.hpp"
00027 
00028 zmq::lb_t::lb_t () :
00029     active (0),
00030     current (0),
00031     more (false),
00032     dropping (false)
00033 {
00034 }
00035 
00036 zmq::lb_t::~lb_t ()
00037 {
00038     zmq_assert (pipes.empty ());
00039 }
00040 
00041 void zmq::lb_t::attach (pipe_t *pipe_)
00042 {
00043     pipes.push_back (pipe_);
00044     pipes.swap (active, pipes.size () - 1);
00045     active++;
00046 }
00047 
00048 void zmq::lb_t::terminated (pipe_t *pipe_)
00049 {
00050     pipes_t::size_type index = pipes.index (pipe_);
00051 
00052     //  If we are in the middle of multipart message and current pipe
00053     //  have disconnected, we have to drop the remainder of the message.
00054     if (index == current && more)
00055         dropping = true;
00056 
00057     //  Remove the pipe from the list; adjust number of active pipes
00058     //  accordingly.
00059     if (index < active) {
00060         active--;
00061         if (current == active)
00062             current = 0;
00063     }
00064     pipes.erase (pipe_);
00065 }
00066 
00067 void zmq::lb_t::activated (pipe_t *pipe_)
00068 {
00069     //  Move the pipe to the list of active pipes.
00070     pipes.swap (pipes.index (pipe_), active);
00071     active++;
00072 }
00073 
00074 int zmq::lb_t::send (msg_t *msg_, int flags_)
00075 {
00076     //  Drop the message if required. If we are at the end of the message
00077     //  switch back to non-dropping mode.
00078     if (dropping) {
00079 
00080         more = msg_->flags () & msg_t::more ? true : false;
00081         if (!more)
00082             dropping = false;
00083 
00084         int rc = msg_->close ();
00085         errno_assert (rc == 0);
00086         rc = msg_->init ();
00087         zmq_assert (rc == 0);
00088         return 0;
00089     }
00090 
00091     while (active > 0) {
00092         if (pipes [current]->write (msg_)) {
00093             more = msg_->flags () & msg_t::more ? true : false;
00094             break;
00095         }
00096 
00097         zmq_assert (!more);
00098         active--;
00099         if (current < active)
00100             pipes.swap (current, active);
00101         else
00102             current = 0;
00103     }
00104 
00105     //  If there are no pipes we cannot send the message.
00106     if (active == 0) {
00107         errno = EAGAIN;
00108         return -1;
00109     }
00110 
00111     //  If it's final part of the message we can fluch it downstream and
00112     //  continue round-robinning (load balance).
00113     if (!more) {
00114         pipes [current]->flush ();
00115         current = (current + 1) % active;
00116     }
00117 
00118     //  Detach the message from the data buffer.
00119     int rc = msg_->init ();
00120     errno_assert (rc == 0);
00121 
00122     return 0;
00123 }
00124 
00125 bool zmq::lb_t::has_out ()
00126 {
00127     //  If one part of the message was already written we can definitely
00128     //  write the rest of the message.
00129     if (more)
00130         return true;
00131 
00132     while (active > 0) {
00133 
00134         //  Check whether zero-sized message can be written to the pipe.
00135         msg_t msg;
00136         int rc = msg.init ();
00137         errno_assert (rc == 0);
00138         if (pipes [current]->check_write (&msg)) {
00139             rc = msg.close ();
00140             errno_assert (rc == 0);
00141             return true;
00142         }
00143         rc = msg.close ();
00144         errno_assert (rc == 0);
00145 
00146         //  Deactivate the pipe.
00147         active--;
00148         pipes.swap (current, active);
00149         if (current == active)
00150             current = 0;
00151     }
00152 
00153     return false;
00154 }
00155 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines