![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #ifndef __ZMQ_ENCODER_HPP_INCLUDED__ 00023 #define __ZMQ_ENCODER_HPP_INCLUDED__ 00024 00025 #include <stddef.h> 00026 #include <string.h> 00027 #include <stdlib.h> 00028 #include <algorithm> 00029 00030 #include "err.hpp" 00031 #include "msg.hpp" 00032 00033 namespace zmq 00034 { 00035 00036 // Helper base class for encoders. It implements the state machine that 00037 // fills the outgoing buffer. Derived classes should implement individual 00038 // state machine actions. 00039 00040 template <typename T> class encoder_base_t 00041 { 00042 public: 00043 00044 inline encoder_base_t (size_t bufsize_) : 00045 bufsize (bufsize_) 00046 { 00047 buf = (unsigned char*) malloc (bufsize_); 00048 alloc_assert (buf); 00049 } 00050 00051 // The destructor doesn't have to be virtual. It is made virtual 00052 // just to keep ICC and code checking tools from complaining. 00053 inline virtual ~encoder_base_t () 00054 { 00055 free (buf); 00056 } 00057 00058 // The function returns a batch of binary data. The data 00059 // are filled to a supplied buffer. If no buffer is supplied (data_ 00060 // points to NULL) decoder object will provide buffer of its own. 00061 // If offset is not NULL, it is filled by offset of the first message 00062 // in the batch.If there's no beginning of a message in the batch, 00063 // offset is set to -1. 00064 inline void get_data (unsigned char **data_, size_t *size_, 00065 int *offset_ = NULL) 00066 { 00067 unsigned char *buffer = !*data_ ? buf : *data_; 00068 size_t buffersize = !*data_ ? bufsize : *size_; 00069 00070 size_t pos = 0; 00071 if (offset_) 00072 *offset_ = -1; 00073 00074 while (true) { 00075 00076 // If there are no more data to return, run the state machine. 00077 // If there are still no data, return what we already have 00078 // in the buffer. 00079 if (!to_write) { 00080 if (!(static_cast <T*> (this)->*next) ()) { 00081 *data_ = buffer; 00082 *size_ = pos; 00083 return; 00084 } 00085 00086 // If beginning of the message was processed, adjust the 00087 // first-message-offset. 00088 if (beginning) { 00089 if (offset_ && *offset_ == -1) 00090 *offset_ = (int) pos; 00091 beginning = false; 00092 } 00093 } 00094 00095 // If there are no data in the buffer yet and we are able to 00096 // fill whole buffer in a single go, let's use zero-copy. 00097 // There's no disadvantage to it as we cannot stuck multiple 00098 // messages into the buffer anyway. Note that subsequent 00099 // write(s) are non-blocking, thus each single write writes 00100 // at most SO_SNDBUF bytes at once not depending on how large 00101 // is the chunk returned from here. 00102 // As a consequence, large messages being sent won't block 00103 // other engines running in the same I/O thread for excessive 00104 // amounts of time. 00105 if (!pos && !*data_ && to_write >= buffersize) { 00106 *data_ = write_pos; 00107 *size_ = to_write; 00108 write_pos = NULL; 00109 to_write = 0; 00110 return; 00111 } 00112 00113 // Copy data to the buffer. If the buffer is full, return. 00114 size_t to_copy = std::min (to_write, buffersize - pos); 00115 memcpy (buffer + pos, write_pos, to_copy); 00116 pos += to_copy; 00117 write_pos += to_copy; 00118 to_write -= to_copy; 00119 if (pos == buffersize) { 00120 *data_ = buffer; 00121 *size_ = pos; 00122 return; 00123 } 00124 } 00125 } 00126 00127 protected: 00128 00129 // Prototype of state machine action. 00130 typedef bool (T::*step_t) (); 00131 00132 // This function should be called from derived class to write the data 00133 // to the buffer and schedule next state machine action. Set beginning 00134 // to true when you are writing first byte of a message. 00135 inline void next_step (void *write_pos_, size_t to_write_, 00136 step_t next_, bool beginning_) 00137 { 00138 write_pos = (unsigned char*) write_pos_; 00139 to_write = to_write_; 00140 next = next_; 00141 beginning = beginning_; 00142 } 00143 00144 private: 00145 00146 // Where to get the data to write from. 00147 unsigned char *write_pos; 00148 00149 // How much data to write before next step should be executed. 00150 size_t to_write; 00151 00152 // Next step. If set to NULL, it means that associated data stream 00153 // is dead. 00154 step_t next; 00155 00156 // If true, first byte of the message is being written. 00157 bool beginning; 00158 00159 // The buffer for encoded data. 00160 size_t bufsize; 00161 unsigned char *buf; 00162 00163 encoder_base_t (const encoder_base_t&); 00164 void operator = (const encoder_base_t&); 00165 }; 00166 00167 // Encoder for 0MQ framing protocol. Converts messages into data batches. 00168 00169 class encoder_t : public encoder_base_t <encoder_t> 00170 { 00171 public: 00172 00173 encoder_t (size_t bufsize_); 00174 ~encoder_t (); 00175 00176 void set_session (class session_base_t *session_); 00177 00178 private: 00179 00180 bool size_ready (); 00181 bool message_ready (); 00182 00183 class session_base_t *session; 00184 msg_t in_progress; 00185 unsigned char tmpbuf [10]; 00186 00187 encoder_t (const encoder_t&); 00188 const encoder_t &operator = (const encoder_t&); 00189 }; 00190 } 00191 00192 #endif 00193