libzmq master
The Intelligent Transport Layer

xpub.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2010-2011 250bpm s.r.o.
00003     Copyright (c) 2011 VMware, Inc.
00004     Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include <string.h>
00023 
00024 #include "xpub.hpp"
00025 #include "pipe.hpp"
00026 #include "err.hpp"
00027 #include "msg.hpp"
00028 
00029 zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
00030     socket_base_t (parent_, tid_),
00031     more (false)
00032 {
00033     options.type = ZMQ_XPUB;
00034 }
00035 
00036 zmq::xpub_t::~xpub_t ()
00037 {
00038 }
00039 
00040 void zmq::xpub_t::xattach_pipe (pipe_t *pipe_)
00041 {
00042     zmq_assert (pipe_);
00043     dist.attach (pipe_);
00044 
00045     //  The pipe is active when attached. Let's read the subscriptions from
00046     //  it, if any.
00047     xread_activated (pipe_);
00048 }
00049 
00050 void zmq::xpub_t::xread_activated (pipe_t *pipe_)
00051 {
00052     //  There are some subscriptions waiting. Let's process them.
00053     msg_t sub;
00054     sub.init ();
00055     while (true) {
00056 
00057         //  Grab next subscription.
00058         if (!pipe_->read (&sub)) {
00059             sub.close ();
00060             return;
00061         }
00062 
00063         //  Apply the subscription to the trie.
00064         unsigned char *data = (unsigned char*) sub.data ();
00065         size_t size = sub.size ();
00066         zmq_assert (size > 0 && (*data == 0 || *data == 1));
00067         bool unique;
00068         if (*data == 0)
00069             unique = subscriptions.rm (data + 1, size - 1, pipe_);
00070         else
00071             unique = subscriptions.add (data + 1, size - 1, pipe_);
00072 
00073         //  If the subscription is not a duplicate store it so that it can be
00074         //  passed to used on next recv call.
00075         if (unique && options.type != ZMQ_PUB)
00076             pending.push_back (blob_t ((unsigned char*) sub.data (),
00077                 sub.size ()));
00078     }
00079 }
00080 
00081 void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
00082 {
00083     dist.activated (pipe_);
00084 }
00085 
00086 void zmq::xpub_t::xterminated (pipe_t *pipe_)
00087 {
00088     //  Remove the pipe from the trie. If there are topics that nobody
00089     //  is interested in anymore, send corresponding unsubscriptions
00090     //  upstream.
00091     subscriptions.rm (pipe_, send_unsubscription, this);
00092 
00093     dist.terminated (pipe_);
00094 }
00095 
00096 void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
00097 {
00098     xpub_t *self = (xpub_t*) arg_;
00099     self->dist.match (pipe_);
00100 }
00101 
00102 int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
00103 {
00104     bool msg_more = msg_->flags () & msg_t::more ? true : false;
00105 
00106     //  For the first part of multi-part message, find the matching pipes.
00107     if (!more)
00108         subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
00109             mark_as_matching, this);
00110 
00111     //  Send the message to all the pipes that were marked as matching
00112     //  in the previous step.
00113     int rc = dist.send_to_matching (msg_, flags_);
00114     if (rc != 0)
00115         return rc;
00116 
00117     //  If we are at the end of multi-part message we can mark all the pipes
00118     //  as non-matching.
00119     if (!msg_more)
00120         dist.unmatch ();
00121 
00122     more = msg_more;
00123 
00124     return 0;
00125 }
00126 
00127 bool zmq::xpub_t::xhas_out ()
00128 {
00129     return dist.has_out ();
00130 }
00131 
00132 int zmq::xpub_t::xrecv (msg_t *msg_, int flags_)
00133 {
00134     //  If there is at least one 
00135     if (pending.empty ()) {
00136         errno = EAGAIN;
00137         return -1;
00138     }
00139 
00140     int rc = msg_->close ();
00141     errno_assert (rc == 0);
00142     rc = msg_->init_size (pending.front ().size ());
00143     errno_assert (rc == 0);
00144     memcpy (msg_->data (), pending.front ().data (),
00145         pending.front ().size ());
00146     pending.pop_front ();
00147     return 0;
00148 }
00149 
00150 bool zmq::xpub_t::xhas_in ()
00151 {
00152     return !pending.empty ();
00153 }
00154 
00155 void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
00156     void *arg_)
00157 {
00158     xpub_t *self = (xpub_t*) arg_;
00159 
00160     if (self->options.type != ZMQ_PUB) {
00161 
00162         //  Place the unsubscription to the queue of pending (un)sunscriptions
00163         //  to be retrived by the user later on.
00164         xpub_t *self = (xpub_t*) arg_;
00165         blob_t unsub (size_ + 1, 0);
00166         unsub [0] = 0;
00167         memcpy (&unsub [1], data_, size_);
00168         self->pending.push_back (unsub);
00169     }
00170 }
00171 
00172 zmq::xpub_session_t::xpub_session_t (io_thread_t *io_thread_, bool connect_,
00173       socket_base_t *socket_, const options_t &options_,
00174       const char *protocol_, const char *address_) :
00175     session_base_t (io_thread_, connect_, socket_, options_, protocol_,
00176         address_)
00177 {
00178 }
00179 
00180 zmq::xpub_session_t::~xpub_session_t ()
00181 {
00182 }
00183 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines