![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2011 VMware, Inc. 00005 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00006 00007 This file is part of 0MQ. 00008 00009 0MQ is free software; you can redistribute it and/or modify it under 00010 the terms of the GNU Lesser General Public License as published by 00011 the Free Software Foundation; either version 3 of the License, or 00012 (at your option) any later version. 00013 00014 0MQ is distributed in the hope that it will be useful, 00015 but WITHOUT ANY WARRANTY; without even the implied warranty of 00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00017 GNU Lesser General Public License for more details. 00018 00019 You should have received a copy of the GNU Lesser General Public License 00020 along with this program. If not, see <http://www.gnu.org/licenses/>. 00021 */ 00022 00023 #ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__ 00024 #define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ 00025 00026 #include <string> 00027 00028 #include "own.hpp" 00029 #include "array.hpp" 00030 #include "stdint.hpp" 00031 #include "poller.hpp" 00032 #include "atomic_counter.hpp" 00033 #include "i_poll_events.hpp" 00034 #include "mailbox.hpp" 00035 #include "stdint.hpp" 00036 #include "pipe.hpp" 00037 00038 namespace zmq 00039 { 00040 00041 class socket_base_t : 00042 public own_t, 00043 public array_item_t <>, 00044 public i_poll_events, 00045 public i_pipe_events 00046 { 00047 friend class reaper_t; 00048 00049 public: 00050 00051 // Returns false if object is not a socket. 00052 bool check_tag (); 00053 00054 // Create a socket of a specified type. 00055 static socket_base_t *create (int type_, class ctx_t *parent_, 00056 uint32_t tid_); 00057 00058 // Returns the mailbox associated with this socket. 00059 mailbox_t *get_mailbox (); 00060 00061 // Interrupt blocking call if the socket is stuck in one. 00062 // This function can be called from a different thread! 00063 void stop (); 00064 00065 // Interface for communication with the API layer. 00066 int setsockopt (int option_, const void *optval_, size_t optvallen_); 00067 int getsockopt (int option_, void *optval_, size_t *optvallen_); 00068 int bind (const char *addr_); 00069 int connect (const char *addr_); 00070 int send (class msg_t *msg_, int flags_); 00071 int recv (class msg_t *msg_, int flags_); 00072 int close (); 00073 00074 // These functions are used by the polling mechanism to determine 00075 // which events are to be reported from this socket. 00076 bool has_in (); 00077 bool has_out (); 00078 00079 // Using this function reaper thread ask the socket to regiter with 00080 // its poller. 00081 void start_reaping (poller_t *poller_); 00082 00083 // i_poll_events implementation. This interface is used when socket 00084 // is handled by the poller in the reaper thread. 00085 void in_event (); 00086 void out_event (); 00087 void timer_event (int id_); 00088 00089 // i_pipe_events interface implementation. 00090 void read_activated (pipe_t *pipe_); 00091 void write_activated (pipe_t *pipe_); 00092 void hiccuped (pipe_t *pipe_); 00093 void terminated (pipe_t *pipe_); 00094 00095 protected: 00096 00097 socket_base_t (class ctx_t *parent_, uint32_t tid_); 00098 virtual ~socket_base_t (); 00099 00100 // Concrete algorithms for the x- methods are to be defined by 00101 // individual socket types. 00102 virtual void xattach_pipe (class pipe_t *pipe_) = 0; 00103 00104 // The default implementation assumes there are no specific socket 00105 // options for the particular socket type. If not so, overload this 00106 // method. 00107 virtual int xsetsockopt (int option_, const void *optval_, 00108 size_t optvallen_); 00109 00110 // The default implementation assumes that send is not supported. 00111 virtual bool xhas_out (); 00112 virtual int xsend (class msg_t *msg_, int flags_); 00113 00114 // The default implementation assumes that recv in not supported. 00115 virtual bool xhas_in (); 00116 virtual int xrecv (class msg_t *msg_, int flags_); 00117 00118 // i_pipe_events will be forwarded to these functions. 00119 virtual void xread_activated (pipe_t *pipe_); 00120 virtual void xwrite_activated (pipe_t *pipe_); 00121 virtual void xhiccuped (pipe_t *pipe_); 00122 virtual void xterminated (pipe_t *pipe_) = 0; 00123 00124 // Delay actual destruction of the socket. 00125 void process_destroy (); 00126 00127 private: 00128 00129 // To be called after processing commands or invoking any command 00130 // handlers explicitly. If required, it will deallocate the socket. 00131 void check_destroy (); 00132 00133 // Moves the flags from the message to local variables, 00134 // to be later retrieved by getsockopt. 00135 void extract_flags (msg_t *msg_); 00136 00137 // Used to check whether the object is a socket. 00138 uint32_t tag; 00139 00140 // If true, associated context was already terminated. 00141 bool ctx_terminated; 00142 00143 // If true, object should have been already destroyed. However, 00144 // destruction is delayed while we unwind the stack to the point 00145 // where it doesn't intersect the object being destroyed. 00146 bool destroyed; 00147 00148 // Parse URI string. 00149 int parse_uri (const char *uri_, std::string &protocol_, 00150 std::string &address_); 00151 00152 // Check whether transport protocol, as specified in connect or 00153 // bind, is available and compatible with the socket type. 00154 int check_protocol (const std::string &protocol_); 00155 00156 // Register the pipe with this socket. 00157 void attach_pipe (class pipe_t *pipe_); 00158 00159 // Processes commands sent to this socket (if any). If timeout is -1, 00160 // returns only after at least one command was processed. 00161 // If throttle argument is true, commands are processed at most once 00162 // in a predefined time period. 00163 int process_commands (int timeout_, bool throttle_); 00164 00165 // Handlers for incoming commands. 00166 void process_stop (); 00167 void process_bind (class pipe_t *pipe_); 00168 void process_unplug (); 00169 void process_term (int linger_); 00170 00171 // Socket's mailbox object. 00172 mailbox_t mailbox; 00173 00174 // List of attached pipes. 00175 typedef array_t <pipe_t, 3> pipes_t; 00176 pipes_t pipes; 00177 00178 // Reaper's poller and handle of this socket within it. 00179 poller_t *poller; 00180 poller_t::handle_t handle; 00181 00182 // Timestamp of when commands were processed the last time. 00183 uint64_t last_tsc; 00184 00185 // Number of messages received since last command processing. 00186 int ticks; 00187 00188 // True if the last message received had MORE flag set. 00189 bool rcvmore; 00190 00191 socket_base_t (const socket_base_t&); 00192 const socket_base_t &operator = (const socket_base_t&); 00193 }; 00194 00195 } 00196 00197 #endif