![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include "encoder.hpp" 00024 #include "session_base.hpp" 00025 #include "likely.hpp" 00026 #include "wire.hpp" 00027 00028 zmq::encoder_t::encoder_t (size_t bufsize_) : 00029 encoder_base_t <encoder_t> (bufsize_), 00030 session (NULL) 00031 { 00032 int rc = in_progress.init (); 00033 errno_assert (rc == 0); 00034 00035 // Write 0 bytes to the batch and go to message_ready state. 00036 next_step (NULL, 0, &encoder_t::message_ready, true); 00037 } 00038 00039 zmq::encoder_t::~encoder_t () 00040 { 00041 int rc = in_progress.close (); 00042 errno_assert (rc == 0); 00043 } 00044 00045 void zmq::encoder_t::set_session (session_base_t *session_) 00046 { 00047 session = session_; 00048 } 00049 00050 bool zmq::encoder_t::size_ready () 00051 { 00052 // Write message body into the buffer. 00053 next_step (in_progress.data (), in_progress.size (), 00054 &encoder_t::message_ready, false); 00055 return true; 00056 } 00057 00058 bool zmq::encoder_t::message_ready () 00059 { 00060 // Destroy content of the old message. 00061 int rc = in_progress.close (); 00062 errno_assert (rc == 0); 00063 00064 // Read new message. If there is none, return false. 00065 // Note that new state is set only if write is successful. That way 00066 // unsuccessful write will cause retry on the next state machine 00067 // invocation. 00068 if (unlikely (!session)) { 00069 rc = in_progress.init (); 00070 errno_assert (rc == 0); 00071 return false; 00072 } 00073 rc = session->read (&in_progress); 00074 if (unlikely (rc != 0)) { 00075 errno_assert (errno == EAGAIN); 00076 rc = in_progress.init (); 00077 errno_assert (rc == 0); 00078 return false; 00079 } 00080 00081 // Get the message size. 00082 size_t size = in_progress.size (); 00083 00084 // Account for the 'flags' byte. 00085 size++; 00086 00087 // For messages less than 255 bytes long, write one byte of message size. 00088 // For longer messages write 0xff escape character followed by 8-byte 00089 // message size. In both cases 'flags' field follows. 00090 if (size < 255) { 00091 tmpbuf [0] = (unsigned char) size; 00092 tmpbuf [1] = (in_progress.flags () & ~msg_t::shared); 00093 next_step (tmpbuf, 2, &encoder_t::size_ready, 00094 !(in_progress.flags () & msg_t::more)); 00095 } 00096 else { 00097 tmpbuf [0] = 0xff; 00098 put_uint64 (tmpbuf + 1, size); 00099 tmpbuf [9] = (in_progress.flags () & ~msg_t::shared); 00100 next_step (tmpbuf, 10, &encoder_t::size_ready, 00101 !(in_progress.flags () & msg_t::more)); 00102 } 00103 return true; 00104 }