libzmq master
The Intelligent Transport Layer

socket_base.hpp

Go to the documentation of this file.
00001 /*
00002     Copyright (c) 2009-2011 250bpm s.r.o.
00003     Copyright (c) 2007-2009 iMatix Corporation
00004     Copyright (c) 2011 VMware, Inc.
00005     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00006 
00007     This file is part of 0MQ.
00008 
00009     0MQ is free software; you can redistribute it and/or modify it under
00010     the terms of the GNU Lesser General Public License as published by
00011     the Free Software Foundation; either version 3 of the License, or
00012     (at your option) any later version.
00013 
00014     0MQ is distributed in the hope that it will be useful,
00015     but WITHOUT ANY WARRANTY; without even the implied warranty of
00016     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017     GNU Lesser General Public License for more details.
00018 
00019     You should have received a copy of the GNU Lesser General Public License
00020     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00021 */
00022 
00023 #ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
00024 #define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
00025 
00026 #include <string>
00027 
00028 #include "own.hpp"
00029 #include "array.hpp"
00030 #include "stdint.hpp"
00031 #include "poller.hpp"
00032 #include "atomic_counter.hpp"
00033 #include "i_poll_events.hpp"
00034 #include "mailbox.hpp"
00035 #include "stdint.hpp"
00036 #include "pipe.hpp"
00037 
00038 namespace zmq
00039 {
00040 
00041     class socket_base_t :
00042         public own_t,
00043         public array_item_t <>,
00044         public i_poll_events,
00045         public i_pipe_events
00046     {
00047         friend class reaper_t;
00048 
00049     public:
00050 
00051         //  Returns false if object is not a socket.
00052         bool check_tag ();
00053 
00054         //  Create a socket of a specified type.
00055         static socket_base_t *create (int type_, class ctx_t *parent_,
00056             uint32_t tid_);
00057 
00058         //  Returns the mailbox associated with this socket.
00059         mailbox_t *get_mailbox ();
00060 
00061         //  Interrupt blocking call if the socket is stuck in one.
00062         //  This function can be called from a different thread!
00063         void stop ();
00064 
00065         //  Interface for communication with the API layer.
00066         int setsockopt (int option_, const void *optval_, size_t optvallen_);
00067         int getsockopt (int option_, void *optval_, size_t *optvallen_);
00068         int bind (const char *addr_);
00069         int connect (const char *addr_);
00070         int send (class msg_t *msg_, int flags_);
00071         int recv (class msg_t *msg_, int flags_);
00072         int close ();
00073 
00074         //  These functions are used by the polling mechanism to determine
00075         //  which events are to be reported from this socket.
00076         bool has_in ();
00077         bool has_out ();
00078 
00079         //  Using this function reaper thread ask the socket to regiter with
00080         //  its poller.
00081         void start_reaping (poller_t *poller_);
00082 
00083         //  i_poll_events implementation. This interface is used when socket
00084         //  is handled by the poller in the reaper thread.
00085         void in_event ();
00086         void out_event ();
00087         void timer_event (int id_);
00088 
00089         //  i_pipe_events interface implementation.
00090         void read_activated (pipe_t *pipe_);
00091         void write_activated (pipe_t *pipe_);
00092         void hiccuped (pipe_t *pipe_);
00093         void terminated (pipe_t *pipe_);
00094 
00095     protected:
00096 
00097         socket_base_t (class ctx_t *parent_, uint32_t tid_);
00098         virtual ~socket_base_t ();
00099 
00100         //  Concrete algorithms for the x- methods are to be defined by
00101         //  individual socket types.
00102         virtual void xattach_pipe (class pipe_t *pipe_) = 0;
00103 
00104         //  The default implementation assumes there are no specific socket
00105         //  options for the particular socket type. If not so, overload this
00106         //  method.
00107         virtual int xsetsockopt (int option_, const void *optval_,
00108             size_t optvallen_);
00109 
00110         //  The default implementation assumes that send is not supported.
00111         virtual bool xhas_out ();
00112         virtual int xsend (class msg_t *msg_, int flags_);
00113 
00114         //  The default implementation assumes that recv in not supported.
00115         virtual bool xhas_in ();
00116         virtual int xrecv (class msg_t *msg_, int flags_);
00117 
00118         //  i_pipe_events will be forwarded to these functions.
00119         virtual void xread_activated (pipe_t *pipe_);
00120         virtual void xwrite_activated (pipe_t *pipe_);
00121         virtual void xhiccuped (pipe_t *pipe_);
00122         virtual void xterminated (pipe_t *pipe_) = 0;
00123 
00124         //  Delay actual destruction of the socket.
00125         void process_destroy ();
00126 
00127     private:
00128 
00129         //  To be called after processing commands or invoking any command
00130         //  handlers explicitly. If required, it will deallocate the socket.
00131         void check_destroy ();
00132 
00133         //  Moves the flags from the message to local variables,
00134         //  to be later retrieved by getsockopt.
00135         void extract_flags (msg_t *msg_);
00136 
00137         //  Used to check whether the object is a socket.
00138         uint32_t tag;
00139 
00140         //  If true, associated context was already terminated.
00141         bool ctx_terminated;
00142 
00143         //  If true, object should have been already destroyed. However,
00144         //  destruction is delayed while we unwind the stack to the point
00145         //  where it doesn't intersect the object being destroyed.
00146         bool destroyed;
00147 
00148         //  Parse URI string.
00149         int parse_uri (const char *uri_, std::string &protocol_,
00150             std::string &address_);
00151 
00152         //  Check whether transport protocol, as specified in connect or
00153         //  bind, is available and compatible with the socket type.
00154         int check_protocol (const std::string &protocol_);
00155 
00156         //  Register the pipe with this socket.
00157         void attach_pipe (class pipe_t *pipe_);
00158 
00159         //  Processes commands sent to this socket (if any). If timeout is -1,
00160         //  returns only after at least one command was processed.
00161         //  If throttle argument is true, commands are processed at most once
00162         //  in a predefined time period.
00163         int process_commands (int timeout_, bool throttle_);
00164 
00165         //  Handlers for incoming commands.
00166         void process_stop ();
00167         void process_bind (class pipe_t *pipe_);
00168         void process_unplug ();
00169         void process_term (int linger_);
00170 
00171         //  Socket's mailbox object.
00172         mailbox_t mailbox;
00173 
00174         //  List of attached pipes.
00175         typedef array_t <pipe_t, 3> pipes_t;
00176         pipes_t pipes;
00177 
00178         //  Reaper's poller and handle of this socket within it.
00179         poller_t *poller;
00180         poller_t::handle_t handle;
00181 
00182         //  Timestamp of when commands were processed the last time.
00183         uint64_t last_tsc;
00184 
00185         //  Number of messages received since last command processing.
00186         int ticks;
00187 
00188         //  True if the last message received had MORE flag set.
00189         bool rcvmore;
00190 
00191         socket_base_t (const socket_base_t&);
00192         const socket_base_t &operator = (const socket_base_t&);
00193     };
00194 
00195 }
00196 
00197 #endif
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Defines