![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2010-2011 Miru Limited 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include "platform.hpp" 00024 00025 #if defined ZMQ_HAVE_OPENPGM 00026 00027 #include <new> 00028 00029 #ifdef ZMQ_HAVE_WINDOWS 00030 #include "windows.hpp" 00031 #endif 00032 00033 #include "pgm_receiver.hpp" 00034 #include "session_base.hpp" 00035 #include "stdint.hpp" 00036 #include "wire.hpp" 00037 #include "err.hpp" 00038 00039 zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, 00040 const options_t &options_) : 00041 io_object_t (parent_), 00042 has_rx_timer (false), 00043 pgm_socket (true, options_), 00044 options (options_), 00045 session (NULL), 00046 mru_decoder (NULL), 00047 pending_bytes (0) 00048 { 00049 } 00050 00051 zmq::pgm_receiver_t::~pgm_receiver_t () 00052 { 00053 // Destructor should not be called before unplug. 00054 zmq_assert (peers.empty ()); 00055 } 00056 00057 int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) 00058 { 00059 return pgm_socket.init (udp_encapsulation_, network_); 00060 } 00061 00062 void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, 00063 session_base_t *session_) 00064 { 00065 // Retrieve PGM fds and start polling. 00066 fd_t socket_fd = retired_fd; 00067 fd_t waiting_pipe_fd = retired_fd; 00068 pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd); 00069 socket_handle = add_fd (socket_fd); 00070 pipe_handle = add_fd (waiting_pipe_fd); 00071 set_pollin (pipe_handle); 00072 set_pollin (socket_handle); 00073 00074 session = session_; 00075 00076 // If there are any subscriptions already queued in the session, drop them. 00077 drop_subscriptions (); 00078 } 00079 00080 void zmq::pgm_receiver_t::unplug () 00081 { 00082 // Delete decoders. 00083 for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) { 00084 if (it->second.decoder != NULL) 00085 delete it->second.decoder; 00086 } 00087 peers.clear (); 00088 00089 mru_decoder = NULL; 00090 pending_bytes = 0; 00091 00092 if (has_rx_timer) { 00093 cancel_timer (rx_timer_id); 00094 has_rx_timer = false; 00095 } 00096 00097 rm_fd (socket_handle); 00098 rm_fd (pipe_handle); 00099 00100 session = NULL; 00101 } 00102 00103 void zmq::pgm_receiver_t::terminate () 00104 { 00105 unplug (); 00106 delete this; 00107 } 00108 00109 void zmq::pgm_receiver_t::activate_out () 00110 { 00111 drop_subscriptions (); 00112 } 00113 00114 void zmq::pgm_receiver_t::activate_in () 00115 { 00116 // It is possible that the most recently used decoder 00117 // processed the whole buffer but failed to write 00118 // the last message into the pipe. 00119 if (pending_bytes == 0) { 00120 if (mru_decoder != NULL) 00121 mru_decoder->process_buffer (NULL, 0); 00122 return; 00123 } 00124 00125 zmq_assert (mru_decoder != NULL); 00126 zmq_assert (pending_ptr != NULL); 00127 00128 // Ask the decoder to process remaining data. 00129 size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes); 00130 pending_bytes -= n; 00131 00132 if (pending_bytes > 0) 00133 return; 00134 00135 // Resume polling. 00136 set_pollin (pipe_handle); 00137 set_pollin (socket_handle); 00138 00139 in_event (); 00140 } 00141 00142 void zmq::pgm_receiver_t::in_event () 00143 { 00144 // Read data from the underlying pgm_socket. 00145 unsigned char *data = NULL; 00146 const pgm_tsi_t *tsi = NULL; 00147 00148 zmq_assert (pending_bytes == 0); 00149 00150 if (has_rx_timer) { 00151 cancel_timer (rx_timer_id); 00152 has_rx_timer = false; 00153 } 00154 00155 // TODO: This loop can effectively block other engines in the same I/O 00156 // thread in the case of high load. 00157 while (true) { 00158 00159 // Get new batch of data. 00160 // Note the workaround made not to break strict-aliasing rules. 00161 void *tmp = NULL; 00162 ssize_t received = pgm_socket.receive (&tmp, &tsi); 00163 data = (unsigned char*) tmp; 00164 00165 // No data to process. This may happen if the packet received is 00166 // neither ODATA nor ODATA. 00167 if (received == 0) { 00168 if (errno == ENOMEM || errno == EBUSY) { 00169 const long timeout = pgm_socket.get_rx_timeout (); 00170 add_timer (timeout, rx_timer_id); 00171 has_rx_timer = true; 00172 } 00173 break; 00174 } 00175 00176 // Find the peer based on its TSI. 00177 peers_t::iterator it = peers.find (*tsi); 00178 00179 // Data loss. Delete decoder and mark the peer as disjoint. 00180 if (received == -1) { 00181 if (it != peers.end ()) { 00182 it->second.joined = false; 00183 if (it->second.decoder == mru_decoder) 00184 mru_decoder = NULL; 00185 if (it->second.decoder != NULL) { 00186 delete it->second.decoder; 00187 it->second.decoder = NULL; 00188 } 00189 } 00190 break; 00191 } 00192 00193 // New peer. Add it to the list of know but unjoint peers. 00194 if (it == peers.end ()) { 00195 peer_info_t peer_info = {false, NULL}; 00196 it = peers.insert (peers_t::value_type (*tsi, peer_info)).first; 00197 } 00198 00199 // Read the offset of the fist message in the current packet. 00200 zmq_assert ((size_t) received >= sizeof (uint16_t)); 00201 uint16_t offset = get_uint16 (data); 00202 data += sizeof (uint16_t); 00203 received -= sizeof (uint16_t); 00204 00205 // Join the stream if needed. 00206 if (!it->second.joined) { 00207 00208 // There is no beginning of the message in current packet. 00209 // Ignore the data. 00210 if (offset == 0xffff) 00211 continue; 00212 00213 zmq_assert (offset <= received); 00214 zmq_assert (it->second.decoder == NULL); 00215 00216 // We have to move data to the begining of the first message. 00217 data += offset; 00218 received -= offset; 00219 00220 // Mark the stream as joined. 00221 it->second.joined = true; 00222 00223 // Create and connect decoder for the peer. 00224 it->second.decoder = new (std::nothrow) decoder_t (0, 00225 options.maxmsgsize); 00226 alloc_assert (it->second.decoder); 00227 it->second.decoder->set_session (session); 00228 } 00229 00230 mru_decoder = it->second.decoder; 00231 00232 // Push all the data to the decoder. 00233 ssize_t processed = it->second.decoder->process_buffer (data, received); 00234 if (processed < received) { 00235 // Save some state so we can resume the decoding process later. 00236 pending_bytes = received - processed; 00237 pending_ptr = data + processed; 00238 // Stop polling. 00239 reset_pollin (pipe_handle); 00240 reset_pollin (socket_handle); 00241 00242 // Reset outstanding timer. 00243 if (has_rx_timer) { 00244 cancel_timer (rx_timer_id); 00245 has_rx_timer = false; 00246 } 00247 00248 break; 00249 } 00250 } 00251 00252 // Flush any messages decoder may have produced. 00253 session->flush (); 00254 } 00255 00256 void zmq::pgm_receiver_t::timer_event (int token) 00257 { 00258 zmq_assert (token == rx_timer_id); 00259 00260 // Timer cancels on return by poller_base. 00261 has_rx_timer = false; 00262 in_event (); 00263 } 00264 00265 void zmq::pgm_receiver_t::drop_subscriptions () 00266 { 00267 msg_t msg; 00268 while (session->read (&msg)) 00269 msg.close (); 00270 } 00271 00272 #endif 00273