libzmq master — The Intelligent Transport Layer
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include <string.h> 00023 #include <stdarg.h> 00024 00025 #include "object.hpp" 00026 #include "ctx.hpp" 00027 #include "err.hpp" 00028 #include "pipe.hpp" 00029 #include "io_thread.hpp" 00030 #include "session_base.hpp" 00031 #include "socket_base.hpp" 00032 00033 zmq::object_t::object_t (ctx_t *ctx_, uint32_t tid_) : 00034 ctx (ctx_), 00035 tid (tid_) 00036 { 00037 } 00038 00039 zmq::object_t::object_t (object_t *parent_) : 00040 ctx (parent_->ctx), 00041 tid (parent_->tid) 00042 { 00043 } 00044 00045 zmq::object_t::~object_t () 00046 { 00047 } 00048 00049 uint32_t zmq::object_t::get_tid () 00050 { 00051 return tid; 00052 } 00053 00054 zmq::ctx_t *zmq::object_t::get_ctx () 00055 { 00056 return ctx; 00057 } 00058 00059 void zmq::object_t::process_command (command_t &cmd_) 00060 { 00061 switch (cmd_.type) { 00062 00063 case command_t::activate_read: 00064 process_activate_read (); 00065 break; 00066 00067 case command_t::activate_write: 00068 process_activate_write (cmd_.args.activate_write.msgs_read); 00069 break; 00070 00071 case command_t::stop: 00072 
process_stop (); 00073 break; 00074 00075 case command_t::plug: 00076 process_plug (); 00077 process_seqnum (); 00078 break; 00079 00080 case command_t::own: 00081 process_own (cmd_.args.own.object); 00082 process_seqnum (); 00083 break; 00084 00085 case command_t::attach: 00086 process_attach (cmd_.args.attach.engine); 00087 process_seqnum (); 00088 break; 00089 00090 case command_t::bind: 00091 process_bind (cmd_.args.bind.pipe); 00092 process_seqnum (); 00093 break; 00094 00095 case command_t::hiccup: 00096 process_hiccup (cmd_.args.hiccup.pipe); 00097 break; 00098 00099 case command_t::pipe_term: 00100 process_pipe_term (); 00101 break; 00102 00103 case command_t::pipe_term_ack: 00104 process_pipe_term_ack (); 00105 break; 00106 00107 case command_t::term_req: 00108 process_term_req (cmd_.args.term_req.object); 00109 break; 00110 00111 case command_t::term: 00112 process_term (cmd_.args.term.linger); 00113 break; 00114 00115 case command_t::term_ack: 00116 process_term_ack (); 00117 break; 00118 00119 case command_t::reap: 00120 process_reap (cmd_.args.reap.socket); 00121 break; 00122 00123 case command_t::reaped: 00124 process_reaped (); 00125 break; 00126 00127 default: 00128 zmq_assert (false); 00129 } 00130 } 00131 00132 int zmq::object_t::register_endpoint (const char *addr_, endpoint_t &endpoint_) 00133 { 00134 return ctx->register_endpoint (addr_, endpoint_); 00135 } 00136 00137 void zmq::object_t::unregister_endpoints (socket_base_t *socket_) 00138 { 00139 return ctx->unregister_endpoints (socket_); 00140 } 00141 00142 zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_) 00143 { 00144 return ctx->find_endpoint (addr_); 00145 } 00146 00147 void zmq::object_t::destroy_socket (socket_base_t *socket_) 00148 { 00149 ctx->destroy_socket (socket_); 00150 } 00151 00152 void zmq::object_t::log (const char *format_, ...) 
00153 { 00154 va_list args; 00155 va_start (args, format_); 00156 ctx->log (format_, args); 00157 va_end (args); 00158 } 00159 00160 zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_) 00161 { 00162 return ctx->choose_io_thread (affinity_); 00163 } 00164 00165 void zmq::object_t::send_stop () 00166 { 00167 // 'stop' command goes always from administrative thread to 00168 // the current object. 00169 command_t cmd; 00170 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00171 memset (&cmd, 0, sizeof (cmd)); 00172 #endif 00173 cmd.destination = this; 00174 cmd.type = command_t::stop; 00175 ctx->send_command (tid, cmd); 00176 } 00177 00178 void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_) 00179 { 00180 if (inc_seqnum_) 00181 destination_->inc_seqnum (); 00182 00183 command_t cmd; 00184 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00185 memset (&cmd, 0, sizeof (cmd)); 00186 #endif 00187 cmd.destination = destination_; 00188 cmd.type = command_t::plug; 00189 send_command (cmd); 00190 } 00191 00192 void zmq::object_t::send_own (own_t *destination_, own_t *object_) 00193 { 00194 destination_->inc_seqnum (); 00195 command_t cmd; 00196 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00197 memset (&cmd, 0, sizeof (cmd)); 00198 #endif 00199 cmd.destination = destination_; 00200 cmd.type = command_t::own; 00201 cmd.args.own.object = object_; 00202 send_command (cmd); 00203 } 00204 00205 void zmq::object_t::send_attach (session_base_t *destination_, 00206 i_engine *engine_, bool inc_seqnum_) 00207 { 00208 if (inc_seqnum_) 00209 destination_->inc_seqnum (); 00210 00211 command_t cmd; 00212 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00213 memset (&cmd, 0, sizeof (cmd)); 00214 #endif 00215 cmd.destination = destination_; 00216 cmd.type = command_t::attach; 00217 cmd.args.attach.engine = engine_; 00218 send_command (cmd); 00219 } 00220 00221 void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, 00222 bool inc_seqnum_) 00223 { 00224 if (inc_seqnum_) 00225 
destination_->inc_seqnum (); 00226 00227 command_t cmd; 00228 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00229 memset (&cmd, 0, sizeof (cmd)); 00230 #endif 00231 cmd.destination = destination_; 00232 cmd.type = command_t::bind; 00233 cmd.args.bind.pipe = pipe_; 00234 send_command (cmd); 00235 } 00236 00237 void zmq::object_t::send_activate_read (pipe_t *destination_) 00238 { 00239 command_t cmd; 00240 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00241 memset (&cmd, 0, sizeof (cmd)); 00242 #endif 00243 cmd.destination = destination_; 00244 cmd.type = command_t::activate_read; 00245 send_command (cmd); 00246 } 00247 00248 void zmq::object_t::send_activate_write (pipe_t *destination_, 00249 uint64_t msgs_read_) 00250 { 00251 command_t cmd; 00252 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00253 memset (&cmd, 0, sizeof (cmd)); 00254 #endif 00255 cmd.destination = destination_; 00256 cmd.type = command_t::activate_write; 00257 cmd.args.activate_write.msgs_read = msgs_read_; 00258 send_command (cmd); 00259 } 00260 00261 void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_) 00262 { 00263 command_t cmd; 00264 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00265 memset (&cmd, 0, sizeof (cmd)); 00266 #endif 00267 cmd.destination = destination_; 00268 cmd.type = command_t::hiccup; 00269 cmd.args.hiccup.pipe = pipe_; 00270 send_command (cmd); 00271 } 00272 00273 void zmq::object_t::send_pipe_term (pipe_t *destination_) 00274 { 00275 command_t cmd; 00276 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00277 memset (&cmd, 0, sizeof (cmd)); 00278 #endif 00279 cmd.destination = destination_; 00280 cmd.type = command_t::pipe_term; 00281 send_command (cmd); 00282 } 00283 00284 void zmq::object_t::send_pipe_term_ack (pipe_t *destination_) 00285 { 00286 command_t cmd; 00287 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00288 memset (&cmd, 0, sizeof (cmd)); 00289 #endif 00290 cmd.destination = destination_; 00291 cmd.type = command_t::pipe_term_ack; 00292 send_command (cmd); 00293 } 00294 00295 void zmq::object_t::send_term_req 
(own_t *destination_, 00296 own_t *object_) 00297 { 00298 command_t cmd; 00299 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00300 memset (&cmd, 0, sizeof (cmd)); 00301 #endif 00302 cmd.destination = destination_; 00303 cmd.type = command_t::term_req; 00304 cmd.args.term_req.object = object_; 00305 send_command (cmd); 00306 } 00307 00308 void zmq::object_t::send_term (own_t *destination_, int linger_) 00309 { 00310 command_t cmd; 00311 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00312 memset (&cmd, 0, sizeof (cmd)); 00313 #endif 00314 cmd.destination = destination_; 00315 cmd.type = command_t::term; 00316 cmd.args.term.linger = linger_; 00317 send_command (cmd); 00318 } 00319 00320 void zmq::object_t::send_term_ack (own_t *destination_) 00321 { 00322 command_t cmd; 00323 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00324 memset (&cmd, 0, sizeof (cmd)); 00325 #endif 00326 cmd.destination = destination_; 00327 cmd.type = command_t::term_ack; 00328 send_command (cmd); 00329 } 00330 00331 void zmq::object_t::send_reap (class socket_base_t *socket_) 00332 { 00333 command_t cmd; 00334 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00335 memset (&cmd, 0, sizeof (cmd)); 00336 #endif 00337 cmd.destination = ctx->get_reaper (); 00338 cmd.type = command_t::reap; 00339 cmd.args.reap.socket = socket_; 00340 send_command (cmd); 00341 } 00342 00343 void zmq::object_t::send_reaped () 00344 { 00345 command_t cmd; 00346 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00347 memset (&cmd, 0, sizeof (cmd)); 00348 #endif 00349 cmd.destination = ctx->get_reaper (); 00350 cmd.type = command_t::reaped; 00351 send_command (cmd); 00352 } 00353 00354 void zmq::object_t::send_done () 00355 { 00356 command_t cmd; 00357 #if defined ZMQ_MAKE_VALGRIND_HAPPY 00358 memset (&cmd, 0, sizeof (cmd)); 00359 #endif 00360 cmd.destination = NULL; 00361 cmd.type = command_t::done; 00362 ctx->send_command (ctx_t::term_tid, cmd); 00363 } 00364 00365 void zmq::object_t::process_stop () 00366 { 00367 zmq_assert (false); 00368 } 00369 00370 void 
zmq::object_t::process_plug () 00371 { 00372 zmq_assert (false); 00373 } 00374 00375 void zmq::object_t::process_own (own_t *object_) 00376 { 00377 zmq_assert (false); 00378 } 00379 00380 void zmq::object_t::process_attach (i_engine *engine_) 00381 { 00382 zmq_assert (false); 00383 } 00384 00385 void zmq::object_t::process_bind (pipe_t *pipe_) 00386 { 00387 zmq_assert (false); 00388 } 00389 00390 void zmq::object_t::process_activate_read () 00391 { 00392 zmq_assert (false); 00393 } 00394 00395 void zmq::object_t::process_activate_write (uint64_t msgs_read_) 00396 { 00397 zmq_assert (false); 00398 } 00399 00400 void zmq::object_t::process_hiccup (void *pipe_) 00401 { 00402 zmq_assert (false); 00403 } 00404 00405 void zmq::object_t::process_pipe_term () 00406 { 00407 zmq_assert (false); 00408 } 00409 00410 void zmq::object_t::process_pipe_term_ack () 00411 { 00412 zmq_assert (false); 00413 } 00414 00415 void zmq::object_t::process_term_req (own_t *object_) 00416 { 00417 zmq_assert (false); 00418 } 00419 00420 void zmq::object_t::process_term (int linger_) 00421 { 00422 zmq_assert (false); 00423 } 00424 00425 void zmq::object_t::process_term_ack () 00426 { 00427 zmq_assert (false); 00428 } 00429 00430 void zmq::object_t::process_reap (class socket_base_t *socket_) 00431 { 00432 zmq_assert (false); 00433 } 00434 00435 void zmq::object_t::process_reaped () 00436 { 00437 zmq_assert (false); 00438 } 00439 00440 void zmq::object_t::process_seqnum () 00441 { 00442 zmq_assert (false); 00443 } 00444 00445 void zmq::object_t::send_command (command_t &cmd_) 00446 { 00447 ctx->send_command (cmd_.destination->get_tid (), cmd_); 00448 } 00449