![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include "session_base.hpp" 00024 #include "socket_base.hpp" 00025 #include "i_engine.hpp" 00026 #include "err.hpp" 00027 #include "pipe.hpp" 00028 #include "likely.hpp" 00029 #include "tcp_connecter.hpp" 00030 #include "ipc_connecter.hpp" 00031 #include "pgm_sender.hpp" 00032 #include "pgm_receiver.hpp" 00033 00034 #include "req.hpp" 00035 #include "xreq.hpp" 00036 #include "rep.hpp" 00037 #include "xrep.hpp" 00038 #include "pub.hpp" 00039 #include "xpub.hpp" 00040 #include "sub.hpp" 00041 #include "xsub.hpp" 00042 #include "push.hpp" 00043 #include "pull.hpp" 00044 #include "pair.hpp" 00045 00046 zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, 00047 bool connect_, class socket_base_t *socket_, const options_t &options_, 00048 const char *protocol_, const char *address_) 00049 { 00050 session_base_t *s = NULL; 00051 switch (options_.type) { 00052 case ZMQ_REQ: 00053 s = new (std::nothrow) req_session_t (io_thread_, connect_, 00054 socket_, options_, protocol_, address_); 00055 break; 00056 case ZMQ_XREQ: 00057 s = new (std::nothrow) xreq_session_t (io_thread_, connect_, 00058 socket_, options_, protocol_, address_); 00059 case ZMQ_REP: 00060 s = new (std::nothrow) rep_session_t (io_thread_, connect_, 00061 socket_, options_, protocol_, address_); 00062 break; 00063 case ZMQ_XREP: 00064 s = new (std::nothrow) xrep_session_t (io_thread_, connect_, 00065 socket_, options_, protocol_, address_); 00066 break; 00067 case ZMQ_PUB: 00068 s = new (std::nothrow) pub_session_t (io_thread_, connect_, 00069 socket_, options_, protocol_, address_); 00070 break; 00071 case ZMQ_XPUB: 00072 s = new (std::nothrow) xpub_session_t (io_thread_, connect_, 00073 socket_, options_, protocol_, address_); 00074 break; 00075 case ZMQ_SUB: 00076 s = new (std::nothrow) sub_session_t (io_thread_, connect_, 00077 socket_, options_, protocol_, address_); 00078 break; 00079 case ZMQ_XSUB: 00080 s = new (std::nothrow) xsub_session_t (io_thread_, connect_, 00081 socket_, options_, protocol_, address_); 00082 break; 00083 case ZMQ_PUSH: 00084 s = new (std::nothrow) push_session_t (io_thread_, connect_, 00085 socket_, options_, protocol_, address_); 00086 break; 00087 case ZMQ_PULL: 00088 s = new (std::nothrow) pull_session_t (io_thread_, connect_, 00089 socket_, options_, protocol_, address_); 00090 break; 00091 case ZMQ_PAIR: 00092 s = new (std::nothrow) pair_session_t (io_thread_, connect_, 00093 socket_, options_, protocol_, address_); 00094 break; 00095 default: 00096 errno = EINVAL; 00097 return NULL; 00098 } 00099 alloc_assert (s); 00100 return s; 00101 } 00102 00103 zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, 00104 bool connect_, class socket_base_t *socket_, const options_t &options_, 00105 const char *protocol_, const char *address_) : 00106 own_t (io_thread_, options_), 00107 io_object_t (io_thread_), 00108 connect (connect_), 00109 pipe (NULL), 00110 incomplete_in (false), 00111 pending (false), 00112 engine (NULL), 00113 socket (socket_), 00114 io_thread (io_thread_), 00115 has_linger_timer (false), 00116 send_identity (options_.send_identity), 00117 recv_identity (options_.recv_identity) 00118 { 00119 if (protocol_) 00120 protocol = protocol_; 00121 if (address_) 00122 address = address_; 00123 } 00124 00125 zmq::session_base_t::~session_base_t () 00126 { 00127 zmq_assert (!pipe); 00128 00129 // If there's still a pending linger timer, remove it. 00130 if (has_linger_timer) { 00131 cancel_timer (linger_timer_id); 00132 has_linger_timer = false; 00133 } 00134 00135 // Close the engine. 00136 if (engine) 00137 engine->terminate (); 00138 } 00139 00140 void zmq::session_base_t::attach_pipe (pipe_t *pipe_) 00141 { 00142 zmq_assert (!is_terminating ()); 00143 zmq_assert (!pipe); 00144 zmq_assert (pipe_); 00145 pipe = pipe_; 00146 pipe->set_event_sink (this); 00147 } 00148 00149 int zmq::session_base_t::read (msg_t *msg_) 00150 { 00151 // First message to send is identity (if required). 00152 if (send_identity) { 00153 zmq_assert (!(msg_->flags () & msg_t::more)); 00154 msg_->init_size (options.identity_size); 00155 memcpy (msg_->data (), options.identity, options.identity_size); 00156 send_identity = false; 00157 incomplete_in = false; 00158 return 0; 00159 } 00160 00161 if (!pipe || !pipe->read (msg_)) { 00162 errno = EAGAIN; 00163 return -1; 00164 } 00165 incomplete_in = msg_->flags () & msg_t::more ? true : false; 00166 00167 return 0; 00168 } 00169 00170 int zmq::session_base_t::write (msg_t *msg_) 00171 { 00172 // First message to receive is identity (if required). 00173 if (recv_identity) { 00174 msg_->set_flags (msg_t::identity); 00175 recv_identity = false; 00176 } 00177 00178 if (pipe && pipe->write (msg_)) { 00179 int rc = msg_->init (); 00180 errno_assert (rc == 0); 00181 return 0; 00182 } 00183 00184 errno = EAGAIN; 00185 return -1; 00186 } 00187 00188 void zmq::session_base_t::flush () 00189 { 00190 if (pipe) 00191 pipe->flush (); 00192 } 00193 00194 void zmq::session_base_t::clean_pipes () 00195 { 00196 if (pipe) { 00197 00198 // Get rid of half-processed messages in the out pipe. Flush any 00199 // unflushed messages upstream. 00200 pipe->rollback (); 00201 pipe->flush (); 00202 00203 // Remove any half-read message from the in pipe. 00204 while (incomplete_in) { 00205 msg_t msg; 00206 int rc = msg.init (); 00207 errno_assert (rc == 0); 00208 if (!read (&msg)) { 00209 zmq_assert (!incomplete_in); 00210 break; 00211 } 00212 rc = msg.close (); 00213 errno_assert (rc == 0); 00214 } 00215 } 00216 } 00217 00218 void zmq::session_base_t::terminated (pipe_t *pipe_) 00219 { 00220 // Drop the reference to the deallocated pipe. 00221 zmq_assert (pipe == pipe_); 00222 pipe = NULL; 00223 00224 // If we are waiting for pending messages to be sent, at this point 00225 // we are sure that there will be no more messages and we can proceed 00226 // with termination safely. 00227 if (pending) 00228 proceed_with_term (); 00229 } 00230 00231 void zmq::session_base_t::read_activated (pipe_t *pipe_) 00232 { 00233 zmq_assert (pipe == pipe_); 00234 00235 if (likely (engine != NULL)) 00236 engine->activate_out (); 00237 else 00238 pipe->check_read (); 00239 } 00240 00241 void zmq::session_base_t::write_activated (pipe_t *pipe_) 00242 { 00243 zmq_assert (pipe == pipe_); 00244 00245 if (engine) 00246 engine->activate_in (); 00247 } 00248 00249 void zmq::session_base_t::hiccuped (pipe_t *pipe_) 00250 { 00251 // Hiccups are always sent from session to socket, not the other 00252 // way round. 00253 zmq_assert (false); 00254 } 00255 00256 void zmq::session_base_t::process_plug () 00257 { 00258 if (connect) 00259 start_connecting (false); 00260 } 00261 00262 void zmq::session_base_t::process_attach (i_engine *engine_) 00263 { 00264 // If some other object (e.g. init) notifies us that the connection failed 00265 // without creating an engine we need to start the reconnection process. 00266 if (!engine_) { 00267 zmq_assert (!engine); 00268 detached (); 00269 return; 00270 } 00271 00272 // Create the pipe if it does not exist yet. 00273 if (!pipe && !is_terminating ()) { 00274 object_t *parents [2] = {this, socket}; 00275 pipe_t *pipes [2] = {NULL, NULL}; 00276 int hwms [2] = {options.rcvhwm, options.sndhwm}; 00277 bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; 00278 int rc = pipepair (parents, pipes, hwms, delays); 00279 errno_assert (rc == 0); 00280 00281 // Plug the local end of the pipe. 00282 pipes [0]->set_event_sink (this); 00283 00284 // Remember the local end of the pipe. 00285 zmq_assert (!pipe); 00286 pipe = pipes [0]; 00287 00288 // Ask socket to plug into the remote end of the pipe. 00289 send_bind (socket, pipes [1]); 00290 } 00291 00292 // Plug in the engine. 00293 zmq_assert (!engine); 00294 engine = engine_; 00295 engine->plug (io_thread, this); 00296 } 00297 00298 void zmq::session_base_t::detach () 00299 { 00300 // Engine is dead. Let's forget about it. 00301 engine = NULL; 00302 00303 // Remove any half-done messages from the pipes. 00304 clean_pipes (); 00305 00306 // Send the event to the derived class. 00307 detached (); 00308 00309 // Just in case there's only a delimiter in the pipe. 00310 if (pipe) 00311 pipe->check_read (); 00312 } 00313 00314 void zmq::session_base_t::process_term (int linger_) 00315 { 00316 zmq_assert (!pending); 00317 00318 // If the termination of the pipe happens before the term command is 00319 // delivered there's nothing much to do. We can proceed with the 00320 // stadard termination immediately. 00321 if (!pipe) { 00322 proceed_with_term (); 00323 return; 00324 } 00325 00326 pending = true; 00327 00328 // If there's finite linger value, delay the termination. 00329 // If linger is infinite (negative) we don't even have to set 00330 // the timer. 00331 if (linger_ > 0) { 00332 zmq_assert (!has_linger_timer); 00333 add_timer (linger_, linger_timer_id); 00334 has_linger_timer = true; 00335 } 00336 00337 // Start pipe termination process. Delay the termination till all messages 00338 // are processed in case the linger time is non-zero. 00339 pipe->terminate (linger_ != 0); 00340 00341 // TODO: Should this go into pipe_t::terminate ? 00342 // In case there's no engine and there's only delimiter in the 00343 // pipe it wouldn't be ever read. Thus we check for it explicitly. 00344 pipe->check_read (); 00345 } 00346 00347 void zmq::session_base_t::proceed_with_term () 00348 { 00349 // The pending phase have just ended. 00350 pending = false; 00351 00352 // Continue with standard termination. 00353 own_t::process_term (0); 00354 } 00355 00356 void zmq::session_base_t::timer_event (int id_) 00357 { 00358 // Linger period expired. We can proceed with termination even though 00359 // there are still pending messages to be sent. 00360 zmq_assert (id_ == linger_timer_id); 00361 has_linger_timer = false; 00362 00363 // Ask pipe to terminate even though there may be pending messages in it. 00364 zmq_assert (pipe); 00365 pipe->terminate (false); 00366 } 00367 00368 void zmq::session_base_t::detached () 00369 { 00370 // Transient session self-destructs after peer disconnects. 00371 if (!connect) { 00372 terminate (); 00373 return; 00374 } 00375 00376 // Reconnect. 00377 start_connecting (true); 00378 00379 // For subscriber sockets we hiccup the inbound pipe, which will cause 00380 // the socket object to resend all the subscriptions. 00381 if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) 00382 pipe->hiccup (); 00383 } 00384 00385 void zmq::session_base_t::start_connecting (bool wait_) 00386 { 00387 zmq_assert (connect); 00388 00389 // Choose I/O thread to run connecter in. Given that we are already 00390 // running in an I/O thread, there must be at least one available. 00391 io_thread_t *io_thread = choose_io_thread (options.affinity); 00392 zmq_assert (io_thread); 00393 00394 // Create the connecter object. 00395 00396 if (protocol == "tcp") { 00397 tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t ( 00398 io_thread, this, options, address.c_str (), wait_); 00399 alloc_assert (connecter); 00400 launch_child (connecter); 00401 return; 00402 } 00403 00404 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS 00405 if (protocol == "ipc") { 00406 ipc_connecter_t *connecter = new (std::nothrow) ipc_connecter_t ( 00407 io_thread, this, options, address.c_str (), wait_); 00408 alloc_assert (connecter); 00409 launch_child (connecter); 00410 return; 00411 } 00412 #endif 00413 00414 #if defined ZMQ_HAVE_OPENPGM 00415 00416 // Both PGM and EPGM transports are using the same infrastructure. 00417 if (protocol == "pgm" || protocol == "epgm") { 00418 00419 // For EPGM transport with UDP encapsulation of PGM is used. 00420 bool udp_encapsulation = (protocol == "epgm"); 00421 00422 // At this point we'll create message pipes to the session straight 00423 // away. There's no point in delaying it as no concept of 'connect' 00424 // exists with PGM anyway. 00425 if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { 00426 00427 // PGM sender. 00428 pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( 00429 io_thread, options); 00430 alloc_assert (pgm_sender); 00431 00432 int rc = pgm_sender->init (udp_encapsulation, address.c_str ()); 00433 zmq_assert (rc == 0); 00434 00435 send_attach (this, pgm_sender); 00436 } 00437 else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { 00438 00439 // PGM receiver. 00440 pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( 00441 io_thread, options); 00442 alloc_assert (pgm_receiver); 00443 00444 int rc = pgm_receiver->init (udp_encapsulation, address.c_str ()); 00445 zmq_assert (rc == 0); 00446 00447 send_attach (this, pgm_receiver); 00448 } 00449 else 00450 zmq_assert (false); 00451 00452 return; 00453 } 00454 #endif 00455 00456 zmq_assert (false); 00457 } 00458