![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2011 250bpm s.r.o. 00003 Copyright (c) 2011 Other contributors as noted in the AUTHORS file 00004 00005 This file is part of 0MQ. 00006 00007 0MQ is free software; you can redistribute it and/or modify it under 00008 the terms of the GNU Lesser General Public License as published by 00009 the Free Software Foundation; either version 3 of the License, or 00010 (at your option) any later version. 00011 00012 0MQ is distributed in the hope that it will be useful, 00013 but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 GNU Lesser General Public License for more details. 00016 00017 You should have received a copy of the GNU Lesser General Public License 00018 along with this program. If not, see <http://www.gnu.org/licenses/>. 00019 */ 00020 00021 #include "reaper.hpp" 00022 #include "socket_base.hpp" 00023 #include "err.hpp" 00024 00025 zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) : 00026 object_t (ctx_, tid_), 00027 sockets (0), 00028 terminating (false) 00029 { 00030 poller = new (std::nothrow) poller_t; 00031 alloc_assert (poller); 00032 00033 mailbox_handle = poller->add_fd (mailbox.get_fd (), this); 00034 poller->set_pollin (mailbox_handle); 00035 } 00036 00037 zmq::reaper_t::~reaper_t () 00038 { 00039 delete poller; 00040 } 00041 00042 zmq::mailbox_t *zmq::reaper_t::get_mailbox () 00043 { 00044 return &mailbox; 00045 } 00046 00047 void zmq::reaper_t::start () 00048 { 00049 // Start the thread. 00050 poller->start (); 00051 } 00052 00053 void zmq::reaper_t::stop () 00054 { 00055 send_stop (); 00056 } 00057 00058 void zmq::reaper_t::in_event () 00059 { 00060 while (true) { 00061 00062 // Get the next command. If there is none, exit. 00063 command_t cmd; 00064 int rc = mailbox.recv (&cmd, 0); 00065 if (rc != 0 && errno == EINTR) 00066 continue; 00067 if (rc != 0 && errno == EAGAIN) 00068 break; 00069 errno_assert (rc == 0); 00070 00071 // Process the command. 00072 cmd.destination->process_command (cmd); 00073 } 00074 } 00075 00076 void zmq::reaper_t::out_event () 00077 { 00078 zmq_assert (false); 00079 } 00080 00081 void zmq::reaper_t::timer_event (int id_) 00082 { 00083 zmq_assert (false); 00084 } 00085 00086 void zmq::reaper_t::process_stop () 00087 { 00088 terminating = true; 00089 00090 // If there are no sockets being reaped finish immediately. 00091 if (!sockets) { 00092 send_done (); 00093 poller->rm_fd (mailbox_handle); 00094 poller->stop (); 00095 } 00096 } 00097 00098 void zmq::reaper_t::process_reap (socket_base_t *socket_) 00099 { 00100 // Add the socket to the poller. 00101 socket_->start_reaping (poller); 00102 00103 ++sockets; 00104 } 00105 00106 void zmq::reaper_t::process_reaped () 00107 { 00108 --sockets; 00109 00110 // If reaped was already asked to terminate and there are no more sockets, 00111 // finish immediately. 00112 if (!sockets && terminating) { 00113 send_done (); 00114 poller->rm_fd (mailbox_handle); 00115 poller->stop (); 00116 } 00117 }