![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #ifndef __ZMQ_YPIPE_HPP_INCLUDED__ 00023 #define __ZMQ_YPIPE_HPP_INCLUDED__ 00024 00025 #include "atomic_ptr.hpp" 00026 #include "yqueue.hpp" 00027 #include "platform.hpp" 00028 00029 namespace zmq 00030 { 00031 00032 // Lock-free queue implementation. 00033 // Only a single thread can read from the pipe at any specific moment. 00034 // Only a single thread can write to the pipe at any specific moment. 00035 // T is the type of the object in the queue. 00036 // N is granularity of the pipe, i.e. how many items are needed to 00037 // perform next memory allocation. 00038 00039 template <typename T, int N> class ypipe_t 00040 { 00041 public: 00042 00043 // Initialises the pipe. 00044 inline ypipe_t () 00045 { 00046 // Insert terminator element into the queue. 00047 queue.push (); 00048 00049 // Let all the pointers to point to the terminator. 00050 // (unless pipe is dead, in which case c is set to NULL). 00051 r = w = f = &queue.back (); 00052 c.set (&queue.back ()); 00053 } 00054 00055 // The destructor doesn't have to be virtual. It is mad virtual 00056 // just to keep ICC and code checking tools from complaining. 00057 inline virtual ~ypipe_t () 00058 { 00059 } 00060 00061 // Following function (write) deliberately copies uninitialised data 00062 // when used with zmq_msg. Initialising the VSM body for 00063 // non-VSM messages won't be good for performance. 00064 00065 #ifdef ZMQ_HAVE_OPENVMS 00066 #pragma message save 00067 #pragma message disable(UNINIT) 00068 #endif 00069 00070 // Write an item to the pipe. Don't flush it yet. If incomplete is 00071 // set to true the item is assumed to be continued by items 00072 // subsequently written to the pipe. Incomplete items are never 00073 // flushed down the stream. 00074 inline void write (const T &value_, bool incomplete_) 00075 { 00076 // Place the value to the queue, add new terminator element. 00077 queue.back () = value_; 00078 queue.push (); 00079 00080 // Move the "flush up to here" poiter. 00081 if (!incomplete_) 00082 f = &queue.back (); 00083 } 00084 00085 #ifdef ZMQ_HAVE_OPENVMS 00086 #pragma message restore 00087 #endif 00088 00089 // Pop an incomplete item from the pipe. Returns true is such 00090 // item exists, false otherwise. 00091 inline bool unwrite (T *value_) 00092 { 00093 if (f == &queue.back ()) 00094 return false; 00095 queue.unpush (); 00096 *value_ = queue.back (); 00097 return true; 00098 } 00099 00100 // Flush all the completed items into the pipe. Returns false if 00101 // the reader thread is sleeping. In that case, caller is obliged to 00102 // wake the reader up before using the pipe again. 00103 inline bool flush () 00104 { 00105 // If there are no un-flushed items, do nothing. 00106 if (w == f) 00107 return true; 00108 00109 // Try to set 'c' to 'f'. 00110 if (c.cas (w, f) != w) { 00111 00112 // Compare-and-swap was unseccessful because 'c' is NULL. 00113 // This means that the reader is asleep. Therefore we don't 00114 // care about thread-safeness and update c in non-atomic 00115 // manner. We'll return false to let the caller know 00116 // that reader is sleeping. 00117 c.set (f); 00118 w = f; 00119 return false; 00120 } 00121 00122 // Reader is alive. Nothing special to do now. Just move 00123 // the 'first un-flushed item' pointer to 'f'. 00124 w = f; 00125 return true; 00126 } 00127 00128 // Check whether item is available for reading. 00129 inline bool check_read () 00130 { 00131 // Was the value prefetched already? If so, return. 00132 if (&queue.front () != r && r) 00133 return true; 00134 00135 // There's no prefetched value, so let us prefetch more values. 00136 // Prefetching is to simply retrieve the 00137 // pointer from c in atomic fashion. If there are no 00138 // items to prefetch, set c to NULL (using compare-and-swap). 00139 r = c.cas (&queue.front (), NULL); 00140 00141 // If there are no elements prefetched, exit. 00142 // During pipe's lifetime r should never be NULL, however, 00143 // it can happen during pipe shutdown when items 00144 // are being deallocated. 00145 if (&queue.front () == r || !r) 00146 return false; 00147 00148 // There was at least one value prefetched. 00149 return true; 00150 } 00151 00152 // Reads an item from the pipe. Returns false if there is no value. 00153 // available. 00154 inline bool read (T *value_) 00155 { 00156 // Try to prefetch a value. 00157 if (!check_read ()) 00158 return false; 00159 00160 // There was at least one value prefetched. 00161 // Return it to the caller. 00162 *value_ = queue.front (); 00163 queue.pop (); 00164 return true; 00165 } 00166 00167 // Applies the function fn to the first elemenent in the pipe 00168 // and returns the value returned by the fn. 00169 // The pipe mustn't be empty or the function crashes. 00170 inline bool probe (bool (*fn)(T &)) 00171 { 00172 bool rc = check_read (); 00173 zmq_assert (rc); 00174 00175 return (*fn) (queue.front ()); 00176 } 00177 00178 protected: 00179 00180 // Allocation-efficient queue to store pipe items. 00181 // Front of the queue points to the first prefetched item, back of 00182 // the pipe points to last un-flushed item. Front is used only by 00183 // reader thread, while back is used only by writer thread. 00184 yqueue_t <T, N> queue; 00185 00186 // Points to the first un-flushed item. This variable is used 00187 // exclusively by writer thread. 00188 T *w; 00189 00190 // Points to the first un-prefetched item. This variable is used 00191 // exclusively by reader thread. 00192 T *r; 00193 00194 // Points to the first item to be flushed in the future. 00195 T *f; 00196 00197 // The single point of contention between writer and reader thread. 00198 // Points past the last flushed item. If it is NULL, 00199 // reader is asleep. This pointer should be always accessed using 00200 // atomic operations. 00201 atomic_ptr_t <T> c; 00202 00203 // Disable copying of ypipe object. 00204 ypipe_t (const ypipe_t&); 00205 const ypipe_t &operator = (const ypipe_t&); 00206 }; 00207 00208 } 00209 00210 #endif