libzmq master
The Intelligent Transport Layer

session_base.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2011 VMware, Inc.
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #include "session_base.hpp"
00024 #include "socket_base.hpp"
00025 #include "i_engine.hpp"
00026 #include "err.hpp"
00027 #include "pipe.hpp"
00028 #include "likely.hpp"
00029 #include "tcp_connecter.hpp"
00030 #include "ipc_connecter.hpp"
00031 #include "pgm_sender.hpp"
00032 #include "pgm_receiver.hpp"
00033 
00034 #include "req.hpp"
00035 #include "xreq.hpp"
00036 #include "rep.hpp"
00037 #include "xrep.hpp"
00038 #include "pub.hpp"
00039 #include "xpub.hpp"
00040 #include "sub.hpp"
00041 #include "xsub.hpp"
00042 #include "push.hpp"
00043 #include "pull.hpp"
00044 #include "pair.hpp"
00045 
00046 zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
00047     bool connect_, class socket_base_t *socket_, const options_t &options_,
00048     const char *protocol_, const char *address_)
00049 {
00050     session_base_t *s = NULL;
00051     switch (options_.type) {
00052     case ZMQ_REQ:
00053         s = new (std::nothrow) req_session_t (io_thread_, connect_,
00054             socket_, options_, protocol_, address_);
00055         break;
00056     case ZMQ_XREQ:
00057         s = new (std::nothrow) xreq_session_t (io_thread_, connect_,
00058             socket_, options_, protocol_, address_);
00059     case ZMQ_REP:
00060         s = new (std::nothrow) rep_session_t (io_thread_, connect_,
00061             socket_, options_, protocol_, address_);
00062         break;
00063     case ZMQ_XREP:
00064         s = new (std::nothrow) xrep_session_t (io_thread_, connect_,
00065             socket_, options_, protocol_, address_);
00066         break;
00067     case ZMQ_PUB:
00068         s = new (std::nothrow) pub_session_t (io_thread_, connect_,
00069             socket_, options_, protocol_, address_);
00070         break;
00071     case ZMQ_XPUB:
00072         s = new (std::nothrow) xpub_session_t (io_thread_, connect_,
00073             socket_, options_, protocol_, address_);
00074         break;
00075     case ZMQ_SUB:
00076         s = new (std::nothrow) sub_session_t (io_thread_, connect_,
00077             socket_, options_, protocol_, address_);
00078         break;
00079     case ZMQ_XSUB:
00080         s = new (std::nothrow) xsub_session_t (io_thread_, connect_,
00081             socket_, options_, protocol_, address_);
00082         break;
00083     case ZMQ_PUSH:
00084         s = new (std::nothrow) push_session_t (io_thread_, connect_,
00085             socket_, options_, protocol_, address_);
00086         break;
00087     case ZMQ_PULL:
00088         s = new (std::nothrow) pull_session_t (io_thread_, connect_,
00089             socket_, options_, protocol_, address_);
00090         break;
00091     case ZMQ_PAIR:
00092         s = new (std::nothrow) pair_session_t (io_thread_, connect_,
00093             socket_, options_, protocol_, address_);
00094         break;
00095     default:
00096         errno = EINVAL;
00097         return NULL;
00098     }
00099     alloc_assert (s);
00100     return s;
00101 }
00102 
00103 zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
00104       bool connect_, class socket_base_t *socket_, const options_t &options_,
00105       const char *protocol_, const char *address_) :
00106     own_t (io_thread_, options_),
00107     io_object_t (io_thread_),
00108     connect (connect_),
00109     pipe (NULL),
00110     incomplete_in (false),
00111     pending (false),
00112     engine (NULL),
00113     socket (socket_),
00114     io_thread (io_thread_),
00115     has_linger_timer (false),
00116     send_identity (options_.send_identity),
00117     recv_identity (options_.recv_identity)
00118 {
00119     if (protocol_)
00120         protocol = protocol_;
00121     if (address_)
00122         address = address_;
00123 }
00124 
00125 zmq::session_base_t::~session_base_t ()
00126 {
00127     zmq_assert (!pipe);
00128 
00129     //  If there's still a pending linger timer, remove it.
00130     if (has_linger_timer) {
00131         cancel_timer (linger_timer_id);
00132         has_linger_timer = false;
00133     }
00134 
00135     //  Close the engine.
00136     if (engine)
00137         engine->terminate ();
00138 }
00139 
00140 void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
00141 {
00142     zmq_assert (!is_terminating ());
00143     zmq_assert (!pipe);
00144     zmq_assert (pipe_);
00145     pipe = pipe_;
00146     pipe->set_event_sink (this);
00147 }
00148 
00149 int zmq::session_base_t::read (msg_t *msg_)
00150 {
00151     //  First message to send is identity (if required).
00152     if (send_identity) {
00153         zmq_assert (!(msg_->flags () & msg_t::more));
00154         msg_->init_size (options.identity_size);
00155         memcpy (msg_->data (), options.identity, options.identity_size);
00156         send_identity = false;
00157         incomplete_in = false;
00158         return 0;
00159     }
00160 
00161     if (!pipe || !pipe->read (msg_)) {
00162         errno = EAGAIN;
00163         return -1;
00164     }
00165     incomplete_in = msg_->flags () & msg_t::more ? true : false;
00166 
00167     return 0;
00168 }
00169 
00170 int zmq::session_base_t::write (msg_t *msg_)
00171 {
00172     //  First message to receive is identity (if required).
00173     if (recv_identity) {
00174         msg_->set_flags (msg_t::identity);
00175         recv_identity = false;
00176     }
00177 
00178     if (pipe && pipe->write (msg_)) {
00179         int rc = msg_->init ();
00180         errno_assert (rc == 0);
00181         return 0;
00182     }
00183 
00184     errno = EAGAIN;
00185     return -1;
00186 }
00187 
00188 void zmq::session_base_t::flush ()
00189 {
00190     if (pipe)
00191         pipe->flush ();
00192 }
00193 
00194 void zmq::session_base_t::clean_pipes ()
00195 {
00196     if (pipe) {
00197 
00198         //  Get rid of half-processed messages in the out pipe. Flush any
00199         //  unflushed messages upstream.
00200         pipe->rollback ();
00201         pipe->flush ();
00202 
00203         //  Remove any half-read message from the in pipe.
00204         while (incomplete_in) {
00205             msg_t msg;
00206             int rc = msg.init ();
00207             errno_assert (rc == 0);
00208             if (!read (&msg)) {
00209                 zmq_assert (!incomplete_in);
00210                 break;
00211             }
00212             rc = msg.close ();
00213             errno_assert (rc == 0);
00214         }
00215     }
00216 }
00217 
00218 void zmq::session_base_t::terminated (pipe_t *pipe_)
00219 {
00220     //  Drop the reference to the deallocated pipe.
00221     zmq_assert (pipe == pipe_);
00222     pipe = NULL;
00223 
00224     //  If we are waiting for pending messages to be sent, at this point
00225     //  we are sure that there will be no more messages and we can proceed
00226     //  with termination safely.
00227     if (pending)
00228         proceed_with_term ();
00229 }
00230 
00231 void zmq::session_base_t::read_activated (pipe_t *pipe_)
00232 {
00233     zmq_assert (pipe == pipe_);
00234 
00235     if (likely (engine != NULL))
00236         engine->activate_out ();
00237     else
00238         pipe->check_read ();
00239 }
00240 
00241 void zmq::session_base_t::write_activated (pipe_t *pipe_)
00242 {
00243     zmq_assert (pipe == pipe_);
00244 
00245     if (engine)
00246         engine->activate_in ();
00247 }
00248 
00249 void zmq::session_base_t::hiccuped (pipe_t *pipe_)
00250 {
00251     //  Hiccups are always sent from session to socket, not the other
00252     //  way round.
00253     zmq_assert (false);
00254 }
00255 
00256 void zmq::session_base_t::process_plug ()
00257 {
00258     if (connect)
00259         start_connecting (false);
00260 }
00261 
00262 void zmq::session_base_t::process_attach (i_engine *engine_)
00263 {
00264     //  If some other object (e.g. init) notifies us that the connection failed
00265     //  without creating an engine we need to start the reconnection process.
00266     if (!engine_) {
00267         zmq_assert (!engine);
00268         detached ();
00269         return;
00270     }
00271 
00272     //  Create the pipe if it does not exist yet.
00273     if (!pipe && !is_terminating ()) {
00274         object_t *parents [2] = {this, socket};
00275         pipe_t *pipes [2] = {NULL, NULL};
00276         int hwms [2] = {options.rcvhwm, options.sndhwm};
00277         bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};
00278         int rc = pipepair (parents, pipes, hwms, delays);
00279         errno_assert (rc == 0);
00280 
00281         //  Plug the local end of the pipe.
00282         pipes [0]->set_event_sink (this);
00283 
00284         //  Remember the local end of the pipe.
00285         zmq_assert (!pipe);
00286         pipe = pipes [0];
00287 
00288         //  Ask socket to plug into the remote end of the pipe.
00289         send_bind (socket, pipes [1]);
00290     }
00291 
00292     //  Plug in the engine.
00293     zmq_assert (!engine);
00294     engine = engine_;
00295     engine->plug (io_thread, this);
00296 }
00297 
00298 void zmq::session_base_t::detach ()
00299 {
00300     //  Engine is dead. Let's forget about it.
00301     engine = NULL;
00302 
00303     //  Remove any half-done messages from the pipes.
00304     clean_pipes ();
00305 
00306     //  Send the event to the derived class.
00307     detached ();
00308 
00309     //  Just in case there's only a delimiter in the pipe.
00310     if (pipe)
00311         pipe->check_read ();
00312 }
00313 
00314 void zmq::session_base_t::process_term (int linger_)
00315 {
00316     zmq_assert (!pending);
00317 
00318     //  If the termination of the pipe happens before the term command is
00319     //  delivered there's nothing much to do. We can proceed with the
00320     //  stadard termination immediately.
00321     if (!pipe) {
00322         proceed_with_term ();
00323         return;
00324     }
00325 
00326     pending = true;
00327 
00328     //  If there's finite linger value, delay the termination.
00329     //  If linger is infinite (negative) we don't even have to set
00330     //  the timer.
00331     if (linger_ > 0) {
00332         zmq_assert (!has_linger_timer);
00333         add_timer (linger_, linger_timer_id);
00334         has_linger_timer = true;
00335     }
00336 
00337     //  Start pipe termination process. Delay the termination till all messages
00338     //  are processed in case the linger time is non-zero.
00339     pipe->terminate (linger_ != 0);
00340 
00341     //  TODO: Should this go into pipe_t::terminate ?
00342     //  In case there's no engine and there's only delimiter in the
00343     //  pipe it wouldn't be ever read. Thus we check for it explicitly.
00344     pipe->check_read ();
00345 }
00346 
00347 void zmq::session_base_t::proceed_with_term ()
00348 {
00349     //  The pending phase have just ended.
00350     pending = false;
00351 
00352     //  Continue with standard termination.
00353     own_t::process_term (0);
00354 }
00355 
00356 void zmq::session_base_t::timer_event (int id_)
00357 {
00358     //  Linger period expired. We can proceed with termination even though
00359     //  there are still pending messages to be sent.
00360     zmq_assert (id_ == linger_timer_id);
00361     has_linger_timer = false;
00362 
00363     //  Ask pipe to terminate even though there may be pending messages in it.
00364     zmq_assert (pipe);
00365     pipe->terminate (false);
00366 }
00367 
00368 void zmq::session_base_t::detached ()
00369 {
00370     //  Transient session self-destructs after peer disconnects.
00371     if (!connect) {
00372         terminate ();
00373         return;
00374     }
00375 
00376     //  Reconnect.
00377     start_connecting (true);
00378 
00379     //  For subscriber sockets we hiccup the inbound pipe, which will cause
00380     //  the socket object to resend all the subscriptions.
00381     if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
00382         pipe->hiccup ();  
00383 }
00384 
00385 void zmq::session_base_t::start_connecting (bool wait_)
00386 {
00387     zmq_assert (connect);
00388 
00389     //  Choose I/O thread to run connecter in. Given that we are already
00390     //  running in an I/O thread, there must be at least one available.
00391     io_thread_t *io_thread = choose_io_thread (options.affinity);
00392     zmq_assert (io_thread);
00393 
00394     //  Create the connecter object.
00395 
00396     if (protocol == "tcp") {
00397         tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
00398             io_thread, this, options, address.c_str (), wait_);
00399         alloc_assert (connecter);
00400         launch_child (connecter);
00401         return;
00402     }
00403 
00404 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
00405     if (protocol == "ipc") {
00406         ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t (
00407             io_thread, this, options, address.c_str (), wait_);
00408         alloc_assert (connecter);
00409         launch_child (connecter);
00410         return;
00411     }
00412 #endif
00413 
00414 #if defined ZMQ_HAVE_OPENPGM
00415 
00416     //  Both PGM and EPGM transports are using the same infrastructure.
00417     if (protocol == "pgm" || protocol == "epgm") {
00418 
00419         //  For EPGM transport with UDP encapsulation of PGM is used.
00420         bool udp_encapsulation = (protocol == "epgm");
00421 
00422         //  At this point we'll create message pipes to the session straight
00423         //  away. There's no point in delaying it as no concept of 'connect'
00424         //  exists with PGM anyway.
00425         if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
00426 
00427             //  PGM sender.
00428             pgm_sender_t *pgm_sender =  new (std::nothrow) pgm_sender_t (
00429                 io_thread, options);
00430             alloc_assert (pgm_sender);
00431 
00432             int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
00433             zmq_assert (rc == 0);
00434 
00435             send_attach (this, pgm_sender);
00436         }
00437         else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
00438 
00439             //  PGM receiver.
00440             pgm_receiver_t *pgm_receiver =  new (std::nothrow) pgm_receiver_t (
00441                 io_thread, options);
00442             alloc_assert (pgm_receiver);
00443 
00444             int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
00445             zmq_assert (rc == 0);
00446 
00447             send_attach (this, pgm_receiver);
00448         }
00449         else
00450             zmq_assert (false);
00451 
00452         return;
00453     }
00454 #endif
00455 
00456     zmq_assert (false);
00457 }
00458 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines