libzmq master
The Intelligent Transport Layer
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2011 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "rep.hpp" 00023 #include "err.hpp" 00024 #include "msg.hpp" 00025 00026 zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t tid_) : 00027 xrep_t (parent_, tid_), 00028 sending_reply (false), 00029 request_begins (true) 00030 { 00031 options.type = ZMQ_REP; 00032 } 00033 00034 zmq::rep_t::~rep_t () 00035 { 00036 } 00037 00038 int zmq::rep_t::xsend (msg_t *msg_, int flags_) 00039 { 00040 // If we are in the middle of receiving a request, we cannot send reply. 00041 if (!sending_reply) { 00042 errno = EFSM; 00043 return -1; 00044 } 00045 00046 bool more = msg_->flags () & msg_t::more ? true : false; 00047 00048 // Push message to the reply pipe. 00049 int rc = xrep_t::xsend (msg_, flags_); 00050 if (rc != 0) 00051 return rc; 00052 00053 // If the reply is complete flip the FSM back to request receiving state. 00054 if (!more) 00055 sending_reply = false; 00056 00057 return 0; 00058 } 00059 00060 int zmq::rep_t::xrecv (msg_t *msg_, int flags_) 00061 { 00062 // If we are in middle of sending a reply, we cannot receive next request. 
00063 if (sending_reply) { 00064 errno = EFSM; 00065 return -1; 00066 } 00067 00068 // First thing to do when receiving a request is to copy all the labels 00069 // to the reply pipe. 00070 if (request_begins) { 00071 while (true) { 00072 int rc = xrep_t::xrecv (msg_, flags_); 00073 if (rc != 0) 00074 return rc; 00075 zmq_assert (msg_->flags () & msg_t::more); 00076 bool bottom = (msg_->size () == 0); 00077 rc = xrep_t::xsend (msg_, flags_); 00078 errno_assert (rc == 0); 00079 if (bottom) 00080 break; 00081 } 00082 request_begins = false; 00083 } 00084 00085 // Get next message part to return to the user. 00086 int rc = xrep_t::xrecv (msg_, flags_); 00087 if (rc != 0) 00088 return rc; 00089 00090 // If whole request is read, flip the FSM to reply-sending state. 00091 if (!(msg_->flags () & msg_t::more)) { 00092 sending_reply = true; 00093 request_begins = true; 00094 } 00095 00096 return 0; 00097 } 00098 00099 bool zmq::rep_t::xhas_in () 00100 { 00101 if (sending_reply) 00102 return false; 00103 00104 return xrep_t::xhas_in (); 00105 } 00106 00107 bool zmq::rep_t::xhas_out () 00108 { 00109 if (!sending_reply) 00110 return false; 00111 00112 return xrep_t::xhas_out (); 00113 } 00114 00115 zmq::rep_session_t::rep_session_t (io_thread_t *io_thread_, bool connect_, 00116 socket_base_t *socket_, const options_t &options_, 00117 const char *protocol_, const char *address_) : 00118 xrep_session_t (io_thread_, connect_, socket_, options_, protocol_, 00119 address_) 00120 { 00121 } 00122 00123 zmq::rep_session_t::~rep_session_t () 00124 { 00125 } 00126