![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include <new> 00023 #include <string> 00024 00025 #include "tcp_connecter.hpp" 00026 #include "stream_engine.hpp" 00027 #include "io_thread.hpp" 00028 #include "platform.hpp" 00029 #include "random.hpp" 00030 #include "err.hpp" 00031 #include "ip.hpp" 00032 00033 #if defined ZMQ_HAVE_WINDOWS 00034 #include "windows.hpp" 00035 #else 00036 #include <unistd.h> 00037 #include <sys/types.h> 00038 #include <sys/socket.h> 00039 #include <arpa/inet.h> 00040 #include <netinet/tcp.h> 00041 #include <netinet/in.h> 00042 #include <netdb.h> 00043 #include <fcntl.h> 00044 #ifdef ZMQ_HAVE_OPENVMS 00045 #include <ioctl.h> 00046 #endif 00047 #endif 00048 00049 zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, 00050 class session_base_t *session_, const options_t &options_, 00051 const char *address_, bool wait_) : 00052 own_t (io_thread_, options_), 00053 io_object_t (io_thread_), 00054 s (retired_fd), 00055 handle_valid (false), 00056 wait (wait_), 00057 session (session_), 00058 current_reconnect_ivl(options.reconnect_ivl) 00059 { 00060 // TODO: set_addess should be called separately, so that the error 00061 // can be propagated. 00062 int rc = set_address (address_); 00063 errno_assert (rc == 0); 00064 } 00065 00066 zmq::tcp_connecter_t::~tcp_connecter_t () 00067 { 00068 if (wait) 00069 cancel_timer (reconnect_timer_id); 00070 if (handle_valid) 00071 rm_fd (handle); 00072 00073 if (s != retired_fd) 00074 close (); 00075 } 00076 00077 void zmq::tcp_connecter_t::process_plug () 00078 { 00079 if (wait) 00080 add_reconnect_timer(); 00081 else 00082 start_connecting (); 00083 } 00084 00085 void zmq::tcp_connecter_t::in_event () 00086 { 00087 // We are not polling for incomming data, so we are actually called 00088 // because of error here. However, we can get error on out event as well 00089 // on some platforms, so we'll simply handle both events in the same way. 00090 out_event (); 00091 } 00092 00093 void zmq::tcp_connecter_t::out_event () 00094 { 00095 fd_t fd = connect (); 00096 rm_fd (handle); 00097 handle_valid = false; 00098 00099 // Handle the error condition by attempt to reconnect. 00100 if (fd == retired_fd) { 00101 close (); 00102 wait = true; 00103 add_reconnect_timer(); 00104 return; 00105 } 00106 00107 tune_tcp_socket (fd); 00108 00109 // Create the engine object for this connection. 00110 stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); 00111 alloc_assert (engine); 00112 00113 // Attach the engine to the corresponding session object. 00114 send_attach (session, engine); 00115 00116 // Shut the connecter down. 00117 terminate (); 00118 } 00119 00120 void zmq::tcp_connecter_t::timer_event (int id_) 00121 { 00122 zmq_assert (id_ == reconnect_timer_id); 00123 wait = false; 00124 start_connecting (); 00125 } 00126 00127 void zmq::tcp_connecter_t::start_connecting () 00128 { 00129 // Open the connecting socket. 00130 int rc = open (); 00131 00132 // Connect may succeed in synchronous manner. 00133 if (rc == 0) { 00134 handle = add_fd (s); 00135 handle_valid = true; 00136 out_event (); 00137 return; 00138 } 00139 00140 // Connection establishment may be dealyed. Poll for its completion. 00141 else if (rc == -1 && errno == EAGAIN) { 00142 handle = add_fd (s); 00143 handle_valid = true; 00144 set_pollout (handle); 00145 return; 00146 } 00147 00148 // Handle any other error condition by eventual reconnect. 00149 wait = true; 00150 add_reconnect_timer(); 00151 } 00152 00153 void zmq::tcp_connecter_t::add_reconnect_timer() 00154 { 00155 add_timer (get_new_reconnect_ivl(), reconnect_timer_id); 00156 } 00157 00158 int zmq::tcp_connecter_t::get_new_reconnect_ivl () 00159 { 00160 // The new interval is the current interval + random value. 00161 int this_interval = current_reconnect_ivl + 00162 (generate_random () % options.reconnect_ivl); 00163 00164 // Only change the current reconnect interval if the maximum reconnect 00165 // interval was set and if it's larger than the reconnect interval. 00166 if (options.reconnect_ivl_max > 0 && 00167 options.reconnect_ivl_max > options.reconnect_ivl) { 00168 00169 // Calculate the next interval 00170 current_reconnect_ivl = current_reconnect_ivl * 2; 00171 if(current_reconnect_ivl >= options.reconnect_ivl_max) { 00172 current_reconnect_ivl = options.reconnect_ivl_max; 00173 } 00174 } 00175 return this_interval; 00176 } 00177 00178 int zmq::tcp_connecter_t::set_address (const char *addr_) 00179 { 00180 return address.resolve (addr_, false, options.ipv4only ? true : false); 00181 } 00182 00183 int zmq::tcp_connecter_t::open () 00184 { 00185 zmq_assert (s == retired_fd); 00186 00187 // Create the socket. 00188 s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP); 00189 #ifdef ZMQ_HAVE_WINDOWS 00190 if (s == INVALID_SOCKET) { 00191 wsa_error_to_errno (); 00192 return -1; 00193 } 00194 #else 00195 if (s == -1) 00196 return -1; 00197 #endif 00198 00199 // On some systems, IPv4 mapping in IPv6 sockets is disabled by default. 00200 // Switch it on in such cases. 00201 if (address.family () == AF_INET6) 00202 enable_ipv4_mapping (s); 00203 00204 // Set the socket to non-blocking mode so that we get async connect(). 00205 unblock_socket (s); 00206 00207 // Connect to the remote peer. 00208 int rc = ::connect (s, address.addr (), address.addrlen ()); 00209 00210 // Connect was successfull immediately. 00211 if (rc == 0) 00212 return 0; 00213 00214 // Asynchronous connect was launched. 00215 #ifdef ZMQ_HAVE_WINDOWS 00216 if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS || 00217 WSAGetLastError () == WSAEWOULDBLOCK)) { 00218 errno = EAGAIN; 00219 return -1; 00220 } 00221 wsa_error_to_errno (); 00222 #else 00223 if (rc == -1 && errno == EINPROGRESS) { 00224 errno = EAGAIN; 00225 return -1; 00226 } 00227 #endif 00228 return -1; 00229 } 00230 00231 zmq::fd_t zmq::tcp_connecter_t::connect () 00232 { 00233 // Async connect have finished. Check whether an error occured. 00234 int err = 0; 00235 #if defined ZMQ_HAVE_HPUX 00236 int len = sizeof (err); 00237 #else 00238 socklen_t len = sizeof (err); 00239 #endif 00240 00241 int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); 00242 00243 // Assert if the error was caused by 0MQ bug. 00244 // Networking problems are OK. No need to assert. 00245 #ifdef ZMQ_HAVE_WINDOWS 00246 zmq_assert (rc == 0); 00247 if (err != 0) { 00248 if (err == WSAECONNREFUSED || err == WSAETIMEDOUT || 00249 err == WSAECONNABORTED || err == WSAEHOSTUNREACH || 00250 err == WSAENETUNREACH || err == WSAENETDOWN) 00251 return retired_fd; 00252 wsa_assert_no (err); 00253 } 00254 #else 00255 00256 // Following code should handle both Berkeley-derived socket 00257 // implementations and Solaris. 00258 if (rc == -1) 00259 err = errno; 00260 if (err != 0) { 00261 errno = err; 00262 errno_assert (errno == ECONNREFUSED || errno == ECONNRESET || 00263 errno == ETIMEDOUT || errno == EHOSTUNREACH || 00264 errno == ENETUNREACH || errno == ENETDOWN); 00265 return retired_fd; 00266 } 00267 #endif 00268 00269 // Return the newly connected socket. 00270 fd_t result = s; 00271 s = retired_fd; 00272 return result; 00273 } 00274 00275 void zmq::tcp_connecter_t::close () 00276 { 00277 zmq_assert (s != retired_fd); 00278 #ifdef ZMQ_HAVE_WINDOWS 00279 int rc = closesocket (s); 00280 wsa_assert (rc != SOCKET_ERROR); 00281 #else 00282 int rc = ::close (s); 00283 errno_assert (rc == 0); 00284 #endif 00285 s = retired_fd; 00286 }