libzmq master
The Intelligent Transport Layer

kqueue.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include "kqueue.hpp"
00023 #if defined ZMQ_USE_KQUEUE
00024 
00025 #include <sys/time.h>
00026 #include <sys/types.h>
00027 #include <sys/event.h>
00028 #include <stdlib.h>
00029 #include <unistd.h>
00030 #include <algorithm>
00031 #include <new>
00032 
00033 #include "kqueue.hpp"
00034 #include "err.hpp"
00035 #include "config.hpp"
00036 #include "i_poll_events.hpp"
00037 #include "likely.hpp"
00038 
00039 //  NetBSD defines (struct kevent).udata as intptr_t, everyone else
00040 //  as void *.
00041 #if defined ZMQ_HAVE_NETBSD
00042 #define kevent_udata_t intptr_t
00043 #else
00044 #define kevent_udata_t void *
00045 #endif
00046 
00047 zmq::kqueue_t::kqueue_t () :
00048     stopping (false)
00049 {
00050     //  Create event queue
00051     kqueue_fd = kqueue ();
00052     errno_assert (kqueue_fd != -1);
00053 }
00054 
00055 zmq::kqueue_t::~kqueue_t ()
00056 {
00057     worker.stop ();
00058     close (kqueue_fd);
00059 }
00060 
00061 void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_)
00062 {
00063     struct kevent ev;
00064 
00065     EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t)udata_);
00066     int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
00067     errno_assert (rc != -1);
00068 }
00069 
00070 void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_)
00071 {
00072     struct kevent ev;
00073 
00074     EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0);
00075     int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL);
00076     errno_assert (rc != -1);
00077 }
00078 
00079 zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_,
00080     i_poll_events *reactor_)
00081 {
00082     poll_entry_t *pe = new (std::nothrow) poll_entry_t;
00083     alloc_assert (pe);
00084 
00085     pe->fd = fd_;
00086     pe->flag_pollin = 0;
00087     pe->flag_pollout = 0;
00088     pe->reactor = reactor_;
00089 
00090     adjust_load (1);
00091 
00092     return pe;
00093 }
00094 
00095 void zmq::kqueue_t::rm_fd (handle_t handle_)
00096 {
00097     poll_entry_t *pe = (poll_entry_t*) handle_;
00098     if (pe->flag_pollin)
00099         kevent_delete (pe->fd, EVFILT_READ);
00100     if (pe->flag_pollout)
00101         kevent_delete (pe->fd, EVFILT_WRITE);
00102     pe->fd = retired_fd;
00103     retired.push_back (pe);
00104 
00105     adjust_load (-1);
00106 }
00107 
00108 void zmq::kqueue_t::set_pollin (handle_t handle_)
00109 {
00110     poll_entry_t *pe = (poll_entry_t*) handle_;
00111     if (likely (!pe->flag_pollin)) {
00112         pe->flag_pollin = true;
00113         kevent_add (pe->fd, EVFILT_READ, pe);
00114     }
00115 }
00116 
00117 void zmq::kqueue_t::reset_pollin (handle_t handle_)
00118 {
00119     poll_entry_t *pe = (poll_entry_t*) handle_;
00120     if (likely (pe->flag_pollin)) {
00121         pe->flag_pollin = false;
00122         kevent_delete (pe->fd, EVFILT_READ);
00123     }
00124 }
00125 
00126 void zmq::kqueue_t::set_pollout (handle_t handle_)
00127 {
00128     poll_entry_t *pe = (poll_entry_t*) handle_;
00129     if (likely (!pe->flag_pollout)) {
00130         pe->flag_pollout = true;
00131         kevent_add (pe->fd, EVFILT_WRITE, pe);
00132     }
00133 }
00134 
00135 void zmq::kqueue_t::reset_pollout (handle_t handle_)
00136 {
00137     poll_entry_t *pe = (poll_entry_t*) handle_;
00138     if (likely (pe->flag_pollout)) {
00139         pe->flag_pollout = false;
00140         kevent_delete (pe->fd, EVFILT_WRITE);
00141    }
00142 }
00143 
00144 void zmq::kqueue_t::start ()
00145 {
00146     worker.start (worker_routine, this);
00147 }
00148 
00149 void zmq::kqueue_t::stop ()
00150 {
00151     stopping = true;
00152 }
00153 
00154 void zmq::kqueue_t::loop ()
00155 {
00156     while (!stopping) {
00157 
00158         //  Execute any due timers.
00159         int timeout = (int) execute_timers ();
00160 
00161         //  Wait for events.
00162         struct kevent ev_buf [max_io_events];
00163         timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
00164         int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
00165             timeout ? &ts: NULL);
00166         if (n == -1 && errno == EINTR)
00167             continue;
00168         errno_assert (n != -1);
00169 
00170         for (int i = 0; i < n; i ++) {
00171             poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;
00172 
00173             if (pe->fd == retired_fd)
00174                 continue;
00175             if (ev_buf [i].flags & EV_EOF)
00176                 pe->reactor->in_event ();
00177             if (pe->fd == retired_fd)
00178                 continue;
00179             if (ev_buf [i].filter == EVFILT_WRITE)
00180                 pe->reactor->out_event ();
00181             if (pe->fd == retired_fd)
00182                 continue;
00183             if (ev_buf [i].filter == EVFILT_READ)
00184                 pe->reactor->in_event ();
00185         }
00186 
00187         //  Destroy retired event sources.
00188         for (retired_t::iterator it = retired.begin (); it != retired.end ();
00189               ++it)
00190             delete *it;
00191         retired.clear ();
00192     }
00193 }
00194 
00195 void zmq::kqueue_t::worker_routine (void *arg_)
00196 {
00197     ((kqueue_t*) arg_)->loop ();
00198 }
00199 
00200 #endif
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines