libzmq master
The Intelligent Transport Layer

zmq.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2011 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include "platform.hpp"
00023 
00024 #if defined ZMQ_FORCE_SELECT
00025 #define ZMQ_POLL_BASED_ON_SELECT
00026 #elif defined ZMQ_FORCE_POLL
00027 #define ZMQ_POLL_BASED_ON_POLL
00028 #elif defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
00029     defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
00030     defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
00031     defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX ||\
00032     defined ZMQ_HAVE_NETBSD
00033 #define ZMQ_POLL_BASED_ON_POLL
00034 #elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS ||\
00035     defined ZMQ_HAVE_CYGWIN
00036 #define ZMQ_POLL_BASED_ON_SELECT
00037 #endif
00038 
00039 //  On AIX platform, poll.h has to be included first to get consistent
00040 //  definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
00041 //  instead of 'events' and 'revents' and defines macros to map from POSIX-y
00042 //  names to AIX-specific names).
00043 #if defined ZMQ_POLL_BASED_ON_POLL
00044 #include <poll.h>
00045 #endif
00046 
00047 #if defined ZMQ_HAVE_WINDOWS
00048 #include "windows.hpp"
00049 #else
00050 #include <unistd.h>
00051 #endif
00052 
00053 #include <string.h>
00054 #include <errno.h>
00055 #include <stdlib.h>
00056 #include <new>
00057 
00058 #include "socket_base.hpp"
00059 #include "stdint.hpp"
00060 #include "config.hpp"
00061 #include "likely.hpp"
00062 #include "clock.hpp"
00063 #include "ctx.hpp"
00064 #include "err.hpp"
00065 #include "msg.hpp"
00066 #include "fd.hpp"
00067 
00068 #if !defined ZMQ_HAVE_WINDOWS
00069 #include <unistd.h>
00070 #endif
00071 
00072 #if defined ZMQ_HAVE_OPENPGM
00073 #define __PGM_WININT_H__
00074 #include <pgm/pgm.h>
00075 #endif
00076 
00077 //  Compile time check whether msg_t fits into zmq_msg_t.
00078 typedef char check_msg_t_size
00079     [sizeof (zmq::msg_t) ==  sizeof (zmq_msg_t) ? 1 : -1];
00080 
00081 void zmq_version (int *major_, int *minor_, int *patch_)
00082 {
00083     *major_ = ZMQ_VERSION_MAJOR;
00084     *minor_ = ZMQ_VERSION_MINOR;
00085     *patch_ = ZMQ_VERSION_PATCH;
00086 }
00087 
00088 const char *zmq_strerror (int errnum_)
00089 {
00090     return zmq::errno_to_string (errnum_);
00091 }
00092 
00093 void *zmq_init (int io_threads_)
00094 {
00095     if (io_threads_ < 0) {
00096         errno = EINVAL;
00097         return NULL;
00098     }
00099 
00100 #if defined ZMQ_HAVE_OPENPGM
00101 
00102     //  Init PGM transport. Ensure threading and timer are enabled. Find PGM
00103     //  protocol ID. Note that if you want to use gettimeofday and sleep for
00104     //  openPGM timing, set environment variables PGM_TIMER to "GTOD" and
00105     //  PGM_SLEEP to "USLEEP".
00106     pgm_error_t *pgm_error = NULL;
00107     const bool ok = pgm_init (&pgm_error);
00108     if (ok != TRUE) {
00109 
00110         //  Invalid parameters don't set pgm_error_t
00111         zmq_assert (pgm_error != NULL);
00112         if (pgm_error->domain == PGM_ERROR_DOMAIN_TIME && (
00113               pgm_error->code == PGM_ERROR_FAILED)) {
00114 
00115             //  Failed to access RTC or HPET device.
00116             pgm_error_free (pgm_error);
00117             errno = EINVAL;
00118             return NULL;
00119         }
00120 
00121         //  PGM_ERROR_DOMAIN_ENGINE: WSAStartup errors or missing WSARecvMsg.
00122         zmq_assert (false);
00123     }
00124 #endif
00125 
00126 #ifdef ZMQ_HAVE_WINDOWS
00127     //  Intialise Windows sockets. Note that WSAStartup can be called multiple
00128     //  times given that WSACleanup will be called for each WSAStartup.
00129    //  We do this before the ctx constructor since its embedded mailbox_t
00130    //  object needs Winsock to be up and running.
00131     WORD version_requested = MAKEWORD (2, 2);
00132     WSADATA wsa_data;
00133     int rc = WSAStartup (version_requested, &wsa_data);
00134     zmq_assert (rc == 0);
00135     zmq_assert (LOBYTE (wsa_data.wVersion) == 2 &&
00136         HIBYTE (wsa_data.wVersion) == 2);
00137 #endif
00138 
00139     //  Create 0MQ context.
00140     zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
00141     alloc_assert (ctx);
00142     return (void*) ctx;
00143 }
00144 
00145 int zmq_term (void *ctx_)
00146 {
00147     if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
00148         errno = EFAULT;
00149         return -1;
00150     }
00151 
00152     int rc = ((zmq::ctx_t*) ctx_)->terminate ();
00153     int en = errno;
00154 
00155 #ifdef ZMQ_HAVE_WINDOWS
00156     //  On Windows, uninitialise socket layer.
00157     rc = WSACleanup ();
00158     wsa_assert (rc != SOCKET_ERROR);
00159 #endif
00160 
00161 #if defined ZMQ_HAVE_OPENPGM
00162     //  Shut down the OpenPGM library.
00163     if (pgm_shutdown () != TRUE)
00164         zmq_assert (false);
00165 #endif
00166 
00167     errno = en;
00168     return rc;
00169 }
00170 
00171 void *zmq_socket (void *ctx_, int type_)
00172 {
00173     if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
00174         errno = EFAULT;
00175         return NULL;
00176     }
00177     return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
00178 }
00179 
00180 int zmq_close (void *s_)
00181 {
00182     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
00183         errno = ENOTSOCK;
00184         return -1;
00185     }
00186     ((zmq::socket_base_t*) s_)->close ();
00187     return 0;
00188 }
00189 
00190 int zmq_setsockopt (void *s_, int option_, const void *optval_,
00191     size_t optvallen_)
00192 {
00193     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
00194         errno = ENOTSOCK;
00195         return -1;
00196     }
00197     return (((zmq::socket_base_t*) s_)->setsockopt (option_, optval_,
00198         optvallen_));
00199 }
00200 
00201 int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
00202 {
00203     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
00204         errno = ENOTSOCK;
00205         return -1;
00206     }
00207     return (((zmq::socket_base_t*) s_)->getsockopt (option_, optval_,
00208         optvallen_));
00209 }
00210 
00211 int zmq_bind (void *s_, const char *addr_)
00212 {
00213     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
00214         errno = ENOTSOCK;
00215         return -1;
00216     }
00217     return (((zmq::socket_base_t*) s_)->bind (addr_));
00218 }
00219 
00220 int zmq_connect (void *s_, const char *addr_)
00221 {
00222     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
00223         errno = ENOTSOCK;
00224         return -1;
00225     }
00226     return (((zmq::socket_base_t*) s_)->connect (addr_));
00227 }
00228 
00229 int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
00230 {
00231     zmq_msg_t msg;
00232     int rc = zmq_msg_init_size (&msg, len_);
00233     if (rc != 0)
00234         return -1;
00235     memcpy (zmq_msg_data (&msg), buf_, len_);
00236 
00237     rc = zmq_sendmsg (s_, &msg, flags_);
00238     if (unlikely (rc < 0)) {
00239         int err = errno;
00240         int rc2 = zmq_msg_close (&msg);
00241         errno_assert (rc2 == 0);
00242         errno = err;
00243         return -1;
00244     }
00245     
00246     //  Note the optimisation here. We don't close the msg object as it is
00247     //  empty anyway. This may change when implementation of zmq_msg_t changes.
00248     return rc;
00249 }
00250 
00251 int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
00252 {
00253     zmq_msg_t msg;
00254     int rc = zmq_msg_init (&msg);
00255     errno_assert (rc == 0);
00256 
00257     int nbytes = zmq_recvmsg (s_, &msg, flags_);
00258     if (unlikely (nbytes < 0)) {
00259         int err = errno;
00260         rc = zmq_msg_close (&msg);
00261         errno_assert (rc == 0);
00262         errno = err;
00263         return -1;
00264     }
00265 
00266     //  At the moment an oversized message is silently truncated.
00267     //  TODO: Build in a notification mechanism to report the overflows.
00268     size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_;
00269     memcpy (buf_, zmq_msg_data (&msg), to_copy);
00270 
00271     rc = zmq_msg_close (&msg);
00272     errno_assert (rc == 0);
00273 
00274     return nbytes;    
00275 }
00276 
00277 int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
00278 {
00279     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
00280         errno = ENOTSOCK;
00281         return -1;
00282     }
00283     int sz = (int) zmq_msg_size (msg_);
00284     int rc = (((zmq::socket_base_t*) s_)->send ((zmq::msg_t*) msg_, flags_));
00285     if (unlikely (rc < 0))
00286         return -1;
00287     return sz;
00288 }
00289 
00290 int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
00291 {
00292     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
00293         errno = ENOTSOCK;
00294         return -1;
00295     }
00296     int rc = (((zmq::socket_base_t*) s_)->recv ((zmq::msg_t*) msg_, flags_));
00297     if (unlikely (rc < 0))
00298         return -1;
00299     return (int) zmq_msg_size (msg_);
00300 }
00301 
00302 int zmq_msg_init (zmq_msg_t *msg_)
00303 {
00304     return ((zmq::msg_t*) msg_)->init ();
00305 }
00306 
00307 int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_)
00308 {
00309     return ((zmq::msg_t*) msg_)->init_size (size_);
00310 }
00311 
00312 int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_,
00313     zmq_free_fn *ffn_, void *hint_)
00314 {
00315     return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_);
00316 }
00317 
00318 int zmq_msg_close (zmq_msg_t *msg_)
00319 {
00320     return ((zmq::msg_t*) msg_)->close ();
00321 }
00322 
00323 int zmq_msg_move (zmq_msg_t *dest_, zmq_msg_t *src_)
00324 {
00325     return ((zmq::msg_t*) dest_)->move (*(zmq::msg_t*) src_);
00326 }
00327 
00328 int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
00329 {
00330     return ((zmq::msg_t*) dest_)->copy (*(zmq::msg_t*) src_);
00331 }
00332 
00333 void *zmq_msg_data (zmq_msg_t *msg_)
00334 {
00335     return ((zmq::msg_t*) msg_)->data ();
00336 }
00337 
00338 size_t zmq_msg_size (zmq_msg_t *msg_)
00339 {
00340     return ((zmq::msg_t*) msg_)->size ();
00341 }
00342 
00343 int zmq_getmsgopt (zmq_msg_t *msg_, int option_, void *optval_,
00344     size_t *optvallen_)
00345 {
00346     switch (option_) {
00347     case ZMQ_MORE:
00348         if (*optvallen_ < sizeof (int)) {
00349             errno = EINVAL;
00350             return -1;
00351         }
00352         *((int*) optval_) =
00353             (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more) ? 1 : 0;
00354         *optvallen_ = sizeof (int);
00355         return 0;
00356     default:
00357         errno = EINVAL;
00358         return -1;
00359     }
00360 }
00361 
00362 int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
00363 {
00364 #if defined ZMQ_POLL_BASED_ON_POLL
00365     if (unlikely (nitems_ < 0)) {
00366         errno = EINVAL;
00367         return -1;
00368     }
00369     if (unlikely (nitems_ == 0)) {
00370         if (timeout_ == 0)
00371             return 0;
00372 #if defined ZMQ_HAVE_WINDOWS
00373         Sleep (timeout_ > 0 ? timeout_ : INFINITE);
00374         return 0;
00375 #elif defined ZMQ_HAVE_ANDROID
00376         usleep (timeout_ * 1000);
00377         return 0;
00378 #else
00379         return usleep (timeout_ * 1000);
00380 #endif
00381     }
00382 
00383     if (!items_) {
00384         errno = EFAULT;
00385         return -1;
00386     }
00387 
00388     zmq::clock_t clock;
00389     uint64_t now = 0;
00390     uint64_t end = 0;
00391 
00392     pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
00393     alloc_assert (pollfds);
00394 
00395     //  Build pollset for poll () system call.
00396     for (int i = 0; i != nitems_; i++) {
00397 
00398         //  If the poll item is a 0MQ socket, we poll on the file descriptor
00399         //  retrieved by the ZMQ_FD socket option.
00400         if (items_ [i].socket) {
00401             size_t zmq_fd_size = sizeof (zmq::fd_t);
00402             if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
00403                 &zmq_fd_size) == -1) {
00404                 free (pollfds);
00405                 return -1;
00406             }
00407             pollfds [i].events = items_ [i].events ? POLLIN : 0;
00408         }
00409         //  Else, the poll item is a raw file descriptor. Just convert the
00410         //  events to normal POLLIN/POLLOUT for poll ().
00411         else {
00412             pollfds [i].fd = items_ [i].fd;
00413             pollfds [i].events =
00414                 (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
00415                 (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0);
00416         }
00417     }
00418 
00419     bool first_pass = true;
00420     int nevents = 0;
00421 
00422     while (true) {
00423 
00424          //  Compute the timeout for the subsequent poll.
00425          int timeout;
00426          if (first_pass)
00427              timeout = 0;
00428          else if (timeout_ < 0)
00429              timeout = -1;
00430          else
00431              timeout = end - now;
00432 
00433         //  Wait for events.
00434         while (true) {
00435             int rc = poll (pollfds, nitems_, timeout);
00436             if (rc == -1 && errno == EINTR) {
00437                 free (pollfds);
00438                 return -1;
00439             }
00440             errno_assert (rc >= 0);
00441             break;
00442         }
00443 
00444         //  Check for the events.
00445         for (int i = 0; i != nitems_; i++) {
00446 
00447             items_ [i].revents = 0;
00448 
00449             //  The poll item is a 0MQ socket. Retrieve pending events
00450             //  using the ZMQ_EVENTS socket option.
00451             if (items_ [i].socket) {
00452                 size_t zmq_events_size = sizeof (uint32_t);
00453                 uint32_t zmq_events;
00454                 if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
00455                     &zmq_events_size) == -1) {
00456                     free (pollfds);
00457                     return -1;
00458                 }
00459                 if ((items_ [i].events & ZMQ_POLLOUT) &&
00460                       (zmq_events & ZMQ_POLLOUT))
00461                     items_ [i].revents |= ZMQ_POLLOUT;
00462                 if ((items_ [i].events & ZMQ_POLLIN) &&
00463                       (zmq_events & ZMQ_POLLIN))
00464                     items_ [i].revents |= ZMQ_POLLIN;
00465             }
00466             //  Else, the poll item is a raw file descriptor, simply convert
00467             //  the events to zmq_pollitem_t-style format.
00468             else {
00469                 if (pollfds [i].revents & POLLIN)
00470                     items_ [i].revents |= ZMQ_POLLIN;
00471                 if (pollfds [i].revents & POLLOUT)
00472                     items_ [i].revents |= ZMQ_POLLOUT;
00473                 if (pollfds [i].revents & ~(POLLIN | POLLOUT))
00474                     items_ [i].revents |= ZMQ_POLLERR;
00475             }
00476 
00477             if (items_ [i].revents)
00478                 nevents++;
00479         }
00480 
00481         //  If timout is zero, exit immediately whether there are events or not.
00482         if (timeout_ == 0)
00483             break;
00484 
00485         //  If there are events to return, we can exit immediately.
00486         if (nevents)
00487             break;
00488 
00489         //  At this point we are meant to wait for events but there are none.
00490         //  If timeout is infinite we can just loop until we get some events.
00491         if (timeout_ < 0) {
00492             if (first_pass)
00493                 first_pass = false;
00494             continue;
00495         }
00496 
00497         //  The timeout is finite and there are no events. In the first pass
00498         //  we get a timestamp of when the polling have begun. (We assume that
00499         //  first pass have taken negligible time). We also compute the time
00500         //  when the polling should time out.
00501         if (first_pass) {
00502             now = clock.now_ms ();
00503             end = now + timeout_;
00504             if (now == end)
00505                 break;
00506             first_pass = false;
00507             continue;
00508         }
00509 
00510         //  Find out whether timeout have expired.
00511         now = clock.now_ms ();
00512         if (now >= end)
00513             break;
00514     }
00515 
00516     free (pollfds);
00517     return nevents;
00518 
00519 #elif defined ZMQ_POLL_BASED_ON_SELECT
00520 
00521     if (unlikely (nitems_ < 0)) {
00522         errno = EINVAL;
00523         return -1;
00524     }
00525     if (unlikely (nitems_ == 0)) {
00526         if (timeout_ == 0)
00527             return 0;
00528 #if defined ZMQ_HAVE_WINDOWS
00529         Sleep (timeout_ > 0 ? timeout_ : INFINITE);
00530         return 0;
00531 #else
00532         return usleep (timeout_ * 1000);
00533 #endif
00534     }
00535 
00536     if (!items_) {
00537         errno = EFAULT;
00538         return -1;
00539     }
00540 
00541     zmq::clock_t clock;
00542     uint64_t now = 0;
00543     uint64_t end = 0;
00544 
00545     //  Ensure we do not attempt to select () on more than FD_SETSIZE
00546     //  file descriptors.
00547     zmq_assert (nitems_ <= FD_SETSIZE);
00548 
00549     fd_set pollset_in;
00550     FD_ZERO (&pollset_in);
00551     fd_set pollset_out;
00552     FD_ZERO (&pollset_out);
00553     fd_set pollset_err;
00554     FD_ZERO (&pollset_err);
00555 
00556     zmq::fd_t maxfd = 0;
00557 
00558     //  Build the fd_sets for passing to select ().
00559     for (int i = 0; i != nitems_; i++) {
00560 
00561         //  If the poll item is a 0MQ socket we are interested in input on the
00562         //  notification file descriptor retrieved by the ZMQ_FD socket option.
00563         if (items_ [i].socket) {
00564             size_t zmq_fd_size = sizeof (zmq::fd_t);
00565             zmq::fd_t notify_fd;
00566             if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
00567                 &zmq_fd_size) == -1)
00568                 return -1;
00569             if (items_ [i].events) {
00570                 FD_SET (notify_fd, &pollset_in);
00571                 if (maxfd < notify_fd)
00572                     maxfd = notify_fd;
00573             }
00574         }
00575         //  Else, the poll item is a raw file descriptor. Convert the poll item
00576         //  events to the appropriate fd_sets.
00577         else {
00578             if (items_ [i].events & ZMQ_POLLIN)
00579                 FD_SET (items_ [i].fd, &pollset_in);
00580             if (items_ [i].events & ZMQ_POLLOUT)
00581                 FD_SET (items_ [i].fd, &pollset_out);
00582             if (items_ [i].events & ZMQ_POLLERR)
00583                 FD_SET (items_ [i].fd, &pollset_err);
00584             if (maxfd < items_ [i].fd)
00585                 maxfd = items_ [i].fd;
00586         }
00587     }
00588 
00589     bool first_pass = true;
00590     int nevents = 0;
00591     fd_set inset, outset, errset;
00592 
00593     while (true) {
00594 
00595         //  Compute the timeout for the subsequent poll.
00596         timeval timeout;
00597         timeval *ptimeout;
00598         if (first_pass) {
00599             timeout.tv_sec = 0;
00600             timeout.tv_usec = 0;
00601             ptimeout = &timeout;
00602         }
00603         else if (timeout_ < 0)
00604             ptimeout = NULL;
00605         else {
00606             timeout.tv_sec = (long) ((end - now) / 1000);
00607             timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
00608             ptimeout = &timeout;
00609         }
00610 
00611         //  Wait for events. Ignore interrupts if there's infinite timeout.
00612         while (true) {
00613             memcpy (&inset, &pollset_in, sizeof (fd_set));
00614             memcpy (&outset, &pollset_out, sizeof (fd_set));
00615             memcpy (&errset, &pollset_err, sizeof (fd_set));
00616 #if defined ZMQ_HAVE_WINDOWS
00617             int rc = select (0, &inset, &outset, &errset, ptimeout);
00618             if (unlikely (rc == SOCKET_ERROR)) {
00619                 zmq::wsa_error_to_errno ();
00620                 if (errno == ENOTSOCK)
00621                     return -1;
00622                 wsa_assert (false);
00623             }
00624 #else
00625             int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
00626             if (unlikely (rc == -1)) {
00627                 if (errno == EINTR || errno == EBADF)
00628                     return -1;
00629                 errno_assert (false);
00630             }
00631 #endif
00632             break;
00633         }
00634 
00635         //  Check for the events.
00636         for (int i = 0; i != nitems_; i++) {
00637 
00638             items_ [i].revents = 0;
00639 
00640             //  The poll item is a 0MQ socket. Retrieve pending events
00641             //  using the ZMQ_EVENTS socket option.
00642             if (items_ [i].socket) {
00643                 size_t zmq_events_size = sizeof (uint32_t);
00644                 uint32_t zmq_events;
00645                 if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
00646                       &zmq_events_size) == -1)
00647                     return -1;
00648                 if ((items_ [i].events & ZMQ_POLLOUT) &&
00649                       (zmq_events & ZMQ_POLLOUT))
00650                     items_ [i].revents |= ZMQ_POLLOUT;
00651                 if ((items_ [i].events & ZMQ_POLLIN) &&
00652                       (zmq_events & ZMQ_POLLIN))
00653                     items_ [i].revents |= ZMQ_POLLIN;
00654             }
00655             //  Else, the poll item is a raw file descriptor, simply convert
00656             //  the events to zmq_pollitem_t-style format.
00657             else {
00658                 if (FD_ISSET (items_ [i].fd, &inset))
00659                     items_ [i].revents |= ZMQ_POLLIN;
00660                 if (FD_ISSET (items_ [i].fd, &outset))
00661                     items_ [i].revents |= ZMQ_POLLOUT;
00662                 if (FD_ISSET (items_ [i].fd, &errset))
00663                     items_ [i].revents |= ZMQ_POLLERR;
00664             }
00665 
00666             if (items_ [i].revents)
00667                 nevents++;
00668         }
00669 
00670         //  If timout is zero, exit immediately whether there are events or not.
00671         if (timeout_ == 0)
00672             break;
00673 
00674         //  If there are events to return, we can exit immediately.
00675         if (nevents)
00676             break;
00677 
00678         //  At this point we are meant to wait for events but there are none.
00679         //  If timeout is infinite we can just loop until we get some events.
00680         if (timeout_ < 0) {
00681             if (first_pass)
00682                 first_pass = false;
00683             continue;
00684         }
00685 
00686         //  The timeout is finite and there are no events. In the first pass
00687         //  we get a timestamp of when the polling have begun. (We assume that
00688         //  first pass have taken negligible time). We also compute the time
00689         //  when the polling should time out.
00690         if (first_pass) {
00691             now = clock.now_ms ();
00692             end = now + timeout_;
00693             if (now == end)
00694                 break;
00695             first_pass = false;
00696             continue;
00697         }
00698 
00699         //  Find out whether timeout have expired.
00700         now = clock.now_ms ();
00701         if (now >= end)
00702             break;
00703     }
00704 
00705     return nevents;
00706 
00707 #else
00708     //  Exotic platforms that support neither poll() nor select().
00709     errno = ENOTSUP;
00710     return -1;
00711 #endif
00712 }
00713 
00714 int zmq_errno ()
00715 {
00716     return errno;
00717 }
00718 
00719 #if defined ZMQ_POLL_BASED_ON_SELECT
00720 #undef ZMQ_POLL_BASED_ON_SELECT
00721 #endif
00722 #if defined ZMQ_POLL_BASED_ON_POLL
00723 #undef ZMQ_POLL_BASED_ON_POLL
00724 #endif
00725 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines