libzmq master
The Intelligent Transport Layer

ypipe.hpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #ifndef __ZMQ_YPIPE_HPP_INCLUDED__
00023 #define __ZMQ_YPIPE_HPP_INCLUDED__
00024 
00025 #include "atomic_ptr.hpp"
00026 #include "yqueue.hpp"
00027 #include "platform.hpp"
00028 
00029 namespace zmq
00030 {
00031 
00032     //  Lock-free queue implementation.
00033     //  Only a single thread can read from the pipe at any specific moment.
00034     //  Only a single thread can write to the pipe at any specific moment.
00035     //  T is the type of the object in the queue.
00036     //  N is granularity of the pipe, i.e. how many items are needed to
00037     //  perform next memory allocation.
00038 
00039     template <typename T, int N> class ypipe_t
00040     {
00041     public:
00042 
00043         //  Initialises the pipe.
00044         inline ypipe_t ()
00045         {
00046             //  Insert terminator element into the queue.
00047             queue.push ();
00048 
00049             //  Let all the pointers to point to the terminator.
00050             //  (unless pipe is dead, in which case c is set to NULL).
00051             r = w = f = &queue.back ();
00052             c.set (&queue.back ());
00053         }
00054 
00055         //  The destructor doesn't have to be virtual. It is mad virtual
00056         //  just to keep ICC and code checking tools from complaining.
00057         inline virtual ~ypipe_t ()
00058         {
00059         }
00060 
00061         //  Following function (write) deliberately copies uninitialised data
00062         //  when used with zmq_msg. Initialising the VSM body for
00063         //  non-VSM messages won't be good for performance.
00064 
00065 #ifdef ZMQ_HAVE_OPENVMS
00066 #pragma message save
00067 #pragma message disable(UNINIT)
00068 #endif
00069 
00070         //  Write an item to the pipe.  Don't flush it yet. If incomplete is
00071         //  set to true the item is assumed to be continued by items
00072         //  subsequently written to the pipe. Incomplete items are never
00073         //  flushed down the stream.
00074         inline void write (const T &value_, bool incomplete_)
00075         {
00076             //  Place the value to the queue, add new terminator element.
00077             queue.back () = value_;
00078             queue.push ();
00079 
00080             //  Move the "flush up to here" poiter.
00081             if (!incomplete_)
00082                 f = &queue.back ();
00083         }
00084 
00085 #ifdef ZMQ_HAVE_OPENVMS
00086 #pragma message restore
00087 #endif
00088 
00089         //  Pop an incomplete item from the pipe. Returns true is such
00090         //  item exists, false otherwise.
00091         inline bool unwrite (T *value_)
00092         {
00093             if (f == &queue.back ())
00094                 return false;
00095             queue.unpush ();
00096             *value_ = queue.back ();
00097             return true;
00098         }
00099 
00100         //  Flush all the completed items into the pipe. Returns false if
00101         //  the reader thread is sleeping. In that case, caller is obliged to
00102         //  wake the reader up before using the pipe again.
00103         inline bool flush ()
00104         {
00105             //  If there are no un-flushed items, do nothing.
00106             if (w == f)
00107                 return true;
00108 
00109             //  Try to set 'c' to 'f'.
00110             if (c.cas (w, f) != w) {
00111 
00112                 //  Compare-and-swap was unseccessful because 'c' is NULL.
00113                 //  This means that the reader is asleep. Therefore we don't
00114                 //  care about thread-safeness and update c in non-atomic
00115                 //  manner. We'll return false to let the caller know
00116                 //  that reader is sleeping.
00117                 c.set (f);
00118                 w = f;
00119                 return false;
00120             }
00121 
00122             //  Reader is alive. Nothing special to do now. Just move
00123             //  the 'first un-flushed item' pointer to 'f'.
00124             w = f;
00125             return true;
00126         }
00127 
00128         //  Check whether item is available for reading.
00129         inline bool check_read ()
00130         {
00131             //  Was the value prefetched already? If so, return.
00132             if (&queue.front () != r && r)
00133                  return true;
00134 
00135             //  There's no prefetched value, so let us prefetch more values.
00136             //  Prefetching is to simply retrieve the
00137             //  pointer from c in atomic fashion. If there are no
00138             //  items to prefetch, set c to NULL (using compare-and-swap).
00139             r = c.cas (&queue.front (), NULL);
00140 
00141             //  If there are no elements prefetched, exit.
00142             //  During pipe's lifetime r should never be NULL, however,
00143             //  it can happen during pipe shutdown when items
00144             //  are being deallocated.
00145             if (&queue.front () == r || !r)
00146                 return false;
00147 
00148             //  There was at least one value prefetched.
00149             return true;
00150         }
00151 
00152         //  Reads an item from the pipe. Returns false if there is no value.
00153         //  available.
00154         inline bool read (T *value_)
00155         {
00156             //  Try to prefetch a value.
00157             if (!check_read ())
00158                 return false;
00159 
00160             //  There was at least one value prefetched.
00161             //  Return it to the caller.
00162             *value_ = queue.front ();
00163             queue.pop ();
00164             return true;
00165         }
00166 
00167         //  Applies the function fn to the first elemenent in the pipe
00168         //  and returns the value returned by the fn.
00169         //  The pipe mustn't be empty or the function crashes.
00170         inline bool probe (bool (*fn)(T &))
00171         {
00172                 bool rc = check_read ();
00173                 zmq_assert (rc);
00174 
00175                 return (*fn) (queue.front ());
00176         }
00177 
00178     protected:
00179 
00180         //  Allocation-efficient queue to store pipe items.
00181         //  Front of the queue points to the first prefetched item, back of
00182         //  the pipe points to last un-flushed item. Front is used only by
00183         //  reader thread, while back is used only by writer thread.
00184         yqueue_t <T, N> queue;
00185 
00186         //  Points to the first un-flushed item. This variable is used
00187         //  exclusively by writer thread.
00188         T *w;
00189 
00190         //  Points to the first un-prefetched item. This variable is used
00191         //  exclusively by reader thread.
00192         T *r;
00193 
00194         //  Points to the first item to be flushed in the future.
00195         T *f;
00196 
00197         //  The single point of contention between writer and reader thread.
00198         //  Points past the last flushed item. If it is NULL,
00199         //  reader is asleep. This pointer should be always accessed using
00200         //  atomic operations.
00201         atomic_ptr_t <T> c;
00202 
00203         //  Disable copying of ypipe object.
00204         ypipe_t (const ypipe_t&);
00205         const ypipe_t &operator = (const ypipe_t&);
00206     };
00207 
00208 }
00209 
00210 #endif
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines