libzmq master
The Intelligent Transport Layer

pgm_receiver.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2010-2011 Miru Limited
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #include "platform.hpp"
00024 
00025 #if defined ZMQ_HAVE_OPENPGM
00026 
00027 #include <new>
00028 
00029 #ifdef ZMQ_HAVE_WINDOWS
00030 #include "windows.hpp"
00031 #endif
00032 
00033 #include "pgm_receiver.hpp"
00034 #include "session_base.hpp"
00035 #include "stdint.hpp"
00036 #include "wire.hpp"
00037 #include "err.hpp"
00038 
00039 zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_, 
00040       const options_t &options_) :
00041     io_object_t (parent_),
00042     has_rx_timer (false),
00043     pgm_socket (true, options_),
00044     options (options_),
00045     session (NULL),
00046     mru_decoder (NULL),
00047     pending_bytes (0)
00048 {
00049 }
00050 
00051 zmq::pgm_receiver_t::~pgm_receiver_t ()
00052 {
00053     //  Destructor should not be called before unplug.
00054     zmq_assert (peers.empty ());
00055 }
00056 
00057 int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_)
00058 {
00059     return pgm_socket.init (udp_encapsulation_, network_);
00060 }
00061 
00062 void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_,
00063     session_base_t *session_)
00064 {
00065     //  Retrieve PGM fds and start polling.
00066     fd_t socket_fd = retired_fd;
00067     fd_t waiting_pipe_fd = retired_fd;
00068     pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
00069     socket_handle = add_fd (socket_fd);
00070     pipe_handle = add_fd (waiting_pipe_fd);
00071     set_pollin (pipe_handle);
00072     set_pollin (socket_handle);
00073 
00074     session = session_;
00075 
00076     //  If there are any subscriptions already queued in the session, drop them.
00077     drop_subscriptions ();
00078 }
00079 
00080 void zmq::pgm_receiver_t::unplug ()
00081 {
00082     //  Delete decoders.
00083     for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) {
00084         if (it->second.decoder != NULL)
00085             delete it->second.decoder;
00086     }
00087     peers.clear ();
00088 
00089     mru_decoder = NULL;
00090     pending_bytes = 0;
00091 
00092     if (has_rx_timer) {
00093         cancel_timer (rx_timer_id);
00094         has_rx_timer = false;
00095     }
00096 
00097     rm_fd (socket_handle);
00098     rm_fd (pipe_handle);
00099 
00100     session = NULL;
00101 }
00102 
00103 void zmq::pgm_receiver_t::terminate ()
00104 {
00105     unplug ();
00106     delete this;
00107 }
00108 
00109 void zmq::pgm_receiver_t::activate_out ()
00110 {
00111     drop_subscriptions ();
00112 }
00113 
00114 void zmq::pgm_receiver_t::activate_in ()
00115 {
00116     //  It is possible that the most recently used decoder
00117     //  processed the whole buffer but failed to write
00118     //  the last message into the pipe.
00119     if (pending_bytes == 0) {
00120         if (mru_decoder != NULL)
00121             mru_decoder->process_buffer (NULL, 0);
00122         return;
00123     }
00124 
00125     zmq_assert (mru_decoder != NULL);
00126     zmq_assert (pending_ptr != NULL);
00127 
00128     //  Ask the decoder to process remaining data.
00129     size_t n = mru_decoder->process_buffer (pending_ptr, pending_bytes);
00130     pending_bytes -= n;
00131 
00132     if (pending_bytes > 0)
00133         return;
00134 
00135     //  Resume polling.
00136     set_pollin (pipe_handle);
00137     set_pollin (socket_handle);
00138 
00139     in_event ();
00140 }
00141 
00142 void zmq::pgm_receiver_t::in_event ()
00143 {
00144     // Read data from the underlying pgm_socket.
00145     unsigned char *data = NULL;
00146     const pgm_tsi_t *tsi = NULL;
00147 
00148     zmq_assert (pending_bytes == 0);
00149 
00150     if (has_rx_timer) {
00151         cancel_timer (rx_timer_id);
00152         has_rx_timer = false;
00153     }
00154 
00155     //  TODO: This loop can effectively block other engines in the same I/O
00156     //  thread in the case of high load.
00157     while (true) {
00158 
00159         //  Get new batch of data.
00160         //  Note the workaround made not to break strict-aliasing rules.
00161         void *tmp = NULL;
00162         ssize_t received = pgm_socket.receive (&tmp, &tsi);
00163         data = (unsigned char*) tmp;
00164 
00165         //  No data to process. This may happen if the packet received is
00166         //  neither ODATA nor ODATA.
00167         if (received == 0) {
00168             if (errno == ENOMEM || errno == EBUSY) {
00169                 const long timeout = pgm_socket.get_rx_timeout ();
00170                 add_timer (timeout, rx_timer_id);
00171                 has_rx_timer = true;
00172             }
00173             break;
00174         }
00175 
00176         //  Find the peer based on its TSI.
00177         peers_t::iterator it = peers.find (*tsi);
00178 
00179         //  Data loss. Delete decoder and mark the peer as disjoint.
00180         if (received == -1) {
00181             if (it != peers.end ()) {
00182                 it->second.joined = false;
00183                 if (it->second.decoder == mru_decoder)
00184                     mru_decoder = NULL;
00185                 if (it->second.decoder != NULL) {
00186                     delete it->second.decoder;
00187                     it->second.decoder = NULL;
00188                 }
00189             }
00190             break;
00191         }
00192 
00193         //  New peer. Add it to the list of know but unjoint peers.
00194         if (it == peers.end ()) {
00195             peer_info_t peer_info = {false, NULL};
00196             it = peers.insert (peers_t::value_type (*tsi, peer_info)).first;
00197         }
00198 
00199         //  Read the offset of the fist message in the current packet.
00200         zmq_assert ((size_t) received >= sizeof (uint16_t));
00201         uint16_t offset = get_uint16 (data);
00202         data += sizeof (uint16_t);
00203         received -= sizeof (uint16_t);
00204 
00205         //  Join the stream if needed.
00206         if (!it->second.joined) {
00207 
00208             //  There is no beginning of the message in current packet.
00209             //  Ignore the data.
00210             if (offset == 0xffff)
00211                 continue;
00212 
00213             zmq_assert (offset <= received);
00214             zmq_assert (it->second.decoder == NULL);
00215 
00216             //  We have to move data to the begining of the first message.
00217             data += offset;
00218             received -= offset;
00219 
00220             //  Mark the stream as joined.
00221             it->second.joined = true;
00222 
00223             //  Create and connect decoder for the peer.
00224             it->second.decoder = new (std::nothrow) decoder_t (0,
00225                 options.maxmsgsize);
00226             alloc_assert (it->second.decoder);
00227             it->second.decoder->set_session (session);
00228         }
00229 
00230         mru_decoder = it->second.decoder;
00231 
00232         //  Push all the data to the decoder.
00233         ssize_t processed = it->second.decoder->process_buffer (data, received);
00234         if (processed < received) {
00235             //  Save some state so we can resume the decoding process later.
00236             pending_bytes = received - processed;
00237             pending_ptr = data + processed;
00238             //  Stop polling.
00239             reset_pollin (pipe_handle);
00240             reset_pollin (socket_handle);
00241 
00242             //  Reset outstanding timer.
00243             if (has_rx_timer) {
00244                 cancel_timer (rx_timer_id);
00245                 has_rx_timer = false;
00246             }
00247 
00248             break;
00249         }
00250     }
00251 
00252     //  Flush any messages decoder may have produced.
00253     session->flush ();
00254 }
00255 
00256 void zmq::pgm_receiver_t::timer_event (int token)
00257 {
00258     zmq_assert (token == rx_timer_id);
00259 
00260     //  Timer cancels on return by poller_base.
00261     has_rx_timer = false;
00262     in_event ();
00263 }
00264 
00265 void zmq::pgm_receiver_t::drop_subscriptions ()
00266 {
00267     msg_t msg;
00268     while (session->read (&msg))
00269         msg.close ();
00270 }
00271 
00272 #endif
00273 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines