// libzmq master - The Intelligent Transport Layer
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #ifndef __ZMQ_YQUEUE_HPP_INCLUDED__ 00023 #define __ZMQ_YQUEUE_HPP_INCLUDED__ 00024 00025 #include <stdlib.h> 00026 #include <stddef.h> 00027 00028 #include "err.hpp" 00029 #include "atomic_ptr.hpp" 00030 00031 namespace zmq 00032 { 00033 00034 // yqueue is an efficient queue implementation. The main goal is 00035 // to minimise number of allocations/deallocations needed. Thus yqueue 00036 // allocates/deallocates elements in batches of N. 00037 // 00038 // yqueue allows one thread to use push/back function and another one 00039 // to use pop/front functions. However, user must ensure that there's no 00040 // pop on the empty queue and that both threads don't access the same 00041 // element in unsynchronised manner. 00042 // 00043 // T is the type of the object in the queue. 00044 // N is granularity of the queue (how many pushes have to be done till 00045 // actual memory allocation is required). 00046 00047 template <typename T, int N> class yqueue_t 00048 { 00049 public: 00050 00051 // Create the queue. 
00052 inline yqueue_t () 00053 { 00054 begin_chunk = (chunk_t*) malloc (sizeof (chunk_t)); 00055 alloc_assert (begin_chunk); 00056 begin_pos = 0; 00057 back_chunk = NULL; 00058 back_pos = 0; 00059 end_chunk = begin_chunk; 00060 end_pos = 0; 00061 } 00062 00063 // Destroy the queue. 00064 inline ~yqueue_t () 00065 { 00066 while (true) { 00067 if (begin_chunk == end_chunk) { 00068 free (begin_chunk); 00069 break; 00070 } 00071 chunk_t *o = begin_chunk; 00072 begin_chunk = begin_chunk->next; 00073 free (o); 00074 } 00075 00076 chunk_t *sc = spare_chunk.xchg (NULL); 00077 if (sc) 00078 free (sc); 00079 } 00080 00081 // Returns reference to the front element of the queue. 00082 // If the queue is empty, behaviour is undefined. 00083 inline T &front () 00084 { 00085 return begin_chunk->values [begin_pos]; 00086 } 00087 00088 // Returns reference to the back element of the queue. 00089 // If the queue is empty, behaviour is undefined. 00090 inline T &back () 00091 { 00092 return back_chunk->values [back_pos]; 00093 } 00094 00095 // Adds an element to the back end of the queue. 00096 inline void push () 00097 { 00098 back_chunk = end_chunk; 00099 back_pos = end_pos; 00100 00101 if (++end_pos != N) 00102 return; 00103 00104 chunk_t *sc = spare_chunk.xchg (NULL); 00105 if (sc) { 00106 end_chunk->next = sc; 00107 sc->prev = end_chunk; 00108 } else { 00109 end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t)); 00110 alloc_assert (end_chunk->next); 00111 end_chunk->next->prev = end_chunk; 00112 } 00113 end_chunk = end_chunk->next; 00114 end_pos = 0; 00115 } 00116 00117 // Removes element from the back end of the queue. In other words 00118 // it rollbacks last push to the queue. Take care: Caller is 00119 // responsible for destroying the object being unpushed. 00120 // The caller must also guarantee that the queue isn't empty when 00121 // unpush is called. 
It cannot be done automatically as the read 00122 // side of the queue can be managed by different, completely 00123 // unsynchronised thread. 00124 inline void unpush () 00125 { 00126 // First, move 'back' one position backwards. 00127 if (back_pos) 00128 --back_pos; 00129 else { 00130 back_pos = N - 1; 00131 back_chunk = back_chunk->prev; 00132 } 00133 00134 // Now, move 'end' position backwards. Note that obsolete end chunk 00135 // is not used as a spare chunk. The analysis shows that doing so 00136 // would require free and atomic operation per chunk deallocated 00137 // instead of a simple free. 00138 if (end_pos) 00139 --end_pos; 00140 else { 00141 end_pos = N - 1; 00142 end_chunk = end_chunk->prev; 00143 free (end_chunk->next); 00144 end_chunk->next = NULL; 00145 } 00146 } 00147 00148 // Removes an element from the front end of the queue. 00149 inline void pop () 00150 { 00151 if (++ begin_pos == N) { 00152 chunk_t *o = begin_chunk; 00153 begin_chunk = begin_chunk->next; 00154 begin_chunk->prev = NULL; 00155 begin_pos = 0; 00156 00157 // 'o' has been more recently used than spare_chunk, 00158 // so for cache reasons we'll get rid of the spare and 00159 // use 'o' as the spare. 00160 chunk_t *cs = spare_chunk.xchg (o); 00161 if (cs) 00162 free (cs); 00163 } 00164 } 00165 00166 private: 00167 00168 // Individual memory chunk to hold N elements. 00169 struct chunk_t 00170 { 00171 T values [N]; 00172 chunk_t *prev; 00173 chunk_t *next; 00174 }; 00175 00176 // Back position may point to invalid memory if the queue is empty, 00177 // while begin & end positions are always valid. Begin position is 00178 // accessed exclusively be queue reader (front/pop), while back and 00179 // end positions are accessed exclusively by queue writer (back/push). 00180 chunk_t *begin_chunk; 00181 int begin_pos; 00182 chunk_t *back_chunk; 00183 int back_pos; 00184 chunk_t *end_chunk; 00185 int end_pos; 00186 00187 // People are likely to produce and consume at similar rates. 
In 00188 // this scenario holding onto the most recently freed chunk saves 00189 // us from having to call malloc/free. 00190 atomic_ptr_t<chunk_t> spare_chunk; 00191 00192 // Disable copying of yqueue. 00193 yqueue_t (const yqueue_t&); 00194 const yqueue_t &operator = (const yqueue_t&); 00195 }; 00196 00197 } 00198 00199 #endif