![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include <new> 00024 #include <stddef.h> 00025 00026 #include "pipe.hpp" 00027 #include "err.hpp" 00028 00029 int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], 00030 int hwms_ [2], bool delays_ [2]) 00031 { 00032 // Creates two pipe objects. These objects are connected by two ypipes, 00033 // each to pass messages in one direction. 00034 00035 pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t (); 00036 alloc_assert (upipe1); 00037 pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t (); 00038 alloc_assert (upipe2); 00039 00040 pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, 00041 hwms_ [1], hwms_ [0], delays_ [0]); 00042 alloc_assert (pipes_ [0]); 00043 pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, 00044 hwms_ [0], hwms_ [1], delays_ [1]); 00045 alloc_assert (pipes_ [1]); 00046 00047 pipes_ [0]->set_peer (pipes_ [1]); 00048 pipes_ [1]->set_peer (pipes_ [0]); 00049 00050 return 0; 00051 } 00052 00053 zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, 00054 int inhwm_, int outhwm_, bool delay_) : 00055 object_t (parent_), 00056 inpipe (inpipe_), 00057 outpipe (outpipe_), 00058 in_active (true), 00059 out_active (true), 00060 hwm (outhwm_), 00061 lwm (compute_lwm (inhwm_)), 00062 msgs_read (0), 00063 msgs_written (0), 00064 peers_msgs_read (0), 00065 peer (NULL), 00066 sink (NULL), 00067 state (active), 00068 delay (delay_) 00069 { 00070 } 00071 00072 zmq::pipe_t::~pipe_t () 00073 { 00074 } 00075 00076 void zmq::pipe_t::set_peer (pipe_t *peer_) 00077 { 00078 // Peer can be set once only. 00079 zmq_assert (!peer); 00080 peer = peer_; 00081 } 00082 00083 void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) 00084 { 00085 // Sink can be set once only. 00086 zmq_assert (!sink); 00087 sink = sink_; 00088 } 00089 00090 void zmq::pipe_t::set_identity (const blob_t &identity_) 00091 { 00092 identity = identity_; 00093 } 00094 00095 zmq::blob_t zmq::pipe_t::get_identity () 00096 { 00097 return identity; 00098 } 00099 00100 bool zmq::pipe_t::check_read () 00101 { 00102 if (unlikely (!in_active || (state != active && state != pending))) 00103 return false; 00104 00105 // Check if there's an item in the pipe. 00106 if (!inpipe->check_read ()) { 00107 in_active = false; 00108 return false; 00109 } 00110 00111 // If the next item in the pipe is message delimiter, 00112 // initiate termination process. 00113 if (inpipe->probe (is_delimiter)) { 00114 msg_t msg; 00115 bool ok = inpipe->read (&msg); 00116 zmq_assert (ok); 00117 delimit (); 00118 return false; 00119 } 00120 00121 return true; 00122 } 00123 00124 bool zmq::pipe_t::read (msg_t *msg_) 00125 { 00126 if (unlikely (!in_active || (state != active && state != pending))) 00127 return false; 00128 00129 if (!inpipe->read (msg_)) { 00130 in_active = false; 00131 return false; 00132 } 00133 00134 // If delimiter was read, start termination process of the pipe. 00135 if (msg_->is_delimiter ()) { 00136 delimit (); 00137 return false; 00138 } 00139 00140 if (!(msg_->flags () & msg_t::more)) 00141 msgs_read++; 00142 00143 if (lwm > 0 && msgs_read % lwm == 0) 00144 send_activate_write (peer, msgs_read); 00145 00146 return true; 00147 } 00148 00149 bool zmq::pipe_t::check_write (msg_t *msg_) 00150 { 00151 if (unlikely (!out_active || state != active)) 00152 return false; 00153 00154 bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm); 00155 00156 if (unlikely (full)) { 00157 out_active = false; 00158 return false; 00159 } 00160 00161 return true; 00162 } 00163 00164 bool zmq::pipe_t::write (msg_t *msg_) 00165 { 00166 if (unlikely (!check_write (msg_))) 00167 return false; 00168 00169 bool more = msg_->flags () & msg_t::more ? true : false; 00170 outpipe->write (*msg_, more); 00171 if (!more) 00172 msgs_written++; 00173 00174 return true; 00175 } 00176 00177 void zmq::pipe_t::rollback () 00178 { 00179 // Remove incomplete message from the outbound pipe. 00180 msg_t msg; 00181 if (outpipe) { 00182 while (outpipe->unwrite (&msg)) { 00183 zmq_assert (msg.flags () & msg_t::more); 00184 int rc = msg.close (); 00185 errno_assert (rc == 0); 00186 } 00187 } 00188 } 00189 00190 void zmq::pipe_t::flush () 00191 { 00192 // If terminate() was already called do nothing. 00193 if (state == terminated && state == double_terminated) 00194 return; 00195 00196 // The peer does not exist anymore at this point. 00197 if (state == terminating) 00198 return; 00199 00200 if (outpipe && !outpipe->flush ()) 00201 send_activate_read (peer); 00202 } 00203 00204 void zmq::pipe_t::process_activate_read () 00205 { 00206 if (!in_active && (state == active || state == pending)) { 00207 in_active = true; 00208 sink->read_activated (this); 00209 } 00210 } 00211 00212 void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) 00213 { 00214 // Remember the peers's message sequence number. 00215 peers_msgs_read = msgs_read_; 00216 00217 if (!out_active && state == active) { 00218 out_active = true; 00219 sink->write_activated (this); 00220 } 00221 } 00222 00223 void zmq::pipe_t::process_hiccup (void *pipe_) 00224 { 00225 // Destroy old outpipe. Note that the read end of the pipe was already 00226 // migrated to this thread. 00227 zmq_assert (outpipe); 00228 outpipe->flush (); 00229 msg_t msg; 00230 while (outpipe->read (&msg)) { 00231 int rc = msg.close (); 00232 errno_assert (rc == 0); 00233 } 00234 delete outpipe; 00235 00236 // Plug in the new outpipe. 00237 zmq_assert (pipe_); 00238 outpipe = (upipe_t*) pipe_; 00239 out_active = true; 00240 00241 // If appropriate, notify the user about the hiccup. 00242 if (state == active) 00243 sink->hiccuped (this); 00244 } 00245 00246 void zmq::pipe_t::process_pipe_term () 00247 { 00248 // This is the simple case of peer-induced termination. If there are no 00249 // more pending messages to read, or if the pipe was configured to drop 00250 // pending messages, we can move directly to the terminating state. 00251 // Otherwise we'll hang up in pending state till all the pending messages 00252 // are sent. 00253 if (state == active) { 00254 if (!delay) { 00255 state = terminating; 00256 outpipe = NULL; 00257 send_pipe_term_ack (peer); 00258 return; 00259 } 00260 else { 00261 state = pending; 00262 return; 00263 } 00264 } 00265 00266 // Delimiter happened to arrive before the term command. Now we have the 00267 // term command as well, so we can move straight to terminating state. 00268 if (state == delimited) { 00269 state = terminating; 00270 outpipe = NULL; 00271 send_pipe_term_ack (peer); 00272 return; 00273 } 00274 00275 // This is the case where both ends of the pipe are closed in parallel. 00276 // We simply reply to the request by ack and continue waiting for our 00277 // own ack. 00278 if (state == terminated) { 00279 state = double_terminated; 00280 outpipe = NULL; 00281 send_pipe_term_ack (peer); 00282 return; 00283 } 00284 00285 // pipe_term is invalid in other states. 00286 zmq_assert (false); 00287 } 00288 00289 void zmq::pipe_t::process_pipe_term_ack () 00290 { 00291 // Notify the user that all the references to the pipe should be dropped. 00292 zmq_assert (sink); 00293 sink->terminated (this); 00294 00295 // In terminating and double_terminated states there's nothing to do. 00296 // Simply deallocate the pipe. In terminated state we have to ack the 00297 // peer before deallocating this side of the pipe. All the other states 00298 // are invalid. 00299 if (state == terminating) ; 00300 else if (state == double_terminated); 00301 else if (state == terminated) { 00302 outpipe = NULL; 00303 send_pipe_term_ack (peer); 00304 } 00305 else 00306 zmq_assert (false); 00307 00308 // We'll deallocate the inbound pipe, the peer will deallocate the outbound 00309 // pipe (which is an inbound pipe from its point of view). 00310 // First, delete all the unread messages in the pipe. We have to do it by 00311 // hand because msg_t doesn't have automatic destructor. Then deallocate 00312 // the ypipe itself. 00313 msg_t msg; 00314 while (inpipe->read (&msg)) { 00315 int rc = msg.close (); 00316 errno_assert (rc == 0); 00317 } 00318 delete inpipe; 00319 00320 // Deallocate the pipe object 00321 delete this; 00322 } 00323 00324 void zmq::pipe_t::terminate (bool delay_) 00325 { 00326 // Overload the value specified at pipe creation. 00327 delay = delay_; 00328 00329 // If terminate was already called, we can ignore the duplicit invocation. 00330 if (state == terminated || state == double_terminated) 00331 return; 00332 00333 // If the pipe is in the final phase of async termination, it's going to 00334 // closed anyway. No need to do anything special here. 00335 else if (state == terminating) 00336 return; 00337 00338 // The simple sync termination case. Ask the peer to terminate and wait 00339 // for the ack. 00340 else if (state == active) { 00341 send_pipe_term (peer); 00342 state = terminated; 00343 } 00344 00345 // There are still pending messages available, but the user calls 00346 // 'terminate'. We can act as if all the pending messages were read. 00347 else if (state == pending && !delay) { 00348 outpipe = NULL; 00349 send_pipe_term_ack (peer); 00350 state = terminating; 00351 } 00352 00353 // If there are pending messages still availabe, do nothing. 00354 else if (state == pending && delay) { 00355 } 00356 00357 // We've already got delimiter, but not term command yet. We can ignore 00358 // the delimiter and ack synchronously terminate as if we were in 00359 // active state. 00360 else if (state == delimited) { 00361 send_pipe_term (peer); 00362 state = terminated; 00363 } 00364 00365 // There are no other states. 00366 else 00367 zmq_assert (false); 00368 00369 // Stop outbound flow of messages. 00370 out_active = false; 00371 00372 if (outpipe) { 00373 00374 // Rollback any unfinished outbound messages. 00375 rollback (); 00376 00377 // Push delimiter into the outbound pipe. Note that watermarks are not 00378 // checked thus the delimiter can be written even though the pipe is full. 00379 msg_t msg; 00380 msg.init_delimiter (); 00381 outpipe->write (msg, false); 00382 flush (); 00383 } 00384 } 00385 00386 bool zmq::pipe_t::is_delimiter (msg_t &msg_) 00387 { 00388 return msg_.is_delimiter (); 00389 } 00390 00391 int zmq::pipe_t::compute_lwm (int hwm_) 00392 { 00393 // Compute the low water mark. Following point should be taken 00394 // into consideration: 00395 // 00396 // 1. LWM has to be less than HWM. 00397 // 2. LWM cannot be set to very low value (such as zero) as after filling 00398 // the queue it would start to refill only after all the messages are 00399 // read from it and thus unnecessarily hold the progress back. 00400 // 3. LWM cannot be set to very high value (such as HWM-1) as it would 00401 // result in lock-step filling of the queue - if a single message is 00402 // read from a full queue, writer thread is resumed to write exactly one 00403 // message to the queue and go back to sleep immediately. This would 00404 // result in low performance. 00405 // 00406 // Given the 3. it would be good to keep HWM and LWM as far apart as 00407 // possible to reduce the thread switching overhead to almost zero, 00408 // say HWM-LWM should be max_wm_delta. 00409 // 00410 // That done, we still we have to account for the cases where 00411 // HWM < max_wm_delta thus driving LWM to negative numbers. 00412 // Let's make LWM 1/2 of HWM in such cases. 00413 int result = (hwm_ > max_wm_delta * 2) ? 00414 hwm_ - max_wm_delta : (hwm_ + 1) / 2; 00415 00416 return result; 00417 } 00418 00419 void zmq::pipe_t::delimit () 00420 { 00421 if (state == active) { 00422 state = delimited; 00423 return; 00424 } 00425 00426 if (state == pending) { 00427 outpipe = NULL; 00428 send_pipe_term_ack (peer); 00429 state = terminating; 00430 return; 00431 } 00432 00433 // Delimiter in any other state is invalid. 00434 zmq_assert (false); 00435 } 00436 00437 void zmq::pipe_t::hiccup () 00438 { 00439 // If termination is already under way do nothing. 00440 if (state != active) 00441 return; 00442 00443 // We'll drop the pointer to the inpipe. From now on, the peer is 00444 // responsible for deallocating it. 00445 inpipe = NULL; 00446 00447 // Create new inpipe. 00448 inpipe = new (std::nothrow) pipe_t::upipe_t (); 00449 alloc_assert (inpipe); 00450 in_active = true; 00451 00452 // Notify the peer about the hiccup. 00453 send_hiccup (peer, (void*) inpipe); 00454 } 00455