libzmq master
The Intelligent Transport Layer

ctx.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2011 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include "platform.hpp"
00023 #if defined ZMQ_HAVE_WINDOWS
00024 #include "windows.hpp"
00025 #else
00026 #include <unistd.h>
00027 #endif
00028 
00029 #include <new>
00030 #include <string.h>
00031 
00032 #include "ctx.hpp"
00033 #include "socket_base.hpp"
00034 #include "io_thread.hpp"
00035 #include "reaper.hpp"
00036 #include "pipe.hpp"
00037 #include "err.hpp"
00038 #include "msg.hpp"
00039 
00040 zmq::ctx_t::ctx_t (uint32_t io_threads_) :
00041     tag (0xbadcafe0),
00042     terminating (false)
00043 {
00044     int rc;
00045 
00046     //  Initialise the array of mailboxes. Additional three slots are for
00047     //  internal log socket and the zmq_term thread the reaper thread.
00048     slot_count = max_sockets + io_threads_ + 3;
00049     slots = (mailbox_t**) malloc (sizeof (mailbox_t*) * slot_count);
00050     alloc_assert (slots);
00051 
00052     //  Initialise the infrastructure for zmq_term thread.
00053     slots [term_tid] = &term_mailbox;
00054 
00055     //  Create the reaper thread.
00056     reaper = new (std::nothrow) reaper_t (this, reaper_tid);
00057     alloc_assert (reaper);
00058     slots [reaper_tid] = reaper->get_mailbox ();
00059     reaper->start ();
00060 
00061     //  Create I/O thread objects and launch them.
00062     for (uint32_t i = 2; i != io_threads_ + 2; i++) {
00063         io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
00064         alloc_assert (io_thread);
00065         io_threads.push_back (io_thread);
00066         slots [i] = io_thread->get_mailbox ();
00067         io_thread->start ();
00068     }
00069 
00070     //  In the unused part of the slot array, create a list of empty slots.
00071     for (int32_t i = (int32_t) slot_count - 1;
00072           i >= (int32_t) io_threads_ + 2; i--) {
00073         empty_slots.push_back (i);
00074         slots [i] = NULL;
00075     }
00076 
00077     //  Create the logging infrastructure.
00078     log_socket = create_socket (ZMQ_PUB);
00079     zmq_assert (log_socket);
00080     rc = log_socket->bind ("sys://log");
00081     zmq_assert (rc == 0);
00082 }
00083 
00084 bool zmq::ctx_t::check_tag ()
00085 {
00086     return tag == 0xbadcafe0;
00087 }
00088 
00089 zmq::ctx_t::~ctx_t ()
00090 {
00091     //  Check that there are no remaining sockets.
00092     zmq_assert (sockets.empty ());
00093 
00094     //  Ask I/O threads to terminate. If stop signal wasn't sent to I/O
00095     //  thread subsequent invocation of destructor would hang-up.
00096     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
00097         io_threads [i]->stop ();
00098 
00099     //  Wait till I/O threads actually terminate.
00100     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
00101         delete io_threads [i];
00102 
00103     //  Deallocate the reaper thread object.
00104     delete reaper;
00105 
00106     //  Deallocate the array of mailboxes. No special work is
00107     //  needed as mailboxes themselves were deallocated with their
00108     //  corresponding io_thread/socket objects.
00109     free (slots);
00110 
00111     //  Remove the tag, so that the object is considered dead.
00112     tag = 0xdeadbeef;
00113 }
00114 
00115 int zmq::ctx_t::terminate ()
00116 {
00117     //  Check whether termination was already underway, but interrupted and now
00118     //  restarted.
00119     slot_sync.lock ();
00120     bool restarted = terminating;
00121     slot_sync.unlock ();
00122 
00123     //  First attempt to terminate the context.
00124     if (!restarted) {
00125 
00126         //  Close the logging infrastructure.
00127         log_sync.lock ();
00128         int rc = log_socket->close ();
00129         zmq_assert (rc == 0);
00130         log_socket = NULL;
00131         log_sync.unlock ();
00132 
00133         //  First send stop command to sockets so that any blocking calls can be
00134         //  interrupted. If there are no sockets we can ask reaper thread to stop.
00135         slot_sync.lock ();
00136         terminating = true;
00137         for (sockets_t::size_type i = 0; i != sockets.size (); i++)
00138             sockets [i]->stop ();
00139         if (sockets.empty ())
00140             reaper->stop ();
00141         slot_sync.unlock ();
00142     }
00143 
00144     //  Wait till reaper thread closes all the sockets.
00145     command_t cmd;
00146     int rc = term_mailbox.recv (&cmd, -1);
00147     if (rc == -1 && errno == EINTR)
00148         return -1;
00149     zmq_assert (rc == 0);
00150     zmq_assert (cmd.type == command_t::done);
00151     slot_sync.lock ();
00152     zmq_assert (sockets.empty ());
00153     slot_sync.unlock ();
00154 
00155     //  Deallocate the resources.
00156     delete this;
00157 
00158     return 0;
00159 }
00160 
00161 zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
00162 {
00163     slot_sync.lock ();
00164 
00165     //  Once zmq_term() was called, we can't create new sockets.
00166     if (terminating) {
00167         slot_sync.unlock ();
00168         errno = ETERM;
00169         return NULL;
00170     }
00171 
00172     //  If max_sockets limit was reached, return error.
00173     if (empty_slots.empty ()) {
00174         slot_sync.unlock ();
00175         errno = EMFILE;
00176         return NULL;
00177     }
00178 
00179     //  Choose a slot for the socket.
00180     uint32_t slot = empty_slots.back ();
00181     empty_slots.pop_back ();
00182 
00183     //  Create the socket and register its mailbox.
00184     socket_base_t *s = socket_base_t::create (type_, this, slot);
00185     if (!s) {
00186         empty_slots.push_back (slot);
00187         slot_sync.unlock ();
00188         return NULL;
00189     }
00190     sockets.push_back (s);
00191     slots [slot] = s->get_mailbox ();
00192 
00193     slot_sync.unlock ();
00194 
00195     return s;
00196 }
00197 
00198 void zmq::ctx_t::destroy_socket (class socket_base_t *socket_)
00199 {
00200     slot_sync.lock ();
00201 
00202     //  Free the associared thread slot.
00203     uint32_t tid = socket_->get_tid ();
00204     empty_slots.push_back (tid);
00205     slots [tid] = NULL;    
00206 
00207     //  Remove the socket from the list of sockets.
00208     sockets.erase (socket_);
00209 
00210     //  If zmq_term() was already called and there are no more socket
00211     //  we can ask reaper thread to terminate.
00212     if (terminating && sockets.empty ())
00213         reaper->stop ();
00214 
00215     slot_sync.unlock ();
00216 }
00217 
00218 zmq::object_t *zmq::ctx_t::get_reaper ()
00219 {
00220     return reaper;
00221 }
00222 
00223 void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_)
00224 {
00225     slots [tid_]->send (command_);
00226 }
00227 
00228 zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
00229 {
00230     if (io_threads.empty ())
00231         return NULL;
00232 
00233     //  Find the I/O thread with minimum load.
00234     int min_load = -1;
00235     io_threads_t::size_type result = 0;
00236     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
00237         if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
00238             int load = io_threads [i]->get_load ();
00239             if (min_load == -1 || load < min_load) {
00240                 min_load = load;
00241                 result = i;
00242             }
00243         }
00244     }
00245     zmq_assert (min_load != -1);
00246     return io_threads [result];
00247 }
00248 
00249 int zmq::ctx_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
00250 {
00251     endpoints_sync.lock ();
00252 
00253     bool inserted = endpoints.insert (endpoints_t::value_type (
00254         std::string (addr_), endpoint_)).second;
00255     if (!inserted) {
00256         errno = EADDRINUSE;
00257         endpoints_sync.unlock ();
00258         return -1;
00259     }
00260 
00261     endpoints_sync.unlock ();
00262     return 0;
00263 }
00264 
00265 void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
00266 {
00267     endpoints_sync.lock ();
00268 
00269     endpoints_t::iterator it = endpoints.begin ();
00270     while (it != endpoints.end ()) {
00271         if (it->second.socket == socket_) {
00272             endpoints_t::iterator to_erase = it;
00273             ++it;
00274             endpoints.erase (to_erase);
00275             continue;
00276         }
00277         ++it;
00278     }
00279         
00280     endpoints_sync.unlock ();
00281 }
00282 
00283 zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
00284 {
00285      endpoints_sync.lock ();
00286 
00287      endpoints_t::iterator it = endpoints.find (addr_);
00288      if (it == endpoints.end ()) {
00289          endpoints_sync.unlock ();
00290          errno = ECONNREFUSED;
00291          endpoint_t empty = {NULL, options_t()};
00292          return empty;
00293      }
00294      endpoint_t *endpoint = &it->second;
00295 
00296      //  Increment the command sequence number of the peer so that it won't
00297      //  get deallocated until "bind" command is issued by the caller.
00298      //  The subsequent 'bind' has to be called with inc_seqnum parameter
00299      //  set to false, so that the seqnum isn't incremented twice.
00300      endpoint->socket->inc_seqnum ();
00301 
00302      endpoints_sync.unlock ();
00303      return *endpoint;
00304 }
00305 
00306 void zmq::ctx_t::log (const char *format_, va_list args_)
00307 {
00308     //  Create the log message.
00309     msg_t msg;
00310     int rc = msg.init_size (strlen (format_) + 1);
00311     errno_assert (rc == 0);
00312     memcpy (msg.data (), format_, msg.size ());
00313 
00314     //  At this  point we migrate the log socket to the current thread.
00315     //  We rely on mutex for executing the memory barrier.
00316     log_sync.lock ();
00317     if (log_socket)
00318         log_socket->send (&msg, 0);
00319     log_sync.unlock ();
00320 
00321     rc = msg.close ();
00322     errno_assert (rc == 0);
00323 }
00324 
00325 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines