![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2011 250bpm s.r.o. 00003 Copyright (c) 2011 Other contributors as noted in the AUTHORS file 00004 00005 This file is part of 0MQ. 00006 00007 0MQ is free software; you can redistribute it and/or modify it under 00008 the terms of the GNU Lesser General Public License as published by 00009 the Free Software Foundation; either version 3 of the License, or 00010 (at your option) any later version. 00011 00012 0MQ is distributed in the hope that it will be useful, 00013 but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 GNU Lesser General Public License for more details. 00016 00017 You should have received a copy of the GNU Lesser General Public License 00018 along with this program. If not, see <http://www.gnu.org/licenses/>. 00019 */ 00020 00021 #include "ipc_connecter.hpp" 00022 00023 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS 00024 00025 #include <new> 00026 #include <string> 00027 00028 #include "stream_engine.hpp" 00029 #include "io_thread.hpp" 00030 #include "platform.hpp" 00031 #include "random.hpp" 00032 #include "err.hpp" 00033 #include "ip.hpp" 00034 00035 #include <unistd.h> 00036 #include <sys/types.h> 00037 #include <sys/socket.h> 00038 #include <sys/un.h> 00039 00040 zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, 00041 class session_base_t *session_, const options_t &options_, 00042 const char *address_, bool wait_) : 00043 own_t (io_thread_, options_), 00044 io_object_t (io_thread_), 00045 s (retired_fd), 00046 handle_valid (false), 00047 wait (wait_), 00048 session (session_), 00049 current_reconnect_ivl(options.reconnect_ivl) 00050 { 00051 // TODO: set_addess should be called separately, so that the error 00052 // can be propagated. 00053 int rc = set_address (address_); 00054 zmq_assert (rc == 0); 00055 } 00056 00057 zmq::ipc_connecter_t::~ipc_connecter_t () 00058 { 00059 if (wait) 00060 cancel_timer (reconnect_timer_id); 00061 if (handle_valid) 00062 rm_fd (handle); 00063 00064 if (s != retired_fd) 00065 close (); 00066 } 00067 00068 void zmq::ipc_connecter_t::process_plug () 00069 { 00070 if (wait) 00071 add_reconnect_timer(); 00072 else 00073 start_connecting (); 00074 } 00075 00076 void zmq::ipc_connecter_t::in_event () 00077 { 00078 // We are not polling for incomming data, so we are actually called 00079 // because of error here. However, we can get error on out event as well 00080 // on some platforms, so we'll simply handle both events in the same way. 00081 out_event (); 00082 } 00083 00084 void zmq::ipc_connecter_t::out_event () 00085 { 00086 fd_t fd = connect (); 00087 rm_fd (handle); 00088 handle_valid = false; 00089 00090 // Handle the error condition by attempt to reconnect. 00091 if (fd == retired_fd) { 00092 close (); 00093 wait = true; 00094 add_reconnect_timer(); 00095 return; 00096 } 00097 00098 // Create the engine object for this connection. 00099 stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); 00100 alloc_assert (engine); 00101 00102 // Attach the engine to the corresponding session object. 00103 send_attach (session, engine); 00104 00105 // Shut the connecter down. 00106 terminate (); 00107 } 00108 00109 void zmq::ipc_connecter_t::timer_event (int id_) 00110 { 00111 zmq_assert (id_ == reconnect_timer_id); 00112 wait = false; 00113 start_connecting (); 00114 } 00115 00116 void zmq::ipc_connecter_t::start_connecting () 00117 { 00118 // Open the connecting socket. 00119 int rc = open (); 00120 00121 // Connect may succeed in synchronous manner. 00122 if (rc == 0) { 00123 handle = add_fd (s); 00124 handle_valid = true; 00125 out_event (); 00126 return; 00127 } 00128 00129 // Connection establishment may be dealyed. Poll for its completion. 00130 else if (rc == -1 && errno == EAGAIN) { 00131 handle = add_fd (s); 00132 handle_valid = true; 00133 set_pollout (handle); 00134 return; 00135 } 00136 00137 // Handle any other error condition by eventual reconnect. 00138 wait = true; 00139 add_reconnect_timer(); 00140 } 00141 00142 void zmq::ipc_connecter_t::add_reconnect_timer() 00143 { 00144 add_timer (get_new_reconnect_ivl(), reconnect_timer_id); 00145 } 00146 00147 int zmq::ipc_connecter_t::get_new_reconnect_ivl () 00148 { 00149 // The new interval is the current interval + random value. 00150 int this_interval = current_reconnect_ivl + 00151 (generate_random () % options.reconnect_ivl); 00152 00153 // Only change the current reconnect interval if the maximum reconnect 00154 // interval was set and if it's larger than the reconnect interval. 00155 if (options.reconnect_ivl_max > 0 && 00156 options.reconnect_ivl_max > options.reconnect_ivl) { 00157 00158 // Calculate the next interval 00159 current_reconnect_ivl = current_reconnect_ivl * 2; 00160 if(current_reconnect_ivl >= options.reconnect_ivl_max) { 00161 current_reconnect_ivl = options.reconnect_ivl_max; 00162 } 00163 } 00164 return this_interval; 00165 } 00166 00167 int zmq::ipc_connecter_t::set_address (const char *addr_) 00168 { 00169 return address.resolve (addr_); 00170 } 00171 00172 int zmq::ipc_connecter_t::open () 00173 { 00174 zmq_assert (s == retired_fd); 00175 00176 // Create the socket. 00177 s = open_socket (AF_UNIX, SOCK_STREAM, 0); 00178 if (s == -1) 00179 return -1; 00180 00181 // Set the non-blocking flag. 00182 unblock_socket (s); 00183 00184 // Connect to the remote peer. 00185 int rc = ::connect (s, address.addr (), address.addrlen ()); 00186 00187 // Connect was successfull immediately. 00188 if (rc == 0) 00189 return 0; 00190 00191 // Forward the error. 00192 return -1; 00193 } 00194 00195 int zmq::ipc_connecter_t::close () 00196 { 00197 zmq_assert (s != retired_fd); 00198 int rc = ::close (s); 00199 if (rc != 0) 00200 return -1; 00201 s = retired_fd; 00202 return 0; 00203 } 00204 00205 zmq::fd_t zmq::ipc_connecter_t::connect () 00206 { 00207 // Following code should handle both Berkeley-derived socket 00208 // implementations and Solaris. 00209 int err = 0; 00210 #if defined ZMQ_HAVE_HPUX 00211 int len = sizeof (err); 00212 #else 00213 socklen_t len = sizeof (err); 00214 #endif 00215 int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); 00216 if (rc == -1) 00217 err = errno; 00218 if (err != 0) { 00219 00220 // Assert if the error was caused by 0MQ bug. 00221 // Networking problems are OK. No need to assert. 00222 errno = err; 00223 errno_assert (errno == ECONNREFUSED || errno == ECONNRESET || 00224 errno == ETIMEDOUT || errno == EHOSTUNREACH || 00225 errno == ENETUNREACH || errno == ENETDOWN); 00226 00227 return retired_fd; 00228 } 00229 00230 fd_t result = s; 00231 s = retired_fd; 00232 return result; 00233 } 00234 00235 #endif 00236