libzmq master
The Intelligent Transport Layer

object.cpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00005 
00006     This file is part of 0MQ.
00007 
00008     0MQ is free software; you can redistribute it and/or modify it under
00009     the terms of the GNU Lesser General Public License as published by
00010     the Free Software Foundation; either version 3 of the License, or
00011     (at your option) any later version.
00012 
00013     0MQ is distributed in the hope that it will be useful,
00014     but WITHOUT ANY WARRANTY; without even the implied warranty of
00015     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00016     GNU Lesser General Public License for more details.
00017 
00018     You should have received a copy of the GNU Lesser General Public License
00019     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00020 */
00021 
00022 #include <string.h>
00023 #include <stdarg.h>
00024 
00025 #include "object.hpp"
00026 #include "ctx.hpp"
00027 #include "err.hpp"
00028 #include "pipe.hpp"
00029 #include "io_thread.hpp"
00030 #include "session_base.hpp"
00031 #include "socket_base.hpp"
00032 
00033 zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) :
00034     ctx (ctx_),
00035     tid (tid_)
00036 {
00037 }
00038 
00039 zmq::object_t::object_t (object_t *parent_) :
00040     ctx (parent_->ctx),
00041     tid (parent_->tid)
00042 {
00043 }
00044 
00045 zmq::object_t::~object_t ()
00046 {
00047 }
00048 
00049 uint32_t zmq::object_t::get_tid ()
00050 {
00051     return tid;
00052 }
00053 
00054 zmq::ctx_t *zmq::object_t::get_ctx ()
00055 {
00056     return ctx;
00057 }
00058 
00059 void zmq::object_t::process_command (command_t &cmd_)
00060 {
00061     switch (cmd_.type) {
00062 
00063     case command_t::activate_read:
00064         process_activate_read ();
00065         break;
00066 
00067     case command_t::activate_write:
00068         process_activate_write (cmd_.args.activate_write.msgs_read);
00069         break;
00070 
00071     case command_t::stop:
00072         process_stop ();
00073         break;
00074 
00075     case command_t::plug:
00076         process_plug ();
00077         process_seqnum ();
00078         break;
00079 
00080     case command_t::own:
00081         process_own (cmd_.args.own.object);
00082         process_seqnum ();
00083         break;
00084 
00085     case command_t::attach:
00086         process_attach (cmd_.args.attach.engine);
00087         process_seqnum ();
00088         break;
00089 
00090     case command_t::bind:
00091         process_bind (cmd_.args.bind.pipe);
00092         process_seqnum ();
00093         break;
00094 
00095     case command_t::hiccup:
00096         process_hiccup (cmd_.args.hiccup.pipe);
00097         break;
00098 
00099     case command_t::pipe_term:
00100         process_pipe_term ();
00101         break;
00102 
00103     case command_t::pipe_term_ack:
00104         process_pipe_term_ack ();
00105         break;
00106 
00107     case command_t::term_req:
00108         process_term_req (cmd_.args.term_req.object);
00109         break;
00110     
00111     case command_t::term:
00112         process_term (cmd_.args.term.linger);
00113         break;
00114 
00115     case command_t::term_ack:
00116         process_term_ack ();
00117         break;
00118 
00119     case command_t::reap:
00120         process_reap (cmd_.args.reap.socket);
00121         break;
00122 
00123     case command_t::reaped:
00124         process_reaped ();
00125         break;
00126 
00127     default:
00128         zmq_assert (false);
00129     }
00130 }
00131 
00132 int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_)
00133 {
00134     return ctx->register_endpoint (addr_, endpoint_);
00135 }
00136 
00137 void zmq::object_t::unregister_endpoints (socket_base_t *socket_)
00138 {
00139     return ctx->unregister_endpoints (socket_);
00140 }
00141 
00142 zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
00143 {
00144     return ctx->find_endpoint (addr_);
00145 }
00146 
00147 void zmq::object_t::destroy_socket (socket_base_t *socket_)
00148 {
00149     ctx->destroy_socket (socket_);
00150 }
00151 
00152 void zmq::object_t::log (const char *format_, ...)
00153 {
00154     va_list args;
00155     va_start (args, format_);
00156     ctx->log (format_, args);
00157     va_end (args);
00158 }
00159 
00160 zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
00161 {
00162     return ctx->choose_io_thread (affinity_);
00163 }
00164 
00165 void zmq::object_t::send_stop ()
00166 {
00167     //  'stop' command goes always from administrative thread to
00168     //  the current object. 
00169     command_t cmd;
00170 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00171     memset (&cmd, 0, sizeof (cmd));
00172 #endif
00173     cmd.destination = this;
00174     cmd.type = command_t::stop;
00175     ctx->send_command (tid, cmd);
00176 }
00177 
00178 void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_)
00179 {
00180     if (inc_seqnum_)
00181         destination_->inc_seqnum ();
00182 
00183     command_t cmd;
00184 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00185     memset (&cmd, 0, sizeof (cmd));
00186 #endif
00187     cmd.destination = destination_;
00188     cmd.type = command_t::plug;
00189     send_command (cmd);
00190 }
00191 
00192 void zmq::object_t::send_own (own_t *destination_, own_t *object_)
00193 {
00194     destination_->inc_seqnum ();
00195     command_t cmd;
00196 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00197     memset (&cmd, 0, sizeof (cmd));
00198 #endif
00199     cmd.destination = destination_;
00200     cmd.type = command_t::own;
00201     cmd.args.own.object = object_;
00202     send_command (cmd);
00203 }
00204 
00205 void zmq::object_t::send_attach (session_base_t *destination_,
00206     i_engine *engine_, bool inc_seqnum_)
00207 {
00208     if (inc_seqnum_)
00209         destination_->inc_seqnum ();
00210 
00211     command_t cmd;
00212 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00213     memset (&cmd, 0, sizeof (cmd));
00214 #endif
00215     cmd.destination = destination_;
00216     cmd.type = command_t::attach;
00217     cmd.args.attach.engine = engine_;
00218     send_command (cmd);
00219 }
00220 
00221 void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
00222     bool inc_seqnum_)
00223 {
00224     if (inc_seqnum_)
00225         destination_->inc_seqnum ();
00226 
00227     command_t cmd;
00228 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00229     memset (&cmd, 0, sizeof (cmd));
00230 #endif
00231     cmd.destination = destination_;
00232     cmd.type = command_t::bind;
00233     cmd.args.bind.pipe = pipe_;
00234     send_command (cmd);
00235 }
00236 
00237 void zmq::object_t::send_activate_read (pipe_t *destination_)
00238 {
00239     command_t cmd;
00240 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00241     memset (&cmd, 0, sizeof (cmd));
00242 #endif
00243     cmd.destination = destination_;
00244     cmd.type = command_t::activate_read;
00245     send_command (cmd);
00246 }
00247 
00248 void zmq::object_t::send_activate_write (pipe_t *destination_,
00249     uint64_t msgs_read_)
00250 {
00251     command_t cmd;
00252 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00253     memset (&cmd, 0, sizeof (cmd));
00254 #endif
00255     cmd.destination = destination_;
00256     cmd.type = command_t::activate_write;
00257     cmd.args.activate_write.msgs_read = msgs_read_;
00258     send_command (cmd);
00259 }
00260 
00261 void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
00262 {
00263     command_t cmd;
00264 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00265     memset (&cmd, 0, sizeof (cmd));
00266 #endif
00267     cmd.destination = destination_;
00268     cmd.type = command_t::hiccup;
00269     cmd.args.hiccup.pipe = pipe_;
00270     send_command (cmd);
00271 }
00272 
00273 void zmq::object_t::send_pipe_term (pipe_t *destination_)
00274 {
00275     command_t cmd;
00276 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00277     memset (&cmd, 0, sizeof (cmd));
00278 #endif
00279     cmd.destination = destination_;
00280     cmd.type = command_t::pipe_term;
00281     send_command (cmd);
00282 }
00283 
00284 void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
00285 {
00286     command_t cmd;
00287 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00288     memset (&cmd, 0, sizeof (cmd));
00289 #endif
00290     cmd.destination = destination_;
00291     cmd.type = command_t::pipe_term_ack;
00292     send_command (cmd);
00293 }
00294 
00295 void zmq::object_t::send_term_req (own_t *destination_,
00296     own_t *object_)
00297 {
00298     command_t cmd;
00299 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00300     memset (&cmd, 0, sizeof (cmd));
00301 #endif
00302     cmd.destination = destination_;
00303     cmd.type = command_t::term_req;
00304     cmd.args.term_req.object = object_;
00305     send_command (cmd);
00306 }
00307 
00308 void zmq::object_t::send_term (own_t *destination_, int linger_)
00309 {
00310     command_t cmd;
00311 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00312     memset (&cmd, 0, sizeof (cmd));
00313 #endif
00314     cmd.destination = destination_;
00315     cmd.type = command_t::term;
00316     cmd.args.term.linger = linger_;
00317     send_command (cmd);
00318 }
00319 
00320 void zmq::object_t::send_term_ack (own_t *destination_)
00321 {
00322     command_t cmd;
00323 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00324     memset (&cmd, 0, sizeof (cmd));
00325 #endif
00326     cmd.destination = destination_;
00327     cmd.type = command_t::term_ack;
00328     send_command (cmd);
00329 }
00330 
00331 void zmq::object_t::send_reap (class socket_base_t *socket_)
00332 {
00333     command_t cmd;
00334 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00335     memset (&cmd, 0, sizeof (cmd));
00336 #endif
00337     cmd.destination = ctx->get_reaper ();
00338     cmd.type = command_t::reap;
00339     cmd.args.reap.socket = socket_;
00340     send_command (cmd);
00341 }
00342 
00343 void zmq::object_t::send_reaped ()
00344 {
00345     command_t cmd;
00346 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00347     memset (&cmd, 0, sizeof (cmd));
00348 #endif
00349     cmd.destination = ctx->get_reaper ();
00350     cmd.type = command_t::reaped;
00351     send_command (cmd);
00352 }
00353 
00354 void zmq::object_t::send_done ()
00355 {
00356     command_t cmd;
00357 #if defined ZMQ_MAKE_VALGRIND_HAPPY
00358     memset (&cmd, 0, sizeof (cmd));
00359 #endif
00360     cmd.destination = NULL;
00361     cmd.type = command_t::done;
00362     ctx->send_command (ctx_t::term_tid, cmd);
00363 }
00364 
00365 void zmq::object_t::process_stop ()
00366 {
00367     zmq_assert (false);
00368 }
00369 
00370 void zmq::object_t::process_plug ()
00371 {
00372     zmq_assert (false);
00373 }
00374 
00375 void zmq::object_t::process_own (own_t *object_)
00376 {
00377     zmq_assert (false);
00378 }
00379 
00380 void zmq::object_t::process_attach (i_engine *engine_)
00381 {
00382     zmq_assert (false);
00383 }
00384 
00385 void zmq::object_t::process_bind (pipe_t *pipe_)
00386 {
00387     zmq_assert (false);
00388 }
00389 
00390 void zmq::object_t::process_activate_read ()
00391 {
00392     zmq_assert (false);
00393 }
00394 
00395 void zmq::object_t::process_activate_write (uint64_t msgs_read_)
00396 {
00397     zmq_assert (false);
00398 }
00399 
00400 void zmq::object_t::process_hiccup (void *pipe_)
00401 {
00402     zmq_assert (false);
00403 }
00404 
00405 void zmq::object_t::process_pipe_term ()
00406 {
00407     zmq_assert (false);
00408 }
00409 
00410 void zmq::object_t::process_pipe_term_ack ()
00411 {
00412     zmq_assert (false);
00413 }
00414 
00415 void zmq::object_t::process_term_req (own_t *object_)
00416 {
00417     zmq_assert (false);
00418 }
00419 
00420 void zmq::object_t::process_term (int linger_)
00421 {
00422     zmq_assert (false);
00423 }
00424 
00425 void zmq::object_t::process_term_ack ()
00426 {
00427     zmq_assert (false);
00428 }
00429 
00430 void zmq::object_t::process_reap (class socket_base_t *socket_)
00431 {
00432     zmq_assert (false);
00433 }
00434 
00435 void zmq::object_t::process_reaped ()
00436 {
00437     zmq_assert (false);
00438 }
00439 
00440 void zmq::object_t::process_seqnum ()
00441 {
00442     zmq_assert (false);
00443 }
00444 
00445 void zmq::object_t::send_command (command_t &cmd_)
00446 {
00447     ctx->send_command (cmd_.destination->get_tid (), cmd_);
00448 }
00449 
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines