![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #ifndef __ZMQ_CTX_HPP_INCLUDED__ 00023 #define __ZMQ_CTX_HPP_INCLUDED__ 00024 00025 #include <map> 00026 #include <vector> 00027 #include <string> 00028 #include <stdarg.h> 00029 00030 #include "mailbox.hpp" 00031 #include "array.hpp" 00032 #include "config.hpp" 00033 #include "mutex.hpp" 00034 #include "stdint.hpp" 00035 #include "options.hpp" 00036 00037 namespace zmq 00038 { 00039 // Information associated with inproc endpoint. Note that endpoint options 00040 // are registered as well so that the peer can access them without a need 00041 // for synchronisation, handshaking or similar. 00042 struct endpoint_t 00043 { 00044 class socket_base_t *socket; 00045 options_t options; 00046 }; 00047 00048 // Context object encapsulates all the global state associated with 00049 // the library. 00050 00051 class ctx_t 00052 { 00053 public: 00054 00055 // Create the context object. The argument specifies the size 00056 // of I/O thread pool to create. 00057 ctx_t (uint32_t io_threads_); 00058 00059 // Returns false if object is not a context. 00060 bool check_tag (); 00061 00062 // This function is called when user invokes zmq_term. If there are 00063 // no more sockets open it'll cause all the infrastructure to be shut 00064 // down. If there are open sockets still, the deallocation happens 00065 // after the last one is closed. 00066 int terminate (); 00067 00068 // Create and destroy a socket. 00069 class socket_base_t *create_socket (int type_); 00070 void destroy_socket (class socket_base_t *socket_); 00071 00072 // Send command to the destination thread. 00073 void send_command (uint32_t tid_, const command_t &command_); 00074 00075 // Returns the I/O thread that is the least busy at the moment. 00076 // Affinity specifies which I/O threads are eligible (0 = all). 00077 // Returns NULL is no I/O thread is available. 00078 class io_thread_t *choose_io_thread (uint64_t affinity_); 00079 00080 // Returns reaper thread object. 00081 class object_t *get_reaper (); 00082 00083 // Management of inproc endpoints. 00084 int register_endpoint (const char *addr_, endpoint_t &endpoint_); 00085 void unregister_endpoints (class socket_base_t *socket_); 00086 endpoint_t find_endpoint (const char *addr_); 00087 00088 // Logging. 00089 void log (const char *format_, va_list args_); 00090 00091 enum { 00092 term_tid = 0, 00093 reaper_tid = 1 00094 }; 00095 00096 private: 00097 00098 ~ctx_t (); 00099 00100 // Used to check whether the object is a context. 00101 uint32_t tag; 00102 00103 // Sockets belonging to this context. We need the list so that 00104 // we can notify the sockets when zmq_term() is called. The sockets 00105 // will return ETERM then. 00106 typedef array_t <socket_base_t> sockets_t; 00107 sockets_t sockets; 00108 00109 // List of unused thread slots. 00110 typedef std::vector <uint32_t> emtpy_slots_t; 00111 emtpy_slots_t empty_slots; 00112 00113 // If true, zmq_term was already called. 00114 bool terminating; 00115 00116 // Synchronisation of accesses to global slot-related data: 00117 // sockets, empty_slots, terminating. It also synchronises 00118 // access to zombie sockets as such (as oposed to slots) and provides 00119 // a memory barrier to ensure that all CPU cores see the same data. 00120 mutex_t slot_sync; 00121 00122 // The reaper thread. 00123 class reaper_t *reaper; 00124 00125 // I/O threads. 00126 typedef std::vector <class io_thread_t*> io_threads_t; 00127 io_threads_t io_threads; 00128 00129 // Array of pointers to mailboxes for both application and I/O threads. 00130 uint32_t slot_count; 00131 mailbox_t **slots; 00132 00133 // Mailbox for zmq_term thread. 00134 mailbox_t term_mailbox; 00135 00136 // List of inproc endpoints within this context. 00137 typedef std::map <std::string, endpoint_t> endpoints_t; 00138 endpoints_t endpoints; 00139 00140 // Synchronisation of access to the list of inproc endpoints. 00141 mutex_t endpoints_sync; 00142 00143 // PUB socket for logging. The socket is shared among all the threads, 00144 // thus it is synchronised by a mutex. 00145 class socket_base_t *log_socket; 00146 mutex_t log_sync; 00147 00148 ctx_t (const ctx_t&); 00149 const ctx_t &operator = (const ctx_t&); 00150 }; 00151 00152 } 00153 00154 #endif 00155