libzmq master
The Intelligent Transport Layer

encoder.hpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #ifndef __ZMQ_ENCODER_HPP_INCLUDED__
00023 #define __ZMQ_ENCODER_HPP_INCLUDED__
00024 
00025 #include <stddef.h>
00026 #include <string.h>
00027 #include <stdlib.h>
00028 #include <algorithm>
00029 
00030 #include "err.hpp"
00031 #include "msg.hpp"
00032 
00033 namespace zmq
00034 {
00035 
00036     //  Helper base class for encoders. It implements the state machine that
00037     //  fills the outgoing buffer. Derived classes should implement individual
00038     //  state machine actions.
00039 
00040     template <typename T> class encoder_base_t
00041     {
00042     public:
00043 
00044         inline encoder_base_t (size_t bufsize_) :
00045             bufsize (bufsize_)
00046         {
00047             buf = (unsigned char*) malloc (bufsize_);
00048             alloc_assert (buf);
00049         }
00050 
00051         //  The destructor doesn't have to be virtual. It is made virtual
00052         //  just to keep ICC and code checking tools from complaining.
00053         inline virtual ~encoder_base_t ()
00054         {
00055             free (buf);
00056         }
00057 
00058         //  The function returns a batch of binary data. The data
00059         //  are filled to a supplied buffer. If no buffer is supplied (data_
00060         //  points to NULL) decoder object will provide buffer of its own.
00061         //  If offset is not NULL, it is filled by offset of the first message
00062         //  in the batch.If there's no beginning of a message in the batch,
00063         //  offset is set to -1.
00064         inline void get_data (unsigned char **data_, size_t *size_,
00065             int *offset_ = NULL)
00066         {
00067             unsigned char *buffer = !*data_ ? buf : *data_;
00068             size_t buffersize = !*data_ ? bufsize : *size_;
00069 
00070             size_t pos = 0;
00071             if (offset_)
00072                 *offset_ = -1;
00073 
00074             while (true) {
00075 
00076                 //  If there are no more data to return, run the state machine.
00077                 //  If there are still no data, return what we already have
00078                 //  in the buffer.
00079                 if (!to_write) {
00080                     if (!(static_cast <T*> (this)->*next) ()) {
00081                         *data_ = buffer;
00082                         *size_ = pos;
00083                         return;
00084                     }
00085 
00086                     //  If beginning of the message was processed, adjust the
00087                     //  first-message-offset.
00088                     if (beginning) { 
00089                         if (offset_ && *offset_ == -1)
00090                             *offset_ = (int) pos;
00091                         beginning = false;
00092                     }
00093                 }
00094 
00095                 //  If there are no data in the buffer yet and we are able to
00096                 //  fill whole buffer in a single go, let's use zero-copy.
00097                 //  There's no disadvantage to it as we cannot stuck multiple
00098                 //  messages into the buffer anyway. Note that subsequent
00099                 //  write(s) are non-blocking, thus each single write writes
00100                 //  at most SO_SNDBUF bytes at once not depending on how large
00101                 //  is the chunk returned from here.
00102                 //  As a consequence, large messages being sent won't block
00103                 //  other engines running in the same I/O thread for excessive
00104                 //  amounts of time.
00105                 if (!pos && !*data_ && to_write >= buffersize) {
00106                     *data_ = write_pos;
00107                     *size_ = to_write;
00108                     write_pos = NULL;
00109                     to_write = 0;
00110                     return;
00111                 }
00112 
00113                 //  Copy data to the buffer. If the buffer is full, return.
00114                 size_t to_copy = std::min (to_write, buffersize - pos);
00115                 memcpy (buffer + pos, write_pos, to_copy);
00116                 pos += to_copy;
00117                 write_pos += to_copy;
00118                 to_write -= to_copy;
00119                 if (pos == buffersize) {
00120                     *data_ = buffer;
00121                     *size_ = pos;
00122                     return;
00123                 }
00124             }
00125         }
00126 
00127     protected:
00128 
00129         //  Prototype of state machine action.
00130         typedef bool (T::*step_t) ();
00131 
00132         //  This function should be called from derived class to write the data
00133         //  to the buffer and schedule next state machine action. Set beginning
00134         //  to true when you are writing first byte of a message.
00135         inline void next_step (void *write_pos_, size_t to_write_,
00136             step_t next_, bool beginning_)
00137         {
00138             write_pos = (unsigned char*) write_pos_;
00139             to_write = to_write_;
00140             next = next_;
00141             beginning = beginning_;
00142         }
00143 
00144     private:
00145 
00146         //  Where to get the data to write from.
00147         unsigned char *write_pos;
00148 
00149         //  How much data to write before next step should be executed.
00150         size_t to_write;
00151 
00152         //  Next step. If set to NULL, it means that associated data stream
00153         //  is dead.
00154         step_t next;
00155 
00156         //  If true, first byte of the message is being written.
00157         bool beginning;
00158 
00159         //  The buffer for encoded data.
00160         size_t bufsize;
00161         unsigned char *buf;
00162 
00163         encoder_base_t (const encoder_base_t&);
00164         void operator = (const encoder_base_t&);
00165     };
00166 
00167     //  Encoder for 0MQ framing protocol. Converts messages into data batches.
00168 
00169     class encoder_t : public encoder_base_t <encoder_t>
00170     {
00171     public:
00172 
00173         encoder_t (size_t bufsize_);
00174         ~encoder_t ();
00175 
00176         void set_session (class session_base_t *session_);
00177 
00178     private:
00179 
00180         bool size_ready ();
00181         bool message_ready ();
00182 
00183         class session_base_t *session;
00184         msg_t in_progress;
00185         unsigned char tmpbuf [10];
00186 
00187         encoder_t (const encoder_t&);
00188         const encoder_t &operator = (const encoder_t&);
00189     };
00190 }
00191 
00192 #endif
00193 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines