libzmq master
The Intelligent Transport Layer

pgm_socket.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2010-2011 Miru Limited
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #include "platform.hpp"
00024 
00025 #ifdef ZMQ_HAVE_OPENPGM
00026 
00027 #ifdef ZMQ_HAVE_WINDOWS
00028 #include "windows.hpp"
00029 #endif
00030 
00031 #ifdef ZMQ_HAVE_LINUX
00032 #include <poll.h>
00033 #endif
00034 
00035 #include <stdlib.h>
00036 #include <string.h>
00037 #include <string>
00038 
00039 #include "options.hpp"
00040 #include "pgm_socket.hpp"
00041 #include "config.hpp"
00042 #include "err.hpp"
00043 #include "random.hpp"
00044 #include "stdint.hpp"
00045 
00046 #ifndef MSG_ERRQUEUE
00047 #define MSG_ERRQUEUE 0x2000
00048 #endif
00049 
00050 zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) :
00051     sock (NULL),
00052     options (options_),
00053     receiver (receiver_),
00054     pgm_msgv (NULL),
00055     pgm_msgv_len (0),
00056     nbytes_rec (0),
00057     nbytes_processed (0),
00058     pgm_msgv_processed (0)
00059 {
00060 }
00061 
00062 //  Create, bind and connect PGM socket.
00063 //  network_ of the form <interface & multicast group decls>:<IP port>
00064 //  e.g. eth0;239.192.0.1:7500
00065 //       link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000
00066 //       ;[fe80::1%en0]:7500
00067 int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
00068 {
00069     //  Can not open transport before destroying old one. 
00070     zmq_assert (sock == NULL);
00071  
00072     //  Parse port number, start from end for IPv6
00073     const char *port_delim = strrchr (network_, ':');
00074     if (!port_delim) {
00075         errno = EINVAL;
00076         return -1;
00077     }
00078 
00079     uint16_t port_number = atoi (port_delim + 1);
00080   
00081     char network [256];
00082     if (port_delim - network_ >= (int) sizeof (network) - 1) {
00083         errno = EINVAL;
00084         return -1;
00085     }
00086     memset (network, '\0', sizeof (network));
00087     memcpy (network, network_, port_delim - network_);
00088 
00089     zmq_assert (options.rate > 0);
00090    
00091     //  Zero counter used in msgrecv.
00092     nbytes_rec = 0;
00093     nbytes_processed = 0;
00094     pgm_msgv_processed = 0;
00095 
00096     pgm_error_t *pgm_error = NULL;
00097     struct pgm_addrinfo_t hints, *res = NULL;
00098     sa_family_t sa_family;
00099 
00100     memset (&hints, 0, sizeof (hints));
00101     hints.ai_family = AF_UNSPEC;
00102     if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) {
00103 
00104         //  Invalid parameters don't set pgm_error_t.
00105         zmq_assert (pgm_error != NULL);
00106         if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && (
00107 
00108               //  NB: cannot catch EAI_BADFLAGS.
00109               pgm_error->code != PGM_ERROR_SERVICE &&
00110               pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT))
00111 
00112             //  User, host, or network configuration or transient error.
00113             goto err_abort;
00114 
00115         //  Fatal OpenPGM internal error.
00116         zmq_assert (false);
00117     }
00118 
00119     zmq_assert (res != NULL);
00120 
00121     //  Pick up detected IP family.
00122     sa_family = res->ai_send_addrs[0].gsr_group.ss_family;
00123 
00124     //  Create IP/PGM or UDP/PGM socket.
00125     if (udp_encapsulation_) {
00126         if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP,
00127               &pgm_error)) {
00128 
00129             //  Invalid parameters don't set pgm_error_t.
00130             zmq_assert (pgm_error != NULL);
00131             if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
00132                   pgm_error->code != PGM_ERROR_BADF &&
00133                   pgm_error->code != PGM_ERROR_FAULT &&
00134                   pgm_error->code != PGM_ERROR_NOPROTOOPT &&
00135                   pgm_error->code != PGM_ERROR_FAILED))
00136 
00137                 //  User, host, or network configuration or transient error.
00138                 goto err_abort;
00139 
00140             //  Fatal OpenPGM internal error.
00141             zmq_assert (false);
00142         }
00143 
00144         //  All options are of data type int
00145         const int encapsulation_port = port_number;
00146         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
00147                 &encapsulation_port, sizeof (encapsulation_port)))
00148             goto err_abort;
00149         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
00150                 &encapsulation_port, sizeof (encapsulation_port)))
00151             goto err_abort;
00152     }
00153     else {
00154         if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM,
00155               &pgm_error)) {
00156 
00157             //  Invalid parameters don't set pgm_error_t.
00158             zmq_assert (pgm_error != NULL);
00159             if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && (
00160                   pgm_error->code != PGM_ERROR_BADF &&
00161                   pgm_error->code != PGM_ERROR_FAULT &&
00162                   pgm_error->code != PGM_ERROR_NOPROTOOPT &&
00163                   pgm_error->code != PGM_ERROR_FAILED))
00164 
00165                 //  User, host, or network configuration or transient error.
00166                 goto err_abort;
00167 
00168             //  Fatal OpenPGM internal error.
00169             zmq_assert (false);
00170         }
00171     }
00172 
00173     {
00174         const int rcvbuf = (int) options.rcvbuf;
00175         if (rcvbuf) {
00176             if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf,
00177                   sizeof (rcvbuf)))
00178                 goto err_abort;
00179         }
00180 
00181         const int sndbuf = (int) options.sndbuf;
00182         if (sndbuf) {
00183             if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf,
00184                   sizeof (sndbuf)))
00185                 goto err_abort;
00186         }
00187 
00188         const int max_tpdu = (int) pgm_max_tpdu;
00189         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu,
00190               sizeof (max_tpdu)))
00191             goto err_abort;
00192     }
00193 
00194     if (receiver) {
00195         const int recv_only        = 1,
00196                   rxw_max_tpdu     = (int) pgm_max_tpdu,
00197                   rxw_sqns         = compute_sqns (rxw_max_tpdu),
00198                   peer_expiry      = pgm_secs (300),
00199                   spmr_expiry      = pgm_msecs (25),
00200                   nak_bo_ivl       = pgm_msecs (50),
00201                   nak_rpt_ivl      = pgm_msecs (200),
00202                   nak_rdata_ivl    = pgm_msecs (200),
00203                   nak_data_retries = 50,
00204                   nak_ncf_retries  = 50;
00205 
00206         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only,
00207                 sizeof (recv_only)) ||
00208             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns,
00209                 sizeof (rxw_sqns)) ||
00210             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry,
00211                 sizeof (peer_expiry)) ||
00212             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry,
00213                 sizeof (spmr_expiry)) ||
00214             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl,
00215                 sizeof (nak_bo_ivl)) ||
00216             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl,
00217                 sizeof (nak_rpt_ivl)) ||
00218             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL,
00219                 &nak_rdata_ivl, sizeof (nak_rdata_ivl)) ||
00220             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES,
00221                 &nak_data_retries, sizeof (nak_data_retries)) ||
00222             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
00223                 &nak_ncf_retries, sizeof (nak_ncf_retries)))
00224             goto err_abort;
00225     } else {
00226         const int send_only        = 1,
00227                   max_rte      = (int) ((options.rate * 1000) / 8),
00228                   txw_max_tpdu     = (int) pgm_max_tpdu,
00229                   txw_sqns         = compute_sqns (txw_max_tpdu),
00230                   ambient_spm      = pgm_secs (30),
00231                   heartbeat_spm[]  = { pgm_msecs (100),
00232                                        pgm_msecs (100),
00233                                        pgm_msecs (100),
00234                                        pgm_msecs (100),
00235                                        pgm_msecs (1300),
00236                                        pgm_secs  (7),
00237                                        pgm_secs  (16),
00238                                        pgm_secs  (25),
00239                                        pgm_secs  (30) };
00240 
00241         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY,
00242                 &send_only, sizeof (send_only)) ||
00243             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE,
00244                 &max_rte, sizeof (max_rte)) ||
00245             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS,
00246                 &txw_sqns, sizeof (txw_sqns)) ||
00247             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM,
00248                 &ambient_spm, sizeof (ambient_spm)) ||
00249             !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM,
00250                 &heartbeat_spm, sizeof (heartbeat_spm)))
00251             goto err_abort;
00252     }
00253 
00254     //  PGM transport GSI.
00255     struct pgm_sockaddr_t addr;
00256 
00257     memset (&addr, 0, sizeof(addr));
00258     addr.sa_port = port_number;
00259     addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT;
00260 
00261     //  Create random GSI.
00262     uint32_t buf [2];
00263     buf [0] = generate_random ();
00264     buf [1] = generate_random ();
00265     if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8))
00266         goto err_abort;
00267 
00268 
00269     //  Bind a transport to the specified network devices.
00270     struct pgm_interface_req_t if_req;
00271     memset (&if_req, 0, sizeof(if_req));
00272     if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface;
00273     if_req.ir_scope_id  = 0;
00274     if (AF_INET6 == sa_family) {
00275         struct sockaddr_in6 sa6;
00276         memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6));
00277         if_req.ir_scope_id = sa6.sin6_scope_id;
00278     }
00279     if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req),
00280           &if_req, sizeof (if_req), &pgm_error)) {
00281 
00282         //  Invalid parameters don't set pgm_error_t.
00283         zmq_assert (pgm_error != NULL);
00284         if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET ||
00285              pgm_error->domain == PGM_ERROR_DOMAIN_IF) && (
00286              pgm_error->code != PGM_ERROR_INVAL &&
00287              pgm_error->code != PGM_ERROR_BADF &&
00288              pgm_error->code != PGM_ERROR_FAULT))
00289 
00290             //  User, host, or network configuration or transient error.
00291             goto err_abort;
00292 
00293         //  Fatal OpenPGM internal error.
00294         zmq_assert (false);
00295     }
00296 
00297     //  Join IP multicast groups.
00298     for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) {
00299         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP,
00300               &res->ai_recv_addrs [i], sizeof (struct group_req)))
00301             goto err_abort;
00302     }
00303     if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP,
00304           &res->ai_send_addrs [0], sizeof (struct group_req)))
00305         goto err_abort;
00306 
00307     pgm_freeaddrinfo (res);
00308     res = NULL;
00309 
00310     //  Set IP level parameters.
00311     {
00312         const int multicast_loop = 0;
00313         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP,
00314               &multicast_loop, sizeof (multicast_loop)))
00315             goto err_abort;
00316 
00317         const int multicast_hops = options.multicast_hops;
00318         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS,
00319                 &multicast_hops, sizeof (multicast_hops)))
00320             goto err_abort;
00321 
00322         //  Expedited Forwarding PHB for network elements, no ECN.
00323         const int dscp = 0x2e << 2; 
00324         if (AF_INET6 != sa_family && !pgm_setsockopt (sock,
00325               IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp)))
00326             goto err_abort;
00327 
00328         const int nonblocking = 1;
00329         if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK,
00330               &nonblocking, sizeof (nonblocking)))
00331             goto err_abort;
00332     }
00333 
00334     //  Connect PGM transport to start state machine.
00335     if (!pgm_connect (sock, &pgm_error)) {
00336 
00337         //  Invalid parameters don't set pgm_error_t.
00338         zmq_assert (pgm_error != NULL);
00339         goto err_abort;
00340     }
00341 
00342     //  For receiver transport preallocate pgm_msgv array.
00343     if (receiver) {
00344         zmq_assert (in_batch_size > 0);
00345         size_t max_tsdu_size = get_max_tsdu_size ();
00346         pgm_msgv_len = (int) in_batch_size / max_tsdu_size;
00347         if ((int) in_batch_size % max_tsdu_size)
00348             pgm_msgv_len++;
00349         zmq_assert (pgm_msgv_len);
00350 
00351         pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len);
00352         alloc_assert (pgm_msgv);
00353     }
00354 
00355     return 0;
00356 
00357 err_abort:
00358     if (sock != NULL) {
00359         pgm_close (sock, FALSE);
00360         sock = NULL;
00361     }
00362     if (res != NULL) {
00363         pgm_freeaddrinfo (res);
00364         res = NULL;
00365     }
00366     if (pgm_error != NULL) {
00367         pgm_error_free (pgm_error);
00368         pgm_error = NULL;
00369     }
00370     errno = EINVAL;
00371     return -1;
00372 }
00373 
00374 zmq::pgm_socket_t::~pgm_socket_t ()
00375 {
00376     if (pgm_msgv)
00377         free (pgm_msgv);
00378     if (sock) 
00379         pgm_close (sock, TRUE);
00380 }
00381 
00382 //  Get receiver fds. receive_fd_ is signaled for incoming packets,
00383 //  waiting_pipe_fd_ is signaled for state driven events and data.
00384 void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, 
00385     fd_t *waiting_pipe_fd_)
00386 {
00387     socklen_t socklen;
00388     bool rc;
00389 
00390     zmq_assert (receive_fd_);
00391     zmq_assert (waiting_pipe_fd_);
00392 
00393     socklen = sizeof (*receive_fd_);
00394     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
00395         &socklen);
00396     zmq_assert (rc);
00397     zmq_assert (socklen == sizeof (*receive_fd_));
00398 
00399     socklen = sizeof (*waiting_pipe_fd_);
00400     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_,
00401         &socklen);
00402     zmq_assert (rc);
00403     zmq_assert (socklen == sizeof (*waiting_pipe_fd_));
00404 }
00405 
00406 //  Get fds and store them into user allocated memory. 
00407 //  send_fd is for non-blocking send wire notifications.
00408 //  receive_fd_ is for incoming back-channel protocol packets.
00409 //  rdata_notify_fd_ is raised for waiting repair transmissions.
00410 //  pending_notify_fd_ is for state driven events.
00411 void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_, 
00412     fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_)
00413 {
00414     socklen_t socklen;
00415     bool rc;
00416 
00417     zmq_assert (send_fd_);
00418     zmq_assert (receive_fd_);
00419     zmq_assert (rdata_notify_fd_);
00420     zmq_assert (pending_notify_fd_);
00421 
00422     socklen = sizeof (*send_fd_);
00423     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen);
00424     zmq_assert (rc);
00425     zmq_assert (socklen == sizeof (*receive_fd_));
00426 
00427     socklen = sizeof (*receive_fd_);
00428     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_,
00429         &socklen);
00430     zmq_assert (rc);
00431     zmq_assert (socklen == sizeof (*receive_fd_));
00432 
00433     socklen = sizeof (*rdata_notify_fd_);
00434     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_,
00435         &socklen);
00436     zmq_assert (rc);
00437     zmq_assert (socklen == sizeof (*rdata_notify_fd_));
00438 
00439     socklen = sizeof (*pending_notify_fd_);
00440     rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK,
00441         pending_notify_fd_, &socklen);
00442     zmq_assert (rc);
00443     zmq_assert (socklen == sizeof (*pending_notify_fd_));
00444 }
00445 
00446 //  Send one APDU, transmit window owned memory.
00447 //  data_len_ must be less than one TPDU.
00448 size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
00449 {
00450     size_t nbytes = 0;
00451    
00452     const int status = pgm_send (sock, data_, data_len_, &nbytes);
00453 
00454     //  We have to write all data as one packet.
00455     if (nbytes > 0) {
00456         zmq_assert (status == PGM_IO_STATUS_NORMAL);
00457         zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_);
00458     } else {
00459         zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
00460             status == PGM_IO_STATUS_WOULD_BLOCK);
00461 
00462         if (status == PGM_IO_STATUS_RATE_LIMITED)
00463             errno = ENOMEM;
00464         else
00465             errno = EBUSY;
00466     }
00467 
00468     //  Save return value.
00469     last_tx_status = status;
00470 
00471     return nbytes;
00472 }
00473 
00474 long zmq::pgm_socket_t::get_rx_timeout ()
00475 {
00476     if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED &&
00477           last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
00478         return -1;
00479 
00480     struct timeval tv;
00481     socklen_t optlen = sizeof (tv);
00482     const bool rc = pgm_getsockopt (sock, IPPROTO_PGM,
00483         last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN :
00484         PGM_TIME_REMAIN, &tv, &optlen);
00485     zmq_assert (rc);
00486 
00487     const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
00488 
00489     return timeout;
00490 }
00491 
00492 long zmq::pgm_socket_t::get_tx_timeout ()
00493 {
00494     if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
00495         return -1;
00496 
00497     struct timeval tv;
00498     socklen_t optlen = sizeof (tv);
00499     const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv,
00500         &optlen);
00501     zmq_assert (rc);
00502 
00503     const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
00504 
00505     return timeout;
00506 }
00507 
00508 //  Return max TSDU size without fragmentation from current PGM transport.
00509 size_t zmq::pgm_socket_t::get_max_tsdu_size ()
00510 {
00511     int max_tsdu = 0;
00512     socklen_t optlen = sizeof (max_tsdu);
00513 
00514     bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen);
00515     zmq_assert (rc);
00516     zmq_assert (optlen == sizeof (max_tsdu));
00517     return (size_t) max_tsdu;
00518 }
00519 
00520 //  pgm_recvmsgv is called to fill the pgm_msgv array up to  pgm_msgv_len.
00521 //  In subsequent calls data from pgm_msgv structure are returned.
00522 ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
00523 {
00524     size_t raw_data_len = 0;
00525 
00526     //  We just sent all data from pgm_transport_recvmsgv up 
00527     //  and have to return 0 that another engine in this thread is scheduled.
00528     if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
00529 
00530         //  Reset all the counters.
00531         nbytes_rec = 0;
00532         nbytes_processed = 0;
00533         pgm_msgv_processed = 0;
00534         errno = EAGAIN;
00535         return 0;
00536     }
00537 
00538     //  If we have are going first time or if we have processed all pgm_msgv_t
00539     //  structure previously read from the pgm socket.
00540     if (nbytes_rec == nbytes_processed) {
00541 
00542         //  Check program flow.
00543         zmq_assert (pgm_msgv_processed == 0);
00544         zmq_assert (nbytes_processed == 0);
00545         zmq_assert (nbytes_rec == 0);
00546 
00547         //  Receive a vector of Application Protocol Domain Unit's (APDUs) 
00548         //  from the transport.
00549         pgm_error_t *pgm_error = NULL;
00550 
00551         const int status = pgm_recvmsgv (sock, pgm_msgv,
00552             pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error);
00553 
00554         //  Invalid parameters.
00555         zmq_assert (status != PGM_IO_STATUS_ERROR);
00556 
00557         last_rx_status = status;
00558 
00559         //  In a case when no ODATA/RDATA fired POLLIN event (SPM...)
00560         //  pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
00561         if (status == PGM_IO_STATUS_TIMER_PENDING) {
00562 
00563             zmq_assert (nbytes_rec == 0);
00564 
00565             //  In case if no RDATA/ODATA caused POLLIN 0 is 
00566             //  returned.
00567             nbytes_rec = 0;
00568             errno = EBUSY;
00569             return 0;
00570         }
00571 
00572         //  Send SPMR, NAK, ACK is rate limited.
00573         if (status == PGM_IO_STATUS_RATE_LIMITED) {
00574 
00575             zmq_assert (nbytes_rec == 0);
00576 
00577             //  In case if no RDATA/ODATA caused POLLIN 0 is returned.
00578             nbytes_rec = 0;
00579             errno = ENOMEM;
00580             return 0;
00581         }
00582 
00583         //  No peers and hence no incoming packets.
00584         if (status == PGM_IO_STATUS_WOULD_BLOCK) {
00585 
00586             zmq_assert (nbytes_rec == 0);
00587 
00588             //  In case if no RDATA/ODATA caused POLLIN 0 is returned.
00589             nbytes_rec = 0;
00590             errno = EAGAIN;
00591             return 0;
00592         }
00593 
00594         //  Data loss.
00595         if (status == PGM_IO_STATUS_RESET) {
00596 
00597             struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0];
00598 
00599             //  Save lost data TSI.
00600             *tsi_ = &skb->tsi;
00601             nbytes_rec = 0;
00602 
00603             //  In case of dala loss -1 is returned.
00604             errno = EINVAL;
00605             pgm_free_skb (skb);
00606             return -1;
00607         }
00608 
00609         zmq_assert (status == PGM_IO_STATUS_NORMAL);
00610     }
00611     else
00612     {
00613         zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
00614     }
00615 
00616     // Zero byte payloads are valid in PGM, but not 0MQ protocol.
00617     zmq_assert (nbytes_rec > 0);
00618 
00619     // Only one APDU per pgm_msgv_t structure is allowed.
00620     zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1);
00621  
00622     struct pgm_sk_buff_t* skb = 
00623         pgm_msgv [pgm_msgv_processed].msgv_skb [0];
00624 
00625     //  Take pointers from pgm_msgv_t structure.
00626     *raw_data_ = skb->data;
00627     raw_data_len = skb->len;
00628 
00629     //  Save current TSI.
00630     *tsi_ = &skb->tsi;
00631 
00632     //  Move the the next pgm_msgv_t structure.
00633     pgm_msgv_processed++;
00634     zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
00635     nbytes_processed +=raw_data_len;
00636 
00637     return raw_data_len;
00638 }
00639 
00640 void zmq::pgm_socket_t::process_upstream ()
00641 {
00642     pgm_msgv_t dummy_msg;
00643 
00644     size_t dummy_bytes = 0;
00645     pgm_error_t *pgm_error = NULL;
00646 
00647     const int status = pgm_recvmsgv (sock, &dummy_msg,
00648         1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error);
00649 
00650     //  Invalid parameters.
00651     zmq_assert (status != PGM_IO_STATUS_ERROR);
00652 
00653     //  No data should be returned.
00654     zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || 
00655         status == PGM_IO_STATUS_RATE_LIMITED ||
00656         status == PGM_IO_STATUS_WOULD_BLOCK));
00657 
00658     last_rx_status = status;
00659 
00660     if (status == PGM_IO_STATUS_TIMER_PENDING)
00661         errno = EBUSY;
00662     else if (status == PGM_IO_STATUS_RATE_LIMITED)
00663         errno = ENOMEM;
00664     else
00665         errno = EAGAIN;
00666 }
00667 
00668 int zmq::pgm_socket_t::compute_sqns (int tpdu_)
00669 {
00670     //  Convert rate into B/ms.
00671     uint64_t rate = uint64_t (options.rate) / 8;
00672         
00673     //  Compute the size of the buffer in bytes.
00674     uint64_t size = uint64_t (options.recovery_ivl) * rate;
00675 
00676     //  Translate the size into number of packets.
00677     uint64_t sqns = size / tpdu_;
00678 
00679     //  Buffer should be able to hold at least one packet.
00680     if (sqns == 0)
00681         sqns = 1;
00682 
00683     return (int) sqns;
00684 }
00685 
00686 #endif
00687 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines