![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include "req.hpp" 00024 #include "err.hpp" 00025 #include "msg.hpp" 00026 #include "wire.hpp" 00027 #include "random.hpp" 00028 #include "likely.hpp" 00029 00030 zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) : 00031 xreq_t (parent_, tid_), 00032 receiving_reply (false), 00033 message_begins (true) 00034 { 00035 options.type = ZMQ_REQ; 00036 } 00037 00038 zmq::req_t::~req_t () 00039 { 00040 } 00041 00042 int zmq::req_t::xsend (msg_t *msg_, int flags_) 00043 { 00044 // If we've sent a request and we still haven't got the reply, 00045 // we can't send another request. 00046 if (receiving_reply) { 00047 errno = EFSM; 00048 return -1; 00049 } 00050 00051 // First part of the request is the request identity. 00052 if (message_begins) { 00053 msg_t bottom; 00054 int rc = bottom.init (); 00055 errno_assert (rc == 0); 00056 bottom.set_flags (msg_t::more); 00057 rc = xreq_t::xsend (&bottom, 0); 00058 if (rc != 0) 00059 return -1; 00060 message_begins = false; 00061 } 00062 00063 bool more = msg_->flags () & msg_t::more ? true : false; 00064 00065 int rc = xreq_t::xsend (msg_, flags_); 00066 if (rc != 0) 00067 return rc; 00068 00069 // If the request was fully sent, flip the FSM into reply-receiving state. 00070 if (!more) { 00071 receiving_reply = true; 00072 message_begins = true; 00073 } 00074 00075 return 0; 00076 } 00077 00078 int zmq::req_t::xrecv (msg_t *msg_, int flags_) 00079 { 00080 // If request wasn't send, we can't wait for reply. 00081 if (!receiving_reply) { 00082 errno = EFSM; 00083 return -1; 00084 } 00085 00086 // First part of the reply should be the original request ID. 00087 if (message_begins) { 00088 int rc = xreq_t::xrecv (msg_, flags_); 00089 if (rc != 0) 00090 return rc; 00091 00092 // TODO: This should also close the connection with the peer! 00093 if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) { 00094 while (true) { 00095 int rc = xreq_t::xrecv (msg_, flags_); 00096 errno_assert (rc == 0); 00097 if (!(msg_->flags () & msg_t::more)) 00098 break; 00099 } 00100 msg_->close (); 00101 msg_->init (); 00102 errno = EAGAIN; 00103 return -1; 00104 } 00105 00106 message_begins = false; 00107 } 00108 00109 int rc = xreq_t::xrecv (msg_, flags_); 00110 if (rc != 0) 00111 return rc; 00112 00113 // If the reply is fully received, flip the FSM into request-sending state. 00114 if (!(msg_->flags () & msg_t::more)) { 00115 receiving_reply = false; 00116 message_begins = true; 00117 } 00118 00119 return 0; 00120 } 00121 00122 bool zmq::req_t::xhas_in () 00123 { 00124 // TODO: Duplicates should be removed here. 00125 00126 if (!receiving_reply) 00127 return false; 00128 00129 return xreq_t::xhas_in (); 00130 } 00131 00132 bool zmq::req_t::xhas_out () 00133 { 00134 if (receiving_reply) 00135 return false; 00136 00137 return xreq_t::xhas_out (); 00138 } 00139 00140 zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, 00141 socket_base_t *socket_, const options_t &options_, 00142 const char *protocol_, const char *address_) : 00143 xreq_session_t (io_thread_, connect_, socket_, options_, protocol_, 00144 address_), 00145 state (identity) 00146 { 00147 } 00148 00149 zmq::req_session_t::~req_session_t () 00150 { 00151 state = options.recv_identity ? identity : bottom; 00152 } 00153 00154 int zmq::req_session_t::write (msg_t *msg_) 00155 { 00156 switch (state) { 00157 case bottom: 00158 if (msg_->flags () == msg_t::more && msg_->size () == 0) { 00159 state = body; 00160 return xreq_session_t::write (msg_); 00161 } 00162 break; 00163 case body: 00164 if (msg_->flags () == msg_t::more) 00165 return xreq_session_t::write (msg_); 00166 if (msg_->flags () == 0) { 00167 state = bottom; 00168 return xreq_session_t::write (msg_); 00169 } 00170 break; 00171 case identity: 00172 if (msg_->flags () == 0) { 00173 state = bottom; 00174 return xreq_session_t::write (msg_); 00175 } 00176 break; 00177 } 00178 errno = EFAULT; 00179 return -1; 00180 } 00181