![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2011 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "platform.hpp" 00023 00024 #if defined ZMQ_FORCE_SELECT 00025 #define ZMQ_POLL_BASED_ON_SELECT 00026 #elif defined ZMQ_FORCE_POLL 00027 #define ZMQ_POLL_BASED_ON_POLL 00028 #elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\ 00029 defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\ 00030 defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\ 00031 defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\ 00032 defined ZMQ_HAVE_NETBSD 00033 #define ZMQ_POLL_BASED_ON_POLL 00034 #elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS ||\ 00035 defined ZMQ_HAVE_CYGWIN 00036 #define ZMQ_POLL_BASED_ON_SELECT 00037 #endif 00038 00039 // On AIX platform, poll.h has to be included first to get consistent 00040 // definition of pollfd structure (AIX uses 'reqevents' and 'retnevents' 00041 // instead of 'events' and 'revents' and defines macros to map from POSIX-y 00042 // names to AIX-specific names). 00043 #if defined ZMQ_POLL_BASED_ON_POLL 00044 #include <poll.h> 00045 #endif 00046 00047 #if defined ZMQ_HAVE_WINDOWS 00048 #include "windows.hpp" 00049 #else 00050 #include <unistd.h> 00051 #endif 00052 00053 #include <string.h> 00054 #include <errno.h> 00055 #include <stdlib.h> 00056 #include <new> 00057 00058 #include "socket_base.hpp" 00059 #include "stdint.hpp" 00060 #include "config.hpp" 00061 #include "likely.hpp" 00062 #include "clock.hpp" 00063 #include "ctx.hpp" 00064 #include "err.hpp" 00065 #include "msg.hpp" 00066 #include "fd.hpp" 00067 00068 #if !defined ZMQ_HAVE_WINDOWS 00069 #include <unistd.h> 00070 #endif 00071 00072 #if defined ZMQ_HAVE_OPENPGM 00073 #define __PGM_WININT_H__ 00074 #include <pgm/pgm.h> 00075 #endif 00076 00077 // Compile time check whether msg_t fits into zmq_msg_t. 00078 typedef char check_msg_t_size 00079 [sizeof (zmq::msg_t) == sizeof (zmq_msg_t) ? 1 : -1]; 00080 00081 void zmq_version (int *major_, int *minor_, int *patch_) 00082 { 00083 *major_ = ZMQ_VERSION_MAJOR; 00084 *minor_ = ZMQ_VERSION_MINOR; 00085 *patch_ = ZMQ_VERSION_PATCH; 00086 } 00087 00088 const char *zmq_strerror (int errnum_) 00089 { 00090 return zmq::errno_to_string (errnum_); 00091 } 00092 00093 void *zmq_init (int io_threads_) 00094 { 00095 if (io_threads_ < 0) { 00096 errno = EINVAL; 00097 return NULL; 00098 } 00099 00100 #if defined ZMQ_HAVE_OPENPGM 00101 00102 // Init PGM transport. Ensure threading and timer are enabled. Find PGM 00103 // protocol ID. Note that if you want to use gettimeofday and sleep for 00104 // openPGM timing, set environment variables PGM_TIMER to "GTOD" and 00105 // PGM_SLEEP to "USLEEP". 00106 pgm_error_t *pgm_error = NULL; 00107 const bool ok = pgm_init (&pgm_error); 00108 if (ok != TRUE) { 00109 00110 // Invalid parameters don't set pgm_error_t 00111 zmq_assert (pgm_error != NULL); 00112 if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && ( 00113 pgm_error->code == PGM_ERROR_FAILED)) { 00114 00115 // Failed to access RTC or HPET device. 00116 pgm_error_free (pgm_error); 00117 errno = EINVAL; 00118 return NULL; 00119 } 00120 00121 // PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg. 00122 zmq_assert (false); 00123 } 00124 #endif 00125 00126 #ifdef ZMQ_HAVE_WINDOWS 00127 // Intialise Windows sockets. Note that WSAStartup can be called multiple 00128 // times given that WSACleanup will be called for each WSAStartup. 00129 // We do this before the ctx constructor since its embedded mailbox_t 00130 // object needs Winsock to be up and running. 00131 WORD version_requested = MAKEWORD (2, 2); 00132 WSADATA wsa_data; 00133 int rc = WSAStartup (version_requested, &wsa_data); 00134 zmq_assert (rc == 0); 00135 zmq_assert (LOBYTE (wsa_data.wVersion) == 2 && 00136 HIBYTE (wsa_data.wVersion) == 2); 00137 #endif 00138 00139 // Create 0MQ context. 00140 zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_); 00141 alloc_assert (ctx); 00142 return (void*) ctx; 00143 } 00144 00145 int zmq_term (void *ctx_) 00146 { 00147 if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { 00148 errno = EFAULT; 00149 return -1; 00150 } 00151 00152 int rc = ((zmq::ctx_t*) ctx_)->terminate (); 00153 int en = errno; 00154 00155 #ifdef ZMQ_HAVE_WINDOWS 00156 // On Windows, uninitialise socket layer. 00157 rc = WSACleanup (); 00158 wsa_assert (rc != SOCKET_ERROR); 00159 #endif 00160 00161 #if defined ZMQ_HAVE_OPENPGM 00162 // Shut down the OpenPGM library. 00163 if (pgm_shutdown () != TRUE) 00164 zmq_assert (false); 00165 #endif 00166 00167 errno = en; 00168 return rc; 00169 } 00170 00171 void *zmq_socket (void *ctx_, int type_) 00172 { 00173 if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { 00174 errno = EFAULT; 00175 return NULL; 00176 } 00177 return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_)); 00178 } 00179 00180 int zmq_close (void *s_) 00181 { 00182 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { 00183 errno = ENOTSOCK; 00184 return -1; 00185 } 00186 ((zmq::socket_base_t*) s_)->close (); 00187 return 0; 00188 } 00189 00190 int zmq_setsockopt (void *s_, int option_, const void *optval_, 00191 size_t optvallen_) 00192 { 00193 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { 00194 errno = ENOTSOCK; 00195 return -1; 00196 } 00197 return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_, 00198 optvallen_)); 00199 } 00200 00201 int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) 00202 { 00203 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { 00204 errno = ENOTSOCK; 00205 return -1; 00206 } 00207 return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_, 00208 optvallen_)); 00209 } 00210 00211 int zmq_bind (void *s_, const char *addr_) 00212 { 00213 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { 00214 errno = ENOTSOCK; 00215 return -1; 00216 } 00217 return (((zmq::socket_base_t*) s_)->bind (addr_)); 00218 } 00219 00220 int zmq_connect (void *s_, const char *addr_) 00221 { 00222 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { 00223 errno = ENOTSOCK; 00224 return -1; 00225 } 00226 return (((zmq::socket_base_t*) s_)->connect (addr_)); 00227 } 00228 00229 int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) 00230 { 00231 zmq_msg_t msg; 00232 int rc = zmq_msg_init_size (&msg, len_); 00233 if (rc != 0) 00234 return -1; 00235 memcpy (zmq_msg_data (&msg), buf_, len_); 00236 00237 rc = zmq_sendmsg (s_, &msg, flags_); 00238 if (unlikely (rc < 0)) { 00239 int err = errno; 00240 int rc2 = zmq_msg_close (&msg); 00241 errno_assert (rc2 == 0); 00242 errno = err; 00243 return -1; 00244 } 00245 00246 // Note the optimisation here. We don't close the msg object as it is 00247 // empty anyway. This may change when implementation of zmq_msg_t changes. 00248 return rc; 00249 } 00250 00251 int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) 00252 { 00253 zmq_msg_t msg; 00254 int rc = zmq_msg_init (&msg); 00255 errno_assert (rc == 0); 00256 00257 int nbytes = zmq_recvmsg (s_, &msg, flags_); 00258 if (unlikely (nbytes < 0)) { 00259 int err = errno; 00260 rc = zmq_msg_close (&msg); 00261 errno_assert (rc == 0); 00262 errno = err; 00263 return -1; 00264 } 00265 00266 // At the moment an oversized message is silently truncated. 00267 // TODO: Build in a notification mechanism to report the overflows. 00268 size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_; 00269 memcpy (buf_, zmq_msg_data (&msg), to_copy); 00270 00271 rc = zmq_msg_close (&msg); 00272 errno_assert (rc == 0); 00273 00274 return nbytes; 00275 } 00276 00277 int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) 00278 { 00279 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { 00280 errno = ENOTSOCK; 00281 return -1; 00282 } 00283 int sz = (int) zmq_msg_size (msg_); 00284 int rc = (((zmq::socket_base_t*) s_)->send ((zmq::msg_t*) msg_, flags_)); 00285 if (unlikely (rc < 0)) 00286 return -1; 00287 return sz; 00288 } 00289 00290 int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_) 00291 { 00292 if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { 00293 errno = ENOTSOCK; 00294 return -1; 00295 } 00296 int rc = (((zmq::socket_base_t*) s_)->recv ((zmq::msg_t*) msg_, flags_)); 00297 if (unlikely (rc < 0)) 00298 return -1; 00299 return (int) zmq_msg_size (msg_); 00300 } 00301 00302 int zmq_msg_init (zmq_msg_t *msg_) 00303 { 00304 return ((zmq::msg_t*) msg_)->init (); 00305 } 00306 00307 int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_) 00308 { 00309 return ((zmq::msg_t*) msg_)->init_size (size_); 00310 } 00311 00312 int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, 00313 zmq_free_fn *ffn_, void *hint_) 00314 { 00315 return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_); 00316 } 00317 00318 int zmq_msg_close (zmq_msg_t *msg_) 00319 { 00320 return ((zmq::msg_t*) msg_)->close (); 00321 } 00322 00323 int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_) 00324 { 00325 return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_); 00326 } 00327 00328 int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) 00329 { 00330 return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_); 00331 } 00332 00333 void *zmq_msg_data (zmq_msg_t *msg_) 00334 { 00335 return ((zmq::msg_t*) msg_)->data (); 00336 } 00337 00338 size_t zmq_msg_size (zmq_msg_t *msg_) 00339 { 00340 return ((zmq::msg_t*) msg_)->size (); 00341 } 00342 00343 int zmq_getmsgopt (zmq_msg_t *msg_, int option_, void *optval_, 00344 size_t *optvallen_) 00345 { 00346 switch (option_) { 00347 case ZMQ_MORE: 00348 if (*optvallen_ < sizeof (int)) { 00349 errno = EINVAL; 00350 return -1; 00351 } 00352 *((int*) optval_) = 00353 (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more) ? 1 : 0; 00354 *optvallen_ = sizeof (int); 00355 return 0; 00356 default: 00357 errno = EINVAL; 00358 return -1; 00359 } 00360 } 00361 00362 int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) 00363 { 00364 #if defined ZMQ_POLL_BASED_ON_POLL 00365 if (unlikely (nitems_ < 0)) { 00366 errno = EINVAL; 00367 return -1; 00368 } 00369 if (unlikely (nitems_ == 0)) { 00370 if (timeout_ == 0) 00371 return 0; 00372 #if defined ZMQ_HAVE_WINDOWS 00373 Sleep (timeout_ > 0 ? timeout_ : INFINITE); 00374 return 0; 00375 #elif defined ZMQ_HAVE_ANDROID 00376 usleep (timeout_ * 1000); 00377 return 0; 00378 #else 00379 return usleep (timeout_ * 1000); 00380 #endif 00381 } 00382 00383 if (!items_) { 00384 errno = EFAULT; 00385 return -1; 00386 } 00387 00388 zmq::clock_t clock; 00389 uint64_t now = 0; 00390 uint64_t end = 0; 00391 00392 pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd)); 00393 alloc_assert (pollfds); 00394 00395 // Build pollset for poll () system call. 00396 for (int i = 0; i != nitems_; i++) { 00397 00398 // If the poll item is a 0MQ socket, we poll on the file descriptor 00399 // retrieved by the ZMQ_FD socket option. 00400 if (items_ [i].socket) { 00401 size_t zmq_fd_size = sizeof (zmq::fd_t); 00402 if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd, 00403 &zmq_fd_size) == -1) { 00404 free (pollfds); 00405 return -1; 00406 } 00407 pollfds [i].events = items_ [i].events ? POLLIN : 0; 00408 } 00409 // Else, the poll item is a raw file descriptor. Just convert the 00410 // events to normal POLLIN/POLLOUT for poll (). 00411 else { 00412 pollfds [i].fd = items_ [i].fd; 00413 pollfds [i].events = 00414 (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | 00415 (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0); 00416 } 00417 } 00418 00419 bool first_pass = true; 00420 int nevents = 0; 00421 00422 while (true) { 00423 00424 // Compute the timeout for the subsequent poll. 00425 int timeout; 00426 if (first_pass) 00427 timeout = 0; 00428 else if (timeout_ < 0) 00429 timeout = -1; 00430 else 00431 timeout = end - now; 00432 00433 // Wait for events. 00434 while (true) { 00435 int rc = poll (pollfds, nitems_, timeout); 00436 if (rc == -1 && errno == EINTR) { 00437 free (pollfds); 00438 return -1; 00439 } 00440 errno_assert (rc >= 0); 00441 break; 00442 } 00443 00444 // Check for the events. 00445 for (int i = 0; i != nitems_; i++) { 00446 00447 items_ [i].revents = 0; 00448 00449 // The poll item is a 0MQ socket. Retrieve pending events 00450 // using the ZMQ_EVENTS socket option. 00451 if (items_ [i].socket) { 00452 size_t zmq_events_size = sizeof (uint32_t); 00453 uint32_t zmq_events; 00454 if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, 00455 &zmq_events_size) == -1) { 00456 free (pollfds); 00457 return -1; 00458 } 00459 if ((items_ [i].events & ZMQ_POLLOUT) && 00460 (zmq_events & ZMQ_POLLOUT)) 00461 items_ [i].revents |= ZMQ_POLLOUT; 00462 if ((items_ [i].events & ZMQ_POLLIN) && 00463 (zmq_events & ZMQ_POLLIN)) 00464 items_ [i].revents |= ZMQ_POLLIN; 00465 } 00466 // Else, the poll item is a raw file descriptor, simply convert 00467 // the events to zmq_pollitem_t-style format. 00468 else { 00469 if (pollfds [i].revents & POLLIN) 00470 items_ [i].revents |= ZMQ_POLLIN; 00471 if (pollfds [i].revents & POLLOUT) 00472 items_ [i].revents |= ZMQ_POLLOUT; 00473 if (pollfds [i].revents & ~(POLLIN | POLLOUT)) 00474 items_ [i].revents |= ZMQ_POLLERR; 00475 } 00476 00477 if (items_ [i].revents) 00478 nevents++; 00479 } 00480 00481 // If timout is zero, exit immediately whether there are events or not. 00482 if (timeout_ == 0) 00483 break; 00484 00485 // If there are events to return, we can exit immediately. 00486 if (nevents) 00487 break; 00488 00489 // At this point we are meant to wait for events but there are none. 00490 // If timeout is infinite we can just loop until we get some events. 00491 if (timeout_ < 0) { 00492 if (first_pass) 00493 first_pass = false; 00494 continue; 00495 } 00496 00497 // The timeout is finite and there are no events. In the first pass 00498 // we get a timestamp of when the polling have begun. (We assume that 00499 // first pass have taken negligible time). We also compute the time 00500 // when the polling should time out. 00501 if (first_pass) { 00502 now = clock.now_ms (); 00503 end = now + timeout_; 00504 if (now == end) 00505 break; 00506 first_pass = false; 00507 continue; 00508 } 00509 00510 // Find out whether timeout have expired. 00511 now = clock.now_ms (); 00512 if (now >= end) 00513 break; 00514 } 00515 00516 free (pollfds); 00517 return nevents; 00518 00519 #elif defined ZMQ_POLL_BASED_ON_SELECT 00520 00521 if (unlikely (nitems_ < 0)) { 00522 errno = EINVAL; 00523 return -1; 00524 } 00525 if (unlikely (nitems_ == 0)) { 00526 if (timeout_ == 0) 00527 return 0; 00528 #if defined ZMQ_HAVE_WINDOWS 00529 Sleep (timeout_ > 0 ? timeout_ : INFINITE); 00530 return 0; 00531 #else 00532 return usleep (timeout_ * 1000); 00533 #endif 00534 } 00535 00536 if (!items_) { 00537 errno = EFAULT; 00538 return -1; 00539 } 00540 00541 zmq::clock_t clock; 00542 uint64_t now = 0; 00543 uint64_t end = 0; 00544 00545 // Ensure we do not attempt to select () on more than FD_SETSIZE 00546 // file descriptors. 00547 zmq_assert (nitems_ <= FD_SETSIZE); 00548 00549 fd_set pollset_in; 00550 FD_ZERO (&pollset_in); 00551 fd_set pollset_out; 00552 FD_ZERO (&pollset_out); 00553 fd_set pollset_err; 00554 FD_ZERO (&pollset_err); 00555 00556 zmq::fd_t maxfd = 0; 00557 00558 // Build the fd_sets for passing to select (). 00559 for (int i = 0; i != nitems_; i++) { 00560 00561 // If the poll item is a 0MQ socket we are interested in input on the 00562 // notification file descriptor retrieved by the ZMQ_FD socket option. 00563 if (items_ [i].socket) { 00564 size_t zmq_fd_size = sizeof (zmq::fd_t); 00565 zmq::fd_t notify_fd; 00566 if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, 00567 &zmq_fd_size) == -1) 00568 return -1; 00569 if (items_ [i].events) { 00570 FD_SET (notify_fd, &pollset_in); 00571 if (maxfd < notify_fd) 00572 maxfd = notify_fd; 00573 } 00574 } 00575 // Else, the poll item is a raw file descriptor. Convert the poll item 00576 // events to the appropriate fd_sets. 00577 else { 00578 if (items_ [i].events & ZMQ_POLLIN) 00579 FD_SET (items_ [i].fd, &pollset_in); 00580 if (items_ [i].events & ZMQ_POLLOUT) 00581 FD_SET (items_ [i].fd, &pollset_out); 00582 if (items_ [i].events & ZMQ_POLLERR) 00583 FD_SET (items_ [i].fd, &pollset_err); 00584 if (maxfd < items_ [i].fd) 00585 maxfd = items_ [i].fd; 00586 } 00587 } 00588 00589 bool first_pass = true; 00590 int nevents = 0; 00591 fd_set inset, outset, errset; 00592 00593 while (true) { 00594 00595 // Compute the timeout for the subsequent poll. 00596 timeval timeout; 00597 timeval *ptimeout; 00598 if (first_pass) { 00599 timeout.tv_sec = 0; 00600 timeout.tv_usec = 0; 00601 ptimeout = &timeout; 00602 } 00603 else if (timeout_ < 0) 00604 ptimeout = NULL; 00605 else { 00606 timeout.tv_sec = (long) ((end - now) / 1000); 00607 timeout.tv_usec = (long) ((end - now) % 1000 * 1000); 00608 ptimeout = &timeout; 00609 } 00610 00611 // Wait for events. Ignore interrupts if there's infinite timeout. 00612 while (true) { 00613 memcpy (&inset, &pollset_in, sizeof (fd_set)); 00614 memcpy (&outset, &pollset_out, sizeof (fd_set)); 00615 memcpy (&errset, &pollset_err, sizeof (fd_set)); 00616 #if defined ZMQ_HAVE_WINDOWS 00617 int rc = select (0, &inset, &outset, &errset, ptimeout); 00618 if (unlikely (rc == SOCKET_ERROR)) { 00619 zmq::wsa_error_to_errno (); 00620 if (errno == ENOTSOCK) 00621 return -1; 00622 wsa_assert (false); 00623 } 00624 #else 00625 int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout); 00626 if (unlikely (rc == -1)) { 00627 if (errno == EINTR || errno == EBADF) 00628 return -1; 00629 errno_assert (false); 00630 } 00631 #endif 00632 break; 00633 } 00634 00635 // Check for the events. 00636 for (int i = 0; i != nitems_; i++) { 00637 00638 items_ [i].revents = 0; 00639 00640 // The poll item is a 0MQ socket. Retrieve pending events 00641 // using the ZMQ_EVENTS socket option. 00642 if (items_ [i].socket) { 00643 size_t zmq_events_size = sizeof (uint32_t); 00644 uint32_t zmq_events; 00645 if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, 00646 &zmq_events_size) == -1) 00647 return -1; 00648 if ((items_ [i].events & ZMQ_POLLOUT) && 00649 (zmq_events & ZMQ_POLLOUT)) 00650 items_ [i].revents |= ZMQ_POLLOUT; 00651 if ((items_ [i].events & ZMQ_POLLIN) && 00652 (zmq_events & ZMQ_POLLIN)) 00653 items_ [i].revents |= ZMQ_POLLIN; 00654 } 00655 // Else, the poll item is a raw file descriptor, simply convert 00656 // the events to zmq_pollitem_t-style format. 00657 else { 00658 if (FD_ISSET (items_ [i].fd, &inset)) 00659 items_ [i].revents |= ZMQ_POLLIN; 00660 if (FD_ISSET (items_ [i].fd, &outset)) 00661 items_ [i].revents |= ZMQ_POLLOUT; 00662 if (FD_ISSET (items_ [i].fd, &errset)) 00663 items_ [i].revents |= ZMQ_POLLERR; 00664 } 00665 00666 if (items_ [i].revents) 00667 nevents++; 00668 } 00669 00670 // If timout is zero, exit immediately whether there are events or not. 00671 if (timeout_ == 0) 00672 break; 00673 00674 // If there are events to return, we can exit immediately. 00675 if (nevents) 00676 break; 00677 00678 // At this point we are meant to wait for events but there are none. 00679 // If timeout is infinite we can just loop until we get some events. 00680 if (timeout_ < 0) { 00681 if (first_pass) 00682 first_pass = false; 00683 continue; 00684 } 00685 00686 // The timeout is finite and there are no events. In the first pass 00687 // we get a timestamp of when the polling have begun. (We assume that 00688 // first pass have taken negligible time). We also compute the time 00689 // when the polling should time out. 00690 if (first_pass) { 00691 now = clock.now_ms (); 00692 end = now + timeout_; 00693 if (now == end) 00694 break; 00695 first_pass = false; 00696 continue; 00697 } 00698 00699 // Find out whether timeout have expired. 00700 now = clock.now_ms (); 00701 if (now >= end) 00702 break; 00703 } 00704 00705 return nevents; 00706 00707 #else 00708 // Exotic platforms that support neither poll() nor select(). 00709 errno = ENOTSUP; 00710 return -1; 00711 #endif 00712 } 00713 00714 int zmq_errno () 00715 { 00716 return errno; 00717 } 00718 00719 #if defined ZMQ_POLL_BASED_ON_SELECT 00720 #undef ZMQ_POLL_BASED_ON_SELECT 00721 #endif 00722 #if defined ZMQ_POLL_BASED_ON_POLL 00723 #undef ZMQ_POLL_BASED_ON_POLL 00724 #endif 00725