libzmq master
The Intelligent Transport Layer

io_thread.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include <new>
00023 
00024 #include "io_thread.hpp"
00025 #include "platform.hpp"
00026 #include "err.hpp"
00027 #include "ctx.hpp"
00028 
00029 zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) :
00030     object_t (ctx_, tid_)
00031 {
00032     poller = new (std::nothrow) poller_t;
00033     alloc_assert (poller);
00034 
00035     mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
00036     poller->set_pollin (mailbox_handle);
00037 }
00038 
00039 zmq::io_thread_t::~io_thread_t ()
00040 {
00041     delete poller;
00042 }
00043 
00044 void zmq::io_thread_t::start ()
00045 {
00046     //  Start the underlying I/O thread.
00047     poller->start ();
00048 }
00049 
00050 void zmq::io_thread_t::stop ()
00051 {
00052     send_stop ();
00053 }
00054 
00055 zmq::mailbox_t *zmq::io_thread_t::get_mailbox ()
00056 {
00057     return &mailbox;
00058 }
00059 
00060 int zmq::io_thread_t::get_load ()
00061 {
00062     return poller->get_load ();
00063 }
00064 
00065 void zmq::io_thread_t::in_event ()
00066 {
00067     //  TODO: Do we want to limit number of commands I/O thread can
00068     //  process in a single go?
00069 
00070     while (true) {
00071 
00072         //  Get the next command. If there is none, exit.
00073         command_t cmd;
00074         int rc = mailbox.recv (&cmd, 0);
00075         if (rc != 0 && errno == EINTR)
00076             continue;
00077         if (rc != 0 && errno == EAGAIN)
00078             break;
00079         errno_assert (rc == 0);
00080 
00081         //  Process the command.
00082         cmd.destination->process_command (cmd);
00083     }
00084 }
00085 
00086 void zmq::io_thread_t::out_event ()
00087 {
00088     //  We are never polling for POLLOUT here. This function is never called.
00089     zmq_assert (false);
00090 }
00091 
00092 void zmq::io_thread_t::timer_event (int id_)
00093 {
00094     //  No timers here. This function is never called.
00095     zmq_assert (false);
00096 }
00097 
00098 zmq::poller_t *zmq::io_thread_t::get_poller ()
00099 {
00100     zmq_assert (poller);
00101     return poller;
00102 }
00103 
00104 void zmq::io_thread_t::process_stop ()
00105 {
00106     poller->rm_fd (mailbox_handle);
00107     poller->stop ();
00108 }
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines