![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2010 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include <new> 00023 00024 #include <string.h> 00025 00026 #include "platform.hpp" 00027 #include "tcp_listener.hpp" 00028 #include "stream_engine.hpp" 00029 #include "io_thread.hpp" 00030 #include "session_base.hpp" 00031 #include "config.hpp" 00032 #include "err.hpp" 00033 #include "ip.hpp" 00034 00035 #ifdef ZMQ_HAVE_WINDOWS 00036 #include "windows.hpp" 00037 #else 00038 #include <unistd.h> 00039 #include <sys/socket.h> 00040 #include <arpa/inet.h> 00041 #include <netinet/tcp.h> 00042 #include <netinet/in.h> 00043 #include <netdb.h> 00044 #include <fcntl.h> 00045 #endif 00046 00047 #ifdef ZMQ_HAVE_OPENVMS 00048 #include <ioctl.h> 00049 #endif 00050 00051 zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_, 00052 socket_base_t *socket_, const options_t &options_) : 00053 own_t (io_thread_, options_), 00054 io_object_t (io_thread_), 00055 has_file (false), 00056 s (retired_fd), 00057 socket (socket_) 00058 { 00059 } 00060 00061 zmq::tcp_listener_t::~tcp_listener_t () 00062 { 00063 if (s != retired_fd) 00064 close (); 00065 } 00066 00067 void zmq::tcp_listener_t::process_plug () 00068 { 00069 // Start polling for incoming connections. 00070 handle = add_fd (s); 00071 set_pollin (handle); 00072 } 00073 00074 void zmq::tcp_listener_t::process_term (int linger_) 00075 { 00076 rm_fd (handle); 00077 own_t::process_term (linger_); 00078 } 00079 00080 void zmq::tcp_listener_t::in_event () 00081 { 00082 fd_t fd = accept (); 00083 00084 // If connection was reset by the peer in the meantime, just ignore it. 00085 // TODO: Handle specific errors like ENFILE/EMFILE etc. 00086 if (fd == retired_fd) 00087 return; 00088 00089 tune_tcp_socket (fd); 00090 00091 // Create the engine object for this connection. 00092 stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); 00093 alloc_assert (engine); 00094 00095 // Choose I/O thread to run connecter in. Given that we are already 00096 // running in an I/O thread, there must be at least one available. 00097 io_thread_t *io_thread = choose_io_thread (options.affinity); 00098 zmq_assert (io_thread); 00099 00100 // Create and launch a session object. 00101 session_base_t *session = session_base_t::create (io_thread, false, socket, 00102 options, NULL, NULL); 00103 errno_assert (session); 00104 session->inc_seqnum (); 00105 launch_child (session); 00106 send_attach (session, engine, false); 00107 } 00108 00109 void zmq::tcp_listener_t::close () 00110 { 00111 zmq_assert (s != retired_fd); 00112 #ifdef ZMQ_HAVE_WINDOWS 00113 int rc = closesocket (s); 00114 wsa_assert (rc != SOCKET_ERROR); 00115 #else 00116 int rc = ::close (s); 00117 errno_assert (rc == 0); 00118 #endif 00119 s = retired_fd; 00120 } 00121 00122 int zmq::tcp_listener_t::set_address (const char *addr_) 00123 { 00124 // Convert the textual address into address structure. 00125 int rc = address.resolve (addr_, true, options.ipv4only ? true : false); 00126 if (rc != 0) 00127 return -1; 00128 00129 // Create a listening socket. 00130 s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP); 00131 #ifdef ZMQ_HAVE_WINDOWS 00132 if (s == INVALID_SOCKET) 00133 wsa_error_to_errno (); 00134 #endif 00135 00136 // IPv6 address family not supported, try automatic downgrade to IPv4. 00137 if (address.family () == AF_INET6 && errno == EAFNOSUPPORT && 00138 !options.ipv4only) { 00139 rc = address.resolve (addr_, true, true); 00140 if (rc != 0) 00141 return rc; 00142 s = ::socket (address.family (), SOCK_STREAM, IPPROTO_TCP); 00143 } 00144 00145 #ifdef ZMQ_HAVE_WINDOWS 00146 if (s == INVALID_SOCKET) { 00147 wsa_error_to_errno (); 00148 return -1; 00149 } 00150 #else 00151 if (s == -1) 00152 return -1; 00153 #endif 00154 00155 // On some systems, IPv4 mapping in IPv6 sockets is disabled by default. 00156 // Switch it on in such cases. 00157 if (address.family () == AF_INET6) 00158 enable_ipv4_mapping (s); 00159 00160 // Allow reusing of the address. 00161 int flag = 1; 00162 #ifdef ZMQ_HAVE_WINDOWS 00163 rc = setsockopt (s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, 00164 (const char*) &flag, sizeof (int)); 00165 wsa_assert (rc != SOCKET_ERROR); 00166 #else 00167 rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); 00168 errno_assert (rc == 0); 00169 #endif 00170 00171 // Bind the socket to the network interface and port. 00172 rc = bind (s, address.addr (), address.addrlen ()); 00173 #ifdef ZMQ_HAVE_WINDOWS 00174 if (rc == SOCKET_ERROR) { 00175 wsa_error_to_errno (); 00176 return -1; 00177 } 00178 #else 00179 if (rc != 0) 00180 return -1; 00181 #endif 00182 00183 // Listen for incomming connections. 00184 rc = listen (s, options.backlog); 00185 #ifdef ZMQ_HAVE_WINDOWS 00186 if (rc == SOCKET_ERROR) { 00187 wsa_error_to_errno (); 00188 return -1; 00189 } 00190 #else 00191 if (rc != 0) 00192 return -1; 00193 #endif 00194 00195 return 0; 00196 } 00197 00198 zmq::fd_t zmq::tcp_listener_t::accept () 00199 { 00200 // Accept one connection and deal with different failure modes. 00201 zmq_assert (s != retired_fd); 00202 fd_t sock = ::accept (s, NULL, NULL); 00203 #ifdef ZMQ_HAVE_WINDOWS 00204 if (sock == INVALID_SOCKET) { 00205 wsa_assert (WSAGetLastError () == WSAEWOULDBLOCK || 00206 WSAGetLastError () == WSAECONNRESET); 00207 return retired_fd; 00208 } 00209 #else 00210 if (sock == -1) { 00211 errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || 00212 errno == EINTR || errno == ECONNABORTED || errno == EPROTO || 00213 errno == ENOBUFS); 00214 return retired_fd; 00215 } 00216 #endif 00217 return sock; 00218 } 00219