//  libzmq master
//  The Intelligent Transport Layer
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2011 VMware, Inc. 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "xreq.hpp" 00023 #include "err.hpp" 00024 #include "msg.hpp" 00025 00026 zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : 00027 socket_base_t (parent_, tid_) 00028 { 00029 options.type = ZMQ_XREQ; 00030 00031 // TODO: Uncomment the following line when XREQ will become true XREQ 00032 // rather than generic dealer socket. 00033 // If the socket is closing we can drop all the outbound requests. There'll 00034 // be noone to receive the replies anyway. 00035 // options.delay_on_close = false; 00036 00037 options.send_identity = true; 00038 options.recv_identity = true; 00039 } 00040 00041 zmq::xreq_t::~xreq_t () 00042 { 00043 } 00044 00045 void zmq::xreq_t::xattach_pipe (pipe_t *pipe_) 00046 { 00047 zmq_assert (pipe_); 00048 fq.attach (pipe_); 00049 lb.attach (pipe_); 00050 } 00051 00052 int zmq::xreq_t::xsend (msg_t *msg_, int flags_) 00053 { 00054 return lb.send (msg_, flags_); 00055 } 00056 00057 int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) 00058 { 00059 // XREQ socket doesn't use identities. 
We can safely drop it and 00060 while (true) { 00061 int rc = fq.recv (msg_, flags_); 00062 if (rc != 0) 00063 return rc; 00064 if (likely (!(msg_->flags () & msg_t::identity))) 00065 break; 00066 } 00067 return 0; 00068 } 00069 00070 bool zmq::xreq_t::xhas_in () 00071 { 00072 return fq.has_in (); 00073 } 00074 00075 bool zmq::xreq_t::xhas_out () 00076 { 00077 return lb.has_out (); 00078 } 00079 00080 void zmq::xreq_t::xread_activated (pipe_t *pipe_) 00081 { 00082 fq.activated (pipe_); 00083 } 00084 00085 void zmq::xreq_t::xwrite_activated (pipe_t *pipe_) 00086 { 00087 lb.activated (pipe_); 00088 } 00089 00090 void zmq::xreq_t::xterminated (pipe_t *pipe_) 00091 { 00092 fq.terminated (pipe_); 00093 lb.terminated (pipe_); 00094 } 00095 00096 zmq::xreq_session_t::xreq_session_t (io_thread_t *io_thread_, bool connect_, 00097 socket_base_t *socket_, const options_t &options_, 00098 const char *protocol_, const char *address_) : 00099 session_base_t (io_thread_, connect_, socket_, options_, protocol_, 00100 address_) 00101 { 00102 } 00103 00104 zmq::xreq_session_t::~xreq_session_t () 00105 { 00106 } 00107