libzmq master
The Intelligent Transport Layer

pipe.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2011 VMware, Inc.
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #include <new>
00024 #include <stddef.h>
00025 
00026 #include "pipe.hpp"
00027 #include "err.hpp"
00028 
00029 int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
00030     int hwms_ [2], bool delays_ [2])
00031 {
00032     //   Creates two pipe objects. These objects are connected by two ypipes,
00033     //   each to pass messages in one direction.
00034 
00035     pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t ();
00036     alloc_assert (upipe1);
00037     pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t ();
00038     alloc_assert (upipe2);
00039 
00040     pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
00041         hwms_ [1], hwms_ [0], delays_ [0]);
00042     alloc_assert (pipes_ [0]);
00043     pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
00044         hwms_ [0], hwms_ [1], delays_ [1]);
00045     alloc_assert (pipes_ [1]);
00046 
00047     pipes_ [0]->set_peer (pipes_ [1]);
00048     pipes_ [1]->set_peer (pipes_ [0]);
00049 
00050     return 0;
00051 }
00052 
00053 zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
00054       int inhwm_, int outhwm_, bool delay_) :
00055     object_t (parent_),
00056     inpipe (inpipe_),
00057     outpipe (outpipe_),
00058     in_active (true),
00059     out_active (true),
00060     hwm (outhwm_),
00061     lwm (compute_lwm (inhwm_)),
00062     msgs_read (0),
00063     msgs_written (0),
00064     peers_msgs_read (0),
00065     peer (NULL),
00066     sink (NULL),
00067     state (active),
00068     delay (delay_)
00069 {
00070 }
00071 
00072 zmq::pipe_t::~pipe_t ()
00073 {
00074 }
00075 
00076 void zmq::pipe_t::set_peer (pipe_t *peer_)
00077 {
00078     //  Peer can be set once only.
00079     zmq_assert (!peer);
00080     peer = peer_;
00081 }
00082 
00083 void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
00084 {
00085     // Sink can be set once only.
00086     zmq_assert (!sink);
00087     sink = sink_;
00088 }
00089 
00090 void zmq::pipe_t::set_identity (const blob_t &identity_)
00091 {
00092     identity = identity_;
00093 }
00094 
00095 zmq::blob_t zmq::pipe_t::get_identity ()
00096 {
00097     return identity;
00098 }
00099 
00100 bool zmq::pipe_t::check_read ()
00101 {
00102     if (unlikely (!in_active || (state != active && state != pending)))
00103         return false;
00104 
00105     //  Check if there's an item in the pipe.
00106     if (!inpipe->check_read ()) {
00107         in_active = false;
00108         return false;
00109     }
00110 
00111     //  If the next item in the pipe is message delimiter,
00112     //  initiate termination process.
00113     if (inpipe->probe (is_delimiter)) {
00114         msg_t msg;
00115         bool ok = inpipe->read (&msg);
00116         zmq_assert (ok);
00117         delimit ();
00118         return false;
00119     }
00120 
00121     return true;
00122 }
00123 
00124 bool zmq::pipe_t::read (msg_t *msg_)
00125 {
00126     if (unlikely (!in_active || (state != active && state != pending)))
00127         return false;
00128 
00129     if (!inpipe->read (msg_)) {
00130         in_active = false;
00131         return false;
00132     }
00133 
00134     //  If delimiter was read, start termination process of the pipe.
00135     if (msg_->is_delimiter ()) {
00136         delimit ();
00137         return false;
00138     }
00139 
00140     if (!(msg_->flags () & msg_t::more))
00141         msgs_read++;
00142 
00143     if (lwm > 0 && msgs_read % lwm == 0)
00144         send_activate_write (peer, msgs_read);
00145 
00146     return true;
00147 }
00148 
00149 bool zmq::pipe_t::check_write (msg_t *msg_)
00150 {
00151     if (unlikely (!out_active || state != active))
00152         return false;
00153 
00154     bool full = hwm > 0 && msgs_written - peers_msgs_read == uint64_t (hwm);
00155 
00156     if (unlikely (full)) {
00157         out_active = false;
00158         return false;
00159     }
00160 
00161     return true;
00162 }
00163 
00164 bool zmq::pipe_t::write (msg_t *msg_)
00165 {
00166     if (unlikely (!check_write (msg_)))
00167         return false;
00168 
00169     bool more = msg_->flags () & msg_t::more ? true : false;
00170     outpipe->write (*msg_, more);
00171     if (!more)
00172         msgs_written++;
00173 
00174     return true;
00175 }
00176 
00177 void zmq::pipe_t::rollback ()
00178 {
00179     //  Remove incomplete message from the outbound pipe.
00180     msg_t msg;
00181     if (outpipe) {
00182         while (outpipe->unwrite (&msg)) {
00183             zmq_assert (msg.flags () & msg_t::more);
00184             int rc = msg.close ();
00185             errno_assert (rc == 0);
00186         }
00187     }
00188 }
00189 
00190 void zmq::pipe_t::flush ()
00191 {
00192     //  If terminate() was already called do nothing.
00193     if (state == terminated && state == double_terminated)
00194         return;
00195 
00196     //  The peer does not exist anymore at this point.
00197     if (state == terminating)
00198         return;
00199 
00200     if (outpipe && !outpipe->flush ())
00201         send_activate_read (peer);
00202 }
00203 
00204 void zmq::pipe_t::process_activate_read ()
00205 {
00206     if (!in_active && (state == active || state == pending)) {
00207         in_active = true;
00208         sink->read_activated (this);
00209     }
00210 }
00211 
00212 void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
00213 {
00214     //  Remember the peers's message sequence number.
00215     peers_msgs_read = msgs_read_;
00216 
00217     if (!out_active && state == active) {
00218         out_active = true;
00219         sink->write_activated (this);
00220     }
00221 }
00222 
00223 void zmq::pipe_t::process_hiccup (void *pipe_)
00224 {
00225     //  Destroy old outpipe. Note that the read end of the pipe was already
00226     //  migrated to this thread.
00227     zmq_assert (outpipe);
00228     outpipe->flush ();
00229     msg_t msg;
00230     while (outpipe->read (&msg)) {
00231        int rc = msg.close ();
00232        errno_assert (rc == 0);
00233     }
00234     delete outpipe;
00235 
00236     //  Plug in the new outpipe.
00237     zmq_assert (pipe_);
00238     outpipe = (upipe_t*) pipe_;
00239     out_active = true;
00240 
00241     //  If appropriate, notify the user about the hiccup.
00242     if (state == active)
00243         sink->hiccuped (this);
00244 }
00245 
00246 void zmq::pipe_t::process_pipe_term ()
00247 {
00248     //  This is the simple case of peer-induced termination. If there are no
00249     //  more pending messages to read, or if the pipe was configured to drop
00250     //  pending messages, we can move directly to the terminating state.
00251     //  Otherwise we'll hang up in pending state till all the pending messages
00252     //  are sent.
00253     if (state == active) {
00254         if (!delay) {
00255             state = terminating;
00256             outpipe = NULL;
00257             send_pipe_term_ack (peer);
00258             return;
00259         }
00260         else {
00261             state = pending;
00262             return;
00263         }
00264     }
00265 
00266     //  Delimiter happened to arrive before the term command. Now we have the
00267     //  term command as well, so we can move straight to terminating state.
00268     if (state == delimited) {
00269         state = terminating;
00270         outpipe = NULL;
00271         send_pipe_term_ack (peer);
00272         return;
00273     }
00274 
00275     //  This is the case where both ends of the pipe are closed in parallel.
00276     //  We simply reply to the request by ack and continue waiting for our
00277     //  own ack.
00278     if (state == terminated) {
00279         state = double_terminated;
00280         outpipe = NULL;
00281         send_pipe_term_ack (peer);
00282         return;
00283     }
00284 
00285     //  pipe_term is invalid in other states.
00286     zmq_assert (false);
00287 }
00288 
00289 void zmq::pipe_t::process_pipe_term_ack ()
00290 {
00291     //  Notify the user that all the references to the pipe should be dropped.
00292     zmq_assert (sink);
00293     sink->terminated (this);
00294 
00295     //  In terminating and double_terminated states there's nothing to do.
00296     //  Simply deallocate the pipe. In terminated state we have to ack the
00297     //  peer before deallocating this side of the pipe. All the other states
00298     //  are invalid.
00299     if (state == terminating) ;
00300     else if (state == double_terminated);
00301     else if (state == terminated) {
00302         outpipe = NULL;
00303         send_pipe_term_ack (peer);
00304     }
00305     else
00306         zmq_assert (false);
00307 
00308     //  We'll deallocate the inbound pipe, the peer will deallocate the outbound
00309     //  pipe (which is an inbound pipe from its point of view).
00310     //  First, delete all the unread messages in the pipe. We have to do it by
00311     //  hand because msg_t doesn't have automatic destructor. Then deallocate
00312     //  the ypipe itself.
00313     msg_t msg;
00314     while (inpipe->read (&msg)) {
00315        int rc = msg.close ();
00316        errno_assert (rc == 0);
00317     }
00318     delete inpipe;
00319 
00320     //  Deallocate the pipe object
00321     delete this;
00322 }
00323 
00324 void zmq::pipe_t::terminate (bool delay_)
00325 {
00326     //  Overload the value specified at pipe creation.
00327     delay = delay_;
00328 
00329     //  If terminate was already called, we can ignore the duplicit invocation.
00330     if (state == terminated || state == double_terminated)
00331         return;
00332 
00333     //  If the pipe is in the final phase of async termination, it's going to
00334     //  closed anyway. No need to do anything special here.
00335     else if (state == terminating)
00336         return;
00337 
00338     //  The simple sync termination case. Ask the peer to terminate and wait
00339     //  for the ack.
00340     else if (state == active) {
00341         send_pipe_term (peer);
00342         state = terminated;
00343     }
00344 
00345     //  There are still pending messages available, but the user calls
00346     //  'terminate'. We can act as if all the pending messages were read.
00347     else if (state == pending && !delay) {
00348             outpipe = NULL;
00349             send_pipe_term_ack (peer);
00350             state = terminating;
00351     }
00352 
00353     //  If there are pending messages still availabe, do nothing.
00354     else if (state == pending && delay) {
00355     }
00356 
00357     //  We've already got delimiter, but not term command yet. We can ignore
00358     //  the delimiter and ack synchronously terminate as if we were in
00359     //  active state.    
00360     else if (state == delimited) {
00361         send_pipe_term (peer);
00362         state = terminated;    
00363     }
00364 
00365     //  There are no other states.
00366     else
00367         zmq_assert (false);
00368 
00369     //  Stop outbound flow of messages.
00370     out_active = false;
00371 
00372     if (outpipe) {
00373 
00374         //  Rollback any unfinished outbound messages.
00375         rollback ();
00376 
00377         //  Push delimiter into the outbound pipe. Note that watermarks are not
00378         //  checked thus the delimiter can be written even though the pipe is full.
00379         msg_t msg;
00380         msg.init_delimiter ();
00381         outpipe->write (msg, false);
00382         flush ();
00383     }
00384 }
00385 
00386 bool zmq::pipe_t::is_delimiter (msg_t &msg_)
00387 {
00388     return msg_.is_delimiter ();
00389 }
00390 
00391 int zmq::pipe_t::compute_lwm (int hwm_)
00392 {
00393     //  Compute the low water mark. Following point should be taken
00394     //  into consideration:
00395     //
00396     //  1. LWM has to be less than HWM.
00397     //  2. LWM cannot be set to very low value (such as zero) as after filling
00398     //     the queue it would start to refill only after all the messages are
00399     //     read from it and thus unnecessarily hold the progress back.
00400     //  3. LWM cannot be set to very high value (such as HWM-1) as it would
00401     //     result in lock-step filling of the queue - if a single message is
00402     //     read from a full queue, writer thread is resumed to write exactly one
00403     //     message to the queue and go back to sleep immediately. This would
00404     //     result in low performance.
00405     //
00406     //  Given the 3. it would be good to keep HWM and LWM as far apart as
00407     //  possible to reduce the thread switching overhead to almost zero,
00408     //  say HWM-LWM should be max_wm_delta.
00409     //
00410     //  That done, we still we have to account for the cases where
00411     //  HWM < max_wm_delta thus driving LWM to negative numbers.
00412     //  Let's make LWM 1/2 of HWM in such cases.
00413     int result = (hwm_ > max_wm_delta * 2) ?
00414         hwm_ - max_wm_delta : (hwm_ + 1) / 2;
00415 
00416     return result;
00417 }
00418 
00419 void zmq::pipe_t::delimit ()
00420 {
00421     if (state == active) {
00422         state = delimited;
00423         return;
00424     }
00425 
00426     if (state == pending) {
00427         outpipe = NULL;
00428         send_pipe_term_ack (peer);
00429         state = terminating;
00430         return;
00431     }
00432 
00433     //  Delimiter in any other state is invalid.
00434     zmq_assert (false);
00435 }
00436 
00437 void zmq::pipe_t::hiccup ()
00438 {
00439     //  If termination is already under way do nothing.
00440     if (state != active)
00441         return;
00442 
00443     //  We'll drop the pointer to the inpipe. From now on, the peer is
00444     //  responsible for deallocating it.
00445     inpipe = NULL;
00446 
00447     //  Create new inpipe.
00448     inpipe = new (std::nothrow) pipe_t::upipe_t ();
00449     alloc_assert (inpipe);
00450     in_active = true;
00451 
00452     //  Notify the peer about the hiccup.
00453     send_hiccup (peer, (void*) inpipe);
00454 }
00455 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines