libzmq master
The Intelligent Transport Layer

reaper.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2011 250bpm s.r.o.
00003     Copyright (c) 2011 Other contributors as noted in the AUTHORS file
00004 
00005     This file is part of 0MQ.
00006 
00007     0MQ is free software; you can redistribute it and/or modify it under
00008     the terms of the GNU Lesser General Public License as published by
00009     the Free Software Foundation; either version 3 of the License, or
00010     (at your option) any later version.
00011 
00012     0MQ is distributed in the hope that it will be useful,
00013     but WITHOUT ANY WARRANTY; without even the implied warranty of
00014     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015     GNU Lesser General Public License for more details.
00016 
00017     You should have received a copy of the GNU Lesser General Public License
00018     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00019 */
00020 
00021 #include "reaper.hpp"
00022 #include "socket_base.hpp"
00023 #include "err.hpp"
00024 
00025 zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
00026     object_t (ctx_, tid_),
00027     sockets (0),
00028     terminating (false)
00029 {
00030     poller = new (std::nothrow) poller_t;
00031     alloc_assert (poller);
00032 
00033     mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
00034     poller->set_pollin (mailbox_handle);
00035 }
00036 
00037 zmq::reaper_t::~reaper_t ()
00038 {
00039     delete poller;
00040 }
00041 
00042 zmq::mailbox_t *zmq::reaper_t::get_mailbox ()
00043 {
00044     return &mailbox;
00045 }
00046 
00047 void zmq::reaper_t::start ()
00048 {
00049     //  Start the thread.
00050     poller->start ();
00051 }
00052 
00053 void zmq::reaper_t::stop ()
00054 {
00055     send_stop ();
00056 }
00057 
00058 void zmq::reaper_t::in_event ()
00059 {
00060     while (true) {
00061 
00062         //  Get the next command. If there is none, exit.
00063         command_t cmd;
00064         int rc = mailbox.recv (&cmd, 0);
00065         if (rc != 0 && errno == EINTR)
00066             continue;
00067         if (rc != 0 && errno == EAGAIN)
00068             break;
00069         errno_assert (rc == 0);
00070 
00071         //  Process the command.
00072         cmd.destination->process_command (cmd);
00073     }
00074 }
00075 
00076 void zmq::reaper_t::out_event ()
00077 {
00078     zmq_assert (false);
00079 }
00080 
00081 void zmq::reaper_t::timer_event (int id_)
00082 {
00083     zmq_assert (false);
00084 }
00085 
00086 void zmq::reaper_t::process_stop ()
00087 {
00088     terminating = true;
00089 
00090     //  If there are no sockets being reaped finish immediately.
00091     if (!sockets) {
00092         send_done ();
00093         poller->rm_fd (mailbox_handle);
00094         poller->stop ();
00095     }
00096 }
00097 
00098 void zmq::reaper_t::process_reap (socket_base_t *socket_)
00099 {
00100     //  Add the socket to the poller.
00101     socket_->start_reaping (poller);
00102 
00103     ++sockets;
00104 }
00105 
00106 void zmq::reaper_t::process_reaped ()
00107 {
00108     --sockets;
00109 
00110     //  If reaped was already asked to terminate and there are no more sockets,
00111     //  finish immediately.
00112     if (!sockets && terminating) {
00113         send_done ();
00114         poller->rm_fd (mailbox_handle);
00115         poller->stop ();
00116     }
00117 }
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines