![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "epoll.hpp" 00023 #if defined ZMQ_USE_EPOLL 00024 00025 #include <sys/epoll.h> 00026 #include <stdlib.h> 00027 #include <string.h> 00028 #include <unistd.h> 00029 #include <algorithm> 00030 #include <new> 00031 00032 #include "epoll.hpp" 00033 #include "err.hpp" 00034 #include "config.hpp" 00035 #include "i_poll_events.hpp" 00036 00037 zmq::epoll_t::epoll_t () : 00038 stopping (false) 00039 { 00040 epoll_fd = epoll_create (1); 00041 errno_assert (epoll_fd != -1); 00042 } 00043 00044 zmq::epoll_t::~epoll_t () 00045 { 00046 // Wait till the worker thread exits. 00047 worker.stop (); 00048 00049 close (epoll_fd); 00050 for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it) 00051 delete *it; 00052 } 00053 00054 zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) 00055 { 00056 poll_entry_t *pe = new (std::nothrow) poll_entry_t; 00057 alloc_assert (pe); 00058 00059 // The memset is not actually needed. It's here to prevent debugging 00060 // tools to complain about using uninitialised memory. 00061 memset (pe, 0, sizeof (poll_entry_t)); 00062 00063 pe->fd = fd_; 00064 pe->ev.events = 0; 00065 pe->ev.data.ptr = pe; 00066 pe->events = events_; 00067 00068 int rc = epoll_ctl (epoll_fd, EPOLL_CTL_ADD, fd_, &pe->ev); 00069 errno_assert (rc != -1); 00070 00071 // Increase the load metric of the thread. 00072 adjust_load (1); 00073 00074 return pe; 00075 } 00076 00077 void zmq::epoll_t::rm_fd (handle_t handle_) 00078 { 00079 poll_entry_t *pe = (poll_entry_t*) handle_; 00080 int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); 00081 errno_assert (rc != -1); 00082 pe->fd = retired_fd; 00083 retired.push_back (pe); 00084 00085 // Decrease the load metric of the thread. 00086 adjust_load (-1); 00087 } 00088 00089 void zmq::epoll_t::set_pollin (handle_t handle_) 00090 { 00091 poll_entry_t *pe = (poll_entry_t*) handle_; 00092 pe->ev.events |= EPOLLIN; 00093 int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); 00094 errno_assert (rc != -1); 00095 } 00096 00097 void zmq::epoll_t::reset_pollin (handle_t handle_) 00098 { 00099 poll_entry_t *pe = (poll_entry_t*) handle_; 00100 pe->ev.events &= ~((short) EPOLLIN); 00101 int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); 00102 errno_assert (rc != -1); 00103 } 00104 00105 void zmq::epoll_t::set_pollout (handle_t handle_) 00106 { 00107 poll_entry_t *pe = (poll_entry_t*) handle_; 00108 pe->ev.events |= EPOLLOUT; 00109 int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); 00110 errno_assert (rc != -1); 00111 } 00112 00113 void zmq::epoll_t::reset_pollout (handle_t handle_) 00114 { 00115 poll_entry_t *pe = (poll_entry_t*) handle_; 00116 pe->ev.events &= ~((short) EPOLLOUT); 00117 int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); 00118 errno_assert (rc != -1); 00119 } 00120 00121 void zmq::epoll_t::start () 00122 { 00123 worker.start (worker_routine, this); 00124 } 00125 00126 void zmq::epoll_t::stop () 00127 { 00128 stopping = true; 00129 } 00130 00131 void zmq::epoll_t::loop () 00132 { 00133 epoll_event ev_buf [max_io_events]; 00134 00135 while (!stopping) { 00136 00137 // Execute any due timers. 00138 int timeout = (int) execute_timers (); 00139 00140 // Wait for events. 00141 int n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events, 00142 timeout ? timeout : -1); 00143 if (n == -1 && errno == EINTR) 00144 continue; 00145 errno_assert (n != -1); 00146 00147 for (int i = 0; i < n; i ++) { 00148 poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr); 00149 00150 if (pe->fd == retired_fd) 00151 continue; 00152 if (ev_buf [i].events & (EPOLLERR | EPOLLHUP)) 00153 pe->events->in_event (); 00154 if (pe->fd == retired_fd) 00155 continue; 00156 if (ev_buf [i].events & EPOLLOUT) 00157 pe->events->out_event (); 00158 if (pe->fd == retired_fd) 00159 continue; 00160 if (ev_buf [i].events & EPOLLIN) 00161 pe->events->in_event (); 00162 } 00163 00164 // Destroy retired event sources. 00165 for (retired_t::iterator it = retired.begin (); it != retired.end (); 00166 ++it) 00167 delete *it; 00168 retired.clear (); 00169 } 00170 } 00171 00172 void zmq::epoll_t::worker_routine (void *arg_) 00173 { 00174 ((epoll_t*) arg_)->loop (); 00175 } 00176 00177 #endif