![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #ifndef __ZMQ_OBJECT_HPP_INCLUDED__ 00023 #define __ZMQ_OBJECT_HPP_INCLUDED__ 00024 00025 #include "stdint.hpp" 00026 00027 namespace zmq 00028 { 00029 // Base class for all objects that participate in inter-thread 00030 // communication. 00031 00032 class object_t 00033 { 00034 public: 00035 00036 object_t (class ctx_t *ctx_, uint32_t tid_); 00037 object_t (object_t *parent_); 00038 virtual ~object_t (); 00039 00040 uint32_t get_tid (); 00041 ctx_t *get_ctx (); 00042 void process_command (struct command_t &cmd_); 00043 00044 protected: 00045 00046 // Using following function, socket is able to access global 00047 // repository of inproc endpoints. 00048 int register_endpoint (const char *addr_, struct endpoint_t &endpoint_); 00049 void unregister_endpoints (class socket_base_t *socket_); 00050 struct endpoint_t find_endpoint (const char *addr_); 00051 void destroy_socket (class socket_base_t *socket_); 00052 00053 // Logs an message. 00054 void log (const char *format_, ...); 00055 00056 // Chooses least loaded I/O thread. 00057 class io_thread_t *choose_io_thread (uint64_t affinity_); 00058 00059 // Derived object can use these functions to send commands 00060 // to other objects. 00061 void send_stop (); 00062 void send_plug (class own_t *destination_, 00063 bool inc_seqnum_ = true); 00064 void send_own (class own_t *destination_, 00065 class own_t *object_); 00066 void send_attach (class session_base_t *destination_, 00067 struct i_engine *engine_, bool inc_seqnum_ = true); 00068 void send_bind (class own_t *destination_, class pipe_t *pipe_, 00069 bool inc_seqnum_ = true); 00070 void send_activate_read (class pipe_t *destination_); 00071 void send_activate_write (class pipe_t *destination_, 00072 uint64_t msgs_read_); 00073 void send_hiccup (class pipe_t *destination_, void *pipe_); 00074 void send_pipe_term (class pipe_t *destination_); 00075 void send_pipe_term_ack (class pipe_t *destination_); 00076 void send_term_req (class own_t *destination_, 00077 class own_t *object_); 00078 void send_term (class own_t *destination_, int linger_); 00079 void send_term_ack (class own_t *destination_); 00080 void send_reap (class socket_base_t *socket_); 00081 void send_reaped (); 00082 void send_done (); 00083 00084 // These handlers can be overloaded by the derived objects. They are 00085 // called when command arrives from another thread. 00086 virtual void process_stop (); 00087 virtual void process_plug (); 00088 virtual void process_own (class own_t *object_); 00089 virtual void process_attach (struct i_engine *engine_); 00090 virtual void process_bind (class pipe_t *pipe_); 00091 virtual void process_activate_read (); 00092 virtual void process_activate_write (uint64_t msgs_read_); 00093 virtual void process_hiccup (void *pipe_); 00094 virtual void process_pipe_term (); 00095 virtual void process_pipe_term_ack (); 00096 virtual void process_term_req (class own_t *object_); 00097 virtual void process_term (int linger_); 00098 virtual void process_term_ack (); 00099 virtual void process_reap (class socket_base_t *socket_); 00100 virtual void process_reaped (); 00101 00102 // Special handler called after a command that requires a seqnum 00103 // was processed. The implementation should catch up with its counter 00104 // of processed commands here. 00105 virtual void process_seqnum (); 00106 00107 private: 00108 00109 // Context provides access to the global state. 00110 class ctx_t *ctx; 00111 00112 // Thread ID of the thread the object belongs to. 00113 uint32_t tid; 00114 00115 void send_command (command_t &cmd_); 00116 00117 object_t (const object_t&); 00118 const object_t &operator = (const object_t&); 00119 }; 00120 00121 } 00122 00123 #endif