![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2010-2011 Miru Limited 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #ifndef __ZMQ_PGM_RECEIVER_HPP_INCLUDED__ 00024 #define __ZMQ_PGM_RECEIVER_HPP_INCLUDED__ 00025 00026 #include "platform.hpp" 00027 00028 #if defined ZMQ_HAVE_OPENPGM 00029 00030 #ifdef ZMQ_HAVE_WINDOWS 00031 #include "windows.hpp" 00032 #endif 00033 00034 #include <map> 00035 #include <algorithm> 00036 00037 #include "io_object.hpp" 00038 #include "i_engine.hpp" 00039 #include "options.hpp" 00040 #include "decoder.hpp" 00041 #include "pgm_socket.hpp" 00042 00043 namespace zmq 00044 { 00045 00046 class pgm_receiver_t : public io_object_t, public i_engine 00047 { 00048 00049 public: 00050 00051 pgm_receiver_t (class io_thread_t *parent_, const options_t &options_); 00052 ~pgm_receiver_t (); 00053 00054 int init (bool udp_encapsulation_, const char *network_); 00055 00056 // i_engine interface implementation. 00057 void plug (class io_thread_t *io_thread_, 00058 class session_base_t *session_); 00059 void unplug (); 00060 void terminate (); 00061 void activate_in (); 00062 void activate_out (); 00063 00064 // i_poll_events interface implementation. 00065 void in_event (); 00066 void timer_event (int token); 00067 00068 private: 00069 00070 // PGM is not able to move subscriptions upstream. Thus, drop all 00071 // the pending subscriptions. 00072 void drop_subscriptions (); 00073 00074 // RX timeout timer ID. 00075 enum {rx_timer_id = 0xa1}; 00076 00077 // RX timer is running. 00078 bool has_rx_timer; 00079 00080 // If joined is true we are already getting messages from the peer. 00081 // It it's false, we are getting data but still we haven't seen 00082 // beginning of a message. 00083 struct peer_info_t 00084 { 00085 bool joined; 00086 decoder_t *decoder; 00087 }; 00088 00089 struct tsi_comp 00090 { 00091 bool operator () (const pgm_tsi_t <si, 00092 const pgm_tsi_t &rtsi) const 00093 { 00094 uint32_t ll[2], rl[2]; 00095 memcpy (ll, <si, sizeof (ll)); 00096 memcpy (rl, &rtsi, sizeof (rl)); 00097 return (ll[0] < rl[0]) || (ll[0] == rl[0] && ll[1] < rl[1]); 00098 } 00099 }; 00100 00101 typedef std::map <pgm_tsi_t, peer_info_t, tsi_comp> peers_t; 00102 peers_t peers; 00103 00104 // PGM socket. 00105 pgm_socket_t pgm_socket; 00106 00107 // Socket options. 00108 options_t options; 00109 00110 // Associated session. 00111 class session_base_t *session; 00112 00113 // Most recently used decoder. 00114 decoder_t *mru_decoder; 00115 00116 // Number of bytes not consumed by the decoder due to pipe overflow. 00117 size_t pending_bytes; 00118 00119 // Pointer to data still waiting to be processed by the decoder. 00120 unsigned char *pending_ptr; 00121 00122 // Poll handle associated with PGM socket. 00123 handle_t socket_handle; 00124 00125 // Poll handle associated with engine PGM waiting pipe. 00126 handle_t pipe_handle; 00127 00128 pgm_receiver_t (const pgm_receiver_t&); 00129 const pgm_receiver_t &operator = (const pgm_receiver_t&); 00130 }; 00131 00132 } 00133 00134 #endif 00135 00136 #endif