![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "devpoll.hpp" 00023 #if defined ZMQ_USE_DEVPOLL 00024 00025 #include <sys/devpoll.h> 00026 #include <sys/time.h> 00027 #include <sys/types.h> 00028 #include <sys/stat.h> 00029 #include <sys/ioctl.h> 00030 #include <fcntl.h> 00031 #include <unistd.h> 00032 #include <limits.h> 00033 #include <algorithm> 00034 00035 #include "devpoll.hpp" 00036 #include "err.hpp" 00037 #include "config.hpp" 00038 #include "i_poll_events.hpp" 00039 00040 zmq::devpoll_t::devpoll_t () : 00041 stopping (false) 00042 { 00043 devpoll_fd = open ("/dev/poll", O_RDWR); 00044 errno_assert (devpoll_fd != -1); 00045 } 00046 00047 zmq::devpoll_t::~devpoll_t () 00048 { 00049 worker.stop (); 00050 close (devpoll_fd); 00051 } 00052 00053 void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_) 00054 { 00055 struct pollfd pfd = {fd_, events_, 0}; 00056 ssize_t rc = write (devpoll_fd, &pfd, sizeof pfd); 00057 zmq_assert (rc == sizeof pfd); 00058 } 00059 00060 zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, 00061 i_poll_events *reactor_) 00062 { 00063 // If the file descriptor table is too small expand it. 00064 fd_table_t::size_type sz = fd_table.size (); 00065 if (sz <= (fd_table_t::size_type) fd_) { 00066 fd_table.resize (fd_ + 1); 00067 while (sz != (fd_table_t::size_type) (fd_ + 1)) { 00068 fd_table [sz].valid = false; 00069 ++sz; 00070 } 00071 } 00072 00073 assert (!fd_table [fd_].valid); 00074 00075 fd_table [fd_].events = 0; 00076 fd_table [fd_].reactor = reactor_; 00077 fd_table [fd_].valid = true; 00078 fd_table [fd_].accepted = false; 00079 00080 devpoll_ctl (fd_, 0); 00081 pending_list.push_back (fd_); 00082 00083 // Increase the load metric of the thread. 00084 adjust_load (1); 00085 00086 return fd_; 00087 } 00088 00089 void zmq::devpoll_t::rm_fd (handle_t handle_) 00090 { 00091 assert (fd_table [handle_].valid); 00092 00093 devpoll_ctl (handle_, POLLREMOVE); 00094 fd_table [handle_].valid = false; 00095 00096 // Decrease the load metric of the thread. 00097 adjust_load (-1); 00098 } 00099 00100 void zmq::devpoll_t::set_pollin (handle_t handle_) 00101 { 00102 devpoll_ctl (handle_, POLLREMOVE); 00103 fd_table [handle_].events |= POLLIN; 00104 devpoll_ctl (handle_, fd_table [handle_].events); 00105 } 00106 00107 void zmq::devpoll_t::reset_pollin (handle_t handle_) 00108 { 00109 devpoll_ctl (handle_, POLLREMOVE); 00110 fd_table [handle_].events &= ~((short) POLLIN); 00111 devpoll_ctl (handle_, fd_table [handle_].events); 00112 } 00113 00114 void zmq::devpoll_t::set_pollout (handle_t handle_) 00115 { 00116 devpoll_ctl (handle_, POLLREMOVE); 00117 fd_table [handle_].events |= POLLOUT; 00118 devpoll_ctl (handle_, fd_table [handle_].events); 00119 } 00120 00121 void zmq::devpoll_t::reset_pollout (handle_t handle_) 00122 { 00123 devpoll_ctl (handle_, POLLREMOVE); 00124 fd_table [handle_].events &= ~((short) POLLOUT); 00125 devpoll_ctl (handle_, fd_table [handle_].events); 00126 } 00127 00128 void zmq::devpoll_t::start () 00129 { 00130 worker.start (worker_routine, this); 00131 } 00132 00133 void zmq::devpoll_t::stop () 00134 { 00135 stopping = true; 00136 } 00137 00138 void zmq::devpoll_t::loop () 00139 { 00140 while (!stopping) { 00141 00142 struct pollfd ev_buf [max_io_events]; 00143 struct dvpoll poll_req; 00144 00145 for (pending_list_t::size_type i = 0; i < pending_list.size (); i ++) 00146 fd_table [pending_list [i]].accepted = true; 00147 pending_list.clear (); 00148 00149 // Execute any due timers. 00150 int timeout = (int) execute_timers (); 00151 00152 // Wait for events. 00153 // On Solaris, we can retrieve no more then (OPEN_MAX - 1) events. 00154 poll_req.dp_fds = &ev_buf [0]; 00155 #if defined ZMQ_HAVE_SOLARIS 00156 poll_req.dp_nfds = std::min ((int) max_io_events, OPEN_MAX - 1); 00157 #else 00158 poll_req.dp_nfds = max_io_events; 00159 #endif 00160 poll_req.dp_timeout = timeout ? timeout : -1; 00161 int n = ioctl (devpoll_fd, DP_POLL, &poll_req); 00162 if (n == -1 && errno == EINTR) 00163 continue; 00164 errno_assert (n != -1); 00165 00166 for (int i = 0; i < n; i ++) { 00167 00168 fd_entry_t *fd_ptr = &fd_table [ev_buf [i].fd]; 00169 if (!fd_ptr->valid || !fd_ptr->accepted) 00170 continue; 00171 if (ev_buf [i].revents & (POLLERR | POLLHUP)) 00172 fd_ptr->reactor->in_event (); 00173 if (!fd_ptr->valid || !fd_ptr->accepted) 00174 continue; 00175 if (ev_buf [i].revents & POLLOUT) 00176 fd_ptr->reactor->out_event (); 00177 if (!fd_ptr->valid || !fd_ptr->accepted) 00178 continue; 00179 if (ev_buf [i].revents & POLLIN) 00180 fd_ptr->reactor->in_event (); 00181 } 00182 } 00183 } 00184 00185 void zmq::devpoll_t::worker_routine (void *arg_) 00186 { 00187 ((devpoll_t*) arg_)->loop (); 00188 } 00189 00190 #endif