![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2011 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #ifndef __ZMQ_XREP_HPP_INCLUDED__ 00024 #define __ZMQ_XREP_HPP_INCLUDED__ 00025 00026 #include <map> 00027 00028 #include "socket_base.hpp" 00029 #include "session_base.hpp" 00030 #include "stdint.hpp" 00031 #include "blob.hpp" 00032 #include "msg.hpp" 00033 #include "fq.hpp" 00034 00035 namespace zmq 00036 { 00037 00038 // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. 00039 class xrep_t : 00040 public socket_base_t 00041 { 00042 public: 00043 00044 xrep_t (class ctx_t *parent_, uint32_t tid_); 00045 ~xrep_t (); 00046 00047 // Overloads of functions from socket_base_t. 00048 void xattach_pipe (class pipe_t *pipe_); 00049 int xsend (class msg_t *msg_, int flags_); 00050 int xrecv (class msg_t *msg_, int flags_); 00051 bool xhas_in (); 00052 bool xhas_out (); 00053 void xread_activated (class pipe_t *pipe_); 00054 void xwrite_activated (class pipe_t *pipe_); 00055 void xterminated (class pipe_t *pipe_); 00056 00057 protected: 00058 00059 // Rollback any message parts that were sent but not yet flushed. 00060 int rollback (); 00061 00062 private: 00063 00064 // Fair queueing object for inbound pipes. 00065 fq_t fq; 00066 00067 // Have we prefetched a message. 00068 bool prefetched; 00069 00070 // Holds the prefetched message. 00071 msg_t prefetched_msg; 00072 00073 // If true, more incoming message parts are expected. 00074 bool more_in; 00075 00076 struct outpipe_t 00077 { 00078 class pipe_t *pipe; 00079 bool active; 00080 }; 00081 00082 // Outbound pipes indexed by the peer IDs. 00083 typedef std::map <blob_t, outpipe_t> outpipes_t; 00084 outpipes_t outpipes; 00085 00086 // The pipe we are currently writing to. 00087 class pipe_t *current_out; 00088 00089 // If true, more outgoing message parts are expected. 00090 bool more_out; 00091 00092 // Peer ID are generated. It's a simple increment and wrap-over 00093 // algorithm. This value is the next ID to use (if not used already). 00094 uint32_t next_peer_id; 00095 00096 xrep_t (const xrep_t&); 00097 const xrep_t &operator = (const xrep_t&); 00098 }; 00099 00100 class xrep_session_t : public session_base_t 00101 { 00102 public: 00103 00104 xrep_session_t (class io_thread_t *io_thread_, bool connect_, 00105 class socket_base_t *socket_, const options_t &options_, 00106 const char *protocol_, const char *address_); 00107 ~xrep_session_t (); 00108 00109 private: 00110 00111 xrep_session_t (const xrep_session_t&); 00112 const xrep_session_t &operator = (const xrep_session_t&); 00113 }; 00114 00115 } 00116 00117 #endif