![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2011 250bpm s.r.o. 00003 Copyright (c) 2011 Other contributors as noted in the AUTHORS file 00004 00005 This file is part of 0MQ. 00006 00007 0MQ is free software; you can redistribute it and/or modify it under 00008 the terms of the GNU Lesser General Public License as published by 00009 the Free Software Foundation; either version 3 of the License, or 00010 (at your option) any later version. 00011 00012 0MQ is distributed in the hope that it will be useful, 00013 but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 GNU Lesser General Public License for more details. 00016 00017 You should have received a copy of the GNU Lesser General Public License 00018 along with this program. If not, see <http://www.gnu.org/licenses/>. 00019 */ 00020 00021 #include "ipc_listener.hpp" 00022 00023 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS 00024 00025 #include <new> 00026 00027 #include <string.h> 00028 00029 #include "stream_engine.hpp" 00030 #include "ipc_address.hpp" 00031 #include "io_thread.hpp" 00032 #include "session_base.hpp" 00033 #include "config.hpp" 00034 #include "err.hpp" 00035 #include "ip.hpp" 00036 00037 #include <unistd.h> 00038 #include <sys/socket.h> 00039 #include <fcntl.h> 00040 #include <sys/un.h> 00041 00042 zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_, 00043 socket_base_t *socket_, const options_t &options_) : 00044 own_t (io_thread_, options_), 00045 io_object_t (io_thread_), 00046 has_file (false), 00047 s (retired_fd), 00048 socket (socket_) 00049 { 00050 } 00051 00052 zmq::ipc_listener_t::~ipc_listener_t () 00053 { 00054 if (s != retired_fd) 00055 close (); 00056 } 00057 00058 void zmq::ipc_listener_t::process_plug () 00059 { 00060 // Start polling for incoming connections. 00061 handle = add_fd (s); 00062 set_pollin (handle); 00063 } 00064 00065 void zmq::ipc_listener_t::process_term (int linger_) 00066 { 00067 rm_fd (handle); 00068 own_t::process_term (linger_); 00069 } 00070 00071 void zmq::ipc_listener_t::in_event () 00072 { 00073 fd_t fd = accept (); 00074 00075 // If connection was reset by the peer in the meantime, just ignore it. 00076 // TODO: Handle specific errors like ENFILE/EMFILE etc. 00077 if (fd == retired_fd) 00078 return; 00079 00080 // Create the engine object for this connection. 00081 stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); 00082 alloc_assert (engine); 00083 00084 // Choose I/O thread to run connecter in. Given that we are already 00085 // running in an I/O thread, there must be at least one available. 00086 io_thread_t *io_thread = choose_io_thread (options.affinity); 00087 zmq_assert (io_thread); 00088 00089 // Create and launch a session object. 00090 session_base_t *session = session_base_t::create (io_thread, false, socket, 00091 options, NULL, NULL); 00092 errno_assert (session); 00093 session->inc_seqnum (); 00094 launch_child (session); 00095 send_attach (session, engine, false); 00096 } 00097 00098 int zmq::ipc_listener_t::set_address (const char *addr_) 00099 { 00100 // Get rid of the file associated with the UNIX domain socket that 00101 // may have been left behind by the previous run of the application. 00102 ::unlink (addr_); 00103 filename.clear (); 00104 00105 // Initialise the address structure. 00106 ipc_address_t address; 00107 int rc = address.resolve (addr_); 00108 if (rc != 0) 00109 return -1; 00110 00111 // Create a listening socket. 00112 s = open_socket (AF_UNIX, SOCK_STREAM, 0); 00113 if (s == -1) 00114 return -1; 00115 00116 // Bind the socket to the file path. 00117 rc = bind (s, address.addr (), address.addrlen ()); 00118 if (rc != 0) 00119 return -1; 00120 00121 has_file = true; 00122 00123 // Listen for incomming connections. 00124 rc = listen (s, options.backlog); 00125 if (rc != 0) 00126 return -1; 00127 00128 return 0; 00129 } 00130 00131 int zmq::ipc_listener_t::close () 00132 { 00133 zmq_assert (s != retired_fd); 00134 int rc = ::close (s); 00135 if (rc != 0) 00136 return -1; 00137 s = retired_fd; 00138 00139 // If there's an underlying UNIX domain socket, get rid of the file it 00140 // is associated with. 00141 if (has_file && !filename.empty ()) { 00142 rc = ::unlink(filename.c_str ()); 00143 if (rc != 0) 00144 return -1; 00145 } 00146 00147 return 0; 00148 } 00149 00150 zmq::fd_t zmq::ipc_listener_t::accept () 00151 { 00152 // Accept one connection and deal with different failure modes. 00153 zmq_assert (s != retired_fd); 00154 fd_t sock = ::accept (s, NULL, NULL); 00155 if (sock == -1) { 00156 errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || 00157 errno == EINTR || errno == ECONNABORTED || errno == EPROTO || 00158 errno == ENOBUFS); 00159 return retired_fd; 00160 } 00161 return sock; 00162 } 00163 00164 #endif 00165