![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include <new> 00023 00024 #include "io_thread.hpp" 00025 #include "platform.hpp" 00026 #include "err.hpp" 00027 #include "ctx.hpp" 00028 00029 zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : 00030 object_t (ctx_, tid_) 00031 { 00032 poller = new (std::nothrow) poller_t; 00033 alloc_assert (poller); 00034 00035 mailbox_handle = poller->add_fd (mailbox.get_fd (), this); 00036 poller->set_pollin (mailbox_handle); 00037 } 00038 00039 zmq::io_thread_t::~io_thread_t () 00040 { 00041 delete poller; 00042 } 00043 00044 void zmq::io_thread_t::start () 00045 { 00046 // Start the underlying I/O thread. 00047 poller->start (); 00048 } 00049 00050 void zmq::io_thread_t::stop () 00051 { 00052 send_stop (); 00053 } 00054 00055 zmq::mailbox_t *zmq::io_thread_t::get_mailbox () 00056 { 00057 return &mailbox; 00058 } 00059 00060 int zmq::io_thread_t::get_load () 00061 { 00062 return poller->get_load (); 00063 } 00064 00065 void zmq::io_thread_t::in_event () 00066 { 00067 // TODO: Do we want to limit number of commands I/O thread can 00068 // process in a single go? 00069 00070 while (true) { 00071 00072 // Get the next command. If there is none, exit. 00073 command_t cmd; 00074 int rc = mailbox.recv (&cmd, 0); 00075 if (rc != 0 && errno == EINTR) 00076 continue; 00077 if (rc != 0 && errno == EAGAIN) 00078 break; 00079 errno_assert (rc == 0); 00080 00081 // Process the command. 00082 cmd.destination->process_command (cmd); 00083 } 00084 } 00085 00086 void zmq::io_thread_t::out_event () 00087 { 00088 // We are never polling for POLLOUT here. This function is never called. 00089 zmq_assert (false); 00090 } 00091 00092 void zmq::io_thread_t::timer_event (int id_) 00093 { 00094 // No timers here. This function is never called. 00095 zmq_assert (false); 00096 } 00097 00098 zmq::poller_t *zmq::io_thread_t::get_poller () 00099 { 00100 zmq_assert (poller); 00101 return poller; 00102 } 00103 00104 void zmq::io_thread_t::process_stop () 00105 { 00106 poller->rm_fd (mailbox_handle); 00107 poller->stop (); 00108 }