IRC Log


Wednesday January 5, 2011

[Time] Name Message
[01:47] jugg sustrik, it would appear that 2.1.x C zmq_poll function needs to check ZMQ_EVENTS prior to polling the FD... yes?
[01:49] jugg btw, erlang bindings are functioning again... still some clean up work and consideration on how to expose certain aspects, but it is fairly usable now.
[02:30] jugg documentation bug: http://api.zeromq.org/zmq_send.html -> EFAULT incorrectly references 'context' rather than 'socket'.
[04:56] munin i have a python / zeromq question if anyone is active..?
[04:57] munin i'm trying to get python 3.2 and zeromq to work and am having a problem importing zmq:
[04:57] munin ImportError: /home/munin/dbz-class/python/lib/python3.2/site-packages/zmq/core/poll.cpython-32m.so: undefined symbol: PyCObject_FromVoidPtr
[06:31] the_hulk hi, if i allocate memory for a zmq_msg_t pointer using malloc, does zmq_msg_close free the memory, or do i have to free it?
[06:35] jsimmons if you allocate it, you have to free it.
[06:36] the_hulk jsimmons, after zmq_msg_close, ok, thanks
[09:03] mikko the_hulk: you can use zmq_msg_init_data if you want the data to be freed when the message is freed
[09:07] the_hulk mikko, data, no, i was talking about zmq_msg_t struct
[09:08] mikko ah, ok
[09:22] mikko http://twitter.com/alexdong/statuses/22478134221733888
[09:22] mikko i don't get this
[09:46] the_hulk mikko, me too
[10:35] sustrik redis is a database, isn't it?
[10:35] sustrik some people have no clue
[10:44] mikko yes, it's a key-value store
[10:51] mikko https://github.com/mkoppanen/httpush
[10:51] mikko if anyone is interested in such a thing
[10:54] sustrik mikko: i am not a web developer so i cannot judge
[10:55] sustrik but i would definitely announce it on mailing list and add a link to zeromq.org labs page
[10:55] sustrik to make it more accessible
[11:01] neopallium mikko: that is similar to Mongrel2
[11:01] the_hulk mikko, mongrel?
[11:02] mikko neopallium: it doesn't really act as request reply web-server
[11:02] mikko i just needed a simple http->zeromq bridge for statistics collection
[11:03] neopallium so you can only push data to a 0mq socket? no responses back to the http client?
[11:03] mikko neopallium: yep
[11:05] the_hulk mikko, how does that help? i am quite new to this stuff..
[11:06] mikko the use-case for this was fairly simple
[11:06] mikko the initial use-case was just collecting web-statistics/analytics
[11:07] mikko so take a request from client, push to 0mq and analyse later in the background
[11:08] the_hulk and response?
[11:08] mikko static 200 OK
[11:08] mikko if you want to send proper responses to client you probably want something like mongrel2
[11:19] the_hulk yeah, i am trying mongrel2, but i am still not clear about httpush.. is it like backend for page tagging?
[11:20] mikko it just pushes http requests to 0mq
[11:20] mikko nothing more
[11:21] mikko not a bidirectional communication
[11:21] the_hulk hmm, looks good
[11:22] mikko it's for a far simpler and more specific use-case than mongrel2
[15:07] jugg sustrik, in 2.1.x zmq_term is a blocking call, is there a way to check if it will block, or make it not block? In the erlang bindings, this causes a problem, because it hard locks the erlang vm, not allowing the sockets to be cleaned up and closed (which would unblock the zmq_term call). As such it turns into a deadlock. Of course I can put smarts into the erlang driver to deal with this issue, but...
[15:08] mikko jugg: it blocks if there are sockets open
[15:08] mikko jugg: if you close all sockets first then it will not block
[15:09] jugg yes
[15:09] mikko jugg: currently there is no non-blocking version but there has been some discussion around it
[15:10] sustrik there are 2 distinct problems there
[15:11] sustrik 1. the sockets have to be closed before zmq_term succeeds
[15:11] sustrik the reason is that otherwise you would destroy the sockets underneath running threads
[15:11] sustrik and end up with access violations etc.
[15:12] mikko there was the idea having a non-blocking call that just checks if sockets have been closed
[15:12] sustrik yes
[15:12] mikko does the following sound absurd:
[15:12] sustrik 2. zmq_term() waits while the pending messages are sent to the wire
[15:13] mikko 1. call zmq_term_nb(ctx);
[15:13] sustrik that one can be worked around by setting ZMQ_LINGER to 0
[15:13] jugg it'd be nice if it just returned with EAGAIN if there were open sockets, and have any blocking calls on the open sockets return with ETERM
[15:13] mikko 2. context is marked "awaiting termination" and doesnt allow new sockets to be allocated
[15:13] mikko 3. context automatically terminates after last socket is done
[15:13] mikko is this even possible?
[15:14] mikko jugg: i guess that would be ideal
[15:14] sustrik 1 and 2 is ok
[15:14] mikko but not sure if possible
[15:14] sustrik 3 won't fly
[15:15] sustrik the problem is that what you normally do is:
[15:15] sustrik zmq_term()
[15:15] sustrik exit()
[15:15] jugg zmq_term would have to be called again if it returned with eagain previously...
[15:15] sustrik thus the application exits even though there are pending messages there
[15:15] sustrik jugg: yes
[15:16] mikko sustrik: i see two problems here
[15:16] sustrik in short you need a point where the application can sleep waiting for 0mq to finish any work underway
[15:16] mikko well, one problem
[15:16] mikko the problem being that zmq_term doesn't indicate to sockets that they should exit
[15:17] sustrik ?
[15:17] sustrik what about ETERM?
[15:17] mikko does it currently send eterm to sockets?
[15:17] mikko maybe im not handling that in my code properly
[15:17] sustrik after zmq_term() is called
[15:17] sustrik all the functions except zmq_close() return ETERM
[15:18] mikko does that signal event in ZMQ_FD as well?
[15:18] sustrik good question
[15:18] sustrik probably yes
[15:18] sustrik not sure
[15:18] mikko maybe that was my problem as i am using libevent loop
[15:19] sustrik i would say
[15:19] sustrik fd should be signaled
[15:19] sustrik and getsockopt(zmq_events) should return ETERM
[15:19] mikko let me test
[15:19] sustrik but i never tried that
[15:21] jugg what happens if zmq_send is blocking, or if the last message was sent with SNDMORE? Do only full message sets get sent? Or does zmq_send not return with eterm?
[15:22] sustrik messages are atomic
[15:22] sustrik so, in this case
[15:22] jugg I assume, the linger option only deals with a fully atomic message set?
[15:22] sustrik zmq_send return ETERM
[15:22] sustrik but the message is not sent at all
[15:22] mikko https://gist.github.com/79d15fa01f0c7288f129
[15:22] mikko strange
[15:22] sustrik jugg: yes, half-sent messages are never delivered
[15:24] sustrik mikko: that's ETERM
[15:24] mikko yeah
[15:24] mikko so it does signal the ZMQ_FD
[15:25] sustrik ok, easy to fix
[15:25] sustrik just change line 270 to
[15:26] sustrik if (rc != 0 && (errno == EINTR || errno == ETERM))
[15:27] mikko sustrik: socket_base.cpp:272
[15:27] mikko yeah
[15:57] CIA-21 zeromq2: 03Martin Sustrik 07master * r472bdcd 10/ src/socket_base.cpp :
[15:57] CIA-21 zeromq2: Return ETERM from getsockopt(ZMQ_EVETS) if zmq_term() was called
[15:57] CIA-21 zeromq2: Signed-off-by: Martin Sustrik <sustrik@250bpm.com> - http://bit.ly/hXhab5
[17:29] s0undt3ch hello ppl
[17:30] s0undt3ch I'm trying to use pyzmq within a gobject mainloop, ie, I can't simply do while True: sock.recv()
[17:30] s0undt3ch so what are my options?
[17:31] s0undt3ch using the gobject mainloop I can make a function run at a specific interval
[17:31] sustrik you can do non-blocking recv
[17:31] sustrik use ZMQ_NOBLOCK flag
[17:32] s0undt3ch sustrik: so, keep calling recv(zmq.NOBLOCK) until I get an empty message meaning a message was completely sent?
[17:33] sustrik EAGAIN error ensues if there's nothing to receive
[17:35] s0undt3ch sustrik: k, Thanks
[17:36] sustrik np
[17:36] s0undt3ch sustrik: in which cases should we use poller?
[17:36] s0undt3ch might be unrelated
[17:36] sustrik if you need to use several sockets in parallel
[17:36] s0undt3ch sustrik: well, a daemonized process will need a req and a rep socket
[17:37] s0undt3ch is this such a case?
[17:37] sustrik if you need to use them from the same thread
[17:37] mikko s0undt3ch: ideal for you would be to use ZMQ_FD to get the filehandle
[17:37] mikko and gobject.add_watch to watch for activity on the handle
[17:37] mikko then in the callback check ZMQ_EVENTS and recv if there are events
[17:37] mikko i think
[17:37] mikko home ->
[17:38] s0undt3ch mikko: wow! Thanks! Have you done something like this?
[17:38] s0undt3ch mikko: any public code?
[17:38] mikko s0undt3ch: i've done something like this with libevent
[17:38] mikko i'll be back in an hour
[17:38] mikko ->
[17:38] s0undt3ch mikko: please ping me when back
[17:39] s0undt3ch mikko: ah, zmq 2.1.0 right?
[17:41] sustrik yes, 2.1
[17:43] s0undt3ch hmm, guess I need to build both packages in order to support that
[18:00] s0undt3ch while trying to build pyzmq 2.1.0(git master) I'm getting http://paste.pocoo.org/show/315418/
[18:04] sustrik looks like mismatch between 2.0 and 2.1
[18:04] sustrik aren't there old headers lingering somewhere?
[18:05] s0undt3ch sustrik: yes, the distro ones
[18:05] s0undt3ch ie, probably
[18:07] s0undt3ch sustrik: I need to remove all distro headers?
[18:07] s0undt3ch sustrik: I've specified the path to the new ones though
[18:08] sustrik there's some mismatch obviously
[18:08] s0undt3ch anyway I've downloaded the latest dev tar from the git downloads
[18:08] sustrik mikko should be able to help you with that
[18:08] s0undt3ch it now builds
[18:08] s0undt3ch though
[18:08] s0undt3ch http://paste.pocoo.org/show/315424/
[18:15] s0undt3ch it now is not finding the compiled lib
[18:18] s0undt3ch ok, tests work now
[18:26] s0undt3ch sustrik: there's no ZMQ_FD example in pyzmq's git is there?
[18:26] sustrik don't know
[18:26] sustrik you should ask brian or minrk
[18:26] sustrik those are pyzmq authors
[18:26] sustrik you can talk to them via mailing list
[18:27] sustrik but you've already signed the contract for the translation :}
[18:28] sustrik oops, sorry
[18:33] mikko back
[18:33] s0undt3ch mikko: hello there again!
[18:33] s0undt3ch mikko: I've successfully built zmq and pyzmq 2.1.0 on my machine
[18:34] s0undt3ch mikko: your work with the ZMQ_FD is it public?
[18:34] s0undt3ch ie, just to get me some pointers on where to go next
[18:34] mikko yes
[18:35] mikko this is the callback https://github.com/mkoppanen/httpush/blob/master/src/httpd.c#L170
[18:35] s0undt3ch I can't seem to find any examples in pyzmq
[18:35] s0undt3ch ah, C ;)
[18:35] s0undt3ch I'll see if I can translate that to python....
[18:36] mikko initialized here https://github.com/mkoppanen/httpush/blob/master/src/server.c#L41
[18:36] mikko yes, C
[18:36] mikko is zmq_getsockopt ZMQ_FD available in python?
[18:40] s0undt3ch mikko: in 2.1.0 yes
[18:40] s0undt3ch I was able to get the FD
[18:40] mikko now you probably want to look into gobject.add_watch
[18:41] mikko i think
[18:42] mikko http://www.pygtk.org/pygtk2reference/gobject-functions.html#function-gobject--io-add-watch
[18:42] mikko this
[18:43] mikko pass the fd, gobject.IO_IN, and a callback
[18:43] mikko then in the callback you: zmq_getsockopt(ZMQ_EVENTS
[18:43] mikko and see if there are events available
[18:43] mikko and do a read
[18:43] mikko and remember that zeromq is edge-triggered rather than level-triggered
[18:45] s0undt3ch mikko: hmm, that means?
[18:46] mikko let's say you got 3 messages waiting in the socket
[18:46] mikko your callback gets triggered
[18:46] mikko you need to read all events in one go in the callback
[18:46] mikko the fd only triggers event when the state changes rather than "until there are messages"
[18:48] mikko if you look at this callback https://github.com/mkoppanen/httpush/blob/master/src/httpd.c#L170 you should understand the semantics even without understanding c
[19:03] s0undt3ch mikko: sorry, had to work a bit?
[19:03] s0undt3ch oops
[19:03] s0undt3ch forget the ?
[19:03] s0undt3ch mikko: I'll have a look at that code
[19:03] s0undt3ch mikko: Thanks!
[19:04] mikko no problem
[19:28] s0undt3ch mikko: working python example -> http://paste.pocoo.org/show/315462/
[19:28] s0undt3ch mikko: although I'm sending a lot of messages, I apparently only handle one at a time, ie, events = subscriber.getsockopt(zmq.EVENTS) either returns 0 or 1
[19:30] s0undt3ch mikko: is there a way to test in order to get more than a single zmq message?
[19:31] s0undt3ch mikko: or socket.getsockopt(zmq.EVENTS) returns 0 or 1 in order to tell if there are events pending or not?
[19:35] s0undt3ch mikko: the same can be done for outgoing events?
[19:35] mikko if events == 0:
[19:35] mikko check if (events & ZMQ_POLLIN)
[19:36] mikko s0undt3ch: you could start your socket
[19:36] mikko sleep 5 seconds while sending messages at other end
[19:36] mikko and see how many you get in the callback
[19:36] mikko hmm that might not work well
[19:36] mikko or you could sleep in the callback
[19:37] mikko ah now i see
[19:37] mikko print 'GOT', events, 'EVENTS'
[19:37] s0undt3ch mikko: I tried sleeping in the callback, 1 message every time
[19:37] mikko events is a bitmask rather than number of events
[19:37] s0undt3ch ah
[19:38] s0undt3ch mikko: so "if events and zmq.POLLIN: break" right?
[19:38] mikko if (events & zmq.POLLIN)
[19:38] mikko you can also check for if (events & zmq.POLLERR
[19:39] s0undt3ch thats in case of errors right?
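
Since the value read via ZMQ_EVENTS is a bitmask, its flags have to be tested with bitwise `&` rather than Python's logical `and`. A minimal sketch of the difference, assuming the flag values from zmq.h (1, 2 and 4; with pyzmq installed these would be zmq.POLLIN, zmq.POLLOUT and zmq.POLLERR). The helper name `describe_events` is made up for illustration:

```python
# Flag values as defined in zmq.h (ZMQ_POLLIN, ZMQ_POLLOUT, ZMQ_POLLERR).
# With pyzmq you would use zmq.POLLIN, zmq.POLLOUT and zmq.POLLERR instead.
POLLIN, POLLOUT, POLLERR = 1, 2, 4


def describe_events(events):
    """Return the names of the flags set in a ZMQ_EVENTS bitmask."""
    names = []
    if events & POLLIN:
        names.append("POLLIN")
    if events & POLLOUT:
        names.append("POLLOUT")
    if events & POLLERR:
        names.append("POLLERR")
    return names


writable_only = POLLOUT
# Logical "and" is truthy here even though nothing is readable.
print(bool(writable_only and POLLIN))
# Bitwise "&" correctly reports that POLLIN is not set.
print(bool(writable_only & POLLIN))
# A value of 3 means "readable and writable", not "three events".
print(describe_events(POLLIN | POLLOUT))
```

A value of 1 therefore only says that at least one message can be received, not that exactly one is waiting.
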
[19:40] s0undt3ch recv() reads every message? or single message everytime?
[19:41] mikko single message at a time
[19:41] mikko thats why you loop
[19:41] s0undt3ch ok
[19:42] s0undt3ch so, it's working correctly, ie, with the changes you've said
[19:45] mikko it sounds like so
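
The receive loop discussed above could be sketched roughly as follows. Because the fd is edge-triggered, the callback keeps receiving until ZMQ_EVENTS no longer reports POLLIN. This is an illustration only, not the code from the paste: `drain`, `on_readable` and `FakeSocket` are hypothetical names, the constants are the zmq.h values (zmq.POLLIN, zmq.EVENTS and zmq.NOBLOCK in pyzmq), and `FakeSocket` stands in for a real pyzmq socket so the sketch runs without libzmq.

```python
import errno

POLLIN = 1    # ZMQ_POLLIN in zmq.h
EVENTS = 15   # ZMQ_EVENTS socket option in zmq.h
NOBLOCK = 1   # ZMQ_NOBLOCK flag in zmq.h


def drain(sock, handle):
    """Receive messages until the socket stops reporting POLLIN."""
    count = 0
    while sock.getsockopt(EVENTS) & POLLIN:
        try:
            msg = sock.recv(NOBLOCK)
        except Exception as exc:
            # pyzmq raises zmq.ZMQError, which carries an errno attribute;
            # EAGAIN means there was nothing left to receive after all.
            if getattr(exc, "errno", None) == errno.EAGAIN:
                break
            raise
        handle(msg)
        count += 1
    return count


class FakeSocket:
    """Hypothetical stand-in for a pyzmq socket, used only for this demo."""

    def __init__(self, messages):
        self.queue = list(messages)

    def getsockopt(self, option):
        return POLLIN if self.queue else 0

    def recv(self, flags=0):
        if not self.queue:
            raise OSError(errno.EAGAIN, "no message available")
        return self.queue.pop(0)


def on_readable(sock, received):
    """Shape of a watch callback: drain everything, then stay registered."""
    drain(sock, received.append)
    return True


received = []
on_readable(FakeSocket([b"one", b"two", b"three"]), received)
print(received)
```

With gobject, a callback registered through io_add_watch on the ZMQ_FD descriptor would do the same work and return True so that the watch stays installed.
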
[19:52] s0undt3ch mikko: in case of (events & zmq.POLLERR) can we get the error?
[19:53] mikko im not sure
[19:53] mikko errno might or might not contain it
[19:55] s0undt3ch k
[20:36] s0undt3ch mikko: can it also be done for zmq.POLLOUT? or this does not make that much sense?
[20:36] s0undt3ch since in order to send a message we have to call .send()
[20:36] s0undt3ch which would send
[20:37] mikko yes, it can be done with pollout as well
[20:37] s0undt3ch hmm, doesn't make that much sense
[20:37] mikko send could block
[20:37] mikko so you could either do a non-blocking send and check for the return code (EAGAIN) or poll
[20:38] mikko it might be easier just to send with ZMQ_NOBLOCK
[20:38] mikko and check for EINVAL
[20:38] mikko depending on socket type it might not even block
[20:41] s0undt3ch so, socket.send(msg, zmq.NOBLOCK) and have gobject listening on the fd for gobject.IO_OUT, then on the callback check for zmq.EINVAL meaning message was completely sent?
[20:44] s0undt3ch mikko: makes sense?
[20:44] mikko no
[20:44] mikko socket.send(msg, zmq.NOBLOCK)
[20:45] mikko then check errno
[20:45] mikko if errno == einval, schedule gobject io_out
[20:45] mikko and send message when it's signaled ready
[20:45] mikko maybe
[20:57] s0undt3ch mikko: my example -> http://paste.pocoo.org/show/315517/ <- though I don't think it makes that much sense
[20:58] s0undt3ch and I don't know how to schedule a gobject io_out
[21:01] s0undt3ch messages get through, but I don't think this is what you were saying
[21:05] mikko ZMQ_PUB socket wont block
[21:05] mikko so you dont need to worry about that really
[21:06] s0undt3ch mikko: REQ/REP will?
[21:07] s0undt3ch the purpose of this was actually for REQ/REP
[21:07] mikko some sockets block depending on situation
[21:07] s0undt3ch PUB/SUB was some existing code I had lying around
[21:07] mikko let me see if i can write understandable python
[21:08] s0undt3ch ah
[21:08] s0undt3ch making those REQ/REP I'm getting Operation cannot be accomplished in current state
[21:09] s0undt3ch mikko: that would be wonderful!
[21:09] mikko where you have if e.errno == zmq.EINVAL:
[21:09] mikko if you check for zmq.EAGAIN
[21:09] mikko EAGAIN means that the operation would have blocked
[21:10] mikko so if e.errno == zmq.EAGAIN
[21:10] mikko schedule io watcher there for IO_OUT
[21:10] mikko it should be called when the socket becomes writable
[21:10] mikko and in the callback:
[21:10] mikko if you have sent everything, do not reschedule event
[21:10] mikko if you get EAGAIN in the callback before sending all messages, reschedule event
[21:11] mikko do you follow?
[21:11] s0undt3ch mikko: I think so
[21:11] mikko let's say you need to send out 100 messages
[21:11] mikko you are happily sending messages but on message 59 you get EAGAIN
[21:11] mikko which means that the operation would block. so you schedule io_watcher for the outgoing socket
[21:12] mikko the callback gets called when the socket is ready to receive more message
[21:12] mikko s
[21:12] mikko so then you keep on sending where you left
[21:12] mikko if you hit EAGAIN then return true (that seems to be the way gobject reschedules the event)
[21:13] mikko if you have sent everything return false
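
The send-side scheme described above (send without blocking, stop at EAGAIN, resume when the watch fires, return true to stay scheduled and false when done) might look like the sketch below. `flush` and `FakeSocket` are hypothetical names; `FakeSocket` only imitates a socket that accepts a limited number of sends before it would block, so the example runs without libzmq. With pyzmq the exception would be zmq.ZMQError and the flag zmq.NOBLOCK.

```python
import errno
from collections import deque

NOBLOCK = 1  # ZMQ_NOBLOCK flag in zmq.h


def flush(sock, pending):
    """Send queued messages; return True if the watch is still needed."""
    while pending:
        try:
            sock.send(pending[0], NOBLOCK)
        except Exception as exc:
            if getattr(exc, "errno", None) == errno.EAGAIN:
                return True    # would block: keep (or schedule) the watch
            raise
        pending.popleft()      # drop a message only after it was sent
    return False               # everything sent: the watch can be removed


class FakeSocket:
    """Hypothetical stand-in: accepts `capacity` sends, then would block."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.sent = []

    def send(self, msg, flags=0):
        if len(self.sent) >= self.capacity:
            raise OSError(errno.EAGAIN, "send would block")
        self.sent.append(msg)


pending = deque(("msg %d" % i).encode() for i in range(1, 101))
sock = FakeSocket(capacity=58)

# First attempt: messages 1..58 go out, message 59 hits EAGAIN.
needs_watch = flush(sock, pending)
print(needs_watch, len(sock.sent), len(pending))

# Later the watch fires because the socket can take more messages again.
sock.capacity = 100
needs_watch = flush(sock, pending)
print(needs_watch, len(sock.sent), len(pending))
```

Keeping the unsent message at the head of the queue is what lets the callback continue exactly where the previous attempt stopped.
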
[21:24] s0undt3ch mikko: as it is now -> http://paste.pocoo.org/show/315529/ <- I'm getting "Operation cannot be accomplished in current state", errno is 156384763
[21:24] s0undt3ch ie, no messages are being sent as it is
[21:31] mikko request-reply is strict on the pattern
[21:31] mikko recv -> send -> recv -> send etc
[21:31] s0undt3ch ah, the socket hasn't received anything so it can't send
[21:32] s0undt3ch I think for now I won't worry about this, right now I'm worried about async handling on incoming messages
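
The errno quoted above, 156384763, is the 0MQ-specific EFSM code: zmq.h defines it as ZMQ_HAUSNUMERO + 51. The toy model below is not pyzmq; `RepModel` and `StateError` are invented names. It only illustrates the strict recv/send alternation of a REP socket and where that error comes from (a REQ socket mirrors it, starting with send).

```python
ZMQ_HAUSNUMERO = 156384712      # base for 0MQ-specific errno values in zmq.h
EFSM = ZMQ_HAUSNUMERO + 51      # errno behind the "current state" error text


class StateError(Exception):
    """Hypothetical error type carrying the EFSM errno."""

    def __init__(self):
        super().__init__("Operation cannot be accomplished in current state")
        self.errno = EFSM


class RepModel:
    """Toy model of a REP socket: every send must follow exactly one recv."""

    def __init__(self):
        self.request = None     # the request still waiting for a reply

    def recv(self, request):
        if self.request is not None:
            raise StateError()  # two recvs in a row are not allowed
        self.request = request
        return request

    def send(self, reply):
        if self.request is None:
            raise StateError()  # nothing was received, so nothing to answer
        self.request = None
        return reply


rep = RepModel()
try:
    rep.send(b"too early")
except StateError as exc:
    print(exc.errno, exc)       # prints 156384763 and the error text

rep.recv(b"ping")
rep.send(b"pong")               # fine: recv came first
```
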
[21:51] mikko s0undt3ch: you might want to look into XREP/XREQ as well
[21:51] s0undt3ch mikko: big differences?
[21:51] s0undt3ch mikko: I'm looking at REQ/REP to create a kind of rpc through zmq messaging
[21:52] s0undt3ch *rpc client/server
[21:59] mikko then maybe rep/req might be in place
[22:00] mikko are you going to have multiple worker processes?
[22:01] s0undt3ch mikko: yes, different ones, ie, each worker, the replier, will handle a specific job, the rpc is for them
[22:01] s0undt3ch mikko: my "problem" at hands is
[22:02] s0undt3ch I'll have several processes analysing different audio sources through gstreamer. when a problem on that audio source is found, a pub message is sent. However then need to be controllable, hence the rpc(REQ) need for them
[22:03] s0undt3ch s/then need/they need/
[22:03] s0undt3ch a client should be able to control a single or all of those gstreamer processes
[22:24] s0undt3ch mikko: do my needs apply for XREQ/XREP?
[22:25] mikko probably not
[22:25] mikko xreq/xrep doesn't enforce the pattern
[22:25] mikko recv/send/recv/send
[22:27] s0undt3ch ah ok
[22:27] s0undt3ch mikko: those would be good for my testing needs, but not for the actual project
[22:27] s0undt3ch Thanks!
[23:26] s0undt3ch on a rep socket, can we know "who" is making the request?