![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "poll.hpp" 00023 #if defined ZMQ_USE_POLL 00024 00025 #include <sys/types.h> 00026 #include <sys/time.h> 00027 #include <poll.h> 00028 #include <algorithm> 00029 00030 #include "poll.hpp" 00031 #include "err.hpp" 00032 #include "config.hpp" 00033 #include "i_poll_events.hpp" 00034 00035 zmq::poll_t::poll_t () : 00036 retired (false), 00037 stopping (false) 00038 { 00039 } 00040 00041 zmq::poll_t::~poll_t () 00042 { 00043 worker.stop (); 00044 } 00045 00046 zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) 00047 { 00048 // If the file descriptor table is too small expand it. 00049 fd_table_t::size_type sz = fd_table.size (); 00050 if (sz <= (fd_table_t::size_type) fd_) { 00051 fd_table.resize (fd_ + 1); 00052 while (sz != (fd_table_t::size_type) (fd_ + 1)) { 00053 fd_table [sz].index = retired_fd; 00054 ++sz; 00055 } 00056 } 00057 00058 pollfd pfd = {fd_, 0, 0}; 00059 pollset.push_back (pfd); 00060 assert (fd_table [fd_].index == retired_fd); 00061 00062 fd_table [fd_].index = pollset.size() - 1; 00063 fd_table [fd_].events = events_; 00064 00065 // Increase the load metric of the thread. 00066 adjust_load (1); 00067 00068 return fd_; 00069 } 00070 00071 void zmq::poll_t::rm_fd (handle_t handle_) 00072 { 00073 fd_t index = fd_table [handle_].index; 00074 assert (index != retired_fd); 00075 00076 // Mark the fd as unused. 00077 pollset [index].fd = retired_fd; 00078 fd_table [handle_].index = retired_fd; 00079 retired = true; 00080 00081 // Decrease the load metric of the thread. 00082 adjust_load (-1); 00083 } 00084 00085 void zmq::poll_t::set_pollin (handle_t handle_) 00086 { 00087 int index = fd_table [handle_].index; 00088 pollset [index].events |= POLLIN; 00089 } 00090 00091 void zmq::poll_t::reset_pollin (handle_t handle_) 00092 { 00093 int index = fd_table [handle_].index; 00094 pollset [index].events &= ~((short) POLLIN); 00095 } 00096 00097 void zmq::poll_t::set_pollout (handle_t handle_) 00098 { 00099 int index = fd_table [handle_].index; 00100 pollset [index].events |= POLLOUT; 00101 } 00102 00103 void zmq::poll_t::reset_pollout (handle_t handle_) 00104 { 00105 int index = fd_table [handle_].index; 00106 pollset [index].events &= ~((short) POLLOUT); 00107 } 00108 00109 void zmq::poll_t::start () 00110 { 00111 worker.start (worker_routine, this); 00112 } 00113 00114 void zmq::poll_t::stop () 00115 { 00116 stopping = true; 00117 } 00118 00119 void zmq::poll_t::loop () 00120 { 00121 while (!stopping) { 00122 00123 // Execute any due timers. 00124 int timeout = (int) execute_timers (); 00125 00126 // Wait for events. 00127 int rc = poll (&pollset [0], pollset.size (), timeout ? timeout : -1); 00128 if (rc == -1 && errno == EINTR) 00129 continue; 00130 errno_assert (rc != -1); 00131 00132 00133 // If there are no events (i.e. it's a timeout) there's no point 00134 // in checking the pollset. 00135 if (rc == 0) 00136 continue; 00137 00138 for (pollset_t::size_type i = 0; i != pollset.size (); i++) { 00139 00140 zmq_assert (!(pollset [i].revents & POLLNVAL)); 00141 if (pollset [i].fd == retired_fd) 00142 continue; 00143 if (pollset [i].revents & (POLLERR | POLLHUP)) 00144 fd_table [pollset [i].fd].events->in_event (); 00145 if (pollset [i].fd == retired_fd) 00146 continue; 00147 if (pollset [i].revents & POLLOUT) 00148 fd_table [pollset [i].fd].events->out_event (); 00149 if (pollset [i].fd == retired_fd) 00150 continue; 00151 if (pollset [i].revents & POLLIN) 00152 fd_table [pollset [i].fd].events->in_event (); 00153 } 00154 00155 // Clean up the pollset and update the fd_table accordingly. 00156 if (retired) { 00157 pollset_t::size_type i = 0; 00158 while (i < pollset.size ()) { 00159 if (pollset [i].fd == retired_fd) 00160 pollset.erase (pollset.begin () + i); 00161 else { 00162 fd_table [pollset [i].fd].index = i; 00163 i ++; 00164 } 00165 } 00166 retired = false; 00167 } 00168 } 00169 } 00170 00171 void zmq::poll_t::worker_routine (void *arg_) 00172 { 00173 ((poll_t*) arg_)->loop (); 00174 } 00175 00176 #endif