libzmq master
The Intelligent Transport Layer

xsub.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2010-2011 250bpm s.r.o.
00003     Copyright (c) 2011 VMware, Inc.
00004     Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include <string.h>
00023 
00024 #include "xsub.hpp"
00025 #include "err.hpp"
00026 
00027 zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
00028     socket_base_t (parent_, tid_),
00029     has_message (false),
00030     more (false)
00031 {
00032     options.type = ZMQ_XSUB;
00033     int rc = message.init ();
00034     errno_assert (rc == 0);
00035 }
00036 
00037 zmq::xsub_t::~xsub_t ()
00038 {
00039     int rc = message.close ();
00040     errno_assert (rc == 0);
00041 }
00042 
00043 void zmq::xsub_t::xattach_pipe (pipe_t *pipe_)
00044 {
00045     zmq_assert (pipe_);
00046     fq.attach (pipe_);
00047     dist.attach (pipe_);
00048 
00049     //  Send all the cached subscriptions to the new upstream peer.
00050     subscriptions.apply (send_subscription, pipe_);
00051     pipe_->flush ();
00052 }
00053 
00054 void zmq::xsub_t::xread_activated (pipe_t *pipe_)
00055 {
00056     fq.activated (pipe_);
00057 }
00058 
00059 void zmq::xsub_t::xwrite_activated (pipe_t *pipe_)
00060 {
00061     dist.activated (pipe_);
00062 }
00063 
00064 void zmq::xsub_t::xterminated (pipe_t *pipe_)
00065 {
00066     fq.terminated (pipe_);
00067     dist.terminated (pipe_);
00068 }
00069 
00070 void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
00071 {
00072     //  Send all the cached subscriptions to the hiccuped pipe.
00073     subscriptions.apply (send_subscription, pipe_);
00074     pipe_->flush ();
00075 }
00076 
00077 int zmq::xsub_t::xsend (msg_t *msg_, int flags_)
00078 {
00079     size_t size = msg_->size ();
00080     unsigned char *data = (unsigned char*) msg_->data ();
00081 
00082     // Malformed subscriptions.
00083     if (size < 1 || (*data != 0 && *data != 1)) {
00084         errno = EINVAL;
00085         return -1;
00086     }
00087 
00088     // Process the subscription.
00089     if (*data == 1) {
00090         if (subscriptions.add (data + 1, size - 1))
00091             return dist.send_to_all (msg_, flags_);
00092         else
00093             return 0;
00094     }
00095     else if (*data == 0) {
00096         if (subscriptions.rm (data + 1, size - 1))
00097             return dist.send_to_all (msg_, flags_);
00098         else
00099             return 0;
00100     }
00101 
00102     zmq_assert (false);
00103     return -1;
00104 }
00105 
00106 bool zmq::xsub_t::xhas_out ()
00107 {
00108     //  Subscription can be added/removed anytime.
00109     return true;
00110 }
00111 
00112 int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
00113 {
00114     //  If there's already a message prepared by a previous call to zmq_poll,
00115     //  return it straight ahead.
00116     if (has_message) {
00117         int rc = msg_->move (message);
00118         errno_assert (rc == 0);
00119         has_message = false;
00120         more = msg_->flags () & msg_t::more ? true : false;
00121         return 0;
00122     }
00123 
00124     //  TODO: This can result in infinite loop in the case of continuous
00125     //  stream of non-matching messages which breaks the non-blocking recv
00126     //  semantics.
00127     while (true) {
00128 
00129         //  Get a message using fair queueing algorithm.
00130         int rc = fq.recv (msg_, flags_);
00131 
00132         //  If there's no message available, return immediately.
00133         //  The same when error occurs.
00134         if (rc != 0)
00135             return -1;
00136 
00137         //  Check whether the message matches at least one subscription.
00138         //  Non-initial parts of the message are passed 
00139         if (more || !options.filter || match (msg_)) {
00140             more = msg_->flags () & msg_t::more ? true : false;
00141             return 0;
00142         }
00143 
00144         //  Message doesn't match. Pop any remaining parts of the message
00145         //  from the pipe.
00146         while (msg_->flags () & msg_t::more) {
00147             rc = fq.recv (msg_, ZMQ_DONTWAIT);
00148             zmq_assert (rc == 0);
00149         }
00150     }
00151 }
00152 
00153 bool zmq::xsub_t::xhas_in ()
00154 {
00155     //  There are subsequent parts of the partly-read message available.
00156     if (more)
00157         return true;
00158 
00159     //  If there's already a message prepared by a previous call to zmq_poll,
00160     //  return straight ahead.
00161     if (has_message)
00162         return true;
00163 
00164     //  TODO: This can result in infinite loop in the case of continuous
00165     //  stream of non-matching messages.
00166     while (true) {
00167 
00168         //  Get a message using fair queueing algorithm.
00169         int rc = fq.recv (&message, ZMQ_DONTWAIT);
00170 
00171         //  If there's no message available, return immediately.
00172         //  The same when error occurs.
00173         if (rc != 0) {
00174             zmq_assert (errno == EAGAIN);
00175             return false;
00176         }
00177 
00178         //  Check whether the message matches at least one subscription.
00179         if (!options.filter || match (&message)) {
00180             has_message = true;
00181             return true;
00182         }
00183 
00184         //  Message doesn't match. Pop any remaining parts of the message
00185         //  from the pipe.
00186         while (message.flags () & msg_t::more) {
00187             rc = fq.recv (&message, ZMQ_DONTWAIT);
00188             zmq_assert (rc == 0);
00189         }
00190     }
00191 }
00192 
00193 bool zmq::xsub_t::match (msg_t *msg_)
00194 {
00195     return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ());
00196 }
00197 
00198 void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
00199     void *arg_)
00200 {
00201     pipe_t *pipe = (pipe_t*) arg_;
00202 
00203     //  Create the subsctription message.
00204     msg_t msg;
00205     int rc = msg.init_size (size_ + 1);
00206     zmq_assert (rc == 0);
00207     unsigned char *data = (unsigned char*) msg.data ();
00208     data [0] = 1;
00209     memcpy (data + 1, data_, size_);
00210 
00211     //  Send it to the pipe.
00212     bool sent = pipe->write (&msg);
00213     zmq_assert (sent);
00214 }
00215 
00216 zmq::xsub_session_t::xsub_session_t (io_thread_t *io_thread_, bool connect_,
00217       socket_base_t *socket_, const options_t &options_,
00218       const char *protocol_, const char *address_) :
00219     session_base_t (io_thread_, connect_, socket_, options_, protocol_,
00220         address_)
00221 {
00222 }
00223 
00224 zmq::xsub_session_t::~xsub_session_t ()
00225 {
00226 }
00227 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines