libzmq master
The Intelligent Transport Layer

pgm_sender.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2010-2011 Miru Limited
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #include "platform.hpp"
00024 
00025 #if defined ZMQ_HAVE_OPENPGM
00026 
00027 #ifdef ZMQ_HAVE_WINDOWS
00028 #include "windows.hpp"
00029 #endif
00030 
00031 #include <stdlib.h>
00032 
00033 #include "io_thread.hpp"
00034 #include "pgm_sender.hpp"
00035 #include "session_base.hpp"
00036 #include "err.hpp"
00037 #include "wire.hpp"
00038 #include "stdint.hpp"
00039 
00040 zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, 
00041       const options_t &options_) :
00042     io_object_t (parent_),
00043     has_tx_timer (false),
00044     has_rx_timer (false),
00045     encoder (0),
00046     pgm_socket (false, options_),
00047     options (options_),
00048     out_buffer (NULL),
00049     out_buffer_size (0),
00050     write_size (0)
00051 {
00052 }
00053 
00054 int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
00055 {
00056     int rc = pgm_socket.init (udp_encapsulation_, network_);
00057     if (rc != 0)
00058         return rc;
00059 
00060     out_buffer_size = pgm_socket.get_max_tsdu_size ();
00061     out_buffer = (unsigned char*) malloc (out_buffer_size);
00062     alloc_assert (out_buffer);
00063 
00064     return rc;
00065 }
00066 
00067 void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
00068 {
00069     //  Alocate 2 fds for PGM socket.
00070     fd_t downlink_socket_fd = retired_fd;
00071     fd_t uplink_socket_fd = retired_fd;
00072     fd_t rdata_notify_fd = retired_fd;
00073     fd_t pending_notify_fd = retired_fd;
00074 
00075     encoder.set_session (session_);
00076 
00077     //  Fill fds from PGM transport and add them to the poller.
00078     pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
00079         &rdata_notify_fd, &pending_notify_fd);
00080 
00081     handle = add_fd (downlink_socket_fd);
00082     uplink_handle = add_fd (uplink_socket_fd);
00083     rdata_notify_handle = add_fd (rdata_notify_fd);   
00084     pending_notify_handle = add_fd (pending_notify_fd);
00085 
00086     //  Set POLLIN. We wont never want to stop polling for uplink = we never
00087     //  want to stop porocess NAKs.
00088     set_pollin (uplink_handle);
00089     set_pollin (rdata_notify_handle);
00090     set_pollin (pending_notify_handle);
00091 
00092     //  Set POLLOUT for downlink_socket_handle.
00093     set_pollout (handle);
00094 
00095     //  PGM is not able to pass subscriptions upstream, thus we have no idea
00096     //  what messages are peers interested in. Because of that we have to
00097     //  subscribe for all the messages.
00098     msg_t msg;
00099     msg.init_size (1);
00100     *(unsigned char*) msg.data () = 1;
00101     bool ok = session_->write (&msg);
00102     zmq_assert (ok);
00103     session_->flush ();
00104 }
00105 
00106 void zmq::pgm_sender_t::unplug ()
00107 {
00108     if (has_rx_timer) {
00109         cancel_timer (rx_timer_id);
00110         has_rx_timer = false;
00111     }
00112 
00113     if (has_tx_timer) {
00114         cancel_timer (tx_timer_id);
00115         has_tx_timer = false;
00116     }
00117 
00118     rm_fd (handle);
00119     rm_fd (uplink_handle);
00120     rm_fd (rdata_notify_handle);
00121     rm_fd (pending_notify_handle);
00122     encoder.set_session (NULL);
00123 }
00124 
00125 void zmq::pgm_sender_t::terminate ()
00126 {
00127     unplug ();
00128     delete this;
00129 }
00130 
00131 void zmq::pgm_sender_t::activate_out ()
00132 {
00133     set_pollout (handle);
00134     out_event ();
00135 }
00136 
00137 void zmq::pgm_sender_t::activate_in ()
00138 {
00139     zmq_assert (false);
00140 }
00141 
00142 zmq::pgm_sender_t::~pgm_sender_t ()
00143 {
00144     if (out_buffer) {
00145         free (out_buffer);
00146         out_buffer = NULL;
00147     }
00148 }
00149 
00150 void zmq::pgm_sender_t::in_event ()
00151 {
00152     if (has_rx_timer) {
00153         cancel_timer (rx_timer_id);
00154         has_rx_timer = false;
00155     }
00156 
00157     //  In-event on sender side means NAK or SPMR receiving from some peer.
00158     pgm_socket.process_upstream ();
00159     if (errno == ENOMEM || errno == EBUSY) {
00160         const long timeout = pgm_socket.get_rx_timeout ();
00161         add_timer (timeout, rx_timer_id);
00162         has_rx_timer = true;
00163     }
00164 }
00165 
00166 void zmq::pgm_sender_t::out_event ()
00167 {
00168     //  POLLOUT event from send socket. If write buffer is empty, 
00169     //  try to read new data from the encoder.
00170     if (write_size == 0) {
00171 
00172         //  First two bytes (sizeof uint16_t) are used to store message 
00173         //  offset in following steps. Note that by passing our buffer to
00174         //  the get data function we prevent it from returning its own buffer.
00175         unsigned char *bf = out_buffer + sizeof (uint16_t);
00176         size_t bfsz = out_buffer_size - sizeof (uint16_t);
00177         int offset = -1;
00178         encoder.get_data (&bf, &bfsz, &offset);
00179 
00180         //  If there are no data to write stop polling for output.
00181         if (!bfsz) {
00182             reset_pollout (handle);
00183             return;
00184         }
00185 
00186         //  Put offset information in the buffer.
00187         write_size = bfsz + sizeof (uint16_t);
00188         put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
00189     }
00190 
00191     if (has_tx_timer) {
00192         cancel_timer (tx_timer_id);
00193         has_tx_timer = false;
00194     }
00195 
00196     //  Send the data.
00197     size_t nbytes = pgm_socket.send (out_buffer, write_size);
00198 
00199     //  We can write either all data or 0 which means rate limit reached.
00200     if (nbytes == write_size) {
00201         write_size = 0;
00202     } else {
00203         zmq_assert (nbytes == 0);
00204 
00205         if (errno == ENOMEM) {
00206             const long timeout = pgm_socket.get_tx_timeout ();
00207             add_timer (timeout, tx_timer_id);
00208             has_tx_timer = true;
00209         } else
00210             zmq_assert (errno == EBUSY);
00211     }
00212 }
00213 
00214 void zmq::pgm_sender_t::timer_event (int token)
00215 {
00216     //  Timer cancels on return by poller_base.
00217     if (token == rx_timer_id) {
00218         has_rx_timer = false;
00219         in_event ();
00220     } else if (token == tx_timer_id) {
00221         has_tx_timer = false;
00222         out_event ();
00223     } else
00224         zmq_assert (false);
00225 }
00226 
00227 #endif
00228 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines