![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #ifndef __ZMQ_PIPE_HPP_INCLUDED__ 00024 #define __ZMQ_PIPE_HPP_INCLUDED__ 00025 00026 #include "msg.hpp" 00027 #include "ypipe.hpp" 00028 #include "config.hpp" 00029 #include "object.hpp" 00030 #include "stdint.hpp" 00031 #include "array.hpp" 00032 #include "blob.hpp" 00033 00034 namespace zmq 00035 { 00036 00037 // Create a pipepair for bi-directional transfer of messages. 00038 // First HWM is for messages passed from first pipe to the second pipe. 00039 // Second HWM is for messages passed from second pipe to the first pipe. 00040 // Delay specifies how the pipe behaves when the peer terminates. If true 00041 // pipe receives all the pending messages before terminating, otherwise it 00042 // terminates straight away. 00043 int pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], 00044 int hwms_ [2], bool delays_ [2]); 00045 00046 struct i_pipe_events 00047 { 00048 virtual ~i_pipe_events () {} 00049 00050 virtual void read_activated (class pipe_t *pipe_) = 0; 00051 virtual void write_activated (class pipe_t *pipe_) = 0; 00052 virtual void hiccuped (class pipe_t *pipe_) = 0; 00053 virtual void terminated (class pipe_t *pipe_) = 0; 00054 }; 00055 00056 // Note that pipe can be stored in three different arrays. 00057 // The array of inbound pipes (1), the array of outbound pipes (2) and 00058 // the generic array of pipes to deallocate (3). 00059 00060 class pipe_t : 00061 public object_t, 00062 public array_item_t <1>, 00063 public array_item_t <2>, 00064 public array_item_t <3> 00065 { 00066 // This allows pipepair to create pipe objects. 00067 friend int pipepair (class object_t *parents_ [2], 00068 class pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); 00069 00070 public: 00071 00072 // Specifies the object to send events to. 00073 void set_event_sink (i_pipe_events *sink_); 00074 00075 // Pipe endpoint can store an opaque ID to be used by its clients. 00076 void set_identity (const blob_t &identity_); 00077 blob_t get_identity (); 00078 00079 // Returns true if there is at least one message to read in the pipe. 00080 bool check_read (); 00081 00082 // Reads a message to the underlying pipe. 00083 bool read (msg_t *msg_); 00084 00085 // Checks whether messages can be written to the pipe. If writing 00086 // the message would cause high watermark the function returns false. 00087 bool check_write (msg_t *msg_); 00088 00089 // Writes a message to the underlying pipe. Returns false if the 00090 // message cannot be written because high watermark was reached. 00091 bool write (msg_t *msg_); 00092 00093 // Remove unfinished parts of the outbound message from the pipe. 00094 void rollback (); 00095 00096 // Flush the messages downsteam. 00097 void flush (); 00098 00099 // Temporaraily disconnects the inbound message stream and drops 00100 // all the messages on the fly. Causes 'hiccuped' event to be generated 00101 // in the peer. 00102 void hiccup (); 00103 00104 // Ask pipe to terminate. The termination will happen asynchronously 00105 // and user will be notified about actual deallocation by 'terminated' 00106 // event. If delay is true, the pending messages will be processed 00107 // before actual shutdown. 00108 void terminate (bool delay_); 00109 00110 private: 00111 00112 // Type of the underlying lock-free pipe. 00113 typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; 00114 00115 // Command handlers. 00116 void process_activate_read (); 00117 void process_activate_write (uint64_t msgs_read_); 00118 void process_hiccup (void *pipe_); 00119 void process_pipe_term (); 00120 void process_pipe_term_ack (); 00121 00122 // Handler for delimiter read from the pipe. 00123 void delimit (); 00124 00125 // Constructor is private. Pipe can only be created using 00126 // pipepair function. 00127 pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, 00128 int inhwm_, int outhwm_, bool delay_); 00129 00130 // Pipepair uses this function to let us know about 00131 // the peer pipe object. 00132 void set_peer (pipe_t *pipe_); 00133 00134 // Destructor is private. Pipe objects destroy themselves. 00135 ~pipe_t (); 00136 00137 // Underlying pipes for both directions. 00138 upipe_t *inpipe; 00139 upipe_t *outpipe; 00140 00141 // Can the pipe be read from / written to? 00142 bool in_active; 00143 bool out_active; 00144 00145 // High watermark for the outbound pipe. 00146 int hwm; 00147 00148 // Low watermark for the inbound pipe. 00149 int lwm; 00150 00151 // Number of messages read and written so far. 00152 uint64_t msgs_read; 00153 uint64_t msgs_written; 00154 00155 // Last received peer's msgs_read. The actual number in the peer 00156 // can be higher at the moment. 00157 uint64_t peers_msgs_read; 00158 00159 // The pipe object on the other side of the pipepair. 00160 pipe_t *peer; 00161 00162 // Sink to send events to. 00163 i_pipe_events *sink; 00164 00165 // State of the pipe endpoint. Active is common state before any 00166 // termination begins. Delimited means that delimiter was read from 00167 // pipe before term command was received. Pending means that term 00168 // command was already received from the peer but there are still 00169 // pending messages to read. Terminating means that all pending 00170 // messages were already read and all we are waiting for is ack from 00171 // the peer. Terminated means that 'terminate' was explicitly called 00172 // by the user. Double_terminated means that user called 'terminate' 00173 // and then we've got term command from the peer as well. 00174 enum { 00175 active, 00176 delimited, 00177 pending, 00178 terminating, 00179 terminated, 00180 double_terminated 00181 } state; 00182 00183 // If true, we receive all the pending inbound messages before 00184 // terminating. If false, we terminate immediately when the peer 00185 // asks us to. 00186 bool delay; 00187 00188 // Identity of the writer. Used uniquely by the reader side. 00189 blob_t identity; 00190 00191 // Returns true if the message is delimiter; false otherwise. 00192 static bool is_delimiter (msg_t &msg_); 00193 00194 // Computes appropriate low watermark from the given high watermark. 00195 static int compute_lwm (int hwm_); 00196 00197 // Disable copying. 00198 pipe_t (const pipe_t&); 00199 const pipe_t &operator = (const pipe_t&); 00200 }; 00201 00202 } 00203 00204 #endif