libzmq master
The Intelligent Transport Layer

signaler.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2010-2011 250bpm s.r.o.
00003     Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
00004 
00005     This file is part of 0MQ.
00006 
00007     0MQ is free software; you can redistribute it and/or modify it under
00008     the terms of the GNU Lesser General Public License as published by
00009     the Free Software Foundation; either version 3 of the License, or
00010     (at your option) any later version.
00011 
00012     0MQ is distributed in the hope that it will be useful,
00013     but WITHOUT ANY WARRANTY; without even the implied warranty of
00014     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015     GNU Lesser General Public License for more details.
00016 
00017     You should have received a copy of the GNU Lesser General Public License
00018     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00019 */
00020 
00021 #include "platform.hpp"
00022 
00023 #if defined ZMQ_FORCE_SELECT
00024 #define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
00025 #elif defined ZMQ_FORCE_POLL
00026 #define ZMQ_SIGNALER_WAIT_BASED_ON_POLL
00027 #elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
00028     defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
00029     defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
00030     defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
00031     defined ZMQ_HAVE_NETBSD
00032 #define ZMQ_SIGNALER_WAIT_BASED_ON_POLL
00033 #elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS ||\
00034     defined ZMQ_HAVE_CYGWIN
00035 #define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
00036 #endif
00037 
00038 //  On AIX, poll.h has to be included before zmq.h to get consistent
00039 //  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
00040 //  instead of 'events' and 'revents' and defines macros to map from POSIX-y
00041 //  names to AIX-specific names).
00042 #if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL
00043 #include <poll.h>
00044 #elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
00045 #if defined ZMQ_HAVE_WINDOWS
00046 #include "windows.hpp"
00047 #elif defined ZMQ_HAVE_HPUX
00048 #include <sys/param.h>
00049 #include <sys/types.h>
00050 #include <sys/time.h>
00051 #elif defined ZMQ_HAVE_OPENVMS
00052 #include <sys/types.h>
00053 #include <sys/time.h>
00054 #else
00055 #include <sys/select.h>
00056 #endif
00057 #endif
00058 
00059 #include "signaler.hpp"
00060 #include "likely.hpp"
00061 #include "stdint.hpp"
00062 #include "config.hpp"
00063 #include "err.hpp"
00064 #include "fd.hpp"
00065 #include "ip.hpp"
00066 
00067 #if defined ZMQ_HAVE_EVENTFD
00068 #include <sys/eventfd.h>
00069 #endif
00070 
00071 #if defined ZMQ_HAVE_WINDOWS
00072 #include "windows.hpp"
00073 #else
00074 #include <unistd.h>
00075 #include <netinet/tcp.h>
00076 #include <unistd.h>
00077 #include <sys/types.h>
00078 #include <sys/socket.h>
00079 #endif
00080 
00081 zmq::signaler_t::signaler_t ()
00082 {
00083     //  Create the socketpair for signaling.
00084     int rc = make_fdpair (&r, &w);
00085     errno_assert (rc == 0);
00086 
00087     //  Set both fds to non-blocking mode.
00088     unblock_socket (w);
00089     unblock_socket (r);
00090 }
00091 
00092 zmq::signaler_t::~signaler_t ()
00093 {
00094 #if defined ZMQ_HAVE_EVENTFD
00095     int rc = close (r);
00096     errno_assert (rc == 0);
00097 #elif defined ZMQ_HAVE_WINDOWS
00098     int rc = closesocket (w);
00099     wsa_assert (rc != SOCKET_ERROR);
00100     rc = closesocket (r);
00101     wsa_assert (rc != SOCKET_ERROR);
00102 #else
00103     int rc = close (w);
00104     errno_assert (rc == 0);
00105     rc = close (r);
00106     errno_assert (rc == 0);
00107 #endif
00108 }
00109 
00110 zmq::fd_t zmq::signaler_t::get_fd ()
00111 {
00112     return r;
00113 }
00114 
00115 void zmq::signaler_t::send ()
00116 {
00117 #if defined ZMQ_HAVE_EVENTFD
00118     const uint64_t inc = 1;
00119     ssize_t sz = write (w, &inc, sizeof (inc));
00120     errno_assert (sz == sizeof (inc));
00121 #elif defined ZMQ_HAVE_WINDOWS
00122     unsigned char dummy = 0;
00123     int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
00124     wsa_assert (nbytes != SOCKET_ERROR);
00125     zmq_assert (nbytes == sizeof (dummy));
00126 #else
00127     unsigned char dummy = 0;
00128     while (true) {
00129         ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
00130         if (unlikely (nbytes == -1 && errno == EINTR))
00131             continue;
00132         zmq_assert (nbytes == sizeof (dummy));
00133         break;
00134     }
00135 #endif
00136 }
00137 
00138 int zmq::signaler_t::wait (int timeout_)
00139 {
00140 #ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
00141 
00142     struct pollfd pfd;
00143     pfd.fd = r;
00144     pfd.events = POLLIN;
00145     int rc = poll (&pfd, 1, timeout_);
00146     if (unlikely (rc < 0)) {
00147         zmq_assert (errno == EINTR);
00148         return -1;
00149     }
00150     else if (unlikely (rc == 0)) {
00151         errno = EAGAIN;
00152         return -1;
00153     }
00154     zmq_assert (rc == 1);
00155     zmq_assert (pfd.revents & POLLIN);
00156     return 0;
00157 
00158 #elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
00159 
00160     fd_set fds;
00161     FD_ZERO (&fds);
00162     FD_SET (r, &fds);
00163     struct timeval timeout;
00164     if (timeout_ >= 0) {
00165         timeout.tv_sec = timeout_ / 1000;
00166         timeout.tv_usec = timeout_ % 1000 * 1000;
00167     }
00168 #ifdef ZMQ_HAVE_WINDOWS
00169     int rc = select (0, &fds, NULL, NULL,
00170         timeout_ >= 0 ? &timeout : NULL);
00171     wsa_assert (rc != SOCKET_ERROR);
00172 #else
00173     int rc = select (r + 1, &fds, NULL, NULL,
00174         timeout_ >= 0 ? &timeout : NULL);
00175     if (unlikely (rc < 0)) {
00176         zmq_assert (errno == EINTR);
00177         return -1;
00178     }
00179 #endif
00180     if (unlikely (rc == 0)) {
00181         errno = EAGAIN;
00182         return -1;
00183     }
00184     zmq_assert (rc == 1);
00185     return 0;
00186 
00187 #else
00188 #error
00189 #endif
00190 }
00191 
00192 void zmq::signaler_t::recv ()
00193 {
00194     //  Attempt to read a signal.
00195 #if defined ZMQ_HAVE_EVENTFD
00196     uint64_t dummy;
00197     ssize_t sz = read (r, &dummy, sizeof (dummy));
00198     errno_assert (sz == sizeof (dummy));
00199 
00200     //  If we accidentally grabbed the next signal along with the current
00201     //  one, return it back to the eventfd object.
00202     if (unlikely (dummy == 2)) {
00203         const uint64_t inc = 1;
00204         ssize_t sz = write (w, &inc, sizeof (inc));
00205         errno_assert (sz == sizeof (inc));
00206         return;
00207     }
00208 
00209     zmq_assert (dummy == 1);
00210 #else
00211     unsigned char dummy;
00212 #if defined ZMQ_HAVE_WINDOWS
00213     int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
00214     wsa_assert (nbytes != SOCKET_ERROR);
00215 #else
00216     ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
00217     errno_assert (nbytes >= 0);
00218 #endif
00219     zmq_assert (nbytes == sizeof (dummy));
00220     zmq_assert (dummy == 0);
00221 #endif
00222 }
00223 
00224 int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
00225 {
00226 #if defined ZMQ_HAVE_EVENTFD
00227 
00228     // Create eventfd object.
00229     fd_t fd = eventfd (0, 0);
00230     errno_assert (fd != -1);
00231     *w_ = fd;
00232     *r_ = fd;
00233     return 0;
00234 
00235 #elif defined ZMQ_HAVE_WINDOWS
00236 
00237     //  This function has to be in a system-wide critical section so that
00238     //  two instances of the library don't accidentally create signaler
00239     //  crossing the process boundary.
00240     //  We'll use named event object to implement the critical section.
00241     HANDLE sync = CreateEvent (NULL, FALSE, FALSE, "zmq-signaler-port-sync");
00242     win_assert (sync != NULL);
00243 
00244     //  Enter the critical section.
00245     DWORD dwrc = WaitForSingleObject (sync, INFINITE);
00246     zmq_assert (dwrc == WAIT_OBJECT_0);
00247 
00248     //  Windows has no 'socketpair' function. CreatePipe is no good as pipe
00249     //  handles cannot be polled on. Here we create the socketpair by hand.
00250     *w_ = INVALID_SOCKET;
00251     *r_ = INVALID_SOCKET;
00252 
00253     //  Create listening socket.
00254     SOCKET listener;
00255     listener = open_socket (AF_INET, SOCK_STREAM, 0);
00256     wsa_assert (listener != INVALID_SOCKET);
00257 
00258     //  Set SO_REUSEADDR and TCP_NODELAY on listening socket.
00259     BOOL so_reuseaddr = 1;
00260     int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
00261         (char *)&so_reuseaddr, sizeof (so_reuseaddr));
00262     wsa_assert (rc != SOCKET_ERROR);
00263     BOOL tcp_nodelay = 1;
00264     rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
00265         (char *)&tcp_nodelay, sizeof (tcp_nodelay));
00266     wsa_assert (rc != SOCKET_ERROR);
00267 
00268     //  Bind listening socket to any free local port.
00269     struct sockaddr_in addr;
00270     memset (&addr, 0, sizeof (addr));
00271     addr.sin_family = AF_INET;
00272     addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
00273     addr.sin_port = htons (signaler_port);
00274     rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
00275     wsa_assert (rc != SOCKET_ERROR);
00276 
00277     //  Listen for incomming connections.
00278     rc = listen (listener, 1);
00279     wsa_assert (rc != SOCKET_ERROR);
00280 
00281     //  Create the writer socket.
00282     *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0,  0);
00283     wsa_assert (*w_ != INVALID_SOCKET);
00284 
00285     //  Set TCP_NODELAY on writer socket.
00286     rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
00287         (char *)&tcp_nodelay, sizeof (tcp_nodelay));
00288     wsa_assert (rc != SOCKET_ERROR);
00289 
00290     //  Connect writer to the listener.
00291     rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
00292     wsa_assert (rc != SOCKET_ERROR);
00293 
00294     //  Accept connection from writer.
00295     *r_ = accept (listener, NULL, NULL);
00296     wsa_assert (*r_ != INVALID_SOCKET);
00297 
00298     //  We don't need the listening socket anymore. Close it.
00299     rc = closesocket (listener);
00300     wsa_assert (rc != SOCKET_ERROR);
00301 
00302     //  Exit the critical section.
00303     BOOL brc = SetEvent (sync);
00304     win_assert (brc != 0);
00305 
00306     return 0;
00307 
00308 #elif defined ZMQ_HAVE_OPENVMS
00309 
00310     //  Whilst OpenVMS supports socketpair - it maps to AF_INET only.  Further,
00311     //  it does not set the socket options TCP_NODELAY and TCP_NODELACK which
00312     //  can lead to performance problems.
00313     //
00314     //  The bug will be fixed in V5.6 ECO4 and beyond.  In the meantime, we'll
00315     //  create the socket pair manually.
00316     sockaddr_in lcladdr;
00317     memset (&lcladdr, 0, sizeof (lcladdr));
00318     lcladdr.sin_family = AF_INET;
00319     lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
00320     lcladdr.sin_port = 0;
00321 
00322     int listener = open_socket (AF_INET, SOCK_STREAM, 0);
00323     errno_assert (listener != -1);
00324 
00325     int on = 1;
00326     int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
00327     errno_assert (rc != -1);
00328 
00329     rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
00330     errno_assert (rc != -1);
00331 
00332     rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
00333     errno_assert (rc != -1);
00334 
00335     socklen_t lcladdr_len = sizeof (lcladdr);
00336 
00337     rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
00338     errno_assert (rc != -1);
00339 
00340     rc = listen (listener, 1);
00341     errno_assert (rc != -1);
00342 
00343     *w_ = open_socket (AF_INET, SOCK_STREAM, 0);
00344     errno_assert (*w_ != -1);
00345 
00346     rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
00347     errno_assert (rc != -1);
00348 
00349     rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
00350     errno_assert (rc != -1);
00351 
00352     rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
00353     errno_assert (rc != -1);
00354 
00355     *r_ = accept (listener, NULL, NULL);
00356     errno_assert (*r_ != -1);
00357 
00358     close (listener);
00359 
00360     return 0;
00361 
00362 #else // All other implementations support socketpair()
00363 
00364     int sv [2];
00365     int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
00366     errno_assert (rc == 0);
00367     *w_ = sv [0];
00368     *r_ = sv [1];
00369     return 0;
00370 
00371 #endif
00372 }
00373 
00374 #if defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
00375 #undef ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
00376 #endif
00377 #if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL
00378 #undef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
00379 #endif
00380 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines