![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2011 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "platform.hpp" 00023 #if defined ZMQ_HAVE_WINDOWS 00024 #include "windows.hpp" 00025 #else 00026 #include <unistd.h> 00027 #endif 00028 00029 #include <new> 00030 #include <string.h> 00031 00032 #include "ctx.hpp" 00033 #include "socket_base.hpp" 00034 #include "io_thread.hpp" 00035 #include "reaper.hpp" 00036 #include "pipe.hpp" 00037 #include "err.hpp" 00038 #include "msg.hpp" 00039 00040 zmq::ctx_t::ctx_t (uint32_t io_threads_) : 00041 tag (0xbadcafe0), 00042 terminating (false) 00043 { 00044 int rc; 00045 00046 // Initialise the array of mailboxes. Additional three slots are for 00047 // internal log socket and the zmq_term thread the reaper thread. 00048 slot_count = max_sockets + io_threads_ + 3; 00049 slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count); 00050 alloc_assert (slots); 00051 00052 // Initialise the infrastructure for zmq_term thread. 00053 slots [term_tid] = &term_mailbox; 00054 00055 // Create the reaper thread. 00056 reaper = new (std::nothrow) reaper_t (this, reaper_tid); 00057 alloc_assert (reaper); 00058 slots [reaper_tid] = reaper->get_mailbox (); 00059 reaper->start (); 00060 00061 // Create I/O thread objects and launch them. 00062 for (uint32_t i = 2; i != io_threads_ + 2; i++) { 00063 io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i); 00064 alloc_assert (io_thread); 00065 io_threads.push_back (io_thread); 00066 slots [i] = io_thread->get_mailbox (); 00067 io_thread->start (); 00068 } 00069 00070 // In the unused part of the slot array, create a list of empty slots. 00071 for (int32_t i = (int32_t) slot_count - 1; 00072 i >= (int32_t) io_threads_ + 2; i--) { 00073 empty_slots.push_back (i); 00074 slots [i] = NULL; 00075 } 00076 00077 // Create the logging infrastructure. 00078 log_socket = create_socket (ZMQ_PUB); 00079 zmq_assert (log_socket); 00080 rc = log_socket->bind ("sys://log"); 00081 zmq_assert (rc == 0); 00082 } 00083 00084 bool zmq::ctx_t::check_tag () 00085 { 00086 return tag == 0xbadcafe0; 00087 } 00088 00089 zmq::ctx_t::~ctx_t () 00090 { 00091 // Check that there are no remaining sockets. 00092 zmq_assert (sockets.empty ()); 00093 00094 // Ask I/O threads to terminate. If stop signal wasn't sent to I/O 00095 // thread subsequent invocation of destructor would hang-up. 00096 for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) 00097 io_threads [i]->stop (); 00098 00099 // Wait till I/O threads actually terminate. 00100 for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) 00101 delete io_threads [i]; 00102 00103 // Deallocate the reaper thread object. 00104 delete reaper; 00105 00106 // Deallocate the array of mailboxes. No special work is 00107 // needed as mailboxes themselves were deallocated with their 00108 // corresponding io_thread/socket objects. 00109 free (slots); 00110 00111 // Remove the tag, so that the object is considered dead. 00112 tag = 0xdeadbeef; 00113 } 00114 00115 int zmq::ctx_t::terminate () 00116 { 00117 // Check whether termination was already underway, but interrupted and now 00118 // restarted. 00119 slot_sync.lock (); 00120 bool restarted = terminating; 00121 slot_sync.unlock (); 00122 00123 // First attempt to terminate the context. 00124 if (!restarted) { 00125 00126 // Close the logging infrastructure. 00127 log_sync.lock (); 00128 int rc = log_socket->close (); 00129 zmq_assert (rc == 0); 00130 log_socket = NULL; 00131 log_sync.unlock (); 00132 00133 // First send stop command to sockets so that any blocking calls can be 00134 // interrupted. If there are no sockets we can ask reaper thread to stop. 00135 slot_sync.lock (); 00136 terminating = true; 00137 for (sockets_t::size_type i = 0; i != sockets.size (); i++) 00138 sockets [i]->stop (); 00139 if (sockets.empty ()) 00140 reaper->stop (); 00141 slot_sync.unlock (); 00142 } 00143 00144 // Wait till reaper thread closes all the sockets. 00145 command_t cmd; 00146 int rc = term_mailbox.recv (&cmd, -1); 00147 if (rc == -1 && errno == EINTR) 00148 return -1; 00149 zmq_assert (rc == 0); 00150 zmq_assert (cmd.type == command_t::done); 00151 slot_sync.lock (); 00152 zmq_assert (sockets.empty ()); 00153 slot_sync.unlock (); 00154 00155 // Deallocate the resources. 00156 delete this; 00157 00158 return 0; 00159 } 00160 00161 zmq::socket_base_t *zmq::ctx_t::create_socket (int type_) 00162 { 00163 slot_sync.lock (); 00164 00165 // Once zmq_term() was called, we can't create new sockets. 00166 if (terminating) { 00167 slot_sync.unlock (); 00168 errno = ETERM; 00169 return NULL; 00170 } 00171 00172 // If max_sockets limit was reached, return error. 00173 if (empty_slots.empty ()) { 00174 slot_sync.unlock (); 00175 errno = EMFILE; 00176 return NULL; 00177 } 00178 00179 // Choose a slot for the socket. 00180 uint32_t slot = empty_slots.back (); 00181 empty_slots.pop_back (); 00182 00183 // Create the socket and register its mailbox. 00184 socket_base_t *s = socket_base_t::create (type_, this, slot); 00185 if (!s) { 00186 empty_slots.push_back (slot); 00187 slot_sync.unlock (); 00188 return NULL; 00189 } 00190 sockets.push_back (s); 00191 slots [slot] = s->get_mailbox (); 00192 00193 slot_sync.unlock (); 00194 00195 return s; 00196 } 00197 00198 void zmq::ctx_t::destroy_socket (class socket_base_t *socket_) 00199 { 00200 slot_sync.lock (); 00201 00202 // Free the associared thread slot. 00203 uint32_t tid = socket_->get_tid (); 00204 empty_slots.push_back (tid); 00205 slots [tid] = NULL; 00206 00207 // Remove the socket from the list of sockets. 00208 sockets.erase (socket_); 00209 00210 // If zmq_term() was already called and there are no more socket 00211 // we can ask reaper thread to terminate. 00212 if (terminating && sockets.empty ()) 00213 reaper->stop (); 00214 00215 slot_sync.unlock (); 00216 } 00217 00218 zmq::object_t *zmq::ctx_t::get_reaper () 00219 { 00220 return reaper; 00221 } 00222 00223 void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_) 00224 { 00225 slots [tid_]->send (command_); 00226 } 00227 00228 zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) 00229 { 00230 if (io_threads.empty ()) 00231 return NULL; 00232 00233 // Find the I/O thread with minimum load. 00234 int min_load = -1; 00235 io_threads_t::size_type result = 0; 00236 for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) { 00237 if (!affinity_ || (affinity_ & (uint64_t (1) << i))) { 00238 int load = io_threads [i]->get_load (); 00239 if (min_load == -1 || load < min_load) { 00240 min_load = load; 00241 result = i; 00242 } 00243 } 00244 } 00245 zmq_assert (min_load != -1); 00246 return io_threads [result]; 00247 } 00248 00249 int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_) 00250 { 00251 endpoints_sync.lock (); 00252 00253 bool inserted = endpoints.insert (endpoints_t::value_type ( 00254 std::string (addr_), endpoint_)).second; 00255 if (!inserted) { 00256 errno = EADDRINUSE; 00257 endpoints_sync.unlock (); 00258 return -1; 00259 } 00260 00261 endpoints_sync.unlock (); 00262 return 0; 00263 } 00264 00265 void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) 00266 { 00267 endpoints_sync.lock (); 00268 00269 endpoints_t::iterator it = endpoints.begin (); 00270 while (it != endpoints.end ()) { 00271 if (it->second.socket == socket_) { 00272 endpoints_t::iterator to_erase = it; 00273 ++it; 00274 endpoints.erase (to_erase); 00275 continue; 00276 } 00277 ++it; 00278 } 00279 00280 endpoints_sync.unlock (); 00281 } 00282 00283 zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) 00284 { 00285 endpoints_sync.lock (); 00286 00287 endpoints_t::iterator it = endpoints.find (addr_); 00288 if (it == endpoints.end ()) { 00289 endpoints_sync.unlock (); 00290 errno = ECONNREFUSED; 00291 endpoint_t empty = {NULL, options_t()}; 00292 return empty; 00293 } 00294 endpoint_t *endpoint = &it->second; 00295 00296 // Increment the command sequence number of the peer so that it won't 00297 // get deallocated until "bind" command is issued by the caller. 00298 // The subsequent 'bind' has to be called with inc_seqnum parameter 00299 // set to false, so that the seqnum isn't incremented twice. 00300 endpoint->socket->inc_seqnum (); 00301 00302 endpoints_sync.unlock (); 00303 return *endpoint; 00304 } 00305 00306 void zmq::ctx_t::log (const char *format_, va_list args_) 00307 { 00308 // Create the log message. 00309 msg_t msg; 00310 int rc = msg.init_size (strlen (format_) + 1); 00311 errno_assert (rc == 0); 00312 memcpy (msg.data (), format_, msg.size ()); 00313 00314 // At this point we migrate the log socket to the current thread. 00315 // We rely on mutex for executing the memory barrier. 00316 log_sync.lock (); 00317 if (log_socket) 00318 log_socket->send (&msg, 0); 00319 log_sync.unlock (); 00320 00321 rc = msg.close (); 00322 errno_assert (rc == 0); 00323 } 00324 00325