![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2010-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include "lb.hpp" 00024 #include "pipe.hpp" 00025 #include "err.hpp" 00026 #include "msg.hpp" 00027 00028 zmq::lb_t::lb_t () : 00029 active (0), 00030 current (0), 00031 more (false), 00032 dropping (false) 00033 { 00034 } 00035 00036 zmq::lb_t::~lb_t () 00037 { 00038 zmq_assert (pipes.empty ()); 00039 } 00040 00041 void zmq::lb_t::attach (pipe_t *pipe_) 00042 { 00043 pipes.push_back (pipe_); 00044 pipes.swap (active, pipes.size () - 1); 00045 active++; 00046 } 00047 00048 void zmq::lb_t::terminated (pipe_t *pipe_) 00049 { 00050 pipes_t::size_type index = pipes.index (pipe_); 00051 00052 // If we are in the middle of multipart message and current pipe 00053 // have disconnected, we have to drop the remainder of the message. 00054 if (index == current && more) 00055 dropping = true; 00056 00057 // Remove the pipe from the list; adjust number of active pipes 00058 // accordingly. 00059 if (index < active) { 00060 active--; 00061 if (current == active) 00062 current = 0; 00063 } 00064 pipes.erase (pipe_); 00065 } 00066 00067 void zmq::lb_t::activated (pipe_t *pipe_) 00068 { 00069 // Move the pipe to the list of active pipes. 00070 pipes.swap (pipes.index (pipe_), active); 00071 active++; 00072 } 00073 00074 int zmq::lb_t::send (msg_t *msg_, int flags_) 00075 { 00076 // Drop the message if required. If we are at the end of the message 00077 // switch back to non-dropping mode. 00078 if (dropping) { 00079 00080 more = msg_->flags () & msg_t::more ? true : false; 00081 if (!more) 00082 dropping = false; 00083 00084 int rc = msg_->close (); 00085 errno_assert (rc == 0); 00086 rc = msg_->init (); 00087 zmq_assert (rc == 0); 00088 return 0; 00089 } 00090 00091 while (active > 0) { 00092 if (pipes [current]->write (msg_)) { 00093 more = msg_->flags () & msg_t::more ? true : false; 00094 break; 00095 } 00096 00097 zmq_assert (!more); 00098 active--; 00099 if (current < active) 00100 pipes.swap (current, active); 00101 else 00102 current = 0; 00103 } 00104 00105 // If there are no pipes we cannot send the message. 00106 if (active == 0) { 00107 errno = EAGAIN; 00108 return -1; 00109 } 00110 00111 // If it's final part of the message we can fluch it downstream and 00112 // continue round-robinning (load balance). 00113 if (!more) { 00114 pipes [current]->flush (); 00115 current = (current + 1) % active; 00116 } 00117 00118 // Detach the message from the data buffer. 00119 int rc = msg_->init (); 00120 errno_assert (rc == 0); 00121 00122 return 0; 00123 } 00124 00125 bool zmq::lb_t::has_out () 00126 { 00127 // If one part of the message was already written we can definitely 00128 // write the rest of the message. 00129 if (more) 00130 return true; 00131 00132 while (active > 0) { 00133 00134 // Check whether zero-sized message can be written to the pipe. 00135 msg_t msg; 00136 int rc = msg.init (); 00137 errno_assert (rc == 0); 00138 if (pipes [current]->check_write (&msg)) { 00139 rc = msg.close (); 00140 errno_assert (rc == 0); 00141 return true; 00142 } 00143 rc = msg.close (); 00144 errno_assert (rc == 0); 00145 00146 // Deactivate the pipe. 00147 active--; 00148 pipes.swap (current, active); 00149 if (current == active) 00150 current = 0; 00151 } 00152 00153 return false; 00154 } 00155