![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include <new> 00024 #include <string> 00025 #include <algorithm> 00026 00027 #include "platform.hpp" 00028 00029 #if defined ZMQ_HAVE_WINDOWS 00030 #include "windows.hpp" 00031 #if defined _MSC_VER 00032 #include <intrin.h> 00033 #endif 00034 #else 00035 #include <unistd.h> 00036 #endif 00037 00038 #include "socket_base.hpp" 00039 #include "tcp_listener.hpp" 00040 #include "ipc_listener.hpp" 00041 #include "tcp_connecter.hpp" 00042 #include "io_thread.hpp" 00043 #include "session_base.hpp" 00044 #include "config.hpp" 00045 #include "clock.hpp" 00046 #include "pipe.hpp" 00047 #include "err.hpp" 00048 #include "ctx.hpp" 00049 #include "platform.hpp" 00050 #include "likely.hpp" 00051 #include "msg.hpp" 00052 00053 #include "pair.hpp" 00054 #include "pub.hpp" 00055 #include "sub.hpp" 00056 #include "req.hpp" 00057 #include "rep.hpp" 00058 #include "pull.hpp" 00059 #include "push.hpp" 00060 #include "xreq.hpp" 00061 #include "xrep.hpp" 00062 #include "xpub.hpp" 00063 #include "xsub.hpp" 00064 00065 bool zmq::socket_base_t::check_tag () 00066 { 00067 return tag == 0xbaddecaf; 00068 } 00069 00070 zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, 00071 uint32_t tid_) 00072 { 00073 socket_base_t *s = NULL; 00074 switch (type_) { 00075 00076 case ZMQ_PAIR: 00077 s = new (std::nothrow) pair_t (parent_, tid_); 00078 break; 00079 case ZMQ_PUB: 00080 s = new (std::nothrow) pub_t (parent_, tid_); 00081 break; 00082 case ZMQ_SUB: 00083 s = new (std::nothrow) sub_t (parent_, tid_); 00084 break; 00085 case ZMQ_REQ: 00086 s = new (std::nothrow) req_t (parent_, tid_); 00087 break; 00088 case ZMQ_REP: 00089 s = new (std::nothrow) rep_t (parent_, tid_); 00090 break; 00091 case ZMQ_XREQ: 00092 s = new (std::nothrow) xreq_t (parent_, tid_); 00093 break; 00094 case ZMQ_XREP: 00095 s = new (std::nothrow) xrep_t (parent_, tid_); 00096 break; 00097 case ZMQ_PULL: 00098 s = new (std::nothrow) pull_t (parent_, tid_); 00099 break; 00100 case ZMQ_PUSH: 00101 s = new (std::nothrow) push_t (parent_, tid_); 00102 break; 00103 case ZMQ_XPUB: 00104 s = new (std::nothrow) xpub_t (parent_, tid_); 00105 break; 00106 case ZMQ_XSUB: 00107 s = new (std::nothrow) xsub_t (parent_, tid_); 00108 break; 00109 default: 00110 errno = EINVAL; 00111 return NULL; 00112 } 00113 alloc_assert (s); 00114 return s; 00115 } 00116 00117 zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) : 00118 own_t (parent_, tid_), 00119 tag (0xbaddecaf), 00120 ctx_terminated (false), 00121 destroyed (false), 00122 last_tsc (0), 00123 ticks (0), 00124 rcvmore (false) 00125 { 00126 } 00127 00128 zmq::socket_base_t::~socket_base_t () 00129 { 00130 zmq_assert (destroyed); 00131 00132 // Mark the socket as dead. 00133 tag = 0xdeadbeef; 00134 } 00135 00136 zmq::mailbox_t *zmq::socket_base_t::get_mailbox () 00137 { 00138 return &mailbox; 00139 } 00140 00141 void zmq::socket_base_t::stop () 00142 { 00143 // Called by ctx when it is terminated (zmq_term). 00144 // 'stop' command is sent from the threads that called zmq_term to 00145 // the thread owning the socket. This way, blocking call in the 00146 // owner thread can be interrupted. 00147 send_stop (); 00148 } 00149 00150 int zmq::socket_base_t::parse_uri (const char *uri_, 00151 std::string &protocol_, std::string &address_) 00152 { 00153 zmq_assert (uri_ != NULL); 00154 00155 std::string uri (uri_); 00156 std::string::size_type pos = uri.find ("://"); 00157 if (pos == std::string::npos) { 00158 errno = EINVAL; 00159 return -1; 00160 } 00161 protocol_ = uri.substr (0, pos); 00162 address_ = uri.substr (pos + 3); 00163 if (protocol_.empty () || address_.empty ()) { 00164 errno = EINVAL; 00165 return -1; 00166 } 00167 return 0; 00168 } 00169 00170 int zmq::socket_base_t::check_protocol (const std::string &protocol_) 00171 { 00172 // First check out whether the protcol is something we are aware of. 00173 if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && 00174 protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "sys") { 00175 errno = EPROTONOSUPPORT; 00176 return -1; 00177 } 00178 00179 // If 0MQ is not compiled with OpenPGM, pgm and epgm transports 00180 // are not avaialble. 00181 #if !defined ZMQ_HAVE_OPENPGM 00182 if (protocol_ == "pgm" || protocol_ == "epgm") { 00183 errno = EPROTONOSUPPORT; 00184 return -1; 00185 } 00186 #endif 00187 00188 // IPC transport is not available on Windows and OpenVMS. 00189 #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS 00190 if (protocol_ == "ipc") { 00191 // Unknown protocol. 00192 errno = EPROTONOSUPPORT; 00193 return -1; 00194 } 00195 #endif 00196 00197 // Check whether socket type and transport protocol match. 00198 // Specifically, multicast protocols can't be combined with 00199 // bi-directional messaging patterns (socket types). 00200 if ((protocol_ == "pgm" || protocol_ == "epgm") && 00201 options.type != ZMQ_PUB && options.type != ZMQ_SUB && 00202 options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) { 00203 errno = ENOCOMPATPROTO; 00204 return -1; 00205 } 00206 00207 // Protocol is available. 00208 return 0; 00209 } 00210 00211 void zmq::socket_base_t::attach_pipe (pipe_t *pipe_) 00212 { 00213 // First, register the pipe so that we can terminate it later on. 00214 pipe_->set_event_sink (this); 00215 pipes.push_back (pipe_); 00216 00217 // Let the derived socket type know about new pipe. 00218 xattach_pipe (pipe_); 00219 00220 // If the socket is already being closed, ask any new pipes to terminate 00221 // straight away. 00222 if (is_terminating ()) { 00223 register_term_acks (1); 00224 pipe_->terminate (false); 00225 } 00226 } 00227 00228 int zmq::socket_base_t::setsockopt (int option_, const void *optval_, 00229 size_t optvallen_) 00230 { 00231 if (unlikely (ctx_terminated)) { 00232 errno = ETERM; 00233 return -1; 00234 } 00235 00236 // First, check whether specific socket type overloads the option. 00237 int rc = xsetsockopt (option_, optval_, optvallen_); 00238 if (rc == 0 || errno != EINVAL) 00239 return rc; 00240 00241 // If the socket type doesn't support the option, pass it to 00242 // the generic option parser. 00243 return options.setsockopt (option_, optval_, optvallen_); 00244 } 00245 00246 int zmq::socket_base_t::getsockopt (int option_, void *optval_, 00247 size_t *optvallen_) 00248 { 00249 if (unlikely (ctx_terminated)) { 00250 errno = ETERM; 00251 return -1; 00252 } 00253 00254 if (option_ == ZMQ_RCVMORE) { 00255 if (*optvallen_ < sizeof (int)) { 00256 errno = EINVAL; 00257 return -1; 00258 } 00259 *((int*) optval_) = rcvmore ? 1 : 0; 00260 *optvallen_ = sizeof (int); 00261 return 0; 00262 } 00263 00264 if (option_ == ZMQ_FD) { 00265 if (*optvallen_ < sizeof (fd_t)) { 00266 errno = EINVAL; 00267 return -1; 00268 } 00269 *((fd_t*) optval_) = mailbox.get_fd (); 00270 *optvallen_ = sizeof (fd_t); 00271 return 0; 00272 } 00273 00274 if (option_ == ZMQ_EVENTS) { 00275 if (*optvallen_ < sizeof (int)) { 00276 errno = EINVAL; 00277 return -1; 00278 } 00279 int rc = process_commands (0, false); 00280 if (rc != 0 && (errno == EINTR || errno == ETERM)) 00281 return -1; 00282 errno_assert (rc == 0); 00283 *((int*) optval_) = 0; 00284 if (has_out ()) 00285 *((int*) optval_) |= ZMQ_POLLOUT; 00286 if (has_in ()) 00287 *((int*) optval_) |= ZMQ_POLLIN; 00288 *optvallen_ = sizeof (int); 00289 return 0; 00290 } 00291 00292 return options.getsockopt (option_, optval_, optvallen_); 00293 } 00294 00295 int zmq::socket_base_t::bind (const char *addr_) 00296 { 00297 if (unlikely (ctx_terminated)) { 00298 errno = ETERM; 00299 return -1; 00300 } 00301 00302 // Parse addr_ string. 00303 std::string protocol; 00304 std::string address; 00305 int rc = parse_uri (addr_, protocol, address); 00306 if (rc != 0) 00307 return -1; 00308 00309 rc = check_protocol (protocol); 00310 if (rc != 0) 00311 return -1; 00312 00313 if (protocol == "inproc" || protocol == "sys") { 00314 endpoint_t endpoint = {this, options}; 00315 return register_endpoint (addr_, endpoint); 00316 } 00317 00318 if (protocol == "pgm" || protocol == "epgm") { 00319 00320 // For convenience's sake, bind can be used interchageable with 00321 // connect for PGM and EPGM transports. 00322 return connect (addr_); 00323 } 00324 00325 // Remaining trasnports require to be run in an I/O thread, so at this 00326 // point we'll choose one. 00327 io_thread_t *io_thread = choose_io_thread (options.affinity); 00328 if (!io_thread) { 00329 errno = EMTHREAD; 00330 return -1; 00331 } 00332 00333 if (protocol == "tcp") { 00334 tcp_listener_t *listener = new (std::nothrow) tcp_listener_t ( 00335 io_thread, this, options); 00336 alloc_assert (listener); 00337 int rc = listener->set_address (address.c_str ()); 00338 if (rc != 0) { 00339 delete listener; 00340 return -1; 00341 } 00342 launch_child (listener); 00343 return 0; 00344 } 00345 00346 #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS 00347 if (protocol == "ipc") { 00348 ipc_listener_t *listener = new (std::nothrow) ipc_listener_t ( 00349 io_thread, this, options); 00350 alloc_assert (listener); 00351 int rc = listener->set_address (address.c_str ()); 00352 if (rc != 0) { 00353 delete listener; 00354 return -1; 00355 } 00356 launch_child (listener); 00357 return 0; 00358 } 00359 #endif 00360 00361 zmq_assert (false); 00362 return -1; 00363 } 00364 00365 int zmq::socket_base_t::connect (const char *addr_) 00366 { 00367 if (unlikely (ctx_terminated)) { 00368 errno = ETERM; 00369 return -1; 00370 } 00371 00372 // Parse addr_ string. 00373 std::string protocol; 00374 std::string address; 00375 int rc = parse_uri (addr_, protocol, address); 00376 if (rc != 0) 00377 return -1; 00378 00379 rc = check_protocol (protocol); 00380 if (rc != 0) 00381 return -1; 00382 00383 if (protocol == "inproc" || protocol == "sys") { 00384 00385 // TODO: inproc connect is specific with respect to creating pipes 00386 // as there's no 'reconnect' functionality implemented. Once that 00387 // is in place we should follow generic pipe creation algorithm. 00388 00389 // Find the peer endpoint. 00390 endpoint_t peer = find_endpoint (addr_); 00391 if (!peer.socket) 00392 return -1; 00393 00394 // The total HWM for an inproc connection should be the sum of 00395 // the binder's HWM and the connector's HWM. 00396 int sndhwm; 00397 int rcvhwm; 00398 if (options.sndhwm == 0 || peer.options.rcvhwm == 0) 00399 sndhwm = 0; 00400 else 00401 sndhwm = options.sndhwm + peer.options.rcvhwm; 00402 if (options.rcvhwm == 0 || peer.options.sndhwm == 0) 00403 rcvhwm = 0; 00404 else 00405 rcvhwm = options.rcvhwm + peer.options.sndhwm; 00406 00407 // Create a bi-directional pipe to connect the peers. 00408 object_t *parents [2] = {this, peer.socket}; 00409 pipe_t *pipes [2] = {NULL, NULL}; 00410 int hwms [2] = {sndhwm, rcvhwm}; 00411 bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; 00412 int rc = pipepair (parents, pipes, hwms, delays); 00413 errno_assert (rc == 0); 00414 00415 // Attach local end of the pipe to this socket object. 00416 attach_pipe (pipes [0]); 00417 00418 // If required, send the identity of the local socket to the peer. 00419 if (options.send_identity) { 00420 msg_t id; 00421 rc = id.init_size (options.identity_size); 00422 zmq_assert (rc == 0); 00423 memcpy (id.data (), options.identity, options.identity_size); 00424 id.set_flags (msg_t::identity); 00425 bool written = pipes [0]->write (&id); 00426 zmq_assert (written); 00427 } 00428 00429 // Attach remote end of the pipe to the peer socket. Note that peer's 00430 // seqnum was incremented in find_endpoint function. We don't need it 00431 // increased here. 00432 send_bind (peer.socket, pipes [1], false); 00433 00434 return 0; 00435 } 00436 00437 // Choose the I/O thread to run the session in. 00438 io_thread_t *io_thread = choose_io_thread (options.affinity); 00439 if (!io_thread) { 00440 errno = EMTHREAD; 00441 return -1; 00442 } 00443 00444 // Create session. 00445 session_base_t *session = session_base_t::create (io_thread, true, this, 00446 options, protocol.c_str (), address.c_str ()); 00447 errno_assert (session); 00448 00449 // Create a bi-directional pipe. 00450 object_t *parents [2] = {this, session}; 00451 pipe_t *pipes [2] = {NULL, NULL}; 00452 int hwms [2] = {options.sndhwm, options.rcvhwm}; 00453 bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; 00454 rc = pipepair (parents, pipes, hwms, delays); 00455 errno_assert (rc == 0); 00456 00457 // Attach local end of the pipe to the socket object. 00458 attach_pipe (pipes [0]); 00459 00460 // Attach remote end of the pipe to the session object later on. 00461 session->attach_pipe (pipes [1]); 00462 00463 // Activate the session. Make it a child of this socket. 00464 launch_child (session); 00465 00466 return 0; 00467 } 00468 00469 int zmq::socket_base_t::send (msg_t *msg_, int flags_) 00470 { 00471 // Check whether the library haven't been shut down yet. 00472 if (unlikely (ctx_terminated)) { 00473 errno = ETERM; 00474 return -1; 00475 } 00476 00477 // Check whether message passed to the function is valid. 00478 if (unlikely (!msg_->check ())) { 00479 errno = EFAULT; 00480 return -1; 00481 } 00482 00483 // Process pending commands, if any. 00484 int rc = process_commands (0, true); 00485 if (unlikely (rc != 0)) 00486 return -1; 00487 00488 // Clear any user-visible flags that are set on the message. 00489 msg_->reset_flags (msg_t::more); 00490 00491 // At this point we impose the flags on the message. 00492 if (flags_ & ZMQ_SNDMORE) 00493 msg_->set_flags (msg_t::more); 00494 00495 // Try to send the message. 00496 rc = xsend (msg_, flags_); 00497 if (rc == 0) 00498 return 0; 00499 if (unlikely (errno != EAGAIN)) 00500 return -1; 00501 00502 // In case of non-blocking send we'll simply propagate 00503 // the error - including EAGAIN - up the stack. 00504 if (flags_ & ZMQ_DONTWAIT || options.sndtimeo == 0) 00505 return -1; 00506 00507 // Compute the time when the timeout should occur. 00508 // If the timeout is infite, don't care. 00509 clock_t clock ; 00510 int timeout = options.sndtimeo; 00511 uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout); 00512 00513 // Oops, we couldn't send the message. Wait for the next 00514 // command, process it and try to send the message again. 00515 // If timeout is reached in the meantime, return EAGAIN. 00516 while (true) { 00517 if (unlikely (process_commands (timeout, false) != 0)) 00518 return -1; 00519 rc = xsend (msg_, flags_); 00520 if (rc == 0) 00521 break; 00522 if (unlikely (errno != EAGAIN)) 00523 return -1; 00524 if (timeout > 0) { 00525 timeout = (int) (end - clock.now_ms ()); 00526 if (timeout <= 0) { 00527 errno = EAGAIN; 00528 return -1; 00529 } 00530 } 00531 } 00532 return 0; 00533 } 00534 00535 int zmq::socket_base_t::recv (msg_t *msg_, int flags_) 00536 { 00537 // Check whether the library haven't been shut down yet. 00538 if (unlikely (ctx_terminated)) { 00539 errno = ETERM; 00540 return -1; 00541 } 00542 00543 // Check whether message passed to the function is valid. 00544 if (unlikely (!msg_->check ())) { 00545 errno = EFAULT; 00546 return -1; 00547 } 00548 00549 // Get the message. 00550 int rc = xrecv (msg_, flags_); 00551 if (unlikely (rc != 0 && errno != EAGAIN)) 00552 return -1; 00553 00554 // Once every inbound_poll_rate messages check for signals and process 00555 // incoming commands. This happens only if we are not polling altogether 00556 // because there are messages available all the time. If poll occurs, 00557 // ticks is set to zero and thus we avoid this code. 00558 // 00559 // Note that 'recv' uses different command throttling algorithm (the one 00560 // described above) from the one used by 'send'. This is because counting 00561 // ticks is more efficient than doing RDTSC all the time. 00562 if (++ticks == inbound_poll_rate) { 00563 if (unlikely (process_commands (0, false) != 0)) 00564 return -1; 00565 ticks = 0; 00566 } 00567 00568 // If we have the message, return immediately. 00569 if (rc == 0) { 00570 extract_flags (msg_); 00571 return 0; 00572 } 00573 00574 // If the message cannot be fetched immediately, there are two scenarios. 00575 // For non-blocking recv, commands are processed in case there's an 00576 // activate_reader command already waiting int a command pipe. 00577 // If it's not, return EAGAIN. 00578 if (flags_ & ZMQ_DONTWAIT || options.rcvtimeo == 0) { 00579 if (unlikely (process_commands (0, false) != 0)) 00580 return -1; 00581 ticks = 0; 00582 00583 rc = xrecv (msg_, flags_); 00584 if (rc < 0) 00585 return rc; 00586 extract_flags (msg_); 00587 return 0; 00588 } 00589 00590 // Compute the time when the timeout should occur. 00591 // If the timeout is infite, don't care. 00592 clock_t clock ; 00593 int timeout = options.rcvtimeo; 00594 uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout); 00595 00596 // In blocking scenario, commands are processed over and over again until 00597 // we are able to fetch a message. 00598 bool block = (ticks != 0); 00599 while (true) { 00600 if (unlikely (process_commands (block ? timeout : 0, false) != 0)) 00601 return -1; 00602 rc = xrecv (msg_, flags_); 00603 if (rc == 0) { 00604 ticks = 0; 00605 break; 00606 } 00607 if (unlikely (errno != EAGAIN)) 00608 return -1; 00609 block = true; 00610 if (timeout > 0) { 00611 timeout = (int) (end - clock.now_ms ()); 00612 if (timeout <= 0) { 00613 errno = EAGAIN; 00614 return -1; 00615 } 00616 } 00617 } 00618 00619 extract_flags (msg_); 00620 return 0; 00621 } 00622 00623 int zmq::socket_base_t::close () 00624 { 00625 // Transfer the ownership of the socket from this application thread 00626 // to the reaper thread which will take care of the rest of shutdown 00627 // process. 00628 send_reap (this); 00629 00630 return 0; 00631 } 00632 00633 bool zmq::socket_base_t::has_in () 00634 { 00635 return xhas_in (); 00636 } 00637 00638 bool zmq::socket_base_t::has_out () 00639 { 00640 return xhas_out (); 00641 } 00642 00643 void zmq::socket_base_t::start_reaping (poller_t *poller_) 00644 { 00645 // Plug the socket to the reaper thread. 00646 poller = poller_; 00647 handle = poller->add_fd (mailbox.get_fd (), this); 00648 poller->set_pollin (handle); 00649 00650 // Initialise the termination and check whether it can be deallocated 00651 // immediately. 00652 terminate (); 00653 check_destroy (); 00654 } 00655 00656 int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) 00657 { 00658 int rc; 00659 command_t cmd; 00660 if (timeout_ != 0) { 00661 00662 // If we are asked to wait, simply ask mailbox to wait. 00663 rc = mailbox.recv (&cmd, timeout_); 00664 } 00665 else { 00666 00667 // If we are asked not to wait, check whether we haven't processed 00668 // commands recently, so that we can throttle the new commands. 00669 00670 // Get the CPU's tick counter. If 0, the counter is not available. 00671 uint64_t tsc = zmq::clock_t::rdtsc (); 00672 00673 // Optimised version of command processing - it doesn't have to check 00674 // for incoming commands each time. It does so only if certain time 00675 // elapsed since last command processing. Command delay varies 00676 // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU 00677 // etc. The optimisation makes sense only on platforms where getting 00678 // a timestamp is a very cheap operation (tens of nanoseconds). 00679 if (tsc && throttle_) { 00680 00681 // Check whether TSC haven't jumped backwards (in case of migration 00682 // between CPU cores) and whether certain time have elapsed since 00683 // last command processing. If it didn't do nothing. 00684 if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay) 00685 return 0; 00686 last_tsc = tsc; 00687 } 00688 00689 // Check whether there are any commands pending for this thread. 00690 rc = mailbox.recv (&cmd, 0); 00691 } 00692 00693 // Process all the commands available at the moment. 00694 while (true) { 00695 if (rc == -1 && errno == EAGAIN) 00696 break; 00697 if (rc == -1 && errno == EINTR) 00698 return -1; 00699 errno_assert (rc == 0); 00700 cmd.destination->process_command (cmd); 00701 rc = mailbox.recv (&cmd, 0); 00702 } 00703 00704 if (ctx_terminated) { 00705 errno = ETERM; 00706 return -1; 00707 } 00708 00709 return 0; 00710 } 00711 00712 void zmq::socket_base_t::process_stop () 00713 { 00714 // Here, someone have called zmq_term while the socket was still alive. 00715 // We'll remember the fact so that any blocking call is interrupted and any 00716 // further attempt to use the socket will return ETERM. The user is still 00717 // responsible for calling zmq_close on the socket though! 00718 ctx_terminated = true; 00719 } 00720 00721 void zmq::socket_base_t::process_bind (pipe_t *pipe_) 00722 { 00723 attach_pipe (pipe_); 00724 } 00725 00726 void zmq::socket_base_t::process_unplug () 00727 { 00728 } 00729 00730 void zmq::socket_base_t::process_term (int linger_) 00731 { 00732 // Unregister all inproc endpoints associated with this socket. 00733 // Doing this we make sure that no new pipes from other sockets (inproc) 00734 // will be initiated. 00735 unregister_endpoints (this); 00736 00737 // Ask all attached pipes to terminate. 00738 for (pipes_t::size_type i = 0; i != pipes.size (); ++i) 00739 pipes [i]->terminate (false); 00740 register_term_acks ((int) pipes.size ()); 00741 00742 // Continue the termination process immediately. 00743 own_t::process_term (linger_); 00744 } 00745 00746 void zmq::socket_base_t::process_destroy () 00747 { 00748 destroyed = true; 00749 } 00750 00751 int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, 00752 size_t optvallen_) 00753 { 00754 errno = EINVAL; 00755 return -1; 00756 } 00757 00758 bool zmq::socket_base_t::xhas_out () 00759 { 00760 return false; 00761 } 00762 00763 int zmq::socket_base_t::xsend (msg_t *msg_, int flags_) 00764 { 00765 errno = ENOTSUP; 00766 return -1; 00767 } 00768 00769 bool zmq::socket_base_t::xhas_in () 00770 { 00771 return false; 00772 } 00773 00774 int zmq::socket_base_t::xrecv (msg_t *msg_, int flags_) 00775 { 00776 errno = ENOTSUP; 00777 return -1; 00778 } 00779 00780 void zmq::socket_base_t::xread_activated (pipe_t *pipe_) 00781 { 00782 zmq_assert (false); 00783 } 00784 void zmq::socket_base_t::xwrite_activated (pipe_t *pipe_) 00785 { 00786 zmq_assert (false); 00787 } 00788 00789 void zmq::socket_base_t::xhiccuped (pipe_t *pipe_) 00790 { 00791 zmq_assert (false); 00792 } 00793 00794 void zmq::socket_base_t::in_event () 00795 { 00796 // This function is invoked only once the socket is running in the context 00797 // of the reaper thread. Process any commands from other threads/sockets 00798 // that may be available at the moment. Ultimately, the socket will 00799 // be destroyed. 00800 process_commands (0, false); 00801 check_destroy (); 00802 } 00803 00804 void zmq::socket_base_t::out_event () 00805 { 00806 zmq_assert (false); 00807 } 00808 00809 void zmq::socket_base_t::timer_event (int id_) 00810 { 00811 zmq_assert (false); 00812 } 00813 00814 void zmq::socket_base_t::check_destroy () 00815 { 00816 // If the object was already marked as destroyed, finish the deallocation. 00817 if (destroyed) { 00818 00819 // Remove the socket from the reaper's poller. 00820 poller->rm_fd (handle); 00821 00822 // Remove the socket from the context. 00823 destroy_socket (this); 00824 00825 // Notify the reaper about the fact. 00826 send_reaped (); 00827 00828 // Deallocate. 00829 own_t::process_destroy (); 00830 } 00831 } 00832 00833 void zmq::socket_base_t::read_activated (pipe_t *pipe_) 00834 { 00835 xread_activated (pipe_); 00836 } 00837 00838 void zmq::socket_base_t::write_activated (pipe_t *pipe_) 00839 { 00840 xwrite_activated (pipe_); 00841 } 00842 00843 void zmq::socket_base_t::hiccuped (pipe_t *pipe_) 00844 { 00845 xhiccuped (pipe_); 00846 } 00847 00848 void zmq::socket_base_t::terminated (pipe_t *pipe_) 00849 { 00850 // Notify the specific socket type about the pipe termination. 00851 xterminated (pipe_); 00852 00853 // Remove the pipe from the list of attached pipes and confirm its 00854 // termination if we are already shutting down. 00855 pipes.erase (pipe_); 00856 if (is_terminating ()) 00857 unregister_term_ack (); 00858 } 00859 00860 void zmq::socket_base_t::extract_flags (msg_t *msg_) 00861 { 00862 // Test whether IDENTITY flag is valid for this socket type. 00863 if (unlikely (msg_->flags () & msg_t::identity)) 00864 zmq_assert (options.recv_identity); 00865 00866 // Remove MORE flag. 00867 rcvmore = msg_->flags () & msg_t::more ? true : false; 00868 } 00869