![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2010-2011 250bpm s.r.o. 00003 Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file 00004 00005 This file is part of 0MQ. 00006 00007 0MQ is free software; you can redistribute it and/or modify it under 00008 the terms of the GNU Lesser General Public License as published by 00009 the Free Software Foundation; either version 3 of the License, or 00010 (at your option) any later version. 00011 00012 0MQ is distributed in the hope that it will be useful, 00013 but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 GNU Lesser General Public License for more details. 00016 00017 You should have received a copy of the GNU Lesser General Public License 00018 along with this program. If not, see <http://www.gnu.org/licenses/>. 00019 */ 00020 00021 #ifndef __ZMQ_OWN_HPP_INCLUDED__ 00022 #define __ZMQ_OWN_HPP_INCLUDED__ 00023 00024 #include <set> 00025 #include <algorithm> 00026 00027 #include "object.hpp" 00028 #include "options.hpp" 00029 #include "atomic_counter.hpp" 00030 #include "stdint.hpp" 00031 00032 namespace zmq 00033 { 00034 00035 // Base class for objects forming a part of ownership hierarchy. 00036 // It handles initialisation and destruction of such objects. 00037 00038 class own_t : public object_t 00039 { 00040 public: 00041 00042 // Note that the owner is unspecified in the constructor. 00043 // It'll be supplied later on when the object is plugged in. 00044 00045 // The object is not living within an I/O thread. It has it's own 00046 // thread outside of 0MQ infrastructure. 00047 own_t (class ctx_t *parent_, uint32_t tid_); 00048 00049 // The object is living within I/O thread. 00050 own_t (class io_thread_t *io_thread_, const options_t &options_); 00051 00052 // When another owned object wants to send command to this object 00053 // it calls this function to let it know it should not shut down 00054 // before the command is delivered. 00055 void inc_seqnum (); 00056 00057 // Use following two functions to wait for arbitrary events before 00058 // terminating. Just add number of events to wait for using 00059 // register_tem_acks functions. When event occurs, call 00060 // remove_term_ack. When number of pending acks reaches zero 00061 // object will be deallocated. 00062 void register_term_acks (int count_); 00063 void unregister_term_ack (); 00064 00065 protected: 00066 00067 // Launch the supplied object and become its owner. 00068 void launch_child (own_t *object_); 00069 00070 // Launch the supplied object and make it your sibling (make your 00071 // owner become its owner as well). 00072 void launch_sibling (own_t *object_); 00073 00074 // Ask owner object to terminate this object. It may take a while 00075 // while actual termination is started. This function should not be 00076 // called more than once. 00077 void terminate (); 00078 00079 // Returns true if the object is in process of termination. 00080 bool is_terminating (); 00081 00082 // Derived object destroys own_t. There's no point in allowing 00083 // others to invoke the destructor. At the same time, it has to be 00084 // virtual so that generic own_t deallocation mechanism destroys 00085 // specific type of the owned object correctly. 00086 virtual ~own_t (); 00087 00088 // Term handler is protocted rather than private so that it can 00089 // be intercepted by the derived class. This is useful to add custom 00090 // steps to the beginning of the termination process. 00091 void process_term (int linger_); 00092 00093 // A place to hook in when phyicallal destruction of the object 00094 // is to be delayed. 00095 virtual void process_destroy (); 00096 00097 // Socket options associated with this object. 00098 options_t options; 00099 00100 private: 00101 00102 // Set owner of the object 00103 void set_owner (own_t *owner_); 00104 00105 // Handlers for incoming commands. 00106 void process_own (own_t *object_); 00107 void process_term_req (own_t *object_); 00108 void process_term_ack (); 00109 void process_seqnum (); 00110 00111 // Check whether all the peding term acks were delivered. 00112 // If so, deallocate this object. 00113 void check_term_acks (); 00114 00115 // True if termination was already initiated. If so, we can destroy 00116 // the object if there are no more child objects or pending term acks. 00117 bool terminating; 00118 00119 // Sequence number of the last command sent to this object. 00120 atomic_counter_t sent_seqnum; 00121 00122 // Sequence number of the last command processed by this object. 00123 uint64_t processed_seqnum; 00124 00125 // Socket owning this object. It's responsible for shutting down 00126 // this object. 00127 own_t *owner; 00128 00129 // List of all objects owned by this socket. We are responsible 00130 // for deallocating them before we quit. 00131 typedef std::set <own_t*> owned_t; 00132 owned_t owned; 00133 00134 // Number of events we have to get before we can destroy the object. 00135 int term_acks; 00136 00137 own_t (const own_t&); 00138 const own_t &operator = (const own_t&); 00139 }; 00140 00141 } 00142 00143 #endif