![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "pair.hpp" 00023 #include "err.hpp" 00024 #include "pipe.hpp" 00025 #include "msg.hpp" 00026 00027 zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : 00028 socket_base_t (parent_, tid_), 00029 pipe (NULL) 00030 { 00031 options.type = ZMQ_PAIR; 00032 } 00033 00034 zmq::pair_t::~pair_t () 00035 { 00036 zmq_assert (!pipe); 00037 } 00038 00039 void zmq::pair_t::xattach_pipe (pipe_t *pipe_) 00040 { 00041 zmq_assert (!pipe); 00042 pipe = pipe_; 00043 } 00044 00045 void zmq::pair_t::xterminated (pipe_t *pipe_) 00046 { 00047 zmq_assert (pipe_ == pipe); 00048 pipe = NULL; 00049 } 00050 00051 void zmq::pair_t::xread_activated (pipe_t *pipe_) 00052 { 00053 // There's just one pipe. No lists of active and inactive pipes. 00054 // There's nothing to do here. 00055 } 00056 00057 void zmq::pair_t::xwrite_activated (pipe_t *pipe_) 00058 { 00059 // There's just one pipe. No lists of active and inactive pipes. 00060 // There's nothing to do here. 00061 } 00062 00063 int zmq::pair_t::xsend (msg_t *msg_, int flags_) 00064 { 00065 if (!pipe || !pipe->write (msg_)) { 00066 errno = EAGAIN; 00067 return -1; 00068 } 00069 00070 if (!(flags_ & ZMQ_SNDMORE)) 00071 pipe->flush (); 00072 00073 // Detach the original message from the data buffer. 00074 int rc = msg_->init (); 00075 errno_assert (rc == 0); 00076 00077 return 0; 00078 } 00079 00080 int zmq::pair_t::xrecv (msg_t *msg_, int flags_) 00081 { 00082 // Deallocate old content of the message. 00083 int rc = msg_->close (); 00084 errno_assert (rc == 0); 00085 00086 if (!pipe || !pipe->read (msg_)) { 00087 00088 // Initialise the output parameter to be a 0-byte message. 00089 rc = msg_->init (); 00090 errno_assert (rc == 0); 00091 00092 errno = EAGAIN; 00093 return -1; 00094 } 00095 return 0; 00096 } 00097 00098 bool zmq::pair_t::xhas_in () 00099 { 00100 if (!pipe) 00101 return false; 00102 00103 return pipe->check_read (); 00104 } 00105 00106 bool zmq::pair_t::xhas_out () 00107 { 00108 if (!pipe) 00109 return false; 00110 00111 msg_t msg; 00112 int rc = msg.init (); 00113 errno_assert (rc == 0); 00114 bool result = pipe->check_write (&msg); 00115 rc = msg.close (); 00116 errno_assert (rc == 0); 00117 return result; 00118 } 00119 00120 zmq::pair_session_t::pair_session_t (io_thread_t *io_thread_, bool connect_, 00121 socket_base_t *socket_, const options_t &options_, 00122 const char *protocol_, const char *address_) : 00123 session_base_t (io_thread_, connect_, socket_, options_, protocol_, 00124 address_) 00125 { 00126 } 00127 00128 zmq::pair_session_t::~pair_session_t () 00129 { 00130 } 00131