![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #include "fq.hpp" 00024 #include "pipe.hpp" 00025 #include "err.hpp" 00026 #include "msg.hpp" 00027 00028 zmq::fq_t::fq_t () : 00029 active (0), 00030 current (0), 00031 more (false) 00032 { 00033 } 00034 00035 zmq::fq_t::~fq_t () 00036 { 00037 zmq_assert (pipes.empty ()); 00038 } 00039 00040 void zmq::fq_t::attach (pipe_t *pipe_) 00041 { 00042 pipes.push_back (pipe_); 00043 pipes.swap (active, pipes.size () - 1); 00044 active++; 00045 } 00046 00047 void zmq::fq_t::terminated (pipe_t *pipe_) 00048 { 00049 // Remove the pipe from the list; adjust number of active pipes 00050 // accordingly. 00051 if (pipes.index (pipe_) < active) { 00052 active--; 00053 if (current == active) 00054 current = 0; 00055 } 00056 pipes.erase (pipe_); 00057 } 00058 00059 void zmq::fq_t::activated (pipe_t *pipe_) 00060 { 00061 // Move the pipe to the list of active pipes. 00062 pipes.swap (pipes.index (pipe_), active); 00063 active++; 00064 } 00065 00066 int zmq::fq_t::recv (msg_t *msg_, int flags_) 00067 { 00068 return recvpipe (msg_, flags_, NULL); 00069 } 00070 00071 int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_) 00072 { 00073 // Deallocate old content of the message. 00074 int rc = msg_->close (); 00075 errno_assert (rc == 0); 00076 00077 // Round-robin over the pipes to get the next message. 00078 for (pipes_t::size_type count = active; count != 0; count--) { 00079 00080 // Try to fetch new message. If we've already read part of the message 00081 // subsequent part should be immediately available. 00082 bool fetched = pipes [current]->read (msg_); 00083 00084 // Check the atomicity of the message. If we've already received the 00085 // first part of the message we should get the remaining parts 00086 // without blocking. 00087 zmq_assert (!(more && !fetched)); 00088 00089 // Note that when message is not fetched, current pipe is deactivated 00090 // and replaced by another active pipe. Thus we don't have to increase 00091 // the 'current' pointer. 00092 if (fetched) { 00093 if (pipe_) 00094 *pipe_ = pipes [current]; 00095 more = 00096 msg_->flags () & msg_t::more ? true : false; 00097 if (!more) { 00098 current++; 00099 if (current >= active) 00100 current = 0; 00101 } 00102 return 0; 00103 } 00104 else { 00105 active--; 00106 pipes.swap (current, active); 00107 if (current == active) 00108 current = 0; 00109 } 00110 } 00111 00112 // No message is available. Initialise the output parameter 00113 // to be a 0-byte message. 00114 rc = msg_->init (); 00115 errno_assert (rc == 0); 00116 errno = EAGAIN; 00117 return -1; 00118 } 00119 00120 bool zmq::fq_t::has_in () 00121 { 00122 // There are subsequent parts of the partly-read message available. 00123 if (more) 00124 return true; 00125 00126 // Note that messing with current doesn't break the fairness of fair 00127 // queueing algorithm. If there are no messages available current will 00128 // get back to its original value. Otherwise it'll point to the first 00129 // pipe holding messages, skipping only pipes with no messages available. 00130 for (pipes_t::size_type count = active; count != 0; count--) { 00131 if (pipes [current]->check_read ()) 00132 return true; 00133 00134 // Deactivate the pipe. 00135 active--; 00136 pipes.swap (current, active); 00137 if (current == active) 00138 current = 0; 00139 } 00140 00141 return false; 00142 } 00143