libzmq master
The Intelligent Transport Layer

stream_engine.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include "platform.hpp"
00023 #if defined ZMQ_HAVE_WINDOWS
00024 #include "windows.hpp"
00025 #else
00026 #include <unistd.h>
00027 #include <sys/socket.h>
00028 #include <arpa/inet.h>
00029 #include <netinet/tcp.h>
00030 #include <netinet/in.h>
00031 #include <netdb.h>
00032 #include <fcntl.h>
00033 #endif
00034 
00035 #include <string.h>
00036 #include <new>
00037 
00038 #include "stream_engine.hpp"
00039 #include "io_thread.hpp"
00040 #include "session_base.hpp"
00041 #include "config.hpp"
00042 #include "err.hpp"
00043 #include "ip.hpp"
00044 
00045 zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
00046     s (fd_),
00047     inpos (NULL),
00048     insize (0),
00049     decoder (in_batch_size, options_.maxmsgsize),
00050     outpos (NULL),
00051     outsize (0),
00052     encoder (out_batch_size),
00053     session (NULL),
00054     leftover_session (NULL),
00055     options (options_),
00056     plugged (false)
00057 {
00058     //  Get the socket into non-blocking mode.
00059     unblock_socket (s);
00060 
00061     //  Set the socket buffer limits for the underlying socket.
00062     if (options.sndbuf) {
00063         int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF,
00064             (char*) &options.sndbuf, sizeof (int));
00065 #ifdef ZMQ_HAVE_WINDOWS
00066         wsa_assert (rc != SOCKET_ERROR);
00067 #else
00068         errno_assert (rc == 0);
00069 #endif
00070     }
00071     if (options.rcvbuf) {
00072         int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF,
00073             (char*) &options.rcvbuf, sizeof (int));
00074 #ifdef ZMQ_HAVE_WINDOWS
00075         wsa_assert (rc != SOCKET_ERROR);
00076 #else
00077         errno_assert (rc == 0);
00078 #endif
00079     }
00080 
00081 #if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD
00082     //  Make sure that SIGPIPE signal is not generated when writing to a
00083     //  connection that was already closed by the peer.
00084     int set = 1;
00085     int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
00086     errno_assert (rc == 0);
00087 #endif
00088 }
00089 
00090 zmq::stream_engine_t::~stream_engine_t ()
00091 {
00092     zmq_assert (!plugged);
00093 
00094     if (s != retired_fd) {
00095 #ifdef ZMQ_HAVE_WINDOWS
00096         int rc = closesocket (s);
00097         wsa_assert (rc != SOCKET_ERROR);
00098 #else
00099         int rc = close (s);
00100         errno_assert (rc == 0);
00101 #endif
00102         s = retired_fd;
00103     }
00104 }
00105 
00106 void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
00107     session_base_t *session_)
00108 {
00109     zmq_assert (!plugged);
00110     plugged = true;
00111     leftover_session = NULL;
00112 
00113     //  Connect to session object.
00114     zmq_assert (!session);
00115     zmq_assert (session_);
00116     encoder.set_session (session_);
00117     decoder.set_session (session_);
00118     session = session_;
00119 
00120     //  Connect to I/O threads poller object.
00121     io_object_t::plug (io_thread_);
00122     handle = add_fd (s);
00123     set_pollin (handle);
00124     set_pollout (handle);
00125 
00126     //  Flush all the data that may have been already received downstream.
00127     in_event ();
00128 }
00129 
00130 void zmq::stream_engine_t::unplug ()
00131 {
00132     zmq_assert (plugged);
00133     plugged = false;
00134 
00135     //  Cancel all fd subscriptions.
00136     rm_fd (handle);
00137 
00138     //  Disconnect from I/O threads poller object.
00139     io_object_t::unplug ();
00140 
00141     //  Disconnect from session object.
00142     encoder.set_session (NULL);
00143     decoder.set_session (NULL);
00144     leftover_session = session;
00145     session = NULL;
00146 }
00147 
00148 void zmq::stream_engine_t::terminate ()
00149 {
00150     unplug ();
00151     delete this;
00152 }
00153 
00154 void zmq::stream_engine_t::in_event ()
00155 {
00156     bool disconnection = false;
00157 
00158     //  If there's no data to process in the buffer...
00159     if (!insize) {
00160 
00161         //  Retrieve the buffer and read as much data as possible.
00162         //  Note that buffer can be arbitrarily large. However, we assume
00163         //  the underlying TCP layer has fixed buffer size and thus the
00164         //  number of bytes read will be always limited.
00165         decoder.get_buffer (&inpos, &insize);
00166         insize = read (inpos, insize);
00167 
00168         //  Check whether the peer has closed the connection.
00169         if (insize == (size_t) -1) {
00170             insize = 0;
00171             disconnection = true;
00172         }
00173     }
00174 
00175     //  Push the data to the decoder.
00176     size_t processed = decoder.process_buffer (inpos, insize);
00177 
00178     if (unlikely (processed == (size_t) -1)) {
00179         disconnection = true;
00180     }
00181     else {
00182 
00183         //  Stop polling for input if we got stuck.
00184         if (processed < insize) {
00185 
00186             //  This may happen if queue limits are in effect.
00187             if (plugged)
00188                 reset_pollin (handle);
00189         }
00190 
00191         //  Adjust the buffer.
00192         inpos += processed;
00193         insize -= processed;
00194     }
00195 
00196     //  Flush all messages the decoder may have produced.
00197     //  If IO handler has unplugged engine, flush transient IO handler.
00198     if (unlikely (!plugged)) {
00199         zmq_assert (leftover_session);
00200         leftover_session->flush ();
00201     } else {
00202         session->flush ();
00203     }
00204 
00205     if (session && disconnection)
00206         error ();
00207 }
00208 
00209 void zmq::stream_engine_t::out_event ()
00210 {
00211     //  If write buffer is empty, try to read new data from the encoder.
00212     if (!outsize) {
00213 
00214         outpos = NULL;
00215         encoder.get_data (&outpos, &outsize);
00216 
00217         //  If IO handler has unplugged engine, flush transient IO handler.
00218         if (unlikely (!plugged)) {
00219             zmq_assert (leftover_session);
00220             leftover_session->flush ();
00221             return;
00222         }
00223 
00224         //  If there is no data to send, stop polling for output.
00225         if (outsize == 0) {
00226             reset_pollout (handle);
00227             return;
00228         }
00229     }
00230 
00231     //  If there are any data to write in write buffer, write as much as
00232     //  possible to the socket. Note that amount of data to write can be
00233     //  arbitratily large. However, we assume that underlying TCP layer has
00234     //  limited transmission buffer and thus the actual number of bytes
00235     //  written should be reasonably modest.
00236     int nbytes = write (outpos, outsize);
00237 
00238     //  Handle problems with the connection.
00239     if (nbytes == -1) {
00240         error ();
00241         return;
00242     }
00243 
00244     outpos += nbytes;
00245     outsize -= nbytes;
00246 }
00247 
00248 void zmq::stream_engine_t::activate_out ()
00249 {
00250     set_pollout (handle);
00251 
00252     //  Speculative write: The assumption is that at the moment new message
00253     //  was sent by the user the socket is probably available for writing.
00254     //  Thus we try to write the data to socket avoiding polling for POLLOUT.
00255     //  Consequently, the latency should be better in request/reply scenarios.
00256     out_event ();
00257 }
00258 
00259 void zmq::stream_engine_t::activate_in ()
00260 {
00261     set_pollin (handle);
00262 
00263     //  Speculative read.
00264     in_event ();
00265 }
00266 
00267 void zmq::stream_engine_t::error ()
00268 {
00269     zmq_assert (session);
00270     session->detach ();
00271     unplug ();
00272     delete this;
00273 }
00274 
00275 int zmq::stream_engine_t::write (const void *data_, size_t size_)
00276 {
00277 #ifdef ZMQ_HAVE_WINDOWS
00278 
00279     int nbytes = send (s, (char*) data_, (int) size_, 0);
00280 
00281     //  If not a single byte can be written to the socket in non-blocking mode
00282     //  we'll get an error (this may happen during the speculative write).
00283     if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
00284         return 0;
00285         
00286     //  Signalise peer failure.
00287     if (nbytes == -1 && (
00288           WSAGetLastError () == WSAENETDOWN ||
00289           WSAGetLastError () == WSAENETRESET ||
00290           WSAGetLastError () == WSAEHOSTUNREACH ||
00291           WSAGetLastError () == WSAECONNABORTED ||
00292           WSAGetLastError () == WSAETIMEDOUT ||
00293           WSAGetLastError () == WSAECONNRESET))
00294         return -1;
00295 
00296     wsa_assert (nbytes != SOCKET_ERROR);
00297     return (size_t) nbytes;
00298 
00299 #else
00300 
00301     ssize_t nbytes = send (s, data_, size_, 0);
00302 
00303     //  Several errors are OK. When speculative write is being done we may not
00304     //  be able to write a single byte from the socket. Also, SIGSTOP issued
00305     //  by a debugging tool can result in EINTR error.
00306     if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
00307           errno == EINTR))
00308         return 0;
00309 
00310     //  Signalise peer failure.
00311     if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE))
00312         return -1;
00313 
00314     errno_assert (nbytes != -1);
00315     return (size_t) nbytes;
00316 
00317 #endif
00318 }
00319 
00320 int zmq::stream_engine_t::read (void *data_, size_t size_)
00321 {
00322 #ifdef ZMQ_HAVE_WINDOWS
00323 
00324     int nbytes = recv (s, (char*) data_, (int) size_, 0);
00325 
00326     //  If not a single byte can be read from the socket in non-blocking mode
00327     //  we'll get an error (this may happen during the speculative read).
00328     if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
00329         return 0;
00330 
00331     //  Connection failure.
00332     if (nbytes == -1 && (
00333           WSAGetLastError () == WSAENETDOWN ||
00334           WSAGetLastError () == WSAENETRESET ||
00335           WSAGetLastError () == WSAECONNABORTED ||
00336           WSAGetLastError () == WSAETIMEDOUT ||
00337           WSAGetLastError () == WSAECONNRESET ||
00338           WSAGetLastError () == WSAECONNREFUSED ||
00339           WSAGetLastError () == WSAENOTCONN))
00340         return -1;
00341 
00342     wsa_assert (nbytes != SOCKET_ERROR);
00343 
00344     //  Orderly shutdown by the other peer.
00345     if (nbytes == 0)
00346         return -1; 
00347 
00348     return (size_t) nbytes;
00349 
00350 #else
00351 
00352     ssize_t nbytes = recv (s, data_, size_, 0);
00353 
00354     //  Several errors are OK. When speculative read is being done we may not
00355     //  be able to read a single byte from the socket. Also, SIGSTOP issued
00356     //  by a debugging tool can result in EINTR error.
00357     if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
00358           errno == EINTR))
00359         return 0;
00360 
00361     //  Signalise peer failure.
00362     if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED ||
00363           errno == ETIMEDOUT || errno == EHOSTUNREACH))
00364         return -1;
00365 
00366     errno_assert (nbytes != -1);
00367 
00368     //  Orderly shutdown by the peer.
00369     if (nbytes == 0)
00370         return -1;
00371 
00372     return (size_t) nbytes;
00373 
00374 #endif
00375 }
00376 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines