libzmq master
The Intelligent Transport Layer

pgm_receiver.hpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2010-2011 Miru Limited
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #ifndef __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
00024 #define __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
00025 
00026 #include "platform.hpp"
00027 
00028 #if defined ZMQ_HAVE_OPENPGM
00029 
00030 #ifdef ZMQ_HAVE_WINDOWS
00031 #include "windows.hpp"
00032 #endif
00033 
00034 #include <map>
00035 #include <algorithm>
00036 
00037 #include "io_object.hpp"
00038 #include "i_engine.hpp"
00039 #include "options.hpp"
00040 #include "decoder.hpp"
00041 #include "pgm_socket.hpp"
00042 
00043 namespace zmq
00044 {
00045 
00046     class pgm_receiver_t : public io_object_t, public i_engine
00047     {
00048     
00049     public:
00050 
00051         pgm_receiver_t (class io_thread_t *parent_, const options_t &options_);
00052         ~pgm_receiver_t ();
00053 
00054         int init (bool udp_encapsulation_, const char *network_);
00055 
00056         //  i_engine interface implementation.
00057         void plug (class io_thread_t *io_thread_,
00058             class session_base_t *session_);
00059         void unplug ();
00060         void terminate ();
00061         void activate_in ();
00062         void activate_out ();
00063 
00064         //  i_poll_events interface implementation.
00065         void in_event ();
00066         void timer_event (int token);
00067 
00068     private:
00069 
00070         //  PGM is not able to move subscriptions upstream. Thus, drop all
00071         //  the pending subscriptions.
00072         void drop_subscriptions ();
00073 
00074         //  RX timeout timer ID.
00075         enum {rx_timer_id = 0xa1};
00076 
00077         //  RX timer is running.
00078         bool has_rx_timer;
00079 
00080         //  If joined is true we are already getting messages from the peer.
00081         //  It it's false, we are getting data but still we haven't seen
00082         //  beginning of a message.
00083         struct peer_info_t
00084         {
00085             bool joined;
00086             decoder_t *decoder;
00087         };
00088 
00089         struct tsi_comp
00090         {
00091             bool operator () (const pgm_tsi_t &ltsi,
00092                 const pgm_tsi_t &rtsi) const
00093             {
00094                 uint32_t ll[2], rl[2];
00095                 memcpy (ll, &ltsi, sizeof (ll));
00096                 memcpy (rl, &rtsi, sizeof (rl));
00097                 return (ll[0] < rl[0]) || (ll[0] == rl[0] && ll[1] < rl[1]);
00098             }
00099         };
00100 
00101         typedef std::map <pgm_tsi_t, peer_info_t, tsi_comp> peers_t;
00102         peers_t peers;
00103 
00104         //  PGM socket.
00105         pgm_socket_t pgm_socket;
00106 
00107         //  Socket options.
00108         options_t options;
00109 
00110         //  Associated session.
00111         class session_base_t *session;
00112 
00113         //  Most recently used decoder.
00114         decoder_t *mru_decoder;
00115 
00116         //  Number of bytes not consumed by the decoder due to pipe overflow.
00117         size_t pending_bytes;
00118 
00119         //  Pointer to data still waiting to be processed by the decoder.
00120         unsigned char *pending_ptr;
00121 
00122         //  Poll handle associated with PGM socket.
00123         handle_t socket_handle;
00124 
00125         //  Poll handle associated with engine PGM waiting pipe.
00126         handle_t pipe_handle;
00127 
00128         pgm_receiver_t (const pgm_receiver_t&);
00129         const pgm_receiver_t &operator = (const pgm_receiver_t&);
00130     };
00131 
00132 }
00133 
00134 #endif
00135 
00136 #endif
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines