libzmq master
The Intelligent Transport Layer

tcp_connecter.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include <new>
00023 #include <string>
00024 
00025 #include "tcp_connecter.hpp"
00026 #include "stream_engine.hpp"
00027 #include "io_thread.hpp"
00028 #include "platform.hpp"
00029 #include "random.hpp"
00030 #include "err.hpp"
00031 #include "ip.hpp"
00032 
00033 #if defined ZMQ_HAVE_WINDOWS
00034 #include "windows.hpp"
00035 #else
00036 #include <unistd.h>
00037 #include <sys/types.h>
00038 #include <sys/socket.h>
00039 #include <arpa/inet.h>
00040 #include <netinet/tcp.h>
00041 #include <netinet/in.h>
00042 #include <netdb.h>
00043 #include <fcntl.h>
00044 #ifdef ZMQ_HAVE_OPENVMS
00045 #include <ioctl.h>
00046 #endif
00047 #endif
00048 
00049 zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
00050       class session_base_t *session_, const options_t &options_,
00051       const char *address_, bool wait_) :
00052     own_t (io_thread_, options_),
00053     io_object_t (io_thread_),
00054     s (retired_fd),
00055     handle_valid (false),
00056     wait (wait_),
00057     session (session_),
00058     current_reconnect_ivl(options.reconnect_ivl)
00059 {
00060     //  TODO: set_addess should be called separately, so that the error
00061     //  can be propagated.
00062     int rc = set_address (address_);
00063     errno_assert (rc == 0);
00064 }
00065 
00066 zmq::tcp_connecter_t::~tcp_connecter_t ()
00067 {
00068     if (wait)
00069         cancel_timer (reconnect_timer_id);
00070     if (handle_valid)
00071         rm_fd (handle);
00072 
00073     if (s != retired_fd)
00074         close ();
00075 }
00076 
00077 void zmq::tcp_connecter_t::process_plug ()
00078 {
00079     if (wait)
00080         add_reconnect_timer();
00081     else
00082         start_connecting ();
00083 }
00084 
00085 void zmq::tcp_connecter_t::in_event ()
00086 {
00087     //  We are not polling for incomming data, so we are actually called
00088     //  because of error here. However, we can get error on out event as well
00089     //  on some platforms, so we'll simply handle both events in the same way.
00090     out_event ();
00091 }
00092 
00093 void zmq::tcp_connecter_t::out_event ()
00094 {
00095     fd_t fd = connect ();
00096     rm_fd (handle);
00097     handle_valid = false;
00098 
00099     //  Handle the error condition by attempt to reconnect.
00100     if (fd == retired_fd) {
00101         close ();
00102         wait = true;
00103         add_reconnect_timer();
00104         return;
00105     }
00106 
00107     tune_tcp_socket (fd);
00108 
00109     //  Create the engine object for this connection.
00110     stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
00111     alloc_assert (engine);
00112 
00113     //  Attach the engine to the corresponding session object.
00114     send_attach (session, engine);
00115 
00116     //  Shut the connecter down.
00117     terminate ();
00118 }
00119 
00120 void zmq::tcp_connecter_t::timer_event (int id_)
00121 {
00122     zmq_assert (id_ == reconnect_timer_id);
00123     wait = false;
00124     start_connecting ();
00125 }
00126 
00127 void zmq::tcp_connecter_t::start_connecting ()
00128 {
00129     //  Open the connecting socket.
00130     int rc = open ();
00131 
00132     //  Connect may succeed in synchronous manner.
00133     if (rc == 0) {
00134         handle = add_fd (s);
00135         handle_valid = true;
00136         out_event ();
00137         return;
00138     }
00139 
00140     //  Connection establishment may be dealyed. Poll for its completion.
00141     else if (rc == -1 && errno == EAGAIN) {
00142         handle = add_fd (s);
00143         handle_valid = true;
00144         set_pollout (handle);
00145         return;
00146     }
00147 
00148     //  Handle any other error condition by eventual reconnect.
00149     wait = true;
00150     add_reconnect_timer();
00151 }
00152 
00153 void zmq::tcp_connecter_t::add_reconnect_timer()
00154 {
00155     add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
00156 }
00157 
00158 int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
00159 {
00160     //  The new interval is the current interval + random value.
00161     int this_interval = current_reconnect_ivl +
00162         (generate_random () % options.reconnect_ivl);
00163 
00164     //  Only change the current reconnect interval  if the maximum reconnect
00165     //  interval was set and if it's larger than the reconnect interval.
00166     if (options.reconnect_ivl_max > 0 && 
00167         options.reconnect_ivl_max > options.reconnect_ivl) {
00168 
00169         //  Calculate the next interval
00170         current_reconnect_ivl = current_reconnect_ivl * 2;
00171         if(current_reconnect_ivl >= options.reconnect_ivl_max) {
00172             current_reconnect_ivl = options.reconnect_ivl_max;
00173         }   
00174     }
00175     return this_interval;
00176 }
00177 
00178 int zmq::tcp_connecter_t::set_address (const char *addr_)
00179 {
00180     return address.resolve (addr_, false, options.ipv4only ? true : false);
00181 }
00182 
00183 int zmq::tcp_connecter_t::open ()
00184 {
00185     zmq_assert (s == retired_fd);
00186 
00187     //  Create the socket.
00188     s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP);
00189 #ifdef ZMQ_HAVE_WINDOWS
00190     if (s == INVALID_SOCKET) {
00191         wsa_error_to_errno ();
00192         return -1;
00193     }
00194 #else
00195     if (s == -1)
00196         return -1;
00197 #endif
00198 
00199     //  On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
00200     //  Switch it on in such cases.
00201     if (address.family () == AF_INET6)
00202         enable_ipv4_mapping (s);
00203 
00204     // Set the socket to non-blocking mode so that we get async connect().
00205     unblock_socket (s);
00206 
00207     //  Connect to the remote peer.
00208     int rc = ::connect (s, address.addr (), address.addrlen ());
00209 
00210     //  Connect was successfull immediately.
00211     if (rc == 0)
00212         return 0;
00213 
00214     //  Asynchronous connect was launched.
00215 #ifdef ZMQ_HAVE_WINDOWS
00216     if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS ||
00217           WSAGetLastError () == WSAEWOULDBLOCK)) {
00218         errno = EAGAIN;
00219         return -1;
00220     }    
00221     wsa_error_to_errno ();
00222 #else
00223     if (rc == -1 && errno == EINPROGRESS) {
00224         errno = EAGAIN;
00225         return -1;
00226     }
00227 #endif
00228     return -1;
00229 }
00230 
00231 zmq::fd_t zmq::tcp_connecter_t::connect ()
00232 {
00233     //  Async connect have finished. Check whether an error occured.
00234     int err = 0;
00235 #if defined ZMQ_HAVE_HPUX
00236     int len = sizeof (err);
00237 #else
00238     socklen_t len = sizeof (err);
00239 #endif
00240 
00241     int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
00242 
00243     //  Assert if the error was caused by 0MQ bug.
00244     //  Networking problems are OK. No need to assert.
00245 #ifdef ZMQ_HAVE_WINDOWS
00246     zmq_assert (rc == 0);
00247     if (err != 0) {
00248         if (err == WSAECONNREFUSED || err == WSAETIMEDOUT ||
00249               err == WSAECONNABORTED || err == WSAEHOSTUNREACH ||
00250               err == WSAENETUNREACH || err == WSAENETDOWN)
00251             return retired_fd;
00252         wsa_assert_no (err);
00253     }
00254 #else
00255 
00256     //  Following code should handle both Berkeley-derived socket
00257     //  implementations and Solaris.
00258     if (rc == -1)
00259         err = errno;
00260     if (err != 0) {
00261         errno = err;
00262         errno_assert (errno == ECONNREFUSED || errno == ECONNRESET ||
00263             errno == ETIMEDOUT || errno == EHOSTUNREACH ||
00264             errno == ENETUNREACH || errno == ENETDOWN);
00265         return retired_fd;
00266     }
00267 #endif
00268 
00269     //  Return the newly connected socket.
00270     fd_t result = s;
00271     s = retired_fd;
00272     return result;
00273 }
00274 
00275 void zmq::tcp_connecter_t::close ()
00276 {
00277     zmq_assert (s != retired_fd);
00278 #ifdef ZMQ_HAVE_WINDOWS
00279     int rc = closesocket (s);
00280     wsa_assert (rc != SOCKET_ERROR);
00281 #else
00282     int rc = ::close (s);
00283     errno_assert (rc == 0);
00284 #endif
00285     s = retired_fd;
00286 }
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines