![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "kqueue.hpp" 00023 #if defined ZMQ_USE_KQUEUE 00024 00025 #include <sys/time.h> 00026 #include <sys/types.h> 00027 #include <sys/event.h> 00028 #include <stdlib.h> 00029 #include <unistd.h> 00030 #include <algorithm> 00031 #include <new> 00032 00033 #include "kqueue.hpp" 00034 #include "err.hpp" 00035 #include "config.hpp" 00036 #include "i_poll_events.hpp" 00037 #include "likely.hpp" 00038 00039 // NetBSD defines (struct kevent).udata as intptr_t, everyone else 00040 // as void *. 00041 #if defined ZMQ_HAVE_NETBSD 00042 #define kevent_udata_t intptr_t 00043 #else 00044 #define kevent_udata_t void * 00045 #endif 00046 00047 zmq::kqueue_t::kqueue_t () : 00048 stopping (false) 00049 { 00050 // Create event queue 00051 kqueue_fd = kqueue (); 00052 errno_assert (kqueue_fd != -1); 00053 } 00054 00055 zmq::kqueue_t::~kqueue_t () 00056 { 00057 worker.stop (); 00058 close (kqueue_fd); 00059 } 00060 00061 void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_) 00062 { 00063 struct kevent ev; 00064 00065 EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t)udata_); 00066 int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL); 00067 errno_assert (rc != -1); 00068 } 00069 00070 void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) 00071 { 00072 struct kevent ev; 00073 00074 EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0); 00075 int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL); 00076 errno_assert (rc != -1); 00077 } 00078 00079 zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, 00080 i_poll_events *reactor_) 00081 { 00082 poll_entry_t *pe = new (std::nothrow) poll_entry_t; 00083 alloc_assert (pe); 00084 00085 pe->fd = fd_; 00086 pe->flag_pollin = 0; 00087 pe->flag_pollout = 0; 00088 pe->reactor = reactor_; 00089 00090 adjust_load (1); 00091 00092 return pe; 00093 } 00094 00095 void zmq::kqueue_t::rm_fd (handle_t handle_) 00096 { 00097 poll_entry_t *pe = (poll_entry_t*) handle_; 00098 if (pe->flag_pollin) 00099 kevent_delete (pe->fd, EVFILT_READ); 00100 if (pe->flag_pollout) 00101 kevent_delete (pe->fd, EVFILT_WRITE); 00102 pe->fd = retired_fd; 00103 retired.push_back (pe); 00104 00105 adjust_load (-1); 00106 } 00107 00108 void zmq::kqueue_t::set_pollin (handle_t handle_) 00109 { 00110 poll_entry_t *pe = (poll_entry_t*) handle_; 00111 if (likely (!pe->flag_pollin)) { 00112 pe->flag_pollin = true; 00113 kevent_add (pe->fd, EVFILT_READ, pe); 00114 } 00115 } 00116 00117 void zmq::kqueue_t::reset_pollin (handle_t handle_) 00118 { 00119 poll_entry_t *pe = (poll_entry_t*) handle_; 00120 if (likely (pe->flag_pollin)) { 00121 pe->flag_pollin = false; 00122 kevent_delete (pe->fd, EVFILT_READ); 00123 } 00124 } 00125 00126 void zmq::kqueue_t::set_pollout (handle_t handle_) 00127 { 00128 poll_entry_t *pe = (poll_entry_t*) handle_; 00129 if (likely (!pe->flag_pollout)) { 00130 pe->flag_pollout = true; 00131 kevent_add (pe->fd, EVFILT_WRITE, pe); 00132 } 00133 } 00134 00135 void zmq::kqueue_t::reset_pollout (handle_t handle_) 00136 { 00137 poll_entry_t *pe = (poll_entry_t*) handle_; 00138 if (likely (pe->flag_pollout)) { 00139 pe->flag_pollout = false; 00140 kevent_delete (pe->fd, EVFILT_WRITE); 00141 } 00142 } 00143 00144 void zmq::kqueue_t::start () 00145 { 00146 worker.start (worker_routine, this); 00147 } 00148 00149 void zmq::kqueue_t::stop () 00150 { 00151 stopping = true; 00152 } 00153 00154 void zmq::kqueue_t::loop () 00155 { 00156 while (!stopping) { 00157 00158 // Execute any due timers. 00159 int timeout = (int) execute_timers (); 00160 00161 // Wait for events. 00162 struct kevent ev_buf [max_io_events]; 00163 timespec ts = {timeout / 1000, (timeout % 1000) * 1000000}; 00164 int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events, 00165 timeout ? &ts: NULL); 00166 if (n == -1 && errno == EINTR) 00167 continue; 00168 errno_assert (n != -1); 00169 00170 for (int i = 0; i < n; i ++) { 00171 poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata; 00172 00173 if (pe->fd == retired_fd) 00174 continue; 00175 if (ev_buf [i].flags & EV_EOF) 00176 pe->reactor->in_event (); 00177 if (pe->fd == retired_fd) 00178 continue; 00179 if (ev_buf [i].filter == EVFILT_WRITE) 00180 pe->reactor->out_event (); 00181 if (pe->fd == retired_fd) 00182 continue; 00183 if (ev_buf [i].filter == EVFILT_READ) 00184 pe->reactor->in_event (); 00185 } 00186 00187 // Destroy retired event sources. 00188 for (retired_t::iterator it = retired.begin (); it != retired.end (); 00189 ++it) 00190 delete *it; 00191 retired.clear (); 00192 } 00193 } 00194 00195 void zmq::kqueue_t::worker_routine (void *arg_) 00196 { 00197 ((kqueue_t*) arg_)->loop (); 00198 } 00199 00200 #endif