libzmq master
The Intelligent Transport Layer

decoder.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include <stdlib.h>
00023 #include <string.h>
00024 
00025 #include "decoder.hpp"
00026 #include "session_base.hpp"
00027 #include "likely.hpp"
00028 #include "wire.hpp"
00029 #include "err.hpp"
00030 
00031 zmq::decoder_t::decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
00032     decoder_base_t <decoder_t> (bufsize_),
00033     session (NULL),
00034     maxmsgsize (maxmsgsize_)
00035 {
00036     int rc = in_progress.init ();
00037     errno_assert (rc == 0);
00038 
00039     //  At the beginning, read one byte and go to one_byte_size_ready state.
00040     next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
00041 }
00042 
00043 zmq::decoder_t::~decoder_t ()
00044 {
00045     int rc = in_progress.close ();
00046     errno_assert (rc == 0);
00047 }
00048 
00049 void zmq::decoder_t::set_session (session_base_t *session_)
00050 {
00051     session = session_;
00052 }
00053 
00054 bool zmq::decoder_t::one_byte_size_ready ()
00055 {
00056     //  First byte of size is read. If it is 0xff read 8-byte size.
00057     //  Otherwise allocate the buffer for message data and read the
00058     //  message data into it.
00059     if (*tmpbuf == 0xff)
00060         next_step (tmpbuf, 8, &decoder_t::eight_byte_size_ready);
00061     else {
00062 
00063         //  There has to be at least one byte (the flags) in the message).
00064         if (!*tmpbuf) {
00065             decoding_error ();
00066             return false;
00067         }
00068 
00069         //  in_progress is initialised at this point so in theory we should
00070         //  close it before calling zmq_msg_init_size, however, it's a 0-byte
00071         //  message and thus we can treat it as uninitialised...
00072         int rc;
00073         if (maxmsgsize >= 0 && (int64_t) (*tmpbuf - 1) > maxmsgsize) {
00074             rc = -1;
00075             errno = ENOMEM;
00076         }
00077         else
00078             rc = in_progress.init_size (*tmpbuf - 1);
00079         if (rc != 0 && errno == ENOMEM) {
00080             rc = in_progress.init ();
00081             errno_assert (rc == 0);
00082             decoding_error ();
00083             return false;
00084         }
00085         errno_assert (rc == 0);
00086 
00087         next_step (tmpbuf, 1, &decoder_t::flags_ready);
00088     }
00089     return true;
00090 }
00091 
00092 bool zmq::decoder_t::eight_byte_size_ready ()
00093 {
00094     //  8-byte size is read. Allocate the buffer for message body and
00095     //  read the message data into it.
00096     size_t size = (size_t) get_uint64 (tmpbuf);
00097 
00098     //  There has to be at least one byte (the flags) in the message).
00099     if (!size) {
00100         decoding_error ();
00101         return false;
00102     }
00103 
00104     //  in_progress is initialised at this point so in theory we should
00105     //  close it before calling zmq_msg_init_size, however, it's a 0-byte
00106     //  message and thus we can treat it as uninitialised...
00107     int rc;
00108     if (maxmsgsize >= 0 && (int64_t) (size - 1) > maxmsgsize) {
00109         rc = -1;
00110         errno = ENOMEM;
00111     }
00112     else
00113         rc = in_progress.init_size (size - 1);
00114     if (rc != 0 && errno == ENOMEM) {
00115         rc = in_progress.init ();
00116         errno_assert (rc == 0);
00117         decoding_error ();
00118         return false;
00119     }
00120     errno_assert (rc == 0);
00121 
00122     next_step (tmpbuf, 1, &decoder_t::flags_ready);
00123     return true;
00124 }
00125 
00126 bool zmq::decoder_t::flags_ready ()
00127 {
00128     //  Store the flags from the wire into the message structure.
00129     in_progress.set_flags (tmpbuf [0]);
00130 
00131     next_step (in_progress.data (), in_progress.size (),
00132         &decoder_t::message_ready);
00133     
00134     return true;
00135 }
00136 
00137 bool zmq::decoder_t::message_ready ()
00138 {
00139     //  Message is completely read. Push it further and start reading
00140     //  new message. (in_progress is a 0-byte message after this point.)
00141     if (unlikely (!session))
00142         return false;
00143     int rc = session->write (&in_progress);
00144     if (unlikely (rc != 0)) {
00145         if (errno != EAGAIN)
00146             decoding_error ();
00147         return false;
00148     }
00149 
00150     next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
00151     return true;
00152 }
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines