![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2010-2011 Miru Limited 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include "platform.hpp" 00024 00025 #if defined ZMQ_HAVE_OPENPGM 00026 00027 #ifdef ZMQ_HAVE_WINDOWS 00028 #include "windows.hpp" 00029 #endif 00030 00031 #include <stdlib.h> 00032 00033 #include "io_thread.hpp" 00034 #include "pgm_sender.hpp" 00035 #include "session_base.hpp" 00036 #include "err.hpp" 00037 #include "wire.hpp" 00038 #include "stdint.hpp" 00039 00040 zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, 00041 const options_t &options_) : 00042 io_object_t (parent_), 00043 has_tx_timer (false), 00044 has_rx_timer (false), 00045 encoder (0), 00046 pgm_socket (false, options_), 00047 options (options_), 00048 out_buffer (NULL), 00049 out_buffer_size (0), 00050 write_size (0) 00051 { 00052 } 00053 00054 int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) 00055 { 00056 int rc = pgm_socket.init (udp_encapsulation_, network_); 00057 if (rc != 0) 00058 return rc; 00059 00060 out_buffer_size = pgm_socket.get_max_tsdu_size (); 00061 out_buffer = (unsigned char*) malloc (out_buffer_size); 00062 alloc_assert (out_buffer); 00063 00064 return rc; 00065 } 00066 00067 void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) 00068 { 00069 // Alocate 2 fds for PGM socket. 00070 fd_t downlink_socket_fd = retired_fd; 00071 fd_t uplink_socket_fd = retired_fd; 00072 fd_t rdata_notify_fd = retired_fd; 00073 fd_t pending_notify_fd = retired_fd; 00074 00075 encoder.set_session (session_); 00076 00077 // Fill fds from PGM transport and add them to the poller. 00078 pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, 00079 &rdata_notify_fd, &pending_notify_fd); 00080 00081 handle = add_fd (downlink_socket_fd); 00082 uplink_handle = add_fd (uplink_socket_fd); 00083 rdata_notify_handle = add_fd (rdata_notify_fd); 00084 pending_notify_handle = add_fd (pending_notify_fd); 00085 00086 // Set POLLIN. We wont never want to stop polling for uplink = we never 00087 // want to stop porocess NAKs. 00088 set_pollin (uplink_handle); 00089 set_pollin (rdata_notify_handle); 00090 set_pollin (pending_notify_handle); 00091 00092 // Set POLLOUT for downlink_socket_handle. 00093 set_pollout (handle); 00094 00095 // PGM is not able to pass subscriptions upstream, thus we have no idea 00096 // what messages are peers interested in. Because of that we have to 00097 // subscribe for all the messages. 00098 msg_t msg; 00099 msg.init_size (1); 00100 *(unsigned char*) msg.data () = 1; 00101 bool ok = session_->write (&msg); 00102 zmq_assert (ok); 00103 session_->flush (); 00104 } 00105 00106 void zmq::pgm_sender_t::unplug () 00107 { 00108 if (has_rx_timer) { 00109 cancel_timer (rx_timer_id); 00110 has_rx_timer = false; 00111 } 00112 00113 if (has_tx_timer) { 00114 cancel_timer (tx_timer_id); 00115 has_tx_timer = false; 00116 } 00117 00118 rm_fd (handle); 00119 rm_fd (uplink_handle); 00120 rm_fd (rdata_notify_handle); 00121 rm_fd (pending_notify_handle); 00122 encoder.set_session (NULL); 00123 } 00124 00125 void zmq::pgm_sender_t::terminate () 00126 { 00127 unplug (); 00128 delete this; 00129 } 00130 00131 void zmq::pgm_sender_t::activate_out () 00132 { 00133 set_pollout (handle); 00134 out_event (); 00135 } 00136 00137 void zmq::pgm_sender_t::activate_in () 00138 { 00139 zmq_assert (false); 00140 } 00141 00142 zmq::pgm_sender_t::~pgm_sender_t () 00143 { 00144 if (out_buffer) { 00145 free (out_buffer); 00146 out_buffer = NULL; 00147 } 00148 } 00149 00150 void zmq::pgm_sender_t::in_event () 00151 { 00152 if (has_rx_timer) { 00153 cancel_timer (rx_timer_id); 00154 has_rx_timer = false; 00155 } 00156 00157 // In-event on sender side means NAK or SPMR receiving from some peer. 00158 pgm_socket.process_upstream (); 00159 if (errno == ENOMEM || errno == EBUSY) { 00160 const long timeout = pgm_socket.get_rx_timeout (); 00161 add_timer (timeout, rx_timer_id); 00162 has_rx_timer = true; 00163 } 00164 } 00165 00166 void zmq::pgm_sender_t::out_event () 00167 { 00168 // POLLOUT event from send socket. If write buffer is empty, 00169 // try to read new data from the encoder. 00170 if (write_size == 0) { 00171 00172 // First two bytes (sizeof uint16_t) are used to store message 00173 // offset in following steps. Note that by passing our buffer to 00174 // the get data function we prevent it from returning its own buffer. 00175 unsigned char *bf = out_buffer + sizeof (uint16_t); 00176 size_t bfsz = out_buffer_size - sizeof (uint16_t); 00177 int offset = -1; 00178 encoder.get_data (&bf, &bfsz, &offset); 00179 00180 // If there are no data to write stop polling for output. 00181 if (!bfsz) { 00182 reset_pollout (handle); 00183 return; 00184 } 00185 00186 // Put offset information in the buffer. 00187 write_size = bfsz + sizeof (uint16_t); 00188 put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); 00189 } 00190 00191 if (has_tx_timer) { 00192 cancel_timer (tx_timer_id); 00193 has_tx_timer = false; 00194 } 00195 00196 // Send the data. 00197 size_t nbytes = pgm_socket.send (out_buffer, write_size); 00198 00199 // We can write either all data or 0 which means rate limit reached. 00200 if (nbytes == write_size) { 00201 write_size = 0; 00202 } else { 00203 zmq_assert (nbytes == 0); 00204 00205 if (errno == ENOMEM) { 00206 const long timeout = pgm_socket.get_tx_timeout (); 00207 add_timer (timeout, tx_timer_id); 00208 has_tx_timer = true; 00209 } else 00210 zmq_assert (errno == EBUSY); 00211 } 00212 } 00213 00214 void zmq::pgm_sender_t::timer_event (int token) 00215 { 00216 // Timer cancels on return by poller_base. 00217 if (token == rx_timer_id) { 00218 has_rx_timer = false; 00219 in_event (); 00220 } else if (token == tx_timer_id) { 00221 has_tx_timer = false; 00222 out_event (); 00223 } else 00224 zmq_assert (false); 00225 } 00226 00227 #endif 00228