![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2011 250bpm s.r.o. 00003 Copyright (c) 2011 VMware, Inc. 00004 Copyright (c) 2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "dist.hpp" 00023 #include "pipe.hpp" 00024 #include "err.hpp" 00025 #include "msg.hpp" 00026 #include "likely.hpp" 00027 00028 zmq::dist_t::dist_t () : 00029 matching (0), 00030 active (0), 00031 eligible (0), 00032 more (false) 00033 { 00034 } 00035 00036 zmq::dist_t::~dist_t () 00037 { 00038 zmq_assert (pipes.empty ()); 00039 } 00040 00041 void zmq::dist_t::attach (pipe_t *pipe_) 00042 { 00043 // If we are in the middle of sending a message, we'll add new pipe 00044 // into the list of eligible pipes. Otherwise we add it to the list 00045 // of active pipes. 00046 if (more) { 00047 pipes.push_back (pipe_); 00048 pipes.swap (eligible, pipes.size () - 1); 00049 eligible++; 00050 } 00051 else { 00052 pipes.push_back (pipe_); 00053 pipes.swap (active, pipes.size () - 1); 00054 active++; 00055 eligible++; 00056 } 00057 } 00058 00059 void zmq::dist_t::match (pipe_t *pipe_) 00060 { 00061 // If pipe is already matching do nothing. 00062 if (pipes.index (pipe_) < matching) 00063 return; 00064 00065 // If the pipe isn't eligible, ignore it. 00066 if (pipes.index (pipe_) >= eligible) 00067 return; 00068 00069 // Mark the pipe as matching. 00070 pipes.swap (pipes.index (pipe_), matching); 00071 matching++; 00072 } 00073 00074 void zmq::dist_t::unmatch () 00075 { 00076 matching = 0; 00077 } 00078 00079 void zmq::dist_t::terminated (pipe_t *pipe_) 00080 { 00081 // Remove the pipe from the list; adjust number of matching, active and/or 00082 // eligible pipes accordingly. 00083 if (pipes.index (pipe_) < matching) 00084 matching--; 00085 if (pipes.index (pipe_) < active) 00086 active--; 00087 if (pipes.index (pipe_) < eligible) 00088 eligible--; 00089 pipes.erase (pipe_); 00090 } 00091 00092 void zmq::dist_t::activated (pipe_t *pipe_) 00093 { 00094 // Move the pipe from passive to eligible state. 00095 pipes.swap (pipes.index (pipe_), eligible); 00096 eligible++; 00097 00098 // If there's no message being sent at the moment, move it to 00099 // the active state. 00100 if (!more) { 00101 pipes.swap (eligible - 1, active); 00102 active++; 00103 } 00104 } 00105 00106 int zmq::dist_t::send_to_all (msg_t *msg_, int flags_) 00107 { 00108 matching = active; 00109 return send_to_matching (msg_, flags_); 00110 } 00111 00112 int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_) 00113 { 00114 // Is this end of a multipart message? 00115 bool msg_more = msg_->flags () & msg_t::more ? true : false; 00116 00117 // Push the message to matching pipes. 00118 distribute (msg_, flags_); 00119 00120 // If mutlipart message is fully sent, activate all the eligible pipes. 00121 if (!msg_more) 00122 active = eligible; 00123 00124 more = msg_more; 00125 00126 return 0; 00127 } 00128 00129 void zmq::dist_t::distribute (msg_t *msg_, int flags_) 00130 { 00131 // If there are no matching pipes available, simply drop the message. 00132 if (matching == 0) { 00133 int rc = msg_->close (); 00134 errno_assert (rc == 0); 00135 rc = msg_->init (); 00136 zmq_assert (rc == 0); 00137 return; 00138 } 00139 00140 if (msg_->is_vsm ()) { 00141 for (pipes_t::size_type i = 0; i < matching; ++i) 00142 write (pipes [i], msg_); 00143 int rc = msg_->close(); 00144 errno_assert (rc == 0); 00145 rc = msg_->init (); 00146 errno_assert (rc == 0); 00147 return; 00148 } 00149 00150 // Add matching-1 references to the message. We already hold one reference, 00151 // that's why -1. 00152 msg_->add_refs ((int) matching - 1); 00153 00154 // Push copy of the message to each matching pipe. 00155 int failed = 0; 00156 for (pipes_t::size_type i = 0; i < matching; ++i) 00157 if (!write (pipes [i], msg_)) 00158 ++failed; 00159 if (unlikely (failed)) 00160 msg_->rm_refs (failed); 00161 00162 // Detach the original message from the data buffer. Note that we don't 00163 // close the message. That's because we've already used all the references. 00164 int rc = msg_->init (); 00165 errno_assert (rc == 0); 00166 } 00167 00168 bool zmq::dist_t::has_out () 00169 { 00170 return true; 00171 } 00172 00173 bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_) 00174 { 00175 if (!pipe_->write (msg_)) { 00176 pipes.swap (pipes.index (pipe_), matching - 1); 00177 matching--; 00178 pipes.swap (pipes.index (pipe_), active - 1); 00179 active--; 00180 pipes.swap (active, eligible - 1); 00181 eligible--; 00182 return false; 00183 } 00184 if (!(msg_->flags () & msg_t::more)) 00185 pipe_->flush (); 00186 return true; 00187 } 00188