![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2010-2011 250bpm s.r.o. 00003 Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file 00004 00005 This file is part of 0MQ. 00006 00007 0MQ is free software; you can redistribute it and/or modify it under 00008 the terms of the GNU Lesser General Public License as published by 00009 the Free Software Foundation; either version 3 of the License, or 00010 (at your option) any later version. 00011 00012 0MQ is distributed in the hope that it will be useful, 00013 but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 GNU Lesser General Public License for more details. 00016 00017 You should have received a copy of the GNU Lesser General Public License 00018 along with this program. If not, see <http://www.gnu.org/licenses/>. 00019 */ 00020 00021 #include "own.hpp" 00022 #include "err.hpp" 00023 #include "io_thread.hpp" 00024 00025 zmq::own_t::own_t (class ctx_t *parent_, uint32_t tid_) : 00026 object_t (parent_, tid_), 00027 terminating (false), 00028 sent_seqnum (0), 00029 processed_seqnum (0), 00030 owner (NULL), 00031 term_acks (0) 00032 { 00033 } 00034 00035 zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) : 00036 object_t (io_thread_), 00037 options (options_), 00038 terminating (false), 00039 sent_seqnum (0), 00040 processed_seqnum (0), 00041 owner (NULL), 00042 term_acks (0) 00043 { 00044 } 00045 00046 zmq::own_t::~own_t () 00047 { 00048 } 00049 00050 void zmq::own_t::set_owner (own_t *owner_) 00051 { 00052 zmq_assert (!owner); 00053 owner = owner_; 00054 } 00055 00056 void zmq::own_t::inc_seqnum () 00057 { 00058 // This function may be called from a different thread! 00059 sent_seqnum.add (1); 00060 } 00061 00062 void zmq::own_t::process_seqnum () 00063 { 00064 // Catch up with counter of processed commands. 00065 processed_seqnum++; 00066 00067 // We may have catched up and still have pending terms acks. 00068 check_term_acks (); 00069 } 00070 00071 void zmq::own_t::launch_child (own_t *object_) 00072 { 00073 // Specify the owner of the object. 00074 object_->set_owner (this); 00075 00076 // Plug the object into the I/O thread. 00077 send_plug (object_); 00078 00079 // Take ownership of the object. 00080 send_own (this, object_); 00081 } 00082 00083 void zmq::own_t::launch_sibling (own_t *object_) 00084 { 00085 // At this point it is important that object is plugged in before its 00086 // owner has a chance to terminate it. Thus, 'plug' command is sent before 00087 // the 'own' command. Given that the mailbox preserves ordering of 00088 // commands, 'term' command from the owner cannot make it to the object 00089 // before the already written 'plug' command. 00090 00091 // Specify the owner of the object. 00092 object_->set_owner (owner); 00093 00094 // Plug the object into its I/O thread. 00095 send_plug (object_); 00096 00097 // Make parent own the object. 00098 send_own (owner, object_); 00099 } 00100 00101 void zmq::own_t::process_term_req (own_t *object_) 00102 { 00103 // When shutting down we can ignore termination requests from owned 00104 // objects. The termination request was already sent to the object. 00105 if (terminating) 00106 return; 00107 00108 // If I/O object is well and alive let's ask it to terminate. 00109 owned_t::iterator it = std::find (owned.begin (), owned.end (), object_); 00110 00111 // If not found, we assume that termination request was already sent to 00112 // the object so we can safely ignore the request. 00113 if (it == owned.end ()) 00114 return; 00115 00116 owned.erase (it); 00117 register_term_acks (1); 00118 00119 // Note that this object is the root of the (partial shutdown) thus, its 00120 // value of linger is used, rather than the value stored by the children. 00121 send_term (object_, options.linger); 00122 } 00123 00124 void zmq::own_t::process_own (own_t *object_) 00125 { 00126 // If the object is already being shut down, new owned objects are 00127 // immediately asked to terminate. Note that linger is set to zero. 00128 if (terminating) { 00129 register_term_acks (1); 00130 send_term (object_, 0); 00131 return; 00132 } 00133 00134 // Store the reference to the owned object. 00135 owned.insert (object_); 00136 } 00137 00138 void zmq::own_t::terminate () 00139 { 00140 // If termination is already underway, there's no point 00141 // in starting it anew. 00142 if (terminating) 00143 return; 00144 00145 // As for the root of the ownership tree, there's noone to terminate it, 00146 // so it has to terminate itself. 00147 if (!owner) { 00148 process_term (options.linger); 00149 return; 00150 } 00151 00152 // If I am an owned object, I'll ask my owner to terminate me. 00153 send_term_req (owner, this); 00154 } 00155 00156 bool zmq::own_t::is_terminating () 00157 { 00158 return terminating; 00159 } 00160 00161 void zmq::own_t::process_term (int linger_) 00162 { 00163 // Double termination should never happen. 00164 zmq_assert (!terminating); 00165 00166 // Send termination request to all owned objects. 00167 for (owned_t::iterator it = owned.begin (); it != owned.end (); ++it) 00168 send_term (*it, linger_); 00169 register_term_acks ((int) owned.size ()); 00170 owned.clear (); 00171 00172 // Start termination process and check whether by chance we cannot 00173 // terminate immediately. 00174 terminating = true; 00175 check_term_acks (); 00176 } 00177 00178 void zmq::own_t::register_term_acks (int count_) 00179 { 00180 term_acks += count_; 00181 } 00182 00183 void zmq::own_t::unregister_term_ack () 00184 { 00185 zmq_assert (term_acks > 0); 00186 term_acks--; 00187 00188 // This may be a last ack we are waiting for before termination... 00189 check_term_acks (); 00190 } 00191 00192 void zmq::own_t::process_term_ack () 00193 { 00194 unregister_term_ack (); 00195 } 00196 00197 void zmq::own_t::check_term_acks () 00198 { 00199 if (terminating && processed_seqnum == sent_seqnum.get () && 00200 term_acks == 0) { 00201 00202 // Sanity check. There should be no active children at this point. 00203 zmq_assert (owned.empty ()); 00204 00205 // The root object has nobody to confirm the termination to. 00206 // Other nodes will confirm the termination to the owner. 00207 if (owner) 00208 send_term_ack (owner); 00209 00210 // Deallocate the resources. 00211 process_destroy (); 00212 } 00213 } 00214 00215 void zmq::own_t::process_destroy () 00216 { 00217 delete this; 00218 } 00219