libzmq master
The Intelligent Transport Layer

own.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2010-2011 250bpm s.r.o.
00003     Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
00004 
00005     This file is part of 0MQ.
00006 
00007     0MQ is free software; you can redistribute it and/or modify it under
00008     the terms of the GNU Lesser General Public License as published by
00009     the Free Software Foundation; either version 3 of the License, or
00010     (at your option) any later version.
00011 
00012     0MQ is distributed in the hope that it will be useful,
00013     but WITHOUT ANY WARRANTY; without even the implied warranty of
00014     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015     GNU Lesser General Public License for more details.
00016 
00017     You should have received a copy of the GNU Lesser General Public License
00018     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00019 */
00020 
00021 #include "own.hpp"
00022 #include "err.hpp"
00023 #include "io_thread.hpp"
00024 
00025 zmq::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :
00026     object_t (parent_, tid_),
00027     terminating (false),
00028     sent_seqnum (0),
00029     processed_seqnum (0),
00030     owner (NULL),
00031     term_acks (0)
00032 {
00033 }
00034 
00035 zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) :
00036     object_t (io_thread_),
00037     options (options_),
00038     terminating (false),
00039     sent_seqnum (0),
00040     processed_seqnum (0),
00041     owner (NULL),
00042     term_acks (0)
00043 {
00044 }
00045 
00046 zmq::own_t::~own_t ()
00047 {
00048 }
00049 
00050 void zmq::own_t::set_owner (own_t *owner_)
00051 {
00052     zmq_assert (!owner);
00053     owner = owner_;
00054 }
00055 
00056 void zmq::own_t::inc_seqnum ()
00057 {
00058     //  This function may be called from a different thread!
00059     sent_seqnum.add (1);
00060 }
00061 
00062 void zmq::own_t::process_seqnum ()
00063 {
00064     //  Catch up with counter of processed commands.
00065     processed_seqnum++;
00066 
00067     //  We may have catched up and still have pending terms acks.
00068     check_term_acks ();
00069 }
00070 
00071 void zmq::own_t::launch_child (own_t *object_)
00072 {
00073     //  Specify the owner of the object.
00074     object_->set_owner (this);
00075 
00076     //  Plug the object into the I/O thread.
00077     send_plug (object_);
00078 
00079     //  Take ownership of the object.
00080     send_own (this, object_);
00081 }
00082 
00083 void zmq::own_t::launch_sibling (own_t *object_)
00084 {
00085     //  At this point it is important that object is plugged in before its
00086     //  owner has a chance to terminate it. Thus, 'plug' command is sent before
00087     //  the 'own' command. Given that the mailbox preserves ordering of
00088     //  commands, 'term' command from the owner cannot make it to the object
00089     //  before the already written 'plug' command.
00090 
00091     //  Specify the owner of the object.
00092     object_->set_owner (owner);
00093 
00094     //  Plug the object into its I/O thread.
00095     send_plug (object_);
00096 
00097     //  Make parent own the object.
00098     send_own (owner, object_);
00099 }
00100 
00101 void zmq::own_t::process_term_req (own_t *object_)
00102 {
00103     //  When shutting down we can ignore termination requests from owned
00104     //  objects. The termination request was already sent to the object.
00105     if (terminating)
00106         return;
00107 
00108     //  If I/O object is well and alive let's ask it to terminate.
00109     owned_t::iterator it = std::find (owned.begin (), owned.end (), object_);
00110 
00111     //  If not found, we assume that termination request was already sent to
00112     //  the object so we can safely ignore the request.
00113     if (it == owned.end ())
00114         return;
00115 
00116     owned.erase (it);
00117     register_term_acks (1);
00118 
00119     //  Note that this object is the root of the (partial shutdown) thus, its
00120     //  value of linger is used, rather than the value stored by the children.
00121     send_term (object_, options.linger);
00122 }
00123 
00124 void zmq::own_t::process_own (own_t *object_)
00125 {
00126     //  If the object is already being shut down, new owned objects are
00127     //  immediately asked to terminate. Note that linger is set to zero.
00128     if (terminating) {
00129         register_term_acks (1);
00130         send_term (object_, 0);
00131         return;
00132     }
00133 
00134     //  Store the reference to the owned object.
00135     owned.insert (object_);
00136 }
00137 
00138 void zmq::own_t::terminate ()
00139 {
00140     //  If termination is already underway, there's no point
00141     //  in starting it anew.
00142     if (terminating)
00143         return;
00144 
00145     //  As for the root of the ownership tree, there's noone to terminate it,
00146     //  so it has to terminate itself.
00147     if (!owner) {
00148         process_term (options.linger);
00149         return;
00150     }
00151 
00152     //  If I am an owned object, I'll ask my owner to terminate me.
00153     send_term_req (owner, this);
00154 }
00155 
00156 bool zmq::own_t::is_terminating ()
00157 {
00158     return terminating;
00159 }
00160 
00161 void zmq::own_t::process_term (int linger_)
00162 {
00163     //  Double termination should never happen.
00164     zmq_assert (!terminating);
00165 
00166     //  Send termination request to all owned objects.
00167     for (owned_t::iterator it = owned.begin (); it != owned.end (); ++it)
00168         send_term (*it, linger_);
00169     register_term_acks ((int) owned.size ());
00170     owned.clear ();
00171 
00172     //  Start termination process and check whether by chance we cannot
00173     //  terminate immediately.
00174     terminating = true;
00175     check_term_acks ();
00176 }
00177 
00178 void zmq::own_t::register_term_acks (int count_)
00179 {
00180     term_acks += count_;
00181 }
00182 
00183 void zmq::own_t::unregister_term_ack ()
00184 {
00185     zmq_assert (term_acks > 0);
00186     term_acks--;
00187 
00188     //  This may be a last ack we are waiting for before termination...
00189     check_term_acks (); 
00190 }
00191 
00192 void zmq::own_t::process_term_ack ()
00193 {
00194     unregister_term_ack ();
00195 }
00196 
00197 void zmq::own_t::check_term_acks ()
00198 {
00199     if (terminating && processed_seqnum == sent_seqnum.get () &&
00200           term_acks == 0) {
00201 
00202         //  Sanity check. There should be no active children at this point.
00203         zmq_assert (owned.empty ());
00204 
00205         //  The root object has nobody to confirm the termination to.
00206         //  Other nodes will confirm the termination to the owner.
00207         if (owner)
00208             send_term_ack (owner);
00209 
00210         //  Deallocate the resources.
00211         process_destroy ();
00212     }
00213 }
00214 
00215 void zmq::own_t::process_destroy ()
00216 {
00217     delete this;
00218 }
00219 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines