![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include <stdlib.h> 00023 #include <string.h> 00024 00025 #include "decoder.hpp" 00026 #include "session_base.hpp" 00027 #include "likely.hpp" 00028 #include "wire.hpp" 00029 #include "err.hpp" 00030 00031 zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) : 00032 decoder_base_t <decoder_t> (bufsize_), 00033 session (NULL), 00034 maxmsgsize (maxmsgsize_) 00035 { 00036 int rc = in_progress.init (); 00037 errno_assert (rc == 0); 00038 00039 // At the beginning, read one byte and go to one_byte_size_ready state. 00040 next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); 00041 } 00042 00043 zmq::decoder_t::~decoder_t () 00044 { 00045 int rc = in_progress.close (); 00046 errno_assert (rc == 0); 00047 } 00048 00049 void zmq::decoder_t::set_session (session_base_t *session_) 00050 { 00051 session = session_; 00052 } 00053 00054 bool zmq::decoder_t::one_byte_size_ready () 00055 { 00056 // First byte of size is read. If it is 0xff read 8-byte size. 00057 // Otherwise allocate the buffer for message data and read the 00058 // message data into it. 00059 if (*tmpbuf == 0xff) 00060 next_step (tmpbuf, 8, &decoder_t::eight_byte_size_ready); 00061 else { 00062 00063 // There has to be at least one byte (the flags) in the message). 00064 if (!*tmpbuf) { 00065 decoding_error (); 00066 return false; 00067 } 00068 00069 // in_progress is initialised at this point so in theory we should 00070 // close it before calling zmq_msg_init_size, however, it's a 0-byte 00071 // message and thus we can treat it as uninitialised... 00072 int rc; 00073 if (maxmsgsize >= 0 && (int64_t) (*tmpbuf - 1) > maxmsgsize) { 00074 rc = -1; 00075 errno = ENOMEM; 00076 } 00077 else 00078 rc = in_progress.init_size (*tmpbuf - 1); 00079 if (rc != 0 && errno == ENOMEM) { 00080 rc = in_progress.init (); 00081 errno_assert (rc == 0); 00082 decoding_error (); 00083 return false; 00084 } 00085 errno_assert (rc == 0); 00086 00087 next_step (tmpbuf, 1, &decoder_t::flags_ready); 00088 } 00089 return true; 00090 } 00091 00092 bool zmq::decoder_t::eight_byte_size_ready () 00093 { 00094 // 8-byte size is read. Allocate the buffer for message body and 00095 // read the message data into it. 00096 size_t size = (size_t) get_uint64 (tmpbuf); 00097 00098 // There has to be at least one byte (the flags) in the message). 00099 if (!size) { 00100 decoding_error (); 00101 return false; 00102 } 00103 00104 // in_progress is initialised at this point so in theory we should 00105 // close it before calling zmq_msg_init_size, however, it's a 0-byte 00106 // message and thus we can treat it as uninitialised... 00107 int rc; 00108 if (maxmsgsize >= 0 && (int64_t) (size - 1) > maxmsgsize) { 00109 rc = -1; 00110 errno = ENOMEM; 00111 } 00112 else 00113 rc = in_progress.init_size (size - 1); 00114 if (rc != 0 && errno == ENOMEM) { 00115 rc = in_progress.init (); 00116 errno_assert (rc == 0); 00117 decoding_error (); 00118 return false; 00119 } 00120 errno_assert (rc == 0); 00121 00122 next_step (tmpbuf, 1, &decoder_t::flags_ready); 00123 return true; 00124 } 00125 00126 bool zmq::decoder_t::flags_ready () 00127 { 00128 // Store the flags from the wire into the message structure. 00129 in_progress.set_flags (tmpbuf [0]); 00130 00131 next_step (in_progress.data (), in_progress.size (), 00132 &decoder_t::message_ready); 00133 00134 return true; 00135 } 00136 00137 bool zmq::decoder_t::message_ready () 00138 { 00139 // Message is completely read. Push it further and start reading 00140 // new message. (in_progress is a 0-byte message after this point.) 00141 if (unlikely (!session)) 00142 return false; 00143 int rc = session->write (&in_progress); 00144 if (unlikely (rc != 0)) { 00145 if (errno != EAGAIN) 00146 decoding_error (); 00147 return false; 00148 } 00149 00150 next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); 00151 return true; 00152 }