![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2010-2011 Miru Limited 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include "platform.hpp" 00024 00025 #ifdef ZMQ_HAVE_OPENPGM 00026 00027 #ifdef ZMQ_HAVE_WINDOWS 00028 #include "windows.hpp" 00029 #endif 00030 00031 #ifdef ZMQ_HAVE_LINUX 00032 #include <poll.h> 00033 #endif 00034 00035 #include <stdlib.h> 00036 #include <string.h> 00037 #include <string> 00038 00039 #include "options.hpp" 00040 #include "pgm_socket.hpp" 00041 #include "config.hpp" 00042 #include "err.hpp" 00043 #include "random.hpp" 00044 #include "stdint.hpp" 00045 00046 #ifndef MSG_ERRQUEUE 00047 #define MSG_ERRQUEUE 0x2000 00048 #endif 00049 00050 zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : 00051 sock (NULL), 00052 options (options_), 00053 receiver (receiver_), 00054 pgm_msgv (NULL), 00055 pgm_msgv_len (0), 00056 nbytes_rec (0), 00057 nbytes_processed (0), 00058 pgm_msgv_processed (0) 00059 { 00060 } 00061 00062 // Create, bind and connect PGM socket. 00063 // network_ of the form <interface & multicast group decls>:<IP port> 00064 // e.g. eth0;239.192.0.1:7500 00065 // link-local;224.250.0.1,224.250.0.2;224.250.0.3:8000 00066 // ;[fe80::1%en0]:7500 00067 int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) 00068 { 00069 // Can not open transport before destroying old one. 00070 zmq_assert (sock == NULL); 00071 00072 // Parse port number, start from end for IPv6 00073 const char *port_delim = strrchr (network_, ':'); 00074 if (!port_delim) { 00075 errno = EINVAL; 00076 return -1; 00077 } 00078 00079 uint16_t port_number = atoi (port_delim + 1); 00080 00081 char network [256]; 00082 if (port_delim - network_ >= (int) sizeof (network) - 1) { 00083 errno = EINVAL; 00084 return -1; 00085 } 00086 memset (network, '\0', sizeof (network)); 00087 memcpy (network, network_, port_delim - network_); 00088 00089 zmq_assert (options.rate > 0); 00090 00091 // Zero counter used in msgrecv. 00092 nbytes_rec = 0; 00093 nbytes_processed = 0; 00094 pgm_msgv_processed = 0; 00095 00096 pgm_error_t *pgm_error = NULL; 00097 struct pgm_addrinfo_t hints, *res = NULL; 00098 sa_family_t sa_family; 00099 00100 memset (&hints, 0, sizeof (hints)); 00101 hints.ai_family = AF_UNSPEC; 00102 if (!pgm_getaddrinfo (network, NULL, &res, &pgm_error)) { 00103 00104 // Invalid parameters don't set pgm_error_t. 00105 zmq_assert (pgm_error != NULL); 00106 if (pgm_error->domain == PGM_ERROR_DOMAIN_IF && ( 00107 00108 // NB: cannot catch EAI_BADFLAGS. 00109 pgm_error->code != PGM_ERROR_SERVICE && 00110 pgm_error->code != PGM_ERROR_SOCKTNOSUPPORT)) 00111 00112 // User, host, or network configuration or transient error. 00113 goto err_abort; 00114 00115 // Fatal OpenPGM internal error. 00116 zmq_assert (false); 00117 } 00118 00119 zmq_assert (res != NULL); 00120 00121 // Pick up detected IP family. 00122 sa_family = res->ai_send_addrs[0].gsr_group.ss_family; 00123 00124 // Create IP/PGM or UDP/PGM socket. 00125 if (udp_encapsulation_) { 00126 if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_UDP, 00127 &pgm_error)) { 00128 00129 // Invalid parameters don't set pgm_error_t. 00130 zmq_assert (pgm_error != NULL); 00131 if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( 00132 pgm_error->code != PGM_ERROR_BADF && 00133 pgm_error->code != PGM_ERROR_FAULT && 00134 pgm_error->code != PGM_ERROR_NOPROTOOPT && 00135 pgm_error->code != PGM_ERROR_FAILED)) 00136 00137 // User, host, or network configuration or transient error. 00138 goto err_abort; 00139 00140 // Fatal OpenPGM internal error. 00141 zmq_assert (false); 00142 } 00143 00144 // All options are of data type int 00145 const int encapsulation_port = port_number; 00146 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, 00147 &encapsulation_port, sizeof (encapsulation_port))) 00148 goto err_abort; 00149 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, 00150 &encapsulation_port, sizeof (encapsulation_port))) 00151 goto err_abort; 00152 } 00153 else { 00154 if (!pgm_socket (&sock, sa_family, SOCK_SEQPACKET, IPPROTO_PGM, 00155 &pgm_error)) { 00156 00157 // Invalid parameters don't set pgm_error_t. 00158 zmq_assert (pgm_error != NULL); 00159 if (pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET && ( 00160 pgm_error->code != PGM_ERROR_BADF && 00161 pgm_error->code != PGM_ERROR_FAULT && 00162 pgm_error->code != PGM_ERROR_NOPROTOOPT && 00163 pgm_error->code != PGM_ERROR_FAILED)) 00164 00165 // User, host, or network configuration or transient error. 00166 goto err_abort; 00167 00168 // Fatal OpenPGM internal error. 00169 zmq_assert (false); 00170 } 00171 } 00172 00173 { 00174 const int rcvbuf = (int) options.rcvbuf; 00175 if (rcvbuf) { 00176 if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf, 00177 sizeof (rcvbuf))) 00178 goto err_abort; 00179 } 00180 00181 const int sndbuf = (int) options.sndbuf; 00182 if (sndbuf) { 00183 if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, 00184 sizeof (sndbuf))) 00185 goto err_abort; 00186 } 00187 00188 const int max_tpdu = (int) pgm_max_tpdu; 00189 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu, 00190 sizeof (max_tpdu))) 00191 goto err_abort; 00192 } 00193 00194 if (receiver) { 00195 const int recv_only = 1, 00196 rxw_max_tpdu = (int) pgm_max_tpdu, 00197 rxw_sqns = compute_sqns (rxw_max_tpdu), 00198 peer_expiry = pgm_secs (300), 00199 spmr_expiry = pgm_msecs (25), 00200 nak_bo_ivl = pgm_msecs (50), 00201 nak_rpt_ivl = pgm_msecs (200), 00202 nak_rdata_ivl = pgm_msecs (200), 00203 nak_data_retries = 50, 00204 nak_ncf_retries = 50; 00205 00206 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_RECV_ONLY, &recv_only, 00207 sizeof (recv_only)) || 00208 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_RXW_SQNS, &rxw_sqns, 00209 sizeof (rxw_sqns)) || 00210 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_PEER_EXPIRY, &peer_expiry, 00211 sizeof (peer_expiry)) || 00212 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_SPMR_EXPIRY, &spmr_expiry, 00213 sizeof (spmr_expiry)) || 00214 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_BO_IVL, &nak_bo_ivl, 00215 sizeof (nak_bo_ivl)) || 00216 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RPT_IVL, &nak_rpt_ivl, 00217 sizeof (nak_rpt_ivl)) || 00218 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_RDATA_IVL, 00219 &nak_rdata_ivl, sizeof (nak_rdata_ivl)) || 00220 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_DATA_RETRIES, 00221 &nak_data_retries, sizeof (nak_data_retries)) || 00222 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, 00223 &nak_ncf_retries, sizeof (nak_ncf_retries))) 00224 goto err_abort; 00225 } else { 00226 const int send_only = 1, 00227 max_rte = (int) ((options.rate * 1000) / 8), 00228 txw_max_tpdu = (int) pgm_max_tpdu, 00229 txw_sqns = compute_sqns (txw_max_tpdu), 00230 ambient_spm = pgm_secs (30), 00231 heartbeat_spm[] = { pgm_msecs (100), 00232 pgm_msecs (100), 00233 pgm_msecs (100), 00234 pgm_msecs (100), 00235 pgm_msecs (1300), 00236 pgm_secs (7), 00237 pgm_secs (16), 00238 pgm_secs (25), 00239 pgm_secs (30) }; 00240 00241 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_ONLY, 00242 &send_only, sizeof (send_only)) || 00243 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_ODATA_MAX_RTE, 00244 &max_rte, sizeof (max_rte)) || 00245 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_TXW_SQNS, 00246 &txw_sqns, sizeof (txw_sqns)) || 00247 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_AMBIENT_SPM, 00248 &ambient_spm, sizeof (ambient_spm)) || 00249 !pgm_setsockopt (sock, IPPROTO_PGM, PGM_HEARTBEAT_SPM, 00250 &heartbeat_spm, sizeof (heartbeat_spm))) 00251 goto err_abort; 00252 } 00253 00254 // PGM transport GSI. 00255 struct pgm_sockaddr_t addr; 00256 00257 memset (&addr, 0, sizeof(addr)); 00258 addr.sa_port = port_number; 00259 addr.sa_addr.sport = DEFAULT_DATA_SOURCE_PORT; 00260 00261 // Create random GSI. 00262 uint32_t buf [2]; 00263 buf [0] = generate_random (); 00264 buf [1] = generate_random (); 00265 if (!pgm_gsi_create_from_data (&addr.sa_addr.gsi, (uint8_t*) buf, 8)) 00266 goto err_abort; 00267 00268 00269 // Bind a transport to the specified network devices. 00270 struct pgm_interface_req_t if_req; 00271 memset (&if_req, 0, sizeof(if_req)); 00272 if_req.ir_interface = res->ai_recv_addrs[0].gsr_interface; 00273 if_req.ir_scope_id = 0; 00274 if (AF_INET6 == sa_family) { 00275 struct sockaddr_in6 sa6; 00276 memcpy (&sa6, &res->ai_recv_addrs[0].gsr_group, sizeof (sa6)); 00277 if_req.ir_scope_id = sa6.sin6_scope_id; 00278 } 00279 if (!pgm_bind3 (sock, &addr, sizeof (addr), &if_req, sizeof (if_req), 00280 &if_req, sizeof (if_req), &pgm_error)) { 00281 00282 // Invalid parameters don't set pgm_error_t. 00283 zmq_assert (pgm_error != NULL); 00284 if ((pgm_error->domain == PGM_ERROR_DOMAIN_SOCKET || 00285 pgm_error->domain == PGM_ERROR_DOMAIN_IF) && ( 00286 pgm_error->code != PGM_ERROR_INVAL && 00287 pgm_error->code != PGM_ERROR_BADF && 00288 pgm_error->code != PGM_ERROR_FAULT)) 00289 00290 // User, host, or network configuration or transient error. 00291 goto err_abort; 00292 00293 // Fatal OpenPGM internal error. 00294 zmq_assert (false); 00295 } 00296 00297 // Join IP multicast groups. 00298 for (unsigned i = 0; i < res->ai_recv_addrs_len; i++) { 00299 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_JOIN_GROUP, 00300 &res->ai_recv_addrs [i], sizeof (struct group_req))) 00301 goto err_abort; 00302 } 00303 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_SEND_GROUP, 00304 &res->ai_send_addrs [0], sizeof (struct group_req))) 00305 goto err_abort; 00306 00307 pgm_freeaddrinfo (res); 00308 res = NULL; 00309 00310 // Set IP level parameters. 00311 { 00312 const int multicast_loop = 0; 00313 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, 00314 &multicast_loop, sizeof (multicast_loop))) 00315 goto err_abort; 00316 00317 const int multicast_hops = options.multicast_hops; 00318 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, 00319 &multicast_hops, sizeof (multicast_hops))) 00320 goto err_abort; 00321 00322 // Expedited Forwarding PHB for network elements, no ECN. 00323 const int dscp = 0x2e << 2; 00324 if (AF_INET6 != sa_family && !pgm_setsockopt (sock, 00325 IPPROTO_PGM, PGM_TOS, &dscp, sizeof (dscp))) 00326 goto err_abort; 00327 00328 const int nonblocking = 1; 00329 if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, 00330 &nonblocking, sizeof (nonblocking))) 00331 goto err_abort; 00332 } 00333 00334 // Connect PGM transport to start state machine. 00335 if (!pgm_connect (sock, &pgm_error)) { 00336 00337 // Invalid parameters don't set pgm_error_t. 00338 zmq_assert (pgm_error != NULL); 00339 goto err_abort; 00340 } 00341 00342 // For receiver transport preallocate pgm_msgv array. 00343 if (receiver) { 00344 zmq_assert (in_batch_size > 0); 00345 size_t max_tsdu_size = get_max_tsdu_size (); 00346 pgm_msgv_len = (int) in_batch_size / max_tsdu_size; 00347 if ((int) in_batch_size % max_tsdu_size) 00348 pgm_msgv_len++; 00349 zmq_assert (pgm_msgv_len); 00350 00351 pgm_msgv = (pgm_msgv_t*) malloc (sizeof (pgm_msgv_t) * pgm_msgv_len); 00352 alloc_assert (pgm_msgv); 00353 } 00354 00355 return 0; 00356 00357 err_abort: 00358 if (sock != NULL) { 00359 pgm_close (sock, FALSE); 00360 sock = NULL; 00361 } 00362 if (res != NULL) { 00363 pgm_freeaddrinfo (res); 00364 res = NULL; 00365 } 00366 if (pgm_error != NULL) { 00367 pgm_error_free (pgm_error); 00368 pgm_error = NULL; 00369 } 00370 errno = EINVAL; 00371 return -1; 00372 } 00373 00374 zmq::pgm_socket_t::~pgm_socket_t () 00375 { 00376 if (pgm_msgv) 00377 free (pgm_msgv); 00378 if (sock) 00379 pgm_close (sock, TRUE); 00380 } 00381 00382 // Get receiver fds. receive_fd_ is signaled for incoming packets, 00383 // waiting_pipe_fd_ is signaled for state driven events and data. 00384 void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, 00385 fd_t *waiting_pipe_fd_) 00386 { 00387 socklen_t socklen; 00388 bool rc; 00389 00390 zmq_assert (receive_fd_); 00391 zmq_assert (waiting_pipe_fd_); 00392 00393 socklen = sizeof (*receive_fd_); 00394 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, 00395 &socklen); 00396 zmq_assert (rc); 00397 zmq_assert (socklen == sizeof (*receive_fd_)); 00398 00399 socklen = sizeof (*waiting_pipe_fd_); 00400 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, waiting_pipe_fd_, 00401 &socklen); 00402 zmq_assert (rc); 00403 zmq_assert (socklen == sizeof (*waiting_pipe_fd_)); 00404 } 00405 00406 // Get fds and store them into user allocated memory. 00407 // send_fd is for non-blocking send wire notifications. 00408 // receive_fd_ is for incoming back-channel protocol packets. 00409 // rdata_notify_fd_ is raised for waiting repair transmissions. 00410 // pending_notify_fd_ is for state driven events. 00411 void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_, 00412 fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_) 00413 { 00414 socklen_t socklen; 00415 bool rc; 00416 00417 zmq_assert (send_fd_); 00418 zmq_assert (receive_fd_); 00419 zmq_assert (rdata_notify_fd_); 00420 zmq_assert (pending_notify_fd_); 00421 00422 socklen = sizeof (*send_fd_); 00423 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_SEND_SOCK, send_fd_, &socklen); 00424 zmq_assert (rc); 00425 zmq_assert (socklen == sizeof (*receive_fd_)); 00426 00427 socklen = sizeof (*receive_fd_); 00428 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RECV_SOCK, receive_fd_, 00429 &socklen); 00430 zmq_assert (rc); 00431 zmq_assert (socklen == sizeof (*receive_fd_)); 00432 00433 socklen = sizeof (*rdata_notify_fd_); 00434 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_REPAIR_SOCK, rdata_notify_fd_, 00435 &socklen); 00436 zmq_assert (rc); 00437 zmq_assert (socklen == sizeof (*rdata_notify_fd_)); 00438 00439 socklen = sizeof (*pending_notify_fd_); 00440 rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_PENDING_SOCK, 00441 pending_notify_fd_, &socklen); 00442 zmq_assert (rc); 00443 zmq_assert (socklen == sizeof (*pending_notify_fd_)); 00444 } 00445 00446 // Send one APDU, transmit window owned memory. 00447 // data_len_ must be less than one TPDU. 00448 size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) 00449 { 00450 size_t nbytes = 0; 00451 00452 const int status = pgm_send (sock, data_, data_len_, &nbytes); 00453 00454 // We have to write all data as one packet. 00455 if (nbytes > 0) { 00456 zmq_assert (status == PGM_IO_STATUS_NORMAL); 00457 zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_); 00458 } else { 00459 zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || 00460 status == PGM_IO_STATUS_WOULD_BLOCK); 00461 00462 if (status == PGM_IO_STATUS_RATE_LIMITED) 00463 errno = ENOMEM; 00464 else 00465 errno = EBUSY; 00466 } 00467 00468 // Save return value. 00469 last_tx_status = status; 00470 00471 return nbytes; 00472 } 00473 00474 long zmq::pgm_socket_t::get_rx_timeout () 00475 { 00476 if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED && 00477 last_rx_status != PGM_IO_STATUS_TIMER_PENDING) 00478 return -1; 00479 00480 struct timeval tv; 00481 socklen_t optlen = sizeof (tv); 00482 const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, 00483 last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN : 00484 PGM_TIME_REMAIN, &tv, &optlen); 00485 zmq_assert (rc); 00486 00487 const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); 00488 00489 return timeout; 00490 } 00491 00492 long zmq::pgm_socket_t::get_tx_timeout () 00493 { 00494 if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED) 00495 return -1; 00496 00497 struct timeval tv; 00498 socklen_t optlen = sizeof (tv); 00499 const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, 00500 &optlen); 00501 zmq_assert (rc); 00502 00503 const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000); 00504 00505 return timeout; 00506 } 00507 00508 // Return max TSDU size without fragmentation from current PGM transport. 00509 size_t zmq::pgm_socket_t::get_max_tsdu_size () 00510 { 00511 int max_tsdu = 0; 00512 socklen_t optlen = sizeof (max_tsdu); 00513 00514 bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_MSS, &max_tsdu, &optlen); 00515 zmq_assert (rc); 00516 zmq_assert (optlen == sizeof (max_tsdu)); 00517 return (size_t) max_tsdu; 00518 } 00519 00520 // pgm_recvmsgv is called to fill the pgm_msgv array up to pgm_msgv_len. 00521 // In subsequent calls data from pgm_msgv structure are returned. 00522 ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) 00523 { 00524 size_t raw_data_len = 0; 00525 00526 // We just sent all data from pgm_transport_recvmsgv up 00527 // and have to return 0 that another engine in this thread is scheduled. 00528 if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { 00529 00530 // Reset all the counters. 00531 nbytes_rec = 0; 00532 nbytes_processed = 0; 00533 pgm_msgv_processed = 0; 00534 errno = EAGAIN; 00535 return 0; 00536 } 00537 00538 // If we have are going first time or if we have processed all pgm_msgv_t 00539 // structure previously read from the pgm socket. 00540 if (nbytes_rec == nbytes_processed) { 00541 00542 // Check program flow. 00543 zmq_assert (pgm_msgv_processed == 0); 00544 zmq_assert (nbytes_processed == 0); 00545 zmq_assert (nbytes_rec == 0); 00546 00547 // Receive a vector of Application Protocol Domain Unit's (APDUs) 00548 // from the transport. 00549 pgm_error_t *pgm_error = NULL; 00550 00551 const int status = pgm_recvmsgv (sock, pgm_msgv, 00552 pgm_msgv_len, MSG_ERRQUEUE, &nbytes_rec, &pgm_error); 00553 00554 // Invalid parameters. 00555 zmq_assert (status != PGM_IO_STATUS_ERROR); 00556 00557 last_rx_status = status; 00558 00559 // In a case when no ODATA/RDATA fired POLLIN event (SPM...) 00560 // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING. 00561 if (status == PGM_IO_STATUS_TIMER_PENDING) { 00562 00563 zmq_assert (nbytes_rec == 0); 00564 00565 // In case if no RDATA/ODATA caused POLLIN 0 is 00566 // returned. 00567 nbytes_rec = 0; 00568 errno = EBUSY; 00569 return 0; 00570 } 00571 00572 // Send SPMR, NAK, ACK is rate limited. 00573 if (status == PGM_IO_STATUS_RATE_LIMITED) { 00574 00575 zmq_assert (nbytes_rec == 0); 00576 00577 // In case if no RDATA/ODATA caused POLLIN 0 is returned. 00578 nbytes_rec = 0; 00579 errno = ENOMEM; 00580 return 0; 00581 } 00582 00583 // No peers and hence no incoming packets. 00584 if (status == PGM_IO_STATUS_WOULD_BLOCK) { 00585 00586 zmq_assert (nbytes_rec == 0); 00587 00588 // In case if no RDATA/ODATA caused POLLIN 0 is returned. 00589 nbytes_rec = 0; 00590 errno = EAGAIN; 00591 return 0; 00592 } 00593 00594 // Data loss. 00595 if (status == PGM_IO_STATUS_RESET) { 00596 00597 struct pgm_sk_buff_t* skb = pgm_msgv [0].msgv_skb [0]; 00598 00599 // Save lost data TSI. 00600 *tsi_ = &skb->tsi; 00601 nbytes_rec = 0; 00602 00603 // In case of dala loss -1 is returned. 00604 errno = EINVAL; 00605 pgm_free_skb (skb); 00606 return -1; 00607 } 00608 00609 zmq_assert (status == PGM_IO_STATUS_NORMAL); 00610 } 00611 else 00612 { 00613 zmq_assert (pgm_msgv_processed <= pgm_msgv_len); 00614 } 00615 00616 // Zero byte payloads are valid in PGM, but not 0MQ protocol. 00617 zmq_assert (nbytes_rec > 0); 00618 00619 // Only one APDU per pgm_msgv_t structure is allowed. 00620 zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1); 00621 00622 struct pgm_sk_buff_t* skb = 00623 pgm_msgv [pgm_msgv_processed].msgv_skb [0]; 00624 00625 // Take pointers from pgm_msgv_t structure. 00626 *raw_data_ = skb->data; 00627 raw_data_len = skb->len; 00628 00629 // Save current TSI. 00630 *tsi_ = &skb->tsi; 00631 00632 // Move the the next pgm_msgv_t structure. 00633 pgm_msgv_processed++; 00634 zmq_assert (pgm_msgv_processed <= pgm_msgv_len); 00635 nbytes_processed +=raw_data_len; 00636 00637 return raw_data_len; 00638 } 00639 00640 void zmq::pgm_socket_t::process_upstream () 00641 { 00642 pgm_msgv_t dummy_msg; 00643 00644 size_t dummy_bytes = 0; 00645 pgm_error_t *pgm_error = NULL; 00646 00647 const int status = pgm_recvmsgv (sock, &dummy_msg, 00648 1, MSG_ERRQUEUE, &dummy_bytes, &pgm_error); 00649 00650 // Invalid parameters. 00651 zmq_assert (status != PGM_IO_STATUS_ERROR); 00652 00653 // No data should be returned. 00654 zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || 00655 status == PGM_IO_STATUS_RATE_LIMITED || 00656 status == PGM_IO_STATUS_WOULD_BLOCK)); 00657 00658 last_rx_status = status; 00659 00660 if (status == PGM_IO_STATUS_TIMER_PENDING) 00661 errno = EBUSY; 00662 else if (status == PGM_IO_STATUS_RATE_LIMITED) 00663 errno = ENOMEM; 00664 else 00665 errno = EAGAIN; 00666 } 00667 00668 int zmq::pgm_socket_t::compute_sqns (int tpdu_) 00669 { 00670 // Convert rate into B/ms. 00671 uint64_t rate = uint64_t (options.rate) / 8; 00672 00673 // Compute the size of the buffer in bytes. 00674 uint64_t size = uint64_t (options.recovery_ivl) * rate; 00675 00676 // Translate the size into number of packets. 00677 uint64_t sqns = size / tpdu_; 00678 00679 // Buffer should be able to hold at least one packet. 00680 if (sqns == 0) 00681 sqns = 1; 00682 00683 return (int) sqns; 00684 } 00685 00686 #endif 00687