libzmq master
The Intelligent Transport Layer

ipc_connecter.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2011 250bpm s.r.o.
00003     Copyright (c) 2011 Other contributors as noted in the AUTHORS file
00004 
00005     This file is part of 0MQ.
00006 
00007     0MQ is free software; you can redistribute it and/or modify it under
00008     the terms of the GNU Lesser General Public License as published by
00009     the Free Software Foundation; either version 3 of the License, or
00010     (at your option) any later version.
00011 
00012     0MQ is distributed in the hope that it will be useful,
00013     but WITHOUT ANY WARRANTY; without even the implied warranty of
00014     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015     GNU Lesser General Public License for more details.
00016 
00017     You should have received a copy of the GNU Lesser General Public License
00018     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00019 */
00020 
00021 #include "ipc_connecter.hpp"
00022 
00023 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
00024 
00025 #include <new>
00026 #include <string>
00027 
00028 #include "stream_engine.hpp"
00029 #include "io_thread.hpp"
00030 #include "platform.hpp"
00031 #include "random.hpp"
00032 #include "err.hpp"
00033 #include "ip.hpp"
00034 
00035 #include <unistd.h>
00036 #include <sys/types.h>
00037 #include <sys/socket.h>
00038 #include <sys/un.h>
00039 
00040 zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
00041       class session_base_t *session_, const options_t &options_,
00042       const char *address_, bool wait_) :
00043     own_t (io_thread_, options_),
00044     io_object_t (io_thread_),
00045     s (retired_fd),
00046     handle_valid (false),
00047     wait (wait_),
00048     session (session_),
00049     current_reconnect_ivl(options.reconnect_ivl)
00050 {
00051     //  TODO: set_addess should be called separately, so that the error
00052     //  can be propagated.
00053     int rc = set_address (address_);
00054     zmq_assert (rc == 0);
00055 }
00056 
00057 zmq::ipc_connecter_t::~ipc_connecter_t ()
00058 {
00059     if (wait)
00060         cancel_timer (reconnect_timer_id);
00061     if (handle_valid)
00062         rm_fd (handle);
00063 
00064     if (s != retired_fd)
00065         close ();
00066 }
00067 
00068 void zmq::ipc_connecter_t::process_plug ()
00069 {
00070     if (wait)
00071         add_reconnect_timer();
00072     else
00073         start_connecting ();
00074 }
00075 
00076 void zmq::ipc_connecter_t::in_event ()
00077 {
00078     //  We are not polling for incomming data, so we are actually called
00079     //  because of error here. However, we can get error on out event as well
00080     //  on some platforms, so we'll simply handle both events in the same way.
00081     out_event ();
00082 }
00083 
00084 void zmq::ipc_connecter_t::out_event ()
00085 {
00086     fd_t fd = connect ();
00087     rm_fd (handle);
00088     handle_valid = false;
00089 
00090     //  Handle the error condition by attempt to reconnect.
00091     if (fd == retired_fd) {
00092         close ();
00093         wait = true;
00094         add_reconnect_timer();
00095         return;
00096     }
00097 
00098     //  Create the engine object for this connection.
00099     stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
00100     alloc_assert (engine);
00101 
00102     //  Attach the engine to the corresponding session object.
00103     send_attach (session, engine);
00104 
00105     //  Shut the connecter down.
00106     terminate ();
00107 }
00108 
00109 void zmq::ipc_connecter_t::timer_event (int id_)
00110 {
00111     zmq_assert (id_ == reconnect_timer_id);
00112     wait = false;
00113     start_connecting ();
00114 }
00115 
00116 void zmq::ipc_connecter_t::start_connecting ()
00117 {
00118     //  Open the connecting socket.
00119     int rc = open ();
00120 
00121     //  Connect may succeed in synchronous manner.
00122     if (rc == 0) {
00123         handle = add_fd (s);
00124         handle_valid = true;
00125         out_event ();
00126         return;
00127     }
00128 
00129     //  Connection establishment may be dealyed. Poll for its completion.
00130     else if (rc == -1 && errno == EAGAIN) {
00131         handle = add_fd (s);
00132         handle_valid = true;
00133         set_pollout (handle);
00134         return;
00135     }
00136 
00137     //  Handle any other error condition by eventual reconnect.
00138     wait = true;
00139     add_reconnect_timer();
00140 }
00141 
00142 void zmq::ipc_connecter_t::add_reconnect_timer()
00143 {
00144     add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
00145 }
00146 
00147 int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
00148 {
00149     //  The new interval is the current interval + random value.
00150     int this_interval = current_reconnect_ivl +
00151         (generate_random () % options.reconnect_ivl);
00152 
00153     //  Only change the current reconnect interval  if the maximum reconnect
00154     //  interval was set and if it's larger than the reconnect interval.
00155     if (options.reconnect_ivl_max > 0 && 
00156         options.reconnect_ivl_max > options.reconnect_ivl) {
00157 
00158         //  Calculate the next interval
00159         current_reconnect_ivl = current_reconnect_ivl * 2;
00160         if(current_reconnect_ivl >= options.reconnect_ivl_max) {
00161             current_reconnect_ivl = options.reconnect_ivl_max;
00162         }   
00163     }
00164     return this_interval;
00165 }
00166 
00167 int zmq::ipc_connecter_t::set_address (const char *addr_)
00168 {
00169     return address.resolve (addr_);
00170 }
00171 
00172 int zmq::ipc_connecter_t::open ()
00173 {
00174     zmq_assert (s == retired_fd);
00175 
00176     //  Create the socket.
00177     s = open_socket (AF_UNIX, SOCK_STREAM, 0);
00178     if (s == -1)
00179         return -1;
00180 
00181     //  Set the non-blocking flag.
00182     unblock_socket (s);
00183 
00184     //  Connect to the remote peer.
00185     int rc = ::connect (s, address.addr (), address.addrlen ());
00186 
00187     //  Connect was successfull immediately.
00188     if (rc == 0)
00189         return 0;
00190 
00191     //  Forward the error.
00192     return -1;
00193 }
00194 
00195 int zmq::ipc_connecter_t::close ()
00196 {
00197     zmq_assert (s != retired_fd);
00198     int rc = ::close (s);
00199     if (rc != 0)
00200         return -1;
00201     s = retired_fd;
00202     return 0;
00203 }
00204 
00205 zmq::fd_t zmq::ipc_connecter_t::connect ()
00206 {
00207     //  Following code should handle both Berkeley-derived socket
00208     //  implementations and Solaris.
00209     int err = 0;
00210 #if defined ZMQ_HAVE_HPUX
00211     int len = sizeof (err);
00212 #else
00213     socklen_t len = sizeof (err);
00214 #endif
00215     int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
00216     if (rc == -1)
00217         err = errno;
00218     if (err != 0) {
00219 
00220         //  Assert if the error was caused by 0MQ bug.
00221         //  Networking problems are OK. No need to assert.
00222         errno = err;
00223         errno_assert (errno == ECONNREFUSED || errno == ECONNRESET ||
00224             errno == ETIMEDOUT || errno == EHOSTUNREACH ||
00225             errno == ENETUNREACH || errno == ENETDOWN);
00226 
00227         return retired_fd;
00228     }
00229 
00230     fd_t result = s;
00231     s = retired_fd;
00232     return result;
00233 }
00234 
00235 #endif
00236 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines