libzmq master
The Intelligent Transport Layer

ipc_listener.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2011 250bpm s.r.o.
00003     Copyright (c) 2011 Other contributors as noted in the AUTHORS file
00004 
00005     This file is part of 0MQ.
00006 
00007     0MQ is free software; you can redistribute it and/or modify it under
00008     the terms of the GNU Lesser General Public License as published by
00009     the Free Software Foundation; either version 3 of the License, or
00010     (at your option) any later version.
00011 
00012     0MQ is distributed in the hope that it will be useful,
00013     but WITHOUT ANY WARRANTY; without even the implied warranty of
00014     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015     GNU Lesser General Public License for more details.
00016 
00017     You should have received a copy of the GNU Lesser General Public License
00018     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00019 */
00020 
00021 #include "ipc_listener.hpp"
00022 
00023 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
00024 
00025 #include <new>
00026 
00027 #include <string.h>
00028 
00029 #include "stream_engine.hpp"
00030 #include "ipc_address.hpp"
00031 #include "io_thread.hpp"
00032 #include "session_base.hpp"
00033 #include "config.hpp"
00034 #include "err.hpp"
00035 #include "ip.hpp"
00036 
00037 #include <unistd.h>
00038 #include <sys/socket.h>
00039 #include <fcntl.h>
00040 #include <sys/un.h>
00041 
00042 zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
00043       socket_base_t *socket_, const options_t &options_) :
00044     own_t (io_thread_, options_),
00045     io_object_t (io_thread_),
00046     has_file (false),
00047     s (retired_fd),
00048     socket (socket_)
00049 {
00050 }
00051 
00052 zmq::ipc_listener_t::~ipc_listener_t ()
00053 {
00054     if (s != retired_fd)
00055         close ();
00056 }
00057 
00058 void zmq::ipc_listener_t::process_plug ()
00059 {
00060     //  Start polling for incoming connections.
00061     handle = add_fd (s);
00062     set_pollin (handle);
00063 }
00064 
00065 void zmq::ipc_listener_t::process_term (int linger_)
00066 {
00067     rm_fd (handle);
00068     own_t::process_term (linger_);
00069 }
00070 
00071 void zmq::ipc_listener_t::in_event ()
00072 {
00073     fd_t fd = accept ();
00074 
00075     //  If connection was reset by the peer in the meantime, just ignore it.
00076     //  TODO: Handle specific errors like ENFILE/EMFILE etc.
00077     if (fd == retired_fd)
00078         return;
00079 
00080     //  Create the engine object for this connection.
00081     stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
00082     alloc_assert (engine);
00083 
00084     //  Choose I/O thread to run connecter in. Given that we are already
00085     //  running in an I/O thread, there must be at least one available.
00086     io_thread_t *io_thread = choose_io_thread (options.affinity);
00087     zmq_assert (io_thread);
00088 
00089     //  Create and launch a session object. 
00090     session_base_t *session = session_base_t::create (io_thread, false, socket,
00091         options, NULL, NULL);
00092     errno_assert (session);
00093     session->inc_seqnum ();
00094     launch_child (session);
00095     send_attach (session, engine, false);
00096 }
00097 
00098 int zmq::ipc_listener_t::set_address (const char *addr_)
00099 {
00100     //  Get rid of the file associated with the UNIX domain socket that
00101     //  may have been left behind by the previous run of the application.
00102     ::unlink (addr_);
00103     filename.clear ();
00104 
00105     //  Initialise the address structure.
00106     ipc_address_t address;
00107     int rc = address.resolve (addr_);
00108     if (rc != 0)
00109         return -1;
00110 
00111     //  Create a listening socket.
00112     s = open_socket (AF_UNIX, SOCK_STREAM, 0);
00113     if (s == -1)
00114         return -1;
00115 
00116     //  Bind the socket to the file path.
00117     rc = bind (s, address.addr (), address.addrlen ());
00118     if (rc != 0)
00119         return -1;
00120 
00121     has_file = true;
00122 
00123     //  Listen for incomming connections.
00124     rc = listen (s, options.backlog);
00125     if (rc != 0)
00126         return -1;
00127 
00128     return 0;  
00129 }
00130 
00131 int zmq::ipc_listener_t::close ()
00132 {
00133     zmq_assert (s != retired_fd);
00134     int rc = ::close (s);
00135     if (rc != 0)
00136         return -1;
00137     s = retired_fd;
00138 
00139     //  If there's an underlying UNIX domain socket, get rid of the file it
00140     //  is associated with.
00141     if (has_file && !filename.empty ()) {
00142         rc = ::unlink(filename.c_str ());
00143         if (rc != 0)
00144             return -1;
00145     }
00146 
00147     return 0;
00148 }
00149 
00150 zmq::fd_t zmq::ipc_listener_t::accept ()
00151 {
00152     //  Accept one connection and deal with different failure modes.
00153     zmq_assert (s != retired_fd);
00154     fd_t sock = ::accept (s, NULL, NULL);
00155     if (sock == -1) {
00156         errno_assert (errno == EAGAIN || errno == EWOULDBLOCK ||
00157             errno == EINTR || errno == ECONNABORTED || errno == EPROTO ||
00158             errno == ENOBUFS);
00159         return retired_fd;
00160     }
00161     return sock;
00162 }
00163 
00164 #endif
00165 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines