![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #ifndef __ZMQ_DECODER_HPP_INCLUDED__ 00023 #define __ZMQ_DECODER_HPP_INCLUDED__ 00024 00025 #include <stddef.h> 00026 #include <string.h> 00027 #include <stdlib.h> 00028 #include <algorithm> 00029 00030 #include "err.hpp" 00031 #include "msg.hpp" 00032 #include "stdint.hpp" 00033 00034 namespace zmq 00035 { 00036 00037 // Helper base class for decoders that know the amount of data to read 00038 // in advance at any moment. Knowing the amount in advance is a property 00039 // of the protocol used. 0MQ framing protocol is based size-prefixed 00040 // paradigm, whixh qualifies it to be parsed by this class. 00041 // On the other hand, XML-based transports (like XMPP or SOAP) don't allow 00042 // for knowing the size of data to read in advance and should use different 00043 // decoding algorithms. 00044 // 00045 // This class implements the state machine that parses the incoming buffer. 00046 // Derived class should implement individual state machine actions. 00047 00048 template <typename T> class decoder_base_t 00049 { 00050 public: 00051 00052 inline decoder_base_t (size_t bufsize_) : 00053 read_pos (NULL), 00054 to_read (0), 00055 next (NULL), 00056 bufsize (bufsize_) 00057 { 00058 buf = (unsigned char*) malloc (bufsize_); 00059 alloc_assert (buf); 00060 } 00061 00062 // The destructor doesn't have to be virtual. It is mad virtual 00063 // just to keep ICC and code checking tools from complaining. 00064 inline virtual ~decoder_base_t () 00065 { 00066 free (buf); 00067 } 00068 00069 // Returns a buffer to be filled with binary data. 00070 inline void get_buffer (unsigned char **data_, size_t *size_) 00071 { 00072 // If we are expected to read large message, we'll opt for zero- 00073 // copy, i.e. we'll ask caller to fill the data directly to the 00074 // message. Note that subsequent read(s) are non-blocking, thus 00075 // each single read reads at most SO_RCVBUF bytes at once not 00076 // depending on how large is the chunk returned from here. 00077 // As a consequence, large messages being received won't block 00078 // other engines running in the same I/O thread for excessive 00079 // amounts of time. 00080 if (to_read >= bufsize) { 00081 *data_ = read_pos; 00082 *size_ = to_read; 00083 return; 00084 } 00085 00086 *data_ = buf; 00087 *size_ = bufsize; 00088 } 00089 00090 // Processes the data in the buffer previously allocated using 00091 // get_buffer function. size_ argument specifies nemuber of bytes 00092 // actually filled into the buffer. Function returns number of 00093 // bytes actually processed. 00094 inline size_t process_buffer (unsigned char *data_, size_t size_) 00095 { 00096 // Check if we had an error in previous attempt. 00097 if (unlikely (!(static_cast <T*> (this)->next))) 00098 return (size_t) -1; 00099 00100 // In case of zero-copy simply adjust the pointers, no copying 00101 // is required. Also, run the state machine in case all the data 00102 // were processed. 00103 if (data_ == read_pos) { 00104 read_pos += size_; 00105 to_read -= size_; 00106 00107 while (!to_read) { 00108 if (!(static_cast <T*> (this)->*next) ()) { 00109 if (unlikely (!(static_cast <T*> (this)->next))) 00110 return (size_t) -1; 00111 return size_; 00112 } 00113 } 00114 return size_; 00115 } 00116 00117 size_t pos = 0; 00118 while (true) { 00119 00120 // Try to get more space in the message to fill in. 00121 // If none is available, return. 00122 while (!to_read) { 00123 if (!(static_cast <T*> (this)->*next) ()) { 00124 if (unlikely (!(static_cast <T*> (this)->next))) 00125 return (size_t) -1; 00126 return pos; 00127 } 00128 } 00129 00130 // If there are no more data in the buffer, return. 00131 if (pos == size_) 00132 return pos; 00133 00134 // Copy the data from buffer to the message. 00135 size_t to_copy = std::min (to_read, size_ - pos); 00136 memcpy (read_pos, data_ + pos, to_copy); 00137 read_pos += to_copy; 00138 pos += to_copy; 00139 to_read -= to_copy; 00140 } 00141 } 00142 00143 protected: 00144 00145 // Prototype of state machine action. Action should return false if 00146 // it is unable to push the data to the system. 00147 typedef bool (T::*step_t) (); 00148 00149 // This function should be called from derived class to read data 00150 // from the buffer and schedule next state machine action. 00151 inline void next_step (void *read_pos_, size_t to_read_, 00152 step_t next_) 00153 { 00154 read_pos = (unsigned char*) read_pos_; 00155 to_read = to_read_; 00156 next = next_; 00157 } 00158 00159 // This function should be called from the derived class to 00160 // abort decoder state machine. 00161 inline void decoding_error () 00162 { 00163 next = NULL; 00164 } 00165 00166 private: 00167 00168 // Where to store the read data. 00169 unsigned char *read_pos; 00170 00171 // How much data to read before taking next step. 00172 size_t to_read; 00173 00174 // Next step. If set to NULL, it means that associated data stream 00175 // is dead. Note that there can be still data in the process in such 00176 // case. 00177 step_t next; 00178 00179 // The duffer for data to decode. 00180 size_t bufsize; 00181 unsigned char *buf; 00182 00183 decoder_base_t (const decoder_base_t&); 00184 const decoder_base_t &operator = (const decoder_base_t&); 00185 }; 00186 00187 // Decoder for 0MQ framing protocol. Converts data batches into messages. 00188 00189 class decoder_t : public decoder_base_t <decoder_t> 00190 { 00191 public: 00192 00193 decoder_t (size_t bufsize_, int64_t maxmsgsize_); 00194 ~decoder_t (); 00195 00196 void set_session (class session_base_t *session_); 00197 00198 private: 00199 00200 bool one_byte_size_ready (); 00201 bool eight_byte_size_ready (); 00202 bool flags_ready (); 00203 bool message_ready (); 00204 00205 class session_base_t *session; 00206 unsigned char tmpbuf [8]; 00207 msg_t in_progress; 00208 00209 int64_t maxmsgsize; 00210 00211 decoder_t (const decoder_t&); 00212 void operator = (const decoder_t&); 00213 }; 00214 00215 } 00216 00217 #endif 00218