libzmq master
The Intelligent Transport Layer

fq.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2011 VMware, Inc.
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #include "fq.hpp"
00024 #include "pipe.hpp"
00025 #include "err.hpp"
00026 #include "msg.hpp"
00027 
00028 zmq::fq_t::fq_t () :
00029     active (0),
00030     current (0),
00031     more (false)
00032 {
00033 }
00034 
00035 zmq::fq_t::~fq_t ()
00036 {
00037     zmq_assert (pipes.empty ());
00038 }
00039 
00040 void zmq::fq_t::attach (pipe_t *pipe_)
00041 {
00042     pipes.push_back (pipe_);
00043     pipes.swap (active, pipes.size () - 1);
00044     active++;
00045 }
00046 
00047 void zmq::fq_t::terminated (pipe_t *pipe_)
00048 {
00049     //  Remove the pipe from the list; adjust number of active pipes
00050     //  accordingly.
00051     if (pipes.index (pipe_) < active) {
00052         active--;
00053         if (current == active)
00054             current = 0;
00055     }
00056     pipes.erase (pipe_);
00057 }
00058 
00059 void zmq::fq_t::activated (pipe_t *pipe_)
00060 {
00061     //  Move the pipe to the list of active pipes.
00062     pipes.swap (pipes.index (pipe_), active);
00063     active++;
00064 }
00065 
00066 int zmq::fq_t::recv (msg_t *msg_, int flags_)
00067 {
00068     return recvpipe (msg_, flags_, NULL);
00069 }
00070 
00071 int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
00072 {
00073     //  Deallocate old content of the message.
00074     int rc = msg_->close ();
00075     errno_assert (rc == 0);
00076 
00077     //  Round-robin over the pipes to get the next message.
00078     for (pipes_t::size_type count = active; count != 0; count--) {
00079 
00080         //  Try to fetch new message. If we've already read part of the message
00081         //  subsequent part should be immediately available.
00082         bool fetched = pipes [current]->read (msg_);
00083 
00084         //  Check the atomicity of the message. If we've already received the
00085         //  first part of the message we should get the remaining parts
00086         //  without blocking.
00087         zmq_assert (!(more && !fetched));
00088 
00089         //  Note that when message is not fetched, current pipe is deactivated
00090         //  and replaced by another active pipe. Thus we don't have to increase
00091         //  the 'current' pointer.
00092         if (fetched) {
00093             if (pipe_)
00094                 *pipe_ = pipes [current];
00095             more =
00096                 msg_->flags () & msg_t::more ? true : false;
00097             if (!more) {
00098                 current++;
00099                 if (current >= active)
00100                     current = 0;
00101             }
00102             return 0;
00103         }
00104         else {
00105             active--;
00106             pipes.swap (current, active);
00107             if (current == active)
00108                 current = 0;
00109         }
00110     }
00111 
00112     //  No message is available. Initialise the output parameter
00113     //  to be a 0-byte message.
00114     rc = msg_->init ();
00115     errno_assert (rc == 0);
00116     errno = EAGAIN;
00117     return -1;
00118 }
00119 
00120 bool zmq::fq_t::has_in ()
00121 {
00122     //  There are subsequent parts of the partly-read message available.
00123     if (more)
00124         return true;
00125 
00126     //  Note that messing with current doesn't break the fairness of fair
00127     //  queueing algorithm. If there are no messages available current will
00128     //  get back to its original value. Otherwise it'll point to the first
00129     //  pipe holding messages, skipping only pipes with no messages available.
00130     for (pipes_t::size_type count = active; count != 0; count--) {
00131         if (pipes [current]->check_read ())
00132             return true;
00133 
00134         //  Deactivate the pipe.
00135         active--;
00136         pipes.swap (current, active);
00137         if (current == active)
00138             current = 0;
00139     }
00140 
00141     return false;
00142 }
00143 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines