![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "msg.hpp" 00023 #include "../include/zmq.h" 00024 00025 #include <string.h> 00026 #include <errno.h> 00027 #include <stdlib.h> 00028 #include <new> 00029 00030 #include "stdint.hpp" 00031 #include "likely.hpp" 00032 #include "err.hpp" 00033 00034 // Check whether the sizes of public representation of the message (zmq_msg_t) 00035 // and private represenation of the message (zmq::msg_t) match. 00036 typedef char zmq_msg_size_check 00037 [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1]; 00038 00039 bool zmq::msg_t::check () 00040 { 00041 return u.base.type >= type_min && u.base.type <= type_max; 00042 } 00043 00044 int zmq::msg_t::init () 00045 { 00046 u.vsm.type = type_vsm; 00047 u.vsm.flags = 0; 00048 u.vsm.size = 0; 00049 return 0; 00050 } 00051 00052 int zmq::msg_t::init_size (size_t size_) 00053 { 00054 if (size_ <= max_vsm_size) { 00055 u.vsm.type = type_vsm; 00056 u.vsm.flags = 0; 00057 u.vsm.size = (unsigned char) size_; 00058 } 00059 else { 00060 u.lmsg.type = type_lmsg; 00061 u.lmsg.flags = 0; 00062 u.lmsg.content = 00063 (content_t*) malloc (sizeof (content_t) + size_); 00064 if (!u.lmsg.content) { 00065 errno = ENOMEM; 00066 return -1; 00067 } 00068 00069 u.lmsg.content->data = u.lmsg.content + 1; 00070 u.lmsg.content->size = size_; 00071 u.lmsg.content->ffn = NULL; 00072 u.lmsg.content->hint = NULL; 00073 new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); 00074 } 00075 return 0; 00076 } 00077 00078 int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, 00079 void *hint_) 00080 { 00081 u.lmsg.type = type_lmsg; 00082 u.lmsg.flags = 0; 00083 u.lmsg.content = (content_t*) malloc (sizeof (content_t)); 00084 if (!u.lmsg.content) { 00085 errno = ENOMEM; 00086 return -1; 00087 } 00088 00089 u.lmsg.content->data = data_; 00090 u.lmsg.content->size = size_; 00091 u.lmsg.content->ffn = ffn_; 00092 u.lmsg.content->hint = hint_; 00093 new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); 00094 return 0; 00095 00096 } 00097 00098 int zmq::msg_t::init_delimiter () 00099 { 00100 u.delimiter.type = type_delimiter; 00101 u.delimiter.flags = 0; 00102 return 0; 00103 } 00104 00105 int zmq::msg_t::close () 00106 { 00107 // Check the validity of the message. 00108 if (unlikely (!check ())) { 00109 errno = EFAULT; 00110 return -1; 00111 } 00112 00113 if (u.base.type == type_lmsg) { 00114 00115 // If the content is not shared, or if it is shared and the reference 00116 // count has dropped to zero, deallocate it. 00117 if (!(u.lmsg.flags & msg_t::shared) || 00118 !u.lmsg.content->refcnt.sub (1)) { 00119 00120 // We used "placement new" operator to initialize the reference 00121 // counter so we call the destructor explicitly now. 00122 u.lmsg.content->refcnt.~atomic_counter_t (); 00123 00124 if (u.lmsg.content->ffn) 00125 u.lmsg.content->ffn (u.lmsg.content->data, 00126 u.lmsg.content->hint); 00127 free (u.lmsg.content); 00128 } 00129 } 00130 00131 // Make the message invalid. 00132 u.base.type = 0; 00133 00134 return 0; 00135 00136 } 00137 00138 int zmq::msg_t::move (msg_t &src_) 00139 { 00140 // Check the validity of the source. 00141 if (unlikely (!src_.check ())) { 00142 errno = EFAULT; 00143 return -1; 00144 } 00145 00146 int rc = close (); 00147 if (unlikely (rc < 0)) 00148 return rc; 00149 00150 *this = src_; 00151 00152 rc = src_.init (); 00153 if (unlikely (rc < 0)) 00154 return rc; 00155 00156 return 0; 00157 } 00158 00159 int zmq::msg_t::copy (msg_t &src_) 00160 { 00161 // Check the validity of the source. 00162 if (unlikely (!src_.check ())) { 00163 errno = EFAULT; 00164 return -1; 00165 } 00166 00167 int rc = close (); 00168 if (unlikely (rc < 0)) 00169 return rc; 00170 00171 if (src_.u.base.type == type_lmsg) { 00172 00173 // One reference is added to shared messages. Non-shared messages 00174 // are turned into shared messages and reference count is set to 2. 00175 if (src_.u.lmsg.flags & msg_t::shared) 00176 src_.u.lmsg.content->refcnt.add (1); 00177 else { 00178 src_.u.lmsg.flags |= msg_t::shared; 00179 src_.u.lmsg.content->refcnt.set (2); 00180 } 00181 } 00182 00183 *this = src_; 00184 00185 return 0; 00186 00187 } 00188 00189 void *zmq::msg_t::data () 00190 { 00191 // Check the validity of the message. 00192 zmq_assert (check ()); 00193 00194 switch (u.base.type) { 00195 case type_vsm: 00196 return u.vsm.data; 00197 case type_lmsg: 00198 return u.lmsg.content->data; 00199 default: 00200 zmq_assert (false); 00201 return NULL; 00202 } 00203 } 00204 00205 size_t zmq::msg_t::size () 00206 { 00207 // Check the validity of the message. 00208 zmq_assert (check ()); 00209 00210 switch (u.base.type) { 00211 case type_vsm: 00212 return u.vsm.size; 00213 case type_lmsg: 00214 return u.lmsg.content->size; 00215 default: 00216 zmq_assert (false); 00217 return 0; 00218 } 00219 } 00220 00221 unsigned char zmq::msg_t::flags () 00222 { 00223 return u.base.flags; 00224 } 00225 00226 void zmq::msg_t::set_flags (unsigned char flags_) 00227 { 00228 u.base.flags |= flags_; 00229 } 00230 00231 void zmq::msg_t::reset_flags (unsigned char flags_) 00232 { 00233 u.base.flags &= ~flags_; 00234 } 00235 00236 bool zmq::msg_t::is_delimiter () 00237 { 00238 return u.base.type == type_delimiter; 00239 } 00240 00241 bool zmq::msg_t::is_vsm () 00242 { 00243 return u.base.type == type_vsm; 00244 } 00245 00246 void zmq::msg_t::add_refs (int refs_) 00247 { 00248 zmq_assert (refs_ >= 0); 00249 00250 // No copies required. 00251 if (!refs_) 00252 return; 00253 00254 // VSMs and delimiters can be copied straight away. The only message type 00255 // that needs special care are long messages. 00256 if (u.base.type == type_lmsg) { 00257 if (u.lmsg.flags & msg_t::shared) 00258 u.lmsg.content->refcnt.add (refs_); 00259 else { 00260 u.lmsg.content->refcnt.set (refs_ + 1); 00261 u.lmsg.flags |= msg_t::shared; 00262 } 00263 } 00264 } 00265 00266 bool zmq::msg_t::rm_refs (int refs_) 00267 { 00268 zmq_assert (refs_ >= 0); 00269 00270 // No copies required. 00271 if (!refs_) 00272 return true; 00273 00274 // If there's only one reference close the message. 00275 if (u.base.type != type_lmsg || !(u.lmsg.flags & msg_t::shared)) { 00276 close (); 00277 return false; 00278 } 00279 00280 // The only message type that needs special care are long messages. 00281 if (!u.lmsg.content->refcnt.sub (refs_)) { 00282 close (); 00283 return false; 00284 } 00285 00286 return true; 00287 } 00288