![]() |
libzmq master
The Intelligent Transport Layer
|
00001 /* 00002 Copyright (c) 2009-2011 250bpm s.r.o. 00003 Copyright (c) 2007-2009 iMatix Corporation 00004 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00005 00006 This file is part of 0MQ. 00007 00008 0MQ is free software; you can redistribute it and/or modify it under 00009 the terms of the GNU Lesser General Public License as published by 00010 the Free Software Foundation; either version 3 of the License, or 00011 (at your option) any later version. 00012 00013 0MQ is distributed in the hope that it will be useful, 00014 but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 GNU Lesser General Public License for more details. 00017 00018 You should have received a copy of the GNU Lesser General Public License 00019 along with this program. If not, see <http://www.gnu.org/licenses/>. 00020 */ 00021 00022 #include "mailbox.hpp" 00023 #include "err.hpp" 00024 00025 zmq::mailbox_t::mailbox_t () 00026 { 00027 // Get the pipe into passive state. That way, if the users starts by 00028 // polling on the associated file descriptor it will get woken up when 00029 // new command is posted. 00030 bool ok = cpipe.read (NULL); 00031 zmq_assert (!ok); 00032 active = false; 00033 } 00034 00035 zmq::mailbox_t::~mailbox_t () 00036 { 00037 // TODO: Retrieve and deallocate commands inside the cpipe. 00038 } 00039 00040 zmq::fd_t zmq::mailbox_t::get_fd () 00041 { 00042 return signaler.get_fd (); 00043 } 00044 00045 void zmq::mailbox_t::send (const command_t &cmd_) 00046 { 00047 sync.lock (); 00048 cpipe.write (cmd_, false); 00049 bool ok = cpipe.flush (); 00050 sync.unlock (); 00051 if (!ok) 00052 signaler.send (); 00053 } 00054 00055 int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) 00056 { 00057 // Try to get the command straight away. 00058 if (active) { 00059 bool ok = cpipe.read (cmd_); 00060 if (ok) 00061 return 0; 00062 00063 // If there are no more commands available, switch into passive state. 00064 active = false; 00065 signaler.recv (); 00066 } 00067 00068 // Wait for signal from the command sender. 00069 int rc = signaler.wait (timeout_); 00070 if (rc != 0 && (errno == EAGAIN || errno == EINTR)) 00071 return -1; 00072 00073 // We've got the signal. Now we can switch into active state. 00074 active = true; 00075 00076 // Get a command. 00077 errno_assert (rc == 0); 00078 bool ok = cpipe.read (cmd_); 00079 zmq_assert (ok); 00080 return 0; 00081 } 00082