libzmq master
The Intelligent Transport Layer

yqueue.hpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
00023 #define __ZMQ_YQUEUE_HPP_INCLUDED__
00024 
00025 #include <stdlib.h>
00026 #include <stddef.h>
00027 
00028 #include "err.hpp"
00029 #include "atomic_ptr.hpp"
00030 
00031 namespace zmq
00032 {
00033 
00034     //  yqueue is an efficient queue implementation. The main goal is
00035     //  to minimise number of allocations/deallocations needed. Thus yqueue
00036     //  allocates/deallocates elements in batches of N.
00037     //
00038     //  yqueue allows one thread to use push/back function and another one 
00039     //  to use pop/front functions. However, user must ensure that there's no
00040     //  pop on the empty queue and that both threads don't access the same
00041     //  element in unsynchronised manner.
00042     //
00043     //  T is the type of the object in the queue.
00044     //  N is granularity of the queue (how many pushes have to be done till
00045     //  actual memory allocation is required).
00046 
00047     template <typename T, int N> class yqueue_t
00048     {
00049     public:
00050 
00051         //  Create the queue.
00052         inline yqueue_t ()
00053         {
00054              begin_chunk = (chunk_t*) malloc (sizeof (chunk_t));
00055              alloc_assert (begin_chunk);
00056              begin_pos = 0;
00057              back_chunk = NULL;
00058              back_pos = 0;
00059              end_chunk = begin_chunk;
00060              end_pos = 0;
00061         }
00062 
00063         //  Destroy the queue.
00064         inline ~yqueue_t ()
00065         {
00066             while (true) {
00067                 if (begin_chunk == end_chunk) {
00068                     free (begin_chunk);
00069                     break;
00070                 } 
00071                 chunk_t *o = begin_chunk;
00072                 begin_chunk = begin_chunk->next;
00073                 free (o);
00074             }
00075 
00076             chunk_t *sc = spare_chunk.xchg (NULL);
00077             if (sc)
00078                 free (sc);
00079         }
00080 
00081         //  Returns reference to the front element of the queue.
00082         //  If the queue is empty, behaviour is undefined.
00083         inline T &front ()
00084         {
00085              return begin_chunk->values [begin_pos];
00086         }
00087 
00088         //  Returns reference to the back element of the queue.
00089         //  If the queue is empty, behaviour is undefined.
00090         inline T &back ()
00091         {
00092             return back_chunk->values [back_pos];
00093         }
00094 
00095         //  Adds an element to the back end of the queue.
00096         inline void push ()
00097         {
00098             back_chunk = end_chunk;
00099             back_pos = end_pos;
00100 
00101             if (++end_pos != N)
00102                 return;
00103 
00104             chunk_t *sc = spare_chunk.xchg (NULL);
00105             if (sc) {
00106                 end_chunk->next = sc;
00107                 sc->prev = end_chunk;
00108             } else {
00109                 end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
00110                 alloc_assert (end_chunk->next);
00111                 end_chunk->next->prev = end_chunk;
00112             }
00113             end_chunk = end_chunk->next;
00114             end_pos = 0;
00115         }
00116 
00117         //  Removes element from the back end of the queue. In other words
00118         //  it rollbacks last push to the queue. Take care: Caller is
00119         //  responsible for destroying the object being unpushed.
00120         //  The caller must also guarantee that the queue isn't empty when
00121         //  unpush is called. It cannot be done automatically as the read
00122         //  side of the queue can be managed by different, completely
00123         //  unsynchronised thread.
00124         inline void unpush ()
00125         {
00126             //  First, move 'back' one position backwards.
00127             if (back_pos)
00128                 --back_pos;
00129             else {
00130                 back_pos = N - 1;
00131                 back_chunk = back_chunk->prev;
00132             }
00133 
00134             //  Now, move 'end' position backwards. Note that obsolete end chunk
00135             //  is not used as a spare chunk. The analysis shows that doing so
00136             //  would require free and atomic operation per chunk deallocated
00137             //  instead of a simple free.
00138             if (end_pos)
00139                 --end_pos;
00140             else {
00141                 end_pos = N - 1;
00142                 end_chunk = end_chunk->prev;
00143                 free (end_chunk->next);
00144                 end_chunk->next = NULL;
00145             }
00146         }
00147 
00148         //  Removes an element from the front end of the queue.
00149         inline void pop ()
00150         {
00151             if (++ begin_pos == N) {
00152                 chunk_t *o = begin_chunk;
00153                 begin_chunk = begin_chunk->next;
00154                 begin_chunk->prev = NULL;
00155                 begin_pos = 0;
00156 
00157                 //  'o' has been more recently used than spare_chunk,
00158                 //  so for cache reasons we'll get rid of the spare and
00159                 //  use 'o' as the spare.
00160                 chunk_t *cs = spare_chunk.xchg (o);
00161                 if (cs)
00162                     free (cs);
00163             }
00164         }
00165 
00166     private:
00167 
00168         //  Individual memory chunk to hold N elements.
00169         struct chunk_t
00170         {
00171              T values [N];
00172              chunk_t *prev;
00173              chunk_t *next;
00174         };
00175 
00176         //  Back position may point to invalid memory if the queue is empty,
00177         //  while begin & end positions are always valid. Begin position is
00178         //  accessed exclusively be queue reader (front/pop), while back and
00179         //  end positions are accessed exclusively by queue writer (back/push).
00180         chunk_t *begin_chunk;
00181         int begin_pos;
00182         chunk_t *back_chunk;
00183         int back_pos;
00184         chunk_t *end_chunk;
00185         int end_pos;
00186 
00187         //  People are likely to produce and consume at similar rates.  In
00188         //  this scenario holding onto the most recently freed chunk saves
00189         //  us from having to call malloc/free.
00190         atomic_ptr_t<chunk_t> spare_chunk;
00191 
00192         //  Disable copying of yqueue.
00193         yqueue_t (const yqueue_t&);
00194         const yqueue_t &operator = (const yqueue_t&);
00195     };
00196 
00197 }
00198 
00199 #endif
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines