![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "platform.hpp" 00023 #if defined ZMQ_HAVE_WINDOWS 00024 #include "windows.hpp" 00025 #else 00026 #include <unistd.h> 00027 #include <sys/socket.h> 00028 #include <arpa/inet.h> 00029 #include <netinet/tcp.h> 00030 #include <netinet/in.h> 00031 #include <netdb.h> 00032 #include <fcntl.h> 00033 #endif 00034 00035 #include <string.h> 00036 #include <new> 00037 00038 #include "stream_engine.hpp" 00039 #include "io_thread.hpp" 00040 #include "session_base.hpp" 00041 #include "config.hpp" 00042 #include "err.hpp" 00043 #include "ip.hpp" 00044 00045 zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : 00046 s (fd_), 00047 inpos (NULL), 00048 insize (0), 00049 decoder (in_batch_size, options_.maxmsgsize), 00050 outpos (NULL), 00051 outsize (0), 00052 encoder (out_batch_size), 00053 session (NULL), 00054 leftover_session (NULL), 00055 options (options_), 00056 plugged (false) 00057 { 00058 // Get the socket into non-blocking mode. 00059 unblock_socket (s); 00060 00061 // Set the socket buffer limits for the underlying socket. 00062 if (options.sndbuf) { 00063 int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, 00064 (char*) &options.sndbuf, sizeof (int)); 00065 #ifdef ZMQ_HAVE_WINDOWS 00066 wsa_assert (rc != SOCKET_ERROR); 00067 #else 00068 errno_assert (rc == 0); 00069 #endif 00070 } 00071 if (options.rcvbuf) { 00072 int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, 00073 (char*) &options.rcvbuf, sizeof (int)); 00074 #ifdef ZMQ_HAVE_WINDOWS 00075 wsa_assert (rc != SOCKET_ERROR); 00076 #else 00077 errno_assert (rc == 0); 00078 #endif 00079 } 00080 00081 #if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD 00082 // Make sure that SIGPIPE signal is not generated when writing to a 00083 // connection that was already closed by the peer. 00084 int set = 1; 00085 int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); 00086 errno_assert (rc == 0); 00087 #endif 00088 } 00089 00090 zmq::stream_engine_t::~stream_engine_t () 00091 { 00092 zmq_assert (!plugged); 00093 00094 if (s != retired_fd) { 00095 #ifdef ZMQ_HAVE_WINDOWS 00096 int rc = closesocket (s); 00097 wsa_assert (rc != SOCKET_ERROR); 00098 #else 00099 int rc = close (s); 00100 errno_assert (rc == 0); 00101 #endif 00102 s = retired_fd; 00103 } 00104 } 00105 00106 void zmq::stream_engine_t::plug (io_thread_t *io_thread_, 00107 session_base_t *session_) 00108 { 00109 zmq_assert (!plugged); 00110 plugged = true; 00111 leftover_session = NULL; 00112 00113 // Connect to session object. 00114 zmq_assert (!session); 00115 zmq_assert (session_); 00116 encoder.set_session (session_); 00117 decoder.set_session (session_); 00118 session = session_; 00119 00120 // Connect to I/O threads poller object. 00121 io_object_t::plug (io_thread_); 00122 handle = add_fd (s); 00123 set_pollin (handle); 00124 set_pollout (handle); 00125 00126 // Flush all the data that may have been already received downstream. 00127 in_event (); 00128 } 00129 00130 void zmq::stream_engine_t::unplug () 00131 { 00132 zmq_assert (plugged); 00133 plugged = false; 00134 00135 // Cancel all fd subscriptions. 00136 rm_fd (handle); 00137 00138 // Disconnect from I/O threads poller object. 00139 io_object_t::unplug (); 00140 00141 // Disconnect from session object. 00142 encoder.set_session (NULL); 00143 decoder.set_session (NULL); 00144 leftover_session = session; 00145 session = NULL; 00146 } 00147 00148 void zmq::stream_engine_t::terminate () 00149 { 00150 unplug (); 00151 delete this; 00152 } 00153 00154 void zmq::stream_engine_t::in_event () 00155 { 00156 bool disconnection = false; 00157 00158 // If there's no data to process in the buffer... 00159 if (!insize) { 00160 00161 // Retrieve the buffer and read as much data as possible. 00162 // Note that buffer can be arbitrarily large. However, we assume 00163 // the underlying TCP layer has fixed buffer size and thus the 00164 // number of bytes read will be always limited. 00165 decoder.get_buffer (&inpos, &insize); 00166 insize = read (inpos, insize); 00167 00168 // Check whether the peer has closed the connection. 00169 if (insize == (size_t) -1) { 00170 insize = 0; 00171 disconnection = true; 00172 } 00173 } 00174 00175 // Push the data to the decoder. 00176 size_t processed = decoder.process_buffer (inpos, insize); 00177 00178 if (unlikely (processed == (size_t) -1)) { 00179 disconnection = true; 00180 } 00181 else { 00182 00183 // Stop polling for input if we got stuck. 00184 if (processed < insize) { 00185 00186 // This may happen if queue limits are in effect. 00187 if (plugged) 00188 reset_pollin (handle); 00189 } 00190 00191 // Adjust the buffer. 00192 inpos += processed; 00193 insize -= processed; 00194 } 00195 00196 // Flush all messages the decoder may have produced. 00197 // If IO handler has unplugged engine, flush transient IO handler. 00198 if (unlikely (!plugged)) { 00199 zmq_assert (leftover_session); 00200 leftover_session->flush (); 00201 } else { 00202 session->flush (); 00203 } 00204 00205 if (session && disconnection) 00206 error (); 00207 } 00208 00209 void zmq::stream_engine_t::out_event () 00210 { 00211 // If write buffer is empty, try to read new data from the encoder. 00212 if (!outsize) { 00213 00214 outpos = NULL; 00215 encoder.get_data (&outpos, &outsize); 00216 00217 // If IO handler has unplugged engine, flush transient IO handler. 00218 if (unlikely (!plugged)) { 00219 zmq_assert (leftover_session); 00220 leftover_session->flush (); 00221 return; 00222 } 00223 00224 // If there is no data to send, stop polling for output. 00225 if (outsize == 0) { 00226 reset_pollout (handle); 00227 return; 00228 } 00229 } 00230 00231 // If there are any data to write in write buffer, write as much as 00232 // possible to the socket. Note that amount of data to write can be 00233 // arbitratily large. However, we assume that underlying TCP layer has 00234 // limited transmission buffer and thus the actual number of bytes 00235 // written should be reasonably modest. 00236 int nbytes = write (outpos, outsize); 00237 00238 // Handle problems with the connection. 00239 if (nbytes == -1) { 00240 error (); 00241 return; 00242 } 00243 00244 outpos += nbytes; 00245 outsize -= nbytes; 00246 } 00247 00248 void zmq::stream_engine_t::activate_out () 00249 { 00250 set_pollout (handle); 00251 00252 // Speculative write: The assumption is that at the moment new message 00253 // was sent by the user the socket is probably available for writing. 00254 // Thus we try to write the data to socket avoiding polling for POLLOUT. 00255 // Consequently, the latency should be better in request/reply scenarios. 00256 out_event (); 00257 } 00258 00259 void zmq::stream_engine_t::activate_in () 00260 { 00261 set_pollin (handle); 00262 00263 // Speculative read. 00264 in_event (); 00265 } 00266 00267 void zmq::stream_engine_t::error () 00268 { 00269 zmq_assert (session); 00270 session->detach (); 00271 unplug (); 00272 delete this; 00273 } 00274 00275 int zmq::stream_engine_t::write (const void *data_, size_t size_) 00276 { 00277 #ifdef ZMQ_HAVE_WINDOWS 00278 00279 int nbytes = send (s, (char*) data_, (int) size_, 0); 00280 00281 // If not a single byte can be written to the socket in non-blocking mode 00282 // we'll get an error (this may happen during the speculative write). 00283 if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) 00284 return 0; 00285 00286 // Signalise peer failure. 00287 if (nbytes == -1 && ( 00288 WSAGetLastError () == WSAENETDOWN || 00289 WSAGetLastError () == WSAENETRESET || 00290 WSAGetLastError () == WSAEHOSTUNREACH || 00291 WSAGetLastError () == WSAECONNABORTED || 00292 WSAGetLastError () == WSAETIMEDOUT || 00293 WSAGetLastError () == WSAECONNRESET)) 00294 return -1; 00295 00296 wsa_assert (nbytes != SOCKET_ERROR); 00297 return (size_t) nbytes; 00298 00299 #else 00300 00301 ssize_t nbytes = send (s, data_, size_, 0); 00302 00303 // Several errors are OK. When speculative write is being done we may not 00304 // be able to write a single byte from the socket. Also, SIGSTOP issued 00305 // by a debugging tool can result in EINTR error. 00306 if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || 00307 errno == EINTR)) 00308 return 0; 00309 00310 // Signalise peer failure. 00311 if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE)) 00312 return -1; 00313 00314 errno_assert (nbytes != -1); 00315 return (size_t) nbytes; 00316 00317 #endif 00318 } 00319 00320 int zmq::stream_engine_t::read (void *data_, size_t size_) 00321 { 00322 #ifdef ZMQ_HAVE_WINDOWS 00323 00324 int nbytes = recv (s, (char*) data_, (int) size_, 0); 00325 00326 // If not a single byte can be read from the socket in non-blocking mode 00327 // we'll get an error (this may happen during the speculative read). 00328 if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) 00329 return 0; 00330 00331 // Connection failure. 00332 if (nbytes == -1 && ( 00333 WSAGetLastError () == WSAENETDOWN || 00334 WSAGetLastError () == WSAENETRESET || 00335 WSAGetLastError () == WSAECONNABORTED || 00336 WSAGetLastError () == WSAETIMEDOUT || 00337 WSAGetLastError () == WSAECONNRESET || 00338 WSAGetLastError () == WSAECONNREFUSED || 00339 WSAGetLastError () == WSAENOTCONN)) 00340 return -1; 00341 00342 wsa_assert (nbytes != SOCKET_ERROR); 00343 00344 // Orderly shutdown by the other peer. 00345 if (nbytes == 0) 00346 return -1; 00347 00348 return (size_t) nbytes; 00349 00350 #else 00351 00352 ssize_t nbytes = recv (s, data_, size_, 0); 00353 00354 // Several errors are OK. When speculative read is being done we may not 00355 // be able to read a single byte from the socket. Also, SIGSTOP issued 00356 // by a debugging tool can result in EINTR error. 00357 if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || 00358 errno == EINTR)) 00359 return 0; 00360 00361 // Signalise peer failure. 00362 if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED || 00363 errno == ETIMEDOUT || errno == EHOSTUNREACH)) 00364 return -1; 00365 00366 errno_assert (nbytes != -1); 00367 00368 // Orderly shutdown by the peer. 00369 if (nbytes == 0) 00370 return -1; 00371 00372 return (size_t) nbytes; 00373 00374 #endif 00375 } 00376