![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2010-2011 250bpm s.r.o. 00003 Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file 00004 00005 This file is part of 0MQ. 00006 00007 0MQ is free software; you can redistribute it and/or modify it under 00008 the terms of the GNU Lesser General Public License as published by 00009 the Free Software Foundation; either version 3 of the License, or 00010 (at your option) any later version. 00011 00012 0MQ is distributed in the hope that it will be useful, 00013 but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 GNU Lesser General Public License for more details. 00016 00017 You should have received a copy of the GNU Lesser General Public License 00018 along with this program. If not, see <http://www.gnu.org/licenses/>. 00019 */ 00020 00021 #include "platform.hpp" 00022 00023 #if defined ZMQ_FORCE_SELECT 00024 #define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT 00025 #elif defined ZMQ_FORCE_POLL 00026 #define ZMQ_SIGNALER_WAIT_BASED_ON_POLL 00027 #elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ 00028 defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ 00029 defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ 00030 defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ 00031 defined ZMQ_HAVE_NETBSD 00032 #define ZMQ_SIGNALER_WAIT_BASED_ON_POLL 00033 #elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS ||\ 00034 defined ZMQ_HAVE_CYGWIN 00035 #define ZMQ_SIGNALER_WAIT_BASED_ON_SELECT 00036 #endif 00037 00038 // On AIX, poll.h has to be included before zmq.h to get consistent 00039 // definition of pollfd structure (AIX uses 'reqevents' and 'retnevents' 00040 // instead of 'events' and 'revents' and defines macros to map from POSIX-y 00041 // names to AIX-specific names). 00042 #if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL 00043 #include <poll.h> 00044 #elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT 00045 #if defined ZMQ_HAVE_WINDOWS 00046 #include "windows.hpp" 00047 #elif defined ZMQ_HAVE_HPUX 00048 #include <sys/param.h> 00049 #include <sys/types.h> 00050 #include <sys/time.h> 00051 #elif defined ZMQ_HAVE_OPENVMS 00052 #include <sys/types.h> 00053 #include <sys/time.h> 00054 #else 00055 #include <sys/select.h> 00056 #endif 00057 #endif 00058 00059 #include "signaler.hpp" 00060 #include "likely.hpp" 00061 #include "stdint.hpp" 00062 #include "config.hpp" 00063 #include "err.hpp" 00064 #include "fd.hpp" 00065 #include "ip.hpp" 00066 00067 #if defined ZMQ_HAVE_EVENTFD 00068 #include <sys/eventfd.h> 00069 #endif 00070 00071 #if defined ZMQ_HAVE_WINDOWS 00072 #include "windows.hpp" 00073 #else 00074 #include <unistd.h> 00075 #include <netinet/tcp.h> 00076 #include <unistd.h> 00077 #include <sys/types.h> 00078 #include <sys/socket.h> 00079 #endif 00080 00081 zmq::signaler_t::signaler_t () 00082 { 00083 // Create the socketpair for signaling. 00084 int rc = make_fdpair (&r, &w); 00085 errno_assert (rc == 0); 00086 00087 // Set both fds to non-blocking mode. 00088 unblock_socket (w); 00089 unblock_socket (r); 00090 } 00091 00092 zmq::signaler_t::~signaler_t () 00093 { 00094 #if defined ZMQ_HAVE_EVENTFD 00095 int rc = close (r); 00096 errno_assert (rc == 0); 00097 #elif defined ZMQ_HAVE_WINDOWS 00098 int rc = closesocket (w); 00099 wsa_assert (rc != SOCKET_ERROR); 00100 rc = closesocket (r); 00101 wsa_assert (rc != SOCKET_ERROR); 00102 #else 00103 int rc = close (w); 00104 errno_assert (rc == 0); 00105 rc = close (r); 00106 errno_assert (rc == 0); 00107 #endif 00108 } 00109 00110 zmq::fd_t zmq::signaler_t::get_fd () 00111 { 00112 return r; 00113 } 00114 00115 void zmq::signaler_t::send () 00116 { 00117 #if defined ZMQ_HAVE_EVENTFD 00118 const uint64_t inc = 1; 00119 ssize_t sz = write (w, &inc, sizeof (inc)); 00120 errno_assert (sz == sizeof (inc)); 00121 #elif defined ZMQ_HAVE_WINDOWS 00122 unsigned char dummy = 0; 00123 int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0); 00124 wsa_assert (nbytes != SOCKET_ERROR); 00125 zmq_assert (nbytes == sizeof (dummy)); 00126 #else 00127 unsigned char dummy = 0; 00128 while (true) { 00129 ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0); 00130 if (unlikely (nbytes == -1 && errno == EINTR)) 00131 continue; 00132 zmq_assert (nbytes == sizeof (dummy)); 00133 break; 00134 } 00135 #endif 00136 } 00137 00138 int zmq::signaler_t::wait (int timeout_) 00139 { 00140 #ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL 00141 00142 struct pollfd pfd; 00143 pfd.fd = r; 00144 pfd.events = POLLIN; 00145 int rc = poll (&pfd, 1, timeout_); 00146 if (unlikely (rc < 0)) { 00147 zmq_assert (errno == EINTR); 00148 return -1; 00149 } 00150 else if (unlikely (rc == 0)) { 00151 errno = EAGAIN; 00152 return -1; 00153 } 00154 zmq_assert (rc == 1); 00155 zmq_assert (pfd.revents & POLLIN); 00156 return 0; 00157 00158 #elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT 00159 00160 fd_set fds; 00161 FD_ZERO (&fds); 00162 FD_SET (r, &fds); 00163 struct timeval timeout; 00164 if (timeout_ >= 0) { 00165 timeout.tv_sec = timeout_ / 1000; 00166 timeout.tv_usec = timeout_ % 1000 * 1000; 00167 } 00168 #ifdef ZMQ_HAVE_WINDOWS 00169 int rc = select (0, &fds, NULL, NULL, 00170 timeout_ >= 0 ? &timeout : NULL); 00171 wsa_assert (rc != SOCKET_ERROR); 00172 #else 00173 int rc = select (r + 1, &fds, NULL, NULL, 00174 timeout_ >= 0 ? &timeout : NULL); 00175 if (unlikely (rc < 0)) { 00176 zmq_assert (errno == EINTR); 00177 return -1; 00178 } 00179 #endif 00180 if (unlikely (rc == 0)) { 00181 errno = EAGAIN; 00182 return -1; 00183 } 00184 zmq_assert (rc == 1); 00185 return 0; 00186 00187 #else 00188 #error 00189 #endif 00190 } 00191 00192 void zmq::signaler_t::recv () 00193 { 00194 // Attempt to read a signal. 00195 #if defined ZMQ_HAVE_EVENTFD 00196 uint64_t dummy; 00197 ssize_t sz = read (r, &dummy, sizeof (dummy)); 00198 errno_assert (sz == sizeof (dummy)); 00199 00200 // If we accidentally grabbed the next signal along with the current 00201 // one, return it back to the eventfd object. 00202 if (unlikely (dummy == 2)) { 00203 const uint64_t inc = 1; 00204 ssize_t sz = write (w, &inc, sizeof (inc)); 00205 errno_assert (sz == sizeof (inc)); 00206 return; 00207 } 00208 00209 zmq_assert (dummy == 1); 00210 #else 00211 unsigned char dummy; 00212 #if defined ZMQ_HAVE_WINDOWS 00213 int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0); 00214 wsa_assert (nbytes != SOCKET_ERROR); 00215 #else 00216 ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0); 00217 errno_assert (nbytes >= 0); 00218 #endif 00219 zmq_assert (nbytes == sizeof (dummy)); 00220 zmq_assert (dummy == 0); 00221 #endif 00222 } 00223 00224 int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) 00225 { 00226 #if defined ZMQ_HAVE_EVENTFD 00227 00228 // Create eventfd object. 00229 fd_t fd = eventfd (0, 0); 00230 errno_assert (fd != -1); 00231 *w_ = fd; 00232 *r_ = fd; 00233 return 0; 00234 00235 #elif defined ZMQ_HAVE_WINDOWS 00236 00237 // This function has to be in a system-wide critical section so that 00238 // two instances of the library don't accidentally create signaler 00239 // crossing the process boundary. 00240 // We'll use named event object to implement the critical section. 00241 HANDLE sync = CreateEvent (NULL, FALSE, FALSE, "zmq-signaler-port-sync"); 00242 win_assert (sync != NULL); 00243 00244 // Enter the critical section. 00245 DWORD dwrc = WaitForSingleObject (sync, INFINITE); 00246 zmq_assert (dwrc == WAIT_OBJECT_0); 00247 00248 // Windows has no 'socketpair' function. CreatePipe is no good as pipe 00249 // handles cannot be polled on. Here we create the socketpair by hand. 00250 *w_ = INVALID_SOCKET; 00251 *r_ = INVALID_SOCKET; 00252 00253 // Create listening socket. 00254 SOCKET listener; 00255 listener = open_socket (AF_INET, SOCK_STREAM, 0); 00256 wsa_assert (listener != INVALID_SOCKET); 00257 00258 // Set SO_REUSEADDR and TCP_NODELAY on listening socket. 00259 BOOL so_reuseaddr = 1; 00260 int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, 00261 (char *)&so_reuseaddr, sizeof (so_reuseaddr)); 00262 wsa_assert (rc != SOCKET_ERROR); 00263 BOOL tcp_nodelay = 1; 00264 rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, 00265 (char *)&tcp_nodelay, sizeof (tcp_nodelay)); 00266 wsa_assert (rc != SOCKET_ERROR); 00267 00268 // Bind listening socket to any free local port. 00269 struct sockaddr_in addr; 00270 memset (&addr, 0, sizeof (addr)); 00271 addr.sin_family = AF_INET; 00272 addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); 00273 addr.sin_port = htons (signaler_port); 00274 rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); 00275 wsa_assert (rc != SOCKET_ERROR); 00276 00277 // Listen for incomming connections. 00278 rc = listen (listener, 1); 00279 wsa_assert (rc != SOCKET_ERROR); 00280 00281 // Create the writer socket. 00282 *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); 00283 wsa_assert (*w_ != INVALID_SOCKET); 00284 00285 // Set TCP_NODELAY on writer socket. 00286 rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, 00287 (char *)&tcp_nodelay, sizeof (tcp_nodelay)); 00288 wsa_assert (rc != SOCKET_ERROR); 00289 00290 // Connect writer to the listener. 00291 rc = connect (*w_, (sockaddr *) &addr, sizeof (addr)); 00292 wsa_assert (rc != SOCKET_ERROR); 00293 00294 // Accept connection from writer. 00295 *r_ = accept (listener, NULL, NULL); 00296 wsa_assert (*r_ != INVALID_SOCKET); 00297 00298 // We don't need the listening socket anymore. Close it. 00299 rc = closesocket (listener); 00300 wsa_assert (rc != SOCKET_ERROR); 00301 00302 // Exit the critical section. 00303 BOOL brc = SetEvent (sync); 00304 win_assert (brc != 0); 00305 00306 return 0; 00307 00308 #elif defined ZMQ_HAVE_OPENVMS 00309 00310 // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further, 00311 // it does not set the socket options TCP_NODELAY and TCP_NODELACK which 00312 // can lead to performance problems. 00313 // 00314 // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll 00315 // create the socket pair manually. 00316 sockaddr_in lcladdr; 00317 memset (&lcladdr, 0, sizeof (lcladdr)); 00318 lcladdr.sin_family = AF_INET; 00319 lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); 00320 lcladdr.sin_port = 0; 00321 00322 int listener = open_socket (AF_INET, SOCK_STREAM, 0); 00323 errno_assert (listener != -1); 00324 00325 int on = 1; 00326 int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); 00327 errno_assert (rc != -1); 00328 00329 rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); 00330 errno_assert (rc != -1); 00331 00332 rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); 00333 errno_assert (rc != -1); 00334 00335 socklen_t lcladdr_len = sizeof (lcladdr); 00336 00337 rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len); 00338 errno_assert (rc != -1); 00339 00340 rc = listen (listener, 1); 00341 errno_assert (rc != -1); 00342 00343 *w_ = open_socket (AF_INET, SOCK_STREAM, 0); 00344 errno_assert (*w_ != -1); 00345 00346 rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); 00347 errno_assert (rc != -1); 00348 00349 rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); 00350 errno_assert (rc != -1); 00351 00352 rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); 00353 errno_assert (rc != -1); 00354 00355 *r_ = accept (listener, NULL, NULL); 00356 errno_assert (*r_ != -1); 00357 00358 close (listener); 00359 00360 return 0; 00361 00362 #else // All other implementations support socketpair() 00363 00364 int sv [2]; 00365 int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); 00366 errno_assert (rc == 0); 00367 *w_ = sv [0]; 00368 *r_ = sv [1]; 00369 return 0; 00370 00371 #endif 00372 } 00373 00374 #if defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT 00375 #undef ZMQ_SIGNALER_WAIT_BASED_ON_SELECT 00376 #endif 00377 #if defined ZMQ_SIGNALER_WAIT_BASED_ON_POLL 00378 #undef ZMQ_SIGNALER_WAIT_BASED_ON_POLL 00379 #endif 00380