libzmq master
The Intelligent Transport Layer

decoder.hpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #ifndef __ZMQ_DECODER_HPP_INCLUDED__
00023 #define __ZMQ_DECODER_HPP_INCLUDED__
00024 
00025 #include <stddef.h>
00026 #include <string.h>
00027 #include <stdlib.h>
00028 #include <algorithm>
00029 
00030 #include "err.hpp"
00031 #include "msg.hpp"
00032 #include "stdint.hpp"
00033 
00034 namespace zmq
00035 {
00036 
00037     //  Helper base class for decoders that know the amount of data to read
00038     //  in advance at any moment. Knowing the amount in advance is a property
00039     //  of the protocol used. 0MQ framing protocol is based size-prefixed
00040     //  paradigm, whixh qualifies it to be parsed by this class.
00041     //  On the other hand, XML-based transports (like XMPP or SOAP) don't allow
00042     //  for knowing the size of data to read in advance and should use different
00043     //  decoding algorithms.
00044     //
00045     //  This class implements the state machine that parses the incoming buffer.
00046     //  Derived class should implement individual state machine actions.
00047 
00048     template <typename T> class decoder_base_t
00049     {
00050     public:
00051 
00052         inline decoder_base_t (size_t bufsize_) :
00053             read_pos (NULL),
00054             to_read (0),
00055             next (NULL),
00056             bufsize (bufsize_)
00057         {
00058             buf = (unsigned char*) malloc (bufsize_);
00059             alloc_assert (buf);
00060         }
00061 
00062         //  The destructor doesn't have to be virtual. It is mad virtual
00063         //  just to keep ICC and code checking tools from complaining.
00064         inline virtual ~decoder_base_t ()
00065         {
00066             free (buf);
00067         }
00068 
00069         //  Returns a buffer to be filled with binary data.
00070         inline void get_buffer (unsigned char **data_, size_t *size_)
00071         {
00072             //  If we are expected to read large message, we'll opt for zero-
00073             //  copy, i.e. we'll ask caller to fill the data directly to the
00074             //  message. Note that subsequent read(s) are non-blocking, thus
00075             //  each single read reads at most SO_RCVBUF bytes at once not
00076             //  depending on how large is the chunk returned from here.
00077             //  As a consequence, large messages being received won't block
00078             //  other engines running in the same I/O thread for excessive
00079             //  amounts of time.
00080             if (to_read >= bufsize) {
00081                 *data_ = read_pos;
00082                 *size_ = to_read;
00083                 return;
00084             }
00085 
00086             *data_ = buf;
00087             *size_ = bufsize;
00088         }
00089 
00090         //  Processes the data in the buffer previously allocated using
00091         //  get_buffer function. size_ argument specifies nemuber of bytes
00092         //  actually filled into the buffer. Function returns number of
00093         //  bytes actually processed.
00094         inline size_t process_buffer (unsigned char *data_, size_t size_)
00095         {
00096             //  Check if we had an error in previous attempt.
00097             if (unlikely (!(static_cast <T*> (this)->next)))
00098                 return (size_t) -1;
00099 
00100             //  In case of zero-copy simply adjust the pointers, no copying
00101             //  is required. Also, run the state machine in case all the data
00102             //  were processed.
00103             if (data_ == read_pos) {
00104                 read_pos += size_;
00105                 to_read -= size_;
00106 
00107                 while (!to_read) {
00108                     if (!(static_cast <T*> (this)->*next) ()) {
00109                         if (unlikely (!(static_cast <T*> (this)->next)))
00110                             return (size_t) -1;
00111                         return size_;
00112                     }
00113                 }
00114                 return size_;
00115             }
00116 
00117             size_t pos = 0;
00118             while (true) {
00119 
00120                 //  Try to get more space in the message to fill in.
00121                 //  If none is available, return.
00122                 while (!to_read) {
00123                     if (!(static_cast <T*> (this)->*next) ()) {
00124                         if (unlikely (!(static_cast <T*> (this)->next)))
00125                             return (size_t) -1;
00126                         return pos;
00127                     }
00128                 }
00129 
00130                 //  If there are no more data in the buffer, return.
00131                 if (pos == size_)
00132                     return pos;
00133 
00134                 //  Copy the data from buffer to the message.
00135                 size_t to_copy = std::min (to_read, size_ - pos);
00136                 memcpy (read_pos, data_ + pos, to_copy);
00137                 read_pos += to_copy;
00138                 pos += to_copy;
00139                 to_read -= to_copy;
00140             }
00141         }
00142 
00143     protected:
00144 
00145         //  Prototype of state machine action. Action should return false if
00146         //  it is unable to push the data to the system.
00147         typedef bool (T::*step_t) ();
00148 
00149         //  This function should be called from derived class to read data
00150         //  from the buffer and schedule next state machine action.
00151         inline void next_step (void *read_pos_, size_t to_read_,
00152             step_t next_)
00153         {
00154             read_pos = (unsigned char*) read_pos_;
00155             to_read = to_read_;
00156             next = next_;
00157         }
00158 
00159         //  This function should be called from the derived class to
00160         //  abort decoder state machine.
00161         inline void decoding_error ()
00162         {
00163             next = NULL;
00164         }
00165 
00166     private:
00167 
00168         //  Where to store the read data.
00169         unsigned char *read_pos;
00170 
00171         //  How much data to read before taking next step.
00172         size_t to_read;
00173 
00174         //  Next step. If set to NULL, it means that associated data stream
00175         //  is dead. Note that there can be still data in the process in such
00176         //  case.
00177         step_t next;
00178 
00179         //  The duffer for data to decode.
00180         size_t bufsize;
00181         unsigned char *buf;
00182 
00183         decoder_base_t (const decoder_base_t&);
00184         const decoder_base_t &operator = (const decoder_base_t&);
00185     };
00186 
00187     //  Decoder for 0MQ framing protocol. Converts data batches into messages.
00188 
00189     class decoder_t : public decoder_base_t <decoder_t>
00190     {
00191     public:
00192 
00193         decoder_t (size_t bufsize_, int64_t maxmsgsize_);
00194         ~decoder_t ();
00195 
00196         void set_session (class session_base_t *session_);
00197 
00198     private:
00199 
00200         bool one_byte_size_ready ();
00201         bool eight_byte_size_ready ();
00202         bool flags_ready ();
00203         bool message_ready ();
00204 
00205         class session_base_t *session;
00206         unsigned char tmpbuf [8];
00207         msg_t in_progress;
00208 
00209         int64_t maxmsgsize;
00210 
00211         decoder_t (const decoder_t&);
00212         void operator = (const decoder_t&);
00213     };
00214 
00215 }
00216 
00217 #endif
00218 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines