libzmq master
The Intelligent Transport Layer

req.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2011 VMware, Inc.
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #include "req.hpp"
00024 #include "err.hpp"
00025 #include "msg.hpp"
00026 #include "wire.hpp"
00027 #include "random.hpp"
00028 #include "likely.hpp"
00029 
00030 zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_) :
00031     xreq_t (parent_, tid_),
00032     receiving_reply (false),
00033     message_begins (true)
00034 {
00035     options.type = ZMQ_REQ;
00036 }
00037 
00038 zmq::req_t::~req_t ()
00039 {
00040 }
00041 
00042 int zmq::req_t::xsend (msg_t *msg_, int flags_)
00043 {
00044     //  If we've sent a request and we still haven't got the reply,
00045     //  we can't send another request.
00046     if (receiving_reply) {
00047         errno = EFSM;
00048         return -1;
00049     }
00050 
00051     //  First part of the request is the request identity.
00052     if (message_begins) {
00053         msg_t bottom;
00054         int rc = bottom.init ();
00055         errno_assert (rc == 0);
00056         bottom.set_flags (msg_t::more);
00057         rc = xreq_t::xsend (&bottom, 0);
00058         if (rc != 0)
00059             return -1;
00060         message_begins = false;
00061     }
00062 
00063     bool more = msg_->flags () & msg_t::more ? true : false;
00064 
00065     int rc = xreq_t::xsend (msg_, flags_);
00066     if (rc != 0)
00067         return rc;
00068 
00069     //  If the request was fully sent, flip the FSM into reply-receiving state.
00070     if (!more) {
00071         receiving_reply = true;
00072         message_begins = true;
00073     }
00074 
00075     return 0;
00076 }
00077 
00078 int zmq::req_t::xrecv (msg_t *msg_, int flags_)
00079 {
00080     //  If request wasn't send, we can't wait for reply.
00081     if (!receiving_reply) {
00082         errno = EFSM;
00083         return -1;
00084     }
00085 
00086     //  First part of the reply should be the original request ID.
00087     if (message_begins) {
00088         int rc = xreq_t::xrecv (msg_, flags_);
00089         if (rc != 0)
00090             return rc;
00091 
00092         // TODO: This should also close the connection with the peer!
00093         if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
00094             while (true) {
00095                 int rc = xreq_t::xrecv (msg_, flags_);
00096                 errno_assert (rc == 0);
00097                 if (!(msg_->flags () & msg_t::more))
00098                     break;
00099             }
00100             msg_->close ();
00101             msg_->init ();
00102             errno = EAGAIN;
00103             return -1;
00104         }
00105 
00106         message_begins = false;
00107     }
00108 
00109     int rc = xreq_t::xrecv (msg_, flags_);
00110     if (rc != 0)
00111         return rc;
00112 
00113     //  If the reply is fully received, flip the FSM into request-sending state.
00114     if (!(msg_->flags () & msg_t::more)) {
00115         receiving_reply = false;
00116         message_begins = true;
00117     }
00118 
00119     return 0;
00120 }
00121 
00122 bool zmq::req_t::xhas_in ()
00123 {
00124     //  TODO: Duplicates should be removed here.
00125 
00126     if (!receiving_reply)
00127         return false;
00128 
00129     return xreq_t::xhas_in ();
00130 }
00131 
00132 bool zmq::req_t::xhas_out ()
00133 {
00134     if (receiving_reply)
00135         return false;
00136 
00137     return xreq_t::xhas_out ();
00138 }
00139 
00140 zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
00141       socket_base_t *socket_, const options_t &options_,
00142       const char *protocol_, const char *address_) :
00143     xreq_session_t (io_thread_, connect_, socket_, options_, protocol_,
00144         address_),
00145     state (identity)
00146 {
00147 }
00148 
00149 zmq::req_session_t::~req_session_t ()
00150 {
00151     state = options.recv_identity ? identity : bottom;
00152 }
00153 
00154 int zmq::req_session_t::write (msg_t *msg_)
00155 {
00156     switch (state) {
00157     case bottom:
00158         if (msg_->flags () == msg_t::more && msg_->size () == 0) {
00159             state = body;
00160             return xreq_session_t::write (msg_);
00161         }
00162         break;
00163     case body:
00164         if (msg_->flags () == msg_t::more)
00165             return xreq_session_t::write (msg_);
00166         if (msg_->flags () == 0) {
00167             state = bottom;
00168             return xreq_session_t::write (msg_);
00169         }
00170         break;
00171     case identity:
00172         if (msg_->flags () == 0) {
00173             state = bottom;
00174             return xreq_session_t::write (msg_);
00175         }
00176         break;
00177     }
00178     errno = EFAULT;
00179     return -1;
00180 }
00181 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines